mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
add flake8-bugbear linter
This commit is contained in:
@@ -92,7 +92,8 @@ repos:
|
||||
# F401: `foo` imported but unused (prefer ruff)
|
||||
# F811 Redefinition of unused `foo` (prefer ruff)
|
||||
# E501 line too long (prefer black)
|
||||
- "--extend-ignore=E203,F401,F811,E501"
|
||||
# B010 Do not call setattr with a constant attribute value
|
||||
- "--extend-ignore=E203,F401,F811,E501,B010"
|
||||
- "--extend-exclude"
|
||||
- "capa/render/proto/capa_pb2.py"
|
||||
- "capa/"
|
||||
|
||||
@@ -71,7 +71,7 @@ class Statement:
|
||||
yield child
|
||||
|
||||
if hasattr(self, "children"):
|
||||
for child in getattr(self, "children"):
|
||||
for child in self.children:
|
||||
assert isinstance(child, (Statement, Feature))
|
||||
yield child
|
||||
|
||||
@@ -83,7 +83,7 @@ class Statement:
|
||||
self.child = new
|
||||
|
||||
if hasattr(self, "children"):
|
||||
children = getattr(self, "children")
|
||||
children = self.children
|
||||
for i, child in enumerate(children):
|
||||
if child is existing:
|
||||
children[i] = new
|
||||
|
||||
@@ -100,7 +100,10 @@ class Result:
|
||||
return self.success
|
||||
|
||||
|
||||
class Feature(abc.ABC):
|
||||
class Feature(abc.ABC): # noqa: B024
|
||||
# this is an abstract class, since we don't want anyone to instantiate it directly,
|
||||
# but it doesn't have any abstract methods.
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
value: Union[str, int, float, bytes],
|
||||
|
||||
@@ -411,7 +411,7 @@ class ELF:
|
||||
# there should be vn_cnt of these.
|
||||
# each entry describes an ABI name required by the shared object.
|
||||
vna_offset = vn_offset + vn_aux
|
||||
for i in range(vn_cnt):
|
||||
for _ in range(vn_cnt):
|
||||
# ElfXX_Vernaux layout is the same on 32 and 64 bit
|
||||
_, _, _, vna_name, vna_next = struct.unpack_from(self.endian + "IHHII", shdr.buf, vna_offset)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import functools
|
||||
from typing import Any, Dict, Tuple, Iterator, Optional
|
||||
|
||||
import idc
|
||||
@@ -80,9 +81,22 @@ def get_segment_buffer(seg: idaapi.segment_t) -> bytes:
|
||||
return buff if buff else b""
|
||||
|
||||
|
||||
def inspect_import(imports, library, ea, function, ordinal):
|
||||
if function and function.startswith("__imp_"):
|
||||
# handle mangled PE imports
|
||||
function = function[len("__imp_") :]
|
||||
|
||||
if function and "@@" in function:
|
||||
# handle mangled ELF imports, like "fopen@@glibc_2.2.5"
|
||||
function, _, _ = function.partition("@@")
|
||||
|
||||
imports[ea] = (library.lower(), function, ordinal)
|
||||
return True
|
||||
|
||||
|
||||
def get_file_imports() -> Dict[int, Tuple[str, str, int]]:
|
||||
"""get file imports"""
|
||||
imports = {}
|
||||
imports: Dict[int, Tuple[str, str, int]] = {}
|
||||
|
||||
for idx in range(idaapi.get_import_module_qty()):
|
||||
library = idaapi.get_import_module_name(idx)
|
||||
@@ -96,19 +110,8 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]:
|
||||
if library == ".dynsym":
|
||||
library = ""
|
||||
|
||||
def inspect_import(ea, function, ordinal):
|
||||
if function and function.startswith("__imp_"):
|
||||
# handle mangled PE imports
|
||||
function = function[len("__imp_") :]
|
||||
|
||||
if function and "@@" in function:
|
||||
# handle mangled ELF imports, like "fopen@@glibc_2.2.5"
|
||||
function, _, _ = function.partition("@@")
|
||||
|
||||
imports[ea] = (library.lower(), function, ordinal)
|
||||
return True
|
||||
|
||||
idaapi.enum_import_names(idx, inspect_import)
|
||||
cb = functools.partial(inspect_import, imports, library)
|
||||
idaapi.enum_import_names(idx, cb)
|
||||
|
||||
return imports
|
||||
|
||||
|
||||
@@ -288,16 +288,16 @@ def extract_insn_bytes_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Itera
|
||||
else:
|
||||
continue
|
||||
|
||||
for v in derefs(f.vw, v):
|
||||
for vv in derefs(f.vw, v):
|
||||
try:
|
||||
buf = read_bytes(f.vw, v)
|
||||
buf = read_bytes(f.vw, vv)
|
||||
except envi.exc.SegmentationViolation:
|
||||
continue
|
||||
|
||||
if capa.features.extractors.helpers.all_zeros(buf):
|
||||
continue
|
||||
|
||||
if f.vw.isProbablyString(v) or f.vw.isProbablyUnicode(v):
|
||||
if f.vw.isProbablyString(vv) or f.vw.isProbablyUnicode(vv):
|
||||
# don't extract byte features for obvious strings
|
||||
continue
|
||||
|
||||
@@ -700,9 +700,9 @@ def extract_op_string_features(
|
||||
else:
|
||||
return
|
||||
|
||||
for v in derefs(f.vw, v):
|
||||
for vv in derefs(f.vw, v):
|
||||
try:
|
||||
s = read_string(f.vw, v).rstrip("\x00")
|
||||
s = read_string(f.vw, vv).rstrip("\x00")
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
|
||||
@@ -45,7 +45,9 @@ def is_runtime_ida():
|
||||
|
||||
|
||||
def assert_never(value) -> NoReturn:
|
||||
assert False, f"Unhandled value: {value} ({type(value).__name__})"
|
||||
# careful: python -O will remove this assertion.
|
||||
# but this is only used for type checking, so it's ok.
|
||||
assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011
|
||||
|
||||
|
||||
def get_format_from_extension(sample: str) -> str:
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import re
|
||||
from typing import Dict, Optional
|
||||
from collections import Counter
|
||||
|
||||
import idc
|
||||
@@ -646,7 +647,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
|
||||
counted = list(zip(Counter(features).keys(), Counter(features).values()))
|
||||
|
||||
# single features
|
||||
for k, v in filter(lambda t: t[1] == 1, counted):
|
||||
for k, _ in filter(lambda t: t[1] == 1, counted):
|
||||
if isinstance(k, (capa.features.common.String,)):
|
||||
value = f'"{capa.features.common.escape_string(k.get_value_str())}"'
|
||||
else:
|
||||
@@ -682,10 +683,12 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
|
||||
|
||||
# we don't add a new node for description; either set description column of parent's last child
|
||||
# or the parent itself
|
||||
if parent.childCount():
|
||||
parent.child(parent.childCount() - 1).setText(1, feature.lstrip("description:").lstrip())
|
||||
else:
|
||||
parent.setText(1, feature.lstrip("description:").lstrip())
|
||||
if feature.startswith("description:"):
|
||||
description = feature[len("description:") :].lstrip()
|
||||
if parent.childCount():
|
||||
parent.child(parent.childCount() - 1).setText(1, description)
|
||||
else:
|
||||
parent.setText(1, description)
|
||||
return None
|
||||
elif feature.startswith("- description:"):
|
||||
if not parent:
|
||||
@@ -693,7 +696,8 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget):
|
||||
return None
|
||||
|
||||
# we don't add a new node for description; set the description column of the parent instead
|
||||
parent.setText(1, feature.lstrip("- description:").lstrip())
|
||||
description = feature[len("- description:") :].lstrip()
|
||||
parent.setText(1, description)
|
||||
return None
|
||||
|
||||
node = QtWidgets.QTreeWidgetItem(parent)
|
||||
@@ -1010,7 +1014,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget):
|
||||
|
||||
return o
|
||||
|
||||
def load_features(self, file_features, func_features={}):
|
||||
def load_features(self, file_features, func_features: Optional[Dict] = None):
|
||||
""" """
|
||||
self.parse_features_for_tree(self.new_parent_node(self, ("File Scope",)), file_features)
|
||||
if func_features:
|
||||
|
||||
@@ -440,7 +440,8 @@ def get_default_root() -> str:
|
||||
# pylance/mypy don't like `sys._MEIPASS` because this isn't standard.
|
||||
# its injected by pyinstaller.
|
||||
# so we'll fetch this attribute dynamically.
|
||||
return getattr(sys, "_MEIPASS")
|
||||
assert hasattr(sys, "_MEIPASS")
|
||||
return sys._MEIPASS
|
||||
else:
|
||||
return os.path.join(os.path.dirname(__file__), "..")
|
||||
|
||||
|
||||
@@ -64,7 +64,7 @@ def find_subrule_matches(doc: rd.ResultDocument):
|
||||
matches.add(match.node.feature.match)
|
||||
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for address, match in rule.matches:
|
||||
for _, match in rule.matches:
|
||||
rec(match)
|
||||
|
||||
return matches
|
||||
|
||||
@@ -1247,7 +1247,7 @@ class RuleSet:
|
||||
# the set of subtypes of type A is unbounded,
|
||||
# because any user might come along and create a new subtype B,
|
||||
# so mypy can't reason about this set of types.
|
||||
assert False, f"Unhandled value: {node} ({type(node).__name__})"
|
||||
assert_never(node)
|
||||
else:
|
||||
# programming error
|
||||
assert_never(node)
|
||||
|
||||
@@ -169,15 +169,17 @@ def main(argv=None):
|
||||
return -1
|
||||
|
||||
samples = []
|
||||
for base, directories, files in os.walk(args.input):
|
||||
for base, _, files in os.walk(args.input):
|
||||
for file in files:
|
||||
samples.append(os.path.join(base, file))
|
||||
|
||||
def pmap(f, args, parallelism=multiprocessing.cpu_count()):
|
||||
cpu_count = multiprocessing.cpu_count()
|
||||
|
||||
def pmap(f, args, parallelism=cpu_count):
|
||||
"""apply the given function f to the given args using subprocesses"""
|
||||
return multiprocessing.Pool(parallelism).imap(f, args)
|
||||
|
||||
def tmap(f, args, parallelism=multiprocessing.cpu_count()):
|
||||
def tmap(f, args, parallelism=cpu_count):
|
||||
"""apply the given function f to the given args using threads"""
|
||||
return multiprocessing.pool.ThreadPool(parallelism).imap(f, args)
|
||||
|
||||
|
||||
@@ -888,7 +888,7 @@ def collect_samples(path) -> Dict[str, Path]:
|
||||
recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename.
|
||||
"""
|
||||
samples = {}
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _, files in os.walk(path):
|
||||
for name in files:
|
||||
if name.endswith(".viv"):
|
||||
continue
|
||||
|
||||
1
setup.py
1
setup.py
@@ -75,6 +75,7 @@ setuptools.setup(
|
||||
"pytest-instafail==0.5.0",
|
||||
"pytest-cov==4.1.0",
|
||||
"flake8==6.0.0",
|
||||
"flake8-bugbear==23.6.5",
|
||||
"ruff==0.0.275",
|
||||
"black==23.3.0",
|
||||
"isort==5.11.4",
|
||||
|
||||
@@ -20,6 +20,7 @@ import capa.features.address
|
||||
import capa.render.proto.capa_pb2 as capa_pb2
|
||||
import capa.render.result_document as rd
|
||||
import capa.features.freeze.features
|
||||
from capa.helpers import assert_never
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -306,7 +307,7 @@ def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode):
|
||||
|
||||
else:
|
||||
# unhandled statement
|
||||
assert False
|
||||
assert_never(sa)
|
||||
|
||||
|
||||
def assert_round_trip(doc: rd.ResultDocument):
|
||||
|
||||
Reference in New Issue
Block a user