Merge branch 'master' into vmray-extractor

This commit is contained in:
Moritz
2024-08-26 16:18:34 +02:00
committed by GitHub
7 changed files with 126 additions and 4 deletions

View File

@@ -5,6 +5,7 @@
Unlock powerful malware analysis with capa's new [VMRay sandbox](https://www.vmray.com/) integration! Simply provide a VMRay analysis archive, and capa will automatically extract and match capabilities, streamlining your workflow.
### New Features
- regenerate ruleset cache automatically on source change (only in dev mode) #2133 @s-ff
- add landing page https://mandiant.github.io/capa/ @williballenthin #2310
- add rules website https://mandiant.github.io/capa/rules @DeeyaSingh #2310

View File

@@ -5,6 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import gzip
import inspect
@@ -14,6 +15,7 @@ import importlib.util
from typing import Dict, List, Union, BinaryIO, Iterator, NoReturn
from pathlib import Path
from zipfile import ZipFile
from datetime import datetime
import tqdm
import msgspec.json
@@ -309,3 +311,62 @@ def is_running_standalone() -> bool:
# so we keep this in a common area.
# generally, other library code should not use this function.
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")
def is_dev_environment() -> bool:
    """
    report whether this module is running from a source checkout.

    returns:
      False when running as a standalone (frozen) executable,
      False when this module's path contains "site-packages",
      otherwise True only if a `.git` directory exists two levels above this file.
    """
    # frozen builds and site-packages installations are never development setups
    if is_running_standalone() or "site-packages" in __file__:
        return False

    # a source checkout keeps its `.git` directory next to the package directory
    repo_root = Path(__file__).resolve().parent.parent
    return (repo_root / ".git").is_dir()
def is_cache_newer_than_rule_code(cache_dir: Path) -> bool:
    """
    basic check to prevent issues if the rules cache is older than relevant rules code

    the rules code files are located relative to this module (not the current
    working directory), so the result does not depend on where the process was started.

    args:
      cache_dir: the cache directory containing cache files

    returns:
      True if latest cache file is newer than relevant rule cache code,
      False if there are no cache files or the rule code is newer than the cache
    """
    # retrieve the latest modified cache file
    cache_files = list(cache_dir.glob("*.cache"))
    if not cache_files:
        logger.debug("no rule cache files found")
        return False

    latest_cache_file = max(cache_files, key=os.path.getmtime)
    cache_timestamp = os.path.getmtime(latest_cache_file)

    # these are the relevant rules code files that could conflict with using an outdated cache.
    # anchor them at this module's directory: a path relative to the working directory
    # would raise FileNotFoundError whenever the process is not started from the repository root.
    rules_code_dir = Path(__file__).resolve().parent / "rules"
    latest_rule_code_file = max(
        [rules_code_dir / "__init__.py", rules_code_dir / "cache.py"],
        key=os.path.getmtime,
    )
    rule_code_timestamp = os.path.getmtime(latest_rule_code_file)

    if rule_code_timestamp > cache_timestamp:

        def ts_to_str(ts):
            return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

        logger.warning(
            "latest rule code file %s (%s) is newer than the latest rule cache file %s (%s)",
            latest_rule_code_file,
            ts_to_str(rule_code_timestamp),
            latest_cache_file,
            ts_to_str(cache_timestamp),
        )
        return False

    return True

View File

