#3880 Hazard Reports Name Entity Recognition and Knowledge Graphs
#3880 Hazard Reports Name Entity Recognition and Knowledge Graphs
#3880 A software hazard report has one or more causes of potential hazard caused by software issues.
I have a directory of software hazard reports in PDF format. I want to parse them for named entities and relationships, then generate a cross-hazard-report knowledge graph, including: hazard reports, causes, controls for each cause, and the control verifications.
Create a python app to:
1) Accept a directory name as input from the user
2) Read the directory,
3) Loop over all PDF files converting each to text, then identifying a list of named entities and relationships,
4) Report the list of hazard reports, and for each, the causes, controls, and verifications
5) Generate a list of Cypher statements to create the knowledge graph
I can certainly help you build this. This is a classic "Unstructured Data to Structured Knowledge Graph" pipeline.
To accomplish this, we need a few specific components:
PDF Extraction: We'll use pypdf to extract raw text.
Entity Extraction: Since I cannot see your specific PDF layout, I have implemented a Rule-Based/Regex Parser that looks for common keywords (e.g., "Cause:", "Control:", "Verification:").
Note: In a production environment, you would replace the Regex section with an NLP library like spaCy or an LLM API call to handle inconsistent wording.
Graph Generation: We will generate standard Cypher queries compatible with Neo4j.
Here are the files for the application.
1. Requirements File
First, you will need the PDF parsing library.
Requirements.txt: pypdf==3.17.1
2. The Python Application
This script encapsulates the logic for reading, parsing, and generating the graph query code.
import os
import re
import sys
from typing import List, Dict, Any
from pypdf import PdfReader
class HazardEntity:
    """One parsed hazard report.

    Stores the report's identity (source filename, report ID, title) plus a
    nested causes -> controls -> verifications structure built from plain
    dicts and lists so it serializes trivially.
    """

    def __init__(self, filename: str, report_id: str, title: str):
        self.filename = filename
        self.report_id = report_id
        self.title = title
        # Each entry: {"id": "C-n", "description": str, "controls": [...]}
        self.causes: List[Dict[str, Any]] = []

    def add_cause(self, description: str):
        """Append a new cause (auto-numbered "C-n") and return its dict."""
        new_cause = {
            "id": f"C-{len(self.causes) + 1}",
            "description": description,
            "controls": [],
        }
        self.causes.append(new_cause)
        return new_cause

    def add_control(self, cause_index: int, description: str):
        """Attach a control ("CTL-n") to the cause at *cause_index*.

        Returns the new control dict, or None when the index is out of range.
        """
        if not (0 <= cause_index < len(self.causes)):
            return None
        siblings = self.causes[cause_index]['controls']
        new_control = {
            "id": f"CTL-{len(siblings) + 1}",
            "description": description,
            "verifications": [],
        }
        siblings.append(new_control)
        return new_control

    def add_verification(self, cause_index: int, control_index: int, description: str):
        """Attach a verification string to the addressed control.

        Out-of-range indices are silently ignored (matches the parser's
        best-effort behavior when a Verification line precedes any Control).
        """
        if not (0 <= cause_index < len(self.causes)):
            return
        controls = self.causes[cause_index]['controls']
        if 0 <= control_index < len(controls):
            controls[control_index]['verifications'].append(description)
