mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 23:59:48 -08:00
Replace the header from source code files using the following script:
```Python
for dir_path, dir_names, file_names in os.walk("capa"):
for file_name in file_names:
# header are only in `.py` and `.toml` files
if file_name[-3:] not in (".py", "oml"):
continue
file_path = f"{dir_path}/{file_name}"
f = open(file_path, "rb+")
content = f.read()
m = re.search(OLD_HEADER, content)
if not m:
continue
print(f"{file_path}: {m.group('year')}")
content = content.replace(m.group(0), NEW_HEADER % m.group("year"))
f.seek(0)
f.write(content)
```
Some files had the copyright headers inside a `"""` comment and needed
manual changes before applying the script. `hook-vivisect.py` and
`pyinstaller.spec` didn't include the license in the header and also
needed manual changes.
The old header had the confusing sentence `All rights reserved`, which
does not make sense for an open source license. Replace the header with
the default Google header, which corrects this issue and keeps capa
consistent with other Google projects.
Adapt the linter to work with the new header.
Also replace the copyright text in the `web/public/index.html` file for
consistency.
1034 lines
34 KiB
Python
1034 lines
34 KiB
Python
# Copyright 2020 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Check the given capa rules for style issues.
|
|
|
|
Usage:
|
|
|
|
$ python scripts/lint.py rules/
|
|
"""
|
|
|
|
import gc
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import time
|
|
import string
|
|
import difflib
|
|
import hashlib
|
|
import logging
|
|
import argparse
|
|
import itertools
|
|
import posixpath
|
|
from pathlib import Path
|
|
from dataclasses import field, dataclass
|
|
|
|
import pydantic
|
|
import ruamel.yaml
|
|
from rich import print
|
|
|
|
import capa.main
|
|
import capa.rules
|
|
import capa.engine
|
|
import capa.loader
|
|
import capa.helpers
|
|
import capa.features.insn
|
|
import capa.capabilities.common
|
|
from capa.rules import Rule, RuleSet
|
|
from capa.features.common import OS_AUTO, String, Feature, Substring
|
|
from capa.render.result_document import RuleMetadata
|
|
|
|
logger = logging.getLogger("lint")
|
|
|
|
|
|
@dataclass
class Context:
    """
    Shared state handed to every lint while a rule set is being checked.

    attributes:
      samples: mapping from content hash (MD5, SHA, etc.) to file path.
      rules: rules to inspect
      is_thorough: should inspect long-running lints
      capabilities_by_sample: cache of results, indexed by file path.
    """

    # lookup table used to resolve the sample referenced by a rule example
    samples: dict[str, Path]
    # the complete rule set under inspection
    rules: RuleSet
    # when False, the example-matching lint is skipped
    is_thorough: bool
    # filled lazily: sample path -> names of the rules that matched that sample
    capabilities_by_sample: dict[Path, set[str]] = field(default_factory=dict)
|
|
|
|
|
|
class Lint:
    """
    Base class of all lints.

    Subclasses override `name` and `recommendation` (and `level` for
    warnings) and implement the actual check.
    """

    # severity markers, written as rich console markup
    WARN = "[yellow]WARN[/yellow]"
    FAIL = "[red]FAIL[/red]"

    # short label shown next to the severity
    name = "lint"
    # every lint fails unless it explicitly downgrades itself to WARN
    level = FAIL
    # human readable advice shown after the name
    recommendation = ""

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when `rule` violates the lint; the base lint never does."""
        return False
|
|
|
|
|
|
class NameCasing(Lint):
    """Flag rule names that start with a capitalized word."""

    name = "rule name casing"
    recommendation = "Rename rule using to start with lower case letters"

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when the name starts with an upper case ASCII letter that
        is not followed by a second upper case letter.

        Names starting with two upper case letters (e.g. "RC4 ...") are
        accepted. Empty and single-character names are handled explicitly
        instead of raising IndexError.
        """
        rule_name = rule.name

        # nothing to check on an empty name, and a lower case start is fine
        if not rule_name or rule_name[0] not in string.ascii_uppercase:
            return False

        # a lone upper case letter has no second letter that could excuse it
        if len(rule_name) == 1:
            return True

        return rule_name[1] not in string.ascii_uppercase
|
|
|
|
|
|
class FilenameDoesntMatchRuleName(Lint):
    """Check that the rule file is named after the normalized rule name."""

    name = "filename doesn't match the rule name"
    recommendation = "Rename rule file to match the rule name"
    recommendation_template = 'Rename rule file to match the rule name, expected: "{:s}", found: "{:s}"'

    def check_rule(self, ctx: Context, rule: Rule):
        """Derive the expected file name from the rule name and compare it with the real one."""
        # applied in order: ".net" has to be rewritten before the remaining "." are dropped
        substitutions = (
            (".net", "dotnet"),
            (" ", "-"),
            ("(", ""),
            (")", ""),
            ("+", ""),
            ("/", ""),
            (".", ""),
        )

        stem = rule.name.lower()
        for old, new in substitutions:
            stem = stem.replace(old, new)
        expected = f"{stem}.yml"

        found = Path(rule.meta["capa/path"]).name

        # the message is refreshed on every call, whether or not the names differ
        self.recommendation = self.recommendation_template.format(expected, found)

        return found != expected
|
|
|
|
|
|
class MissingNamespace(Lint):
    """Require a namespace on every rule that is not exempt from having one."""

    name = "missing rule namespace"
    recommendation = "Add meta.namespace so that the rule is emitted correctly"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when the rule has no namespace and none of the exemptions apply."""
        meta = rule.meta

        # nursery rules, rules with a MAEC malware category, and lib rules are exempt
        has_namespace_or_is_exempt = (
            "namespace" in meta
            or is_nursery_rule(rule)
            or "maec/malware-category" in meta
            or "lib" in meta
        )
        return not has_namespace_or_is_exempt
