diff --git a/mitreattack/attackToExcel/attackToExcel.py b/mitreattack/attackToExcel/attackToExcel.py index b9ccf0b..58a8a29 100644 --- a/mitreattack/attackToExcel/attackToExcel.py +++ b/mitreattack/attackToExcel/attackToExcel.py @@ -81,10 +81,8 @@ def get_stix_data( return mem_store -def build_dataframes_pre_v18(src: MemoryStore, domain: str) -> Dict: - """Build pandas dataframes for each attack type, and return a dictionary lookup for each type to the relevant dataframe. - - This version of the function is used for ATT&CK versions prior to v18, to account for changes to data components/data sources. +def _build_dataframes_common(src: MemoryStore, domain: str, *, use_pre_v18_datasources: bool = False) -> Dict: + """Shared implementation for building pandas dataframes for each ATT&CK type. Parameters ---------- @@ -92,29 +90,62 @@ def build_dataframes_pre_v18(src: MemoryStore, domain: str) -> Dict: MemoryStore or other stix2 DataSource object domain : str domain of ATT&CK src corresponds to, e.g "enterprise-attack" + use_pre_v18_datasources : bool + If True, uses the pre-v18 datasourcesToDf (combined data sources + components). + If False, uses the v18+ datacomponentsToDf. Returns ------- dict A dict lookup of each ATT&CK type to dataframes for the given type to be ingested by write_excel """ + # Pre-compute all relationship data once (the expensive part) instead of + # repeating it 7+ times across the individual *ToDf() calls. + logger.info("Pre-computing relationship data...") + rel_data = stixToDf._process_all_relationships(src) + df = { - "techniques": stixToDf.techniquesToDf(src, domain), + "techniques": stixToDf.techniquesToDf(src, domain, _rel_data=rel_data), "tactics": stixToDf.tacticsToDf(src), - "software": stixToDf.softwareToDf(src), - "groups": stixToDf.groupsToDf(src), - "campaigns": stixToDf.campaignsToDf(src), - "assets": stixToDf.assetsToDf(src), - "mitigations": stixToDf.mitigationsToDf(src), + "software": stixToDf.softwareToDf(src, _rel_data=rel_data), + "groups": stixToDf.groupsToDf(src, _rel_data=rel_data), + "campaigns": stixToDf.campaignsToDf(src, _rel_data=rel_data), + "assets": stixToDf.assetsToDf(src, _rel_data=rel_data), + "mitigations": stixToDf.mitigationsToDf(src, _rel_data=rel_data), "matrices": stixToDf.matricesToDf(src, domain), - "relationships": stixToDf.relationshipsToDf(src), - "datasources": stixToDf.datasourcesToDf(src), + "relationships": stixToDf.relationshipsToDf(src, _precomputed=rel_data), "analytics": stixToDf.analyticsToDf(src), "detectionstrategies": stixToDf.detectionstrategiesToDf(src), } + + if use_pre_v18_datasources: + df["datasources"] = stixToDf.datasourcesToDf(src, _rel_data=rel_data) + else: + df["datacomponents"] = stixToDf.datacomponentsToDf(src) + return df +def build_dataframes_pre_v18(src: MemoryStore, domain: str) -> Dict: + """Build pandas dataframes for each attack type, and return a dictionary lookup for each type to the relevant dataframe. + + This version of the function is used for ATT&CK versions prior to v18, to account for changes to data components/data sources. + + Parameters + ---------- + src : MemoryStore + MemoryStore or other stix2 DataSource object + domain : str + domain of ATT&CK src corresponds to, e.g "enterprise-attack" + + Returns + ------- + dict + A dict lookup of each ATT&CK type to dataframes for the given type to be ingested by write_excel + """ + return _build_dataframes_common(src, domain, use_pre_v18_datasources=True) + + def build_dataframes(src: MemoryStore, domain: str) -> Dict: """Build pandas dataframes for each attack type, and return a dictionary lookup for each type to the relevant dataframe. @@ -130,21 +161,7 @@ def build_dataframes(src: MemoryStore, domain: str) -> Dict: dict A dict lookup of each ATT&CK type to dataframes for the given type to be ingested by write_excel """ - df = { - "techniques": stixToDf.techniquesToDf(src, domain), - "tactics": stixToDf.tacticsToDf(src), - "software": stixToDf.softwareToDf(src), - "groups": stixToDf.groupsToDf(src), - "campaigns": stixToDf.campaignsToDf(src), - "assets": stixToDf.assetsToDf(src), - "mitigations": stixToDf.mitigationsToDf(src), - "matrices": stixToDf.matricesToDf(src, domain), - "relationships": stixToDf.relationshipsToDf(src), - "datacomponents": stixToDf.datacomponentsToDf(src), - "analytics": stixToDf.analyticsToDf(src), - "detectionstrategies": stixToDf.detectionstrategiesToDf(src), - } - return df + return _build_dataframes_common(src, domain, use_pre_v18_datasources=False) def build_ds_an_lg_relationships(dataframes: Dict) -> Dict[str, pd.DataFrame]: diff --git a/mitreattack/attackToExcel/stixToDf.py b/mitreattack/attackToExcel/stixToDf.py index c6bdf6c..8128b33 100644 --- a/mitreattack/attackToExcel/stixToDf.py +++ b/mitreattack/attackToExcel/stixToDf.py @@ -12,7 +12,35 @@ from tqdm import tqdm from mitreattack.constants import MITRE_ATTACK_ID_SOURCE_NAMES, PLATFORMS_LOOKUP -from mitreattack.stix20 import MitreAttackData + +# Module-level constants for type mappings (avoid recreating per call) +_ATTACK_TO_STIX_TERM = { + "technique": ["attack-pattern"], + "tactic": ["x-mitre-tactic"], + "software": ["tool", "malware"], + "group": ["intrusion-set"], + "campaign": ["campaign"], + "asset": ["x-mitre-asset"], + "mitigation": ["course-of-action"], + "matrix": ["x-mitre-matrix"], + "datasource": ["x-mitre-data-component"], + "detectionstrategy": ["x-mitre-detection-strategy"], +} + +_STIX_TO_ATTACK_TERM = { + "attack-pattern": "technique", + "x-mitre-tactic": "tactic", + "tool": "software", + "malware": "software", + "intrusion-set": "group", + "course-of-action": "mitigation", + "x-mitre-matrix": "matrix", + "x-mitre-data-component": "datacomponent", + "x-mitre-data-source": "datasource", + "campaign": "campaign", + "x-mitre-asset": "asset", + "x-mitre-detection-strategy": "detectionstrategy", +} def remove_revoked_deprecated(stix_objects): @@ -101,11 +129,133 @@ def parseBaseStix(sdo): return row -def techniquesToDf(src, domain): +def _extract_attack_id(obj): + """Extract ATT&CK ID from an already-fetched STIX object without re-fetching.""" + external_references = obj.get("external_references", []) + if external_references: + attack_source = external_references[0] + if attack_source.get("external_id") and attack_source.get("source_name") == "mitre-attack": + return attack_source["external_id"] + return None + + +def _prefetch_data_components(src, analytics): + """Pre-fetch all data components referenced by analytics into a dict for O(1) lookups. + + :param src: MemoryStore or other stix2 DataSource object + :param analytics: list of analytic STIX objects + :returns: dict of data_component_id -> data_component_object + """ + all_dc_ids = set() + for analytic in analytics: + for logsrc in analytic.get("x_mitre_log_source_references", []): + dc_id = logsrc.get("x_mitre_data_component_ref", "") + if dc_id: + all_dc_ids.add(dc_id) + dc_cache = {} + for dc_id in all_dc_ids: + dc_obj = src.get(dc_id) + if dc_obj is not None: + dc_cache[dc_id] = dc_obj + return dc_cache + + +def _process_all_relationships(src): + """Process all relationships from the STIX data source into a sorted DataFrame and citations. + + This is the expensive operation (object lookups, iteration) that should only be done once + per export. The result can be passed to relationshipsToDf() via the _precomputed parameter + to avoid redundant processing. + + :param src: MemoryStore or other stix2 DataSource object holding the domain data + :returns: tuple of (sorted_relationship_df, citations_df) + """ + relationships_stix = src.query([Filter("type", "=", "relationship")]) + relationships_stix = remove_revoked_deprecated(relationships_stix) + + # Pre-fetch all referenced objects to avoid N+1 lookup pattern. + # Instead of calling src.get() 2-4 times per relationship in the loop, + # we collect all unique IDs and fetch each object exactly once. + referenced_ids = set() + for rel in relationships_stix: + referenced_ids.add(rel["source_ref"]) + referenced_ids.add(rel["target_ref"]) + + object_cache = {} + for obj_id in referenced_ids: + obj = src.get(obj_id) + if obj is not None: + object_cache[obj_id] = obj + + relationship_rows = [] + for relationship in tqdm(relationships_stix, desc="parsing all relationships"): + source = object_cache.get(relationship["source_ref"]) + target = object_cache.get(relationship["target_ref"]) + + # filter if related objects don't exist or are revoked or deprecated + if not source or source.get("x_mitre_deprecated", False) is True or source.get("revoked", False) is True: + continue + if not target or target.get("x_mitre_deprecated", False) is True or target.get("revoked", False) is True: + continue + if relationship["relationship_type"] == "revoked": + continue + # don't track sub-technique relationships, those are tracked in the techniques df + if relationship["relationship_type"] == "subtechnique-of": + continue + + row = {} + # Extract ATT&CK IDs directly from already-fetched objects + # (avoids the redundant src.get() that MitreAttackData.get_attack_id() would do) + row["source ID"] = _extract_attack_id(source) + row["source name"] = source.get("name") + row["source ref"] = source.get("id") + row["source type"] = _STIX_TO_ATTACK_TERM.get(source["type"]) + + row["mapping type"] = relationship["relationship_type"] + + row["target ID"] = _extract_attack_id(target) + row["target name"] = target.get("name") + row["target ref"] = target.get("id") + row["target type"] = _STIX_TO_ATTACK_TERM.get(target["type"]) + + if "description" in relationship: + row["mapping description"] = relationship["description"] + row["STIX ID"] = relationship["id"] + if "created" in relationship: + row["created"] = format_date(relationship["created"]) + if "modified" in relationship: + row["last modified"] = format_date(relationship["modified"]) + relationship_rows.append(row) + + citations = get_citations(relationships_stix) + + relationship_df = pd.DataFrame(relationship_rows) + if relationship_df.empty or "mapping type" not in relationship_df.columns: + return (pd.DataFrame(), citations) + + relationship_df = relationship_df.sort_values( + [ + "mapping type", + "source type", + "target type", + "source name", + "target name", + "source ref", + "target ref", + "created", + "last modified", + ] + ) + + return (relationship_df, citations) + + +def techniquesToDf(src, domain, *, _rel_data=None): """Parse STIX techniques from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data :param domain: domain of ATT&CK src corresponds to, e.g "enterprise-attack" + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ techniques = src.query([Filter("type", "=", "attack-pattern")]) @@ -225,7 +375,8 @@ def techniquesToDf(src, domain): "techniques": pd.DataFrame(technique_rows).sort_values("name"), } # add relationships - codex = relationshipsToDf(src, relatedType="technique") + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + codex = relationshipsToDf(src, relatedType="technique", **rel_kwargs) dataframes.update(codex) # add relationship references dataframes["techniques"]["relationship citations"] = _get_relationship_citations(dataframes["techniques"], codex) @@ -264,12 +415,13 @@ def tacticsToDf(src): return dataframes -def datasourcesToDf(src): +def datasourcesToDf(src, *, _rel_data=None): """Parse STIX Data Sources and their Data components from the given data and return corresponding pandas dataframes. This is only used in versions of ATT&CK before v18. :param src: MemoryStore or other stix2 DataSource object holding the domain data + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ data = list( @@ -328,7 +480,8 @@ def datasourcesToDf(src): ] ) # add relationships - dataframes.update(relationshipsToDf(src, relatedType="datasource")) + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + dataframes.update(relationshipsToDf(src, relatedType="datasource", **rel_kwargs)) # add/merge citations if not citations.empty: if "citations" in dataframes: # append to existing citations from references @@ -398,41 +551,26 @@ def analyticsToDf(src): } ) - # Prints out errors where data components are not in the same domain as analytics + dc_cache = _prefetch_data_components(src, analytics) + + # Single pass: validate and build rows together for analytic in tqdm(analytics, desc="parsing analytics"): analytic_id = analytic.get("id") + analytic_rows.append(parseBaseStix(analytic)) + + # log-source relationship table rows (also validates data components) for logsrc in analytic.get("x_mitre_log_source_references", []): data_comp_id = logsrc.get("x_mitre_data_component_ref", "") - data_comp = src.get(data_comp_id) + data_comp = dc_cache.get(data_comp_id) try: data_comp_attack_id = data_comp["external_references"][0]["external_id"] except (KeyError, TypeError, IndexError, AttributeError): if data_comp_id not in failed_by_data_component: failed_by_data_component[data_comp_id] = [] failed_by_data_component[data_comp_id].append(analytic_id) + continue - if failed_by_data_component: - lines = ["Failures grouped by data component:\n"] - for dc_id in sorted(failed_by_data_component): - analytic_ids = sorted(set(failed_by_data_component[dc_id])) - dc_obj = src.get(dc_id) or {} - dc_name = dc_obj.get("name", "") - - lines.append(f"data_component={dc_id}" + (f" ({dc_name})" if dc_name else "")) - lines.extend([f" - analytic={a}" for a in analytic_ids]) - lines.append("") - - raise RuntimeError("\n".join(lines)) - - for analytic in tqdm(analytics, desc="parsing analytics"): - analytic_rows.append(parseBaseStix(analytic)) - - # log-source relationship table rows - for logsrc in analytic.get("x_mitre_log_source_references", []): - data_comp_id = logsrc.get("x_mitre_data_component_ref", "") - data_comp = src.get(data_comp_id) data_comp_name = data_comp.get("name", "") if data_comp else "" - data_comp_attack_id = data_comp["external_references"][0]["external_id"] logsource_rows.append( { @@ -460,6 +598,19 @@ def analyticsToDf(src): } ) + if failed_by_data_component: + lines = ["Failures grouped by data component:\n"] + for dc_id in sorted(failed_by_data_component): + analytic_ids = sorted(set(failed_by_data_component[dc_id])) + dc_obj = dc_cache.get(dc_id, {}) + dc_name = dc_obj.get("name", "") + + lines.append(f"data_component={dc_id}" + (f" ({dc_name})" if dc_name else "")) + lines.extend([f" - analytic={a}" for a in analytic_ids]) + lines.append("") + + raise RuntimeError("\n".join(lines)) + dataframes["analytics"] = pd.DataFrame(analytic_rows).sort_values("name") citations = get_citations(analytics) @@ -485,6 +636,18 @@ def detectionstrategiesToDf(src): if detection_strategies: detection_strategy_rows = [] rel_rows = [] + + # Pre-fetch all referenced analytics once instead of per-detection-strategy + all_analytic_ids = set() + for ds in detection_strategies: + for a_id in ds.get("x_mitre_analytic_refs", []): + all_analytic_ids.add(a_id) + analytic_cache = {} + for a_id in all_analytic_ids: + a_obj = src.get(a_id) + if a_obj is not None: + analytic_cache[a_id] = a_obj + for detection_strategy in tqdm(detection_strategies, desc="parsing detection strategies"): row = parseBaseStix(detection_strategy) row["analytic_refs"] = "; ".join(detection_strategy.get("x_mitre_analytic_refs", [])) @@ -492,7 +655,7 @@ def detectionstrategiesToDf(src): # analytics relationship table rows for analytic_id in detection_strategy.get("x_mitre_analytic_refs", []): - analytic_obj = src.get(analytic_id) + analytic_obj = analytic_cache.get(analytic_id) rel_rows.append( { @@ -517,10 +680,11 @@ def detectionstrategiesToDf(src): return dataframes -def softwareToDf(src): +def softwareToDf(src, *, _rel_data=None): """Parse STIX software from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ software = list( @@ -547,7 +711,8 @@ def softwareToDf(src): "software": pd.DataFrame(software_rows).sort_values("name"), } # add relationships - codex = relationshipsToDf(src, relatedType="software") + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + codex = relationshipsToDf(src, relatedType="software", **rel_kwargs) dataframes.update(codex) # add relationship references dataframes["software"]["relationship citations"] = _get_relationship_citations(dataframes["software"], codex) @@ -572,6 +737,8 @@ def detectionStrategiesAnalyticsLogSourcesDf(src): analytics = remove_revoked_deprecated(analytics) analytics_by_id = {a["id"]: a for a in analytics} + dc_cache = _prefetch_data_components(src, analytics) + rows = [] for ds in detection_strategies: ds_attack_id = ds.get("external_references", [{}])[0].get("external_id", "") @@ -586,7 +753,7 @@ def detectionStrategiesAnalyticsLogSourcesDf(src): logsrc_refs = analytic.get("x_mitre_log_source_references", []) for logsrc in logsrc_refs: data_comp_id = logsrc.get("x_mitre_data_component_ref", "") - data_comp = src.get(data_comp_id) + data_comp = dc_cache.get(data_comp_id) rows.append( { @@ -607,10 +774,11 @@ def detectionStrategiesAnalyticsLogSourcesDf(src): return pd.DataFrame(rows) -def groupsToDf(src): +def groupsToDf(src, *, _rel_data=None): """Parse STIX groups from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ groups = src.query([Filter("type", "=", "intrusion-set")]) @@ -641,7 +809,8 @@ def groupsToDf(src): "groups": pd.DataFrame(group_rows).sort_values("name"), } # add relationships - codex = relationshipsToDf(src, relatedType="group") + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + codex = relationshipsToDf(src, relatedType="group", **rel_kwargs) dataframes.update(codex) # add relationship references dataframes["groups"]["relationship citations"] = _get_relationship_citations(dataframes["groups"], codex) @@ -658,10 +827,11 @@ def groupsToDf(src): return dataframes -def campaignsToDf(src): +def campaignsToDf(src, *, _rel_data=None): """Parse STIX campaigns from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ campaigns = src.query([Filter("type", "=", "campaign")]) @@ -700,7 +870,8 @@ def campaignsToDf(src): "campaigns": pd.DataFrame(campaign_rows).sort_values("name"), } # add relationships - codex = relationshipsToDf(src, relatedType="campaign") + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + codex = relationshipsToDf(src, relatedType="campaign", **rel_kwargs) dataframes.update(codex) # add relationship references @@ -721,10 +892,11 @@ def campaignsToDf(src): return dataframes -def assetsToDf(src): +def assetsToDf(src, *, _rel_data=None): """Parse STIX assets from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ assets = src.query([Filter("type", "=", "x-mitre-asset")]) @@ -761,7 +933,8 @@ def assetsToDf(src): "assets": pd.DataFrame(asset_rows).sort_values("name"), } # add relationships - codex = relationshipsToDf(src, relatedType="asset") + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + codex = relationshipsToDf(src, relatedType="asset", **rel_kwargs) dataframes.update(codex) # add relationship references dataframes["assets"]["relationship citations"] = _get_relationship_citations(dataframes["assets"], codex) @@ -780,10 +953,11 @@ def assetsToDf(src): return dataframes -def mitigationsToDf(src): +def mitigationsToDf(src, *, _rel_data=None): """Parse STIX mitigations from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data + :param _rel_data: optional pre-computed relationship data from _process_all_relationships() :returns: a lookup of labels (descriptors/names) to dataframes """ mitigations = src.query([Filter("type", "=", "course-of-action")]) @@ -797,7 +971,8 @@ def mitigationsToDf(src): "mitigations": pd.DataFrame(mitigation_rows).sort_values("name"), } # add relationships - codex = relationshipsToDf(src, relatedType="mitigation") + rel_kwargs = {"_precomputed": _rel_data} if _rel_data is not None else {} + codex = relationshipsToDf(src, relatedType="mitigation", **rel_kwargs) dataframes.update(codex) # add relationship references dataframes["mitigations"]["relationship citations"] = _get_relationship_citations(dataframes["mitigations"], codex) @@ -844,7 +1019,15 @@ def _loc_to_excel(self, row, col): def build_technique_and_sub_columns( - src, techniques, columns, merge_data_handle, matrix_grid_handle, tactic_name, platform=None + src, + techniques, + columns, + merge_data_handle, + matrix_grid_handle, + tactic_name, + platform=None, + *, + _sub_techniques_store=None, ): """Build technique and subtechnique columns for a given matrix and attach them to the appropriate object listings. @@ -856,19 +1039,23 @@ def build_technique_and_sub_columns( columns will be appended here) :param tactic_name: The name of the corresponding tactic for this column :param platform: [Optional] The name of a platform to filter subtechniques by + :param _sub_techniques_store: [Optional] Pre-computed MemoryStore of subtechnique-of relationships :return: Nothing (meta - modifies the passed in merge_data_handle and matrix_grid_handle objects) """ techniques_column = [] subtechniques_column = [] - all_sub_techniques = src.query( - [ - Filter("type", "=", "relationship"), - Filter("relationship_type", "=", "subtechnique-of"), - ] - ) - all_sub_techniques = MemoryStore(stix_data=all_sub_techniques) + if _sub_techniques_store is not None: + all_sub_techniques = _sub_techniques_store + else: + all_sub_techniques = src.query( + [ + Filter("type", "=", "relationship"), + Filter("relationship_type", "=", "subtechnique-of"), + ] + ) + all_sub_techniques = MemoryStore(stix_data=all_sub_techniques) for technique in techniques: techniques_column.append(technique["name"]) @@ -996,6 +1183,15 @@ def matricesToDf(src, domain): matrices_parsed = [] sub_matrices_parsed = [] + # Pre-compute subtechnique-of relationships once instead of per-tactic/per-platform + all_sub_technique_rels = src.query( + [ + Filter("type", "=", "relationship"), + Filter("relationship_type", "=", "subtechnique-of"), + ] + ) + sub_techniques_store = MemoryStore(stix_data=all_sub_technique_rels) + for matrix in tqdm(matrices, desc="parsing matrices"): sub_matrices_grid = dict() sub_matrices_merges = dict() @@ -1044,6 +1240,7 @@ def matricesToDf(src, domain): merge_data_handle=merge, matrix_grid_handle=matrix_grid, tactic_name=tactic["name"], + _sub_techniques_store=sub_techniques_store, ) for platform in PLATFORMS_LOOKUP[domain]: @@ -1064,6 +1261,7 @@ def matricesToDf(src, domain): matrix_grid_handle=sub_matrices_grid[platform], tactic_name=tactic["name"], platform=platform, + _sub_techniques_store=sub_techniques_store, ) # square the grid because pandas doesn't like jagged columns @@ -1103,126 +1301,33 @@ def matricesToDf(src, domain): return matrices_parsed, sub_matrices_parsed -def relationshipsToDf(src, relatedType=None): +def relationshipsToDf(src, relatedType=None, *, _precomputed=None): """Parse STIX relationships from the given data and return corresponding pandas dataframes. :param src: MemoryStore or other stix2 DataSource object holding the domain data :param relatedType: optional, singular attack type to only return relationships with, e.g "mitigation" + :param _precomputed: optional, pre-computed (relationship_df, citations) tuple from _process_all_relationships(). + When provided, skips the expensive relationship processing and only performs filtering/categorization. :returns: a lookup of labels (descriptors/names) to dataframes """ - # Helper lookups - attackToStixTerm = { - "technique": ["attack-pattern"], - "tactic": ["x-mitre-tactic"], - "software": ["tool", "malware"], - "group": ["intrusion-set"], - "campaign": ["campaign"], - "asset": ["x-mitre-asset"], - "mitigation": ["course-of-action"], - "matrix": ["x-mitre-matrix"], - "datasource": ["x-mitre-data-component"], - "detectionstrategy": ["x-mitre-detection-strategy"], - } - stixToAttackTerm = { - "attack-pattern": "technique", - "x-mitre-tactic": "tactic", - "tool": "software", - "malware": "software", - "intrusion-set": "group", - "course-of-action": "mitigation", - "x-mitre-matrix": "matrix", - "x-mitre-data-component": "datacomponent", - "x-mitre-data-source": "datasource", - "campaign": "campaign", - "x-mitre-asset": "asset", - "x-mitre-detection-strategy": "detectionstrategy", - } - - mitre_attack_data = MitreAttackData(src=src) - - # get master list of relationships - relationships = src.query([Filter("type", "=", "relationship")]) - relationships = remove_revoked_deprecated(relationships) - relationship_rows = [] # build list of rows for dataframe - # tqdm description depends on the related type and parameters - iterdesc = "parsing all relationships" if not relatedType else f"parsing relationships for type={relatedType}" - for relationship in tqdm(relationships, desc=iterdesc): - source = src.get(relationship["source_ref"]) # source object of the relationship - target = src.get(relationship["target_ref"]) # target object of the relationship - - # filter if related objects don't exist or are revoked or deprecated - if not source or source.get("x_mitre_deprecated", False) is True or source.get("revoked", False) is True: - continue - if not target or target.get("x_mitre_deprecated", False) is True or target.get("revoked", False) is True: - continue - if relationship["relationship_type"] == "revoked": - continue - - # don't track sub-technique relationships, those are tracked in the techniques df - if relationship["relationship_type"] == "subtechnique-of": - continue - - # filter out relationships not with relatedType - if relatedType: - related = False - - # try all stix types for the ATT&CK type - for stixTerm in attackToStixTerm[relatedType]: - # if any stix type is part of the relationship - if source["type"] == stixTerm or target["type"] == stixTerm: - related = True - break - - if not related: - # skip this relationship if the types don't match - continue - - # add mapping data - row = {} - - row["source ID"] = mitre_attack_data.get_attack_id(stix_id=source["id"]) - row["source name"] = source.get("name") - row["source ref"] = source.get("id") - row["source type"] = stixToAttackTerm.get(source["type"]) - - # mapping type goes between the source/target data - row["mapping type"] = relationship["relationship_type"] - - row["target ID"] = mitre_attack_data.get_attack_id(stix_id=target["id"]) - row["target name"] = target.get("name") - row["target ref"] = target.get("id") - row["target type"] = stixToAttackTerm.get(target["type"]) - - if "description" in relationship: # add description of relationship to the end of the row - row["mapping description"] = relationship["description"] - # add required fields for workbench import: relationship stix id, created, and modified - row["STIX ID"] = relationship["id"] - if "created" in relationship: - row["created"] = format_date(relationship["created"]) - if "modified" in relationship: - row["last modified"] = format_date(relationship["modified"]) - relationship_rows.append(row) - - citations = get_citations(relationships) + if _precomputed is not None: + relationships, citations = _precomputed + else: + relationships, citations = _process_all_relationships(src) - relationship_df = pd.DataFrame(relationship_rows) - if relationship_df.empty or "mapping type" not in relationship_df.columns: + if relationships.empty or "mapping type" not in relationships.columns: logger.warning(f"No relationships found for relatedType={relatedType}. Returning empty dataframe.") return {} - relationships = relationship_df.sort_values( - [ - "mapping type", - "source type", - "target type", - "source name", - "target name", - "source ref", - "target ref", - "created", - "last modified", - ] - ) + # Filter to only relationships involving the requested type + if relatedType: + # Map relatedType to the ATT&CK type names used in the DataFrame + df_types = {_STIX_TO_ATTACK_TERM[t] for t in _ATTACK_TO_STIX_TERM[relatedType]} + type_mask = relationships["source type"].isin(df_types) | relationships["target type"].isin(df_types) + relationships = relationships[type_mask] + if relationships.empty: + logger.warning(f"No relationships found for relatedType={relatedType}. Returning empty dataframe.") + return {} # return all relationships and citations if not relatedType: @@ -1330,24 +1435,30 @@ def _get_relationship_citations(object_dataframe, relationship_df): This allows us to include citations from relationships for each ATT&CK object type. :param object_dataframe: Dataframe for relevant ATT&CK object - :param relationship_df: Dataframe of relationships + :param relationship_df: dict of sheet_name -> DataFrame from relationshipsToDf() :return: Array of strings, with each string being placed relative to the object listing, and containing all relevant citations """ - object_listing = [x for x in object_dataframe["ID"]] - new_citations = [] - for z in [x for x in relationship_df if x != "citations"]: - subset = [] - for y in object_listing: - mask = relationship_df[z].values == y - filtered = relationship_df[z].loc[mask] - temp = set() - for description in filter(lambda x: x == x, filtered["mapping description"].tolist()): - [temp.add(x) for x in re.findall(r"\(Citation: (.*?)\)", description)] - subset.append(",".join([f"(Citation: {z})" for z in temp])) - if not new_citations: - new_citations = subset - else: - for i in range(0, len(new_citations)): - new_citations[i] = ",".join([new_citations[i], subset[i]]) - return new_citations + all_ids = object_dataframe["ID"].tolist() + id_set = set(all_ids) + + # Collect citations per object ID using efficient column-based filtering + id_to_citations = {} + + for sheet_name, sheet_df in relationship_df.items(): + if sheet_name == "citations" or sheet_df.empty: + continue + if "mapping description" not in sheet_df.columns: + continue + + for id_col in ["source ID", "target ID"]: + if id_col not in sheet_df.columns: + continue + # Use .isin() for batch filtering (single pass) instead of per-ID comparison + mask = sheet_df[id_col].isin(id_set) + matching = sheet_df.loc[mask, [id_col, "mapping description"]].dropna(subset=["mapping description"]) + for obj_id, desc in zip(matching[id_col], matching["mapping description"], strict=True): + for cite in re.findall(r"\(Citation: (.*?)\)", desc): + id_to_citations.setdefault(obj_id, set()).add(cite) + + return [",".join(f"(Citation: {c})" for c in id_to_citations.get(obj_id, set())) for obj_id in all_ids]