#3805 Claude Code: KG in SQLite3 with Document Embeddings


#3805 Create a Python application that reads a CSV to update a knowledge graph implemented in SQLite3, which also supports documents and document chunks as nodes, with embeddings for each document chunk. Multi-hop Cypher queries must be supported.

I'll create a comprehensive Python application for a knowledge graph system with document support, embeddings, and Cypher-like query capabilities.

import sqlite3
import json
import csv
import hashlib  # used for a stable, process-independent embedding seed
import re
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass


@dataclass
class Node:
    id: str
    type: str
    properties: Dict[str, Any]


@dataclass
class Relationship:
    source_id: str
    target_id: str
    rel_type: str
    properties: Dict[str, Any]


class KnowledgeGraph:
    def __init__(self, db_path: str = "knowledge_graph.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._initialize_schema()

    def _initialize_schema(self):
        """Initialize the database schema for nodes, relationships, documents, and embeddings."""
        cursor = self.conn.cursor()

        # Nodes table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS nodes (
                id TEXT PRIMARY KEY,
                type TEXT NOT NULL,
                properties TEXT
            )
        """)

        # Relationships table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_id TEXT NOT NULL,
                target_id TEXT NOT NULL,
                rel_type TEXT NOT NULL,
                properties TEXT,
                FOREIGN KEY (source_id) REFERENCES nodes(id),
                FOREIGN KEY (target_id) REFERENCES nodes(id)
            )
        """)

        # Documents table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                title TEXT,
                content TEXT,
                metadata TEXT,
                FOREIGN KEY (id) REFERENCES nodes(id)
            )
        """)

        # Document chunks table with embeddings
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS document_chunks (
                id TEXT PRIMARY KEY,
                document_id TEXT NOT NULL,
                chunk_index INTEGER NOT NULL,
                content TEXT NOT NULL,
                embedding BLOB,
                FOREIGN KEY (id) REFERENCES nodes(id),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            )
        """)

        # Indexes for faster queries
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_relationships_type ON relationships(rel_type)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_nodes_type ON nodes(type)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_chunks_document ON document_chunks(document_id)")
        self.conn.commit()

    def add_node(self, node_id: str, node_type: str, properties: Dict[str, Any] = None):
        """Add a node to the knowledge graph."""
        cursor = self.conn.cursor()
        props_json = json.dumps(properties or {})
        cursor.execute(
            "INSERT OR REPLACE INTO nodes (id, type, properties) VALUES (?, ?, ?)",
            (node_id, node_type, props_json)
        )
        self.conn.commit()

    def add_relationship(self, source_id: str, target_id: str, rel_type: str, properties: Dict[str, Any] = None):
        """Add a relationship between two nodes."""
        cursor = self.conn.cursor()
        props_json = json.dumps(properties or {})
        cursor.execute(
            "INSERT INTO relationships (source_id, target_id, rel_type, properties) VALUES (?, ?, ?, ?)",
            (source_id, target_id, rel_type, props_json)
        )
        self.conn.commit()

    def add_document(self, doc_id: str, title: str, content: str,
                     metadata: Dict[str, Any] = None, chunk_size: int = 500):
        """Add a document and split it into chunks with embeddings."""
        # Add document node
        self.add_node(doc_id, "Document", {"title": title})

        # Add document details
        cursor = self.conn.cursor()
        metadata_json = json.dumps(metadata or {})
        cursor.execute(
            "INSERT OR REPLACE INTO documents (id, title, content, metadata) VALUES (?, ?, ?, ?)",
            (doc_id, title, content, metadata_json)
        )

        # Split into chunks and add them
        chunks = self._split_into_chunks(content, chunk_size)
        for idx, chunk in enumerate(chunks):
            chunk_id = f"{doc_id}_chunk_{idx}"
            self.add_document_chunk(chunk_id, doc_id, idx, chunk)
        self.conn.commit()

    def add_document_chunk(self, chunk_id: str, document_id: str, chunk_index: int,
                           content: str, embedding: Optional[np.ndarray] = None):
        """Add a document chunk with optional embedding."""
        # Add chunk node
        self.add_node(chunk_id, "DocumentChunk", {
            "document_id": document_id,
            "chunk_index": chunk_index
        })

        # Add relationship to document
        self.add_relationship(chunk_id, document_id, "PART_OF")

        # Add chunk details
        cursor = self.conn.cursor()
        embedding_blob = None
        if embedding is not None:
            embedding_blob = embedding.tobytes()
        cursor.execute(
            "INSERT OR REPLACE INTO document_chunks (id, document_id, chunk_index, content, embedding) "
            "VALUES (?, ?, ?, ?, ?)",
            (chunk_id, document_id, chunk_index, content, embedding_blob)
        )
        self.conn.commit()

    def _split_into_chunks(self, text: str, chunk_size: int) -> List[str]:
        """Split text into chunks of approximately chunk_size characters."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_size = 0
        for word in words:
            word_size = len(word) + 1  # +1 for space
            if current_size + word_size > chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_size = word_size
            else:
                current_chunk.append(word)
                current_size += word_size
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def generate_embedding(self, text: str, dim: int = 128) -> np.ndarray:
        """Generate a simple embedding (placeholder - use real embeddings in production)."""
        # This is a hash-seeded random embedding for demonstration only.
        # A stable digest is used so the same text maps to the same vector across runs
        # (Python's built-in hash() is randomized per process).
        # In production, use sentence-transformers, OpenAI, or similar.
        seed = int(hashlib.md5(text.encode("utf-8")).hexdigest(), 16) % (2**32)
        rng = np.random.default_rng(seed)
        return rng.standard_normal(dim).astype(np.float32)

    def update_chunk_embeddings(self, embedding_dim: int = 128):
        """Generate and update embeddings for all document chunks."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT id, content FROM document_chunks")
        for row in cursor.fetchall():
            chunk_id, content = row
            embedding = self.generate_embedding(content, embedding_dim)
            self.conn.execute(
                "UPDATE document_chunks SET embedding = ? WHERE id = ?",
                (embedding.tobytes(), chunk_id)
            )
        self.conn.commit()

    def similarity_search(self, query: str, top_k: int = 5, embedding_dim: int = 128) -> List[Dict]:
        """Find most similar document chunks to a query."""
        query_embedding = self.generate_embedding(query, embedding_dim)
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT id, document_id, content, embedding FROM document_chunks WHERE embedding IS NOT NULL"
        )
        results = []
        for row in cursor.fetchall():
            chunk_id, doc_id, content, embedding_blob = row
            if embedding_blob:
                chunk_embedding = np.frombuffer(embedding_blob, dtype=np.float32)
                # Cosine similarity between query and chunk embeddings
                similarity = np.dot(query_embedding, chunk_embedding) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(chunk_embedding)
                )
                results.append({
                    'chunk_id': chunk_id,
                    'document_id': doc_id,
                    'content': content,
                    'similarity': float(similarity)
                })
        results.sort(key=lambda x: x['similarity'], reverse=True)
        return results[:top_k]

    def cypher_query(self, query: str) -> List[Dict[str, Any]]:
        """Execute a Cypher-like query (simplified implementation)."""
        parser = CypherParser(self)
        return parser.execute(query)

    def load_from_csv(self, csv_path: str, node_col: str = "id", type_col: str = "type",
                      rel_source_col: str = None, rel_target_col: str = None, rel_type_col: str = None):
        """Load nodes and relationships from CSV."""
        with open(csv_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        # Load nodes
        for row in rows:
            if node_col in row and type_col in row:
                node_id = row[node_col]
                node_type = row[type_col]
                properties = {k: v for k, v in row.items()
                              if k not in [node_col, type_col, rel_source_col, rel_target_col, rel_type_col]}
                self.add_node(node_id, node_type, properties)

        # Load relationships if specified
        if rel_source_col and rel_target_col and rel_type_col:
            for row in rows:
                if all(col in row for col in [rel_source_col, rel_target_col, rel_type_col]):
                    source = row[rel_source_col]
                    target = row[rel_target_col]
                    rel_type = row[rel_type_col]
                    if source and target and rel_type:
                        properties = {k: v for k, v in row.items()
                                      if k not in [node_col, type_col, rel_source_col, rel_target_col, rel_type_col]}
                        self.add_relationship(source, target, rel_type, properties)

    def get_node(self, node_id: str) -> Optional[Dict]:
        """Get a node by ID."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT * FROM nodes WHERE id = ?", (node_id,))
        row = cursor.fetchone()
        if row:
            return {
                'id': row['id'],
                'type': row['type'],
                'properties': json.loads(row['properties'])
            }
        return None

    def close(self):
        """Close the database connection."""
        self.conn.close()


class CypherParser:
    """Simplified Cypher query parser for multi-hop queries."""

    def __init__(self, kg: KnowledgeGraph):
        self.kg = kg

    def execute(self, query: str) -> List[Dict[str, Any]]:
        """Execute a Cypher-like query."""
        query = query.strip()
        # MATCH (a:Type)-[r:REL_TYPE]->(b:Type) WHERE ... RETURN ... LIMIT ...
        match_pattern = r'MATCH\s+(.+?)(?:WHERE\s+(.+?))?(?:RETURN\s+(.+?))?(?:LIMIT\s+(\d+))?$'
        match = re.search(match_pattern, query, re.IGNORECASE | re.DOTALL)
        if not match:
            return []
        path_pattern = match.group(1).strip()
        where_clause = match.group(2).strip() if match.group(2) else None
        return_clause = match.group(3).strip() if match.group(3) else "*"
        limit = int(match.group(4)) if match.group(4) else None
        # Parse the path pattern
        results = self._execute_path_query(path_pattern, where_clause, return_clause, limit)
        return results

    def _execute_path_query(self, path_pattern: str, where_clause: Optional[str],
                            return_clause: str, limit: Optional[int]) -> List[Dict]:
        """Execute a path-based query."""
        # Parse pattern like (a:Person)-[:KNOWS]->(b:Person)-[:WORKS_AT]->(c:Company)
        node_pattern = r'\((\w+)(?::(\w+))?\)'
        rel_pattern = r'-\[:(\w+)\]->'
        nodes = re.findall(node_pattern, path_pattern)
        rels = re.findall(rel_pattern, path_pattern)
        if not nodes:
            return []

        # Start with first node
        var_name, node_type = nodes[0]
        cursor = self.kg.conn.cursor()
        if node_type:
            cursor.execute("SELECT * FROM nodes WHERE type = ?", (node_type,))
        else:
            cursor.execute("SELECT * FROM nodes")
        start_nodes = cursor.fetchall()

        results = []
        # For each starting node, traverse the path
        for start_node in start_nodes:
            path_results = self._traverse_path(start_node, nodes, rels, 0)
            results.extend(path_results)

        # Apply WHERE filter
        if where_clause:
            results = self._apply_where_filter(results, where_clause)

        # Apply RETURN projection
        results = self._apply_return_projection(results, return_clause, nodes)

        # Apply LIMIT
        if limit:
            results = results[:limit]
        return results

    def _traverse_path(self, current_node: sqlite3.Row, nodes: List[Tuple], rels: List[str], depth: int) -> List[Dict]:
        """Recursively traverse the path pattern."""
        if depth >= len(nodes) - 1:
            # Base case: reached end of path
            node_dict = {
                'id': current_node['id'],
                'type': current_node['type'],
                'properties': json.loads(current_node['properties'])
            }
            var_name = nodes[depth][0]
            return [{var_name: node_dict}]

        # Get relationships from current node
        rel_type = rels[depth] if depth < len(rels) else None
        next_var_name, next_node_type = nodes[depth + 1]
        cursor = self.kg.conn.cursor()
        if rel_type:
            cursor.execute(
                "SELECT * FROM relationships WHERE source_id = ? AND rel_type = ?",
                (current_node['id'], rel_type)
            )
        else:
            cursor.execute(
                "SELECT * FROM relationships WHERE source_id = ?",
                (current_node['id'],)
            )
        relationships = cursor.fetchall()

        all_results = []
        for rel in relationships:
            # Get target node
            cursor.execute("SELECT * FROM nodes WHERE id = ?", (rel['target_id'],))
            target_node = cursor.fetchone()
            if not target_node:
                continue
            # Check if target node matches type
            if next_node_type and target_node['type'] != next_node_type:
                continue
            # Recursively traverse
            sub_results = self._traverse_path(target_node, nodes, rels, depth + 1)
            # Add current node to each result
            current_var_name = nodes[depth][0]
            current_dict = {
                'id': current_node['id'],
                'type': current_node['type'],
                'properties': json.loads(current_node['properties'])
            }
            for sub_result in sub_results:
                result = {current_var_name: current_dict}
                result.update(sub_result)
                all_results.append(result)
        return all_results

    def _apply_where_filter(self, results: List[Dict], where_clause: str) -> List[Dict]:
        """Apply WHERE clause filtering."""
        filtered = []
        for result in results:
            # Simple property filter: a.name = "John"
            if self._evaluate_where(result, where_clause):
                filtered.append(result)
        return filtered

    def _evaluate_where(self, result: Dict, where_clause: str) -> bool:
        """Evaluate WHERE clause for a single result."""
        # Simple implementation for property comparisons
        # Example: a.name = "John" or a.age > 25
        try:
            # Replace variable.property with actual values
            for var_name, node_data in result.items():
                properties = node_data.get('properties', {})
                for prop_name, prop_value in properties.items():
                    placeholder = f"{var_name}.{prop_name}"
                    if isinstance(prop_value, str):
                        where_clause = where_clause.replace(placeholder, f'"{prop_value}"')
                    else:
                        where_clause = where_clause.replace(placeholder, str(prop_value))
            # Convert a single '=' into '==' without breaking <=, >=, != or ==, then evaluate.
            # NOTE: eval() is only appropriate here because queries come from trusted callers.
            condition = re.sub(r'(?<![<>=!])=(?!=)', '==', where_clause)
            return eval(condition)
        except Exception:
            return True

    def _apply_return_projection(self, results: List[Dict], return_clause: str, nodes: List[Tuple]) -> List[Dict]:
        """Apply RETURN clause projection."""
        if return_clause == "*":
            return results
        # Parse return items: a, b.name, c
        return_items = [item.strip() for item in return_clause.split(',')]
        projected = []
        for result in results:
            projected_result = {}
            for item in return_items:
                if '.' in item:
                    # Return specific property: a.name
                    var_name, prop_name = item.split('.')
                    if var_name in result:
                        projected_result[item] = result[var_name].get('properties', {}).get(prop_name)
                else:
                    # Return entire node
                    if item in result:
                        projected_result[item] = result[item]
            projected.append(projected_result)
        return projected


# Example usage
if __name__ == "__main__":
    # Initialize knowledge graph
    kg = KnowledgeGraph("example_kg.db")

    # Add some sample data
    kg.add_node("person1", "Person", {"name": "Alice", "age": 30})
    kg.add_node("person2", "Person", {"name": "Bob", "age": 25})
    kg.add_node("company1", "Company", {"name": "TechCorp"})
    kg.add_relationship("person1", "person2", "KNOWS", {"since": 2020})
    kg.add_relationship("person1", "company1", "WORKS_AT", {"role": "Engineer"})
    kg.add_relationship("person2", "company1", "WORKS_AT", {"role": "Designer"})

    # Add a document
    kg.add_document(
        "doc1",
        "AI Research Paper",
        "Artificial intelligence is transforming the world. Machine learning enables computers to learn from data. "
        "Deep learning uses neural networks with multiple layers. Natural language processing helps computers understand human language.",
        {"author": "Dr. Smith", "year": 2024}
    )

    # Generate embeddings for chunks
    kg.update_chunk_embeddings()

    # Perform similarity search
    print("\n=== Similarity Search ===")
    results = kg.similarity_search("neural networks and learning", top_k=3)
    for i, result in enumerate(results, 1):
        print(f"{i}. Similarity: {result['similarity']:.4f}")
        print(f"   Content: {result['content'][:100]}...")

    # Execute Cypher queries
    print("\n=== Cypher Query: Find people who know each other ===")
    results = kg.cypher_query("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
    for result in results:
        print(result)

    print("\n=== Cypher Query: Multi-hop - People who know someone at a company ===")
    results = kg.cypher_query("MATCH (a:Person)-[:KNOWS]->(b:Person)-[:WORKS_AT]->(c:Company) RETURN a.name, b.name, c.name")
    for result in results:
        print(result)

    # Load from CSV (example)
    print("\n=== Load from CSV (create sample CSV first) ===")
    sample_csv = "sample_data.csv"
    with open(sample_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['id', 'type', 'name', 'source_id', 'target_id', 'rel_type'])
        writer.writerow(['person3', 'Person', 'Charlie', '', '', ''])
        writer.writerow(['person4', 'Person', 'Diana', 'person3', 'person4', 'KNOWS'])
    kg.load_from_csv(sample_csv, node_col='id', type_col='type',
                     rel_source_col='source_id', rel_target_col='target_id', rel_type_col='rel_type')
    print("Data loaded from CSV successfully!")

    kg.close()

I've created a comprehensive Python knowledge graph application with the following features:

Key Features:

  1. SQLite3-based Knowledge Graph
    • Nodes with types and properties
    • Relationships with types and properties
    • Efficient indexing for fast queries
  2. Document Support
    • Documents as nodes in the graph
    • Automatic text chunking
    • Document chunks as separate nodes linked to parent documents
  3. Embeddings System
    • Storage of embeddings as binary blobs in SQLite
    • generate_embedding() placeholder (deterministic hash-seeded vectors; replace with real embeddings in production)
    • similarity_search() for semantic search using cosine similarity
    • update_chunk_embeddings() to generate embeddings for all chunks
  4. CSV Import
    • load_from_csv() method to bulk import nodes and relationships
    • Flexible column mapping
  5. Multi-hop Cypher Queries
    • Simplified Cypher query parser
    • Supports path patterns: MATCH (a)-[:REL]->(b)-[:REL2]->(c)
    • WHERE clause filtering
    • RETURN projection
    • LIMIT support

Example Queries:

# Single hop
kg.cypher_query("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")

# Multi-hop
kg.cypher_query("MATCH (a:Person)-[:KNOWS]->(b:Person)-[:WORKS_AT]->(c:Company) RETURN a.name, b.name, c.name")

# With filtering
kg.cypher_query("MATCH (a:Person)-[:WORKS_AT]->(c:Company) WHERE a.age > 25 RETURN a, c LIMIT 10")

Production Notes:

  • Replace generate_embedding() with real embeddings (e.g., sentence-transformers, OpenAI); see the first sketch after this list
  • Add more sophisticated Cypher query features as needed
  • Consider adding full-text search on document content (an FTS5 sketch follows)
  • Add transaction support for bulk operations (a batching sketch follows)
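For the first point, here is a minimal sketch of what swapping in real embeddings could look like, assuming the sentence-transformers package is installed; the subclass name EmbeddingKnowledgeGraph and the model choice all-MiniLM-L6-v2 are illustrative, not part of the code above:

import numpy as np
from sentence_transformers import SentenceTransformer

class EmbeddingKnowledgeGraph(KnowledgeGraph):
    """KnowledgeGraph variant that uses real sentence embeddings (illustrative)."""

    def __init__(self, db_path: str = "knowledge_graph.db", model_name: str = "all-MiniLM-L6-v2"):
        super().__init__(db_path)
        self.model = SentenceTransformer(model_name)

    def generate_embedding(self, text: str, dim: int = 384) -> np.ndarray:
        # dim is ignored; the model fixes the dimensionality (384 for all-MiniLM-L6-v2).
        # float32 keeps the tobytes()/np.frombuffer round-trip used by similarity_search intact.
        return self.model.encode(text).astype(np.float32)

Because generate_embedding() is overridden rather than changed, update_chunk_embeddings() and similarity_search() work unchanged; just be sure to rebuild stored embeddings with the same model you query with.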
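For full-text search, one possible approach is a small FTS5 index over the existing chunks. This is a sketch, assuming SQLite was compiled with the FTS5 extension (true for typical Python builds); the table name chunk_fts and the helper names are made up for illustration:

def enable_full_text_search(kg: KnowledgeGraph):
    """Build a simple FTS5 index over existing document chunks (illustrative helper)."""
    cur = kg.conn.cursor()
    # chunk_id is UNINDEXED so only the content column participates in matching
    cur.execute("CREATE VIRTUAL TABLE IF NOT EXISTS chunk_fts USING fts5(chunk_id UNINDEXED, content)")
    cur.execute("DELETE FROM chunk_fts")  # rebuild from scratch for simplicity
    cur.execute("INSERT INTO chunk_fts (chunk_id, content) SELECT id, content FROM document_chunks")
    kg.conn.commit()

def keyword_search(kg: KnowledgeGraph, query: str, top_k: int = 5):
    """Rank chunks by FTS5 relevance for a keyword query."""
    cur = kg.conn.cursor()
    cur.execute(
        "SELECT chunk_id, content FROM chunk_fts WHERE chunk_fts MATCH ? ORDER BY rank LIMIT ?",
        (query, top_k)
    )
    return [dict(row) for row in cur.fetchall()]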
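And for bulk loads, the per-call commit() in add_node() becomes a bottleneck; a sketch of batching many inserts in a single transaction (the helper name bulk_load_nodes is illustrative, and it writes the same INSERT OR REPLACE statement that add_node() uses):

import json

def bulk_load_nodes(kg: KnowledgeGraph, rows):
    """Insert many (node_id, node_type, properties) tuples in one transaction (illustrative helper)."""
    # Using the connection as a context manager commits on success and rolls back on error.
    with kg.conn:
        kg.conn.executemany(
            "INSERT OR REPLACE INTO nodes (id, type, properties) VALUES (?, ?, ?)",
            [(node_id, node_type, json.dumps(props or {})) for node_id, node_type, props in rows]
        )

# Example:
# bulk_load_nodes(kg, [("p10", "Person", {"name": "Eve"}), ("p11", "Person", {"name": "Frank"})])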

The system is ready to use and can be extended with more advanced features!


From <https://claude.ai/chat/1f2d0118-01ef-4c59-88dd-fd5f9007a2b6>  Claude Code
