Merge pull request #34 from fireeye/rule-organization

update output rendering
Willi Ballenthin authored this commit on 2020-06-29 13:07:51 -06:00; committed by GitHub
17 changed files with 1095 additions and 238 deletions

README.md (216 lines changed)

@@ -5,23 +5,51 @@ You run it against a .exe or .dll and it tells you what it thinks the program ca
For example, it might suggest that the file is a backdoor, is capable of installing services, or relies on HTTP to communicate.
```
λ capa.exe suspicious.exe -q
$ capa.exe suspicious.exe
objectives:
communication
data manipulation
machine access control
behaviors:
communication-via-http
encrypt data
load code functionality
techniques:
send-http-request
encrypt data using rc4
load pe
+------------------------+----------------------------------------------------------------------+
| ATT&CK Tactic | ATT&CK Technique |
|------------------------+----------------------------------------------------------------------|
| DEFENSE EVASION | Obfuscated Files or Information [T1027] |
| DISCOVERY | Query Registry [T1012] |
| | System Information Discovery [T1082] |
| EXECUTION | Command and Scripting Interpreter::Windows Command Shell [T1059.003] |
| | Shared Modules [T1129] |
| EXFILTRATION | Exfiltration Over C2 Channel [T1041] |
| PERSISTENCE | Create or Modify System Process::Windows Service [T1543.003] |
+------------------------+----------------------------------------------------------------------+
+-------------------------------------------------------+-------------------------------------------------+
| CAPABILITY | NAMESPACE |
|-------------------------------------------------------+-------------------------------------------------|
| check for OutputDebugString error | anti-analysis/anti-debugging/debugger-detection |
| read and send data from client to server | c2/file-transfer |
| execute shell command and capture output | c2/shell |
| receive data (2 matches) | communication |
| send data (6 matches) | communication |
| connect to HTTP server (3 matches) | communication/http/client |
| send HTTP request (3 matches) | communication/http/client |
| create pipe | communication/named-pipe/create |
| get socket status (2 matches) | communication/socket |
| receive data on socket (2 matches) | communication/socket/receive |
| send data on socket (3 matches) | communication/socket/send |
| connect TCP socket | communication/socket/tcp |
| encode data using Base64 | data-manipulation/encoding/base64 |
| encode data using XOR (6 matches) | data-manipulation/encoding/xor |
| run as a service | executable/pe |
| get common file path (3 matches) | host-interaction/file-system |
| read file | host-interaction/file-system/read |
| write file (2 matches) | host-interaction/file-system/write |
| print debug messages (2 matches) | host-interaction/log/debug/write-event |
| resolve DNS | host-interaction/network/dns/resolve |
| get hostname | host-interaction/os/hostname |
| create a process with modified I/O handles and window | host-interaction/process/create |
| create process | host-interaction/process/create |
| create registry key | host-interaction/registry/create |
| create service | host-interaction/service/create |
| create thread | host-interaction/thread/create |
| persist via Windows service | persistence/service |
+-------------------------------------------------------+-------------------------------------------------+
```
# download
@@ -66,57 +94,92 @@ For more information about how to use capa, including running it as an IDA scrip
# example
Here we run capa against an unknown binary (`level32.exe`),
Here we run capa against an unknown binary (`suspicious.exe`),
and the tool reports that the program can decode data via XOR,
references data in its resource section, writes to a file, and spawns a new process.
Taken together, this makes us think that `level32.exe` could be a dropper.
Therefore, our next analysis step might be to run `level32.exe` in a sandbox and try to recover the payload.
contains an embedded PE, writes to a file, and spawns a new process.
Taken together, this makes us think that `suspicious.exe` could be a dropper or backdoor.
Therefore, our next analysis step might be to run `suspicious.exe` in a sandbox and try to recover the payload.
```
λ capa.exe level32.exe -q
disposition: malicious
category: dropper
$ capa.exe suspicious.exe
objectives:
data manipulation
machine access control
+------------------------+----------------------------------------------------------------------+
| ATT&CK Tactic | ATT&CK Technique |
|------------------------+----------------------------------------------------------------------|
| DEFENSE EVASION | Obfuscated Files or Information [T1027] |
| DISCOVERY | Query Registry [T1012] |
| | System Information Discovery [T1082] |
| EXECUTION | Command and Scripting Interpreter::Windows Command Shell [T1059.003] |
| | Shared Modules [T1129] |
| EXFILTRATION | Exfiltration Over C2 Channel [T1041] |
| PERSISTENCE | Create or Modify System Process::Windows Service [T1543.003] |
+------------------------+----------------------------------------------------------------------+
behaviors:
encrypt data
load code functionality
techniques:
encrypt data using rc4
load pe
anomalies:
embedded PE file
+-------------------------------------------------------+-------------------------------------------------+
| CAPABILITY | NAMESPACE |
|-------------------------------------------------------+-------------------------------------------------|
| check for OutputDebugString error | anti-analysis/anti-debugging/debugger-detection |
| read and send data from client to server | c2/file-transfer |
| execute shell command and capture output | c2/shell |
| receive data (2 matches) | communication |
| send data (6 matches) | communication |
| connect to HTTP server (3 matches) | communication/http/client |
| send HTTP request (3 matches) | communication/http/client |
| create pipe | communication/named-pipe/create |
| get socket status (2 matches) | communication/socket |
| receive data on socket (2 matches) | communication/socket/receive |
| send data on socket (3 matches) | communication/socket/send |
| connect TCP socket | communication/socket/tcp |
| encode data using Base64 | data-manipulation/encoding/base64 |
| encode data using XOR (6 matches) | data-manipulation/encoding/xor |
| run as a service | executable/pe |
| contain an embedded PE file | executable/subfile/pe |
| get common file path (3 matches) | host-interaction/file-system |
| read file | host-interaction/file-system/read |
| write file (2 matches) | host-interaction/file-system/write |
| print debug messages (2 matches) | host-interaction/log/debug/write-event |
| resolve DNS | host-interaction/network/dns/resolve |
| get hostname | host-interaction/os/hostname |
| create a process with modified I/O handles and window | host-interaction/process/create |
| create process | host-interaction/process/create |
| create registry key | host-interaction/registry/create |
| create service | host-interaction/service/create |
| create thread | host-interaction/thread/create |
| persist via Windows service | persistence/service |
+-------------------------------------------------------+-------------------------------------------------+
```
By passing the `-vv` flag (for Very Verbose), capa reports exactly where it found evidence of these capabilities.
This is useful for at least two reasons:
- it helps explain why we should trust the results, and enables us to verify the conclusions
- it helps explain why we should trust the results, and enables us to verify the conclusions, and
- it shows where within the binary an experienced analyst might study with IDA Pro
```
λ capa.exe level32.exe -q -vv
rule load PE file:
- function 0x401c58:
λ capa.exe suspicious.exe -vv
execute shell command and capture output
namespace c2/shell
author matthew.williams@fireeye.com
scope function
att&ck Execution::Command and Scripting Interpreter::Windows Command Shell [T1059.003]
references https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/ns-processthreadsapi-startupinfoa
examples Practical Malware Analysis Lab 14-02.exe_:0x4011C0
function @ 0x10003A13
and:
match: create a process with modified I/O handles and window @ 0x10003A13
and:
or:
api: kernel32.CreateProcess @ 0x10003D6D
number: 0x101 @ 0x10003B03
or:
number: 0x44 @ 0x10003ADC
optional:
api: kernel32.GetStartupInfo @ 0x10003AE4
match: create pipe @ 0x10003A13
or:
and:
mnemonic(cmp):
- virtual address: 0x401c58
- virtual address: 0x401c68
- virtual address: 0x401c74
- virtual address: 0x401c7f
- virtual address: 0x401c8a
or:
number(0x4550):
- virtual address: 0x401c68
or:
number(0x5a4d):
- virtual address: 0x401c58
api: kernel32.CreatePipe @ 0x10003ACB
or:
string: cmd.exe /c @ 0x10003AED
...
```
@@ -131,24 +194,27 @@ In some regards, capa rules are a mixture of the OpenIOC, Yara, and YAML formats
Here's an example rule used by capa:
```
───────┬────────────────────────────────────────────────────────
│ File: rules/calculate-crc32.yml
───────┼────────────────────────────────────────────────────────
───────┬──────────────────────────────────────────────────────────────────────────
│ File: rules/data-manipulation/checksum/crc32/checksum-data-with-crc32.yml
───────┼──────────────────────────────────────────────────────────────────────────
1 │ rule:
2 │ meta:
3 │ name: calculate CRC32
4 | rule-category: data-manipulation/hash-data/hash-data-using-crc32
3 │ name: checksum data with CRC32
4 │ namespace: data-manipulation/checksum/crc32
5 │ author: moritz.raabe@fireeye.com
6 │ scope: function
7 │ examples:
8 │ - 2D3EDC218A90F03089CC01715A9F047F:0x403CBD
9 │ features:
10 │ - and:
11 │ - mnemonic: shr
12 │ - number: 0xEDB88320
13 │ - number: 8
14 │ - characteristic(nzxor): True
───────┴────────────────────────────────────────────────────────
9 │ - 7D28CB106CB54876B2A5C111724A07CD:0x402350 # RtlComputeCrc32
10 │ features:
11 │ - or:
12 │ - and:
13 │ - mnemonic: shr
14 │ - number: 0xEDB88320
15 │ - number: 8
16 │ - characteristic(nzxor): true
17 │ - api: RtlComputeCrc32
──────────────────────────────────────────────────────────────────────────────────
```
Rules are YAML files that follow a certain schema.
@@ -159,18 +225,22 @@ The top-level element is a dictionary named `rule` with two required children di
## meta block
The meta block contains metadata that identifies the rule, categorizes into behaviors,
The meta block contains metadata that identifies the rule, groups the technique,
and provides references to additional documentation.
Here are the common fields:
- `name` is required. This string should uniquely identify the rule.
- `rule-category` is required when a rule describes a behavior (as opposed to matching a role or disposition).
The rule category specifies an objective, behavior, and technique matched by this rule,
using a format like `$objective/$behavior/$technique`.
An objective is a high-level goal of a program, such as "communication".
A behavior is something that a program may do, such as "communication via socket".
A technique is a way of implementing some behavior, such as "send-data".
- `namespace` is required when a rule describes a technique (as opposed to matching a role or disposition).
The namespace helps us group rules into buckets, such as `host-interaction/file-system` or `impact/wipe-disk`.
When capa emits its final report, it orders the results by category, so related techniques show up together.
- `att&ck` is an optional list of [ATT&CK framework](https://attack.mitre.org/) techniques that the rule implies, like
`Discovery::Query Registry [T1012]` or `Persistence::Create or Modify System Process::Windows Service [T1543.003]`.
These tags are used to derive the ATT&CK mapping for the sample when the report gets rendered.
- `mbc` is an optional list of [Malware Behavior Catalog](https://github.com/MBCProject/mbc-markdown) techniques that the rule implies,
like the ATT&CK list.
- `maec/malware-category` is required when the rule describes a role, such as `dropper` or `backdoor`.
@@ -189,10 +259,10 @@ A technique is a way of implementing some behavior, such as "send-data".
- `author` specifies the name or handle of the rule author.
- `examples` is a list of references to samples that should match the capability.
- `examples` is a required list of references to samples that should match the capability.
When the rule scope is `function`, then the reference should be `<sample hash>:<function va>`.
- `reference` lists related information in a book, article, blog post, etc.
- `references` lists related information in a book, article, blog post, etc.
Other fields are allowed but not defined in this specification. `description` is probably a good one.
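
To make the schema concrete, here is a small illustrative sketch (not capa's actual loader, which lives in `capa/rules.py`) of reading a rule file and checking the structure described above; the helper name and error messages are made up:
```
import yaml

def load_rule(path):
    """hypothetical sketch: parse a rule file and sanity-check the schema described above"""
    with open(path, 'rb') as f:
        doc = yaml.safe_load(f.read().decode('utf-8'))

    rule = doc['rule']                               # the single top-level element
    meta, features = rule['meta'], rule['features']  # its two required children

    if 'name' not in meta:
        raise ValueError('rule is missing meta.name')
    # a rule that describes a technique should also carry a namespace,
    # e.g. data-manipulation/checksum/crc32
    if 'namespace' not in meta and 'maec/malware-category' not in meta:
        raise ValueError('rule is missing meta.namespace')

    return meta['name'], meta.get('namespace'), features
```
capa itself uses ruamel.yaml (see `Rule.to_yaml` further down) so that reformatting a rule preserves key order.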


@@ -155,7 +155,7 @@ class Range(Statement):
def evaluate(self, ctx):
if self.child not in ctx:
return Result(False, self, [self.child])
return Result(False, self, [])
count = len(ctx[self.child])
return Result(self.min <= count <= self.max, self, [], locations=ctx[self.child])
@@ -216,7 +216,7 @@ class Subscope(Statement):
def topologically_order_rules(rules):
'''
order the given rules such that dependencies show up before dependents.
this means that as we match rules, we can add features, and these
this means that as we match rules, we can add features for the matches, and these
will be matched by subsequent rules if they follow this order.
assumes that the rule dependency graph is a DAG.
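
For intuition, a dependency-first ordering like this can be produced with a post-order depth-first walk over the dependency graph; the sketch below is illustrative only, and `get_dependencies()` is a hypothetical accessor for the names of rules referenced via `match:` statements:
```
def topologically_order_rules_sketch(rules):
    """
    illustrative sketch of a dependency-first ordering (post-order DFS).

    `rules` maps rule name -> rule object.
    `get_dependencies()` is a hypothetical accessor returning the names of
    rules referenced via `match:` statements.
    assumes the dependency graph is a DAG, as the real routine does.
    """
    seen = set()
    ordered = []

    def visit(name):
        if name in seen:
            return
        seen.add(name)
        for dep in rules[name].get_dependencies():
            visit(dep)
        # post-order: every dependency has already been appended
        ordered.append(rules[name])

    for name in rules:
        visit(name)
    return ordered
```
Because matched rule names are fed back in as `match` features, evaluating rules in this order ensures a dependent rule sees its dependencies' results.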


@@ -106,7 +106,7 @@ class Bytes(Feature):
def freeze_serialize(self):
return (self.__class__.__name__,
map(lambda x: codecs.encode(x, 'hex'), self.args))
map(lambda x: codecs.encode(x, 'hex').upper(), self.args))
@classmethod
def freeze_deserialize(cls, args):


@@ -10,9 +10,11 @@ import collections
import tqdm
import argparse
import colorama
import capa.rules
import capa.engine
import capa.render
import capa.features
import capa.features.freeze
import capa.features.extractors
@@ -110,6 +112,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
matches.update(all_bb_matches)
matches.update(all_function_matches)
matches.update(all_file_matches)
return matches
@@ -415,18 +418,18 @@ def render_capabilities_vverbose(ruleset, results):
render_result(res, indent=' ')
def appears_rule_cat(rules, capabilities, rule_cat):
def has_rule_with_namespace(rules, capabilities, rule_cat):
for rule_name in capabilities.keys():
if rules.rules[rule_name].meta.get('rule-category', '').startswith(rule_cat):
if rules.rules[rule_name].meta.get('namespace', '').startswith(rule_cat):
return True
return False
def is_file_limitation(rules, capabilities, is_standalone=True):
def has_file_limitation(rules, capabilities, is_standalone=True):
file_limitations = {
# capa will likely detect installer specific functionality.
# this is probably not what the user wants.
'other-features/installer/': [
'executable/installer': [
' This sample appears to be an installer.',
' ',
' capa cannot handle installers well. This means the results may be misleading or incomplete.'
@@ -435,7 +438,7 @@ def is_file_limitation(rules, capabilities, is_standalone=True):
# capa won't detect much in .NET samples.
# it might match some file-level things.
# for consistency, bail on things that we don't support.
'other-features/compiled-to-dot-net': [
'runtime/dotnet': [
' This sample appears to be a .NET module.',
' ',
' .NET is a cross-platform framework for running managed applications.',
@@ -445,7 +448,7 @@ def is_file_limitation(rules, capabilities, is_standalone=True):
# capa will detect dozens of capabilities for AutoIt samples,
# but these are due to the AutoIt runtime, not the payload script.
# so, don't confuse the user with FP matches - bail instead
'other-features/compiled-with-autoit': [
'compiler/autoit': [
' This sample appears to be compiled with AutoIt.',
' ',
' AutoIt is a freeware BASIC-like scripting language designed for automating the Windows GUI.',
@@ -453,7 +456,7 @@ def is_file_limitation(rules, capabilities, is_standalone=True):
' You may have to analyze the file manually, using a tool like the AutoIt decompiler MyAut2Exe.'
],
# capa won't detect much in packed samples
'anti-analysis/packing/': [
'anti-analysis/packer/': [
' This sample appears to be packed.',
' ',
' Packed samples have often been obfuscated to hide their logic.',
@@ -463,7 +466,7 @@ def is_file_limitation(rules, capabilities, is_standalone=True):
}
for category, dialogue in file_limitations.items():
if not appears_rule_cat(rules, capabilities, category):
if not has_rule_with_namespace(rules, capabilities, category):
continue
logger.warning('-' * 80)
for line in dialogue:
@@ -583,38 +586,35 @@ def get_rules(rule_path):
if not os.path.exists(rule_path):
raise IOError('%s does not exist or cannot be accessed' % rule_path)
rules = []
rule_paths = []
if os.path.isfile(rule_path):
logger.info('reading rule file: %s', rule_path)
with open(rule_path, 'rb') as f:
rule = capa.rules.Rule.from_yaml(f.read().decode('utf-8'))
if is_nursery_rule_path(rule_path):
rule.meta['nursery'] = True
rules.append(rule)
logger.debug('rule: %s scope: %s', rule.name, rule.scope)
rule_paths.append(rule_path)
elif os.path.isdir(rule_path):
logger.info('reading rules from directory %s', rule_path)
logger.debug('reading rules from directory %s', rule_path)
for root, dirs, files in os.walk(rule_path):
for file in files:
if not file.endswith('.yml'):
logger.warning('skipping non-.yml file: %s', file)
continue
path = os.path.join(root, file)
logger.debug('reading rule file: %s', path)
try:
rule = capa.rules.Rule.from_yaml_file(path)
except capa.rules.InvalidRule:
raise
else:
if is_nursery_rule_path(root):
rule.meta['nursery'] = True
rule_path = os.path.join(root, file)
rule_paths.append(rule_path)
rules = []
for rule_path in rule_paths:
logger.debug('reading rule file: %s', rule_path)
try:
rule = capa.rules.Rule.from_yaml_file(rule_path)
except capa.rules.InvalidRule:
raise
else:
rule.meta['capa/path'] = rule_path
if is_nursery_rule_path(rule_path):
rule.meta['capa/nursery'] = True
rules.append(rule)
logger.debug('rule: %s scope: %s', rule.name, rule.scope)
rules.append(rule)
logger.debug('rule: %s scope: %s', rule.name, rule.scope)
return rules
@@ -638,10 +638,14 @@ def main(argv=None):
help='Path to rule file or directory, use embedded rules by default')
parser.add_argument('-t', '--tag', type=str,
help='Filter on rule meta field values')
parser.add_argument('-j', '--json', action='store_true',
help='Emit JSON instead of text')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
help='Enable verbose result document (no effect with --json)')
parser.add_argument('-vv', '--vverbose', action='store_true',
help='Enable very verbose output')
help='Enable very verbose result document (no effect with --json)')
parser.add_argument('-d', '--debug', action='store_true',
help='Enable debugging output on STDERR')
parser.add_argument('-q', '--quiet', action='store_true',
help='Disable all output but errors')
parser.add_argument('-f', '--format', choices=[f[0] for f in formats], default='auto',
@@ -651,7 +655,7 @@ def main(argv=None):
if args.quiet:
logging.basicConfig(level=logging.ERROR)
logging.getLogger().setLevel(logging.ERROR)
elif args.verbose:
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
@@ -732,18 +736,26 @@ def main(argv=None):
capabilities = find_capabilities(rules, extractor)
if is_file_limitation(rules, capabilities):
if has_file_limitation(rules, capabilities):
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose):
return -1
if args.vverbose:
render_capabilities_vverbose(rules, capabilities)
# colorama will detect:
# - when on Windows console, and fixup coloring, and
# - when not an interactive session, and disable coloring
# renderers should use coloring and assume it will be stripped out if necessary.
colorama.init()
if args.json:
print(capa.render.render_json(rules, capabilities))
elif args.vverbose:
print(capa.render.render_vverbose(rules, capabilities))
elif args.verbose:
render_capabilities_verbose(rules, capabilities)
print(capa.render.render_verbose(rules, capabilities))
else:
render_capabilities_default(rules, capabilities)
print(capa.render.render_default(rules, capabilities))
colorama.deinit()
logger.info('done.')
@@ -781,7 +793,7 @@ def ida_main():
import capa.features.extractors.ida
capabilities = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor())
if is_file_limitation(rules, capabilities, is_standalone=False):
if has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui('capa encountered warnings during analysis')
render_capabilities_default(rules, capabilities)

capa/render/__init__.py (new file, 278 lines)

@@ -0,0 +1,278 @@
import json
import capa.engine
def convert_statement_to_result_document(statement):
"""
"statement": {
"type": "or"
},
"statement": {
"max": 9223372036854775808,
"min": 2,
"type": "range"
},
"""
if isinstance(statement, capa.engine.And):
return {
'type': 'and',
}
elif isinstance(statement, capa.engine.Or):
return {
'type': 'or',
}
elif isinstance(statement, capa.engine.Not):
return {
'type': 'not',
}
elif isinstance(statement, capa.engine.Some) and statement.count == 0:
return {
'type': 'optional'
}
elif isinstance(statement, capa.engine.Some) and statement.count > 0:
return {
'type': 'some',
'count': statement.count,
}
elif isinstance(statement, capa.engine.Range):
return {
'type': 'range',
'min': statement.min,
'max': statement.max,
'child': convert_feature_to_result_document(statement.child),
}
elif isinstance(statement, capa.engine.Regex):
return {
'type': 'regex',
'pattern': statement.pattern,
# the string that was matched
'match': statement.match,
}
elif isinstance(statement, capa.engine.Subscope):
return {
'type': 'subscope',
'subscope': statement.scope,
}
else:
raise RuntimeError("unexpected match statement type: " + str(statement))
def convert_feature_to_result_document(feature):
"""
"feature": {
"number": 6,
"type": "number"
},
"feature": {
"api": "ws2_32.WSASocket",
"type": "api"
},
"feature": {
"match": "create TCP socket",
"type": "match"
},
"feature": {
"characteristic": [
"loop",
true
],
"type": "characteristic"
},
"""
name, value = feature.freeze_serialize()
# make the terms pretty
name = name.lower()
if name == 'matchedrule':
name = 'match'
# in the common case, there's a single argument
# so use it directly.
# like: name=number value=1
if isinstance(value, list) and len(value) == 1:
value = value[0]
return {
'type': name,
name: value,
}
def convert_node_to_result_document(node):
"""
"node": {
"type": "statement",
"statement": { ... }
},
"node": {
"type": "feature",
"feature": { ... }
},
"""
if isinstance(node, capa.engine.Statement):
return {
'type': 'statement',
'statement': convert_statement_to_result_document(node),
}
elif isinstance(node, capa.features.Feature):
return {
'type': 'feature',
'feature': convert_feature_to_result_document(node),
}
else:
raise RuntimeError("unexpected match node type")
def convert_match_to_result_document(rules, capabilities, result):
"""
convert the given Result instance into a common, Python-native data structure.
this will become part of the "result document" format that can be emitted to JSON.
"""
doc = {
'success': bool(result.success),
'node': convert_node_to_result_document(result.statement),
'children': [
convert_match_to_result_document(rules, capabilities, child)
for child in result.children
],
}
# logic expressions, like `and`, don't have locations - their children do.
# so only add `locations` to feature nodes.
if isinstance(result.statement, capa.features.Feature):
if bool(result.success):
doc['locations'] = result.locations
# if we have a `match` statement, then we're referencing another rule.
# this could be an external rule (written by a human), or a
# rule generated to support a subscope (basic block, etc.)
# we still want to include the matching logic in this tree.
#
# so, we need to lookup the other rule results
# and then filter those down to the address used here.
# finally, splice that logic into this tree.
if (doc['node']['type'] == 'feature'
and doc['node']['feature']['type'] == 'match'
# only add subtree on success,
# because there won't be results for the other rule on failure.
and doc['success']):
rule_name = doc['node']['feature']['match']
rule = rules[rule_name]
rule_matches = {address: result for (address, result) in capabilities[rule_name]}
if rule.meta.get('capa/subscope-rule'):
# for a subscope rule, fixup the node to be a scope node, rather than a match feature node.
#
# e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block`
scope = rule.meta['scope']
doc['node'] = {
'type': 'statement',
'statement': {
'type': 'subscope',
'subscope': scope,
},
}
for location in doc['locations']:
doc['children'].append(convert_match_to_result_document(rules, capabilities, rule_matches[location]))
return doc
def convert_capabilities_to_result_document(rules, capabilities):
"""
convert the given rule set and capabilities result to a common, Python-native data structure.
this format can be directly emitted to JSON, or passed to the other `render_*` routines
to render as text.
see examples of substructures in above routines.
schema:
```json
{
$rule-name: {
"meta": {...copied from rule.meta...},
"matches: {
$address: {...match details...},
...
}
},
...
}
```
Args:
rules (RuleSet):
capabilities (Dict[str, List[Tuple[int, Result]]]):
"""
doc = {}
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get('capa/subscope-rule'):
continue
doc[rule_name] = {
'meta': dict(rule.meta),
'source': rule.definition,
'matches': {
addr: convert_match_to_result_document(rules, capabilities, match)
for (addr, match) in matches
},
}
return doc
def render_vverbose(rules, capabilities):
# there's an import loop here
# if capa.render imports capa.render.vverbose
# and capa.render.vverbose imports capa.render (implicitly, as a submodule)
# so, defer the import until the routine is called, breaking the import loop.
import capa.render.vverbose
doc = convert_capabilities_to_result_document(rules, capabilities)
return capa.render.vverbose.render_vverbose(doc)
def render_verbose(rules, capabilities):
# break import loop
import capa.render.verbose
doc = convert_capabilities_to_result_document(rules, capabilities)
return capa.render.verbose.render_verbose(doc)
def render_default(rules, capabilities):
# break import loop
import capa.render.verbose
import capa.render.default
doc = convert_capabilities_to_result_document(rules, capabilities)
return capa.render.default.render_default(doc)
class CapaJsonObjectEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (list, dict, str, unicode, int, float, bool, type(None))):
return json.JSONEncoder.default(self, obj)
elif isinstance(obj, set):
return list(sorted(obj))
else:
# probably will TypeError
return json.JSONEncoder.default(self, obj)
def render_json(rules, capabilities):
return json.dumps(
convert_capabilities_to_result_document(rules, capabilities),
cls=CapaJsonObjectEncoder,
sort_keys=True,
)
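
For illustration, downstream tooling might consume the `--json` output according to the schema documented above in `convert_capabilities_to_result_document`; the file name and loop here are only a sketch:
```
import json

# hypothetical consumer of `capa.exe suspicious.exe --json > capa.json`
with open('capa.json', 'rb') as f:
    doc = json.loads(f.read().decode('utf-8'))

for rule_name, entry in sorted(doc.items()):
    namespace = entry['meta'].get('namespace', '')
    # the keys of `matches` are the addresses at which the rule matched
    addresses = sorted(entry['matches'].keys())
    print('%s (%s): %d match(es)' % (rule_name, namespace, len(addresses)))
```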

capa/render/default.py (new file, 97 lines)

@@ -0,0 +1,97 @@
import collections
import six
import tabulate
import capa.render.utils as rutils
def width(s, character_count):
"""pad the given string to at least `character_count`"""
if len(s) < character_count:
return s + ' ' * (character_count - len(s))
else:
return s
def render_capabilities(doc, ostream):
"""
example::
+-------------------------------------------------------+-------------------------------------------------+
| CAPABILITY | NAMESPACE |
|-------------------------------------------------------+-------------------------------------------------|
| check for OutputDebugString error (2 matches) | anti-analysis/anti-debugging/debugger-detection |
| read and send data from client to server | c2/file-transfer |
| ... | ... |
+-------------------------------------------------------+-------------------------------------------------+
"""
rows = []
for rule in rutils.capability_rules(doc):
count = len(rule['matches'])
if count == 1:
capability = rutils.bold(rule['meta']['name'])
else:
capability = '%s (%d matches)' % (rutils.bold(rule['meta']['name']), count)
rows.append((capability, rule['meta']['namespace']))
ostream.write(tabulate.tabulate(rows, headers=[width('CAPABILITY', 40), width('NAMESPACE', 40)], tablefmt='psql'))
ostream.write('\n')
def render_attack(doc, ostream):
"""
example::
+------------------------+----------------------------------------------------------------------+
| ATT&CK Tactic | ATT&CK Technique |
|------------------------+----------------------------------------------------------------------|
| DEFENSE EVASION | Obfuscated Files or Information [T1027] |
| DISCOVERY | Query Registry [T1012] |
| | System Information Discovery [T1082] |
| EXECUTION | Command and Scripting Interpreter::Windows Command Shell [T1059.003] |
| | Shared Modules [T1129] |
| EXFILTRATION | Exfiltration Over C2 Channel [T1041] |
| PERSISTENCE | Create or Modify System Process::Windows Service [T1543.003] |
+------------------------+----------------------------------------------------------------------+
"""
tactics = collections.defaultdict(set)
for rule in rutils.capability_rules(doc):
if not rule['meta'].get('att&ck'):
continue
for attack in rule['meta']['att&ck']:
tactic, _, rest = attack.partition('::')
if '::' in rest:
technique, _, rest = rest.partition('::')
subtechnique, _, id = rest.rpartition(' ')
tactics[tactic].add((technique, subtechnique, id))
else:
technique, _, id = rest.rpartition(' ')
tactics[tactic].add((technique, id))
rows = []
for tactic, techniques in sorted(tactics.items()):
inner_rows = []
for spec in sorted(techniques):
if len(spec) == 2:
technique, id = spec
inner_rows.append('%s %s' % (rutils.bold(technique), id))
elif len(spec) == 3:
technique, subtechnique, id = spec
inner_rows.append('%s::%s %s' % (rutils.bold(technique), subtechnique, id))
else:
raise RuntimeError('unexpected ATT&CK spec format')
rows.append((rutils.bold(tactic.upper()), '\n'.join(inner_rows), ))
ostream.write(tabulate.tabulate(rows, headers=[width('ATT&CK Tactic', 20), width('ATT&CK Technique', 60)], tablefmt='psql'))
ostream.write('\n')
def render_default(doc):
ostream = six.StringIO()
render_attack(doc, ostream)
ostream.write("\n")
render_capabilities(doc, ostream)
return ostream.getvalue()

capa/render/utils.py (new file, 42 lines)

@@ -0,0 +1,42 @@
import six
import termcolor
def bold(s):
"""draw attention to the given string"""
return termcolor.colored(s, 'blue')
def bold2(s):
"""draw attention to the given string, within a `bold` section"""
return termcolor.colored(s, 'green')
def hex(n):
"""render the given number using upper case hex, like: 0x123ABC"""
return '0x%X' % n
def capability_rules(doc):
"""enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc)."""
for (_, _, rule) in sorted(map(lambda rule: (rule['meta']['namespace'], rule['meta']['name'], rule), doc.values())):
if rule['meta'].get('lib'):
continue
if rule['meta'].get('capa/subscope'):
continue
if rule['meta'].get('maec/analysis-conclusion'):
continue
if rule['meta'].get('maec/analysis-conclusion-ov'):
continue
if rule['meta'].get('maec/malware-category'):
continue
if rule['meta'].get('maec/malware-category-ov'):
continue
yield rule
class StringIO(six.StringIO):
def writeln(self, s):
self.write(s)
self.write('\n')

capa/render/verbose.py (new file, 52 lines)

@@ -0,0 +1,52 @@
"""
example::
send data
namespace communication
author william.ballenthin@fireeye.com
description all known techniques for sending data to a potential C2 server
scope function
examples BFB9B5391A13D0AFD787E87AB90F14F5:0x13145D60
matches 0x10004363
0x100046c9
0x1000454e
0x10003a13
0x10003415
0x10003797
"""
import tabulate
import capa.rules
import capa.render.utils as rutils
def render_verbose(doc):
ostream = rutils.StringIO()
for rule in rutils.capability_rules(doc):
count = len(rule['matches'])
if count == 1:
capability = rutils.bold(rule['meta']['name'])
else:
capability = '%s (%d matches)' % (rutils.bold(rule['meta']['name']), count)
ostream.writeln(capability)
rows = []
for key in ('namespace', 'description', 'scope'):
if key == 'name' or key not in rule['meta']:
continue
v = rule['meta'][key]
if isinstance(v, list) and len(v) == 1:
v = v[0]
rows.append((key, v))
if rule['meta']['scope'] != capa.rules.FILE_SCOPE:
locations = doc[rule['meta']['name']]['matches'].keys()
rows.append(('matches', '\n'.join(map(rutils.hex, locations))))
ostream.writeln(tabulate.tabulate(rows, tablefmt='plain'))
ostream.write('\n')
return ostream.getvalue()

capa/render/vverbose.py (new file, 195 lines)

@@ -0,0 +1,195 @@
import tabulate
import capa.rules
import capa.render.utils as rutils
def render_statement(ostream, statement, indent=0):
ostream.write(' ' * indent)
if statement['type'] in ('and', 'or', 'optional'):
ostream.write(statement['type'])
ostream.writeln(':')
elif statement['type'] == 'not':
# this statement is handled specially in `render_match` using the MODE_SUCCESS/MODE_FAILURE flags.
ostream.writeln('not:')
elif statement['type'] == 'some':
ostream.write('%d or more' % (statement['count']))
ostream.writeln(':')
elif statement['type'] == 'range':
# `range` is a weird node, it's almost a hybrid of statement+feature.
# it is a specific feature repeated multiple times.
# there's no additional logic in the feature part, just the existence of a feature.
# so, we have to inline some of the feature rendering here.
child = statement['child']
if child['type'] in ('string', 'bytes', 'api', 'mnemonic', 'basic block', 'export', 'import', 'section', 'match'):
feature = '%s(%s)' % (child['type'], rutils.bold2(child[child['type']]))
elif child['type'] in ('number', 'offset'):
feature = '%s(%s)' % (child['type'], rutils.bold2(rutils.hex(child[child['type']])))
elif child['type'] == 'characteristic':
feature = 'characteristic(%s)' % (rutils.bold2(child['characteristic'][0]))
else:
raise RuntimeError('unexpected feature type: ' + str(child))
ostream.write('count(%s): ' % feature)
if statement['max'] == statement['min']:
ostream.writeln('%d' % (statement['min']))
elif statement['min'] == 0:
ostream.writeln('%d or fewer' % (statement['max']))
elif statement['max'] == (1 << 64 - 1):
ostream.writeln('%d or more' % (statement['min']))
else:
ostream.writeln('between %d and %d' % (statement['min'], statement['max']))
elif statement['type'] == 'subscope':
ostream.write(statement['subscope'])
ostream.writeln(':')
elif statement['type'] == 'regex':
# regex is a `Statement` not a `Feature`
# this is because it doesn't get extracted, but applies to all strings in scope.
# so we have to handle it here
ostream.writeln('string: %s' % (statement['match']))
else:
raise RuntimeError("unexpected match statement type: " + str(statement))
def render_feature(ostream, match, feature, indent=0):
ostream.write(' ' * indent)
if feature['type'] in ('string', 'api', 'mnemonic', 'basic block', 'export', 'import', 'section', 'match'):
ostream.write(feature['type'])
ostream.write(': ')
ostream.write(rutils.bold2(feature[feature['type']]))
elif feature['type'] in ('number', 'offset'):
ostream.write(feature['type'])
ostream.write(': ')
ostream.write(rutils.bold2(rutils.hex(feature[feature['type']])))
elif feature['type'] == 'bytes':
ostream.write('bytes: ')
# bytes is the uppercase, hex-encoded string.
# it should always be an even number of characters (it's hex).
bytes = feature['bytes']
for i in range(len(bytes) // 2):
ostream.write(rutils.bold2(bytes[i:i + 2]))
ostream.write(' ')
elif feature['type'] == 'characteristic':
ostream.write('characteristic(%s)' % (rutils.bold2(feature['characteristic'][0])))
# note that regex is found in `render_statement`
else:
raise RuntimeError('unexpected feature type: ' + str(feature))
# it's possible to have an empty locations array here,
# such as when we're in MODE_FAILURE and showing the logic
# under a `not` statement (which will have no matched locations).
locations = list(sorted(match.get('locations', [])))
if len(locations) == 1:
ostream.write(' @ ')
ostream.write(rutils.hex(locations[0]))
elif len(locations) > 1:
ostream.write(' @ ')
if len(locations) > 4:
# don't display too many locations, because it becomes very noisy.
# probably only the first handful of locations will be useful for inspection.
ostream.write(', '.join(map(rutils.hex, locations[0:4])))
ostream.write(', and %d more...' % (len(locations) - 4))
else:
ostream.write(', '.join(map(rutils.hex, locations)))
ostream.write('\n')
def render_node(ostream, match, node, indent=0):
if node['type'] == 'statement':
render_statement(ostream, node['statement'], indent=indent)
elif node['type'] == 'feature':
render_feature(ostream, match, node['feature'], indent=indent)
else:
raise RuntimeError('unexpected node type: ' + str(node))
# display nodes that successfully evaluated against the sample.
MODE_SUCCESS = 'success'
# display nodes that did not evaluate to True against the sample.
# this is useful when rendering the logic tree under a `not` node.
MODE_FAILURE = 'failure'
def render_match(ostream, match, indent=0, mode=MODE_SUCCESS):
child_mode = mode
if mode == MODE_SUCCESS:
# display only nodes that evaluated successfully.
if not match['success']:
return
# optional statement with no successful children is empty
if (match['node'].get('statement', {}).get('type') == 'optional'
and not any(map(lambda m: m['success'], match['children']))):
return
# not statement, so invert the child mode to show failed evaluations
if match['node'].get('statement', {}).get('type') == 'not':
child_mode = MODE_FAILURE
elif mode == MODE_FAILURE:
# display only nodes that did not evaluate to True
if match['success']:
return
# optional statement with successful children is not relevant
if (match['node'].get('statement', {}).get('type') == 'optional'
and any(map(lambda m: m['success'], match['children']))):
return
# not statement, so invert the child mode to show successful evaluations
if match['node'].get('statement', {}).get('type') == 'not':
child_mode = MODE_SUCCESS
else:
raise RuntimeError('unexpected mode: ' + mode)
render_node(ostream, match, match['node'], indent=indent)
for child in match['children']:
render_match(ostream, child, indent=indent + 1, mode=child_mode)
def render_vverbose(doc):
ostream = rutils.StringIO()
for rule in rutils.capability_rules(doc):
count = len(rule['matches'])
if count == 1:
capability = rutils.bold(rule['meta']['name'])
else:
capability = '%s (%d matches)' % (rutils.bold(rule['meta']['name']), count)
ostream.writeln(capability)
rows = []
for key in capa.rules.META_KEYS:
if key == 'name' or key not in rule['meta']:
continue
v = rule['meta'][key]
if isinstance(v, list) and len(v) == 1:
v = v[0]
elif isinstance(v, list) and len(v) > 1:
v = ', '.join(v)
rows.append((key, v))
ostream.writeln(tabulate.tabulate(rows, tablefmt='plain'))
if rule['meta']['scope'] == capa.rules.FILE_SCOPE:
matches = list(doc[rule['meta']['name']]['matches'].values())
if len(matches) != 1:
# i think there should only ever be one match per file-scope rule,
# because we do the file-scope evaluation a single time.
# but i'm not 100% sure if this is/will always be true.
# so, let's be explicit about our assumptions and raise an exception if they fail.
raise RuntimeError('unexpected file scope match count: %d' % len(matches))
render_match(ostream, matches[0], indent=0)
else:
for location, match in sorted(doc[rule['meta']['name']]['matches'].items()):
ostream.write(rule['meta']['scope'])
ostream.write(' @ ')
ostream.writeln(rutils.hex(location))
render_match(ostream, match, indent=1)
ostream.write('\n')
return ostream.getvalue()


@@ -21,7 +21,28 @@ logger = logging.getLogger(__name__)
# these are the standard metadata fields, in the preferred order.
# when reformatted, any custom keys will come after these.
META_KEYS = ("name", "namespace", "rule-category", "author", "description", "lib", "scope", "att&ck", "mbc", "references", "examples")
META_KEYS = (
'name',
'namespace',
'rule-category',
'maec/analysis-conclusion',
'maec/analysis-conclusion-ov',
'maec/malware-category',
'maec/malware-category-ov',
'author',
'description',
'lib',
'scope',
'att&ck',
'mbc',
'references',
'examples'
)
# these are meta fields that are internal to capa,
# and added during rule reading/construction.
# they may help us manipulate or index rules,
# but should not be exposed to clients.
HIDDEN_META_KEYS = ('capa/nursery', 'capa/path')
FILE_SCOPE = 'file'
@@ -540,11 +561,11 @@ class Rule(object):
definition = yaml.load(self.definition)
# definition retains a reference to `meta`,
# so we're updating that in place.
definition["rule"]["meta"] = self.meta
definition['rule']['meta'] = self.meta
meta = self.meta
meta["name"] = self.name
meta["scope"] = self.scope
meta['name'] = self.name
meta['scope'] = self.scope
def move_to_end(m, k):
# ruamel.yaml uses an ordereddict-like structure to track maps (CommentedMap).
@@ -554,8 +575,8 @@ class Rule(object):
del m[k]
m[k] = v
move_to_end(definition["rule"], "meta")
move_to_end(definition["rule"], "features")
move_to_end(definition['rule'], 'meta')
move_to_end(definition['rule'], 'features')
for key in META_KEYS:
if key in meta:
@@ -566,9 +587,26 @@ class Rule(object):
continue
move_to_end(meta, key)
# save off the existing hidden meta values,
# emit the document,
# and re-add the hidden meta.
hidden_meta = {
key: meta.get(key)
for key in HIDDEN_META_KEYS
}
for key in hidden_meta.keys():
del meta[key]
ostream = six.BytesIO()
yaml.dump(definition, ostream)
return ostream.getvalue().decode('utf-8').rstrip("\n") + "\n"
for key, value in hidden_meta.items():
if value is None:
continue
meta[key] = value
return ostream.getvalue().decode('utf-8').rstrip('\n') + '\n'
def get_rules_with_scope(rules, scope):
@@ -668,6 +706,9 @@ class RuleSet(object):
def __len__(self):
return len(self.rules)
def __getitem__(self, rulename):
return self.rules[rulename]
@staticmethod
def _get_rules_for_scope(rules, scope):
'''

Submodule rules updated: 7f5fb71a5d...9f023a301a


@@ -1,34 +1,38 @@
#!/usr/bin/env bash
# doesn't matter if this gets repeated later on in a hooks file
# Use a console with emojis support for a better experience
# Stash uncommited changes
MSG="post-commit-$(date +%s)"
git stash push -kqum $MSG
STASH_LIST=$(git stash list)
MSG="post-commit-$(date +%s)";
git stash push -kqum "$MSG";
STASH_LIST=$(git stash list);
if [[ "$STASH_LIST" == *"$MSG"* ]]; then
echo "Uncommited changes stashed with message '$MSG', if you abort before they are restored run \`git stash pop\`"
echo "Uncommited changes stashed with message '$MSG', if you abort before they are restored run \`git stash pop\`";
fi
# Run style checker and print state (it doesn't block the commit)
pycodestyle --config=./ci/tox.ini ./capa/ > style-checker-output.log 2>&1
pycodestyle --config=./ci/tox.ini ./capa/ > style-checker-output.log 2>&1;
if [ $? == 0 ]; then
echo 'Style checker succeeds!! 💘'
echo 'Style checker succeeds!! 💘';
else
echo 'Style checker failed 😭\nCheck style-checker-output.log for details'
exit 1
echo 'Style checker failed 😭';
echo 'Check style-checker-output.log for details';
exit 1;
fi
# Run rule linter and print state (it doesn't block the commit)
python ./scripts/lint.py ./rules/ > rule-linter-output.log 2>&1
python ./scripts/lint.py ./rules/ > rule-linter-output.log 2>&1;
if [ $? == 0 ]; then
echo 'Rule linter succeeds!! 💖'
echo 'Rule linter succeeds!! 💖';
else
echo 'Rule linter failed 😭\nCheck rule-linter-output.log for details'
exit 2
echo 'Rule linter failed 😭';
echo 'Check rule-linter-output.log for details';
exit 2;
fi
# Restore stashed changes
if [[ "$STASH_LIST" == *"$MSG"* ]]; then
git stash pop -q --index
echo "Stashed changes '$MSG' restored"
git stash pop -q --index;
echo "Stashed changes '$MSG' restored";
fi


@@ -1,52 +1,57 @@
#!/usr/bin/env bash
# doesn't matter if this gets repeated later on in a hooks file
# Use a console with emojis support for a better experience
# Stash uncommited changes
MSG="pre-push-$(date +%s)"
git stash push -kqum $MSG
STASH_LIST=$(git stash list)
MSG="pre-push-$(date +%s)";
git stash push -kqum "$MSG";
STASH_LIST=$(git stash list);
if [[ "$STASH_LIST" == *"$MSG"* ]]; then
echo "Uncommited changes stashed with message '$MSG', if you abort before they are restored run \`git stash pop\`"
echo "Uncommited changes stashed with message '$MSG', if you abort before they are restored run \`git stash pop\`";
fi
restore_stashed() {
if [[ "$STASH_LIST" == *"$MSG"* ]]; then
git stash pop -q --index
echo "Stashed changes '$MSG' restored"
git stash pop -q --index;
echo "Stashed changes '$MSG' restored";
fi
}
# Run style checker and print state
pycodestyle --config=./ci/tox.ini ./capa/ > style-checker-output.log 2>&1
pycodestyle --config=./ci/tox.ini ./capa/ > style-checker-output.log 2>&1;
if [ $? == 0 ]; then
echo 'Style checker succeeds!! 💘'
echo 'Style checker succeeds!! 💘';
else
echo 'Style checker failed 😭 PUSH ABORTED\nCheck style-checker-output.log for details'
restore_stashed
exit 1
echo 'Style checker failed 😭 PUSH ABORTED';
echo 'Check style-checker-output.log for details';
restore_stashed;
exit 1;
fi
# Run rule linter and print state
python ./scripts/lint.py ./rules/ > rule-linter-output.log 2>&1
python ./scripts/lint.py ./rules/ > rule-linter-output.log 2>&1;
if [ $? == 0 ]; then
echo 'Rule linter succeeds!! 💖'
echo 'Rule linter succeeds!! 💖';
else
echo 'Rule linter failed 😭 PUSH ABORTED\nCheck rule-linter-output.log for details'
restore_stashed
exit 2
echo 'Rule linter failed 😭 PUSH ABORTED';
echo 'Check rule-linter-output.log for details';
restore_stashed;
exit 2;
fi
# Run tests
echo 'Running tests, please wait ⌛'
pytest tests/ --maxfail=1
echo 'Running tests, please wait ⌛';
pytest tests/ --maxfail=1;
if [ $? == 0 ]; then
echo 'Tests succeed!! 🎉'
echo 'Tests succeed!! 🎉';
else
echo 'Tests failed 😓 PUSH ABORTED\nRun `pytest -v --cov=capa test/` if you need more details'
restore_stashed
exit 3
echo 'Tests failed 😓 PUSH ABORTED';
echo 'Run `pytest -v --cov=capa test/` if you need more details';
restore_stashed;
exit 3;
fi
echo 'PUSH SUCCEEDED 🎉🎉'
echo 'PUSH SUCCEEDED 🎉🎉';
restore_stashed
restore_stashed;


@@ -6,12 +6,14 @@ Usage:
$ python scripts/lint.py rules/
'''
import os
import os.path
import sys
import string
import hashlib
import logging
import os.path
import itertools
import posixpath
import argparse
@@ -39,16 +41,56 @@ class NameCasing(Lint):
rule.name[1] not in string.ascii_uppercase)
class MissingRuleCategory(Lint):
name = 'missing rule category'
recommendation = 'Add meta.rule-category so that the rule is emitted correctly'
class FilenameDoesntMatchRuleName(Lint):
name = 'filename doesn\'t match the rule name'
recommendation = 'Rename rule file to match the rule name, expected: "{:s}", found: "{:s}"'
def check_rule(self, ctx, rule):
return ('rule-category' not in rule.meta and
expected = rule.name
expected = expected.lower()
expected = expected.replace(' ', '-')
expected = expected.replace('(', '')
expected = expected.replace(')', '')
expected = expected.replace('+', '')
expected = expected.replace('/', '')
expected = expected + '.yml'
found = os.path.basename(rule.meta['capa/path'])
self.recommendation = self.recommendation.format(expected, found)
return expected != found
class MissingNamespace(Lint):
name = 'missing rule namespace'
recommendation = 'Add meta.namespace so that the rule is emitted correctly'
def check_rule(self, ctx, rule):
return ('namespace' not in rule.meta and
not is_nursery_rule(rule) and
'maec/malware-category' not in rule.meta and
'lib' not in rule.meta)
class NamespaceDoesntMatchRulePath(Lint):
name = 'file path doesn\'t match rule namespace'
recommendation = 'Move rule to appropriate directory or update the namespace'
def check_rule(self, ctx, rule):
# let the other lints catch namespace issues
if 'namespace' not in rule.meta:
return False
if is_nursery_rule(rule):
return False
if 'maec/malware-category' in rule.meta:
return False
if 'lib' in rule.meta:
return False
return rule.meta['namespace'] not in posixpath.normpath(rule.meta['capa/path'])
class MissingScope(Lint):
name = 'missing scope'
recommendation = 'Add meta.scope so that the scope is explicit (defaults to `function`)'
@@ -144,6 +186,22 @@ class DoesntMatchExample(Lint):
return True
class UnusualMetaField(Lint):
name = 'unusual meta field'
recommendation = 'Remove the meta field: "{:s}"'
def check_rule(self, ctx, rule):
for key in rule.meta.keys():
if key in capa.rules.META_KEYS:
continue
if key in capa.rules.HIDDEN_META_KEYS:
continue
self.recommendation = self.recommendation.format(key)
return True
return False
class FeatureStringTooShort(Lint):
name = 'feature string too short'
recommendation = 'capa only extracts strings with length >= 4; will not match on "{:s}"'
@@ -171,6 +229,7 @@ def run_feature_lints(lints, ctx, features):
NAME_LINTS = (
NameCasing(),
FilenameDoesntMatchRuleName(),
)
@@ -189,11 +248,13 @@ def lint_scope(ctx, rule):
META_LINTS = (
MissingRuleCategory(),
MissingNamespace(),
NamespaceDoesntMatchRulePath(),
MissingAuthor(),
MissingExamples(),
MissingExampleOffset(),
ExampleFileDNE(),
UnusualMetaField(),
)
@@ -249,7 +310,7 @@ def is_nursery_rule(rule):
For example, they may not have references to public example of a technique.
Yet, we still want to capture and report on their matches.
'''
return rule.meta.get('nursery')
return rule.meta.get('capa/nursery')
def lint_rule(ctx, rule):


@@ -24,15 +24,15 @@ logger = logging.getLogger('migrate-rules')
def read_plan(plan_path):
with open(plan_path, 'rb') as f:
return list(csv.DictReader(f, restkey="other", fieldnames=(
"existing path",
"existing name",
"existing rule-category",
"proposed name",
"proposed namespace",
"ATT&CK",
"MBC",
"comment1",
return list(csv.DictReader(f, restkey='other', fieldnames=(
'existing path',
'existing name',
'existing rule-category',
'proposed name',
'proposed namespace',
'ATT&CK',
'MBC',
'comment1',
)))
@@ -48,8 +48,8 @@ def read_rules(rule_directory):
rule = capa.rules.Rule.from_yaml_file(path)
rules[rule.name] = rule
if "nursery" in path:
rule.meta["nursery"] = True
if 'nursery' in path:
rule.meta['capa/nursery'] = True
return rules
@@ -70,91 +70,89 @@ def main(argv=None):
logging.getLogger().setLevel(logging.INFO)
plan = read_plan(args.plan)
logger.info("read %d plan entries", len(plan))
logger.info('read %d plan entries', len(plan))
rules = read_rules(args.source)
logger.info("read %d rules", len(rules))
logger.info('read %d rules', len(rules))
planned_rules = set([row["existing name"] for row in plan])
planned_rules = set([row['existing name'] for row in plan])
unplanned_rules = [rule for (name, rule) in rules.items() if name not in planned_rules]
if unplanned_rules:
logger.error("plan does not account for %d rules:" % (len(unplanned_rules)))
logger.error('plan does not account for %d rules:' % (len(unplanned_rules)))
for rule in unplanned_rules:
logger.error(" " + rule.name)
logger.error(' ' + rule.name)
return -1
# pairs of strings (needle, replacement)
match_translations = []
for row in plan:
if not row["existing name"]:
if not row['existing name']:
continue
rule = rules[row["existing name"]]
rule = rules[row['existing name']]
if rule.meta["name"] != row["proposed name"]:
logger.info("renaming rule '%s' -> '%s'", rule.meta["name"], row["proposed name"])
if rule.meta['name'] != row['proposed name']:
logger.info("renaming rule '%s' -> '%s'", rule.meta['name'], row['proposed name'])
# assume the yaml is formatted like `- match: $rule-name`.
# but since it's been linted, this should be ok.
match_translations.append(
("- match: " + rule.meta["name"],
"- match: " + row["proposed name"]))
('- match: ' + rule.meta['name'],
'- match: ' + row['proposed name']))
rule.meta["name"] = row["proposed name"]
rule.name = row["proposed name"]
rule.meta['name'] = row['proposed name']
rule.name = row['proposed name']
if "rule-category" in rule.meta:
logger.info("deleting rule category '%s'", rule.meta["rule-category"])
del rule.meta["rule-category"]
if 'rule-category' in rule.meta:
logger.info("deleting rule category '%s'", rule.meta['rule-category'])
del rule.meta['rule-category']
rule.meta["namespace"] = row["proposed namespace"]
rule.meta['namespace'] = row['proposed namespace']
if row["ATT&CK"] != 'n/a' and row["ATT&CK"] != "":
tag = row["ATT&CK"]
name, _, id = tag.rpartition(" ")
tag = "%s [%s]" % (name, id)
rule.meta["att&ck"] = [tag]
if row['ATT&CK'] != 'n/a' and row['ATT&CK'] != '':
tag = row['ATT&CK']
name, _, id = tag.rpartition(' ')
tag = '%s [%s]' % (name, id)
rule.meta['att&ck'] = [tag]
if row["MBC"] != 'n/a' and row["MBC"] != "":
tag = row["MBC"]
rule.meta["mbc"] = [tag]
if row['MBC'] != 'n/a' and row['MBC'] != '':
tag = row['MBC']
rule.meta['mbc'] = [tag]
for rule in rules.values():
filename = rule.name
filename = filename.lower()
filename = filename.replace(" ", "-")
filename = filename.replace("(", "")
filename = filename.replace(")", "")
filename = filename.replace("+", "")
filename = filename.replace("/", "")
filename = filename + ".yml"
filename = filename.replace(' ', '-')
filename = filename.replace('(', '')
filename = filename.replace(')', '')
filename = filename.replace('+', '')
filename = filename.replace('/', '')
filename = filename + '.yml'
try:
if rule.meta.get("nursery"):
directory = os.path.join(args.destination, "nursery")
# this isn't meant to be written into the rule
del rule.meta["nursery"]
elif rule.meta.get("lib"):
directory = os.path.join(args.destination, "lib")
if rule.meta.get('capa/nursery'):
directory = os.path.join(args.destination, 'nursery')
elif rule.meta.get('lib'):
directory = os.path.join(args.destination, 'lib')
else:
directory = os.path.join(args.destination, rule.meta.get("namespace"))
directory = os.path.join(args.destination, rule.meta.get('namespace'))
os.makedirs(directory)
except OSError:
pass
else:
logger.info("created namespace: %s", directory)
logger.info('created namespace: %s', directory)
path = os.path.join(directory, filename)
logger.info("writing rule %s", path)
logger.info('writing rule %s', path)
doc = rule.to_yaml().decode("utf-8")
doc = rule.to_yaml().decode('utf-8')
for (needle, replacement) in match_translations:
doc = doc.replace(needle, replacement)
with open(path, "wb") as f:
f.write(doc.encode("utf-8"))
with open(path, 'wb') as f:
f.write(doc.encode('utf-8'))
return 0


@@ -1,28 +1,21 @@
#!/usr/bin/env bash
set -e
set -u
set -o pipefail
set -euo pipefail
GIT_DIR=`git rev-parse --show-toplevel`
cd $GIT_DIR
GIT_DIR=$(git rev-parse --show-toplevel);
cd "$GIT_DIR";
# hooks may exist already (e.g. git-lfs configuration)
# If the `.git/hooks/$arg` file doesn't exist, initialize it with `#!/bin/sh`
# After that append `scripts/hooks/$arg` and ensure they can be run
create_hook() {
if [[ ! -e .git/hooks/$1 ]]; then
echo "#!/bin/sh" > ".git/hooks/$1"
echo "#!/bin/sh" > ".git/hooks/$1";
fi
cat scripts/hooks/$1 >> ".git/hooks/$1"
chmod +x .git/hooks/$1
cat scripts/hooks/"$1" >> ".git/hooks/$1";
chmod +x .git/hooks/"$1";
}
echo '\n#### Copying hooks into .git/hooks'
create_hook 'post-commit'
create_hook 'pre-push'
echo '\n#### Installing linter/test dependencies\n'
pip install pycodestyle pytest-sugar
pip install https://github.com/williballenthin/vivisect/zipball/master
python setup.py develop
printf '\n#### Copying hooks into .git/hooks';
create_hook 'post-commit';
create_hook 'pre-push';


@@ -9,6 +9,8 @@ requirements = [
"tqdm",
"pyyaml",
"tabulate",
"colorama",
"termcolor",
"ruamel.yaml"
]
@@ -51,6 +53,13 @@ setuptools.setup(
},
include_package_data=True,
install_requires=requirements,
extras_require={
'dev': [
'pytest',
'pytest-sugar',
'pycodestyle',
]
},
zip_safe=False,
keywords='capa',
classifiers=[