|
|
|
|
|
|
class NamespaceDoesntMatchRulePath(Lint):
    """Check that the namespace of a rule appears in the path of its file."""

    name = "file path doesn't match rule namespace"
    recommendation = "Move rule to appropriate directory or update the namespace"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when the namespace is not a substring of the normalized rule path."""
        meta = rule.meta

        # let the other lints catch namespace issues;
        # nursery, MAEC malware-category, and lib rules are not checked either.
        skip = (
            "namespace" not in meta
            or is_nursery_rule(rule)
            or "maec/malware-category" in meta
            or "lib" in meta
        )
        if skip:
            return False

        rule_path = get_normpath(meta["capa/path"])
        return meta["namespace"] not in rule_path
|
|
|
|
|
|
class MissingScopes(Lint):
    """Require the `scopes` entry in the rule meta section."""

    name = "missing scopes"
    recommendation = (
        "Add meta.scopes with both the static (meta.scopes.static) and dynamic (meta.scopes.dynamic) scopes"
    )

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when the rule meta has no `scopes` key."""
        has_scopes = "scopes" in rule.meta
        return not has_scopes
|
|
|
|
|
|
class MissingStaticScope(Lint):
    """Require the `static` entry under `meta.scopes`."""

    name = "missing static scope"
    recommendation = "Add a static scope for the rule (file, function, basic block, instruction, or unsupported)"

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when `meta.scopes` has no `static` key.

        A missing (or empty) `meta.scopes` is treated as an empty mapping so
        that the lint reports a violation instead of raising TypeError.
        """
        scopes = rule.meta.get("scopes") or {}
        return "static" not in scopes
|
|
|
|
|
|
class MissingDynamicScope(Lint):
    """Require the `dynamic` entry under `meta.scopes`."""

    name = "missing dynamic scope"
    recommendation = "Add a dynamic scope for the rule (file, process, thread, call, or unsupported)"

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when `meta.scopes` has no `dynamic` key.

        A missing (or empty) `meta.scopes` is treated as an empty mapping so
        that the lint reports a violation instead of raising TypeError.
        """
        scopes = rule.meta.get("scopes") or {}
        return "dynamic" not in scopes
|
|
|
|
|
|
class InvalidStaticScope(Lint):
    """Check that the static scope is one of the supported values."""

    name = "invalid static scope"
    recommendation = "For the static scope, use either: file, function, basic block, instruction, or unsupported"

    # accepted values of meta.scopes.static
    VALID_SCOPES = (
        "file",
        "function",
        "basic block",
        "instruction",
        "unsupported",
    )

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when `meta.scopes.static` is absent or not an accepted value.

        A missing `meta.scopes` is treated as an empty mapping so that the
        lint reports a violation instead of raising AttributeError.
        """
        scopes = rule.meta.get("scopes") or {}
        return scopes.get("static") not in self.VALID_SCOPES
|
|
|
|
|
|
class InvalidDynamicScope(Lint):
    """Check that the dynamic scope is one of the supported values."""

    # previously reported as "invalid static scope" (copy/paste slip),
    # which made it indistinguishable from InvalidStaticScope in the output.
    name = "invalid dynamic scope"
    recommendation = "For the dynamic scope, use either: file, process, thread, call, or unsupported"

    # accepted values of meta.scopes.dynamic
    VALID_SCOPES = (
        "file",
        "process",
        "thread",
        "call",
        "unsupported",
    )

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when `meta.scopes.dynamic` is absent or not an accepted value.

        A missing `meta.scopes` is treated as an empty mapping so that the
        lint reports a violation instead of raising AttributeError.
        """
        scopes = rule.meta.get("scopes") or {}
        return scopes.get("dynamic") not in self.VALID_SCOPES
|
|
|
|
|
|
class InvalidScopes(Lint):
    """Reject rules that declare both scopes as unsupported."""

    name = "invalid scopes"
    recommendation = "At least one scope (static or dynamic) must be specified"

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when both the static and the dynamic scope are "unsupported".

        A missing `meta.scopes` is treated as an empty mapping (no violation
        here) instead of raising AttributeError.
        """
        scopes = rule.meta.get("scopes") or {}
        return scopes.get("static") == "unsupported" and scopes.get("dynamic") == "unsupported"
|
|
|
|
|
|
class MissingAuthors(Lint):
    """Require the `authors` entry in the rule meta section."""

    name = "missing authors"
    recommendation = "Add meta.authors so that users know who to contact with questions"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when the rule meta has no `authors` key."""
        has_authors = "authors" in rule.meta
        return not has_authors