class PDFProcessor:
    """Extracts raw text from PDF files and parses it into HazardEntity
    objects using keyword heuristics (Cause/Control/Verification lines)."""

    def __init__(self):
        pass

    def extract_text(self, filepath: str) -> str:
        """Read every page of the PDF at *filepath* and return the
        concatenated text (pages joined with newlines).

        Returns "" after printing an error message if the file cannot be
        read — a corrupt/unreadable PDF must not abort the whole batch.
        """
        try:
            reader = PdfReader(filepath)
            full_text = ""
            for page in reader.pages:
                text = page.extract_text()
                if text:  # extract_text() may return None for image-only pages
                    full_text += text + "\n"
            return full_text
        except Exception as e:
            print(f"Error reading {filepath}: {e}")
            return ""

    def parse_entities(self, filename: str, text: str) -> HazardEntity:
        """
        Parses text to find Hazards, Causes, Controls, and Verifications.

        NOTE: This uses Heuristics/Regex assuming a structured report format.
        For completely unstructured prose, an LLM or NLP model (spaCy) is required.

        Expected logic for this parser:
        1. Finds a Report ID/Title (falls back to *filename*-based defaults).
        2. Looks for lines starting with 'Cause:', 'Control:', 'Verification:'
           (plus common synonyms), tracking the current cause/control
           statefully so verifications attach to the right control.
        """
        # 1. Attempt to find a Report ID (e.g., HR-2023-001)
        # Looking for patterns like "ID: XYZ" or "Report: XYZ"
        id_match = re.search(r'(?:Report ID|Hazard ID|ID)\s*[:\-]\s*([A-Za-z0-9\-]+)', text, re.IGNORECASE)
        # BUG FIX: the fallback previously contained a literal placeholder;
        # use the source filename so unknown reports stay distinguishable.
        report_id = id_match.group(1) if id_match else f"Unknown-ID-{filename}"

        # 2. Attempt to find a Title
        # Assumes a line after "Title:" or "Subject:" is the title
        title_match = re.search(r'(?:Title|Subject)\s*[:\-]\s*(.+)', text, re.IGNORECASE)
        report_title = title_match.group(1).strip() if title_match else f"Hazard Report from {filename}"

        entity = HazardEntity(filename, report_id, report_title)

        # 3. Parse the body for Causes, Controls, and Verifications.
        # We split the text by lines and iterate statefully: each keyword
        # line attaches to the most recently seen parent.
        lines = text.split('\n')
        current_cause_idx = -1
        current_control_idx = -1

        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Heuristic: Detect "Cause"
            if re.match(r'^(?:Cause|Root Cause)\s*[:\-]', line, re.IGNORECASE):
                desc = re.sub(r'^(?:Cause|Root Cause)\s*[:\-]\s*', '', line, flags=re.IGNORECASE)
                entity.add_cause(desc)
                current_cause_idx = len(entity.causes) - 1
                current_control_idx = -1  # Reset control index for new cause

            # Heuristic: Detect "Control" or "Mitigation"
            elif re.match(r'^(?:Control|Mitigation|Safeguard)\s*[:\-]', line, re.IGNORECASE):
                if current_cause_idx > -1:  # Ignore controls that appear before any cause
                    desc = re.sub(r'^(?:Control|Mitigation|Safeguard)\s*[:\-]\s*', '', line, flags=re.IGNORECASE)
                    entity.add_control(current_cause_idx, desc)
                    current_control_idx = len(entity.causes[current_cause_idx]['controls']) - 1

            # Heuristic: Detect "Verification" or "Test"
            elif re.match(r'^(?:Verification|Test|Validation)\s*[:\-]', line, re.IGNORECASE):
                if current_cause_idx > -1 and current_control_idx > -1:
                    desc = re.sub(r'^(?:Verification|Test|Validation)\s*[:\-]\s*', '', line, flags=re.IGNORECASE)
                    entity.add_verification(current_cause_idx, current_control_idx, desc)

        return entity
class CypherGenerator:
    """Turns parsed hazard entities into Neo4j-compatible Cypher statements.

    Every emitted statement is self-contained: relationship statements
    MATCH their endpoint nodes by key before MERGE, so the script remains
    correct even when statements are executed one at a time (a bare
    ``MERGE (h)-[...]->(c)`` with unbound variables would silently create
    brand-new empty nodes instead of linking the merged ones).
    """

    @staticmethod
    def clean_str(text: str) -> str:
        """Escape *text* for embedding in a single-quoted Cypher literal.

        Backslashes are escaped FIRST; doing it after the quote escapes
        would double the backslash we just inserted and corrupt the output.
        """
        return text.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')

    @staticmethod
    def generate_statements(entities: List["HazardEntity"]) -> List[str]:
        """Return a list of Cypher statements building the knowledge graph:
        HazardReport -HAS_CAUSE-> Cause -MITIGATED_BY-> Control
        -VERIFIED_BY-> Verification."""
        esc = CypherGenerator.clean_str  # shorthand, hoisted out of the loops
        statements = []
        statements.append("// --- Beginning of Import ---")

        for entity in entities:
            # Create Hazard Report node.
            h_id = esc(entity.report_id)
            h_title = esc(entity.title)
            h_file = esc(entity.filename)

            # Using MERGE ensures we don't duplicate if we run the script twice.
            statements.append(
                f"MERGE (h:HazardReport {{id: '{h_id}'}}) "
                f"ON CREATE SET h.title = '{h_title}', h.source_file = '{h_file}'"
            )

            for c_idx, cause in enumerate(entity.causes):
                c_desc = esc(cause['description'])
                # Unique ID for a cause combines report ID + cause index.
                cause_uid = f"{h_id}_CAUSE_{c_idx}"
                statements.append(
                    f"MERGE (c:Cause {{uid: '{cause_uid}'}}) "
                    f"ON CREATE SET c.description = '{c_desc}'"
                )
                # Re-bind both endpoints by key so this statement stands alone.
                statements.append(
                    f"MATCH (h:HazardReport {{id: '{h_id}'}}), (c:Cause {{uid: '{cause_uid}'}}) "
                    f"MERGE (h)-[:HAS_CAUSE]->(c)"
                )

                for ctrl_idx, control in enumerate(cause['controls']):
                    ctrl_desc = esc(control['description'])
                    ctrl_uid = f"{cause_uid}_CTRL_{ctrl_idx}"
                    statements.append(
                        f"MERGE (ctl:Control {{uid: '{ctrl_uid}'}}) "
                        f"ON CREATE SET ctl.description = '{ctrl_desc}'"
                    )
                    statements.append(
                        f"MATCH (c:Cause {{uid: '{cause_uid}'}}), (ctl:Control {{uid: '{ctrl_uid}'}}) "
                        f"MERGE (c)-[:MITIGATED_BY]->(ctl)"
                    )

                    for ver_idx, verification in enumerate(control['verifications']):
                        ver_desc = esc(verification)
                        ver_uid = f"{ctrl_uid}_VER_{ver_idx}"
                        statements.append(
                            f"MERGE (v:Verification {{uid: '{ver_uid}'}}) "
                            f"ON CREATE SET v.method = '{ver_desc}'"
                        )
                        statements.append(
                            f"MATCH (ctl:Control {{uid: '{ctrl_uid}'}}), (v:Verification {{uid: '{ver_uid}'}}) "
                            f"MERGE (ctl)-[:VERIFIED_BY]->(v)"
                        )

        statements.append("// --- End of Import ---")
        return statements