@@ -620,13 +620,22 @@ def get_rules_from_cli(args) -> RuleSet:
raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
enable_cache: bool = True
try:
if capa.helpers.is_running_standalone() and args.is_default_rules:
cache_dir = get_default_root() / "cache"
else:
cache_dir = capa.rules.cache.get_default_cache_directory()
rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir)
if capa.helpers.is_dev_environment():
# using the rules cache during development may result in unexpected errors, see #1898
enable_cache = capa.helpers.is_cache_newer_than_rule_code(cache_dir)
if not enable_cache:
logger.debug("not using cache. delete the cache file manually to use rule caching again")
else:
logger.debug("cache can be used, no potentially outdated cache files found")
rules = capa.rules.get_rules(args.rules, cache_dir=cache_dir, enable_cache=enable_cache)
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
logger.error(

View File

@@ -2130,12 +2130,14 @@ def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
enable_cache: bool = True,
) -> RuleSet:
"""
args:
rule_paths: list of paths to rules files or directories containing rules files
cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
enable_cache: enable loading of a cached ruleset (default: True)
"""
if cache_dir is None:
cache_dir = capa.rules.cache.get_default_cache_directory()
@@ -2147,9 +2149,10 @@ def get_rules(
# rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset
if enable_cache:
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset
rules: List[Rule] = []

View File

@@ -8,6 +8,7 @@
import codecs
import capa.helpers
from capa.features.extractors import helpers
@@ -64,3 +65,8 @@ def test_generate_symbols():
symbols = list(helpers.generate_symbols("ws2_32", "#1", include_dll=False))
assert len(symbols) == 1
assert "ws2_32.#1" in symbols
def test_is_dev_environment():
    """the test suite runs from a source checkout, which must be detected as a dev environment."""
    detected = capa.helpers.is_dev_environment()
    assert detected is True

View File

@@ -6,10 +6,13 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import textwrap
import contextlib
from pathlib import Path
import capa.rules
import capa.helpers
import capa.rules.cache
R1 = capa.rules.Rule.from_yaml(
@@ -113,3 +116,40 @@ def test_ruleset_cache_invalid():
assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None
# the invalid cache should be deleted
assert not path.exists()
def test_rule_cache_dev_environment():
    """
    a freshly written rule cache counts as newer than the rule code;
    after backdating the cache file's modification time it no longer does.
    """

    def ts_to_str(ts):
        from datetime import datetime

        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

    # build a ruleset and work out where its cache file will live
    ruleset = capa.rules.RuleSet([R2])
    ruleset_content = capa.rules.cache.get_ruleset_content(ruleset)
    cache_id = capa.rules.cache.compute_cache_identifier(ruleset_content)
    cache_dir = capa.rules.cache.get_default_cache_directory()
    cache_path = capa.rules.cache.get_cache_path(cache_dir, cache_id)

    # start from a clean slate so the new cache file is the only (and latest) one
    for stale_cache in cache_dir.glob("*.cache"):
        stale_cache.unlink()

    capa.rules.cache.cache_ruleset(cache_dir, ruleset)
    assert cache_path.exists()
    assert capa.helpers.is_cache_newer_than_rule_code(cache_dir) is True

    # the rule code lives under <repository root>/capa/rules
    rules_code_dir = Path(__file__).resolve().parent.parent / "capa" / "rules"
    cachepy = rules_code_dir / "cache.py"  # alternative: rules_code_dir / "__init__.py"

    # move the cache's last modified time to well before the code file's modified time
    access_time = cache_path.stat().st_atime
    backdated_mtime = cachepy.stat().st_mtime - 600000
    os.utime(cache_path, (access_time, backdated_mtime))

    # debug output: modification times of the rule code files and the cache files
    for listing in (rules_code_dir.glob("*.py"), cache_dir.glob("*.cache")):
        for entry in listing:
            print(entry, "\t", ts_to_str(entry.stat().st_mtime))  # noqa: T201

    assert capa.helpers.is_dev_environment() is True
    assert capa.helpers.is_cache_newer_than_rule_code(cache_dir) is False

View File

@@ -490,6 +490,8 @@ function getFeatureName(feature) {
return formatBytes(feature.bytes);
case "operand offset":
return `operand[${feature.index}].offset: 0x${feature.operand_offset.toString(16).toUpperCase()}`;
case "class":
return `${feature.class_}`;
default:
return `${feature[feature.type]}`;
}