#3880 Hazard Reports Named Entity Recognition and Knowledge Graphs


A software hazard report documents one or more causes of a potential hazard arising from software issues.

I have a directory of software hazard reports in PDF format. I want to parse them for named entities and relationships, then generate a cross-hazard-report knowledge graph that includes: hazard reports, causes, controls for each cause, and the control verifications.


Create a python app to:

1) Accept a directory name as input from the user

2) Read the directory, 

3) Loop over all PDF files, converting each to text and identifying a list of named entities and relationships, 

4) Report the list of hazard reports, and for each, the causes, controls, and verifications

5) Generate a list of Cypher statements to create the knowledge graph

I can certainly help you build this. This is a classic "Unstructured Data to Structured Knowledge Graph" pipeline.

To accomplish this, we need a few specific components:

  1. PDF Extraction: We'll use pypdf to extract raw text.

  2. Entity Extraction: Since I cannot see your specific PDF layout, I have implemented a Rule-Based/Regex Parser that looks for common keywords (e.g., "Cause:", "Control:", "Verification:").

    • Note: In a production environment, you would replace the Regex section with an NLP library like spaCy or an LLM API call to handle inconsistent wording.

  3. Graph Generation: We will generate standard Cypher queries compatible with Neo4j.
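To make item 3 concrete, here is a minimal, self-contained sketch of the kind of MERGE statement the generator emits (the report ID and title are made-up examples; the full application below also escapes more fields and links causes, controls, and verifications). Using MERGE rather than CREATE keeps the import idempotent:

```python
def merge_hazard(report_id: str, title: str) -> str:
    """Build an idempotent MERGE statement for one HazardReport node.

    Illustrative sketch only -- see the full CypherGenerator class below.
    """
    # Escape backslashes first, then single quotes, for a Cypher string literal
    rid = report_id.replace("\\", "\\\\").replace("'", "\\'")
    ttl = title.replace("\\", "\\\\").replace("'", "\\'")
    return (f"MERGE (h:HazardReport {{id: '{rid}'}}) "
            f"ON CREATE SET h.title = '{ttl}'")

print(merge_hazard("HR-101", "Buffer Overflow Risk"))
```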

Here are the files for the application.

1. Requirements File

First, you will need the PDF parsing library.

requirements.txt:  pypdf==3.17.1

2. The Python Application

This script encapsulates the logic for reading, parsing, and generating the graph query code.

import os
import re
import sys
from typing import List, Dict, Any

from pypdf import PdfReader


class HazardEntity:
    """Data structure to hold the extracted information."""

    def __init__(self, filename: str, report_id: str, title: str):
        self.filename = filename
        self.report_id = report_id
        self.title = title
        self.causes: List[Dict[str, Any]] = []

    def add_cause(self, description: str):
        cause = {
            "id": f"C-{len(self.causes) + 1}",
            "description": description,
            "controls": []
        }
        self.causes.append(cause)
        return cause

    def add_control(self, cause_index: int, description: str):
        if 0 <= cause_index < len(self.causes):
            control = {
                "id": f"CTL-{len(self.causes[cause_index]['controls']) + 1}",
                "description": description,
                "verifications": []
            }
            self.causes[cause_index]['controls'].append(control)
            return control
        return None

    def add_verification(self, cause_index: int, control_index: int, description: str):
        if 0 <= cause_index < len(self.causes):
            controls = self.causes[cause_index]['controls']
            if 0 <= control_index < len(controls):
                controls[control_index]['verifications'].append(description)


class PDFProcessor:

    def extract_text(self, filepath: str) -> str:
        """Reads a PDF and returns the full text content."""
        try:
            reader = PdfReader(filepath)
            full_text = ""
            for page in reader.pages:
                text = page.extract_text()
                if text:
                    full_text += text + "\n"
            return full_text
        except Exception as e:
            print(f"Error reading {filepath}: {e}")
            return ""


    def parse_entities(self, filename: str, text: str) -> HazardEntity:
        """
        Parses text to find Hazards, Causes, Controls, and Verifications.

        NOTE: This uses heuristics/regex assuming a structured report format.
        For completely unstructured prose, an LLM or NLP model (spaCy) is required.

        Expected logic for this parser:
        1. Finds a Report ID/Title.
        2. Looks for lines starting with 'Cause:', 'Control:', 'Verification:'.
        """
        # 1. Attempt to find a Report ID (e.g., HR-2023-001)
        # Looking for patterns like "ID: XYZ" or "Report: XYZ"
        id_match = re.search(r'(?:Report ID|Hazard ID|ID)\s*[:\-]\s*([A-Za-z0-9\-]+)', text, re.IGNORECASE)
        report_id = id_match.group(1) if id_match else f"Unknown-ID-{filename}"

        # 2. Attempt to find a Title
        # Assumes the first line or a line after "Title:" is the title
        title_match = re.search(r'(?:Title|Subject)\s*[:\-]\s*(.+)', text, re.IGNORECASE)
        report_title = title_match.group(1).strip() if title_match else f"Hazard Report from {filename}"

        entity = HazardEntity(filename, report_id, report_title)

        # 3. Parse the body for Causes, Controls, and Verifications
        # We split the text by lines and iterate statefully
        lines = text.split('\n')

        current_cause_idx = -1
        current_control_idx = -1

        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Heuristic: Detect "Cause"
            if re.match(r'^(?:Cause|Root Cause)\s*[:\-]', line, re.IGNORECASE):
                desc = re.sub(r'^(?:Cause|Root Cause)\s*[:\-]\s*', '', line, flags=re.IGNORECASE)
                entity.add_cause(desc)
                current_cause_idx = len(entity.causes) - 1
                current_control_idx = -1  # Reset control index for new cause

            # Heuristic: Detect "Control" or "Mitigation"
            elif re.match(r'^(?:Control|Mitigation|Safeguard)\s*[:\-]', line, re.IGNORECASE):
                if current_cause_idx > -1:
                    desc = re.sub(r'^(?:Control|Mitigation|Safeguard)\s*[:\-]\s*', '', line, flags=re.IGNORECASE)
                    entity.add_control(current_cause_idx, desc)
                    current_control_idx = len(entity.causes[current_cause_idx]['controls']) - 1

            # Heuristic: Detect "Verification" or "Test"
            elif re.match(r'^(?:Verification|Test|Validation)\s*[:\-]', line, re.IGNORECASE):
                if current_cause_idx > -1 and current_control_idx > -1:
                    desc = re.sub(r'^(?:Verification|Test|Validation)\s*[:\-]\s*', '', line, flags=re.IGNORECASE)
                    entity.add_verification(current_cause_idx, current_control_idx, desc)

        return entity


class CypherGenerator:

    @staticmethod
    def clean_str(text: str) -> str:
        """Escapes backslashes and quotes for Cypher string literals."""
        return text.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')

    @staticmethod
    def generate_statements(entities: List[HazardEntity]) -> List[str]:
        statements = []
        statements.append("// --- Beginning of Import ---")

        for entity in entities:
            # Create Hazard Report Node
            h_id = CypherGenerator.clean_str(entity.report_id)
            h_title = CypherGenerator.clean_str(entity.title)
            h_file = CypherGenerator.clean_str(entity.filename)

            # Using MERGE ensures we don't duplicate if we run the script twice
            statements.append(
                f"MERGE (h:HazardReport {{id: '{h_id}'}}) "
                f"ON CREATE SET h.title = '{h_title}', h.source_file = '{h_file}';"
            )

            # Cypher variables do not persist across separate statements, so
            # each statement below MATCHes its parent node before MERGE-ing
            # the child node and the relationship in one self-contained query.
            for c_idx, cause in enumerate(entity.causes):
                c_desc = CypherGenerator.clean_str(cause['description'])
                # Unique ID for a cause combines the report ID and cause index
                cause_node_id = f"{h_id}_CAUSE_{c_idx}"

                statements.append(
                    f"MATCH (h:HazardReport {{id: '{h_id}'}}) "
                    f"MERGE (c:Cause {{uid: '{cause_node_id}'}}) "
                    f"ON CREATE SET c.description = '{c_desc}' "
                    f"MERGE (h)-[:HAS_CAUSE]->(c);"
                )

                for ctrl_idx, control in enumerate(cause['controls']):
                    ctrl_desc = CypherGenerator.clean_str(control['description'])
                    ctrl_node_id = f"{cause_node_id}_CTRL_{ctrl_idx}"

                    statements.append(
                        f"MATCH (c:Cause {{uid: '{cause_node_id}'}}) "
                        f"MERGE (ctl:Control {{uid: '{ctrl_node_id}'}}) "
                        f"ON CREATE SET ctl.description = '{ctrl_desc}' "
                        f"MERGE (c)-[:MITIGATED_BY]->(ctl);"
                    )

                    for ver_idx, verification in enumerate(control['verifications']):
                        ver_desc = CypherGenerator.clean_str(verification)
                        ver_node_id = f"{ctrl_node_id}_VER_{ver_idx}"

                        statements.append(
                            f"MATCH (ctl:Control {{uid: '{ctrl_node_id}'}}) "
                            f"MERGE (v:Verification {{uid: '{ver_node_id}'}}) "
                            f"ON CREATE SET v.method = '{ver_desc}' "
                            f"MERGE (ctl)-[:VERIFIED_BY]->(v);"
                        )

        statements.append("// --- End of Import ---")
        return statements


def main():
    print("--- PDF Hazard Report Parser & Knowledge Graph Generator ---")

    # 1. Get Input
    directory = input("Enter the directory path containing PDF files: ").strip()

    if not os.path.isdir(directory):
        print("Error: Directory not found.")
        sys.exit(1)

    processor = PDFProcessor()
    all_entities = []

    # 2. Loop over files
    print(f"\nScanning '{directory}' for PDFs...")
    files = [f for f in os.listdir(directory) if f.lower().endswith('.pdf')]

    if not files:
        print("No PDF files found in directory.")
        sys.exit(0)

    for filename in files:
        filepath = os.path.join(directory, filename)
        print(f"Processing: {filename}...")

        # 3. Extract and Parse
        text_content = processor.extract_text(filepath)
        if text_content:
            entity = processor.parse_entities(filename, text_content)
            all_entities.append(entity)

    # 4. Report Text Output
    print("\n" + "=" * 40)
    print("PARSED DATA REPORT")
    print("=" * 40)

    for ent in all_entities:
        print(f"Hazard: [{ent.report_id}] {ent.title}")
        if not ent.causes:
            print("  (No specific causes identified - check file format)")
        for cause in ent.causes:
            print(f"  └── Cause: {cause['description']}")
            for ctrl in cause['controls']:
                print(f"      └── Control: {ctrl['description']}")
                for ver in ctrl['verifications']:
                    print(f"          └── Verification: {ver}")
        print("-" * 20)

    # 5. Generate Cypher
    print("\n" + "=" * 40)
    print("CYPHER STATEMENTS (NEO4J)")
    print("=" * 40)

    cypher_queries = CypherGenerator.generate_statements(all_entities)

    # Print to console
    for query in cypher_queries:
        print(query)

    # Optional: Save to file
    output_file = "graph_import.cypher"
    with open(output_file, "w") as f:
        f.write("\n".join(cypher_queries))

    print(f"\nCypher queries also saved to '{output_file}'")


if __name__ == "__main__":
    main()

How to use this

  1. Install Dependencies: Open your terminal or command prompt and run:

    pip install -r requirements.txt
    
    
  2. Run the App:

    python hazard_graph_parser.py
    
    
  3. Provide Input: When prompted, paste the full path to your folder containing the PDF reports.
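Once the script has written graph_import.cypher, the statements can be loaded into Neo4j either by pasting them into the Neo4j Browser or via the official Python driver. A small sketch of the driver approach (the bolt URL and credentials are placeholders you would replace with your own):

```python
from pathlib import Path
from typing import List

def load_statements(path: str) -> List[str]:
    """Read a .cypher file, skipping blank lines and // comment lines."""
    lines = Path(path).read_text(encoding="utf-8").splitlines()
    return [ln.strip() for ln in lines
            if ln.strip() and not ln.strip().startswith("//")]

# To run the statements against a live database (assumes the `neo4j`
# driver is installed and a server is listening at the placeholder URL):
#
# from neo4j import GraphDatabase
# driver = GraphDatabase.driver("bolt://localhost:7687",
#                               auth=("neo4j", "your-password"))
# with driver.session() as session:
#     for stmt in load_statements("graph_import.cypher"):
#         session.run(stmt)
# driver.close()
```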

Note on PDF Formatting

The script uses a heuristic parser. It expects the PDF text to contain keywords to identify relationships. It looks for lines similar to:

  • ID: HR-101

  • Title: Buffer Overflow Risk

  • Cause: Unbounded string copy

  • Control: Use strncpy instead of strcpy

  • Verification: Static Code Analysis

If your PDFs have a complex tabular layout or use different terminology (e.g., "Mitigation" instead of "Control"), you can edit the Regex patterns in the parse_entities method inside hazard_graph_parser.py.
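As a quick sanity check, the same style of keyword regex can be exercised in isolation on the sample lines above, independent of any PDF:

```python
import re

SAMPLE = """ID: HR-101
Title: Buffer Overflow Risk
Cause: Unbounded string copy
Control: Use strncpy instead of strcpy
Verification: Static Code Analysis"""

def extract_fields(text: str) -> dict:
    """Pull each 'Keyword: value' line using the same heuristic pattern."""
    fields = {}
    for key in ("ID", "Title", "Cause", "Control", "Verification"):
        m = re.search(rf'^{key}\s*[:\-]\s*(.+)$', text,
                      re.IGNORECASE | re.MULTILINE)
        fields[key] = m.group(1).strip() if m else None
    return fields

print(extract_fields(SAMPLE))
```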


From <https://gemini.google.com/app/16ae64b8c9109410>  Google Gemini 3.0 Pro (Think)