|
|
|
|
|
|
class MissingExamples(Lint):
    """Require a non-empty list of examples in the rule meta section."""

    name = "missing examples"
    recommendation = "Add meta.examples so that the rule can be tested and verified"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when `meta.examples` is absent, not a list, empty, or just `[None]`."""
        meta = rule.meta
        if "examples" not in meta:
            return True

        examples = meta["examples"]
        if not isinstance(examples, list):
            return True
        if len(examples) == 0:
            return True

        # a list holding only a None placeholder counts as missing, too
        return examples == [None]
|
|
|
|
|
|
class MissingExampleOffset(Lint):
    """Require an offset on the examples of function and basic block rules."""

    name = "missing example offset"
    recommendation = "Add offset of example function"

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when a function or basic block rule references an example
        without a `:<offset>` suffix.

        The scope is taken from the legacy `meta.scope` key when present and
        otherwise from `meta.scopes.static`, the key the scope lints of this
        file inspect. Reading only `meta.scope` meant the lint never fired
        for rules that declare `meta.scopes`.
        """
        scopes = rule.meta.get("scopes") or {}
        scope = rule.meta.get("scope") or scopes.get("static")
        if scope not in ("function", "basic block"):
            return False

        examples = rule.meta.get("examples")
        if not isinstance(examples, list):
            # absent or malformed examples are covered by the MissingExamples lint
            return False

        for example in examples:
            # empty placeholders are skipped; real entries must look like `<id>:<offset>`
            if example and ":" not in example:
                logger.debug("example: %s", example)
                return True

        return False
|
|
|
|
|
|
class ExampleFileDNE(Lint):
    """Check that at least one referenced example is among the known samples."""

    name = "referenced example doesn't exist"
    recommendation = "Add the referenced example to samples directory ($capa-root/tests/data or supplied via --samples)"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when none of the example identifiers is a key of `ctx.samples`."""
        if not rule.meta.get("examples"):
            # let the MissingExamples lint catch this case, don't double report.
            return False

        # the identifier is the part of the example before the first ":"
        any_found = any(
            example.partition(":")[0] in ctx.samples
            for example in rule.meta.get("examples", [])
            if example
        )
        return not any_found
|
|
|
|
|
|
class IncorrectValueType(Lint):
    """Validate the rule metadata by building a `RuleMetadata` from the rule."""

    name = "incorrect value type"
    recommendation = "Change value type"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when pydantic rejects the metadata; the error text becomes the recommendation."""
        try:
            RuleMetadata.from_capa(rule)
        except pydantic.ValidationError as err:
            self.recommendation = str(err).strip()
            return True
        else:
            return False
|
|
|
|
|
|
class InvalidAttckOrMbcTechnique(Lint):
    """
    Validate the att&ck and mbc entries of a rule against `linter-data.json`.

    The data file is loaded from the directory of this script. When it is
    missing or malformed, a warning is logged and no framework is checked.
    """

    name = "att&ck/mbc entry is malformed or does not exist"
    recommendation = (
        "\n"
        "The att&ck and mbc fields must respect the following format:\n"
        "<Tactic/Objective>::<Technique/Behavior> [<ID>]\n"
        "OR\n"
        "<Tactic/Objective>::<Technique/Behavior>::<Subtechnique/Method> [<ID.SubID>]\n"
    )

    def __init__(self):
        super().__init__()

        # defined up front so the attribute exists even when loading fails
        self.data = {}

        try:
            data_path = Path(__file__).resolve().parent / "linter-data.json"
            with data_path.open("rb") as fd:
                self.data = json.load(fd)
            self.enabled_frameworks = self.data.keys()
        except (FileNotFoundError, json.decoder.JSONDecodeError):
            # linter-data.json missing, or JSON error: log a warning and skip this lint
            logger.warning(
                "Could not load 'scripts/linter-data.json'. The att&ck and mbc information will not be linted."
            )
            self.enabled_frameworks = []

        # This regex matches the format defined in the recommendation attribute
        self.reg = re.compile(r"^([\w\s-]+)::(.+) \[([A-Za-z0-9.]+)\]$")

    def _entry_check(self, framework, category, entry, eid):
        """
        Look up one parsed entry in the loaded data.

        Returns True, and sets an entry-specific recommendation, when the
        category or ID is unknown or when the ID is registered under a
        different entry name.
        """
        categories = self.data[framework]
        if category not in categories:
            self.recommendation = f'Unknown category: "{category}"'
            return True

        known_entries = categories[category]
        if eid not in known_entries:
            self.recommendation = f"Unknown entry ID: {eid}"
            return True

        if known_entries[eid] != entry:
            self.recommendation = (
                f'{eid} should be associated to entry "{known_entries[eid]}" instead of "{entry}"'
            )
            return True

        return False

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True on the first malformed or unknown att&ck/mbc entry of `rule`."""
        for framework in self.enabled_frameworks:
            if framework not in rule.meta:
                continue

            for r in rule.meta[framework]:
                m = self.reg.match(r)
                if m is None:
                    # restore the format description: an earlier rule may have left an
                    # entry-specific message (e.g. "Unknown category") on this instance.
                    self.recommendation = type(self).recommendation
                    return True

                # groups: category, entry name (incl. optional sub-entry), ID
                if self._entry_check(framework, *m.group(1, 2, 3)):
                    return True

        return False
|
|
|
|
|
|
# default signatures as reported by capa; computed once at import time and
# handed to every extractor that get_sample_capabilities() creates.
DEFAULT_SIGNATURES = capa.main.get_default_signatures()
|
|
|
|
|
|
def get_sample_capabilities(ctx: Context, path: Path) -> set[str]:
    """
    Return the names of the rules in `ctx.rules` that match the sample at `path`.

    Results are memoized in `ctx.capabilities_by_sample`, keyed by `path` as
    given; the resolved path is used for the analysis and for logging.
    """
    cache = ctx.capabilities_by_sample
    sample_path = path.resolve().absolute()

    if path in cache:
        cached = cache[path]
        logger.debug("found cached results: %s: %d capabilities", sample_path, len(cached))
        return cached

    logger.debug("analyzing sample: %s", sample_path)

    # mimic the CLI arguments so capa picks the input format and backend itself
    cli_args = argparse.Namespace(
        input_file=sample_path,
        format=capa.main.FORMAT_AUTO,
        backend=capa.main.BACKEND_AUTO,
    )
    format_ = capa.main.get_input_format_from_cli(cli_args)
    backend = capa.main.get_backend_from_cli(cli_args, format_)

    extractor = capa.loader.get_extractor(
        sample_path,
        format_,
        OS_AUTO,
        backend,
        DEFAULT_SIGNATURES,
        should_save_workspace=False,
        disable_progress=True,
    )

    matches, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True)
    # only the names of the matched rules are kept
    capabilities = set(matches.keys())

    logger.debug("computed results: %s: %d capabilities", sample_path, len(capabilities))
    cache[path] = capabilities

    # when i (wb) run the linter in thorough mode locally,
    # the OS occasionally kills the process due to memory usage.
    # so, be extra aggressive in keeping memory usage down.
    #
    # tbh, im not sure this actually does anything, but maybe it helps?
    gc.collect()

    return capabilities
|
|
|
|
|
|
class DoesntMatchExample(Lint):
    """Check (thorough mode only) that a rule matches on each of its available examples."""

    name = "doesn't match on referenced example"
    recommendation = "Fix the rule logic or provide a different example"

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when the rule does not match a referenced example, or when
        analyzing that example fails.

        Always returns False unless `ctx.is_thorough` is set.
        """
        if not ctx.is_thorough:
            return False

        examples = rule.meta.get("examples", [])
        if not examples or not isinstance(examples, list):
            # absent or malformed examples are covered by the MissingExamples lint
            return False

        for example in examples:
            if not example:
                # placeholder entries such as `[None]` are reported by MissingExamples;
                # calling .partition on them would raise AttributeError.
                continue

            example_id = example.partition(":")[0]
            try:
                path = ctx.samples[example_id]
            except KeyError:
                # lint ExampleFileDNE will catch this.
                # don't double report.
                continue

            try:
                capabilities = get_sample_capabilities(ctx, path)
            except Exception as e:
                # any analysis failure is reported as a violation rather than aborting the run
                logger.exception("failed to extract capabilities: %s %s %s", rule.name, path, e)
                return True

            if rule.name not in capabilities:
                return True

        return False
