mirror of
https://github.com/mandiant/capa.git
synced 2025-12-12 15:49:46 -08:00
introduce flake8-logging-format linter
This commit is contained in:
@@ -93,7 +93,8 @@ repos:
|
||||
# F811 Redefinition of unused `foo` (prefer ruff)
|
||||
# E501 line too long (prefer black)
|
||||
# B010 Do not call setattr with a constant attribute value
|
||||
- "--extend-ignore=E203,F401,F811,E501,B010"
|
||||
# G200 Logging statement uses exception in arguments
|
||||
- "--extend-ignore=E203,F401,F811,E501,B010,G200"
|
||||
- "--extend-exclude"
|
||||
- "capa/render/proto/capa_pb2.py"
|
||||
- "capa/"
|
||||
|
||||
@@ -46,7 +46,8 @@ NETNODE_RULES_CACHE_ID = "rules-cache-id"
|
||||
|
||||
|
||||
def inform_user_ida_ui(message):
|
||||
idaapi.info(f"{message}. Please refer to IDA Output window for more information.")
|
||||
# this isn't a logger, this is IDA's logging facility
|
||||
idaapi.info(f"{message}. Please refer to IDA Output window for more information.") # noqa: G004
|
||||
|
||||
|
||||
def is_supported_ida_version():
|
||||
@@ -54,7 +55,7 @@ def is_supported_ida_version():
|
||||
if version < 7.4 or version >= 9:
|
||||
warning_msg = "This plugin does not support your IDA Pro version"
|
||||
logger.warning(warning_msg)
|
||||
logger.warning("Your IDA Pro version is: %s. Supported versions are: IDA >= 7.4 and IDA < 9.0." % version)
|
||||
logger.warning("Your IDA Pro version is: %s. Supported versions are: IDA >= 7.4 and IDA < 9.0.", version)
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -212,7 +213,7 @@ def idb_contains_cached_results() -> bool:
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
return bool(n.get(NETNODE_RESULTS))
|
||||
except netnode.NetnodeCorruptError as e:
|
||||
logger.error("%s", e, exc_info=True)
|
||||
logger.exception(str(e))
|
||||
return False
|
||||
|
||||
|
||||
|
||||
@@ -600,7 +600,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
raise UserCancelledError()
|
||||
|
||||
if not os.path.exists(path):
|
||||
logger.error("rule path %s does not exist or cannot be accessed" % path)
|
||||
logger.error("rule path %s does not exist or cannot be accessed", path)
|
||||
return False
|
||||
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = path
|
||||
@@ -613,7 +613,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
return False
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to load capa rules")
|
||||
logger.error("Failed to load capa rules (error: %s).", e, exc_info=True)
|
||||
logger.exception("Failed to load capa rules (error: %s).", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -714,7 +714,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
cached_results_time = self.resdoc_cache.meta.timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
||||
new_view_status = f"capa rules: {view_status_rules}, cached results (created {cached_results_time})"
|
||||
except Exception as e:
|
||||
logger.error("Failed to load cached capa results (error: %s).", e, exc_info=True)
|
||||
logger.exception("Failed to load cached capa results (error: %s).", e)
|
||||
return False
|
||||
else:
|
||||
# load results from fresh anlaysis
|
||||
@@ -731,7 +731,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.feature_extractor = CapaExplorerFeatureExtractor()
|
||||
self.feature_extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to initialize feature extractor (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -743,7 +743,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
try:
|
||||
self.process_total += len(tuple(self.feature_extractor.get_functions()))
|
||||
except Exception as e:
|
||||
logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True)
|
||||
logger.exception("Failed to calculate analysis (error: %s).", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -779,7 +779,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error("Failed to extract capabilities from database (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to extract capabilities from database (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -812,7 +812,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
|
||||
except Exception as e:
|
||||
logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to check for file limitations (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -826,7 +826,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
meta, ruleset, capabilities
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to collect results (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to collect results (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -842,7 +842,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
capa.ida.helpers.save_rules_cache_id(ruleset_id)
|
||||
logger.info("Saved cached results to database")
|
||||
except Exception as e:
|
||||
logger.error("Failed to save results to database (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to save results to database (error: %s)", e)
|
||||
return False
|
||||
user_settings = settings.user[CAPA_SETTINGS_RULE_PATH]
|
||||
count_source_rules = self.program_analysis_ruleset_cache.source_rule_count
|
||||
@@ -863,7 +863,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
|
||||
except Exception as e:
|
||||
logger.error("Failed to render results (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to render results (error: %s)", e)
|
||||
return False
|
||||
|
||||
self.set_view_status_label(new_view_status)
|
||||
@@ -915,7 +915,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
has_cache: bool = capa.ida.helpers.idb_contains_cached_results()
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to check for cached results, reanalyzing program")
|
||||
logger.error("Failed to check for cached results (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to check for cached results (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -935,7 +935,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
] = capa.ida.helpers.load_and_verify_cached_results()
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to verify cached results, reanalyzing program")
|
||||
logger.error("Failed to verify cached results (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to verify cached results (error: %s)", e)
|
||||
return False
|
||||
|
||||
if results is None:
|
||||
@@ -988,7 +988,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.rulegen_feature_extractor = CapaExplorerFeatureExtractor()
|
||||
self.rulegen_feature_cache = CapaRuleGenFeatureCache(self.rulegen_feature_extractor)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to initialize feature extractor (error: %s)", e)
|
||||
return False
|
||||
else:
|
||||
logger.info("Reusing prior rulegen cache")
|
||||
@@ -1005,7 +1005,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
if f is not None:
|
||||
self.rulegen_current_function = self.rulegen_feature_extractor.get_function(f.start_ea)
|
||||
except Exception as e:
|
||||
logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e, exc_info=True)
|
||||
logger.exception("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -1031,7 +1031,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
for addr, _ in result:
|
||||
all_function_features[capa.features.common.MatchedRule(name)].add(addr)
|
||||
except Exception as e:
|
||||
logger.error("Failed to generate rule matches (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to generate rule matches (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -1052,7 +1052,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
for addr, _ in result:
|
||||
all_file_features[capa.features.common.MatchedRule(name)].add(addr)
|
||||
except Exception as e:
|
||||
logger.error("Failed to generate file rule matches (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to generate file rule matches (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
@@ -1075,7 +1075,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
f"capa rules: {settings.user[CAPA_SETTINGS_RULE_PATH]} ({settings.user[CAPA_SETTINGS_RULE_PATH]} rules)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to render views (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to render views (error: %s)", e)
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -1160,7 +1160,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
assert self.rulegen_ruleset_cache is not None
|
||||
assert self.rulegen_feature_cache is not None
|
||||
except Exception as e:
|
||||
logger.error("Failed to access cache (error: %s)", e, exc_info=True)
|
||||
logger.exception("Failed to access cache (error: %s)", e)
|
||||
self.set_rulegen_status("Error: see console output for more details")
|
||||
return
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ private rule capa_pe_file : CAPA {
|
||||
|
||||
def check_feature(statement, rulename):
|
||||
if statement in unsupported:
|
||||
logger.info("unsupported: " + statement + " in rule: " + rulename)
|
||||
logger.info("unsupported: %s in rule: %s", statement, rulename)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
@@ -112,7 +112,7 @@ def convert_capa_number_to_yara_bytes(number):
|
||||
sys.exit()
|
||||
|
||||
number = re.sub(r"^0[xX]", "", number)
|
||||
logger.info("number ok: " + repr(number))
|
||||
logger.info("number ok: %r", number)
|
||||
|
||||
# include spaces every 2 hex
|
||||
bytesv = re.sub(r"(..)", r"\1 ", number)
|
||||
@@ -142,7 +142,7 @@ def convert_description(statement):
|
||||
desc = statement.description
|
||||
if desc:
|
||||
yara_desc = " // " + desc
|
||||
logger.info("using desc: " + repr(yara_desc))
|
||||
logger.info("using desc: %r", yara_desc)
|
||||
return yara_desc
|
||||
except Exception:
|
||||
# no description
|
||||
@@ -153,7 +153,7 @@ def convert_description(statement):
|
||||
|
||||
def convert_rule(rule, rulename, cround, depth):
|
||||
depth += 1
|
||||
logger.info("recursion depth: " + str(depth))
|
||||
logger.info("recursion depth: %d", depth)
|
||||
|
||||
global var_names
|
||||
|
||||
@@ -164,7 +164,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
return "BREAK", s_type
|
||||
elif s_type == "string":
|
||||
string = kid.value
|
||||
logger.info("doing string: " + repr(string))
|
||||
logger.info("doing string: %r", string)
|
||||
string = string.replace("\\", "\\\\")
|
||||
string = string.replace("\n", "\\n")
|
||||
string = string.replace("\t", "\\t")
|
||||
@@ -176,7 +176,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
# https://github.com/mandiant/capa-rules/blob/master/doc/format.md#api
|
||||
api = kid.value
|
||||
logger.info("doing api: " + repr(api))
|
||||
logger.info("doing api: %r", api)
|
||||
|
||||
# e.g. kernel32.CreateNamedPipe => look for kernel32.dll and CreateNamedPipe
|
||||
# TODO: improve .NET API call handling
|
||||
@@ -210,14 +210,14 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
elif s_type == "export":
|
||||
export = kid.value
|
||||
logger.info("doing export: " + repr(export))
|
||||
logger.info("doing export: %r", export)
|
||||
|
||||
yara_condition += '\tpe.exports("' + export + '") '
|
||||
|
||||
elif s_type == "section":
|
||||
# https://github.com/mandiant/capa-rules/blob/master/doc/format.md#section
|
||||
section = kid.value
|
||||
logger.info("doing section: " + repr(section))
|
||||
logger.info("doing section: %r", section)
|
||||
|
||||
# e.g. - section: .rsrc
|
||||
var_name_sec = var_names.pop(0)
|
||||
@@ -229,14 +229,14 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
elif s_type == "match":
|
||||
# https://github.com/mandiant/capa-rules/blob/master/doc/format.md#matching-prior-rule-matches-and-namespaces
|
||||
match = kid.value
|
||||
logger.info("doing match: " + repr(match))
|
||||
logger.info("doing match: %r", match)
|
||||
|
||||
# e.g. - match: create process
|
||||
# - match: host-interaction/file-system/write
|
||||
match_rule_name = convert_rule_name(match)
|
||||
|
||||
if match.startswith(rulename + "/"):
|
||||
logger.info("Depending on myself = basic block: " + match)
|
||||
logger.info("Depending on myself = basic block: %s", match)
|
||||
return "BREAK", "Depending on myself = basic block"
|
||||
|
||||
if match_rule_name in converted_rules:
|
||||
@@ -244,14 +244,14 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
else:
|
||||
# don't complain in the early rounds as there should be 3+ rounds (if all rules are converted)
|
||||
if cround > min_rounds - 2:
|
||||
logger.info("needed sub-rule not converted (yet, maybe in next round): " + repr(match))
|
||||
logger.info("needed sub-rule not converted (yet, maybe in next round): %r", match)
|
||||
return "BREAK", "needed sub-rule not converted"
|
||||
else:
|
||||
return "BREAK", "NOLOG"
|
||||
|
||||
elif s_type == "bytes":
|
||||
bytesv = kid.get_value_str()
|
||||
logger.info("doing bytes: " + repr(bytesv))
|
||||
logger.info("doing bytes: %r", bytesv)
|
||||
var_name = var_names.pop(0)
|
||||
|
||||
yara_strings += "\t$" + var_name + " = { " + bytesv + " }" + convert_description(kid) + "\n"
|
||||
@@ -259,19 +259,19 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
elif s_type == "number":
|
||||
number = kid.get_value_str()
|
||||
logger.info("doing number: " + repr(number))
|
||||
logger.info("doing number: %r", number)
|
||||
|
||||
if len(number) < 10:
|
||||
logger.info("too short for byte search (until I figure out how to do it properly)" + repr(number))
|
||||
logger.info("too short for byte search (until I figure out how to do it properly): %r", number)
|
||||
return "BREAK", "Number too short"
|
||||
|
||||
# there's just one rule which contains 0xFFFFFFF but yara gives a warning if if used
|
||||
if number == "0xFFFFFFFF":
|
||||
return "BREAK", "slow byte pattern for YARA search"
|
||||
|
||||
logger.info("number ok: " + repr(number))
|
||||
logger.info("number ok: %r", number)
|
||||
number = convert_capa_number_to_yara_bytes(number)
|
||||
logger.info("number ok: " + repr(number))
|
||||
logger.info("number ok: %r", number)
|
||||
|
||||
var_name = "num_" + var_names.pop(0)
|
||||
yara_strings += "\t$" + var_name + " = { " + number + "}" + convert_description(kid) + "\n"
|
||||
@@ -279,7 +279,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
elif s_type == "regex":
|
||||
regex = kid.get_value_str()
|
||||
logger.info("doing regex: " + repr(regex))
|
||||
logger.info("doing regex: %r", regex)
|
||||
|
||||
# change capas /xxx/i to yaras /xxx/ nocase, count will be used later to decide appending 'nocase'
|
||||
regex, count = re.subn(r"/i$", "/", regex)
|
||||
@@ -315,7 +315,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
elif s_type == "Not" or s_type == "And" or s_type == "Or":
|
||||
pass
|
||||
else:
|
||||
logger.info("something unhandled: " + repr(s_type))
|
||||
logger.info("something unhandled: %r", s_type)
|
||||
sys.exit()
|
||||
|
||||
return yara_strings, yara_condition
|
||||
@@ -329,7 +329,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
statement = rule.name
|
||||
|
||||
logger.info("doing statement: " + statement)
|
||||
logger.info("doing statement: %s", statement)
|
||||
|
||||
if check_feature(statement, rulename):
|
||||
return "BREAK", statement, rule_comment, incomplete
|
||||
@@ -337,18 +337,18 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
if statement == "And" or statement == "Or":
|
||||
desc = convert_description(rule)
|
||||
if desc:
|
||||
logger.info("description of bool statement: " + repr(desc))
|
||||
logger.info("description of bool statement: %r", desc)
|
||||
yara_strings_list.append("\t" * depth + desc + "\n")
|
||||
elif statement == "Not":
|
||||
logger.info("one of those seldom nots: " + rule.name)
|
||||
logger.info("one of those seldom nots: %s", rule.name)
|
||||
|
||||
# check for nested statements
|
||||
try:
|
||||
kids = rule.children
|
||||
num_kids = len(kids)
|
||||
logger.info("kids: " + kids)
|
||||
logger.info("kids: %s", kids)
|
||||
except Exception:
|
||||
logger.info("no kids in rule: " + rule.name)
|
||||
logger.info("no kids in rule: %s", rule.name)
|
||||
|
||||
try:
|
||||
# maybe it's "Not" = only one child:
|
||||
@@ -361,26 +361,26 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
# just a single statement without 'and' or 'or' before it in this rule
|
||||
if "kids" not in locals().keys():
|
||||
logger.info("no kids: " + rule.name)
|
||||
logger.info("no kids: %s", rule.name)
|
||||
|
||||
yara_strings_sub, yara_condition_sub = do_statement(statement, rule)
|
||||
|
||||
if yara_strings_sub == "BREAK":
|
||||
logger.info("Unknown feature at1: " + rule.name)
|
||||
logger.info("Unknown feature at1: %s", rule.name)
|
||||
return "BREAK", yara_condition_sub, rule_comment, incomplete
|
||||
yara_strings_list.append(yara_strings_sub)
|
||||
yara_condition_list.append(yara_condition_sub)
|
||||
|
||||
else:
|
||||
x = 0
|
||||
logger.info("doing kids: %r - len: %s", kids, num_kids)
|
||||
logger.info("doing kids: %r - len: %d", kids, num_kids)
|
||||
for kid in kids:
|
||||
s_type = kid.name
|
||||
logger.info("doing type: " + s_type + " kidnum: " + str(x))
|
||||
logger.info("doing type: %s kidnum: %d", s_type, x)
|
||||
|
||||
if s_type == "Some":
|
||||
cmin = kid.count
|
||||
logger.info("Some type with minimum: " + str(cmin))
|
||||
logger.info("Some type with minimum: %d", cmin)
|
||||
|
||||
if not cmin:
|
||||
logger.info("this is optional: which means, we can just ignore it")
|
||||
@@ -395,8 +395,8 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
return "BREAK", "Some aka x or more (TODO)", rule_comment, incomplete
|
||||
|
||||
if s_type == "And" or s_type == "Or" or s_type == "Not" and not kid.name == "Some":
|
||||
logger.info("doing bool with recursion: " + repr(kid))
|
||||
logger.info("kid coming: " + repr(kid.name))
|
||||
logger.info("doing bool with recursion: %r", kid)
|
||||
logger.info("kid coming: %r", kid.name)
|
||||
# logger.info("grandchildren: " + repr(kid.children))
|
||||
|
||||
#
|
||||
@@ -406,22 +406,24 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
kid, rulename, cround, depth
|
||||
)
|
||||
|
||||
logger.info("coming out of this recursion, depth: " + repr(depth) + " s_type: " + s_type)
|
||||
logger.info("coming out of this recursion, depth: %d s_type: %s", depth, s_type)
|
||||
|
||||
if yara_strings_sub == "BREAK":
|
||||
logger.info(
|
||||
"Unknown feature at2: " + rule.name + " - s_type: " + s_type + " - depth: " + str(depth)
|
||||
"Unknown feature at2: %s - s_type: %s - depth: %d",
|
||||
rule.name,
|
||||
s_type,
|
||||
depth,
|
||||
)
|
||||
|
||||
# luckily this is only a killer, if we're inside an 'And', inside 'Or' we're just missing some coverage
|
||||
# only accept incomplete rules in rounds > 3 because the reason might be a reference to another rule not converted yet because of missing dependencies
|
||||
logger.info("rule.name, depth, cround: " + rule.name + ", " + str(depth) + ", " + str(cround))
|
||||
logger.info("rule.name, depth, cround: %s, %d, %d", rule.name, depth, cround)
|
||||
if rule.name == "Or" and depth == 1 and cround > min_rounds - 1:
|
||||
logger.info(
|
||||
"Unknown feature, just ignore this branch and keep the rest bec we're in Or (1): "
|
||||
+ s_type
|
||||
+ " - depth: "
|
||||
+ str(depth)
|
||||
"Unknown feature, just ignore this branch and keep the rest bec we're in Or (1): %s - depth: %s",
|
||||
s_type,
|
||||
depth,
|
||||
)
|
||||
# remove last 'or'
|
||||
# yara_condition = re.sub(r'\sor $', ' ', yara_condition)
|
||||
@@ -442,14 +444,13 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
yara_strings_sub, yara_condition_sub = do_statement(s_type, kid)
|
||||
|
||||
if yara_strings_sub == "BREAK":
|
||||
logger.info("Unknown feature at3: " + rule.name)
|
||||
logger.info("rule.name, depth, cround: " + rule.name + ", " + str(depth) + ", " + str(cround))
|
||||
logger.info("Unknown feature at3: %s", rule.name)
|
||||
logger.info("rule.name, depth, cround: %s, %d, %d", rule.name, depth, cround)
|
||||
if rule.name == "Or" and depth == 1 and cround > min_rounds - 1:
|
||||
logger.info(
|
||||
"Unknown feature, just ignore this branch and keep the rest bec we're in Or (2): "
|
||||
+ s_type
|
||||
+ " - depth: "
|
||||
+ str(depth)
|
||||
"Unknown feature, just ignore this branch and keep the rest bec we're in Or (2): %s - depth: %d",
|
||||
s_type,
|
||||
depth,
|
||||
)
|
||||
|
||||
rule_comment += "This rule is incomplete because a branch inside an Or-statement had an unsupported feature and was skipped"
|
||||
@@ -487,7 +488,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
elif statement == "Some":
|
||||
cmin = rule.count
|
||||
logger.info("Some type with minimum at2: " + str(cmin))
|
||||
logger.info("Some type with minimum at2: %d", cmin)
|
||||
|
||||
if not cmin:
|
||||
logger.info("this is optional: which means, we can just ignore it")
|
||||
@@ -500,7 +501,7 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
yara_condition = "not " + "".join(yara_condition_list) + " "
|
||||
else:
|
||||
if len(yara_condition_list) != 1:
|
||||
logger.info("something wrong around here" + repr(yara_condition_list) + " - " + statement)
|
||||
logger.info("something wrong around here %r - %s", yara_condition_list, statement)
|
||||
sys.exit()
|
||||
|
||||
# strings might be empty with only conditions
|
||||
@@ -509,8 +510,10 @@ def convert_rule(rule, rulename, cround, depth):
|
||||
|
||||
yara_condition = "\n\t" + yara_condition_list[0]
|
||||
|
||||
logger.info(f"# end of convert_rule() #strings: {len(yara_strings_list)} #conditions: {len(yara_condition_list)}")
|
||||
logger.info(f"strings: {yara_strings} conditions: {yara_condition}")
|
||||
logger.info(
|
||||
"# end of convert_rule() #strings: %d #conditions: %d", len(yara_strings_list), len(yara_condition_list)
|
||||
)
|
||||
logger.info("strings: %s conditions: %s", yara_strings, yara_condition)
|
||||
|
||||
return yara_strings, yara_condition, rule_comment, incomplete
|
||||
|
||||
@@ -522,7 +525,7 @@ def output_yar(yara):
|
||||
def output_unsupported_capa_rules(yaml, capa_rulename, url, reason):
|
||||
if reason != "NOLOG":
|
||||
if capa_rulename not in unsupported_capa_rules_list:
|
||||
logger.info("unsupported: " + capa_rulename + " - reason: " + reason + " - url: " + url)
|
||||
logger.info("unsupported: %s - reason: %s, - url: %s", capa_rulename, reason, url)
|
||||
|
||||
unsupported_capa_rules_list.append(capa_rulename)
|
||||
unsupported_capa_rules.write(yaml.encode("utf-8") + b"\n")
|
||||
@@ -546,32 +549,32 @@ def convert_rules(rules, namespaces, cround, make_priv):
|
||||
rule_name = convert_rule_name(rule.name)
|
||||
|
||||
if rule.is_subscope_rule():
|
||||
logger.info("skipping sub scope rule capa: " + rule.name)
|
||||
logger.info("skipping sub scope rule capa: %s", rule.name)
|
||||
continue
|
||||
|
||||
if rule_name in converted_rules:
|
||||
logger.info("skipping already converted rule capa: " + rule.name + " - yara rule: " + rule_name)
|
||||
logger.info("skipping already converted rule capa: %s - yara rule: %s", rule.name, rule_name)
|
||||
continue
|
||||
|
||||
logger.info("-------------------------- DOING RULE CAPA: " + rule.name + " - yara rule: " + rule_name)
|
||||
logger.info("-------------------------- DOING RULE CAPA: %s - yara rule: ", rule.name, rule_name)
|
||||
if "capa/path" in rule.meta:
|
||||
url = get_rule_url(rule.meta["capa/path"])
|
||||
else:
|
||||
url = "no url"
|
||||
|
||||
logger.info("URL: " + url)
|
||||
logger.info("statements: " + repr(rule.statement))
|
||||
logger.info("URL: %s", url)
|
||||
logger.info("statements: %r", rule.statement)
|
||||
|
||||
# don't really know what that passed empty string is good for :)
|
||||
dependencies = rule.get_dependencies(namespaces)
|
||||
|
||||
if len(dependencies):
|
||||
logger.info("Dependencies at4: " + rule.name + " - dep: " + str(dependencies))
|
||||
logger.info("Dependencies at4: %s - dep: %s", rule.name, dependencies)
|
||||
|
||||
for dep in dependencies:
|
||||
logger.info("Dependencies at44: " + dep)
|
||||
logger.info("Dependencies at44: %s", dep)
|
||||
if not dep.startswith(rule.name + "/"):
|
||||
logger.info("Depending on another rule: " + dep)
|
||||
logger.info("Depending on another rule: %s", dep)
|
||||
continue
|
||||
|
||||
yara_strings, yara_condition, rule_comment, incomplete = convert_rule(rule.statement, rule.name, cround, 0)
|
||||
@@ -580,7 +583,7 @@ def convert_rules(rules, namespaces, cround, make_priv):
|
||||
# only give up if in final extra round #9000
|
||||
if cround == 9000:
|
||||
output_unsupported_capa_rules(rule.to_yaml(), rule.name, url, yara_condition)
|
||||
logger.info("Unknown feature at5: " + rule.name)
|
||||
logger.info("Unknown feature at5: %s", rule.name)
|
||||
else:
|
||||
yara_meta = ""
|
||||
metas = rule.meta
|
||||
@@ -596,24 +599,24 @@ def convert_rules(rules, namespaces, cround, make_priv):
|
||||
if meta_name == "att&ck":
|
||||
meta_name = "attack"
|
||||
for attack in list(metas[meta]):
|
||||
logger.info("attack:" + attack)
|
||||
logger.info("attack: %s", attack)
|
||||
# cut out tag in square brackets, e.g. Defense Evasion::Obfuscated Files or Information [T1027] => T1027
|
||||
r = re.search(r"\[(T[^\]]*)", attack)
|
||||
if r:
|
||||
tag = r.group(1)
|
||||
logger.info("attack tag:" + tag)
|
||||
logger.info("attack tag: %s", tag)
|
||||
tag = re.sub(r"\W", "_", tag)
|
||||
rule_tags += tag + " "
|
||||
# also add a line "attack = ..." to yaras 'meta:' to keep the long description:
|
||||
yara_meta += '\tattack = "' + attack + '"\n'
|
||||
elif meta_name == "mbc":
|
||||
for mbc in list(metas[meta]):
|
||||
logger.info("mbc:" + mbc)
|
||||
logger.info("mbc: %s", mbc)
|
||||
# cut out tag in square brackets, e.g. Cryptography::Encrypt Data::RC6 [C0027.010] => C0027.010
|
||||
r = re.search(r"\[(.[^\]]*)", mbc)
|
||||
if r:
|
||||
tag = r.group(1)
|
||||
logger.info("mbc tag:" + tag)
|
||||
logger.info("mbc tag: %s", tag)
|
||||
tag = re.sub(r"\W", "_", tag)
|
||||
rule_tags += tag + " "
|
||||
|
||||
@@ -713,10 +716,10 @@ def main(argv=None):
|
||||
try:
|
||||
rules = capa.main.get_rules([args.rules])
|
||||
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
|
||||
logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules))
|
||||
logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules))
|
||||
if args.tag:
|
||||
rules = rules.filter_rules_by_meta(args.tag)
|
||||
logger.debug("selected %s rules", len(rules))
|
||||
logger.debug("selected %d rules", len(rules))
|
||||
for i, r in enumerate(rules.rules, 1):
|
||||
logger.debug(" %d. %s", i, r)
|
||||
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
|
||||
@@ -748,7 +751,7 @@ def main(argv=None):
|
||||
count_incomplete = 0
|
||||
while num_rules != len(converted_rules) or cround < min_rounds:
|
||||
cround += 1
|
||||
logger.info("doing convert_rules(), round: " + str(cround))
|
||||
logger.info("doing convert_rules(), round: %d", cround)
|
||||
num_rules = len(converted_rules)
|
||||
count_incomplete += convert_rules(rules, namespaces, cround, make_priv)
|
||||
|
||||
@@ -758,7 +761,7 @@ def main(argv=None):
|
||||
stats = "\n// converted rules : " + str(len(converted_rules))
|
||||
stats += "\n// among those are incomplete : " + str(count_incomplete)
|
||||
stats += "\n// unconverted rules : " + str(len(unsupported_capa_rules_list)) + "\n"
|
||||
logger.info(stats)
|
||||
logger.info("%s", stats)
|
||||
output_yar(stats)
|
||||
|
||||
return 0
|
||||
|
||||
@@ -47,7 +47,7 @@ def get_features(rule_path: str) -> list:
|
||||
new_rule = capa.rules.Rule.from_yaml(f.read())
|
||||
feature_list = get_child_features(new_rule.statement)
|
||||
except Exception as e:
|
||||
logger.error("Error: New rule " + rule_path + " " + str(type(e)) + " " + str(e))
|
||||
logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e))
|
||||
sys.exit(-1)
|
||||
return feature_list
|
||||
|
||||
|
||||
@@ -355,7 +355,7 @@ class DoesntMatchExample(Lint):
|
||||
try:
|
||||
capabilities = get_sample_capabilities(ctx, path)
|
||||
except Exception as e:
|
||||
logger.error("failed to extract capabilities: %s %s %s", rule.name, str(path), e, exc_info=True)
|
||||
logger.exception("failed to extract capabilities: %s %s %s", rule.name, str(path), e)
|
||||
return True
|
||||
|
||||
if rule.name not in capabilities:
|
||||
|
||||
@@ -118,12 +118,12 @@ def main(argv=None):
|
||||
|
||||
samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat)
|
||||
|
||||
logger.debug("perf: find capabilities: min: %0.2fs" % (min(samples) / float(args.number)))
|
||||
logger.debug("perf: find capabilities: avg: %0.2fs" % (sum(samples) / float(args.repeat) / float(args.number)))
|
||||
logger.debug("perf: find capabilities: max: %0.2fs" % (max(samples) / float(args.number)))
|
||||
logger.debug("perf: find capabilities: min: %0.2fs", (min(samples) / float(args.number)))
|
||||
logger.debug("perf: find capabilities: avg: %0.2fs", (sum(samples) / float(args.repeat) / float(args.number)))
|
||||
logger.debug("perf: find capabilities: max: %0.2fs", (max(samples) / float(args.number)))
|
||||
|
||||
for counter, count in capa.perf.counters.most_common():
|
||||
logger.debug("perf: counter: {:}: {:,}".format(counter, count))
|
||||
logger.debug("perf: counter: %s: %s", counter, count)
|
||||
|
||||
print(
|
||||
tabulate.tabulate(
|
||||
|
||||
@@ -65,7 +65,7 @@ class MitreExtractor:
|
||||
if self.url == "":
|
||||
raise ValueError(f"URL not specified in class {self.__class__.__name__}")
|
||||
|
||||
logging.info(f"Downloading STIX data at: {self.url}")
|
||||
logging.info("Downloading STIX data at: %s", self.url)
|
||||
stix_json = requests.get(self.url).json()
|
||||
self._memory_store = MemoryStore(stix_data=stix_json["objects"])
|
||||
|
||||
@@ -170,12 +170,12 @@ def main(args: argparse.Namespace) -> None:
|
||||
logging.info("Extracting MBC behaviors...")
|
||||
data["mbc"] = MbcExtractor().run()
|
||||
|
||||
logging.info(f"Writing results to {args.output}")
|
||||
logging.info("Writing results to %s", args.output)
|
||||
try:
|
||||
with open(args.output, "w", encoding="utf-8") as jf:
|
||||
json.dump(data, jf, indent=2)
|
||||
except BaseException as e:
|
||||
logging.error(f"Exception encountered when writing results: {e}")
|
||||
logging.error("Exception encountered when writing results: %s", e)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user