def main():
    """Interactive entry point.

    Prompts for a directory, extracts and parses every PDF in it, prints a
    hierarchical Hazard -> Cause -> Control -> Verification report, then
    emits the Cypher import statements to the console and to
    'graph_import.cypher'.
    """
    print("--- PDF Hazard Report Parser & Knowledge Graph Generator ---")

    # 1. Get Input
    directory = input("Enter the directory path containing PDF files: ").strip()
    if not os.path.isdir(directory):
        print("Error: Directory not found.")
        sys.exit(1)

    processor = PDFProcessor()
    all_entities = []

    # 2. Loop over files (sorted so repeated runs produce identical output)
    print(f"\nScanning '{directory}' for PDFs...")
    files = sorted(f for f in os.listdir(directory) if f.lower().endswith('.pdf'))
    if not files:
        print("No PDF files found in directory.")
        sys.exit(0)

    for filename in files:
        filepath = os.path.join(directory, filename)
        # BUG FIX: this previously printed a literal placeholder instead of
        # the file actually being processed.
        print(f"Processing: {filename}...")

        # 3. Extract and Parse
        text_content = processor.extract_text(filepath)
        if text_content:
            entity = processor.parse_entities(filename, text_content)
            all_entities.append(entity)

    # 4. Report Text Output
    print("\n" + "=" * 40)
    print("PARSED DATA REPORT")
    print("=" * 40)
    for ent in all_entities:
        print(f"Hazard: [{ent.report_id}] {ent.title}")
        if not ent.causes:
            print("  (No specific causes identified - check file format)")
        for cause in ent.causes:
            print(f"  └── Cause: {cause['description']}")
            for ctrl in cause['controls']:
                print(f"      └── Control: {ctrl['description']}")
                for ver in ctrl['verifications']:
                    print(f"          └── Verification: {ver}")
        print("-" * 20)

    # 5. Generate Cypher
    print("\n" + "=" * 40)
    print("CYPHER STATEMENTS (NEO4J)")
    print("=" * 40)
    cypher_queries = CypherGenerator.generate_statements(all_entities)

    # Print to console
    for query in cypher_queries:
        print(query)

    # Optional: Save to file (explicit encoding so output is portable)
    output_file = "graph_import.cypher"
    with open(output_file, "w", encoding="utf-8") as f:
        f.write("\n".join(cypher_queries))
    print(f"\nCypher queries also saved to '{output_file}'")


if __name__ == "__main__":
    main()
How to use this
Install Dependencies: Open your terminal or command prompt and run:
pip install -r requirements.txt
Run the App:
python hazard_graph_parser.py
Provide Input: When prompted, paste the full path to your folder containing the PDF reports.
Note on PDF Formatting
The script uses a heuristic parser. It expects the PDF text to contain keywords to identify relationships. It looks for lines similar to:
ID: HR-101
Title: Buffer Overflow Risk
Cause: Unbounded string copy
Control: Use strncpy instead of strcpy
Verification: Static Code Analysis
If your PDFs have a complex tabular layout or use different terminology (e.g., "Mitigation" instead of "Control"), you can edit the Regex patterns in the parse_entities method inside hazard_graph_parser.py.
From <https://gemini.google.com/app/16ae64b8c9109410> Google Gemini 3.0 Pro (Think)

Comments
Post a Comment