|
|
|
|
|
|
class StatementWithSingleChildStatement(Lint):
    """Detect non-root `and`/`or` statements that merely wrap one other statement."""

    name = "rule contains one or more statements with a single child statement"
    recommendation = "remove the superfluous parent statement"
    recommendation_template = "remove the superfluous parent statement: {:s}"
    violation = False

    def check_rule(self, ctx: Context, rule: Rule):
        """Walk the `and`/`or` nodes reachable from the root and report wrappers of a single statement."""
        self.violation = False

        def visit(node, is_root=False):
            # only `and`/`or` nodes are inspected and descended into
            if not isinstance(node, (capa.engine.And, capa.engine.Or)):
                return

            children = list(node.get_children())
            wraps_one_statement = len(children) == 1 and isinstance(children[0], capa.engine.Statement)
            if wraps_one_statement and not is_root:
                self.recommendation = self.recommendation_template.format(str(node))
                self.violation = True

            for child in children:
                visit(child)

        visit(rule.statement, is_root=True)

        return self.violation
|
|
|
|
|
|
class OrStatementWithAlwaysTrueChild(Lint):
    """Detect `or` statements with an `optional` (`0 or more`) child, which makes them always True."""

    name = "rule contains an `or` statement that's always True because of an `optional` or other child statement that's always True"
    recommendation = "clarify the rule logic, e.g. by moving the always True child statement"
    recommendation_template = "clarify the rule logic, e.g. by moving the always True child statement: {:s}"
    violation = False

    def check_rule(self, ctx: Context, rule: Rule):
        """
        Return True when any `or` statement in the rule has a `Some` child with count 0.

        The whole statement tree is walked. Descending only through `or`
        nodes skipped every `or` nested below an `and` (or any other
        statement), including all rules whose root is not an `or`.
        """
        self.violation = False

        def rec(statement):
            if not isinstance(statement, capa.engine.Statement):
                # features are leaves: nothing to inspect below them
                return

            is_or = isinstance(statement, capa.engine.Or)
            for child in statement.get_children():
                # `Some` implements `optional` which is an alias for `0 or more`
                if is_or and isinstance(child, capa.engine.Some) and child.count == 0:
                    self.recommendation = self.recommendation_template.format(str(child))
                    self.violation = True
                rec(child)

        rec(rule.statement)

        return self.violation
|
|
|
|
|
|
class NotNotUnderAnd(Lint):
    """Detect `not` statements whose parent statement is not an `and`."""

    name = "rule contains a `not` statement that's not found under an `and` statement"
    recommendation = "clarify the rule logic and ensure `not` is always found under `and`"
    violation = False

    def check_rule(self, ctx: Context, rule: Rule):
        """Walk every statement and report a `not` that is the direct child of a non-`and` statement."""
        self.violation = False

        def visit(node):
            # features are leaves and have no children to inspect
            if not isinstance(node, capa.engine.Statement):
                return

            children = list(node.get_children())
            if not isinstance(node, capa.engine.And):
                if any(isinstance(child, capa.engine.Not) for child in children):
                    self.violation = True

            for child in children:
                visit(child)

        visit(rule.statement)

        return self.violation
