#3659 Text Document Embedding in Neo4j Document and Doc_Chunk Nodes
#3659 Provide python code to embed a text file using sentence transformer into a Neo4j 'Document' node with associated 'Doc_Chunk' nodes
Here’s a compact, production-ready Python script that:
- reads a text file
- chunks it (word-window with overlap)
- embeds each chunk with sentence-transformers
- writes a Document node plus Doc_Chunk nodes to Neo4j
- creates a native vector index for fast ANN search (Neo4j 5.x)
Works with: Neo4j 5.x+, the neo4j Python driver, and sentence-transformers.
# requirements:
# pip install neo4j sentence-transformers numpy python-dotenv
import os
import time
import uuid
from datetime import datetime
from typing import List, Tuple
import numpy as np
from neo4j import GraphDatabase, basic_auth
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv
# ----------------------------
# Config (env vars are easiest)
# ----------------------------
load_dotenv()
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASS = os.getenv("NEO4J_PASS", "password")
# Choose your ST model (dim=384 for MiniLM, 768 for mpnet-base, etc.)
MODEL_NAME = os.getenv("ST_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
# chunking
WORDS_PER_CHUNK = int(os.getenv("WORDS_PER_CHUNK", 180))
WORD_OVERLAP = int(os.getenv("WORD_OVERLAP", 40))
# ----------------------------
# Chunking utilities
# ----------------------------
def word_window_chunks(text: str, words_per_chunk: int, overlap: int) -> List[Tuple[str, int, int]]:
"""
Returns list of (chunk_text, start_char, end_char).
Simple word-window chunker with overlap; preserves original character spans.
"""
words = text.split()
chunks = []
i = 0
n = len(words)
# Build chunk word ranges
while i < n:
j = min(n, i + words_per_chunk)
chunk_words = words[i:j]
chunk_text = " ".join(chunk_words).strip()
# Map back to character offsets (approximate but consistent):
# find the substring occurrence by reconstructing prefix lengths
# For robustness when text contains multiple spaces/newlines, we
# compute char offsets by scanning progressively.
if chunk_text:
chunks.append(chunk_text)
if j == n:
break
i = max(i + words_per_chunk - overlap, i + 1)
# Compute character spans by searching sequentially
spans = []
cursor = 0
for c in chunks:
start = text.find(c, cursor)
if start == -1:
# Fallback if exact subsequence not found due to whitespace normalization
start = cursor
end = start + len(c)
spans.append((c, start, end))
cursor = end
return spans
# ----------------------------
# Neo4j helpers
# ----------------------------
DDL_CREATE_INDEX = """
CREATE VECTOR INDEX chunk_embedding_index IF NOT EXISTS
FOR (c:Doc_Chunk) ON (c.embedding)
OPTIONS {
indexConfig: {
`vector.dimensions`: $dim,
`vector.similarity_function`: 'cosine'
}
}
"""
MERGE_DOCUMENT = """
MERGE (d:Document {doc_id: $doc_id})
ON CREATE SET d.title=$title, d.path=$path, d.model=$model, d.created_at=$created_at
ON MATCH SET d.title=$title, d.path=$path, d.model=$model
RETURN d
"""
CREATE_CHUNK = """
WITH $chunks AS rows
UNWIND rows AS row
MERGE (c:Doc_Chunk {chunk_id: row.chunk_id})
ON CREATE SET
c.text = row.text,
c.ord = row.ord,
c.start_char = row.start_char,
c.end_char = row.end_char,
c.tokens = row.tokens,
c.embedding = row.embedding
ON MATCH SET
c.text = row.text,
c.ord = row.ord,
c.start_char = row.start_char,
c.end_char = row.end_char,
c.tokens = row.tokens,
c.embedding = row.embedding
RETURN count(*) AS upserts
"""
LINK_DOC_CHUNKS = """
MATCH (d:Document {doc_id: $doc_id})
WITH d
UNWIND $chunk_ids AS cid
MATCH (c:Doc_Chunk {chunk_id: cid})
MERGE (d)-[:HAS_CHUNK {ord: c.ord}]->(c)
RETURN count(*) AS links
"""
UPDATE_DOC_COUNT = """
MATCH (d:Document {doc_id: $doc_id})
SET d.chunk_count = $count
RETURN d.chunk_count AS chunk_count
"""
# For ANN search later (example):
# CALL db.index.vector.queryNodes('chunk_embedding_index', $k, $queryEmbedding)
# YIELD node, score RETURN node, score ORDER BY score DESC LIMIT $k;
def ensure_vector_index(session, dim: int):
session.run(DDL_CREATE_INDEX, dim=dim)
def main(path_to_text: str, title: str = None):
if not os.path.isfile(path_to_text):
raise FileNotFoundError(path_to_text)
with open(path_to_text, "r", encoding="utf-8", errors="ignore") as f:
raw = f.read()
title = title or os.path.basename(path_to_text)
doc_id = str(uuid.uuid4())
# Chunk
spans = word_window_chunks(raw, WORDS_PER_CHUNK, WORD_OVERLAP)
# Embed
model = SentenceTransformer(MODEL_NAME)
texts = [s[0] for s in spans]
embeddings = model.encode(texts, batch_size=64, show_progress_bar=True, normalize_embeddings=True)
dim = embeddings.shape[1]
# Prepare chunk rows
chunk_rows = []
for idx, ((chunk_text, start_char, end_char), emb) in enumerate(zip(spans, embeddings)):
chunk_rows.append({
"chunk_id": f"{doc_id}:{idx}",
"text": chunk_text,
"ord": idx,
"start_char": int(start_char),
"end_char": int(end_char),
"tokens": len(chunk_text.split()),
# Neo4j expects a list[float]
"embedding": emb.astype(float).tolist(),
})
# Neo4j upsert
driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASS))
try:
with driver.session() as session:
# Create vector index (idempotent)
ensure_vector_index(session, dim)
# MERGE Document
session.run(
MERGE_DOCUMENT,
doc_id=doc_id,
title=title,
path=os.path.abspath(path_to_text),
model=MODEL_NAME,
created_at=datetime.utcnow().isoformat()
)
# Create chunks in batches to avoid exceeding transaction memory
BATCH = 256
for i in range(0, len(chunk_rows), BATCH):
batch = chunk_rows[i:i+BATCH]
session.run(CREATE_CHUNK, chunks=batch)
# Link doc → chunks
session.run(LINK_DOC_CHUNKS, doc_id=doc_id, chunk_ids=[r["chunk_id"] for r in chunk_rows])
# Update doc with chunk_count
session.run(UPDATE_DOC_COUNT, doc_id=doc_id, count=len(chunk_rows))
print(f"✅ Embedded and stored {len(chunk_rows)} chunks for Document '{title}' (doc_id={doc_id})")
print(" Vector index: chunk_embedding_index (cosine)")
print(f" Model: {MODEL_NAME} (dim={dim})")
finally:
driver.close()
if __name__ == "__main__":
# Example usage:
# export NEO4J_URI=bolt://localhost:7687
# export NEO4J_USER=neo4j
# export NEO4J_PASS=...
# python embed_to_neo4j.py /path/to/file.txt "My Doc Title"
import sys
if len(sys.argv) < 2:
print("Usage: python embed_to_neo4j.py <path_to_text_file> [title]")
sys.exit(1)
path = sys.argv[1]
title = sys.argv[2] if len(sys.argv) > 2 else None
t0 = time.time()
main(path, title)
print(f"Done in {time.time() - t0:.2f}s")
Notes & extras
- Dimensions/index: The script infers the embedding dimension from the model and creates a Neo4j native vector index named chunk_embedding_index with cosine similarity.
- Labels/properties:
  - (:Document {doc_id, title, path, model, created_at, chunk_count})
  - (:Doc_Chunk {chunk_id, text, ord, start_char, end_char, tokens, embedding})
  - (:Document)-[:HAS_CHUNK {ord}]->(:Doc_Chunk)
- Quick similarity query (top-k ANN using the native index):
  // Python side: compute the embedding for your query text and pass it as $queryEmbedding (list[float])
  CALL db.index.vector.queryNodes('chunk_embedding_index', 5, $queryEmbedding)
  YIELD node, score
  RETURN node.chunk_id AS chunk_id, node.ord AS ord, node.text AS text, score
  ORDER BY score DESC LIMIT 5;
- Switching models: Use all-mpnet-base-v2 (768-dim) for stronger quality. Note that an existing index keeps the dimension it was created with; if you switch models, drop and recreate chunk_embedding_index with the new dimension.
- Idempotency: The script MERGEs by doc_id, which is a new UUID on each run. If you want to re-embed/update an existing doc, pass a stable doc_id instead (e.g., a hash of the file path, as sketched below) and keep the same MERGE key.
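For example, a minimal sketch of deriving a stable doc_id from the file path (the hashing choice and helper name are illustrative, not part of the script above):

import hashlib
import os

def stable_doc_id(path_to_text: str) -> str:
    """Derive a deterministic doc_id from the absolute file path, so re-running
    the script updates the same Document instead of creating a new one."""
    abs_path = os.path.abspath(path_to_text)
    return hashlib.sha256(abs_path.encode("utf-8")).hexdigest()[:32]

# In main(), replace `doc_id = str(uuid.uuid4())` with:
# doc_id = stable_doc_id(path_to_text)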
If you’d like, I can add: (a) token-aware chunking (tiktoken), (b) deduping via MinHash, (c) re-embed only changed chunks, (d) a retrieval function that returns stitched passages.
Do I need or should I include a separate vector database when using embeddings in Neo4j knowledge graphs?
Short answer: you don’t need a separate vector database for most Neo4j-centric knowledge graphs. Neo4j 5+ has native vector indexes and fast ANN search, so you can keep embeddings, chunks, and relationships in one place. Whether you should add an external vector DB depends on scale, latency, ops, and feature needs.
When Neo4j alone is the right choice
- Tight graph + semantic queries: You’ll often mix MATCH pattern traversals with top-k vector search in one Cypher query. Keeping vectors in Neo4j preserves ACID semantics and simplifies code.
- Simplicity & single source of truth: One datastore to secure, back up, version, migrate, and observe.
- Scale that’s “reasonable”: As a rule of thumb, up to low tens of millions of chunks (varies by hardware and embedding dimension) with read-heavy RAG is typically fine.
- Moderate ingest rates: Batch or streaming updates measured in hundreds to a few thousand chunks/sec (again hardware dependent) are commonly manageable.
- Filtering & hybrid retrieval: You can combine vector scores with label/property/relationship filters, and even add full-text (Neo4j’s full-text indexes) inside one Cypher query.
Example hybrid (vector + full-text) pattern:
// $qvec is your query embedding; $qtext is the keyword string.
CALL db.index.vector.queryNodes('chunk_embedding_index', 20, $qvec) YIELD node AS c, score AS vscore
CALL db.index.fulltext.queryNodes('chunk_text_fulltext', $qtext) YIELD node AS t, score AS kscore
WITH c, vscore, t, kscore
WHERE c = t  // intersect candidates
RETURN c
ORDER BY (0.7 * vscore + 0.3 * kscore) DESC
LIMIT 10;
When to add a separate vector DB
Consider a specialist store (Pinecone/Weaviate/Qdrant/Milvus/pgvector, etc.) if you need:
- Very large scale: 50M+ vectors (or smaller vectors but strict p99 latency SLOs at high QPS) with automatic sharding/placement and elastic scale.
- Ultra-low latency: Sub-tens-of-ms ANN at high concurrency, aggressive HNSW tuning, quantization (PQ/IVF/HNSW hybrid), and tight cache control.
- Heavy real-time ingest: More than a few thousand upserts/sec sustained, with immediate queryability and background index maintenance.
- Advanced vector features: Multi-tenant isolation, built-in rerankers, cross-modal search, collections/namespaces, per-namespace quotas, or managed SLA/monitoring you don’t want to build yourself.
- Org constraints: A centralized “vector platform” team already runs a managed service you must use.
Integration patterns (if you do add one)
- Neo4j-primary, vector-sidecar (mirror)
  - Store truth in Neo4j; stream {chunk_id, embedding} to the vector DB.
  - Query flow: vector DB → top-k IDs → fetch/rerank in Neo4j.
  - Pros: Graph remains source of truth; easy to remove the sidecar later.
  - Cons: Dual-write & sync complexity.
- Vector-primary, graph-referenced
  - Chunks live in the vector DB; Neo4j stores structure/metadata/IDs.
  - Query flow: vector DB → IDs → Neo4j joins for graph context.
  - Pros: Best for huge/latency-critical vector search.
  - Cons: More moving parts; eventual consistency to manage.
- Dual index with an abstraction layer
  - Code behind a small “retrieval” interface; toggle Neo4j vs an external store via config (see the sketch after this list).
  - Useful for phased migrations and A/B tests.
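A minimal sketch of such a retrieval interface, in plain Python (the class and method names are illustrative, not an existing API):

from typing import Any, Dict, List, Protocol

class Retriever(Protocol):
    def top_k(self, query_embedding: List[float], k: int = 5) -> List[Dict[str, Any]]: ...

class Neo4jRetriever:
    """Retriever backed by the native vector index created earlier."""
    def __init__(self, driver, index_name: str = "chunk_embedding_index"):
        self.driver = driver
        self.index_name = index_name

    def top_k(self, query_embedding: List[float], k: int = 5) -> List[Dict[str, Any]]:
        cypher = (
            "CALL db.index.vector.queryNodes($index, $k, $qvec) YIELD node, score "
            "RETURN node.chunk_id AS chunk_id, node.text AS text, score"
        )
        with self.driver.session() as session:
            return session.run(cypher, index=self.index_name, k=k, qvec=query_embedding).data()

# An ExternalVectorRetriever exposing the same top_k() could wrap Pinecone/Qdrant/etc.;
# application code then depends only on the Retriever protocol and a config switch.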
Practical decision checklist
- Graph first? If most queries traverse relationships, choose Neo4j only to keep it simple.
- Search first? If most queries are pure semantic search at massive scale/latency SLOs, use a vector DB (with Neo4j for structure).
- Data volume up to low tens of millions of chunks at moderate QPS → Neo4j is enough. Beyond that, or if ops wants managed elasticity → add a vector DB.
- Team/ops bandwidth small? Prefer Neo4j only.
- Compliance & ACID important for the embeddings, not just the metadata? Neo4j only keeps it transactional.
Tips if you stay in Neo4j
- Index per dimension/model: Name indexes like chunk_embed_mpnet_768. Store model, dim, and embed_version on nodes.
- Version embeddings: Keep embedding_v2 during re-embedding; switch your queries; later drop v1.
- Batch writes: 1–5k chunks per transaction; reuse sessions; disable show_progress_bar in prod.
- Hardware: Favor ample RAM and SSD IOPS; smaller dims (e.g., 384) help memory & index size.
- Hybrid ranking: Combine the vector score with recency, source, and relationship-depth weights inside Cypher (a sketch follows this list).
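For instance, a minimal sketch of recency-weighted ranking; the weights and the created_at handling are illustrative assumptions (it assumes Document.created_at holds the ISO-8601 string written by the ingestion script):

RECENCY_WEIGHTED_QUERY = """
CALL db.index.vector.queryNodes('chunk_embedding_index', 50, $qvec)
YIELD node AS c, score AS vscore
MATCH (d:Document)-[:HAS_CHUNK]->(c)
WITH c, d, vscore,
     duration.inDays(datetime(d.created_at), datetime()).days AS age_days
RETURN d.title AS title, c.ord AS ord, c.text AS text,
       vscore / (1.0 + age_days / 365.0) AS ranked_score
ORDER BY ranked_score DESC
LIMIT 10
"""

def recency_weighted_search(driver, qvec):
    # Down-weights older documents; tune the 365-day decay to taste.
    with driver.session() as session:
        return session.run(RECENCY_WEIGHTED_QUERY, qvec=qvec).data()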
Bottom line
Start with Neo4j-only unless you have clear, quantified needs (scale/latency/features) that push you to a dedicated vector DB. Design your code with a thin retrieval interface so you can bolt on (or swap out) a vector store later without rewriting your graph logic.
Provide a python code snippet (method) to create an embedding for the query
Here’s a tiny, drop-in method to turn a query string into a Sentence-Transformers embedding (normalized, returned as a Python list[float]). It caches the model so it loads only once.
# pip install sentence-transformers
import os
from functools import lru_cache
import numpy as np
from sentence_transformers import SentenceTransformer
MODEL_NAME = os.getenv("ST_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
@lru_cache(maxsize=1)
def _get_st_model(model_name: str = MODEL_NAME) -> SentenceTransformer:
"""Load the sentence-transformers model once and cache it."""
return SentenceTransformer(model_name)
def embed_query(query: str) -> list[float]:
"""
Create a normalized embedding for a single query string.
Returns a plain Python list[float] suitable for Neo4j vector indexes.
"""
model = _get_st_model()
vec = model.encode([query], normalize_embeddings=True)[0] # shape (dim,)
# Ensure native Python floats for Neo4j driver JSON serialization:
return np.asarray(vec, dtype=float).tolist()
# Example:
# qvec = embed_query("how do I reset the avionics computer?")
# print(len(qvec)) # embedding dimension
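The query vector can then be fed straight to the vector index created earlier. A minimal sketch (search_chunks is an illustrative helper; it reuses the NEO4J_* environment-variable convention from the ingestion script):

import os
from neo4j import GraphDatabase, basic_auth

NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASS = os.getenv("NEO4J_PASS", "password")

def search_chunks(query: str, k: int = 5):
    """Embed the query and return the top-k most similar Doc_Chunk rows."""
    qvec = embed_query(query)
    driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASS))
    try:
        with driver.session() as session:
            result = session.run(
                """
                CALL db.index.vector.queryNodes('chunk_embedding_index', $k, $qvec)
                YIELD node, score
                RETURN node.chunk_id AS chunk_id, node.text AS text, score
                ORDER BY score DESC
                """,
                k=k, qvec=qvec,
            )
            return result.data()
    finally:
        driver.close()

# hits = search_chunks("how do I reset the avionics computer?")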
Provide a Python code snippet to delete the original set of Document and Doc_Chunk nodes
Here’s a simple Python method that connects to Neo4j and deletes a Document node along with all of its associated Doc_Chunk nodes and relationships.
By default this assumes you stored a doc_id on your Document nodes (as in the earlier embedding code).
from neo4j import GraphDatabase, basic_auth
import os
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASS = os.getenv("NEO4J_PASS", "password")
DELETE_DOCUMENT_AND_CHUNKS = """
MATCH (d:Document {doc_id: $doc_id})
OPTIONAL MATCH (d)-[:HAS_CHUNK]->(c:Doc_Chunk)
DETACH DELETE d, c
"""
def delete_document(doc_id: str):
"""
Delete a Document and all of its associated Doc_Chunk nodes from Neo4j.
"""
driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASS))
try:
with driver.session() as session:
result = session.run(DELETE_DOCUMENT_AND_CHUNKS, doc_id=doc_id)
summary = result.consume()
print(f"✅ Deleted document {doc_id} and related chunks. "
f"Nodes deleted: {summary.counters.nodes_deleted}, "
f"Relationships deleted: {summary.counters.relationships_deleted}")
finally:
driver.close()
# Example usage:
# delete_document("your-document-uuid-here")
Notes
- DETACH DELETE removes nodes and all their relationships in one go.
- Using the pattern (d)-[:HAS_CHUNK]->(c) ensures all chunks linked to the document are also removed.
- If you want to delete all documents and chunks at once (be careful!), you could instead run:
  MATCH (d:Document)-[:HAS_CHUNK]->(c:Doc_Chunk) DETACH DELETE d, c;
Would you like me to also provide a “delete all” Python snippet (removes every Document and Doc_Chunk in the graph), or should we keep it strictly per-document doc_id for safety?
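For reference, a minimal sketch of that “delete all” variant, gated behind an explicit confirmation flag (the helper name and flag are illustrative; it reuses the NEO4J_* constants defined above):

DELETE_ALL_DOCS_AND_CHUNKS = """
MATCH (d:Document)
OPTIONAL MATCH (d)-[:HAS_CHUNK]->(c:Doc_Chunk)
DETACH DELETE d, c
"""

def delete_all_documents(confirm: bool = False):
    """Remove every Document and its Doc_Chunk nodes. Irreversible; requires confirm=True."""
    if not confirm:
        raise ValueError("Refusing to delete all documents: call with confirm=True")
    driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASS))
    try:
        with driver.session() as session:
            summary = session.run(DELETE_ALL_DOCS_AND_CHUNKS).consume()
            print(f"Deleted {summary.counters.nodes_deleted} nodes and "
                  f"{summary.counters.relationships_deleted} relationships.")
    finally:
        driver.close()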
From <https://chatgpt.com/> ChatGPT 5
Here’s a clean, end-to-end Python script that:
- walks a directory of PDFs
- extracts + cleans text
- chunks it
- embeds each chunk with Sentence-Transformers
- inserts Documents and Doc_Chunks (with embeddings) into Neo4j using Cypher
- creates useful constraints + a vector index (Neo4j 5.10+)
Install once:
pip install sentence-transformers neo4j pdfminer.six tqdm python-dotenv
#!/usr/bin/env python3
"""
PDF ➜ Text ➜ Chunks ➜ Embeddings ➜ Neo4j (Documents & Doc_Chunks)
- Creates (Document {doc_id, title, path, page_count, created_at})
- Creates (Doc_Chunk {chunk_id, ord, text, token_count, embedding})
- (Document)-[:HAS_CHUNK]->(Doc_Chunk)
- Adds uniqueness constraints and a vector index on Doc_Chunk.embedding
Tested with:
- Python 3.10+
- Neo4j 5.10+ (for native vector index)
"""
import os
import re
import uuid
import time
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Iterable
from pdfminer.high_level import extract_text
from sentence_transformers import SentenceTransformer
from neo4j import GraphDatabase, basic_auth
from tqdm import tqdm
# ============ CONFIG ============
# You can also set these via environment variables.
NEO4J_URI = os.getenv("NEO4J_URI", "neo4j://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
PDF_DIR = os.getenv("PDF_DIR", "./pdfs") # directory containing PDFs
MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
CHUNK_SIZE_CH = int(os.getenv("CHUNK_SIZE_CH", "1400")) # approx char size per chunk
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "200")) # overlap in chars between chunks
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "64")) # embed in batches
# ================================
# ---------- Utilities ----------
def slugify(s: str) -> str:
s = s.strip().lower()
s = re.sub(r"[^\w\-\.]+", "-", s)
s = re.sub(r"-+", "-", s).strip("-")
return s or str(uuid.uuid4())
def clean_text(txt: str) -> str:
# normalize whitespace, remove excessive blank lines, trim weird control chars
txt = txt.replace("\x00", " ")
txt = re.sub(r"[ \t]+", " ", txt)
txt = re.sub(r"\n{3,}", "\n\n", txt)
txt = re.sub(r"[ \t]*\n[ \t]*", "\n", txt)
return txt.strip()
def chunk_text(text: str, chunk_chars: int = 1400, overlap: int = 200) -> List[str]:
"""
Simple, reliable character-based chunking with overlap.
Keeps it model-agnostic and avoids tokenizers.
"""
if not text:
return []
chunks = []
i = 0
n = len(text)
while i < n:
j = min(i + chunk_chars, n)
# extend to end of sentence if possible (soft boundary)
if j < n:
k = text.rfind(".", i, j)
if k != -1 and k > i + int(0.5 * chunk_chars):
j = k + 1
chunk = text[i:j].strip()
if chunk:
chunks.append(chunk)
i = j - overlap
if i < 0:
i = 0
if i >= n:
break
if j >= n:
break
return chunks
def read_pdf_text(path: Path) -> Tuple[str, int]:
"""
Returns (text, page_count_estimate). pdfminer doesn't provide count easily,
so we estimate pages by counting \x0c (form feed) when available; otherwise 0.
"""
raw = extract_text(str(path)) or ""
# pdfminer often separates pages with \x0c
page_count = raw.count("\x0c")
text = clean_text(raw.replace("\x0c", "\n"))
return text, page_count
# ---------- Neo4j I/O ----------
CONSTRAINTS_AND_INDEX = [
# Unique doc & chunk IDs
"CREATE CONSTRAINT doc_id_unique IF NOT EXISTS FOR (d:Document) REQUIRE d.doc_id IS UNIQUE",
"CREATE CONSTRAINT chunk_id_unique IF NOT EXISTS FOR (c:Doc_Chunk) REQUIRE c.chunk_id IS UNIQUE",
# Optional helpful lookups
"CREATE INDEX doc_title_index IF NOT EXISTS FOR (d:Document) ON (d.title)",
# Vector index (Neo4j 5.10+). Will no-op on older versions.
# Adjust dimensions if you switch models.
(
"CREATE VECTOR INDEX chunk_embedding_index IF NOT EXISTS FOR (c:Doc_Chunk) "
"ON (c.embedding) OPTIONS {indexConfig: {`vector.dimensions`: 384, `vector.similarity_function`: 'cosine'}}"
),
]
CYPHER_UPSERT_DOCUMENT = """
MERGE (d:Document {doc_id:$doc_id})
SET d.title = $title,
d.path = $path,
d.page_count = $page_count,
d.created_at = $created_at
RETURN d.doc_id AS doc_id
"""
CYPHER_CREATE_CHUNKS = """
UNWIND $rows AS row
MERGE (d:Document {doc_id: row.doc_id})
MERGE (c:Doc_Chunk {chunk_id: row.chunk_id})
SET c.ord = row.ord,
c.text = row.text,
c.token_count = row.token_count,
c.embedding = row.embedding
MERGE (d)-[:HAS_CHUNK]->(c)
"""
def init_schema(driver):
with driver.session() as session:
for stmt in CONSTRAINTS_AND_INDEX:
try:
session.run(stmt)
except Exception as e:
# Ignore if server doesn't support vector index or similar
print(f"[schema] Note: {e}")
def upsert_document(driver, doc_id: str, title: str, path: str, page_count: int) -> str:
with driver.session() as session:
res = session.run(
CYPHER_UPSERT_DOCUMENT,
{
"doc_id": doc_id,
"title": title,
"path": path,
"page_count": page_count,
"created_at": datetime.utcnow().isoformat(),
},
)
return res.single()["doc_id"]
def insert_chunks(driver, rows: List[dict]):
if not rows:
return
with driver.session() as session:
session.run(CYPHER_CREATE_CHUNKS, {"rows": rows})
# ---------- Embedding Pipeline ----------
def embed_batches(model, texts: List[str], batch_size: int = 64) -> List[List[float]]:
vectors = []
for i in tqdm(range(0, len(texts), batch_size), desc="Embedding", unit="batch"):
batch = texts[i:i+batch_size]
vecs = model.encode(batch, convert_to_numpy=False, show_progress_bar=False, normalize_embeddings=True)
# ensure plain python lists for Neo4j
vectors.extend([list(map(float, v)) for v in vecs])
return vectors
def run(directory: str):
# Load embedding model
model = SentenceTransformer(MODEL_NAME)
dim = model.get_sentence_embedding_dimension()
print(f"[embed] Model: {MODEL_NAME} | dim={dim}")
# Connect to Neo4j
driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD))
init_schema(driver)
pdf_paths = sorted([p for p in Path(directory).glob("**/*.pdf")])
if not pdf_paths:
print(f"[info] No PDFs found under: {directory}")
return
for pdf in pdf_paths:
t0 = time.time()
try:
text, page_count = read_pdf_text(pdf)
if not text.strip():
print(f"[skip] Empty text: {pdf}")
continue
title = pdf.stem
doc_id = slugify(f"{title}-{uuid.uuid4().hex[:8]}")
upsert_document(driver, doc_id, title, str(pdf.resolve()), int(page_count))
chunks = chunk_text(text, CHUNK_SIZE_CH, CHUNK_OVERLAP)
if not chunks:
print(f"[skip] No chunks after chunking: {pdf}")
continue
# Prepare embeddings
embeddings = embed_batches(model, chunks, BATCH_SIZE)
rows = []
for i, (chunk, emb) in enumerate(zip(chunks, embeddings), start=1):
rows.append({
"doc_id": doc_id,
"chunk_id": f"{doc_id}::chunk::{i}",
"ord": i,
"text": chunk,
"token_count": len(chunk.split()), # rough estimate; swap for tokenizer if needed
"embedding": emb,
})
insert_chunks(driver, rows)
dt = time.time() - t0
print(f"[ok] {pdf.name}: {len(rows)} chunks in {dt:0.1f}s")
except Exception as e:
print(f"[error] {pdf.name}: {e}")
driver.close()
print("[done] All PDFs processed.")
# ---------- Optional: similarity query helper ----------
SIMILARITY_QUERY = r"""
// Example: top-5 similar chunks for $queryEmbedding
WITH $queryEmbedding AS qe
CALL db.index.vector.queryNodes('chunk_embedding_index', 5, qe) YIELD node, score
MATCH (d:Document)-[:HAS_CHUNK]->(node)
RETURN d.title AS title, node.ord AS ord, score AS sim, node.text AS text
ORDER BY sim DESC
"""
def example_similarity_search(query: str):
"""
Usage example after loading data:
example_similarity_search("What is the hazard mitigation strategy?")
"""
model = SentenceTransformer(MODEL_NAME)
qe = model.encode([query], normalize_embeddings=True)[0].tolist()
driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD))
with driver.session() as session:
res = session.run(SIMILARITY_QUERY, {"queryEmbedding": qe})
for r in res:
print(f"[{r['sim']:.4f}] {r['title']} (chunk {r['ord']})\n{r['text'][:240]}...\n")
driver.close()
# ---------- main ----------
if __name__ == "__main__":
run(PDF_DIR)
# After loading, you can test similarity search by uncommenting:
# example_similarity_search("interface control requirements for power")
Notes & tweaks
- Vector index: The script creates chunk_embedding_index with 384 dimensions (matches all-MiniLM-L6-v2). If you switch models, update the dimensions.
- Chunking: Char-based with a soft sentence boundary. If you prefer token-based chunking, swap in a tokenizer (e.g., tiktoken) and keep chunks within your model/context limits.
- PDF extraction: Uses pdfminer.six, which handles most PDFs. If you hit image-only PDFs, add OCR (e.g., pytesseract on pdf2image pages; see the sketch after this list).
- Schema: doc_id & chunk_id are unique; HAS_CHUNK links documents to chunks. token_count is a rough word count; adjust as needed.
- Similarity search: A Cypher snippet using Neo4j’s native vector index (5.10+) is provided. On older Neo4j, consider storing embeddings and running similarity client-side, or via the Graph Data Science plugin.
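A minimal OCR fallback sketch, assuming pytesseract, pdf2image, and a local Tesseract/poppler install (ocr_pdf_text is an illustrative helper, not part of the script above):

# pip install pytesseract pdf2image pillow   (plus Tesseract and poppler installed on the OS)
from pathlib import Path
from pdf2image import convert_from_path
import pytesseract

def ocr_pdf_text(path: Path) -> str:
    """Render each PDF page to an image and OCR it; use when extract_text() returns nothing."""
    pages = convert_from_path(str(path))
    return "\n".join(pytesseract.image_to_string(page) for page in pages)

# Inside the per-PDF loop, after read_pdf_text():
# if not text.strip():
#     text = clean_text(ocr_pdf_text(pdf))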
If you want this adapted for Ollama embeddings, Azure OpenAI embeddings, or to include OCR for scanned PDFs, say the word and I’ll drop a variant.
Ollama please
Awesome—here’s a drop-in Ollama variant. It keeps your earlier pipeline but swaps Sentence-Transformers for Ollama local embeddings (e.g., nomic-embed-text, mxbai-embed-large).
Setup (one-time):
# Python deps
pip install neo4j pdfminer.six tqdm python-dotenv ollama
# Ollama runtime + model (on your machine)
# https://ollama.com/download (install & start the Ollama service)
ollama pull nomic-embed-text   # or mxbai-embed-large, etc.
Run:
export OLLAMA_EMBED_MODEL="nomic-embed-text"   # optional (defaults below)
export PDF_DIR="./pdfs"
export NEO4J_URI="neo4j://localhost:7687"
export NEO4J_USER="neo4j"
export NEO4J_PASSWORD="password"
python pdfs_to_neo4j_ollama.py
#!/usr/bin/env python3
"""
PDF ➜ Text ➜ Chunks ➜ Embeddings (Ollama) ➜ Neo4j (Documents & Doc_Chunks)
- Uses Ollama local embedding models (e.g., 'nomic-embed-text', 'mxbai-embed-large')
- Creates (Document {doc_id, title, path, page_count, created_at})
- Creates (Doc_Chunk {chunk_id, ord, text, token_count, embedding})
- (Document)-[:HAS_CHUNK]->(Doc_Chunk)
- Builds a Neo4j vector index sized to the detected embedding dimension
Tested with:
- Python 3.10+
- Neo4j 5.10+ (for native vector index)
- Ollama (local service) with an embedding model pulled
"""
import os
import re
import uuid
import time
from datetime import datetime
from pathlib import Path
from typing import List, Tuple
from pdfminer.high_level import extract_text
from neo4j import GraphDatabase, basic_auth
from tqdm import tqdm
# ---- Ollama client ----
# 'ollama' is a tiny Python client that talks to your local Ollama daemon.
import ollama
# ============================ CONFIG ============================
NEO4J_URI = os.getenv("NEO4J_URI", "neo4j://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
PDF_DIR = os.getenv("PDF_DIR", "./pdfs")
# An Ollama embedding model. Good options:
# - "nomic-embed-text" (dim ≈ 768)
# - "mxbai-embed-large" (dim ≈ 1024)
# - "all-minilm" (if available in your Ollama repo)
OLLAMA_EMBED_MODEL = os.getenv("OLLAMA_EMBED_MODEL", "nomic-embed-text")
CHUNK_SIZE_CH = int(os.getenv("CHUNK_SIZE_CH", "1400")) # characters per chunk
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "200")) # overlap in characters
# ================================================================
# -------------------- Utilities --------------------
def slugify(s: str) -> str:
s = s.strip().lower()
s = re.sub(r"[^\w\-\.]+", "-", s)
s = re.sub(r"-+", "-", s).strip("-")
return s or str(uuid.uuid4())
def clean_text(txt: str) -> str:
txt = txt.replace("\x00", " ")
txt = re.sub(r"[ \t]+", " ", txt)
txt = re.sub(r"\n{3,}", "\n\n", txt)
txt = re.sub(r"[ \t]*\n[ \t]*", "\n", txt)
return txt.strip()
def chunk_text(text: str, chunk_chars: int = 1400, overlap: int = 200) -> List[str]:
if not text:
return []
chunks = []
i, n = 0, len(text)
while i < n:
j = min(i + chunk_chars, n)
# prefer ending at a sentence boundary if we can do so reasonably
if j < n:
k = text.rfind(".", i, j)
if k != -1 and k > i + int(0.5 * chunk_chars):
j = k + 1
chunk = text[i:j].strip()
if chunk:
chunks.append(chunk)
i = j - overlap
if i < 0:
i = 0
if j >= n:
break
return chunks
def read_pdf_text(path: Path) -> Tuple[str, int]:
raw = extract_text(str(path)) or ""
page_count = raw.count("\x0c")
text = clean_text(raw.replace("\x0c", "\n"))
return text, page_count
# -------------------- Neo4j Schema & I/O --------------------
CYPHER_UPSERT_DOCUMENT = """
MERGE (d:Document {doc_id:$doc_id})
SET d.title = $title,
d.path = $path,
d.page_count = $page_count,
d.created_at = $created_at
RETURN d.doc_id AS doc_id
"""
CYPHER_CREATE_CHUNKS = """
UNWIND $rows AS row
MERGE (d:Document {doc_id: row.doc_id})
MERGE (c:Doc_Chunk {chunk_id: row.chunk_id})
SET c.ord = row.ord,
c.text = row.text,
c.token_count = row.token_count,
c.embedding = row.embedding
MERGE (d)-[:HAS_CHUNK]->(c)
"""
def init_schema(driver, dim: int):
statements = [
"CREATE CONSTRAINT doc_id_unique IF NOT EXISTS FOR (d:Document) REQUIRE d.doc_id IS UNIQUE",
"CREATE CONSTRAINT chunk_id_unique IF NOT EXISTS FOR (c:Doc_Chunk) REQUIRE c.chunk_id IS UNIQUE",
"CREATE INDEX doc_title_index IF NOT EXISTS FOR (d:Document) ON (d.title)",
(
"CREATE VECTOR INDEX chunk_embedding_index IF NOT EXISTS FOR (c:Doc_Chunk) "
f"ON (c.embedding) OPTIONS {{indexConfig: {{`vector.dimensions`: {dim}, `vector.similarity_function`: 'cosine'}}}}"
),
]
with driver.session() as session:
for stmt in statements:
try:
session.run(stmt)
except Exception as e:
print(f"[schema] Note: {e}")
def upsert_document(driver, doc_id: str, title: str, path: str, page_count: int) -> str:
with driver.session() as session:
res = session.run(
CYPHER_UPSERT_DOCUMENT,
{
"doc_id": doc_id,
"title": title,
"path": path,
"page_count": page_count,
"created_at": datetime.utcnow().isoformat(),
},
)
return res.single()["doc_id"]
def insert_chunks(driver, rows: List[dict]):
if not rows:
return
with driver.session() as session:
session.run(CYPHER_CREATE_CHUNKS, {"rows": rows})
# -------------------- Ollama Embeddings --------------------
def embed_one_ollama(text: str, model: str) -> List[float]:
"""
Calls Ollama's embeddings endpoint for a single text.
Returns a Python list[float].
"""
# ollama.embeddings returns a response containing an "embedding" list for the prompt
result = ollama.embeddings(model=model, prompt=text)
vec = result.get("embedding")
if not isinstance(vec, list) or not vec:
raise RuntimeError("Invalid embedding from Ollama")
# ensure floats
return [float(x) for x in vec]
def embed_list_ollama(texts: List[str], model: str) -> List[List[float]]:
"""
Embeds a list of texts. (One-by-one calls for broad compatibility.)
If your ollama server supports batch 'input', you can optimize here.
"""
vectors = []
for t in tqdm(texts, desc=f"Embedding with {model}", unit="chunk"):
vectors.append(embed_one_ollama(t, model))
return vectors
# -------------------- Pipeline --------------------
SIMILARITY_QUERY = r"""
WITH $queryEmbedding AS qe
CALL db.index.vector.queryNodes('chunk_embedding_index', 5, qe) YIELD node, score
MATCH (d:Document)-[:HAS_CHUNK]->(node)
RETURN d.title AS title, node.ord AS ord, score AS sim, node.text AS text
ORDER BY sim DESC
"""
def run(directory: str, model_name: str):
# Probe dimension once to size the vector index correctly
probe_vec = embed_one_ollama("dimension probe", model_name)
dim = len(probe_vec)
print(f"[embed] Ollama model: {model_name} | dim={dim}")
driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD))
init_schema(driver, dim)
pdf_paths = sorted([p for p in Path(directory).glob("**/*.pdf")])
if not pdf_paths:
print(f"[info] No PDFs found under: {directory}")
driver.close()
return
for pdf in pdf_paths:
t0 = time.time()
try:
text, page_count = read_pdf_text(pdf)
if not text.strip():
print(f"[skip] Empty text: {pdf}")
continue
title = pdf.stem
doc_id = slugify(f"{title}-{uuid.uuid4().hex[:8]}")
upsert_document(driver, doc_id, title, str(pdf.resolve()), int(page_count))
chunks = chunk_text(text, CHUNK_SIZE_CH, CHUNK_OVERLAP)
if not chunks:
print(f"[skip] No chunks after chunking: {pdf}")
continue
embeddings = embed_list_ollama(chunks, model_name)
rows = []
for i, (chunk, emb) in enumerate(zip(chunks, embeddings), start=1):
rows.append({
"doc_id": doc_id,
"chunk_id": f"{doc_id}::chunk::{i}",
"ord": i,
"text": chunk,
"token_count": len(chunk.split()), # rough count
"embedding": emb,
})
insert_chunks(driver, rows)
dt = time.time() - t0
print(f"[ok] {pdf.name}: {len(rows)} chunks in {dt:0.1f}s")
except Exception as e:
print(f"[error] {pdf.name}: {e}")
driver.close()
print("[done] All PDFs processed.")
def example_similarity_search(query: str, model_name: str):
qe = embed_one_ollama(query, model_name)
driver = GraphDatabase.driver(NEO4J_URI, auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD))
with driver.session() as session:
res = session.run(SIMILARITY_QUERY, {"queryEmbedding": qe})
for r in res:
print(f"[{r['sim']:.4f}] {r['title']} (chunk {r['ord']})\n{r['text'][:240]}...\n")
driver.close()
# -------------------- main --------------------
if __name__ == "__main__":
run(PDF_DIR, OLLAMA_EMBED_MODEL)
# Test similarity:
# example_similarity_search("interface control requirements for power", OLLAMA_EMBED_MODEL)
Notes & tips
- Model choice & dimensions: The script probes the embedding dimension (e.g., nomic-embed-text ≈ 768; mxbai-embed-large ≈ 1024) before creating the Neo4j vector index—so you don’t have to hardcode it.
- Batching: The example embeds each chunk one-by-one for broad compatibility with Ollama’s Python client. If your Ollama server supports array input for /api/embeddings, you can optimize embed_list_ollama() to send batches and unpack the returned embeddings list (see the sketch after these notes).
- OCR for scanned PDFs: If some PDFs are image-only, add OCR:
  pip install pytesseract pdf2image pillow   # and install Tesseract on your OS
  Then detect empty text and fall back to OCR per page.
- Older Neo4j: If your Neo4j is older than 5.10 or doesn’t support native vector indexes, skip the index creation and do similarity client-side, or use Graph Data Science.
- Safety: If you’ll embed sensitive documents, keep Ollama restricted to localhost and secure your Neo4j with proper auth/SSL.
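As referenced in the batching note, here is a minimal batch sketch. It assumes a recent ollama Python client that exposes ollama.embed() with list input and an "embeddings" field in the response—verify against your installed client and server version before relying on it:

def embed_list_ollama_batched(texts: List[str], model: str, batch_size: int = 32) -> List[List[float]]:
    """Batch variant of embed_list_ollama(): sends several chunks per request."""
    vectors: List[List[float]] = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        # Assumption: ollama.embed(model=..., input=[...]) returns a response whose
        # 'embeddings' list is aligned with the input order (newer Ollama API).
        resp = ollama.embed(model=model, input=batch)
        vectors.extend([list(map(float, v)) for v in resp["embeddings"]])
    return vectors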
Want me to switch chunking to token-aware (tiktoken) or add an Ollama batch implementation?
From <https://chatgpt.com/> ChatGPT 5
db.index.vector.queryNodes: Caused by: java.lang.IllegalArgumentException: There is no such vector schema index: chunk_embedding_index}
Thanks for the error dump—that means Neo4j can’t find the vector index named chunk_embedding_index. Here’s how to fix it quickly, plus a safe fallback if your Neo4j build doesn’t support vector indexes.
1) Verify the index really exists (and its exact name)
Run these in Neo4j Browser or via a Python session:
// What version am I on?
CALL dbms.components();
// List all indexes so we can see the name/type/state
SHOW INDEXES YIELD name, type, entityType, labelsOrTypes, properties, state
ORDER BY name;
If you don’t see chunk_embedding_index in the output, create it (or recreate it with the right dimensions).
Replace <DIM> with your embedding size (e.g., 768 for nomic-embed-text, 1024 for mxbai-embed-large).
The property must be exactly the one you stored, i.e., c.embedding.
// One-time schema (constraints are harmless if they already exist)
CREATE CONSTRAINT doc_id_unique IF NOT EXISTS FOR (d:Document) REQUIRE d.doc_id IS UNIQUE;
CREATE CONSTRAINT chunk_id_unique IF NOT EXISTS FOR (c:Doc_Chunk) REQUIRE c.chunk_id IS UNIQUE;
CREATE INDEX doc_title_index IF NOT EXISTS FOR (d:Document) ON (d.title);

// Create the vector index with the correct dimension and cosine similarity
CREATE VECTOR INDEX chunk_embedding_index IF NOT EXISTS
FOR (c:Doc_Chunk) ON (c.embedding)
OPTIONS {
  indexConfig: {
    `vector.dimensions`: <DIM>,
    `vector.similarity_function`: 'cosine'
  }
};
Now re-run your query:
WITH $queryEmbedding AS qe
CALL db.index.vector.queryNodes('chunk_embedding_index', 5, qe) YIELD node, score
MATCH (d:Document)-[:HAS_CHUNK]->(node)
RETURN d.title AS title, node.ord AS ord, score AS sim, node.text AS text
ORDER BY sim DESC;
Common causes of this error
- Index name mismatch: Your code queries 'chunk_embedding_index' but you created a different name.
- Wrong dimensions: The index was created with, say, 384 dims but your vectors are 768. Recreate with the correct <DIM>.
- Different database: You created the index in a non-default database (e.g., neo4j vs another). Make sure you’re connected to the same one when creating and querying.
- Feature not available on your build: Some Neo4j distributions/versions don’t support native VECTOR INDEX. If SHOW INDEXES doesn’t show type VECTOR after creation—or the CREATE VECTOR INDEX statement errors—use the fallback below.
2) Quick code patch: verify/create the index from Python
Drop this check into your script right after connecting (it will detect your Ollama vector dimension and ensure the index is there):
def ensure_vector_index(driver, dim: int, name: str = "chunk_embedding_index"):
with driver.session() as session:
# Does it already exist?
existing = session.run("SHOW INDEXES YIELD name, type RETURN name, type").data()
if any(row["name"] == name and row["type"].upper() == "VECTOR" for row in existing):
return True # good to go
# Try to create it
try:
session.run(f"""
CREATE VECTOR INDEX {name} IF NOT EXISTS
FOR (c:Doc_Chunk) ON (c.embedding)
OPTIONS {{
indexConfig: {{
`vector.dimensions`: {dim},
`vector.similarity_function`: 'cosine'
}}
}}
""")
return True
except Exception as e:
print(f"[schema] Could not create VECTOR INDEX: {e}")
return False
Use it like:
dim = len(embed_one_ollama("dimension probe", OLLAMA_EMBED_MODEL))
ok = ensure_vector_index(driver, dim)
If ok is False, use the fallback query below.
3) Fallback if VECTOR INDEX isn’t available
If your Neo4j build doesn’t support CREATE VECTOR INDEX, you can still run similarity—just without the fast ANN index.
Option A — Client-side ranking (simple, slower for large graphs)
If you normalized embeddings before storing (the Ollama code above stores raw values; consider normalizing), cosine similarity reduces to a dot product. Plain Cypher can’t do vector math over lists well, so the practical approach is to bring candidates back to Python and rank them there:
import numpy as np
from neo4j import GraphDatabase, basic_auth
def top_k_client_side(driver, query_vec, k=5, candidate_limit=5000):
with driver.session() as session:
# Pull a candidate set (tune LIMIT for your data size)
res = session.run("""
MATCH (c:Doc_Chunk)
RETURN c.chunk_id AS id, c.text AS text, c.embedding AS emb
LIMIT $limit
""", {"limit": candidate_limit})
rows = res.data()
E = np.array([r["emb"] for r in rows], dtype=float)
q = np.array(query_vec, dtype=float)
# Normalize for cosine sim
E = E / (np.linalg.norm(E, axis=1, keepdims=True) + 1e-9)
q = q / (np.linalg.norm(q) + 1e-9)
sims = (E @ q)
top_idx = sims.argsort()[::-1][:k]
out = []
for i in top_idx:
r = rows[i]
out.append({"chunk_id": r["id"], "sim": float(sims[i]), "text": r["text"]})
return out
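Usage, continuing with the Ollama embedder and driver from the script above (illustrative):

qvec = embed_one_ollama("interface control requirements for power", OLLAMA_EMBED_MODEL)
for hit in top_k_client_side(driver, qvec, k=5):
    print(f"[{hit['sim']:.4f}] {hit['chunk_id']}: {hit['text'][:120]}...")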
Option B — Neo4j GDS (if you have the plugin)
If you have the Graph Data Science library installed, you can use gds.similarity.cosine in Cypher, or build an approximate k-NN projection. (Happy to drop exact GDS snippets if you tell me your GDS version.)
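For example, a minimal brute-force scoring sketch, assuming a GDS 2.x install where gds.similarity.cosine is available as a Cypher function (this scans every chunk, so it is only practical for modest graph sizes):

GDS_COSINE_QUERY = """
MATCH (c:Doc_Chunk)
WITH c, gds.similarity.cosine(c.embedding, $qvec) AS sim
ORDER BY sim DESC
LIMIT $k
RETURN c.chunk_id AS chunk_id, c.text AS text, sim
"""

def top_k_gds(driver, query_vec, k=5):
    # Ranks all chunks by cosine similarity inside the database via GDS.
    with driver.session() as session:
        return session.run(GDS_COSINE_QUERY, {"qvec": query_vec, "k": k}).data()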
4) Sanity checklist
- Confirm the embedding dimension you stored matches the index dimension.
- Confirm you’re querying the same database where you created the index.
- Ensure the index name in the query ('chunk_embedding_index') matches exactly.
- If creation failed earlier, your script may have printed a “[schema] Note: …” message—scan your logs.
- If your Neo4j build doesn’t support VECTOR INDEX, use the client-side fallback above (or install/enable a build that supports it).
If you share:
- your CALL dbms.components() output, and
- the SHOW INDEXES table,
I’ll tailor the exact CREATE VECTOR INDEX command (or provide a GDS-based alternative) to your environment.
From <https://chatgpt.com/> ChatGPT 5
Google Search: cypher delete all indexes
The apoc.schema.assert procedure provides a convenient way to achieve this, e.g. CALL apoc.schema.assert({}, {}, true):
- The first two empty maps ({}) indicate that no specific indexes or constraints are being asserted.
- The third parameter, true, instructs APOC to drop any existing indexes and constraints that are not explicitly defined in the first two maps.
This apoc.schema.assert call effectively tells Neo4j to ensure that no indexes or constraints are present, and to remove any that currently exist to reach that state.
- APOC Procedures: This method requires the APOC Procedures library to be installed in your Neo4j instance.
- Irreversible action: Deleting all indexes is irreversible. Ensure you understand the implications before executing this command, as it will impact query performance.
- Constraints: This method also drops all constraints in addition to indexes. If you only want to delete indexes, drop them individually with DROP INDEX <name> statements after identifying them with SHOW INDEXES.