updating progress message

This commit is contained in:
Michael Hunhoff
2020-09-14 13:00:06 -06:00
parent 0d93df7d59
commit 89e409157f

View File

@@ -33,11 +33,13 @@ settings = ida_settings.IDASettings("capa")
class UserCancelledError(Exception): class UserCancelledError(Exception):
"""throw exception when user cancels action""" """throw exception when user cancels action"""
pass pass
class CapaExplorerProgressIndicator(QtCore.QObject): class CapaExplorerProgressIndicator(QtCore.QObject):
"""implement progress signal, used during feature extraction""" """implement progress signal, used during feature extraction"""
progress = QtCore.pyqtSignal(str) progress = QtCore.pyqtSignal(str)
def __init__(self): def __init__(self):
@@ -59,23 +61,16 @@ class CapaExplorerFeatureExtractor(capa.features.extractors.ida.IdaFeatureExtrac
track progress during feature extraction, also allow user to cancel feature extraction track progress during feature extraction, also allow user to cancel feature extraction
""" """
def __init__(self, progress):
def __init__(self):
super(CapaExplorerFeatureExtractor, self).__init__() super(CapaExplorerFeatureExtractor, self).__init__()
self.progress = progress self.indicator = CapaExplorerProgressIndicator()
def extract_function_features(self, f): def extract_function_features(self, f):
self.progress.update("function at 0x%x" % f.start_ea) self.indicator.update("function at 0x%X" % f.start_ea)
for (feature, ea) in capa.features.extractors.ida.function.extract_features(f): for (feature, ea) in capa.features.extractors.ida.function.extract_features(f):
yield feature, ea yield feature, ea
def extract_basic_block_features(self, f, bb):
for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb):
yield feature, ea
def extract_insn_features(self, f, bb, insn):
for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn):
yield feature, ea
class CapaExplorerForm(idaapi.PluginForm): class CapaExplorerForm(idaapi.PluginForm):
"""form element for plugin interface""" """form element for plugin interface"""
@@ -221,7 +216,7 @@ class CapaExplorerForm(idaapi.PluginForm):
"""load status label""" """load status label"""
label = QtWidgets.QLabel() label = QtWidgets.QLabel()
label.setAlignment(QtCore.Qt.AlignLeft) label.setAlignment(QtCore.Qt.AlignLeft)
label.setText("Analyze database to get started...") label.setText("Click Analyze to get started...")
self.view_status_label = label self.view_status_label = label
@@ -417,34 +412,29 @@ class CapaExplorerForm(idaapi.PluginForm):
self.process_total = 0 self.process_total = 0
self.process_count = 0 self.process_count = 0
# default from view
self.set_view_status_label("No rules loaded")
self.disable_controls()
def update_wait_box(text): def update_wait_box(text):
"""update the IDA wait box""" """update the IDA wait box"""
ida_kernwin.replace_wait_box("Processing; %s" % text) ida_kernwin.replace_wait_box("capa explorer...%s" % text)
def slot_progress_feature_extraction(text): def slot_progress_feature_extraction(text):
"""slot function to handle feature extraction progress updates""" """slot function to handle feature extraction progress updates"""
update_wait_box("%s (%d/%d)" % (text, self.process_count, self.process_total)) update_wait_box("%s (%d of %d)" % (text, self.process_count, self.process_total))
self.process_count += 1 self.process_count += 1
progress = CapaExplorerProgressIndicator() extractor = CapaExplorerFeatureExtractor()
progress.progress.connect(slot_progress_feature_extraction) extractor.indicator.progress.connect(slot_progress_feature_extraction)
extractor = CapaExplorerFeatureExtractor(progress)
update_wait_box("calculating analysis") update_wait_box("calculating analysis")
try: try:
self.process_total += len(tuple(extractor.get_functions())) self.process_total += len(tuple(extractor.get_functions()))
except Exception as e: except Exception as e:
logger.error("Failed to calculate analysis (error: %s)." % e) logger.error("Failed to calculate analysis (error: %s).", e)
return return False
if ida_kernwin.user_cancelled(): if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return return False
update_wait_box("loading rules") update_wait_box("loading rules")
@@ -454,22 +444,22 @@ class CapaExplorerForm(idaapi.PluginForm):
if "rule_path" in settings and os.path.exists(settings["rule_path"]): if "rule_path" in settings and os.path.exists(settings["rule_path"]):
self.rule_path = settings["rule_path"] self.rule_path = settings["rule_path"]
else: else:
idaapi.info("You must select a file directory containing capa rules before running analysis.") idaapi.info("Please select a file directory containing capa rules.")
rule_path = self.ask_user_directory() rule_path = self.ask_user_directory()
if not rule_path: if not rule_path:
logger.warning( logger.warning(
"You must select a file directory containing capa rules before running analysis. The standard collection of capa rules can be downloaded from https://github.com/fireeye/capa-rules." "You must select a file directory containing capa rules before analysis can be run. The standard collection of capa rules can be downloaded from https://github.com/fireeye/capa-rules."
) )
return return False
self.rule_path = rule_path self.rule_path = rule_path
settings.user["rule_path"] = rule_path settings.user["rule_path"] = rule_path
except Exception as e: except Exception as e:
logger.error("Failed to load capa rules (error: %s)." % e) logger.error("Failed to load capa rules (error: %s).", e)
return return False
if ida_kernwin.user_cancelled(): if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return return False
rule_path = self.rule_path rule_path = self.rule_path
@@ -500,7 +490,7 @@ class CapaExplorerForm(idaapi.PluginForm):
rules = [] rules = []
total_paths = len(rule_paths) total_paths = len(rule_paths)
for (i, rule_path) in enumerate(rule_paths): for (i, rule_path) in enumerate(rule_paths):
update_wait_box("loading rule %d/%d from %s" % (i + 1, total_paths, self.rule_path)) update_wait_box("loading capa rules from %s (%d of %d)" % (self.rule_path, i + 1, total_paths))
if ida_kernwin.user_cancelled(): if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled") raise UserCancelledError("user cancelled")
try: try:
@@ -517,33 +507,33 @@ class CapaExplorerForm(idaapi.PluginForm):
rules = capa.rules.RuleSet(rules) rules = capa.rules.RuleSet(rules)
except UserCancelledError: except UserCancelledError:
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return return False
except Exception as e: except Exception as e:
capa.ida.helpers.inform_user_ida_ui("Failed to load capa rules from %s" % self.rule_path) capa.ida.helpers.inform_user_ida_ui("Failed to load capa rules from %s" % self.rule_path)
logger.error("Failed to load rules from %s (error: %s).", self.rule_path, e) logger.error("Failed to load rules from %s (error: %s).", self.rule_path, e)
logger.error("Make sure your file directory contains properly formatted capa rules. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules.") logger.error(
"Make sure your file directory contains properly formatted capa rules. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules."
)
self.rule_path = "" self.rule_path = ""
settings.user.del_value("rule_path") settings.user.del_value("rule_path")
return return False
if ida_kernwin.user_cancelled(): if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return return False
update_wait_box("extracting features") update_wait_box("extracting features")
try: try:
meta = capa.ida.helpers.collect_metadata() meta = capa.ida.helpers.collect_metadata()
capabilities, counts = capa.main.find_capabilities( capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
rules, extractor, disable_progress=True
)
meta["analysis"].update(counts) meta["analysis"].update(counts)
except UserCancelledError: except UserCancelledError:
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return return False
except Exception as e: except Exception as e:
logger.error("Failed to extract capabilities from database (error: %s)" % e) logger.error("Failed to extract capabilities from database (error: %s)", e)
return return False
update_wait_box("checking for file limitations") update_wait_box("checking for file limitations")
@@ -561,7 +551,9 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.warning( logger.warning(
" This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64." " This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64."
) )
logger.warning(" If you don't know the input file type, you can try using the `file` utility to guess it.") logger.warning(
" If you don't know the input file type, you can try using the `file` utility to guess it."
)
logger.warning("-" * 80) logger.warning("-" * 80)
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
@@ -569,25 +561,26 @@ class CapaExplorerForm(idaapi.PluginForm):
if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): if capa.main.has_file_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
except Exception as e: except Exception as e:
logger.error("Failed to check for file limitations (error: %s)" % e) logger.error("Failed to check for file limitations (error: %s)", e)
return return False
if ida_kernwin.user_cancelled(): if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.") logger.info("User cancelled analysis.")
return return False
update_wait_box("Processing; rendering results") update_wait_box("rendering results")
try: try:
self.doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities) self.doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
self.model_data.render_capa_doc(self.doc) self.model_data.render_capa_doc(self.doc)
self.render_capa_doc_mitre_summary() self.render_capa_doc_mitre_summary()
self.enable_controls()
self.set_view_status_label("capa rules directory: %s (%d rules)" % (self.rule_path, rule_count))
except Exception as e: except Exception as e:
logger.error("Failed to render results (error: %s)" % e) logger.error("Failed to render results (error: %s)", e)
return return False
self.enable_controls() return True
self.set_view_status_label("Loaded %d capa rules from %s" % (rule_count, self.rule_path))
def render_capa_doc_mitre_summary(self): def render_capa_doc_mitre_summary(self):
"""render MITRE ATT&CK results""" """render MITRE ATT&CK results"""
@@ -667,10 +660,16 @@ class CapaExplorerForm(idaapi.PluginForm):
self.search_model_proxy.invalidate() self.search_model_proxy.invalidate()
self.model_data.clear() self.model_data.clear()
ida_kernwin.show_wait_box("Processing") self.disable_controls()
self.load_capa_results() self.set_view_status_label("Loading...")
ida_kernwin.show_wait_box("capa explorer")
success = self.load_capa_results()
ida_kernwin.hide_wait_box() ida_kernwin.hide_wait_box()
if not success:
self.set_view_status_label("Click Analyze to get started...")
self.ida_reset() self.ida_reset()
logger.info("Analysis completed.") logger.info("Analysis completed.")