|
|
|
|
|
|
class OptionalNotUnderAnd(Lint):
    """Detect `optional` / `0 or more` statements whose parent statement is not an `and`."""

    name = "rule contains an `optional` or `0 or more` statement that's not found under an `and` statement"
    recommendation = "clarify the rule logic and ensure `optional` and `0 or more` is always found under `and`"
    violation = False

    def check_rule(self, ctx: Context, rule: Rule):
        """Walk every statement and report a `Some` with count 0 that is the direct child of a non-`and` statement."""
        self.violation = False

        def is_optional(node):
            return isinstance(node, capa.engine.Some) and node.count == 0

        def visit(node):
            # features are leaves and have no children to inspect
            if not isinstance(node, capa.engine.Statement):
                return

            children = list(node.get_children())
            if not isinstance(node, capa.engine.And):
                if any(is_optional(child) for child in children):
                    self.violation = True

            for child in children:
                visit(child)

        visit(rule.statement)

        return self.violation
|
|
|
|
|
|
class UnusualMetaField(Lint):
    """Report meta keys that are listed neither in META_KEYS nor in HIDDEN_META_KEYS."""

    name = "unusual meta field"
    recommendation = "Remove the meta field"
    recommendation_template = 'Remove the meta field: "{:s}"'

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True on the first unknown meta key; the key is named in the recommendation."""
        for key in rule.meta.keys():
            is_known = key in capa.rules.META_KEYS or key in capa.rules.HIDDEN_META_KEYS
            if not is_known:
                self.recommendation = self.recommendation_template.format(key)
                return True

        return False
|
|
|
|
|
|
class LibRuleNotInLibDirectory(Lint):
    """Check that lib rules are stored below a `lib/` directory."""

    name = "lib rule not found in lib directory"
    recommendation = "Move the rule to the `lib` subdirectory of the rules path"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when a non-nursery rule with a `lib` meta key has no "lib/" in its normalized path."""
        # nursery rules and rules without the `lib` key are not subject to this lint
        if is_nursery_rule(rule) or "lib" not in rule.meta:
            return False

        rule_path = get_normpath(rule.meta["capa/path"])
        return "lib/" not in rule_path
|
|
|
|
|
|
class LibRuleHasNamespace(Lint):
    """Check that lib rules do not declare a namespace."""

    name = "lib rule has a namespace"
    recommendation = "Remove the namespace from the rule"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when the rule meta contains both `lib` and `namespace`."""
        meta = rule.meta
        return "lib" in meta and "namespace" in meta
|
|
|
|
|
|
class FeatureStringTooShort(Lint):
    """Report string and substring features shorter than four characters."""

    name = "feature string too short"
    recommendation = 'capa only extracts strings with length >= 4; will not match on "{:s}"'
    # kept separately: `recommendation` is overwritten with the formatted message.
    # formatting `recommendation` in place consumed the placeholder, so every later
    # violation kept reporting the string of the first one.
    recommendation_template = recommendation

    def check_features(self, ctx: Context, features: list[Feature]):
        """Return True on the first `String`/`Substring` feature whose value has fewer than 4 characters."""
        for feature in features:
            if not isinstance(feature, (String, Substring)):
                continue

            assert isinstance(feature.value, str)
            if len(feature.value) < 4:
                self.recommendation = self.recommendation_template.format(feature.value)
                return True

        return False
|
|
|
|
|
|
class FeatureNegativeNumber(Lint):
    """Report number features with a negative value."""

    name = "feature value is negative"
    recommendation = "specify the number's two's complement representation"
    recommendation_template = (
        "capa treats number features as unsigned values; you may specify the number's two's complement "
        'representation; will not match on "{:d}"'
    )

    def check_features(self, ctx: Context, features: list[Feature]):
        """Return True on the first `Number` feature whose value is below zero."""
        numbers = (feature for feature in features if isinstance(feature, (capa.features.insn.Number,)))
        for number in numbers:
            assert isinstance(number.value, int)
            if number.value < 0:
                self.recommendation = self.recommendation_template.format(number.value)
                return True

        return False
|
|
|
|
|
|
class FeatureNtdllNtoskrnlApi(Lint):
    """
    Warn about API features pinned to ntdll or ntoskrnl.

    An API on one of the two lists below is accepted with its module; any
    other API qualified with `ntdll.` or `ntoskrnl.` triggers the warning.
    """

    name = "feature api may overlap with ntdll and ntoskrnl"
    level = Lint.WARN
    recommendation_template = (
        "check if {:s} is exported by both ntdll and ntoskrnl; if true, consider removing {:s} "
        + "module requirement to improve detection"
    )

    # ntoskrnl.exe does not export these routines.
    # built once instead of on every feature; the misspelled duplicate
    # "NtCreatUserProcess" of "NtCreateUserProcess" has been dropped.
    NTDLL_ONLY_APIS = frozenset(
        (
            "LdrGetProcedureAddress",
            "LdrLoadDll",
            "NtCreateThread",
            "NtLoadDriver",
            "NtQueryDirectoryObject",
            "NtResumeThread",
            "NtSuspendThread",
            "NtTerminateProcess",
            "NtWriteVirtualMemory",
            "RtlGetNativeSystemInformation",
            "NtCreateThreadEx",
            "NtCreateUserProcess",
            "NtOpenDirectoryObject",
            "NtQueueApcThread",
            "ZwResumeThread",
            "ZwSuspendThread",
            "ZwWriteVirtualMemory",
            "NtCreateProcess",
            "ZwCreateThread",
            "NtCreateProcessEx",
            "ZwCreateThreadEx",
            "ZwCreateProcess",
            "ZwCreateUserProcess",
            "RtlCreateUserProcess",
            "NtProtectVirtualMemory",
            "NtEnumerateSystemEnvironmentValuesEx",
            "NtQuerySystemEnvironmentValueEx",
            "NtQuerySystemEnvironmentValue",
        )
    )

    # ntdll.dll does not export these routines
    NTOSKRNL_ONLY_APIS = frozenset(
        (
            "PsGetVersion",
            "PsLookupProcessByProcessId",
            "KeStackAttachProcess",
            "ObfDereferenceObject",
            "KeUnstackDetachProcess",
            "ExGetFirmwareEnvironmentVariable",
        )
    )

    def check_features(self, ctx: Context, features: list[Feature]):
        """Return True on the first `ntdll.`/`ntoskrnl.` API feature that is on neither allow list."""
        for feature in features:
            if not isinstance(feature, capa.features.insn.API):
                continue

            assert isinstance(feature.value, str)
            # the module is everything before the last "."; it is empty for unqualified APIs
            modname, _, impname = feature.value.rpartition(".")

            if modname == "ntdll" and impname in self.NTDLL_ONLY_APIS:
                continue

            if modname == "ntoskrnl" and impname in self.NTOSKRNL_ONLY_APIS:
                continue

            if modname in ("ntdll", "ntoskrnl"):
                self.recommendation = self.recommendation_template.format(impname, modname)
                return True

        return False
|
|
|
|
|
|
class FormatSingleEmptyLineEOF(Lint):
    """Check that the rule text ends with exactly one newline."""

    name = "EOF format"
    recommendation = "end file with a single empty line"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True unless the definition ends with "\\n" but not with "\\n\\n"."""
        text = rule.definition
        ends_with_single_newline = text.endswith("\n") and not text.endswith("\n\n")
        return not ends_with_single_newline
