Compare commits


72 Commits

Author SHA1 Message Date
Willi Ballenthin
65b19ad46c add initial script for parsing VMRay flog.txt files
ref #2452
2024-10-10 07:42:11 +00:00
Moritz
6a12ab8598 Merge pull request #2450 from mandiant/dependabot/pip/rich-13.9.2
build(deps): bump rich from 13.8.0 to 13.9.2
2024-10-08 10:57:04 +02:00
dependabot[bot]
a4fdb0a3ef build(deps): bump rich from 13.8.0 to 13.9.2
Bumps [rich](https://github.com/Textualize/rich) from 13.8.0 to 13.9.2.
- [Release notes](https://github.com/Textualize/rich/releases)
- [Changelog](https://github.com/Textualize/rich/blob/master/CHANGELOG.md)
- [Commits](https://github.com/Textualize/rich/compare/v13.8.0...v13.9.2)

---
updated-dependencies:
- dependency-name: rich
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-10-07 14:07:10 +00:00
Moritz
c7bb8b8e67 Update Node checkout Actions (#2446)
* Update setup Node Actions
2024-10-07 11:46:37 +02:00
Tamir K.
41c5194693 Fix/corrupted file architecture key error (#2444)
* Add try except clause
2024-10-06 08:46:16 +02:00
Moritz
8c8b67a6ea Merge pull request #2438 from mandiant/mr-tz-patch-2
Update build.yml
2024-10-04 14:22:45 +02:00
Moritz
f0cc0fb2b8 Update build.yml 2024-10-04 14:02:53 +02:00
Moritz
fc8089c248 Merge pull request #2426 from mandiant/release/v740
Release v7.4.0
2024-10-04 13:51:37 +02:00
mr-tz
d795db9017 include capa explorer web entry 2024-10-04 09:22:11 +00:00
mr-tz
544e3eee5b bump version to 7.4.0
tmp2

tmp2
2024-10-04 09:22:08 +00:00
mr-tz
dfc304d9f6 add Python 3.8 and 3.9 deprecation warning
tmp
2024-10-04 09:19:56 +00:00
Capa Bot
54688517c4 Sync capa rules submodule 2024-10-04 09:18:47 +00:00
Moritz
21fc77ea28 Merge pull request #2431 from s-ff/add-provide-feedback-button
capa Explorer Web: add provide feedback button
2024-10-03 12:28:17 +02:00
Capa Bot
2976974009 Sync capa rules submodule 2024-10-03 09:39:09 +00:00
Moritz
030954d556 Merge pull request #2433 from mandiant/fix/vmray-string-call-args
fix backslash handling in string call arguments
2024-10-03 11:28:34 +02:00
Capa Bot
389a5eb84f Sync capa-testfiles submodule 2024-10-02 16:56:11 +00:00
mr-tz
6d3b96f0b0 fix backslash handling in string call arguments 2024-10-02 16:54:38 +00:00
Soufiane Fariss
2a13bf6c0b capa Explorer Web: fix lint 2024-10-02 16:10:23 +02:00
Fariss
e9f4f5bc31 capa Explorer Web: remove unneeded attribute 2024-10-02 16:05:38 +02:00
Soufiane Fariss
e7400be99a capa Explorer Web: add provide feedback button 2024-10-02 15:54:07 +02:00
Moritz
591a1e8fbb Merge pull request #2430 from s-ff/web-fix-import-features
capa Explorer Web: fix import features
2024-10-02 15:29:35 +02:00
Soufiane Fariss
2f5a227fb0 capa Explorer Web: fix import features 2024-10-02 14:49:58 +02:00
Moritz
931ff62421 Merge pull request #2423 from mandiant/dependabot/pip/types-protobuf-5.28.0.20240924
build(deps): bump types-protobuf from 5.27.0.20240920 to 5.28.0.20240924
2024-10-02 11:21:12 +02:00
dependabot[bot]
3037307ee8 build(deps): bump pydantic from 2.9.1 to 2.9.2 (#2389)
* build(deps): bump pydantic from 2.9.1 to 2.9.2

Bumps [pydantic](https://github.com/pydantic/pydantic) from 2.9.1 to 2.9.2.
- [Release notes](https://github.com/pydantic/pydantic/releases)
- [Changelog](https://github.com/pydantic/pydantic/blob/main/HISTORY.md)
- [Commits](https://github.com/pydantic/pydantic/compare/v2.9.1...v2.9.2)

---
updated-dependencies:
- dependency-name: pydantic
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update requirements.txt

* remove pinned sub-dependency

Co-authored-by: Willi Ballenthin <wballenthin@google.com>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
Co-authored-by: Willi Ballenthin <wballenthin@google.com>
2024-10-02 11:20:54 +02:00
Capa Bot
d6c1725d7e Sync capa rules submodule 2024-10-02 08:41:23 +00:00
Fariss
16eae70c17 capa Explorer Web: improve url navigation (#2425)
* explorer web: improve url navigation

This commit enhances the navigation guard for the /analysis route to
provide a better user experience when loading data from a URL:

Previously: users browsing to /analysis were always redirected to
the homepage (/).

With this commit:
- If a user accesses /analysis without an rdoc parameter, they are still
  redirected to the homepage.
- If a user accesses /analysis with an rdoc parameter, the following
  occurs:
  The user is redirected to the homepage (/) and the rdoc parameter is
  preserved in the URL, capa Explorer Web then loads the rdoc from URL.

---------

Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
2024-10-01 19:25:20 +02:00
dependabot[bot]
9e7e6be374 build(deps): bump types-protobuf from 5.27.0.20240920 to 5.28.0.20240924
Bumps [types-protobuf](https://github.com/python/typeshed) from 5.27.0.20240920 to 5.28.0.20240924.
- [Commits](https://github.com/python/typeshed/commits)

---
updated-dependencies:
- dependency-name: types-protobuf
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-10-01 11:44:19 +00:00
Moritz
3e8bed1db2 Merge pull request #2421 from mandiant/ci/dependabot-ignore-patch
Update dependabot.yml to ignore patch versions
2024-10-01 13:40:34 +02:00
Moritz
e4ac02a968 Update dependabot.yml 2024-10-01 13:32:31 +02:00
dependabot[bot]
eff358980a build(deps): bump pefile from 2023.2.7 to 2024.8.26 (#2413) 2024-09-30 20:24:09 +00:00
Capa Bot
108bd7f224 Sync capa-testfiles submodule 2024-09-30 12:08:25 +00:00
Willi Ballenthin
ab43c8c0c2 loader: fix unhandled name error (#2411) 2024-09-30 14:06:14 +02:00
Capa Bot
585dff8b48 Sync capa rules submodule 2024-09-30 12:06:04 +00:00
Capa Bot
cb09041387 Sync capa rules submodule 2024-09-30 12:05:43 +00:00
Capa Bot
80899f3f70 Sync capa-testfiles submodule 2024-09-27 09:53:30 +00:00
Moritz
00d2bb06fd Merge pull request #2409 from mandiant/fix/2408
dynamic: emit complete features for A/W APIs
2024-09-27 11:26:39 +02:00
Moritz
ff1043e976 Merge branch 'master' into fix/2408 2024-09-27 09:35:24 +02:00
Fariss
51a4eb46b8 replace tqdm, termcolor, tabulate with rich (#2374)
* logging: use rich handler for logging

* tqdm: remove unneeded redirecting_print_to_tqdm function

* tqdm: introduce `CapaProgressBar` rich `Progress` bar

* tqdm: replace tqdm with rich Progress bar

* tqdm: remove tqdm dependency

* termcolor: replace termcolor and update `scripts/`

* tests: update `test_render.py` to use rich.console.Console

* termcolor: remove termcolor dependency

* capa.render.utils: add `write` & `writeln` methods to subclass `Console`

* update markup util functions to use fmt strings

* tests: update `test_render.py` to use `capa.render.utils.Console`

* replace kwarg `end=""` with `write` and `writeln` methods

* tabulate: replace tabulate with `rich.table`

* tabulate: remove `tabulate` and its dependency `wcwidth`

* logging: handle logging in `capa.main`

* logging: set up logging in `capa.main`

this commit sets up logging in `capa.main` and uses a shared
`log_console` in `capa.helpers` for logging purposes

* changelog: replace packages with rich

* remove entry from pyinstaller and unneeded progress.update call

* update requirements.txt

* scripts: use `capa.helpers.log_console` in `CapaProgressBar`

* logging: configure root logger to use `RichHandler`

* remove unused import `inspect`
2024-09-27 09:34:21 +02:00
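
To make the new progress handling concrete, here is a minimal sketch of the rich-based approach this PR introduces; `RateColumn` mirrors the helper added to `capa.helpers` (see the helpers diff further below), while the sleep loop is a stand-in for capa's per-function matching work:

import time

from rich.text import Text
from rich.console import Console
from rich.progress import Task, Progress, ProgressColumn


class RateColumn(ProgressColumn):
    """render processing speed, e.g. "42.0 functions/s", using the task's `unit` field"""

    def render(self, task: "Task") -> Text:
        speed = f"{task.speed:>.1f}" if task.speed else "00.0"
        unit = task.fields.get("unit", "it")
        return Text.from_markup(f"[progress.data.speed]{speed} {unit}/s")


# a stderr console keeps stdout clean for capa's actual output
console = Console(stderr=True)
with Progress(*Progress.get_default_columns(), RateColumn(), console=console, transient=True) as pbar:
    task = pbar.add_task("matching", total=100, unit="functions")
    for _ in range(100):
        time.sleep(0.01)  # stand-in for the real matching work
        pbar.advance(task)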
dependabot[bot]
558bf0fbf2 build(deps): bump protobuf from 5.27.3 to 5.28.2 (#2390)
Bumps [protobuf](https://github.com/protocolbuffers/protobuf) from 5.27.3 to 5.28.2.
- [Release notes](https://github.com/protocolbuffers/protobuf/releases)
- [Changelog](https://github.com/protocolbuffers/protobuf/blob/main/protobuf_release.bzl)
- [Commits](https://github.com/protocolbuffers/protobuf/compare/v5.27.3...v5.28.2)

---
updated-dependencies:
- dependency-name: protobuf
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-27 09:32:58 +02:00
dependabot[bot]
76aff57467 build(deps): bump setuptools from 70.0.0 to 75.1.0 (#2392)
Bumps [setuptools](https://github.com/pypa/setuptools) from 70.0.0 to 75.1.0.
- [Release notes](https://github.com/pypa/setuptools/releases)
- [Changelog](https://github.com/pypa/setuptools/blob/main/NEWS.rst)
- [Commits](https://github.com/pypa/setuptools/compare/v70.0.0...v75.1.0)

---
updated-dependencies:
- dependency-name: setuptools
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-27 09:32:18 +02:00
dependabot[bot]
f82fc1902c build(deps): bump types-protobuf from 5.27.0.20240907 to 5.27.0.20240920 (#2393)
Bumps [types-protobuf](https://github.com/python/typeshed) from 5.27.0.20240907 to 5.27.0.20240920.
- [Commits](https://github.com/python/typeshed/commits)

---
updated-dependencies:
- dependency-name: types-protobuf
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-09-27 09:32:08 +02:00
Capa Bot
e9e8fe42ed Sync capa rules submodule 2024-09-27 07:31:51 +00:00
Mike Hunhoff
80e007787c dynamic: update CHANGELOG 2024-09-26 14:43:20 -06:00
Mike Hunhoff
bfcc705117 dynamic: vmray: remove redundant test 2024-09-26 14:42:08 -06:00
Mike Hunhoff
834150ad1d dynamic: drakvuf: fix A/W API detection 2024-09-26 14:36:16 -06:00
Mike Hunhoff
31ec208a9b dynamic: cape: fix A/W API detection 2024-09-26 14:27:45 -06:00
Mike Hunhoff
a5d9459c42 dynamic: vmray: fix A/W API detection 2024-09-26 14:15:21 -06:00
Moritz
06271a88d4 Fix VMRay missing process data (#2396)
* get all processes, see #2394

* add tests for process recording

* rename symbols for clarification

* handle single and list entries

* update changelog

* dynamic: vmray: use monitor IDs to track processes and threads

* dynamic: vmray: code refactor

* dynamic: vmray: add sanity checks when processing monitor processes

* dynamic: vmray: remove unnecessary keys() access

* dynamic: vmray: clarify comments

* Update CHANGELOG.md

Co-authored-by: Willi Ballenthin <wballenthin@google.com>

* dynamic: vmray: update CHANGELOG

---------

Co-authored-by: Mike Hunhoff <mike.hunhoff@gmail.com>
Co-authored-by: Willi Ballenthin <wballenthin@google.com>
2024-09-26 13:57:30 -06:00
Capa Bot
c48bccf623 Sync capa rules submodule 2024-09-26 17:38:34 +00:00
Capa Bot
9975f769f9 Sync capa-testfiles submodule 2024-09-26 17:34:51 +00:00
Capa Bot
c5d8f99d6f Sync capa rules submodule 2024-09-26 12:25:36 +00:00
Willi Ballenthin
bcd57a9af1 detect and use third-party analysis backends when possible (#2380)
* introduce script to detect 3P backends

ref #2376

* add idalib backend

* binary ninja: search for API using XDG desktop entry

ref #2376

* binja: search more XDG locations for desktop entry

* binary ninja: optimize embedded PE scanning

closes #2397

* add script for comparing the performance of analysis backends
2024-09-26 13:21:55 +02:00
Capa Bot
12337be2b7 Sync capa-testfiles submodule 2024-09-25 09:17:50 +00:00
Moritz
25c4902c21 Merge pull request #2400 from mandiant/web/filesize
bump upload size limit to 100MB from 10MB
2024-09-24 14:14:42 +02:00
mr-tz
f024e1d54c bump upload size limit to 100MB from 10MB 2024-09-24 12:09:38 +00:00
Moritz
bab7ed9188 Merge pull request #2395 from mandiant/dependabot/npm_and_yarn/web/explorer/rollup-4.22.4
build(deps): bump rollup from 4.21.3 to 4.22.4 in /web/explorer
2024-09-24 13:49:10 +02:00
Capa Bot
6eda8c9713 Sync capa-testfiles submodule 2024-09-24 11:29:53 +00:00
Capa Bot
22e88c860f Sync capa-testfiles submodule 2024-09-24 11:25:28 +00:00
Capa Bot
7884248022 Sync capa rules submodule 2024-09-24 11:25:18 +00:00
dependabot[bot]
4891fd750f build(deps): bump rollup from 4.21.3 to 4.22.4 in /web/explorer
Bumps [rollup](https://github.com/rollup/rollup) from 4.21.3 to 4.22.4.
- [Release notes](https://github.com/rollup/rollup/releases)
- [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rollup/rollup/compare/v4.21.3...v4.22.4)

---
updated-dependencies:
- dependency-name: rollup
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-09-24 04:55:36 +00:00
Willi Ballenthin
783e14b949 pyinstaller: use Python 3.12 for standalone build (#2385)
* pyinstaller: use Python 3.12 for standalone build

closes #2383

* changelog

* ci: build: fix test filename
2024-09-23 22:33:23 +02:00
Willi Ballenthin
74777ad23e changelog 2024-09-23 20:21:50 +00:00
Willi Ballenthin
01b35e7582 pyproject.toml: bump min python version to 3.8.1
fixed #2387
2024-09-23 20:21:50 +00:00
Capa Bot
e29288cc8d Sync capa rules submodule 2024-09-22 12:09:30 +00:00
Moritz
c4c35ca6e9 Merge pull request #2379 from mandiant/weg/update-homepage
update release v7.3.0 info and formatting
2024-09-20 14:46:42 +02:00
Moritz
3b1e0284c0 Merge pull request #2378 from mandiant/doc/update-homepage
add update homepage entry
2024-09-20 14:46:27 +02:00
Moritz
7b61d28dd2 Merge pull request #2375 from mandiant/dependabot/npm_and_yarn/web/explorer/vite-5.4.6
build(deps-dev): bump vite from 5.3.2 to 5.4.6 in /web/explorer
2024-09-20 12:02:31 +02:00
mr-tz
e3267df5b1 update release v7.3.0 info and formatting 2024-09-20 09:57:01 +00:00
Moritz
9076e5475d add update homepage entry 2024-09-20 11:14:16 +02:00
Moritz
d1d8badc2e Merge pull request #2370 from mandiant/release/v730
bump to v7.3.0
2024-09-20 10:41:27 +02:00
dependabot[bot]
84d2a18b52 build(deps-dev): bump vite from 5.3.2 to 5.4.6 in /web/explorer
Bumps [vite](https://github.com/vitejs/vite/tree/HEAD/packages/vite) from 5.3.2 to 5.4.6.
- [Release notes](https://github.com/vitejs/vite/releases)
- [Changelog](https://github.com/vitejs/vite/blob/v5.4.6/packages/vite/CHANGELOG.md)
- [Commits](https://github.com/vitejs/vite/commits/v5.4.6/packages/vite)

---
updated-dependencies:
- dependency-name: vite
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-09-17 19:16:36 +00:00
mr-tz
954aeb0ce4 bump to v7.3.0 2024-09-17 15:04:00 +00:00
55 changed files with 6529 additions and 5150 deletions


@@ -4,3 +4,6 @@ updates:
     directory: "/"
     schedule:
       interval: "weekly"
+    ignore:
+      - dependency-name: "*"
+        update-types: ["version-update:semver-patch"]


@@ -1,8 +1,5 @@
 [mypy]
-[mypy-tqdm.*]
-ignore_missing_imports = True
 [mypy-ruamel.*]
 ignore_missing_imports = True


@@ -2,7 +2,6 @@
 # Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
 import sys
-import wcwidth

 import capa.rules.cache
 from pathlib import Path
@@ -29,13 +28,6 @@ a = Analysis(
         ("../../rules", "rules"),
         ("../../sigs", "sigs"),
         ("../../cache", "cache"),
-        # capa.render.default uses tabulate that depends on wcwidth.
-        # it seems wcwidth uses a json file `version.json`
-        # and this doesn't get picked up by pyinstaller automatically.
-        # so we manually embed the wcwidth resources here.
-        #
-        # ref: https://stackoverflow.com/a/62278462/87207
-        (Path(wcwidth.__file__).parent, "wcwidth"),
     ],
     # when invoking pyinstaller from the project root,
     # this gets run from the project root.
@@ -48,11 +40,6 @@ a = Analysis(
         "tkinter",
         "_tkinter",
         "Tkinter",
-        # tqdm provides renderers for ipython,
-        # however, this drags in a lot of dependencies.
-        # since we don't spawn a notebook, we can safely remove these.
-        "IPython",
-        "ipywidgets",
         # these are pulled in by networkx
         # but we don't need to compute the strongly connected components.
         "numpy",
@@ -70,7 +57,10 @@ a = Analysis(
         "qt5",
         "pyqtwebengine",
         "pyasn1",
+        # don't pull in Binary Ninja/IDA bindings that should
+        # only be installed locally.
+        "binaryninja",
+        "ida",
     ],
 )


@@ -30,8 +30,8 @@ jobs:
             python_version: 3.8
           - os: ubuntu-20.04
             artifact_name: capa
-            asset_name: linux-py311
-            python_version: 3.11
+            asset_name: linux-py312
+            python_version: 3.12
           - os: windows-2019
             artifact_name: capa.exe
             asset_name: windows
@@ -88,7 +88,7 @@ jobs:
           asset_name: linux
         - os: ubuntu-22.04
           artifact_name: capa
-          asset_name: linux-py311
+          asset_name: linux-py312
         - os: windows-2022
           artifact_name: capa.exe
           asset_name: windows
@@ -114,7 +114,7 @@ jobs:
       include:
         - asset_name: linux
          artifact_name: capa
-        - asset_name: linux-py311
+        - asset_name: linux-py312
          artifact_name: capa
        - asset_name: windows
          artifact_name: capa.exe


@@ -43,7 +43,7 @@ jobs:
           fetch-depth: 1
           show-progress: true
       - name: Set up Node
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@0a44ba7841725637a19e28fa30b79a866c81b0a6 # v4.0.4
         with:
           node-version: 20
           cache: 'npm'


@@ -19,7 +19,7 @@ jobs:
           show-progress: true
       - name: Set up Node
-        uses: actions/setup-node@v3
+        uses: actions/setup-node@0a44ba7841725637a19e28fa30b79a866c81b0a6 # v4.0.4
         with:
           node-version: 20
           cache: 'npm'

.gitignore

@@ -127,3 +127,4 @@ Pipfile.lock
 .github/binja/download_headless.py
 .github/binja/BinaryNinja-headless.zip
 justfile
+data/

File diff suppressed because it is too large.


@@ -6,20 +6,16 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import itertools
import collections
from typing import Any, Tuple
import tqdm
from typing import Any, List, Tuple
import capa.perf
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import redirecting_print_to_tqdm
from capa.capabilities.common import find_file_capabilities
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor
@@ -139,38 +135,30 @@ def find_dynamic_capabilities(
feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=())
assert isinstance(extractor, DynamicFeatureExtractor)
with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm():
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
def pbar(s, *args, **kwargs):
return s
processes: List[ProcessHandle] = list(extractor.get_processes())
n_processes: int = len(processes)
elif not sys.stderr.isatty():
# don't display progress bar when stderr is redirected to a file
def pbar(s, *args, **kwargs):
return s
with capa.helpers.CapaProgressBar(
console=capa.helpers.log_console, transient=True, disable=disable_progress
) as pbar:
task = pbar.add_task("matching", total=n_processes, unit="processes")
for p in processes:
process_matches, thread_matches, call_matches, feature_count = find_process_capabilities(
ruleset, extractor, p
)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
)
logger.debug("analyzed %s and extracted %d features", p.address, feature_count)
processes = list(extractor.get_processes())
for rule_name, res in process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in call_matches.items():
all_call_matches[rule_name].extend(res)
pb = pbar(processes, desc="matching", unit=" processes", leave=False)
for p in pb:
process_matches, thread_matches, call_matches, feature_count = find_process_capabilities(
ruleset, extractor, p
)
feature_counts.processes += (
rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
)
logger.debug("analyzed %s and extracted %d features", p.address, feature_count)
for rule_name, res in process_matches.items():
all_process_matches[rule_name].extend(res)
for rule_name, res in thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in call_matches.items():
all_call_matches[rule_name].extend(res)
pbar.advance(task)
# collection of features that captures the rule matches within process and thread scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.


@@ -6,21 +6,18 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import time
import logging
import itertools
import collections
from typing import Any, Tuple
import tqdm.contrib.logging
from typing import Any, List, Tuple
import capa.perf
import capa.helpers
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import redirecting_print_to_tqdm
from capa.capabilities.common import find_file_capabilities
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
@@ -143,75 +140,58 @@ def find_static_capabilities(
library_functions: Tuple[rdoc.LibraryFunction, ...] = ()
assert isinstance(extractor, StaticFeatureExtractor)
with redirecting_print_to_tqdm(disable_progress):
with tqdm.contrib.logging.logging_redirect_tqdm():
pbar = tqdm.tqdm
if capa.helpers.is_runtime_ghidra():
# Ghidrathon interpreter cannot properly handle
# the TMonitor thread that is created via a monitor_interval
# > 0
pbar.monitor_interval = 0
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
def pbar(s, *args, **kwargs):
return s
functions: List[FunctionHandle] = list(extractor.get_functions())
n_funcs: int = len(functions)
n_libs: int = 0
percentage: float = 0
elif not sys.stderr.isatty():
# don't display progress bar when stderr is redirected to a file
def pbar(s, *args, **kwargs):
return s
functions = list(extractor.get_functions())
n_funcs = len(functions)
pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False)
for f in pb:
t0 = time.time()
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
library_functions += (
rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name),
)
n_libs = len(library_functions)
percentage = round(100 * (n_libs / n_funcs))
if isinstance(pb, tqdm.tqdm):
pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)")
continue
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(
ruleset, extractor, f
with capa.helpers.CapaProgressBar(
console=capa.helpers.log_console, transient=True, disable=disable_progress
) as pbar:
task = pbar.add_task(
"matching", total=n_funcs, unit="functions", postfix=f"skipped {n_libs} library functions, {percentage}%"
)
for f in functions:
t0 = time.time()
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
library_functions += (
rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name),
)
feature_counts.functions += (
rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count),
)
t1 = time.time()
n_libs = len(library_functions)
percentage = round(100 * (n_libs / n_funcs))
pbar.update(task, postfix=f"skipped {n_libs} library functions, {percentage}%")
pbar.advance(task)
continue
match_count = 0
for name, matches_ in itertools.chain(
function_matches.items(), bb_matches.items(), insn_matches.items()
):
# in practice, most matches are derived rules,
# like "check OS version/5bf4c7f39fd4492cbed0f6dc7d596d49"
# but when we log to the human, they really care about "real" rules.
if not ruleset.rules[name].is_subscope_rule():
match_count += len(matches_)
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
feature_counts.functions += (
rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count),
)
t1 = time.time()
logger.debug(
"analyzed function 0x%x and extracted %d features, %d matches in %0.02fs",
f.address,
feature_count,
match_count,
t1 - t0,
)
match_count = 0
for name, matches_ in itertools.chain(function_matches.items(), bb_matches.items(), insn_matches.items()):
if not ruleset.rules[name].is_subscope_rule():
match_count += len(matches_)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in insn_matches.items():
all_insn_matches[rule_name].extend(res)
logger.debug(
"analyzed function 0x%x and extracted %d features, %d matches in %0.02fs",
f.address,
feature_count,
match_count,
t1 - t0,
)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
for rule_name, res in bb_matches.items():
all_bb_matches[rule_name].extend(res)
for rule_name, res in insn_matches.items():
all_insn_matches[rule_name].extend(res)
pbar.advance(task)
# collection of features that captures the rule matches within function, BB, and instruction scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.


@@ -5,8 +5,6 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License
 # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and limitations under the License.
-import struct
-
 from typing import Tuple, Iterator

 from binaryninja import Segment, BinaryView, SymbolType, SymbolBinding
@@ -20,56 +18,24 @@ from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, Absolu
 from capa.features.extractors.binja.helpers import read_c_string, unmangle_c_name


-def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[int, int]]:
-    """check segment for embedded PE
-
-    adapted for binja from:
-    https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
-    """
-    mz_xor = [
-        (
-            capa.features.extractors.helpers.xor_static(b"MZ", i),
-            capa.features.extractors.helpers.xor_static(b"PE", i),
-            i,
-        )
-        for i in range(256)
-    ]
-
-    todo = []
-    # If this is the first segment of the binary, skip the first bytes. Otherwise, there will always be a matched
-    # PE at the start of the binaryview.
-    start = seg.start
-    if bv.view_type == "PE" and start == bv.start:
+def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[Feature, Address]]:
+    """check segment for embedded PE"""
+    start = 0
+    if bv.view_type == "PE" and seg.start == bv.start:
+        # If this is the first segment of the binary, skip the first bytes.
+        # Otherwise, there will always be a matched PE at the start of the binaryview.
         start += 1

-    for mzx, pex, i in mz_xor:
-        for off, _ in bv.find_all_data(start, seg.end, mzx):
-            todo.append((off, mzx, pex, i))
+    buf = bv.read(seg.start, seg.length)

-    while len(todo):
-        off, mzx, pex, i = todo.pop()
-
-        # The MZ header has one field we will check e_lfanew is at 0x3c
-        e_lfanew = off + 0x3C
-
-        if seg.end < (e_lfanew + 4):
-            continue
-
-        newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(bv.read(e_lfanew, 4), i))[0]
-
-        peoff = off + newoff
-        if seg.end < (peoff + 2):
-            continue
-
-        if bv.read(peoff, 2) == pex:
-            yield off, i
+    for offset, _ in capa.features.extractors.helpers.carve_pe(buf, start):
+        yield Characteristic("embedded pe"), FileOffsetAddress(seg.start + offset)


 def extract_file_embedded_pe(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
     """extract embedded PE features"""
     for seg in bv.segments:
-        for ea, _ in check_segment_for_pe(bv, seg):
-            yield Characteristic("embedded pe"), FileOffsetAddress(ea)
+        yield from check_segment_for_pe(bv, seg)


 def extract_file_export_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
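
For reference, the carving logic that this change moves into capa.features.extractors.helpers.carve_pe works roughly as follows; this is an illustrative restatement of the removed loop above, and the (offset, xor_key) yield shape is inferred from the new call site:

import struct


def xor_static(data: bytes, i: int) -> bytes:
    return bytes(c ^ i for c in data)


def carve_pe_sketch(buf: bytes, offset: int = 0):
    """scan buf for embedded (possibly XOR-encoded) PE headers, yielding (offset, xor_key)"""
    for i in range(256):
        mz, pe = xor_static(b"MZ", i), xor_static(b"PE", i)
        off = buf.find(mz, offset)
        while off != -1:
            # e_lfanew at MZ+0x3c points at the PE header; verify "PE" there under the same key
            e_lfanew = off + 0x3C
            if e_lfanew + 4 <= len(buf):
                newoff = struct.unpack("<I", xor_static(buf[e_lfanew : e_lfanew + 4], i))[0]
                peoff = off + newoff
                if peoff + 2 <= len(buf) and buf[peoff : peoff + 2] == pe:
                    yield off, i
            off = buf.find(mz, off + 1)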


@@ -5,31 +5,175 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import logging
import subprocess
import importlib.util
from typing import Optional
from pathlib import Path
logger = logging.getLogger(__name__)
# When the script gets executed as a standalone executable (via PyInstaller), `import binaryninja` does not work because
# we have excluded the binaryninja module in `pyinstaller.spec`. The trick here is to call the system Python and try
# to find out the path of the binaryninja module that has been installed.
# Note, including the binaryninja module in the `pyinstaller.spec` would not work, since the binaryninja module tries to
# find the binaryninja core e.g., `libbinaryninjacore.dylib`, using a relative path. And this does not work when the
# binaryninja module is extracted by the PyInstaller.
code = r"""
CODE = r"""
from pathlib import Path
from importlib import util
spec = util.find_spec('binaryninja')
if spec is not None:
if len(spec.submodule_search_locations) > 0:
path = Path(spec.submodule_search_locations[0])
# encode the path with utf8 then convert to hex, make sure it can be read and restored properly
print(str(path.parent).encode('utf8').hex())
path = Path(spec.submodule_search_locations[0])
# encode the path with utf8 then convert to hex, make sure it can be read and restored properly
print(str(path.parent).encode('utf8').hex())
"""
def find_binja_path() -> Path:
raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip()
return Path(bytes.fromhex(raw_output).decode("utf8"))
def find_binaryninja_path_via_subprocess() -> Optional[Path]:
raw_output = subprocess.check_output(["python", "-c", CODE]).decode("ascii").strip()
output = bytes.fromhex(raw_output).decode("utf8")
if not output.strip():
return None
return Path(output)
def get_desktop_entry(name: str) -> Optional[Path]:
"""
Find the path for the given XDG Desktop Entry name.
Like:
>> get_desktop_entry("com.vector35.binaryninja.desktop")
Path("~/.local/share/applications/com.vector35.binaryninja.desktop")
"""
assert sys.platform in ("linux", "linux2")
assert name.endswith(".desktop")
data_dirs = os.environ.get("XDG_DATA_DIRS", "/usr/share") + f":{Path.home()}/.local/share"
for data_dir in data_dirs.split(":"):
applications = Path(data_dir) / "applications"
for application in applications.glob("*.desktop"):
if application.name == name:
return application
return None
def get_binaryninja_path(desktop_entry: Path) -> Optional[Path]:
# from: Exec=/home/wballenthin/software/binaryninja/binaryninja %u
# to: /home/wballenthin/software/binaryninja/
for line in desktop_entry.read_text(encoding="utf-8").splitlines():
if not line.startswith("Exec="):
continue
if not line.endswith("binaryninja %u"):
continue
binaryninja_path = Path(line[len("Exec=") : -len("binaryninja %u")])
if not binaryninja_path.exists():
return None
return binaryninja_path
return None
def validate_binaryninja_path(binaryninja_path: Path) -> bool:
if not binaryninja_path:
return False
module_path = binaryninja_path / "python"
if not module_path.is_dir():
return False
if not (module_path / "binaryninja" / "__init__.py").is_file():
return False
return True
def find_binaryninja() -> Optional[Path]:
binaryninja_path = find_binaryninja_path_via_subprocess()
if not binaryninja_path or not validate_binaryninja_path(binaryninja_path):
if sys.platform == "linux" or sys.platform == "linux2":
# ok
logger.debug("detected OS: linux")
elif sys.platform == "darwin":
logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform)
return False
elif sys.platform == "win32":
logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform)
return False
else:
logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform)
return False
desktop_entry = get_desktop_entry("com.vector35.binaryninja.desktop")
if not desktop_entry:
logger.debug("failed to find Binary Ninja application")
return None
logger.debug("found Binary Ninja application: %s", desktop_entry)
binaryninja_path = get_binaryninja_path(desktop_entry)
if not binaryninja_path:
logger.debug("failed to determine Binary Ninja installation path")
return None
if not validate_binaryninja_path(binaryninja_path):
logger.debug("failed to validate Binary Ninja installation")
return None
logger.debug("found Binary Ninja installation: %s", binaryninja_path)
return binaryninja_path / "python"
def is_binaryninja_installed() -> bool:
"""Is the binaryninja module ready to import?"""
try:
return importlib.util.find_spec("binaryninja") is not None
except ModuleNotFoundError:
return False
def has_binaryninja() -> bool:
if is_binaryninja_installed():
logger.debug("found installed Binary Ninja API")
return True
logger.debug("Binary Ninja API not installed, searching...")
binaryninja_path = find_binaryninja()
if not binaryninja_path:
logger.debug("failed to find Binary Ninja installation")
logger.debug("found Binary Ninja API: %s", binaryninja_path)
return binaryninja_path is not None
def load_binaryninja() -> bool:
try:
import binaryninja
return True
except ImportError:
binaryninja_path = find_binaryninja()
if not binaryninja_path:
return False
sys.path.append(binaryninja_path.absolute().as_posix())
try:
import binaryninja # noqa: F401 unused import
return True
except ImportError:
return False
if __name__ == "__main__":
print(find_binja_path())
print(find_binaryninja_path_via_subprocess())
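
The Binary Ninja helper above and the idalib helper below share the same detection pattern, condensed here as a sketch (`find_install` stands in for `find_binaryninja()` or `find_idalib()`):

import sys
import importlib
from pathlib import Path
from typing import Callable, Optional


def load_backend(module_name: str, find_install: Callable[[], Optional[Path]]) -> bool:
    """try to import a third-party backend; if absent, locate the local
    installation (e.g. via an XDG desktop entry or ida-config.json) and
    extend sys.path before retrying"""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        install_path = find_install()
        if not install_path:
            return False
        sys.path.append(install_path.absolute().as_posix())
        try:
            importlib.import_module(module_name)
            return True
        except ImportError:
            return False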


@@ -9,6 +9,7 @@
 import logging
 from typing import Tuple, Iterator

+import capa.features.extractors.helpers
 from capa.helpers import assert_never
 from capa.features.insn import API, Number
 from capa.features.common import String, Feature
@@ -50,7 +51,8 @@ def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -
     else:
         assert_never(value)

-    yield API(call.api), ch.address
+    for name in capa.features.extractors.helpers.generate_symbols("", call.api):
+        yield API(name), ch.address


 def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
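
The intent of the generate_symbols change here (and in the drakvuf and VMRay extractors below) is that an A/W-suffixed API name also yields its suffix-free base, so a rule written against CreateFile matches CreateFileA and CreateFileW. An illustrative sketch only; the real helper in capa.features.extractors.helpers also handles module-prefixed names:

def expand_api_names(api: str):
    """yield the reported API name, plus its base name without an A/W suffix"""
    yield api
    if len(api) > 1 and api.endswith(("A", "W")):
        yield api[:-1]


print(list(expand_api_names("CreateFileA")))  # ['CreateFileA', 'CreateFile']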


@@ -9,6 +9,7 @@
 import logging
 from typing import Tuple, Iterator

+import capa.features.extractors.helpers
 from capa.features.insn import API, Number
 from capa.features.common import String, Feature
 from capa.features.address import Address
@@ -44,7 +45,8 @@ def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -
         # but yielding the entire string would be helpful for an analyst looking at the verbose output
         yield String(arg_value), ch.address

-    yield API(call.name), ch.address
+    for name in capa.features.extractors.helpers.generate_symbols("", call.name):
+        yield API(name), ch.address


 def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:


@@ -8,7 +8,6 @@
 from typing import List, Tuple, Iterator

 import idaapi
-import ida_nalt

 import capa.ida.helpers
 import capa.features.extractors.elf
@@ -32,7 +31,9 @@ class IdaFeatureExtractor(StaticFeatureExtractor):
     def __init__(self):
         super().__init__(
             hashes=SampleHashes(
-                md5=ida_nalt.retrieve_input_file_md5(), sha1="(unknown)", sha256=ida_nalt.retrieve_input_file_sha256()
+                md5=capa.ida.helpers.retrieve_input_file_md5(),
+                sha1="(unknown)",
+                sha256=capa.ida.helpers.retrieve_input_file_sha256(),
             )
         )
         self.global_features: List[Tuple[Feature, Address]] = []


@@ -0,0 +1,113 @@
+# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: [package root]/LICENSE.txt
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+import os
+import sys
+import json
+import logging
+import importlib.util
+from typing import Optional
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+
+def is_idalib_installed() -> bool:
+    try:
+        return importlib.util.find_spec("ida") is not None
+    except ModuleNotFoundError:
+        return False
+
+
+def get_idalib_user_config_path() -> Optional[Path]:
+    """Get the path to the user's config file based on platform following IDA's user directories."""
+    # derived from `py-activate-idalib.py` from IDA v9.0 Beta 4
+    if sys.platform == "win32":
+        # On Windows, use the %APPDATA%\Hex-Rays\IDA Pro directory
+        config_dir = Path(os.getenv("APPDATA")) / "Hex-Rays" / "IDA Pro"
+    else:
+        # On macOS and Linux, use ~/.idapro
+        config_dir = Path.home() / ".idapro"
+
+    # Return the full path to the config file (now in JSON format)
+    user_config_path = config_dir / "ida-config.json"
+    if not user_config_path.exists():
+        return None
+    return user_config_path
+
+
+def find_idalib() -> Optional[Path]:
+    config_path = get_idalib_user_config_path()
+    if not config_path:
+        return None
+
+    config = json.loads(config_path.read_text(encoding="utf-8"))
+
+    try:
+        ida_install_dir = Path(config["Paths"]["ida-install-dir"])
+    except KeyError:
+        return None
+
+    if not ida_install_dir.exists():
+        return None
+
+    libname = {
+        "win32": "idalib.dll",
+        "linux": "libidalib.so",
+        "linux2": "libidalib.so",
+        "darwin": "libidalib.dylib",
+    }[sys.platform]
+
+    if not (ida_install_dir / "ida.hlp").is_file():
+        return None
+
+    if not (ida_install_dir / libname).is_file():
+        return None
+
+    idalib_path = ida_install_dir / "idalib" / "python"
+    if not idalib_path.exists():
+        return None
+
+    if not (idalib_path / "ida" / "__init__.py").is_file():
+        return None
+
+    return idalib_path
+
+
+def has_idalib() -> bool:
+    if is_idalib_installed():
+        logger.debug("found installed IDA idalib API")
+        return True
+
+    logger.debug("IDA idalib API not installed, searching...")
+
+    idalib_path = find_idalib()
+    if not idalib_path:
+        logger.debug("failed to find IDA idalib installation")
+
+    logger.debug("found IDA idalib API: %s", idalib_path)
+    return idalib_path is not None
+
+
+def load_idalib() -> bool:
+    try:
+        import ida
+
+        return True
+    except ImportError:
+        idalib_path = find_idalib()
+        if not idalib_path:
+            return False
+
+        sys.path.append(idalib_path.absolute().as_posix())
+        try:
+            import ida  # noqa: F401 unused import
+
+            return True
+        except ImportError:
+            return False
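
For orientation, an assumed shape of the ~/.idapro/ida-config.json consulted by find_idalib() above, inferred from the key lookup (the install path value is illustrative):

# {
#     "Paths": {
#         "ida-install-dir": "/opt/ida-9.0"
#     }
# }
#
# with such a config in place, callers simply do:
if load_idalib():
    import ida  # resolvable now that sys.path includes <install>/idalib/python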


@@ -130,7 +130,13 @@ def extract_file_arch(pe, **kwargs):
     elif pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_AMD64"]:
         yield Arch(ARCH_AMD64), NO_ADDRESS
     else:
-        logger.warning("unsupported architecture: %s", pefile.MACHINE_TYPE[pe.FILE_HEADER.Machine])
+        try:
+            logger.warning(
+                "unsupported architecture: %s",
+                pefile.MACHINE_TYPE[pe.FILE_HEADER.Machine],
+            )
+        except KeyError:
+            logger.warning("unknown architecture: %s", pe.FILE_HEADER.Machine)


 def extract_file_features(pe, buf):


@@ -10,6 +10,7 @@ from typing import Dict, List, Tuple, Optional
from pathlib import Path
from zipfile import ZipFile
from collections import defaultdict
from dataclasses import dataclass
from capa.exceptions import UnsupportedFormatError
from capa.features.extractors.vmray.models import File, Flog, SummaryV2, StaticData, FunctionCall, xml_to_dict
@@ -21,6 +22,21 @@ DEFAULT_ARCHIVE_PASSWORD = b"infected"
SUPPORTED_FLOG_VERSIONS = ("2",)
@dataclass
class VMRayMonitorThread:
tid: int # thread ID assigned by OS
monitor_id: int # unique ID assigned to thread by VMRay
process_monitor_id: int # unique ID assigned to containing process by VMRay
@dataclass
class VMRayMonitorProcess:
pid: int # process ID assigned by OS
ppid: int # parent process ID assigned by OS
monitor_id: int # unique ID assigned to process by VMRay
image_name: str
class VMRayAnalysis:
def __init__(self, zipfile_path: Path):
self.zipfile = ZipFile(zipfile_path, "r")
@@ -45,9 +61,15 @@ class VMRayAnalysis:
self.exports: Dict[int, str] = {}
self.imports: Dict[int, Tuple[str, str]] = {}
self.sections: Dict[int, str] = {}
self.process_ids: Dict[int, int] = {}
self.process_threads: Dict[int, List[int]] = defaultdict(list)
self.process_calls: Dict[int, Dict[int, List[FunctionCall]]] = defaultdict(lambda: defaultdict(list))
self.monitor_processes: Dict[int, VMRayMonitorProcess] = {}
self.monitor_threads: Dict[int, VMRayMonitorThread] = {}
# map monitor thread IDs to their associated monitor process ID
self.monitor_threads_by_monitor_process: Dict[int, List[int]] = defaultdict(list)
# map function calls to their associated monitor thread ID mapped to its associated monitor process ID
self.monitor_process_calls: Dict[int, Dict[int, List[FunctionCall]]] = defaultdict(lambda: defaultdict(list))
self.base_address: int
self.sample_file_name: Optional[str] = None
@@ -79,13 +101,14 @@ class VMRayAnalysis:
self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD)
# do not change order, it matters
self._compute_base_address()
self._compute_imports()
self._compute_exports()
self._compute_sections()
self._compute_process_ids()
self._compute_process_threads()
self._compute_process_calls()
self._compute_monitor_processes()
self._compute_monitor_threads()
self._compute_monitor_process_calls()
def _find_sample_file(self):
for file_name, file_analysis in self.sv2.files.items():
@@ -128,34 +151,48 @@ class VMRayAnalysis:
for elffile_section in self.sample_file_static_data.elf.sections:
self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name
def _compute_process_ids(self):
def _compute_monitor_processes(self):
for process in self.sv2.processes.values():
# we expect VMRay's monitor IDs to be unique, but OS PIDs may be reused
assert process.monitor_id not in self.process_ids.keys()
self.process_ids[process.monitor_id] = process.os_pid
# we expect monitor IDs to be unique
assert process.monitor_id not in self.monitor_processes
def _compute_process_threads(self):
# logs/flog.xml appears to be the only file that contains thread-related data
# so we use it here to map processes to threads
ppid: int = (
self.sv2.processes[process.ref_parent_process.path[1]].os_pid if process.ref_parent_process else 0
)
self.monitor_processes[process.monitor_id] = VMRayMonitorProcess(
process.os_pid, ppid, process.monitor_id, process.image_name
)
# not all processes are recorded in SummaryV2.json, get missing data from flog.xml, see #2394
for monitor_process in self.flog.analysis.monitor_processes:
vmray_monitor_process: VMRayMonitorProcess = VMRayMonitorProcess(
monitor_process.os_pid,
monitor_process.os_parent_pid,
monitor_process.process_id,
monitor_process.image_name,
)
if monitor_process.process_id not in self.monitor_processes:
self.monitor_processes[monitor_process.process_id] = vmray_monitor_process
else:
# we expect monitor processes recorded in both SummaryV2.json and flog.xml to be equal
assert self.monitor_processes[monitor_process.process_id] == vmray_monitor_process
def _compute_monitor_threads(self):
for monitor_thread in self.flog.analysis.monitor_threads:
# we expect monitor IDs to be unique
assert monitor_thread.thread_id not in self.monitor_threads
self.monitor_threads[monitor_thread.thread_id] = VMRayMonitorThread(
monitor_thread.os_tid, monitor_thread.thread_id, monitor_thread.process_id
)
# we expect each monitor thread ID to be unique for its associated monitor process ID e.g. monitor
# thread ID 10 should not be captured twice for monitor process ID 1
assert monitor_thread.thread_id not in self.monitor_threads_by_monitor_process[monitor_thread.thread_id]
self.monitor_threads_by_monitor_process[monitor_thread.process_id].append(monitor_thread.thread_id)
def _compute_monitor_process_calls(self):
for function_call in self.flog.analysis.function_calls:
pid: int = self.get_process_os_pid(function_call.process_id) # flog.xml uses process monitor ID, not OS PID
tid: int = function_call.thread_id
assert isinstance(pid, int)
assert isinstance(tid, int)
if tid not in self.process_threads[pid]:
self.process_threads[pid].append(tid)
def _compute_process_calls(self):
for function_call in self.flog.analysis.function_calls:
pid: int = self.get_process_os_pid(function_call.process_id) # flog.xml uses process monitor ID, not OS PID
tid: int = function_call.thread_id
assert isinstance(pid, int)
assert isinstance(tid, int)
self.process_calls[pid][tid].append(function_call)
def get_process_os_pid(self, monitor_id: int) -> int:
return self.process_ids[monitor_id]
self.monitor_process_calls[function_call.process_id][function_call.thread_id].append(function_call)
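
Condensed, the merge logic above keys every process on VMRay's monitor ID (which is unique, while OS PIDs may be reused) and requires that a process described in both SummaryV2.json and flog.xml agrees:

from typing import Dict


def record_monitor_process(monitor_processes: Dict[int, "VMRayMonitorProcess"], p: "VMRayMonitorProcess") -> None:
    if p.monitor_id not in monitor_processes:
        monitor_processes[p.monitor_id] = p
    else:
        # the same process seen in SummaryV2.json and flog.xml must match
        assert monitor_processes[p.monitor_id] == p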


@@ -8,6 +8,7 @@
 import logging
 from typing import Tuple, Iterator

+import capa.features.extractors.helpers
 from capa.features.insn import API, Number
 from capa.features.common import String, Feature
 from capa.features.address import Address
@@ -26,7 +27,11 @@ def get_call_param_features(param: Param, ch: CallHandle) -> Iterator[Tuple[Feat
         if param.deref.type_ in PARAM_TYPE_INT:
             yield Number(hexint(param.deref.value)), ch.address
         elif param.deref.type_ in PARAM_TYPE_STR:
-            yield String(param.deref.value), ch.address
+            # TODO(mr-tz): remove FPS like " \\x01\\x02\\x03\\x04\\x05\\x06\\x07\\x08\\x09\\x0a\\x0b\\x0c\\x0d\\x0e\\x0f\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\..."
+            # https://github.com/mandiant/capa/issues/2432
+            # parsing the data up to here results in double-escaped backslashes, remove those here
+            yield String(param.deref.value.replace("\\\\", "\\")), ch.address
         else:
             logger.debug("skipping deref param type %s", param.deref.type_)
     elif param.value is not None:
@@ -41,7 +46,8 @@ def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -
     for param in call.params_in.params:
         yield from get_call_param_features(param, ch)

-    yield API(call.name), ch.address
+    for name in capa.features.extractors.helpers.generate_symbols("", call.name):
+        yield API(name), ch.address


 def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]:
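
The double-escaping fix in get_call_param_features is easiest to see with a tiny example:

raw = "C:\\\\Windows\\\\System32"  # as parsed from flog.xml: double-escaped backslashes
print(raw.replace("\\\\", "\\"))   # C:\Windows\System32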


@@ -15,9 +15,16 @@ import capa.features.extractors.vmray.call
import capa.features.extractors.vmray.file
import capa.features.extractors.vmray.global_
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, DynamicCallAddress, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import PARAM_TYPE_STR, Process, ParamList, FunctionCall
from capa.features.address import (
NO_ADDRESS,
Address,
ThreadAddress,
ProcessAddress,
DynamicCallAddress,
AbsoluteVirtualAddress,
)
from capa.features.extractors.vmray import VMRayAnalysis, VMRayMonitorThread, VMRayMonitorProcess
from capa.features.extractors.vmray.models import PARAM_TYPE_STR, ParamList, FunctionCall
from capa.features.extractors.base_extractor import (
CallHandle,
SampleHashes,
@@ -69,20 +76,24 @@ class VMRayExtractor(DynamicFeatureExtractor):
yield from self.global_features
def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.vmray.file.get_processes(self.analysis)
for monitor_process in self.analysis.monitor_processes.values():
address: ProcessAddress = ProcessAddress(pid=monitor_process.pid, ppid=monitor_process.ppid)
yield ProcessHandle(address, inner=monitor_process)
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
# we have not identified process-specific features for VMRay yet
yield from []
def get_process_name(self, ph) -> str:
process: Process = ph.inner
return process.image_name
monitor_process: VMRayMonitorProcess = ph.inner
return monitor_process.image_name
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
for thread in self.analysis.process_threads[ph.address.pid]:
address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread)
yield ThreadHandle(address=address, inner={})
for monitor_thread_id in self.analysis.monitor_threads_by_monitor_process[ph.inner.monitor_id]:
monitor_thread: VMRayMonitorThread = self.analysis.monitor_threads[monitor_thread_id]
address: ThreadAddress = ThreadAddress(process=ph.address, tid=monitor_thread.tid)
yield ThreadHandle(address=address, inner=monitor_thread)
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
if False:
@@ -92,7 +103,7 @@ class VMRayExtractor(DynamicFeatureExtractor):
return
def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
for function_call in self.analysis.process_calls[ph.address.pid][th.address.tid]:
for function_call in self.analysis.monitor_process_calls[ph.inner.monitor_id][th.inner.monitor_id]:
addr = DynamicCallAddress(thread=th.address, id=function_call.fncall_id)
yield CallHandle(address=addr, inner=function_call)


@@ -6,37 +6,18 @@
 # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and limitations under the License.
 import logging
-from typing import Dict, Tuple, Iterator
+from typing import Tuple, Iterator

 import capa.features.extractors.common
 from capa.features.file import Export, Import, Section
 from capa.features.common import String, Feature
-from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
+from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
 from capa.features.extractors.vmray import VMRayAnalysis
 from capa.features.extractors.helpers import generate_symbols
-from capa.features.extractors.vmray.models import Process
-from capa.features.extractors.base_extractor import ProcessHandle

 logger = logging.getLogger(__name__)


-def get_processes(analysis: VMRayAnalysis) -> Iterator[ProcessHandle]:
-    processes: Dict[str, Process] = analysis.sv2.processes
-
-    for process in processes.values():
-        # we map VMRay's monitor ID to the OS PID to make it easier for users
-        # to follow the processes in capa's output
-        pid: int = analysis.get_process_os_pid(process.monitor_id)
-        ppid: int = (
-            analysis.get_process_os_pid(processes[process.ref_parent_process.path[1]].monitor_id)
-            if process.ref_parent_process
-            else 0
-        )
-
-        addr: ProcessAddress = ProcessAddress(pid=pid, ppid=ppid)
-        yield ProcessHandle(address=addr, inner=process)
-
-
 def extract_export_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
     for addr, name in analysis.exports.items():
         yield Export(name), AbsoluteVirtualAddress(addr)


@@ -87,7 +87,7 @@ class Param(BaseModel):
     deref: Optional[ParamDeref] = None


-def validate_param_list(value: Union[List[Param], Param]) -> List[Param]:
+def validate_ensure_is_list(value: Union[List[Param], Param]) -> List[Param]:
     if isinstance(value, list):
         return value
     else:
@@ -97,7 +97,7 @@ def validate_param_list(value: Union[List[Param], Param]) -> List[Param]:
 # params may be stored as a list of Param or a single Param so we convert
 # the input value to Python list type before the inner validation (List[Param])
 # is called
-ParamList = Annotated[List[Param], BeforeValidator(validate_param_list)]
+ParamList = Annotated[List[Param], BeforeValidator(validate_ensure_is_list)]


 class Params(BaseModel):
@@ -137,12 +137,46 @@ class FunctionReturn(BaseModel):
     from_addr: HexInt = Field(alias="from")


+class MonitorProcess(BaseModel):
+    ts: HexInt
+    process_id: int
+    image_name: str
+    filename: str
+    # page_root: HexInt
+    os_pid: HexInt
+    # os_integrity_level: HexInt
+    # os_privileges: HexInt
+    monitor_reason: str
+    parent_id: int
+    os_parent_pid: HexInt
+    # cmd_line: str
+    # cur_dir: str
+    # os_username: str
+    # bitness: int
+    # os_groups: str
+
+
+class MonitorThread(BaseModel):
+    ts: HexInt
+    thread_id: int
+    process_id: int
+    os_tid: HexInt
+
+
+# handle if there's only single entries, but the model expects a list
+MonitorProcessList = Annotated[List[MonitorProcess], BeforeValidator(validate_ensure_is_list)]
+MonitorThreadList = Annotated[List[MonitorThread], BeforeValidator(validate_ensure_is_list)]
+FunctionCallList = Annotated[List[FunctionCall], BeforeValidator(validate_ensure_is_list)]
+
+
 class Analysis(BaseModel):
     log_version: str  # tested 2
     analyzer_version: str  # tested 2024.2.1
     # analysis_date: str

-    function_calls: List[FunctionCall] = Field(alias="fncall", default=[])
+    monitor_processes: MonitorProcessList = Field(alias="monitor_process", default=[])
+    monitor_threads: MonitorThreadList = Field(alias="monitor_thread", default=[])
+    function_calls: FunctionCallList = Field(alias="fncall", default=[])
     # function_returns: List[FunctionReturn] = Field(alias="fnret", default=[])
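
A minimal, self-contained illustration of the single-entry-or-list normalization these annotated types perform (pydantic v2; the Thread/Log model names here are illustrative):

from typing import List

from pydantic import BaseModel, BeforeValidator
from typing_extensions import Annotated


def ensure_is_list(value):
    # VMRay's XML-to-dict conversion yields a bare object for one entry, a list for many
    return value if isinstance(value, list) else [value]


class Thread(BaseModel):
    thread_id: int


ThreadList = Annotated[List[Thread], BeforeValidator(ensure_is_list)]


class Log(BaseModel):
    threads: ThreadList = []


print(Log(threads={"thread_id": 1}).threads)  # [Thread(thread_id=1)]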


@@ -372,6 +372,10 @@ if __name__ == "__main__":
         from capa.exceptions import UnsupportedRuntimeError

         raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+")
+    elif sys.version_info < (3, 10):
+        from warnings import warn
+
+        warn("This is the last capa version supporting Python 3.8 and 3.9.", DeprecationWarning, stacklevel=2)

     exit_code = main()
     if exit_code != 0:
         popup("capa explorer encountered errors during analysis. Please check the console output for more information.")  # type: ignore [name-defined] # noqa: F821


@@ -164,4 +164,8 @@ if __name__ == "__main__":
         from capa.exceptions import UnsupportedRuntimeError

         raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+")
+    elif sys.version_info < (3, 10):
+        from warnings import warn
+
+        warn("This is the last capa version supporting Python 3.8 and 3.9.", DeprecationWarning, stacklevel=2)

     sys.exit(main())


@@ -5,11 +5,13 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import os
import sys
import gzip
import inspect
import ctypes
import logging
import tempfile
import contextlib
import importlib.util
from typing import Dict, List, Union, BinaryIO, Iterator, NoReturn
@@ -17,8 +19,21 @@ from pathlib import Path
from zipfile import ZipFile
from datetime import datetime
import tqdm
import msgspec.json
from rich.console import Console
from rich.progress import (
Task,
Text,
Progress,
BarColumn,
TextColumn,
SpinnerColumn,
ProgressColumn,
TimeElapsedColumn,
MofNCompleteColumn,
TaskProgressColumn,
TimeRemainingColumn,
)
from capa.exceptions import UnsupportedFormatError
from capa.features.common import (
@@ -48,6 +63,10 @@ EXTENSIONS_FREEZE = "frz"
logger = logging.getLogger("capa")
# shared console used to redirect logging to stderr
log_console: Console = Console(stderr=True)
def hex(n: int) -> str:
"""render the given number using upper case hex, like: 0x123ABC"""
if n < 0:
@@ -81,6 +100,59 @@ def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011
@contextlib.contextmanager
def stdout_redirector(stream):
"""
Redirect stdout at the C runtime level,
which lets us handle native libraries that spam stdout.
*But*, this only works on Linux! Otherwise it will silently still write to stdout.
So, try to upstream the fix when possible.
Via: https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
"""
if sys.platform not in ("linux", "linux2"):
logger.warning("Unable to capture STDOUT on non-Linux (begin)")
yield
logger.warning("Unable to capture STDOUT on non-Linux (end)")
return
# libc is only on Linux
LIBC = ctypes.CDLL(None)
C_STDOUT = ctypes.c_void_p.in_dll(LIBC, "stdout")
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stdout_fd = sys.stdout.fileno()
def _redirect_stdout(to_fd):
"""Redirect stdout to the given file descriptor."""
# Flush the C-level buffer stdout
LIBC.fflush(C_STDOUT)
# Flush and close sys.stdout - also closes the file descriptor (fd)
sys.stdout.close()
# Make original_stdout_fd point to the same file as to_fd
os.dup2(to_fd, original_stdout_fd)
# Create a new sys.stdout that points to the redirected fd
sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb"))
# Save a copy of the original stdout fd in saved_stdout_fd
saved_stdout_fd = os.dup(original_stdout_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode="w+b")
_redirect_stdout(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stdout(saved_stdout_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stdout_fd)
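A minimal usage sketch (the noisy native call is hypothetical; per the docstring, on non-Linux platforms this degrades to a warning and writes still reach stdout):

buf = io.BytesIO()
with stdout_redirector(buf):
    noisy_native_call()  # hypothetical: a library that printf()s to the C-level stdout
captured = buf.getvalue().decode("utf-8", errors="replace")
logger.debug("captured native output: %s", captured)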
def load_json_from_path(json_path: Path):
with gzip.open(json_path, "r") as compressed_report:
try:
@@ -191,39 +263,6 @@ def get_format(sample: Path) -> str:
return FORMAT_UNKNOWN
@contextlib.contextmanager
def redirecting_print_to_tqdm(disable_progress):
"""
tqdm (progress bar) expects to have fairly tight control over console output.
so calls to `print()` will break the progress bar and make things look bad.
so, this context manager temporarily replaces the `print` implementation
with one that is compatible with tqdm.
via: https://stackoverflow.com/a/42424890/87207
"""
old_print = print # noqa: T202 [reserved word print used]
def new_print(*args, **kwargs):
# If tqdm.tqdm.write raises error, use builtin print
if disable_progress:
old_print(*args, **kwargs)
else:
try:
tqdm.tqdm.write(*args, **kwargs)
except Exception:
old_print(*args, **kwargs)
try:
# Globally replace print with new_print.
# Verified this works manually on Python 3.11:
# >>> import inspect
# >>> inspect.builtins
# <module 'builtins' (built-in)>
inspect.builtins.print = new_print # type: ignore
yield
finally:
inspect.builtins.print = old_print # type: ignore
def log_unsupported_format_error():
logger.error("-" * 80)
logger.error(" Input file does not appear to be a supported file.")
@@ -377,3 +416,47 @@ def is_cache_newer_than_rule_code(cache_dir: Path) -> bool:
return False
return True
class RateColumn(ProgressColumn):
"""Renders speed column in progress bar."""
def render(self, task: "Task") -> Text:
speed = f"{task.speed:>.1f}" if task.speed else "00.0"
unit = task.fields.get("unit", "it")
return Text.from_markup(f"[progress.data.speed]{speed} {unit}/s")
class PostfixColumn(ProgressColumn):
"""Renders a postfix column in progress bar."""
def render(self, task: "Task") -> Text:
return Text(task.fields.get("postfix", ""))
class MofNCompleteColumnWithUnit(MofNCompleteColumn):
"""Renders completed/total count column with a unit."""
def render(self, task: "Task") -> Text:
ret = super().render(task)
unit = task.fields.get("unit")
return ret.append(f" {unit}") if unit else ret
class CapaProgressBar(Progress):
@classmethod
def get_default_columns(cls):
return (
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TaskProgressColumn(),
BarColumn(),
MofNCompleteColumnWithUnit(),
"",
TimeElapsedColumn(),
"<",
TimeRemainingColumn(),
"",
RateColumn(),
PostfixColumn(),
)
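A usage sketch for these columns (mirroring how the lint script below drives `CapaProgressBar`; the task values are made up):

with CapaProgressBar(transient=True, console=log_console) as pbar:
    task = pbar.add_task(description="linting", total=100, unit="rule", postfix="")
    for _ in range(100):
        pbar.advance(task)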

View File

@@ -14,6 +14,7 @@ from pathlib import Path
import idc
import idaapi
import ida_ida
import ida_nalt
import idautils
import ida_bytes
import ida_loader
@@ -64,6 +65,12 @@ if version < 9.0:
info: idaapi.idainfo = idaapi.get_inf_structure()
return info.is_64bit()
def retrieve_input_file_md5() -> str:
return ida_nalt.retrieve_input_file_md5()
def retrieve_input_file_sha256() -> str:
return ida_nalt.retrieve_input_file_sha256()
else:
def get_filetype() -> "ida_ida.filetype_t":
@@ -78,6 +85,12 @@ else:
def is_64bit() -> bool:
return idaapi.inf_is_64bit()
def retrieve_input_file_md5() -> str:
return ida_nalt.retrieve_input_file_md5().hex()
def retrieve_input_file_sha256() -> str:
return ida_nalt.retrieve_input_file_sha256().hex()
def inform_user_ida_ui(message):
# this isn't a logger, this is IDA's logging facility

View File

@@ -5,8 +5,8 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import os
import sys
import logging
import datetime
import contextlib
@@ -69,6 +69,7 @@ BACKEND_DRAKVUF = "drakvuf"
BACKEND_VMRAY = "vmray"
BACKEND_FREEZE = "freeze"
BACKEND_BINEXPORT2 = "binexport2"
BACKEND_IDA = "ida"
class CorruptFile(ValueError):
@@ -170,6 +171,7 @@ def get_workspace(path: Path, input_format: str, sigpaths: List[Path]):
# to do a subclass check via isinstance.
if type(e) is Exception and "Couldn't convert rva" in e.args[0]:
raise CorruptFile(e.args[0]) from e
raise
viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])
@@ -237,24 +239,15 @@ def get_extractor(
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path)
elif backend == BACKEND_BINJA:
import capa.helpers
from capa.features.extractors.binja.find_binja_api import find_binja_path
import capa.features.extractors.binja.find_binja_api as finder
# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to first find the binja API installation path and add it into sys.path
if capa.helpers.is_running_standalone():
bn_api = find_binja_path()
if bn_api.exists():
sys.path.append(str(bn_api))
if not finder.has_binaryninja():
raise RuntimeError("cannot find Binary Ninja API module.")
try:
import binaryninja
from binaryninja import BinaryView
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)."
)
if not finder.load_binaryninja():
raise RuntimeError("failed to load Binary Ninja API module.")
import binaryninja
import capa.features.extractors.binja.extractor
@@ -269,7 +262,7 @@ def get_extractor(
raise UnsupportedOSError()
with console.status("analyzing program...", spinner="dots"):
bv: BinaryView = binaryninja.load(str(input_path))
bv: binaryninja.BinaryView = binaryninja.load(str(input_path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {input_path}")
@@ -321,6 +314,34 @@ def get_extractor(
return capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(be2, buf)
elif backend == BACKEND_IDA:
import capa.features.extractors.ida.idalib as idalib
if not idalib.has_idalib():
raise RuntimeError("cannot find IDA idalib module.")
if not idalib.load_idalib():
raise RuntimeError("failed to load IDA idalib module.")
import ida
import ida_auto
import capa.features.extractors.ida.extractor
logger.debug("idalib: opening database...")
# idalib writes to stdout (ugh), so we have to capture that
# so as not to screw up structured output.
with capa.helpers.stdout_redirector(io.BytesIO()):
with console.status("analyzing program...", spinner="dots"):
if ida.open_database(str(input_path), run_auto_analysis=True):
raise RuntimeError("failed to analyze input file")
logger.debug("idalib: waiting for analysis...")
ida_auto.auto_wait()
logger.debug("idalib: opened database.")
return capa.features.extractors.ida.extractor.IdaFeatureExtractor()
else:
raise ValueError("unexpected backend: " + backend)

View File

@@ -22,6 +22,7 @@ from pathlib import Path
import colorama
from pefile import PEFormatError
from rich.logging import RichHandler
from elftools.common.exceptions import ELFError
import capa.perf
@@ -43,6 +44,7 @@ import capa.features.extractors.common
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.loader import (
BACKEND_IDA,
BACKEND_VIV,
BACKEND_CAPE,
BACKEND_BINJA,
@@ -283,6 +285,7 @@ def install_common_args(parser, wanted=None):
backends = [
(BACKEND_AUTO, "(default) detect appropriate backend automatically"),
(BACKEND_VIV, "vivisect"),
(BACKEND_IDA, "IDA via idalib"),
(BACKEND_PEFILE, "pefile (file features only)"),
(BACKEND_BINJA, "Binary Ninja"),
(BACKEND_DOTNET, ".NET"),
@@ -403,15 +406,23 @@ def handle_common_args(args):
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
if args.quiet:
logging.basicConfig(level=logging.WARNING)
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# use [/] after the logger name to reset any styling,
# and prevent the color from carrying over to the message
logformat = "[dim]%(name)s[/]: %(message)s"
# set markup=True to allow the use of Rich's markup syntax in log messages
rich_handler = RichHandler(markup=True, show_time=False, show_path=True, console=capa.helpers.log_console)
rich_handler.setFormatter(logging.Formatter(logformat))
# use RichHandler for root logger
logging.getLogger().addHandler(rich_handler)
# disable vivisect-related logging, it's verbose and not relevant for capa users
set_vivisect_log_level(logging.CRITICAL)
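In isolation, the handler wiring above looks roughly like this (a condensed sketch, not the verbatim capa code):

import logging
from rich.console import Console
from rich.logging import RichHandler

log_console = Console(stderr=True)  # keep stdout clean for structured output
handler = RichHandler(markup=True, show_time=False, show_path=True, console=log_console)
handler.setFormatter(logging.Formatter("[dim]%(name)s[/]: %(message)s"))
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)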
@@ -892,6 +903,10 @@ def apply_extractor_filters(extractor: FeatureExtractor, extractor_filters: Filt
def main(argv: Optional[List[str]] = None):
if sys.version_info < (3, 8):
raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+")
elif sys.version_info < (3, 10):
from warnings import warn
warn("This is the last capa version supporting Python 3.8 and 3.9.", DeprecationWarning, stacklevel=2)
if argv is None:
argv = sys.argv[1:]

View File

@@ -9,28 +9,29 @@
import io
from typing import Dict, List, Tuple, Union, Iterator, Optional
import termcolor
import rich.console
from rich.progress import Text
import capa.render.result_document as rd
def bold(s: str) -> str:
def bold(s: str) -> Text:
"""draw attention to the given string"""
return termcolor.colored(s, "cyan")
return Text.from_markup(f"[cyan]{s}")
def bold2(s: str) -> str:
def bold2(s: str) -> Text:
"""draw attention to the given string, within a `bold` section"""
return termcolor.colored(s, "green")
return Text.from_markup(f"[green]{s}")
def mute(s: str) -> str:
def mute(s: str) -> Text:
"""draw attention away from the given string"""
return termcolor.colored(s, "dark_grey")
return Text.from_markup(f"[dim]{s}")
def warn(s: str) -> str:
return termcolor.colored(s, "yellow")
def warn(s: str) -> Text:
return Text.from_markup(f"[yellow]{s}")
def format_parts_id(data: Union[rd.AttackSpec, rd.MBCSpec]):
@@ -85,3 +86,17 @@ class StringIO(io.StringIO):
def writeln(self, s):
self.write(s)
self.write("\n")
class Console(rich.console.Console):
def writeln(self, *args, **kwargs) -> None:
"""
prints the text with a new line at the end.
"""
return self.print(*args, **kwargs)
def write(self, *args, **kwargs) -> None:
"""
prints the text without a new line at the end.
"""
return self.print(*args, **kwargs, end="")
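Callers written against the old `StringIO` write/writeln interface keep working against this `Console`; a sketch (the digest value is made up):

console = Console(highlight=False)
with console.capture() as capture:
    console.write("md5: ")          # no trailing newline
    console.writeln("d41d8cd9...")  # hypothetical digest; adds the newline
rendered = capture.get()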

View File

@@ -25,7 +25,8 @@ See the License for the specific language governing permissions and limitations
from typing import cast
import tabulate
from rich.text import Text
from rich.table import Table
import capa.rules
import capa.helpers
@@ -34,6 +35,7 @@ import capa.features.freeze as frz
import capa.render.result_document as rd
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.utils import Console
def format_address(address: frz.Address) -> str:
@@ -140,7 +142,7 @@ def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str:
)
def render_static_meta(ostream, meta: rd.StaticMetadata):
def render_static_meta(console: Console, meta: rd.StaticMetadata):
"""
like:
@@ -161,12 +163,16 @@ def render_static_meta(ostream, meta: rd.StaticMetadata):
total feature count 1918
"""
grid = Table.grid(padding=(0, 2))
grid.add_column(style="dim")
grid.add_column()
rows = [
("md5", meta.sample.md5),
("sha1", meta.sample.sha1),
("sha256", meta.sample.sha256),
("path", meta.sample.path),
("timestamp", meta.timestamp),
("timestamp", str(meta.timestamp)),
("capa version", meta.version),
("os", meta.analysis.os),
("format", meta.analysis.format),
@@ -175,18 +181,21 @@ def render_static_meta(ostream, meta: rd.StaticMetadata):
("extractor", meta.analysis.extractor),
("base address", format_address(meta.analysis.base_address)),
("rules", "\n".join(meta.analysis.rules)),
("function count", len(meta.analysis.feature_counts.functions)),
("library function count", len(meta.analysis.library_functions)),
("function count", str(len(meta.analysis.feature_counts.functions))),
("library function count", str(len(meta.analysis.library_functions))),
(
"total feature count",
meta.analysis.feature_counts.file + sum(f.count for f in meta.analysis.feature_counts.functions),
str(meta.analysis.feature_counts.file + sum(f.count for f in meta.analysis.feature_counts.functions)),
),
]
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
for row in rows:
grid.add_row(*row)
console.print(grid)
def render_dynamic_meta(ostream, meta: rd.DynamicMetadata):
def render_dynamic_meta(console: Console, meta: rd.DynamicMetadata):
"""
like:
@@ -205,12 +214,16 @@ def render_dynamic_meta(ostream, meta: rd.DynamicMetadata):
total feature count 1918
"""
table = Table.grid(padding=(0, 2))
table.add_column(style="dim")
table.add_column()
rows = [
("md5", meta.sample.md5),
("sha1", meta.sample.sha1),
("sha256", meta.sample.sha256),
("path", meta.sample.path),
("timestamp", meta.timestamp),
("timestamp", str(meta.timestamp)),
("capa version", meta.version),
("os", meta.analysis.os),
("format", meta.analysis.format),
@@ -218,26 +231,29 @@ def render_dynamic_meta(ostream, meta: rd.DynamicMetadata):
("analysis", meta.flavor.value),
("extractor", meta.analysis.extractor),
("rules", "\n".join(meta.analysis.rules)),
("process count", len(meta.analysis.feature_counts.processes)),
("process count", str(len(meta.analysis.feature_counts.processes))),
(
"total feature count",
meta.analysis.feature_counts.file + sum(p.count for p in meta.analysis.feature_counts.processes),
str(meta.analysis.feature_counts.file + sum(p.count for p in meta.analysis.feature_counts.processes)),
),
]
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
for row in rows:
table.add_row(*row)
console.print(table)
def render_meta(osstream, doc: rd.ResultDocument):
def render_meta(console: Console, doc: rd.ResultDocument):
if doc.meta.flavor == rd.Flavor.STATIC:
render_static_meta(osstream, cast(rd.StaticMetadata, doc.meta))
render_static_meta(console, cast(rd.StaticMetadata, doc.meta))
elif doc.meta.flavor == rd.Flavor.DYNAMIC:
render_dynamic_meta(osstream, cast(rd.DynamicMetadata, doc.meta))
render_dynamic_meta(console, cast(rd.DynamicMetadata, doc.meta))
else:
raise ValueError("invalid meta analysis")
def render_rules(ostream, doc: rd.ResultDocument):
def render_rules(console: Console, doc: rd.ResultDocument):
"""
like:
@@ -254,11 +270,15 @@ def render_rules(ostream, doc: rd.ResultDocument):
if count == 1:
capability = rutils.bold(rule.meta.name)
else:
capability = f"{rutils.bold(rule.meta.name)} ({count} matches)"
capability = Text.assemble(rutils.bold(rule.meta.name), f" ({count} matches)")
ostream.writeln(capability)
console.print(capability)
had_match = True
table = Table.grid(padding=(0, 2))
table.add_column(style="dim")
table.add_column()
rows = []
ns = rule.meta.namespace
@@ -310,23 +330,26 @@ def render_rules(ostream, doc: rd.ResultDocument):
rows.append(("matches", "\n".join(lines)))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")
for row in rows:
table.add_row(*row)
console.print(table)
console.print()
if not had_match:
ostream.writeln(rutils.bold("no capabilities found"))
console.print(rutils.bold("no capabilities found"))
def render_verbose(doc: rd.ResultDocument):
ostream = rutils.StringIO()
console = Console(highlight=False)
render_meta(ostream, doc)
ostream.write("\n")
with console.capture() as capture:
render_meta(console, doc)
console.print()
render_rules(console, doc)
console.print()
render_rules(ostream, doc)
ostream.write("\n")
return ostream.getvalue()
return capture.get()
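Because the console output is captured, `render_verbose` stays a pure string-returning function, matching the old `StringIO.getvalue()` contract; a hypothetical caller:

# assuming `doc` is a loaded rd.ResultDocument
print(render_verbose(doc))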
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:

View File

@@ -9,7 +9,8 @@ import logging
import textwrap
from typing import Dict, Iterable, Optional
import tabulate
from rich.text import Text
from rich.table import Table
import capa.rules
import capa.helpers
@@ -22,6 +23,7 @@ import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.utils import Console
logger = logging.getLogger(__name__)
@@ -45,7 +47,7 @@ def hanging_indent(s: str, indent: int) -> str:
return textwrap.indent(s, prefix=prefix)[len(prefix) :]
def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address], indent: int):
def render_locations(console: Console, layout: rd.Layout, locations: Iterable[frz.Address], indent: int):
import capa.render.verbose as v
# it's possible to have an empty locations array here,
@@ -56,7 +58,7 @@ def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address
if len(locations) == 0:
return
ostream.write(" @ ")
console.write(" @ ")
location0 = locations[0]
if len(locations) == 1:
@@ -64,58 +66,58 @@ def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address
if location.type == frz.AddressType.CALL:
assert isinstance(layout, rd.DynamicLayout)
ostream.write(hanging_indent(v.render_call(layout, location), indent + 1))
console.write(hanging_indent(v.render_call(layout, location), indent + 1))
else:
ostream.write(v.format_address(locations[0]))
console.write(v.format_address(locations[0]))
elif location0.type == frz.AddressType.CALL and len(locations) > 1:
location = locations[0]
assert isinstance(layout, rd.DynamicLayout)
s = f"{v.render_call(layout, location)}\nand {(len(locations) - 1)} more..."
ostream.write(hanging_indent(s, indent + 1))
console.write(hanging_indent(s, indent + 1))
elif len(locations) > 4:
# don't display too many locations, because it becomes very noisy.
# probably only the first handful of locations will be useful for inspection.
ostream.write(", ".join(map(v.format_address, locations[0:4])))
ostream.write(f", and {(len(locations) - 4)} more...")
console.write(", ".join(map(v.format_address, locations[0:4])))
console.write(f", and {(len(locations) - 4)} more...")
elif len(locations) > 1:
ostream.write(", ".join(map(v.format_address, locations)))
console.write(", ".join(map(v.format_address, locations)))
else:
raise RuntimeError("unreachable")
def render_statement(ostream, layout: rd.Layout, match: rd.Match, statement: rd.Statement, indent: int):
ostream.write(" " * indent)
def render_statement(console: Console, layout: rd.Layout, match: rd.Match, statement: rd.Statement, indent: int):
console.write(" " * indent)
if isinstance(statement, rd.SubscopeStatement):
# emit `basic block:`
# rather than `subscope:`
ostream.write(statement.scope)
console.write(statement.scope)
ostream.write(":")
console.write(":")
if statement.description:
ostream.write(f" = {statement.description}")
ostream.writeln("")
console.write(f" = {statement.description}")
console.writeln()
elif isinstance(statement, (rd.CompoundStatement)):
# emit `and:` `or:` `optional:` `not:`
ostream.write(statement.type)
console.write(statement.type)
ostream.write(":")
console.write(":")
if statement.description:
ostream.write(f" = {statement.description}")
ostream.writeln("")
console.write(f" = {statement.description}")
console.writeln()
elif isinstance(statement, rd.SomeStatement):
ostream.write(f"{statement.count} or more:")
console.write(f"{statement.count} or more:")
if statement.description:
ostream.write(f" = {statement.description}")
ostream.writeln("")
console.write(f" = {statement.description}")
console.writeln()
elif isinstance(statement, rd.RangeStatement):
# `range` is a weird node, it's almost a hybrid of statement+feature.
@@ -133,25 +135,25 @@ def render_statement(ostream, layout: rd.Layout, match: rd.Match, statement: rd.
value = rutils.bold2(value)
if child.description:
ostream.write(f"count({child.type}({value} = {child.description})): ")
console.write(f"count({child.type}({value} = {child.description})): ")
else:
ostream.write(f"count({child.type}({value})): ")
console.write(f"count({child.type}({value})): ")
else:
ostream.write(f"count({child.type}): ")
console.write(f"count({child.type}): ")
if statement.max == statement.min:
ostream.write(f"{statement.min}")
console.write(f"{statement.min}")
elif statement.min == 0:
ostream.write(f"{statement.max} or fewer")
console.write(f"{statement.max} or fewer")
elif statement.max == (1 << 64 - 1):
ostream.write(f"{statement.min} or more")
console.write(f"{statement.min} or more")
else:
ostream.write(f"between {statement.min} and {statement.max}")
console.write(f"between {statement.min} and {statement.max}")
if statement.description:
ostream.write(f" = {statement.description}")
render_locations(ostream, layout, match.locations, indent)
ostream.writeln("")
console.write(f" = {statement.description}")
render_locations(console, layout, match.locations, indent)
console.writeln()
else:
raise RuntimeError("unexpected match statement type: " + str(statement))
@@ -162,9 +164,9 @@ def render_string_value(s: str) -> str:
def render_feature(
ostream, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, feature: frzf.Feature, indent: int
console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, feature: frzf.Feature, indent: int
):
ostream.write(" " * indent)
console.write(" " * indent)
key = feature.type
value: Optional[str]
@@ -205,14 +207,14 @@ def render_feature(
elif isinstance(feature, frzf.OperandOffsetFeature):
key = f"operand[{feature.index}].offset"
ostream.write(f"{key}: ")
console.write(f"{key}: ")
if value:
ostream.write(rutils.bold2(value))
console.write(rutils.bold2(value))
if feature.description:
ostream.write(capa.rules.DESCRIPTION_SEPARATOR)
ostream.write(feature.description)
console.write(capa.rules.DESCRIPTION_SEPARATOR)
console.write(feature.description)
if isinstance(feature, (frzf.OSFeature, frzf.ArchFeature, frzf.FormatFeature)):
# don't show the location of these global features
@@ -224,35 +226,32 @@ def render_feature(
elif isinstance(feature, (frzf.OSFeature, frzf.ArchFeature, frzf.FormatFeature)):
pass
else:
render_locations(ostream, layout, match.locations, indent)
ostream.write("\n")
render_locations(console, layout, match.locations, indent)
console.writeln()
else:
# like:
# regex: /blah/ = SOME_CONSTANT
# - "foo blah baz" @ 0x401000
# - "aaa blah bbb" @ 0x402000, 0x403400
ostream.write(key)
ostream.write(": ")
ostream.write(value)
ostream.write("\n")
console.writeln(f"{key}: {value}")
for capture, locations in sorted(match.captures.items()):
ostream.write(" " * (indent + 1))
ostream.write("- ")
ostream.write(rutils.bold2(render_string_value(capture)))
console.write(" " * (indent + 1))
console.write("- ")
console.write(rutils.bold2(render_string_value(capture)))
if isinstance(layout, rd.DynamicLayout) and rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
# like above, don't re-render calls when in call scope.
pass
else:
render_locations(ostream, layout, locations, indent=indent)
ostream.write("\n")
render_locations(console, layout, locations, indent=indent)
console.writeln()
def render_node(ostream, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, node: rd.Node, indent: int):
def render_node(console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, node: rd.Node, indent: int):
if isinstance(node, rd.StatementNode):
render_statement(ostream, layout, match, node.statement, indent=indent)
render_statement(console, layout, match, node.statement, indent=indent)
elif isinstance(node, rd.FeatureNode):
render_feature(ostream, layout, rule, match, node.feature, indent=indent)
render_feature(console, layout, rule, match, node.feature, indent=indent)
else:
raise RuntimeError("unexpected node type: " + str(node))
@@ -265,7 +264,9 @@ MODE_SUCCESS = "success"
MODE_FAILURE = "failure"
def render_match(ostream, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, indent=0, mode=MODE_SUCCESS):
def render_match(
console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, indent=0, mode=MODE_SUCCESS
):
child_mode = mode
if mode == MODE_SUCCESS:
# display only nodes that evaluated successfully.
@@ -297,13 +298,13 @@ def render_match(ostream, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Mat
else:
raise RuntimeError("unexpected mode: " + mode)
render_node(ostream, layout, rule, match, match.node, indent=indent)
render_node(console, layout, rule, match, match.node, indent=indent)
for child in match.children:
render_match(ostream, layout, rule, child, indent=indent + 1, mode=child_mode)
render_match(console, layout, rule, child, indent=indent + 1, mode=child_mode)
def render_rules(ostream, doc: rd.ResultDocument):
def render_rules(console: Console, doc: rd.ResultDocument):
"""
like:
@@ -350,13 +351,13 @@ def render_rules(ostream, doc: rd.ResultDocument):
if count == 1:
if rule.meta.lib:
lib_info = " (library rule)"
capability = f"{rutils.bold(rule.meta.name)}{lib_info}"
capability = Text.assemble(rutils.bold(rule.meta.name), f"{lib_info}")
else:
if rule.meta.lib:
lib_info = ", only showing first match of library rule"
capability = f"{rutils.bold(rule.meta.name)} ({count} matches{lib_info})"
capability = Text.assemble(rutils.bold(rule.meta.name), f" ({count} matches{lib_info})")
ostream.writeln(capability)
console.writeln(capability)
had_match = True
rows = []
@@ -402,7 +403,14 @@ def render_rules(ostream, doc: rd.ResultDocument):
if rule.meta.description:
rows.append(("description", rule.meta.description))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
grid = Table.grid(padding=(0, 2))
grid.add_column(style="dim")
grid.add_column()
for row in rows:
grid.add_row(*row)
console.writeln(grid)
if capa.rules.Scope.FILE in rule.meta.scopes:
matches = doc.rules[rule.meta.name].matches
@@ -413,61 +421,58 @@ def render_rules(ostream, doc: rd.ResultDocument):
# so, let's be explicit about our assumptions and raise an exception if they fail.
raise RuntimeError(f"unexpected file scope match count: {len(matches)}")
_, first_match = matches[0]
render_match(ostream, doc.meta.analysis.layout, rule, first_match, indent=0)
render_match(console, doc.meta.analysis.layout, rule, first_match, indent=0)
else:
for location, match in sorted(doc.rules[rule.meta.name].matches):
if doc.meta.flavor == rd.Flavor.STATIC:
assert rule.meta.scopes.static is not None
ostream.write(rule.meta.scopes.static.value)
ostream.write(" @ ")
ostream.write(capa.render.verbose.format_address(location))
console.write(rule.meta.scopes.static.value + " @ ")
console.write(capa.render.verbose.format_address(location))
if rule.meta.scopes.static == capa.rules.Scope.BASIC_BLOCK:
func = frz.Address.from_capa(functions_by_bb[location.to_capa()])
ostream.write(f" in function {capa.render.verbose.format_address(func)}")
console.write(f" in function {capa.render.verbose.format_address(func)}")
elif doc.meta.flavor == rd.Flavor.DYNAMIC:
assert rule.meta.scopes.dynamic is not None
assert isinstance(doc.meta.analysis.layout, rd.DynamicLayout)
ostream.write(rule.meta.scopes.dynamic.value)
ostream.write(" @ ")
console.write(rule.meta.scopes.dynamic.value + " @ ")
if rule.meta.scopes.dynamic == capa.rules.Scope.PROCESS:
ostream.write(v.render_process(doc.meta.analysis.layout, location))
console.write(v.render_process(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
ostream.write(v.render_thread(doc.meta.analysis.layout, location))
console.write(v.render_thread(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
ostream.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1))
console.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1))
else:
capa.helpers.assert_never(rule.meta.scopes.dynamic)
else:
capa.helpers.assert_never(doc.meta.flavor)
ostream.write("\n")
render_match(ostream, doc.meta.analysis.layout, rule, match, indent=1)
console.writeln()
render_match(console, doc.meta.analysis.layout, rule, match, indent=1)
if rule.meta.lib:
# only show first match
break
ostream.write("\n")
console.writeln()
if not had_match:
ostream.writeln(rutils.bold("no capabilities found"))
console.writeln(rutils.bold("no capabilities found"))
def render_vverbose(doc: rd.ResultDocument):
ostream = rutils.StringIO()
console = Console(highlight=False)
capa.render.verbose.render_meta(ostream, doc)
ostream.write("\n")
with console.capture() as capture:
capa.render.verbose.render_meta(console, doc)
console.writeln()
render_rules(console, doc)
console.writeln()
render_rules(ostream, doc)
ostream.write("\n")
return ostream.getvalue()
return capture.get()
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:

View File

@@ -5,7 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
__version__ = "7.2.0"
__version__ = "7.4.0"
def get_major_version():

View File

@@ -26,7 +26,9 @@
### Bug Fixes
### capa explorer IDA Pro plugin
### capa Explorer Web
### capa Explorer IDA Pro plugin
### Development
@@ -42,5 +44,6 @@
- [ ] [publish to PyPI](https://pypi.org/project/flare-capa)
- [ ] [create tag in capa rules](https://github.com/mandiant/capa-rules/tags)
- [ ] [create release in capa rules](https://github.com/mandiant/capa-rules/releases)
- [ ] Update [homepage](https://github.com/mandiant/capa/blob/master/web/public/index.html)
- [ ] [Spread the word](https://twitter.com)
- [ ] Update internal service

View File

@@ -20,7 +20,7 @@ authors = [
description = "The FLARE team's open-source tool to identify capabilities in executable files."
readme = {file = "README.md", content-type = "text/markdown"}
license = {file = "LICENSE.txt"}
requires-python = ">=3.8"
requires-python = ">=3.8.1"
keywords = ["malware analysis", "reverse engineering", "capability detection", "software behaviors", "capa", "FLARE"]
classifiers = [
"Development Status :: 5 - Production/Stable",
@@ -65,12 +65,8 @@ dependencies = [
# or minor otherwise).
# As specific constraints are identified, please provide
# comments and context.
"tqdm>=4",
"pyyaml>=6",
"tabulate>=0.9",
"colorama>=0.4",
"termcolor>=2",
"wcwidth>=0.2",
"ida-settings>=2",
"ruamel.yaml>=0.18",
"pefile>=2023.2.7",
@@ -146,11 +142,9 @@ dev = [
"types-backports==0.1.3",
"types-colorama==0.4.15.11",
"types-PyYAML==6.0.8",
"types-tabulate==0.9.0.20240106",
"types-termcolor==1.1.4",
"types-psutil==6.0.0.20240901",
"types_requests==2.32.0.20240712",
"types-protobuf==5.27.0.20240907",
"types-protobuf==5.28.0.20240924",
"deptry==0.20.0"
]
build = [
@@ -159,7 +153,7 @@ build = [
# These dependencies are not used in production environments
# and should not conflict with other libraries/tooling.
"pyinstaller==6.10.0",
"setuptools==70.0.0",
"setuptools==75.1.0",
"build==1.2.2"
]
scripts = [
@@ -183,7 +177,9 @@ known_first_party = [
"binaryninja",
"flirt",
"ghidra",
"ida",
"ida_ida",
"ida_auto",
"ida_bytes",
"ida_entry",
"ida_funcs",
@@ -234,10 +230,7 @@ DEP002 = [
"types-protobuf",
"types-psutil",
"types-PyYAML",
"types-tabulate",
"types-termcolor",
"types_requests",
"wcwidth"
]
# dependencies imported but missing from definitions

View File

@@ -20,29 +20,28 @@ markdown-it-py==3.0.0
mdurl==0.1.2
msgpack==1.0.8
networkx==3.1
pefile==2023.2.7
pefile==2024.8.26
pip==24.2
protobuf==5.27.3
protobuf==5.28.2
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.22
pydantic==2.9.1
pydantic-core==2.23.3
pydantic==2.9.2
# pydantic pins pydantic-core,
# but dependabot updates these separately (which is broken) and is annoying,
# so we rely on pydantic to pull in the right version of pydantic-core.
# pydantic-core==2.23.4
xmltodict==0.13.0
pyelftools==0.31
pygments==2.18.0
python-flirt==0.8.10
pyyaml==6.0.2
rich==13.8.0
rich==13.9.2
ruamel-yaml==0.18.6
ruamel-yaml-clib==0.2.8
setuptools==70.0.0
setuptools==75.1.0
six==1.16.0
sortedcontainers==2.4.0
tabulate==0.9.0
termcolor==2.4.0
tqdm==4.66.5
viv-utils==0.7.11
vivisect==1.2.1
wcwidth==0.2.13
msgspec==0.18.6

2
rules

Submodule rules updated: dec3ded6f6...64b174e502

316
scripts/compare-backends.py Normal file
View File

@@ -0,0 +1,316 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import json
import time
import logging
import argparse
import contextlib
import statistics
import subprocess
import multiprocessing
from typing import Set, Dict, List, Optional
from pathlib import Path
from collections import Counter
from dataclasses import dataclass
from multiprocessing import Pool
import rich
import rich.box
import rich.table
import capa.main
logger = logging.getLogger("capa.compare-backends")
BACKENDS = ("vivisect", "ida", "binja")
@dataclass
class CapaInvocation:
path: Path
backend: str
duration: float
returncode: int
stdout: Optional[str]
stderr: Optional[str]
err: Optional[str]
def invoke_capa(file: Path, backend: str) -> CapaInvocation:
stdout = None
stderr = None
err = None
returncode: int
try:
logger.debug("run capa: %s: %s", backend, file.name)
t1 = time.time()
child = subprocess.run(
["python", "-m", "capa.main", "--json", "--backend=" + backend, str(file)],
capture_output=True,
check=True,
text=True,
encoding="utf-8",
)
returncode = child.returncode
stdout = child.stdout
stderr = child.stderr
except subprocess.CalledProcessError as e:
returncode = e.returncode
stdout = e.stdout
stderr = e.stderr
logger.debug("%s:%s: error", backend, file.name)
err = str(e)
else:
pass
finally:
t2 = time.time()
return CapaInvocation(
path=file,
backend=backend,
duration=t2 - t1,
returncode=returncode,
stdout=stdout,
stderr=stderr,
err=err,
)
def wrapper_invoke_capa(args):
file, backend = args
return invoke_capa(file, backend)
def collect(args):
results_path = args.results_path
if not results_path.is_file():
default_doc = {backend: {} for backend in BACKENDS} # type: ignore
results_path.write_text(json.dumps(default_doc), encoding="utf-8")
testfiles = Path(__file__).parent.parent / "tests" / "data"
for file in sorted(p for p in testfiles.glob("*")):
# remove leftover analysis files
# because IDA doesn't cleanup after itself, currently.
if file.suffix in (".til", ".id0", ".id1", ".id2", ".nam", ".viv"):
logger.debug("removing: %s", file)
with contextlib.suppress(IOError):
file.unlink()
doc = json.loads(results_path.read_text(encoding="utf-8"))
plan = []
for file in sorted(p for p in testfiles.glob("*")):
if not file.is_file():
continue
if file.is_dir():
continue
if file.name.startswith("."):
continue
if file.suffix not in (".exe_", ".dll_", ".elf_", ""):
continue
logger.debug("%s", file.name)
key = str(file)
for backend in BACKENDS:
if (backend, file.name) in {
("binja", "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_")
}:
# this file takes 38GB+ and 20hrs+
# https://github.com/Vector35/binaryninja-api/issues/5951
continue
if key in doc[backend]:
if not args.retry_failures:
continue
if not doc[backend][key]["err"]:
# didn't previously fail, don't repeat work
continue
else:
# want to retry this previous failure
pass
plan.append((file, backend))
pool_size = multiprocessing.cpu_count() // 2
logger.info("work pool size: %d", pool_size)
with Pool(processes=pool_size) as pool:
for i, result in enumerate(pool.imap_unordered(wrapper_invoke_capa, plan)):
doc[result.backend][str(result.path)] = {
"path": str(result.path),
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"err": result.err,
"duration": result.duration,
}
if i % 8 == 0:
logger.info("syncing output database")
results_path.write_text(json.dumps(doc))
logger.info(
"%.1f\t%s %s %s",
result.duration,
"(err)" if result.err else " ",
result.backend.ljust(8),
result.path.name,
)
results_path.write_text(json.dumps(doc))
return
def report(args):
doc = json.loads(args.results_path.read_text(encoding="utf-8"))
samples = set()
for backend in BACKENDS:
samples.update(doc[backend].keys())
failures_by_backend: Dict[str, Set[str]] = {backend: set() for backend in BACKENDS}
durations_by_backend: Dict[str, List[float]] = {backend: [] for backend in BACKENDS}
console = rich.get_console()
for key in sorted(samples):
sample = Path(key).name
console.print(sample, style="bold")
seen_rules: Counter[str] = Counter()
rules_by_backend: Dict[str, Set[str]] = {backend: set() for backend in BACKENDS}
for backend in BACKENDS:
if key not in doc[backend]:
continue
entry = doc[backend][key]
duration = entry["duration"]
if not entry["err"]:
matches = json.loads(entry["stdout"])["rules"].keys()
seen_rules.update(matches)
rules_by_backend[backend].update(matches)
durations_by_backend[backend].append(duration)
console.print(f" {backend: >8}: {duration: >6.1f}s {len(matches): >3d} matches")
else:
failures_by_backend[backend].add(sample)
console.print(f" {backend: >8}: {duration: >6.1f}s (error)")
if not seen_rules:
console.print()
continue
t = rich.table.Table(box=rich.box.SIMPLE, header_style="default")
t.add_column("viv")
t.add_column("ida")
t.add_column("bn")
t.add_column("rule")
for rule, _ in seen_rules.most_common():
t.add_row(
"x" if rule in rules_by_backend["vivisect"] else " ",
"x" if rule in rules_by_backend["ida"] else " ",
"x" if rule in rules_by_backend["binja"] else " ",
rule,
)
console.print(t)
for backend in BACKENDS:
console.print(f"failures for {backend}:", style="bold")
for failure in sorted(failures_by_backend[backend]):
console.print(f" - {failure}")
if not failures_by_backend[backend]:
console.print(" (none)", style="green")
console.print()
console.print("durations:", style="bold")
console.print(" (10-quantiles, in seconds)", style="grey37")
for backend in BACKENDS:
q = statistics.quantiles(durations_by_backend[backend], n=10)
console.print(f" {backend: <8}: ", end="")
for i in range(9):
if i in (4, 8):
style = "bold"
else:
style = "default"
console.print(f"{q[i]: >6.1f}", style=style, end=" ")
console.print()
console.print(" ^-- 10% of samples took less than this ^", style="grey37")
console.print(" 10% of samples took more than this -----------------+", style="grey37")
console.print()
for backend in BACKENDS:
total = sum(durations_by_backend[backend])
successes = len(durations_by_backend[backend])
avg = statistics.mean(durations_by_backend[backend])
console.print(
f" {backend: <8}: {total: >7.0f} seconds across {successes: >4d} successful runs, {avg: >4.1f} average"
)
console.print()
console.print("slowest samples:", style="bold")
for backend in BACKENDS:
console.print(backend)
for duration, path in sorted(
((d["duration"], Path(d["path"]).name) for d in doc[backend].values()), reverse=True
)[:5]:
console.print(f" - {duration: >6.1f} {path}")
return
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
default_samples_path = Path(__file__).resolve().parent.parent / "tests" / "data"
parser = argparse.ArgumentParser(description="Compare analysis backends.")
capa.main.install_common_args(
parser,
wanted=set(),
)
subparsers = parser.add_subparsers()
collect_parser = subparsers.add_parser("collect")
collect_parser.add_argument("results_path", type=Path, help="Path to output JSON file")
collect_parser.add_argument("--samples", type=Path, default=default_samples_path, help="Path to samples")
collect_parser.add_argument("--retry-failures", action="store_true", help="Retry previous failures")
collect_parser.set_defaults(func=collect)
report_parser = subparsers.add_parser("report")
report_parser.add_argument("results_path", type=Path, help="Path to JSON file")
report_parser.set_defaults(func=report)
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
args.func(args)
if __name__ == "__main__":
sys.exit(main())
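Since `main` takes an argv list, a hypothetical driver (equivalent to running the script from the command line with the `collect` and `report` subcommands) is:

main(["collect", "results.json"])                      # build/refresh the results database
main(["collect", "results.json", "--retry-failures"])  # re-run only previous failures
main(["report", "results.json"])                       # print matches, failures, timings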

106
scripts/detect-backends.py Normal file
View File

@@ -0,0 +1,106 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import argparse
import importlib.util
import rich
import rich.table
import capa.main
from capa.features.extractors.ida.idalib import find_idalib, load_idalib, is_idalib_installed
from capa.features.extractors.binja.find_binja_api import find_binaryninja, load_binaryninja, is_binaryninja_installed
logger = logging.getLogger(__name__)
def is_vivisect_installed() -> bool:
try:
return importlib.util.find_spec("vivisect") is not None
except ModuleNotFoundError:
return False
def load_vivisect() -> bool:
try:
import vivisect # noqa: F401 unused import
return True
except ImportError:
return False
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Detect analysis backends.")
capa.main.install_common_args(parser, wanted=set())
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
if args.debug:
logging.getLogger("capa").setLevel(logging.DEBUG)
logging.getLogger("viv_utils").setLevel(logging.DEBUG)
else:
logging.getLogger("capa").setLevel(logging.ERROR)
logging.getLogger("viv_utils").setLevel(logging.ERROR)
table = rich.table.Table()
table.add_column("backend")
table.add_column("already installed?")
table.add_column("found?")
table.add_column("loads?")
if True:
row = ["vivisect"]
if is_vivisect_installed():
row.append("True")
row.append("-")
else:
row.append("False")
row.append("False")
row.append(str(load_vivisect()))
table.add_row(*row)
if True:
row = ["Binary Ninja"]
if is_binaryninja_installed():
row.append("True")
row.append("-")
else:
row.append("False")
row.append(str(find_binaryninja() is not None))
row.append(str(load_binaryninja()))
table.add_row(*row)
if True:
row = ["IDA idalib"]
if is_idalib_installed():
row.append("True")
row.append("-")
else:
row.append("False")
row.append(str(find_idalib() is not None))
row.append(str(load_idalib()))
table.add_row(*row)
rich.print(table)
if __name__ == "__main__":
sys.exit(main())
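This script takes no subcommands; a hypothetical in-process invocation (add `--debug` to surface import errors from the probes above):

main([])           # print the backend availability table
main(["--debug"])  # also show capa/viv_utils debug logging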

View File

@@ -31,11 +31,9 @@ from typing import Set, Dict, List
from pathlib import Path
from dataclasses import field, dataclass
import tqdm
import pydantic
import termcolor
import ruamel.yaml
import tqdm.contrib.logging
from rich import print
import capa.main
import capa.rules
@@ -51,18 +49,6 @@ from capa.render.result_document import RuleMetadata
logger = logging.getLogger("lint")
def red(s):
return termcolor.colored(s, "red")
def orange(s):
return termcolor.colored(s, "yellow")
def green(s):
return termcolor.colored(s, "green")
@dataclass
class Context:
"""
@@ -80,8 +66,8 @@ class Context:
class Lint:
WARN = orange("WARN")
FAIL = red("FAIL")
WARN = "[yellow]WARN[/yellow]"
FAIL = "[red]FAIL[/red]"
name = "lint"
level = FAIL
@@ -896,7 +882,7 @@ def lint_rule(ctx: Context, rule: Rule):
if (not lints_failed) and (not lints_warned) and has_examples:
print("")
print(f'{" (nursery) " if is_nursery_rule(rule) else ""} {rule.name}')
print(f" {Lint.WARN}: {green('no lint failures')}: Graduate the rule")
print(f" {Lint.WARN}: '[green]no lint failures[/green]': Graduate the rule")
print("")
else:
lints_failed = len(tuple(filter(lambda v: v.level == Lint.FAIL, violations)))
@@ -921,12 +907,15 @@ def lint(ctx: Context):
ret = {}
source_rules = [rule for rule in ctx.rules.rules.values() if not rule.is_subscope_rule()]
with tqdm.contrib.logging.tqdm_logging_redirect(source_rules, unit="rule", leave=False) as pbar:
with capa.helpers.redirecting_print_to_tqdm(False):
for rule in pbar:
name = rule.name
pbar.set_description(width(f"linting rule: {name}", 48))
ret[name] = lint_rule(ctx, rule)
n_rules: int = len(source_rules)
with capa.helpers.CapaProgressBar(transient=True, console=capa.helpers.log_console) as pbar:
task = pbar.add_task(description="linting", total=n_rules, unit="rule")
for rule in source_rules:
name = rule.name
pbar.update(task, description=width(f"linting rule: {name}", 48))
ret[name] = lint_rule(ctx, rule)
pbar.advance(task)
return ret
@@ -1020,18 +1009,18 @@ def main(argv=None):
logger.debug("lints ran for ~ %02d:%02dm", min, sec)
if warned_rules:
print(orange("rules with WARN:"))
print("[yellow]rules with WARN:[/yellow]")
for warned_rule in sorted(warned_rules):
print(" - " + warned_rule)
print()
if failed_rules:
print(red("rules with FAIL:"))
print("[red]rules with FAIL:[/red]")
for failed_rule in sorted(failed_rules):
print(" - " + failed_rule)
return 1
else:
logger.info(green("no lints failed, nice!"))
logger.info("[green]no lints failed, nice![/green]")
return 0

291
scripts/parse-vmray-flog.py Normal file
View File

@@ -0,0 +1,291 @@
import sys
import logging
from typing import Any, Literal, Optional
from pathlib import Path
from pydantic import BeforeValidator
from typing_extensions import Annotated
from pydantic.dataclasses import dataclass
HexInt = Annotated[int, BeforeValidator(lambda v: int(v.strip('"'), 0x10))]
QuotedInt = Annotated[int, BeforeValidator(lambda v: int(v.strip('"')))]
QuotedString = Annotated[str, BeforeValidator(lambda v: v.strip('"'))]
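# worked examples of the coercions above (values are hypothetical flog fields):
#   HexInt:       '"0x118c"'       -> 4492   (strip quotes, parse base-16)
#   QuotedInt:    '"32"'           -> 32
#   QuotedString: '"svchost.exe"'  -> "svchost.exe"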
logger = logging.getLogger("vmray.flog")
@dataclass
class Region:
id: QuotedInt
start_va: HexInt
end_va: HexInt
monitored: bool
entry_point: HexInt
region_type: Literal["private"] | Literal["mapped_file"] | Literal["pagefile_backed"]
name: QuotedString
filename: str
@dataclass
class Event:
timestamp: tuple[int, int]
api: str
args: str
rv: Optional[int]
@dataclass
class Thread:
id: QuotedInt
os_tid: HexInt
events: list[Event]
@dataclass
class Process:
id: QuotedInt
image_name: QuotedString
filename: QuotedString
page_root: HexInt
os_pid: HexInt
os_integrity_level: HexInt
os_privileges: HexInt
monitor_reason: Literal['"analysis_target"'] | Literal['"rpc_server"']
parent_id: HexInt
os_parent_pid: HexInt
cmd_line: str # TODO: json decode str
cur_dir: str # TODO: json decode str
os_username: str # TODO: json decode str
bitness: QuotedInt # TODO: enum 32 or 64
os_groups: str # TODO: list of str
regions: list[Region]
threads: list[Thread]
@dataclass
class Flog:
processes: list[Process]
processes_by_id: dict[int, Process]
regions_by_id: dict[int, Region]
threads_by_id: dict[int, Thread]
def parse_properties(txt: str) -> dict[str, Any]:
properties = {}
for line in txt.partition("\n\n")[0].splitlines():
key, _, value = line.lstrip().partition(" = ")
properties[key] = value
return properties
def parse_region(txt: str) -> Region:
# like:
#
# Region:
# id = 125
# start_va = 0x10000
# end_va = 0x2ffff
# monitored = 1
# entry_point = 0x0
# region_type = private
# name = "private_0x0000000000010000"
# filename = ""
region_kwargs = parse_properties(txt)
return Region(**region_kwargs)
def parse_event(line: str) -> Event:
# like:
#
# [0066.433] CoInitializeEx (pvReserved=0x0, dwCoInit=0x2) returned 0x0
# [0071.184] RegisterClipboardFormatW (lpszFormat="WM_GETCONTROLTYPE") returned 0xc1dc
# [0072.750] GetCurrentProcess () returned 0xffffffffffffffff
numbers, _, rest = line.lstrip()[1:].partition("] ")
major, _, minor = numbers.partition(".")
majori = int(major.lstrip("0") or "0")
minori = int(minor.lstrip("0") or "0")
timestamp = (majori, minori)
api, _, rest = rest.partition(" (")
args, _, rest = rest.rpartition(")")
if " returned " in rest:
_, _, rvs = rest.partition(" returned ")
rv = int(rvs, 0x10)
else:
rv = None
return Event(
timestamp=timestamp,
api=api,
args=args,
rv=rv,
)
def parse_thread(txt: str) -> Thread:
# like:
#
# Thread:
# id = 1
# os_tid = 0x117c
#
# [0066.433] CoInitializeEx (pvReserved=0x0, dwCoInit=0x2) returned 0x0
# [0071.184] RegisterClipboardFormatW (lpszFormat="WM_GETCONTROLTYPE") returned 0xc1dc
# [0072.750] GetCurrentProcess () returned 0xffffffffffffffff
thread_kwargs = parse_properties(txt)
events = []
for line in txt.splitlines():
if not line.startswith("\t["):
continue
events.append(parse_event(line))
return Thread(
events=events,
**thread_kwargs,
)
def parse_process(txt: str) -> Process:
# properties look like:
#
# id = "1"
# image_name = "svchost.exe"
# filename = "c:\\users\\rdhj0cnfevzx\\desktop\\svchost.exe"
# page_root = "0x751fc000"
# os_pid = "0x118c"
# os_integrity_level = "0x3000"
# os_privileges = "0x60800000"
# monitor_reason = "analysis_target"
# parent_id = "0"
# os_parent_pid = "0x7d8"
# cmd_line = "\"c:\\users\\rdhj0cnfevzx\\desktop\\svchost.exe\" "
# cur_dir = "c:\\users\\rdhj0cnfevzx\\desktop\\"
# os_username = "xc64zb\\rdhj0cnfevzx"
# bitness = "32"
# os_groups = "xc64zb\\domain users" [0x7], "everyone" [0x7], ...
process_kwargs = parse_properties(txt)
regions = []
for region in txt.split("\nRegion:\n")[1:]:
regions.append(parse_region(region))
threads = []
for thread in txt.split("\nThread:\n")[1:]:
threads.append(parse_thread(thread))
return Process(
regions=regions,
threads=threads,
**process_kwargs,
)
def parse_processes(txt: str) -> list[Process]:
processes = []
for process in txt.split("\nProcess:\n")[1:]:
processes.append(parse_process(process))
return processes
def parse_flog(txt: str) -> Flog:
# the header probably fits within this size
header_lines = txt[:512].splitlines()
# file may start with: | ef bb bf |
assert "# Flog Txt Version 1" in header_lines[0]
for line in header_lines[1:]:
line = line.strip()
if not line.startswith("#"):
break
# metadata lines, like:
#
# Flog Txt Version 1
# Analyzer Version: 2024.4.1
# Analyzer Build Date: Sep 2 2024 06:30:10
# Log Creation Date: 08.10.2024 18:12:03.945c
logger.debug("%s", line)
processes = parse_processes(txt)
processes_by_id = {process.id: process for process in processes}
regions_by_id = {region.id: region for process in processes for region in process.regions}
threads_by_id = {thread.id: thread for process in processes for thread in process.threads}
return Flog(
processes=processes,
processes_by_id=processes_by_id,
regions_by_id=regions_by_id,
threads_by_id=threads_by_id,
)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
flog_path = Path(sys.argv[1])
flog = parse_flog(flog_path.read_text(encoding="utf-8"))
for process in flog.processes:
print(f"{process.id=} {len(process.regions)=} {len(process.threads)=}")
for region in process.regions:
print(f" {region.id=} {region.name}")
for thread in process.threads:
print(f" {thread.id=} {len(thread.events)=}")
def test_event_timestamp():
event = parse_event(" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff")
assert event.timestamp == (72, 750)
def test_event_api():
event = parse_event(" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff")
assert event.api == "GetCurrentProcess"
def test_event_empty_args():
event = parse_event(" [0072.750] GetCurrentProcess () returned 0xffffffffffffffff")
assert len(event.args) == 0
# single arg
# [0074.875] GetSystemMetrics (nIndex=75) returned 1
# no return value
# [0083.567] CoTaskMemFree (pv=0x746aa0)
# two args
# [0085.491] GetWindowLongPtrW (hWnd=0x401f0, nIndex=-16) returned 0x6c10000
# in/out
# [0086.848] GetClientRect (in: hWnd=0x401f0, lpRect=0x14d0c0 | out: lpRect=0x14d0c0) returned 1
# string
# [0102.753] FindAtomW (lpString="GDI+Atom_4492_1") returned 0xc000
# int (hex)
# [0102.756] GdipDeleteFont (font=0x1c504e00) returned 0x0
# int (decimal)
# [0074.875] GetSystemMetrics (nIndex=75) returned 1
# int (negative)
# [0085.491] GetWindowLongPtrW (hWnd=0x401f0, nIndex=-16) returned 0x6c10000
# struct
# [0067.024] GetVersionExW (in: lpVersionInformation=0x14e3f0*(dwOSVersionInfoSize=0x114, dwMajorVersion=0x0, dwMinorVersion=0x0, dwBuildNumber=0x0, dwPlatformId=0x0, szCSDVersion="") | out: lpVersionInformation=0x14e3f0*(dwOSVersionInfoSize=0x114, dwMajorVersion=0x6, dwMinorVersion=0x2, dwBuildNumber=0x23f0, dwPlatformId=0x2, szCSDVersion="")) returned 1
# nested struct
# [0111.527] CoCreateGuid (in: pguid=0x14c910 | out: pguid=0x14c910*(Data1=0x63ac5b46, Data2=0xc417, Data3=0x49b0, Data4=([0]=0xac, [1]=0xbf, [2]=0xb8, [3]=0xf3, [4]=0x8b, [5]=0x1a, [6]=0x51, [7]=0x78))) returned 0x0
# bytes
# [0111.527] CoCreateGuid (in: pguid=0x14c910 | out: pguid=0x14c910*(Data1=0x63ac5b46, Data2=0xc417, Data3=0x49b0, Data4=([0]=0xac, [1]=0xbf, [2]=0xb8, [3]=0xf3, [4]=0x8b, [5]=0x1a, [6]=0x51, [7]=0x78))) returned 0x0
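# worked example of the simple case above:
#   parse_event(" [0074.875] GetSystemMetrics (nIndex=75) returned 1")
#   -> Event(timestamp=(74, 875), api="GetSystemMetrics", args="nIndex=75", rv=1)
# note: return values are parsed base-16, so a decimal-looking "returned 75"
# would currently come back as 0x75 == 117.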

View File

@@ -42,9 +42,10 @@ import logging
import argparse
import subprocess
import tqdm
import humanize
import tabulate
from rich import box
from rich.table import Table
from rich.console import Console
import capa.main
import capa.perf
@@ -92,51 +93,61 @@ def main(argv=None):
except capa.main.ShouldExitError as e:
return e.status_code
with tqdm.tqdm(total=args.number * args.repeat, leave=False) as pbar:
with capa.helpers.CapaProgressBar(console=capa.helpers.log_console) as progress:
total_iterations = args.number * args.repeat
task = progress.add_task("profiling", total=total_iterations)
def do_iteration():
capa.perf.reset()
capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)
pbar.update(1)
progress.advance(task)
samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat)
logger.debug("perf: find capabilities: min: %0.2fs", (min(samples) / float(args.number)))
logger.debug("perf: find capabilities: avg: %0.2fs", (sum(samples) / float(args.repeat) / float(args.number)))
logger.debug(
"perf: find capabilities: avg: %0.2fs",
(sum(samples) / float(args.repeat) / float(args.number)),
)
logger.debug("perf: find capabilities: max: %0.2fs", (max(samples) / float(args.number)))
for counter, count in capa.perf.counters.most_common():
logger.debug("perf: counter: %s: %s", counter, count)
print(
tabulate.tabulate(
[(counter, humanize.intcomma(count)) for counter, count in capa.perf.counters.most_common()],
headers=["feature class", "evaluation count"],
tablefmt="github",
)
)
print()
console = Console()
print(
tabulate.tabulate(
[
(
args.label,
"{:,}".format(capa.perf.counters["evaluate.feature"]),
# python documentation indicates that min(samples) should be preferred,
# so let's put that first.
#
# https://docs.python.org/3/library/timeit.html#timeit.Timer.repeat
f"{(min(samples) / float(args.number)):.2f}s",
f"{(sum(samples) / float(args.repeat) / float(args.number)):.2f}s",
f"{(max(samples) / float(args.number)):.2f}s",
)
],
headers=["label", "count(evaluations)", "min(time)", "avg(time)", "max(time)"],
tablefmt="github",
)
table1 = Table(box=box.MARKDOWN)
table1.add_column("feature class")
table1.add_column("evaluation count")
for counter, count in capa.perf.counters.most_common():
table1.add_row(counter, humanize.intcomma(count))
console.print(table1)
console.print()
table2 = Table(box=box.MARKDOWN)
table2.add_column("label")
table2.add_column("count(evaluations)", style="magenta")
table2.add_column("min(time)", style="green")
table2.add_column("avg(time)", style="yellow")
table2.add_column("max(time)", style="red")
table2.add_row(
args.label,
# python documentation indicates that min(samples) should be preferred,
# so let's put that first.
#
# https://docs.python.org/3/library/timeit.html#timeit.Timer.repeat
"{:,}".format(capa.perf.counters["evaluate.feature"]),
f"{(min(samples) / float(args.number)):.2f}s",
f"{(sum(samples) / float(args.repeat) / float(args.number)):.2f}s",
f"{(max(samples) / float(args.number)):.2f}s",
)
console.print(table2)
return 0

View File

@@ -12,11 +12,12 @@ import sys
import typing
import logging
import argparse
from typing import Set, Tuple
from typing import Set, List, Tuple
from collections import Counter
import tabulate
from termcolor import colored
from rich import print
from rich.text import Text
from rich.table import Table
import capa.main
import capa.rules
@@ -77,23 +78,30 @@ def get_file_features(
return feature_map
def get_colored(s: str):
def get_colored(s: str) -> Text:
if "(" in s and ")" in s:
s_split = s.split("(", 1)
s_color = colored(s_split[1][:-1], "cyan")
return f"{s_split[0]}({s_color})"
return Text.assemble(s_split[0], "(", (s_split[1][:-1], "cyan"), ")")
else:
return colored(s, "cyan")
return Text(s, style="cyan")
def print_unused_features(feature_map: typing.Counter[Feature], rules_feature_set: Set[Feature]):
unused_features = []
unused_features: List[Tuple[str, Text]] = []
for feature, count in reversed(feature_map.most_common()):
if feature in rules_feature_set:
continue
unused_features.append((str(count), get_colored(str(feature))))
table = Table(title="Unused Features", box=None)
table.add_column("Count", style="dim")
table.add_column("Feature")
for count_str, feature_text in unused_features:
table.add_row(count_str, feature_text)
print("\n")
print(tabulate.tabulate(unused_features, headers=["Count", "Feature"], tablefmt="plain"))
print(table)
print("\n")


@@ -431,6 +431,14 @@ def get_data_path_by_name(name) -> Path:
/ "vmray"
/ "93b2d1840566f45fab674ebc79a9d19c88993bcb645e0357f3cb584d16e7c795_min_archive.zip"
)
elif name.startswith("2f8a79-vmray"):
return (
CD
/ "data"
/ "dynamic"
/ "vmray"
/ "2f8a79b12a7a989ac7e5f6ec65050036588a92e65aeb6841e08dc228ff0e21b4_min_archive.zip"
)
elif name.startswith("ea2876"):
return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_"
elif name.startswith("1038a2"):


@@ -37,6 +37,8 @@ DYNAMIC_CAPE_FEATURE_PRESENCE_TESTS = sorted(
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), False),
# thread/api calls
("0000a657", "process=(2900:2852),thread=2904", capa.features.insn.API("RegQueryValueExA"), True),
("0000a657", "process=(2900:2852),thread=2904", capa.features.insn.API("RegQueryValueEx"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), False),
# thread/number call argument


@@ -22,6 +22,8 @@ DYNAMIC_DRAKVUF_FEATURE_PRESENCE_TESTS = sorted(
("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1-drakvuf", "process=(3564:4852),thread=4716,call=17", capa.features.insn.API("CreateWindowExW"), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=4716,call=17", capa.features.insn.API("CreateWindowEx"), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("LdrLoadDll"), True),
("93b2d1-drakvuf", "process=(3564:4852),thread=6592,call=1", capa.features.insn.API("DoesNotExist"), False),
# call/string argument


@@ -10,7 +10,6 @@ import textwrap
from unittest.mock import Mock
import fixtures
import rich.console
import capa.rules
import capa.render.utils
@@ -24,6 +23,7 @@ import capa.features.basicblock
import capa.render.result_document
import capa.render.result_document as rd
import capa.features.freeze.features
from capa.render.utils import Console
def test_render_number():
@@ -154,7 +154,7 @@ def test_render_meta_maec():
# capture the output of render_maec
f = io.StringIO()
console = rich.console.Console(file=f)
console = Console(file=f)
capa.render.default.render_maec(mock_rd, console)
output = f.getvalue()
@@ -198,7 +198,7 @@ def test_render_meta_maec():
],
)
def test_render_vverbose_feature(feature, expected):
ostream = capa.render.utils.StringIO()
console = Console(highlight=False)
addr = capa.features.freeze.Address.from_capa(capa.features.address.AbsoluteVirtualAddress(0x401000))
feature = capa.features.freeze.features.feature_from_capa(feature)
@@ -240,6 +240,8 @@ def test_render_vverbose_feature(feature, expected):
matches=(),
)
capa.render.vverbose.render_feature(ostream, layout, rm, matches, feature, indent=0)
with console.capture() as capture:
capa.render.vverbose.render_feature(console, layout, rm, matches, feature, indent=0)
assert ostream.getvalue().strip() == expected
output = capture.get().strip()
assert output == expected
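
The change above replaces a StringIO-backed ostream with rich's Console plus output capture; the capture idiom in isolation (the printed line is a stand-in for render_feature output):

    from rich.console import Console

    console = Console(highlight=False)
    with console.capture() as capture:
        console.print("insn/api: CreateFileW")  # stand-in for render_feature()

    assert capture.get().strip() == "insn/api: CreateFileW"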


@@ -19,22 +19,51 @@ DYNAMIC_VMRAY_FEATURE_PRESENCE_TESTS = sorted(
("93b2d1-vmray", "file", capa.features.common.String("\\Program Files\\WindowsApps\\does_not_exist"), False),
# file/imports
("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfoW"), True),
("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfo"), True),
# thread/api calls
("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("GetAddrInfoW"), True),
("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("DoesNotExist"), False),
("93b2d1-vmray", "process=(2176:0),thread=2180", capa.features.insn.API("LoadLibraryExA"), True),
("93b2d1-vmray", "process=(2176:0),thread=2180", capa.features.insn.API("LoadLibraryEx"), True),
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("GetAddrInfoW"), True),
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("GetAddrInfo"), True),
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("DoesNotExist"), False),
# call/api
("93b2d1-vmray", "process=(2176:0),thread=7,call=2361", capa.features.insn.API("GetAddrInfoW"), True),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2361", capa.features.insn.API("GetAddrInfoW"), True),
# call/string argument
(
"93b2d1-vmray",
"process=(2176:0),thread=7,call=10323",
"process=(2176:0),thread=2420,call=10323",
capa.features.common.String("raw.githubusercontent.com"),
True,
),
# backslashes in paths; see #2428
(
"93b2d1-vmray",
"process=(2176:0),thread=2180,call=267",
capa.features.common.String("C:\\Users\\WhuOXYsD\\Desktop\\filename.exe"),
True,
),
(
"93b2d1-vmray",
"process=(2176:0),thread=2180,call=267",
capa.features.common.String("C:\\\\Users\\\\WhuOXYsD\\\\Desktop\\\\filename.exe"),
False,
),
(
"93b2d1-vmray",
"process=(2176:0),thread=2204,call=2395",
capa.features.common.String("Software\\Microsoft\\Windows\\CurrentVersion\\Policies\\System"),
True,
),
(
"93b2d1-vmray",
"process=(2176:0),thread=2204,call=2395",
capa.features.common.String("Software\\\\Microsoft\\\\Windows\\\\CurrentVersion\\\\Policies\\\\System"),
False,
),
# call/number argument
# VirtualAlloc(4096, 4)
("93b2d1-vmray", "process=(2176:0),thread=7,call=2358", capa.features.insn.Number(4096), True),
("93b2d1-vmray", "process=(2176:0),thread=7,call=2358", capa.features.insn.Number(4), True),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2358", capa.features.insn.Number(4096), True),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2358", capa.features.insn.Number(4), True),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
@@ -46,24 +75,24 @@ DYNAMIC_VMRAY_FEATURE_COUNT_TESTS = sorted(
# file/imports
("93b2d1-vmray", "file", capa.features.file.Import("GetAddrInfoW"), 1),
# thread/api calls
("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("free"), 1),
("93b2d1-vmray", "process=(2176:0),thread=7", capa.features.insn.API("GetAddrInfoW"), 5),
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("free"), 1),
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("GetAddrInfoW"), 5),
# call/api
("93b2d1-vmray", "process=(2176:0),thread=7,call=2345", capa.features.insn.API("free"), 1),
("93b2d1-vmray", "process=(2176:0),thread=7,call=2345", capa.features.insn.API("GetAddrInfoW"), 0),
("93b2d1-vmray", "process=(2176:0),thread=7,call=2361", capa.features.insn.API("GetAddrInfoW"), 1),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2345", capa.features.insn.API("free"), 1),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2345", capa.features.insn.API("GetAddrInfoW"), 0),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2361", capa.features.insn.API("GetAddrInfoW"), 1),
# call/string argument
(
"93b2d1-vmray",
"process=(2176:0),thread=7,call=10323",
"process=(2176:0),thread=2420,call=10323",
capa.features.common.String("raw.githubusercontent.com"),
1,
),
("93b2d1-vmray", "process=(2176:0),thread=7,call=10323", capa.features.common.String("non_existant"), 0),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=10323", capa.features.common.String("non_existant"), 0),
# call/number argument
("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(4096), 1),
("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(4), 1),
("93b2d1-vmray", "process=(2176:0),thread=7,call=10315", capa.features.insn.Number(404), 0),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=10315", capa.features.insn.Number(4096), 1),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=10315", capa.features.insn.Number(4), 1),
("93b2d1-vmray", "process=(2176:0),thread=2420,call=10315", capa.features.insn.Number(404), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
@@ -87,3 +116,10 @@ def test_vmray_features(sample, scope, feature, expected):
)
def test_vmray_feature_counts(sample, scope, feature, expected):
fixtures.do_test_feature_count(fixtures.get_vmray_extractor, sample, scope, feature, expected)
def test_vmray_processes():
# see #2394
path = fixtures.get_data_path_by_name("2f8a79-vmray")
vmre = fixtures.get_vmray_extractor(path)
assert len(vmre.analysis.monitor_processes) == 9
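
The backslash cases above pin down the invariant from #2428: a path VMRay logs with escaped backslashes must match its single-backslash form, not the doubled form. A hedged sketch of that unescaping step; the helper name is hypothetical, and the real extractor may normalize the string differently:

    def unescape_flog_string(s: str) -> str:
        # hypothetical helper: collapse escaped backslashes exactly once
        return s.replace("\\\\", "\\")

    raw = "C:\\\\Users\\\\WhuOXYsD\\\\Desktop\\\\filename.exe"  # as logged
    assert unescape_flog_string(raw) == "C:\\Users\\WhuOXYsD\\Desktop\\filename.exe"
    assert unescape_flog_string(raw) != raw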

File diff suppressed because it is too large


@@ -33,7 +33,7 @@
"eslint-plugin-vue": "^9.23.0",
"jsdom": "^24.1.0",
"prettier": "^3.2.5",
"vite": "^5.3.1",
"vite": "^5.4.6",
"vite-plugin-singlefile": "^2.0.2",
"vitest": "^1.6.0"
}


@@ -1,6 +1,7 @@
<script setup>
import Menubar from "primevue/menubar";
import { RouterLink } from "vue-router";
import Button from "primevue/button";
const isBundle = import.meta.env.MODE === "bundle";
</script>
@@ -14,6 +15,9 @@ const isBundle = import.meta.env.MODE === "bundle";
</template>
<template #end>
<div class="flex align-items-center gap-3">
<a href="https://github.com/mandiant/capa/issues/new/choose" target="_blank" rel="noopener noreferrer">
<Button severity="contrast" size="small" outlined label="Provide feedback" />
</a>
<a
v-if="!isBundle"
v-ripple


@@ -5,7 +5,7 @@
mode="basic"
name="model[]"
accept=".json,.gz"
:max-file-size="10000000"
:max-file-size="100000000"
:auto="true"
:custom-upload="true"
choose-label="Upload from local"


@@ -18,12 +18,20 @@ const router = createRouter({
name: "analysis",
component: AnalysisView,
beforeEnter: (to, from, next) => {
if (rdocStore.data.value === null) {
// No rdoc loaded, redirect to home page
next({ name: "home" });
} else {
// rdoc is loaded, proceed to analysis page
// check if rdoc is loaded
if (rdocStore.data.value !== null) {
// rdocStore.data already contains the rdoc json - continue
next();
} else {
// rdoc is not loaded, check if the rdoc query param is set in the URL
const rdocUrl = to.query.rdoc;
if (rdocUrl) {
// query param is set - try to load the rdoc from the homepage
next({ name: "home", query: { rdoc: rdocUrl } });
} else {
// no query param is set - go back home
next({ name: "home" });
}
}
}
},


@@ -492,6 +492,8 @@ function getFeatureName(feature) {
return `operand[${feature.index}].offset: 0x${feature.operand_offset.toString(16).toUpperCase()}`;
case "class":
return `${feature.class_}`;
case "import":
return `${feature.import_}`;
default:
return `${feature[feature.type]}`;
}


@@ -88,7 +88,7 @@
box-shadow: 0 0.5rem 1rem rgba(0,0,0,0.05),inset 0 -1px 0 rgba(0,0,0,0.15);"
>
<a href="/" class="d-flex align-items-center mb-3 mb-md-0 me-md-auto">
<img src="./img/logo.png" height=48 />
<img src="./img/logo.png" alt="capa logo" height=48 />
</a>
<ul class="nav nav-pills">
@@ -118,7 +118,7 @@
references.
</p>
<div class="d-grid gap-2 d-md-flex justify-content-md-start mb-4 mb-lg-3">
<a href="#download" type="button" class="btn btn-primary bs-primary btn-lg px-4 me-md-2 fw-bold">Download</button>
<a href="#download" type="button" class="btn btn-primary bs-primary btn-lg px-4 me-md-2 fw-bold">Download</a>
<a href="./rules/" type="button" class="btn btn-outline-secondary btn-lg px-4">Browse Rules</a>
</div>
</div>
@@ -194,7 +194,7 @@
<div class="row flex-lg-row-reverse align-items-center g-5">
<h1>What's New</h1>
<h3 class="mt-3">Rule Updates</h3>
<h2 class="mt-3">Rule Updates</h2>
<ul class="mt-2 ps-5">
<!-- TODO(williballenthin): add date -->
@@ -213,10 +213,22 @@
</li>
</ul>
<h3 class="mt-3">Tool Updates</h3>
<h2 class="mt-3">Tool Updates</h2>
<h5 class="mt-2">v7.2.0</h5>
<!-- TODO(williballenthin): add date -->
<h3 class="mt-2">v7.3.0 (<em>2024-09-20</em>)</h3>
<div class="mt-0">
The <a href="https://github.com/mandiant/capa/releases/tag/v7.3.0">capa v7.3.0</a> release comes with the following three major enhancements:
<p><strong>1. Support for VMRay sandbox analysis archives</strong>: Unlock powerful malware analysis with capa&#39;s new <a href="https://www.vmray.com/">VMRay sandbox</a> integration!
Simply provide a VMRay analysis archive, and capa will automatically extract and match capabilities to streamline your workflow. This is the second dynamic analysis backend capa supports,
after <a href="https://www.mandiant.com/resources/blog/dynamic-capa-executable-behavior-cape-sandbox">CAPE</a>.</p>
<p><strong>2. Support for BinExport files generated by Ghidra</strong>: <a href="https://github.com/google/binexport">BinExport</a> files store disassembly data in a Protocol Buffer format.
capa now supports analyzing BinExport files generated by Ghidra. Using Ghidra and the BinExport format, users can now analyze ARM (AARCH64) ELF files targeting Android.</p>
<p><strong>3. Introducing the capa rules website</strong>: You can now browse capa&#39;s default rule set at <a href="https://mandiant.github.io/capa/rules">https://mandiant.github.io/capa/rules</a>.
In modern terminals, the capa CLI hyperlinks to resources on the web, including entries on the capa rules website.
Furthermore, <a href="https://mandiant.github.io/capa">https://mandiant.github.io/capa</a> provides a landing page for the capa tool project.</p>
</div>
<h3 class="mt-2">v7.2.0 (<em>2024-08-20</em>)</h3>
<p class="mt-0">
<a href="https://github.com/mandiant/capa/releases/tag/v7.2.0">capa v7.2.0</a>
introduces a first version of capa Explorer Web: a web-based user interface to inspect capa results using your browser.
@@ -254,9 +266,9 @@
<div class="col">
<div class="row row-cols-1 row-cols-sm-2 g-4">
<div class="col d-flex flex-column gap-2">
<h4 class="fw-semibold mb-0 text-body-emphasis">
<h3 class="fw-semibold mb-0 text-body-emphasis">
IDA Pro
</h4>
</h3>
<p class="text-body-secondary">
<!-- TODO(williballenthin): add link to find out more -->
Use the capa Explorer IDA Plugin to guide your reverse engineering, zeroing in on the interesting functions by behavior.
@@ -264,9 +276,9 @@
</div>
<div class="col d-flex flex-column gap-2">
<h4 class="fw-semibold mb-0 text-body-emphasis">
<h3 class="fw-semibold mb-0 text-body-emphasis">
Ghidra
</h4>
</h3>
<p class="text-body-secondary">
<!-- TODO(williballenthin): add link to find out more -->
Invoke Ghidra in headless mode to collect features for capa, or use the capa Explorer Ghidra plugin to understand key functions.
@@ -274,9 +286,9 @@
</div>
<div class="col d-flex flex-column gap-2">
<h4 class="fw-semibold mb-0 text-body-emphasis">
<h3 class="fw-semibold mb-0 text-body-emphasis">
Binary Ninja
</h4>
</h3>
<p class="text-body-secondary">
<!-- TODO(williballenthin): add link to find out more -->
Use Binary Ninja as the disassembler backend, relying on its state-of-the-art code analysis to recover capabilities.
@@ -284,9 +296,9 @@
</div>
<div class="col d-flex flex-column gap-2">
<h4 class="fw-semibold mb-0 text-body-emphasis">
<h3 class="fw-semibold mb-0 text-body-emphasis">
CAPE
</h4>
</h3>
<p class="text-body-secondary">
<!-- TODO(williballenthin): add link to find out more -->
Analyze the API trace captured by CAPE as it detonates malware, summarizing the behaviors seen across thousands of function calls.
@@ -356,10 +368,10 @@
<div class="bg-dark text-secondary px-4 pt-5 text-center">
<div class="py-5">
<img src="./img/icon.png" />
<h3 class="display-5 fw-bold text-white">
<img src="./img/icon.png" alt="capa icon"/>
<h2 class="display-5 fw-bold text-white">
capa
</h3>
</h2>
<div class="col-lg-6 mx-auto">
<p class="fs-5 my-4">
@@ -379,7 +391,7 @@
</div>
</div>
</div>
</main>
</main>
<script>
window.addEventListener('DOMContentLoaded', (event) => {