|
|
|
|
|
|
class FormatIncorrect(Lint):
    """Compare the rule text with the YAML that capa itself emits for the rule."""

    name = "rule format incorrect"
    recommendation_template = "use scripts/capafmt.py or adjust as follows\n{:s}"

    def check_rule(self, ctx: Context, rule: Rule):
        """Return True when the definition differs from its re-serialized form; the diff becomes the recommendation."""
        # EOL depends on Git and our .gitattributes defines text=auto (Git handles files it thinks is best)
        # we prefer LF only, but enforcing across OSs seems tedious and unnecessary
        actual = rule.definition.replace("\r\n", "\n")
        expected = capa.rules.Rule.from_yaml(rule.definition, use_ruamel=True).to_yaml()

        if actual == expected:
            return False

        # line endings are kept so the joined diff reads like the files themselves
        delta = difflib.ndiff(actual.splitlines(True), expected.splitlines(True))
        self.recommendation = self.recommendation_template.format("".join(delta))
        return True
|
|
|
|
|
|
class FormatStringQuotesIncorrect(Lint):
    """Check that `string` and `substring` values are written with double quotes."""

    name = "rule string quotes incorrect"

    def check_rule(self, ctx: Context, rule: Rule):
        """Scan the YAML event stream and return True on the first unquoted or single-quoted value."""
        events = capa.rules.Rule._get_ruamel_yaml_parser().parse(rule.definition)
        for event in events:
            if not isinstance(event, ruamel.yaml.ScalarEvent):
                continue

            kind = event.value
            if kind not in ("string", "substring"):
                continue

            value = next(events)  # assume value is next event
            if not isinstance(value, ruamel.yaml.ScalarEvent):
                # ignore non-scalar
                continue

            if kind == "string" and value.value.startswith("/") and value.value.endswith(("/", "/i")):
                # ignore regex for now
                continue

            if value.style is None:
                # no quotes
                self.recommendation = f'add double quotes to "{value.value}"'
                return True

            if value.style == "'":
                # single quote
                self.recommendation = f'change single quotes to double quotes for "{value.value}"'
                return True

        return False
|
|
|
|
|
|
def run_lints(lints, ctx: Context, rule: Rule):
    """Lazily yield each lint of `lints` whose `check_rule` reports a violation for `rule`."""
    yield from (candidate for candidate in lints if candidate.check_rule(ctx, rule))
|
|
|
|
|
|
def run_feature_lints(lints, ctx: Context, features: list[Feature]):
    """Lazily yield each lint of `lints` whose `check_features` reports a violation for `features`."""
    yield from (candidate for candidate in lints if candidate.check_features(ctx, features))
|
|
|
|
|
|
# lints about the rule name and its file name, run in this order
NAME_LINTS = (
    NameCasing(),
    FilenameDoesntMatchRuleName(),
)
|
|
|
|
|
|
def lint_name(ctx: Context, rule: Rule):
    """Return a generator over the NAME_LINTS that `rule` violates."""
    violated = run_lints(NAME_LINTS, ctx, rule)
    return violated
|
|
|
|
|
|
# lints about meta.scopes, run in this order
SCOPES_LINTS = (
    MissingScopes(),
    MissingStaticScope(),
    MissingDynamicScope(),
    InvalidStaticScope(),
    InvalidDynamicScope(),
    InvalidScopes(),
)
|
|
|
|
|
|
def lint_scope(ctx: Context, rule: Rule):
    """Return a generator over the SCOPES_LINTS that `rule` violates."""
    violated = run_lints(SCOPES_LINTS, ctx, rule)
    return violated
|
|
|
|
|
|
# lints about the remaining meta fields, run in this order
META_LINTS = (
    MissingNamespace(),
    NamespaceDoesntMatchRulePath(),
    MissingAuthors(),
    MissingExamples(),
    MissingExampleOffset(),
    ExampleFileDNE(),
    UnusualMetaField(),
    LibRuleNotInLibDirectory(),
    LibRuleHasNamespace(),
    InvalidAttckOrMbcTechnique(),
    IncorrectValueType(),
)
|
|
|
|
|
|
def lint_meta(ctx: Context, rule: Rule):
    """Return a generator over the META_LINTS that `rule` violates."""
    violated = run_lints(META_LINTS, ctx, rule)
    return violated
|
|
|
|
|
|
# lints that inspect the features of a rule rather than the rule itself
FEATURE_LINTS = (
    FeatureStringTooShort(),
    FeatureNegativeNumber(),
    FeatureNtdllNtoskrnlApi(),
)
|
|
|
|
|
|
def lint_features(ctx: Context, rule: Rule):
    """Collect the features of `rule` and its dependencies and return a generator over the violated FEATURE_LINTS."""
    return run_feature_lints(FEATURE_LINTS, ctx, get_features(ctx, rule))
|
|
|
|
|
|
# lints about the textual form of the rule file, run in this order
FORMAT_LINTS = (
    FormatSingleEmptyLineEOF(),
    FormatStringQuotesIncorrect(),
    FormatIncorrect(),
)
|
|
|
|
|
|
def lint_format(ctx: Context, rule: Rule):
    """Return a generator over the FORMAT_LINTS that `rule` violates."""
    violated = run_lints(FORMAT_LINTS, ctx, rule)
    return violated
|
|
|
|
|
|
def get_normpath(path):
    """Normalize `path` with posixpath rules and turn every OS path separator into "/"."""
    normalized = posixpath.normpath(path)
    # splitting on the separator and re-joining is the same as replacing it
    return "/".join(normalized.split(os.sep))
|
|
|
|
|
|
def get_features(ctx: Context, rule: Rule):
    """Return the features of `rule` followed by those of each of its dependencies."""
    # get features from rule and all dependencies including subscopes and matched rules
    namespaces = ctx.rules.rules_by_namespace
    inspected = [rule]
    inspected.extend([ctx.rules.rules[dep] for dep in rule.get_dependencies(namespaces)])

    return [feature for r in inspected for feature in get_rule_features(r)]
|
|
|
|
|
|
def get_rule_features(rule):
    """Return the leaves of the statement tree of `rule`, in depth-first order."""

    def leaves(node):
        # anything that is not a Statement is treated as a feature
        if not isinstance(node, capa.engine.Statement):
            yield node
            return
        for child in node.get_children():
            yield from leaves(child)

    return list(leaves(rule.statement))
|
|
|
|
|
|
# lints about the rule logic, run in this order
LOGIC_LINTS = (
    DoesntMatchExample(),
    StatementWithSingleChildStatement(),
    OrStatementWithAlwaysTrueChild(),
    NotNotUnderAnd(),
    OptionalNotUnderAnd(),
)
|
|
|
|
|
|
def lint_logic(ctx: Context, rule: Rule):
    """Return a generator over the LOGIC_LINTS that `rule` violates."""
    violated = run_lints(LOGIC_LINTS, ctx, rule)
    return violated
|
|
|
|
|
|
def is_nursery_rule(rule):
    """
    Return the value of the `capa/nursery` meta entry of `rule`, or None when it is absent.

    The nursery is a spot for rules that have not yet been fully polished.
    For example, they may not have references to public example of a technique.
    Yet, we still want to capture and report on their matches.
    """
    meta = rule.meta
    return meta.get("capa/nursery")
|
|
|
|
|
|
def lint_rule(ctx: Context, rule: Rule):
    """
    Run every lint group on `rule`, print the violations, and count them.

    Returns a tuple `(lints_failed, lints_warned)`. For nursery rules, the
    example-related failures are not counted as failures.
    """
    logger.debug(rule.name)

    violations = list(
        itertools.chain(
            lint_name(ctx, rule),
            lint_scope(ctx, rule),
            lint_meta(ctx, rule),
            lint_logic(ctx, rule),
            lint_features(ctx, rule),
            lint_format(ctx, rule),
        )
    )

    nursery = is_nursery_rule(rule)
    title_prefix = " (nursery) " if nursery else ""

    if len(violations) > 0:
        # don't show nursery rules with a single violation: needs examples.
        # this is by far the most common reason to be in the nursery,
        # and ends up just producing a lot of noise.
        only_needs_examples = nursery and len(violations) == 1 and violations[0].name == "missing examples"
        if not only_needs_examples:
            print("")
            print(f"{title_prefix} {rule.name}")

            line_prefix = " " if nursery else ""
            for violation in violations:
                # violations of nursery rules are always displayed as warnings
                shown_level = Lint.WARN if nursery else violation.level
                print(f"{line_prefix} {shown_level}: {violation.name}: {violation.recommendation}")

            print("")

    if nursery:
        has_examples = not any(v.level == Lint.FAIL and v.name == "missing examples" for v in violations)

        # example-related failures don't count as failures for nursery rules ...
        lints_failed = sum(
            1
            for v in violations
            if v.level == Lint.FAIL
            and not (v.name == "missing examples" or v.name == "referenced example doesn't exist")
        )
        # ... and a non-existing example file is counted as a warning instead
        lints_warned = sum(
            1
            for v in violations
            if v.level == Lint.WARN or (v.level == Lint.FAIL and v.name == "referenced example doesn't exist")
        )

        if (not lints_failed) and (not lints_warned) and has_examples:
            print("")
            print(f"{title_prefix} {rule.name}")
            print(f" {Lint.WARN}: '[green]no lint failures[/green]': Graduate the rule")
            print("")
    else:
        lints_failed = sum(1 for v in violations if v.level == Lint.FAIL)
        lints_warned = sum(1 for v in violations if v.level == Lint.WARN)

    return (lints_failed, lints_warned)
|
|
|
|
|
|
def width(s, count):
    """Fit `s` into `count` columns: pad short strings with spaces, cut long ones and end them with "..."."""
    if len(s) <= count:
        return s.ljust(count)

    # keep room for the three dots
    return s[: count - 3] + "..."
|
|
|
|
|
|
def lint(ctx: Context):
    """
    Lint every rule of `ctx.rules` that is not a subscope rule, showing a progress bar.

    Returns: dict[string, tuple(int, int)]
      - # lints failed
      - # lints warned
    """
    source_rules = [rule for rule in ctx.rules.rules.values() if not rule.is_subscope_rule()]
    n_rules: int = len(source_rules)

    results = {}
    with capa.helpers.CapaProgressBar(transient=True, console=capa.helpers.log_console) as pbar:
        task = pbar.add_task(description="linting", total=n_rules, unit="rule")
        for rule in source_rules:
            rule_name = rule.name
            # the description is padded/cut to a fixed width of 48 characters
            pbar.update(task, description=width(f"linting rule: {rule_name}", 48))
            results[rule_name] = lint_rule(ctx, rule)
            pbar.advance(task)

    return results
|
|
|
|
|
|
def collect_samples(samples_path: Path) -> dict[str, Path]:
    """
    recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename.

    Each digest is registered in lower and in upper case. Entries that cannot
    be read (IOError) and files with an ignored suffix are skipped.
    """
    ignored_suffixes = [".viv", ".idb", ".i64", ".frz", ".fnames"]

    samples = {}
    for path in samples_path.rglob("*"):
        if path.suffix in ignored_suffixes:
            continue

        try:
            buf = path.read_bytes()
        except IOError:
            continue

        digests = (hashlib.sha256(buf).hexdigest(), hashlib.md5(buf).hexdigest())
        for digest in digests:
            samples[digest.lower()] = path
            samples[digest.upper()] = path
        samples[path.name] = path

    return samples
|
|
|
|
|
|
def main(argv=None):
    """
    Command line entry point: lint the given rules and print a summary.

    Args:
      argv: command line arguments without the program name; defaults to `sys.argv[1:]`.

    Returns:
      the process exit status: 0 when no lint failed, 1 when at least one rule
      has a failing lint, -1 when the samples path does not exist, or the status
      code carried by a `capa.main.ShouldExitError`.
    """
    if argv is None:
        argv = sys.argv[1:]

    # by default, samples are looked up in <parent of this script's directory>/tests/data
    default_samples_path = str(Path(__file__).resolve().parent.parent / "tests" / "data")

    parser = argparse.ArgumentParser(description="Lint capa rules.")
    capa.main.install_common_args(parser, wanted={"tag"})
    parser.add_argument("rules", type=str, action="append", help="Path to rules")
    parser.add_argument("--samples", type=str, default=default_samples_path, help="Path to samples")
    parser.add_argument(
        "--thorough",
        action="store_true",
        help="Enable thorough linting - takes more time, but does a better job",
    )
    args = parser.parse_args(args=argv)

    try:
        capa.main.handle_common_args(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    # the loggers of capa and viv_utils are only verbose in debug mode
    library_level = logging.DEBUG if args.debug else logging.ERROR
    for library in ("capa", "viv_utils"):
        logging.getLogger(library).setLevel(library_level)

    time0 = time.time()

    try:
        rules = capa.main.get_rules_from_cli(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    logger.info("collecting potentially referenced samples")
    samples_path = Path(args.samples)
    if not samples_path.exists():
        logger.error("samples path %s does not exist", samples_path)
        return -1

    samples = collect_samples(samples_path)

    ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough)

    results_by_name = lint(ctx)
    failed_rules = []
    warned_rules = []
    for name, (fail_count, warn_count) in results_by_name.items():
        if fail_count > 0:
            failed_rules.append(name)

        if warn_count > 0:
            warned_rules.append(name)

    # named so that the builtin `min` is not shadowed
    minutes, seconds = divmod(time.time() - time0, 60)
    logger.debug("lints ran for ~ %02d:%02dm", minutes, seconds)

    if warned_rules:
        print("[yellow]rules with WARN:[/yellow]")
        for warned_rule in sorted(warned_rules):
            print(" - " + warned_rule)
        print()

    if failed_rules:
        print("[red]rules with FAIL:[/red]")
        for failed_rule in sorted(failed_rules):
            print(" - " + failed_rule)
        return 1
    else:
        logger.info("[green]no lints failed, nice![/green]")
        return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # the return value of main() becomes the exit status of the process
    raise SystemExit(main())
|