#3669 Business Knowledge Graph

#3669 List and describe a recommended set of node types in a knowledge graph of a business

Here’s a recommended set of node types for building a knowledge graph of a business. I’ll organize them into logical layers so you can see how they connect and scale.


1. Core Business Entities

  • Company – The overarching business entity. Properties: name, type (LLC, Inc.), registration info, industry, size.

  • Business Unit / Department – Subdivisions within the company (e.g., Marketing, R&D, Operations). Properties: goals, KPIs, budget.

  • Team – Smaller working groups within departments, often project-based.


2. People & Roles

  • Employee – Individuals working in the business. Properties: name, role, skills, tenure, contact info.

  • Role / Position – Defines responsibilities independent of individuals (e.g., Software Engineer, CFO).

  • Customer – End users or buyers of products/services. Properties: demographics, preferences, purchase history.

  • Partner / Supplier – External organizations that provide goods, services, or collaborations.


3. Products & Services

  • Product – A tangible offering. Properties: name, SKU, features, lifecycle stage, cost.

  • Service – Intangible offerings. Properties: scope, pricing model, SLA.

  • Feature / Component – Sub-element of a product or service.


4. Processes & Operations

  • Process – Business workflows (e.g., Order Fulfillment, Hiring, R&D Cycle). Properties: steps, owners, KPIs.

  • Policy / Rule – Governing constraints or guidelines.

  • Project / Initiative – Time-bounded efforts to achieve goals. Properties: timeline, budget, deliverables.


5. Knowledge & Resources

  • Document – Internal knowledge (contracts, reports, manuals, SOPs). Properties: type, date, author.

  • Knowledge Asset – FAQs, training, patents, research findings.

  • Technology / Tool – Software and hardware used internally.


6. Finance & Assets

  • Financial Account – Revenue streams, cost centers, budgets.

  • Transaction – Sales, purchases, payments.

  • Asset – Tangible (equipment, facilities) or intangible (IP, licenses).


7. External Environment

  • Market / Industry Segment – Defines competitive context.

  • Competitor – Other companies in the same space. Properties: strengths, weaknesses, market share.

  • Regulation / Standard – External compliance requirements.


8. Relationships (Examples)

  • Employee → works_in → Department

  • Employee → holds_role → Role

  • Customer → purchases → Product

  • Supplier → provides → Product/Service

  • Department → owns → Process

  • Document → supports → Project

  • Product → complies_with → Regulation
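
To make these concrete, here's a minimal Python sketch that upserts one of the example edges (Employee → works_in → Department) with the official neo4j driver. The connection details and property keys are placeholders, and the edge is written as WORKS_IN following the usual ALL_CAPS Cypher convention:

from neo4j import GraphDatabase

# Placeholder connection details; point these at your own instance.
driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))

UPSERT = """
MERGE (e:Employee {employee_id: $employee_id})
MERGE (d:Department {id: $dept_id})
MERGE (e)-[:WORKS_IN]->(d)
"""

with driver.session() as session:
    # MERGE keeps this idempotent: re-running never duplicates nodes or edges.
    session.run(UPSERT, employee_id="E123", dept_id="ENG")
driver.close()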


Why this set works:

  • Covers organizational structure (company, departments, employees).

  • Captures value creation (products, services, projects).

  • Tracks resources (documents, tools, assets, finances).

  • Links to external ecosystem (customers, competitors, regulations).

  • Flexible enough for scaling: You can expand into time-series nodes (Events, Milestones) or analytics nodes (KPIs, Metrics).


Would you like me to map these into a Cypher schema (with node labels and relationship types), so you could directly implement this in Neo4j?


What about employee experience?

Great add. If you want the graph to explain and improve employee experience (EX), model it as a journey over time with measurable touchpoints, not just HR records. Here’s a practical, scalable EX slice you can drop into your business KG.

1) Core EX node types

  • Employee — canonical person record.
    Props: employee_id, start_date, location, employment_type, manager_id

  • Role — job definition independent of people.
    Props: title, job_family, level

  • Team / Department — org context; useful for cohort analysis.
    Props: name, cost_center, leader_id

  • LifecycleStage — phases like Attract, Hire, Onboard, Develop, Perform, Grow, Exit.
    Props: name, ordinal

  • Touchpoint — any employee-visible interaction that can be rated or timed (e.g., “Laptop provision,” “Benefits enrollment,” “Code review,” “1:1 with manager,” “Incident response duty,” “Expense reimbursement”).
    Props: name, category (IT/Facilities/HR/Manager), channel (in-person/tool), sla_target_days

  • ExperienceEvent — time-stamped event at a touchpoint.
    Props: event_id, ts, outcome (success/fail), wait_time_s, nps, csat, effort_score, survey_text

  • SurveyInstrument — survey or pulse construct.
    Props: name, frequency, question_bank_ref

  • Sentiment — derived NLP sentiment from free text (surveys, exit interviews, chat).
    Props: source, ts, sentiment (-1..1), topics [list], keywords

  • LearningActivity — course, certification, internal training.
    Props: title, provider, hours, skill_tags

  • Skill — normalized capability node.
    Props: name, level_scale (e.g., 1–5)

  • PerformanceCycle — review window.
    Props: cycle_id, start, end

  • PerformanceOutcome — rating/output for a cycle.
    Props: rating, promotion_flag, comp_change_pct

  • WorkCondition — schedule, travel load, on-call burden, hybrid/remote—drivers of EX.
    Props: type, intensity (0–1), notes

  • DigitalTool — SaaS/app used by employees (DEX focus).
    Props: name, owner_team, license_tier

  • ITIncident / Request — service tickets affecting EX.
    Props: ticket_id, opened, closed, priority

  • Policy — rules that influence EX (PTO, travel, WFH).
    Props: name, effective_date

  • Recognition — kudos/awards.
    Props: ts, channel, points

  • WellbeingResource — benefits & programs.
    Props: name, category (mental/financial/fitness)

2) Key relationships

  • (:Employee)-[:HOLDS_ROLE]->(:Role)

  • (:Employee)-[:MEMBER_OF]->(:Team)

  • (:Team)-[:PART_OF]->(:Department)

  • (:LifecycleStage)-[:HAS_TOUCHPOINT]->(:Touchpoint)

  • (:Employee)-[:EXPERIENCED {ts}]->(:ExperienceEvent)-[:AT_TOUCHPOINT]->(:Touchpoint)

  • (:ExperienceEvent)-[:MEASURED_BY]->(:SurveyInstrument)

  • (:ExperienceEvent)-[:HAS_SENTIMENT]->(:Sentiment)

  • (:Employee)-[:COMPLETED]->(:LearningActivity)-[:DEVELOPS]->(:Skill)

  • (:Employee)-[:EVALUATED_IN]->(:PerformanceCycle)

  • (:Employee)-[:HAS_OUTCOME]->(:PerformanceOutcome)-[:FOR_CYCLE]->(:PerformanceCycle)

  • (:Employee)-[:USES]->(:DigitalTool)

  • (:DigitalTool)-[:GENERATED]->(:ITIncident)

  • (:Policy)-[:IMPACTS]->(:Touchpoint)

  • (:Employee)-[:RECEIVED]->(:Recognition)

  • (:Employee)-[:ENROLLED_IN]->(:WellbeingResource)

  • (:WorkCondition)-[:APPLIES_TO]->(:Employee)

3) Metrics & scoring (attach to events, not people)

For each ExperienceEvent capture:

  • Flow: wait_time_s, rework_count, handoffs

  • Outcome: success/fail, sla_met

  • Perception: nps, csat, effort_score, sentiment

  • Context: tool_id, device_type, location, shift

Then compute an ExperienceScore per touchpoint/employee/team/time window (e.g., 0.5*nps_norm + 0.3*effort_norm + 0.2*sla_met) and materialize it as a property on ExperienceEvent, with roll-ups on Employee, Team, and Touchpoint.
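
As a concrete illustration, here's a small Python sketch of that weighting; the 0–10 NPS scale, the 1–5 effort scale (lower is better), and the exact weights are assumptions to tune against your survey instruments:

def experience_score(nps: float, effort: float, sla_met: bool) -> float:
    """Fuse perception and outcome signals into a single 0-1 score."""
    nps_norm = nps / 10.0                      # assumes a 0-10 eNPS-style scale
    effort_norm = 1.0 - (effort - 1.0) / 4.0   # assumes 1-5; invert so low effort scores high
    return round(0.5 * nps_norm + 0.3 * effort_norm + 0.2 * (1.0 if sla_met else 0.0), 3)

print(experience_score(nps=10, effort=2, sla_met=True))  # 0.925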

4) Example: minimal schema (Neo4j Cypher)

// Constraints
CREATE CONSTRAINT emp_id IF NOT EXISTS FOR (e:Employee) REQUIRE e.employee_id IS UNIQUE;
CREATE CONSTRAINT evt_id IF NOT EXISTS FOR (x:ExperienceEvent) REQUIRE x.event_id IS UNIQUE;

// Touchpoint catalog, plus an employee and an EX event
// (kept as a single statement so ls, tp, e, and x stay in scope across the MERGEs)
MERGE (ls:LifecycleStage {name:'Onboard', ordinal:3})
MERGE (tp:Touchpoint {name:'Laptop Provisioning', category:'IT', channel:'ServiceNow', sla_target_days:2})
MERGE (ls)-[:HAS_TOUCHPOINT]->(tp)
MERGE (e:Employee {employee_id:'E123', start_date:date('2025-01-10'), location:'Austin', employment_type:'FT'})
MERGE (r:Role {title:'Software Engineer', job_family:'Engineering', level:'L3'})
MERGE (t:Team {name:'Platform'})
MERGE (e)-[:HOLDS_ROLE]->(r)
MERGE (e)-[:MEMBER_OF]->(t)
MERGE (x:ExperienceEvent {
  event_id:'EVT-1001',
  ts:datetime('2025-08-15T14:03:00Z'),
  outcome:'success',
  wait_time_s:86400,
  nps:10, csat:5, effort_score:2,
  sla_met:true,
  experience_score:0.87
})
MERGE (e)-[:EXPERIENCED {ts:x.ts}]->(x)
MERGE (x)-[:AT_TOUCHPOINT]->(tp);

5) Useful queries

A. Top friction touchpoints this quarter (low score, high volume)

MATCH (x:ExperienceEvent)-[:AT_TOUCHPOINT]->(tp:Touchpoint)
WHERE x.ts >= datetime() - duration('P3M')
WITH tp, count(*) AS volume, avg(x.experience_score) AS score
RETURN tp.name AS touchpoint, volume, round(score,3) AS avg_score
ORDER BY score ASC, volume DESC
LIMIT 10;

B. Manager-level heatmap: onboarding experience by team

MATCH (e:Employee)-[:MEMBER_OF]->(t:Team),
      (e)-[:EXPERIENCED]->(x:ExperienceEvent)-[:AT_TOUCHPOINT]->(tp:Touchpoint)<-[:HAS_TOUCHPOINT]-(:LifecycleStage {name:'Onboard'})
WITH t.name AS team, avg(x.experience_score) AS score, count(x) AS events
RETURN team, round(score,3) AS avg_score, events
ORDER BY avg_score ASC;

C. Digital experience (DEX): which tools correlate with low sentiment?

MATCH (e:Employee)-[:EXPERIENCED]->(x:ExperienceEvent)-[:AT_TOUCHPOINT]->(tp:Touchpoint),
      (x)-[:HAS_SENTIMENT]->(s:Sentiment),
      (e)-[:USES]->(d:DigitalTool)
WHERE x.ts >= datetime() - duration('P90D')
WITH d.name AS tool, avg(s.sentiment) AS avg_sent, count(*) AS n
RETURN tool, round(avg_sent,3) AS avg_sentiment, n
ORDER BY avg_sentiment ASC, n DESC
LIMIT 10;

D. Do learning & recognition improve EX? (simple cohort inference)

MATCH (e:Employee)-[:COMPLETED]->(:LearningActivity),
      (e)-[:RECEIVED]->(:Recognition),
      (e)-[:EXPERIENCED]->(x:ExperienceEvent)
WHERE x.ts >= datetime() - duration('P6M')
WITH e, avg(x.experience_score) AS ex_score,
     count { (e)-[:COMPLETED]->() } AS learns,
     count { (e)-[:RECEIVED]->() } AS kudos
RETURN percentileCont(ex_score,0.5) AS median_ex, avg(learns) AS avg_learn, avg(kudos) AS avg_kudos;

6) Data ingestion hints

  • Surveys/Pulses → ExperienceEvent (+ SurveyInstrument) and optional Sentiment node from free-text.

  • ITSM (ServiceNow/Jira) → map incidents/requests to ExperienceEvent at IT touchpoints (see the sketch after this list).

  • HRIS (Workday/BambooHR) → baseline Employee, Role, LifecycleStage transitions, Policy.

  • Collaboration/DEX → login latency, crash rates, meeting load → ExperienceEvent at “DigitalTool” touchpoints.

  • L&D → completions & skills → LearningActivity/Skill.
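
For example, here's a hedged sketch of the ITSM mapping noted above; the ticket field names are assumptions about a ServiceNow/Jira export, and the Cypher mirrors the EX schema from section 4:

from neo4j import GraphDatabase

TICKET_TO_EVENT = """
MERGE (tp:Touchpoint {name: $touchpoint})
MERGE (e:Employee {employee_id: $employee_id})
MERGE (x:ExperienceEvent {event_id: $ticket_id})
SET x.ts = datetime($closed),
    x.outcome = $outcome,
    x.wait_time_s = duration.inSeconds(datetime($opened), datetime($closed)).seconds,
    x.sla_met = $sla_met
MERGE (e)-[:EXPERIENCED {ts: x.ts}]->(x)
MERGE (x)-[:AT_TOUCHPOINT]->(tp)
"""

ticket = {  # one row from an illustrative ticket export
    "ticket_id": "INC-0042", "employee_id": "E123", "touchpoint": "Laptop Provisioning",
    "opened": "2025-08-14T09:00:00Z", "closed": "2025-08-15T14:03:00Z",
    "outcome": "success", "sla_met": True,
}

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run(TICKET_TO_EVENT, **ticket)
driver.close()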

7) Privacy, governance, ethics

  • Store survey free-text as separate Sentiment nodes to enable access control by edge.

  • Anonymize or aggregate for dashboards at Team level; keep PII off broad edges.

  • Track provenance: add source_system, ingested_ts, hash on events.

8) What this unlocks (quick wins)

  • Journey analytics: where onboarding breaks.

  • DEX optimization: quantify which tools hurt/help experience.

  • Manager coaching: link 1:1 frequency, recognition → EX → retention.

  • Policy impact: see before/after deltas from WFH or PTO policy changes.

  • Early warning: falling EX + rising on-call burden + low recognition → attrition risk (build a monitored pattern without exposing PII broadly).
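
Here's a minimal sketch of that early-warning pattern as a scheduled Python check; the thresholds, the on-call WorkCondition modeling, and the 90-day window are assumptions to tune:

from neo4j import GraphDatabase

AT_RISK = """
MATCH (e:Employee)-[:EXPERIENCED]->(x:ExperienceEvent)
WHERE x.ts >= datetime() - duration('P90D')
WITH e, avg(x.experience_score) AS ex_score
OPTIONAL MATCH (w:WorkCondition {type:'on-call'})-[:APPLIES_TO]->(e)
OPTIONAL MATCH (e)-[:RECEIVED]->(k:Recognition)
WITH e, ex_score, coalesce(max(w.intensity), 0.0) AS on_call, count(DISTINCT k) AS kudos
WHERE ex_score < 0.4 AND on_call > 0.6 AND kudos = 0
RETURN e.employee_id AS employee, ex_score, on_call, kudos
"""

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    for row in session.run(AT_RISK):
        # Route to a restricted manager/HRBP queue rather than a broad dashboard.
        print(row.data())
driver.close()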

If you want, I can generate:

  1. a Neo4j seed script that creates this EX subgraph and sample data;

  2. a streaming upsert pattern (MERGE-first) to ingest surveys/ITSM tickets hourly;

  3. a set of dashboards queries (top friction touchpoints, cohort rollups, before/after policy changes).


It appears I need a Python application to define a node with its embedded document set that can automatically be added to the business knowledge graph as new details of the business materialize

Awesome—let’s give you a single-file Python ingestor that:

  • lets you define or upsert a business node (e.g., Company, Department, Product, Policy, etc.)

  • attaches an embedded document set to that node (PDF/DOCX/TXT/MD)

  • chunks + embeds text (SentenceTransformers or Ollama)

  • MERGE-upserts into Neo4j with dedupe via SHA-256

  • can auto-ingest new/changed files by watching a folder

  • creates an optional Neo4j vector index for similarity search

You can drop this into ingest_kg.py and run. I kept it compact but production-ready.


Quick start

# 1) Python env
pip install neo4j sentence-transformers watchdog python-docx pypdf pyyaml

# 2) Edit config.yaml (see sample below)

# 3) Initialize schema (constraints + vector index)
python ingest_kg.py init

# 4) One-shot ingest a folder of docs under a node
python ingest_kg.py ingest \
  --label Company --key id --value "ACME-001" \
  --props name="ACME Robotics" industry="Manufacturing" \
  --docs ./docs/acme

# 5) Watch a folder; auto-add changes
python ingest_kg.py watch \
  --label Department --key id --value "ENG" \
  --props name="Engineering" leader="E123" \
  --docs ./docs/engineering

config.yaml (example)

neo4j:
  uri: "neo4j://localhost:7687"
  user: "neo4j"
  password: "password"
  database: "neo4j"

embedding:
  # "sentence-transformers" or "ollama"
  backend: "sentence-transformers"
  model: "sentence-transformers/all-MiniLM-L6-v2"
  # If using Ollama for embeddings:
  ollama:
    host: "http://localhost:11434"
    model: "nomic-embed-text"

chunking:
  size: 1000      # characters
  overlap: 200

watch:
  debounce_s: 2

What it creates in Neo4j

  • Your business node: (:<Label> { <key>: <value>, …props })

  • (:Document { doc_id, path, ext, sha256, title, created_ts, updated_ts })

  • (:Doc_Chunk { chunk_id, ord, text, sha256, embedding:[…] })

  • Relationships:

    • (<Label>)-[:HAS_DOCUMENT]->(Document)

    • (Document)-[:HAS_CHUNK]->(Doc_Chunk)

It also sets unique constraints and (optionally) a vector index on Doc_Chunk.embedding.


The code (single file)

#!/usr/bin/env python3 # ingest_kg.py import argparse, os, sys, time, hashlib, json, glob, re, math, datetime, traceback from dataclasses import dataclass from typing import List, Dict, Tuple, Optional import yaml from neo4j import GraphDatabase from docx import Document as Docx from pypdf import PdfReader # Optional (loaded lazily) ST = None requests = None Observer = None FileSystemEventHandler = None # --------------------------- # Utilities # --------------------------- def now_iso(): return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat() def sha256_bytes(b: bytes) -> str: h = hashlib.sha256(); h.update(b); return h.hexdigest() def sha256_text(s: str) -> str: return sha256_bytes(s.encode("utf-8")) def read_text_from_file(path: str) -> str: ext = os.path.splitext(path)[1].lower() if ext in [".txt", ".md", ".csv", ".log"]: with open(path, "r", encoding="utf-8", errors="ignore") as f: return f.read() elif ext in [".docx"]: d = Docx(path) return "\n".join([p.text for p in d.paragraphs]) elif ext in [".pdf"]: reader = PdfReader(path) texts = [] for page in reader.pages: try: texts.append(page.extract_text() or "") except Exception: pass return "\n".join(texts) else: raise ValueError(f"Unsupported file type: {ext}") def chunk_text(s: str, size: int, overlap: int) -> List[str]: if size <= 0: return [s] chunks = [] start = 0 n = len(s) while start < n: end = min(n, start + size) chunks.append(s[start:end]) start = end - overlap if start < 0: start = 0 if start >= n: break return [c.strip() for c in chunks if c.strip()] # --------------------------- # Embeddings # --------------------------- @dataclass class EmbedConfig: backend: str # "sentence-transformers" or "ollama" model: str # ST model path or name ollama_host: str = "http://localhost:11434" ollama_model: str = "nomic-embed-text" class Embedder: def __init__(self, cfg: EmbedConfig): self.cfg = cfg self._st_model = None def _ensure_st(self): global ST if ST is None: from sentence_transformers import SentenceTransformer as ST # lazy import globals()['ST'] = ST if self._st_model is None: self._st_model = ST(self.cfg.model) def embed(self, texts: List[str]) -> List[List[float]]: if self.cfg.backend == "sentence-transformers": self._ensure_st() vecs = self._st_model.encode(texts, show_progress_bar=False, normalize_embeddings=True) return [v.tolist() for v in vecs] elif self.cfg.backend == "ollama": return self._embed_ollama(texts) else: raise ValueError(f"Unknown embedding backend: {self.cfg.backend}") def _embed_ollama(self, texts: List[str]) -> List[List[float]]: global requests if requests is None: import requests as _requests requests = _requests url = f"{self.cfg.ollama_host}/api/embeddings" out = [] for t in texts: r = requests.post(url, json={"model": self.cfg.ollama_model, "prompt": t}) r.raise_for_status() out.append(r.json()["embedding"]) return out # --------------------------- # Neo4j access # --------------------------- class Neo4jClient: def __init__(self, uri, user, password, database="neo4j"): self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.database = database def close(self): self.driver.close() def run(self, cypher: str, **params): with self.driver.session(database=self.database) as sess: return list(sess.run(cypher, **params)) def init_schema(self): stmts = [ "CREATE CONSTRAINT doc_id IF NOT EXISTS FOR (d:Document) REQUIRE d.doc_id IS UNIQUE", "CREATE CONSTRAINT chunk_id IF NOT EXISTS FOR (c:Doc_Chunk) REQUIRE c.chunk_id IS UNIQUE", ] for s in stmts: self.run(s) # Optional: 
vector index (Neo4j 5.11+). Adjust dimensions after first vector inserted if needed. # You can re-run with the correct dimension: self.run(""" CALL db.index.vector.createNodeIndex('chunk_embedding_idx', 'Doc_Chunk', 'embedding', 384, 'cosine') """) # Will error harmlessly if it exists/ dims mismatch; ignore errors in practice. def upsert_entity(self, label: str, key: str, value, props: Dict): # MERGE by key, set props (non-destructive for existing keys unless provided) prop_set = ", ".join([f"e.{k} = $props.{k}" for k in props.keys()]) cypher = f""" MERGE (e:{label} {{ {key}: $value }}) ON CREATE SET e.created_ts = datetime($now) SET {prop_set}, e.updated_ts = datetime($now) RETURN e """ return self.run(cypher, value=value, props=props, now=now_iso()) def upsert_document(self, label: str, key: str, value, doc_props: Dict): # Ensure entity exists and link document cypher = f""" MERGE (e:{label} {{ {key}: $value }}) MERGE (d:Document {{ doc_id: $doc.doc_id }}) ON CREATE SET d.created_ts = datetime($now) SET d.path = $doc.path, d.ext = $doc.ext, d.sha256 = $doc.sha256, d.title = $doc.title, d.updated_ts = datetime($now) MERGE (e)-[:HAS_DOCUMENT]->(d) RETURN d """ return self.run(cypher, value=value, doc=doc_props, now=now_iso()) def upsert_chunk(self, doc_id: str, chunk_props: Dict, embedding: List[float]): cypher = """ MERGE (d:Document { doc_id: $doc_id }) MERGE (c:Doc_Chunk { chunk_id: $chunk.chunk_id }) ON CREATE SET c.created_ts = datetime($now) SET c.ord = $chunk.ord, c.text = $chunk.text, c.sha256 = $chunk.sha256, c.embedding = $embedding, c.updated_ts = datetime($now) MERGE (d)-[:HAS_CHUNK]->(c) RETURN c """ return self.run(cypher, doc_id=doc_id, chunk=chunk_props, embedding=embedding, now=now_iso()) # --------------------------- # Ingestion core # --------------------------- def list_files(folder: str) -> List[str]: pats = ["**/*.pdf", "**/*.docx", "**/*.txt", "**/*.md"] files = [] for p in pats: files.extend(glob.glob(os.path.join(folder, p), recursive=True)) return sorted(set(files)) def build_doc_meta(path: str) -> Dict: ext = os.path.splitext(path)[1].lower() title = os.path.basename(path) try: stat = os.stat(path) cts = datetime.datetime.utcfromtimestamp(stat.st_ctime).isoformat() + "Z" uts = datetime.datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z" except Exception: cts = uts = now_iso() content = read_text_from_file(path) dsha = sha256_text(content) doc_id = sha256_text(f"{path}::{dsha}") return dict( doc_id=doc_id, path=os.path.abspath(path), ext=ext, title=title, sha256=dsha, created_ts=cts, updated_ts=uts, content=content, ) def ingest_folder( neo: Neo4jClient, embedder: Embedder, label: str, key: str, value, entity_props: Dict, folder: str, chunk_size: int, overlap: int ): # Upsert entity neo.upsert_entity(label, key, value, entity_props) # For each file → doc meta → chunks → embed → upsert files = list_files(folder) if not files: print(f"[INGEST] No files found under: {folder}") return for fpath in files: try: meta = build_doc_meta(fpath) except Exception as e: print(f"[SKIP] {fpath}: {e}") continue neo.upsert_document(label, key, value, { k: meta[k] for k in ["doc_id", "path", "ext", "sha256", "title"] }) chunks = chunk_text(meta["content"], chunk_size, overlap) # Deduplicate by chunk text hash chunk_hashes = [sha256_text(c) for c in chunks] embeddings = embedder.embed(chunks) if chunks else [] for i, (txt, ch, emb) in enumerate(zip(chunks, chunk_hashes, embeddings)): chunk_props = dict( chunk_id=sha256_text(f"{meta['doc_id']}::{i}::{ch}"), ord=i, text=txt, 
sha256=ch ) neo.upsert_chunk(meta["doc_id"], chunk_props, emb) print(f"[OK] {os.path.basename(fpath)}{len(chunks)} chunks") # --------------------------- # Folder Watch # --------------------------- class Handler: def __init__(self, neo, embedder, label, key, value, entity_props, folder, chunk_size, overlap, debounce_s): self.neo = neo self.embedder = embedder self.label = label self.key = key self.value = value self.entity_props = entity_props self.folder = folder self.chunk_size = chunk_size self.overlap = overlap self.debounce_s = debounce_s self._pending = set() self._last_run = 0.0 def on_any_event(self, event): if event.is_directory: return path = event.src_path if not any(path.lower().endswith(ext) for ext in [".pdf",".docx",".txt",".md"]): return self._pending.add(path) now = time.time() if now - self._last_run >= self.debounce_s: self.flush() def flush(self): paths = list(self._pending) self._pending.clear() self._last_run = time.time() if not paths: return # Minimal re-run: only changed files for p in paths: try: meta = build_doc_meta(p) self.neo.upsert_entity(self.label, self.key, self.value, self.entity_props) self.neo.upsert_document(self.label, self.key, self.value, { k: meta[k] for k in ["doc_id", "path", "ext", "sha256", "title"] }) chunks = chunk_text(meta["content"], self.chunk_size, self.overlap) embeddings = self.embedder.embed(chunks) if chunks else [] for i, (txt, emb) in enumerate(zip(chunks, embeddings)): ch = sha256_text(txt) chunk_props = dict( chunk_id=sha256_text(f"{meta['doc_id']}::{i}::{ch}"), ord=i, text=txt, sha256=ch ) self.neo.upsert_chunk(meta["doc_id"], chunk_props, emb) print(f"[WATCH] {os.path.basename(p)}{len(chunks)} chunks") except Exception as e: print(f"[WATCH:ERROR] {p}: {e}\n{traceback.format_exc()}") def start_watch(neo, embedder, label, key, value, entity_props, folder, chunk_size, overlap, debounce_s): global Observer, FileSystemEventHandler if Observer is None or FileSystemEventHandler is None: from watchdog.observers import Observer as _Observer from watchdog.events import FileSystemEventHandler as _Handler Observer = _Observer; FileSystemEventHandler = _Handler class _H(FileSystemEventHandler): def __init__(self, impl: Handler): super().__init__(); self.impl = impl def on_created(self, event): self.impl.on_any_event(event) def on_modified(self, event): self.impl.on_any_event(event) def on_moved(self, event): self.impl.on_any_event(event) impl = Handler(neo, embedder, label, key, value, entity_props, folder, chunk_size, overlap, debounce_s) obs = Observer() obs.schedule(_H(impl), path=folder, recursive=True) obs.start() print(f"[WATCH] Watching: {folder} (debounce {debounce_s}s). 
Ctrl+C to stop.") try: while True: time.sleep(1) except KeyboardInterrupt: obs.stop() obs.join() # --------------------------- # CLI # --------------------------- def load_config() -> dict: path = "config.yaml" if not os.path.exists(path): raise SystemExit("Missing config.yaml in current directory.") with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) def parse_kv_list(kv_list: List[str]) -> Dict: out = {} for item in kv_list or []: if "=" not in item: raise SystemExit(f"Bad --props entry (use key=value): {item}") k, v = item.split("=", 1) out[k] = v return out def main(): ap = argparse.ArgumentParser(description="Business KG Ingestor") sp = ap.add_subparsers(dest="cmd", required=True) sp_init = sp.add_parser("init", help="Init Neo4j schema (constraints + vector index)") for name in ["ingest", "watch"]: p = sp.add_parser(name, help=f"{name} docs into a labeled entity") p.add_argument("--label", required=True, help="Neo4j label, e.g., Company, Department, Product") p.add_argument("--key", required=True, help="Unique key prop on the label (e.g., id, name)") p.add_argument("--value", required=True, help="Value for the key") p.add_argument("--props", nargs="*", help='Additional props, e.g., name="ACME" industry="Manufacturing"') p.add_argument("--docs", required=True, help="Folder path to ingest or watch") args = ap.parse_args() cfg = load_config() neo = Neo4jClient( cfg["neo4j"]["uri"], cfg["neo4j"]["user"], cfg["neo4j"]["password"], cfg["neo4j"].get("database","neo4j") ) if args.cmd == "init": try: neo.init_schema() print("[INIT] Constraints and vector index requested.") finally: neo.close() return # Build embedder emb_cfg = EmbedConfig( backend=cfg["embedding"]["backend"], model=cfg["embedding"]["model"], ollama_host=cfg["embedding"].get("ollama", {}).get("host", "http://localhost:11434"), ollama_model=cfg["embedding"].get("ollama", {}).get("model", "nomic-embed-text"), ) embedder = Embedder(emb_cfg) # Common flags entity_props = parse_kv_list(args.props) label, key, value, folder = args.label, args.key, args.value, args.docs csize = cfg["chunking"]["size"] overlap = cfg["chunking"]["overlap"] try: if args.cmd == "ingest": ingest_folder(neo, embedder, label, key, value, entity_props, folder, csize, overlap) elif args.cmd == "watch": debounce = cfg.get("watch", {}).get("debounce_s", 2) start_watch(neo, embedder, label, key, value, entity_props, folder, csize, overlap, debounce) finally: neo.close() if __name__ == "__main__": main()

Design choices that help you scale

  • Upsert-first (MERGE) everything ⇒ safe to re-run, supports incremental updates.

  • Content hashing (doc + chunk) ⇒ dedupe, stable IDs, safe re-ingest.

  • Two embedding backends:

    • sentence-transformers for offline, fast local dev.

    • Ollama (nomic-embed-text) for local, GPU-friendly pipelines.

  • Vector index on Doc_Chunk.embedding (Neo4j 5.11+) so you can later add queries like:

    CALL db.index.vector.queryNodes('chunk_embedding_idx', 5, $queryEmbedding) YIELD node, score
    MATCH (node)<-[:HAS_CHUNK]-(d:Document)<-[:HAS_DOCUMENT]-(e)
    RETURN e, d.title AS doc, node.ord AS chunk, score
    ORDER BY score DESC;
  • Watch mode via watchdog to automatically ingest as “new details of the business materialize”.


Common extensions (just say the word and I’ll add)

  • FastAPI wrapper for HTTP ingest (POST /entity/{label}/{key}/{value} with files).

  • RAG search endpoint: takes a query → embed → vector search in Neo4j → return passages + source node (see the sketch after this list).

  • Entity linking: auto-detect mentions of known entities in chunks, add (:Mention)-[:REFERS_TO]->(:Entity) edges.

  • Policy-based routing: push IT tickets to Department:IT, contracts to Legal, etc., based on filename or content.

  • S3 / SharePoint connectors instead of local folders.
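
To show the shape of the RAG search flow, here's a hedged Python sketch; it assumes the sentence-transformers backend and the chunk_embedding_idx index from the config above:

from neo4j import GraphDatabase
from sentence_transformers import SentenceTransformer

SEARCH = """
CALL db.index.vector.queryNodes('chunk_embedding_idx', $k, $q) YIELD node, score
MATCH (node)<-[:HAS_CHUNK]-(d:Document)<-[:HAS_DOCUMENT]-(e)
RETURN labels(e) AS entity_labels, coalesce(e.id, e.name) AS entity,
       d.title AS doc, node.text AS passage, score
ORDER BY score DESC
"""

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))

def search(question: str, k: int = 5):
    # Embed the question locally, then let the vector index find the closest chunks.
    q = model.encode(question, normalize_embeddings=True).tolist()
    with driver.session() as session:
        return [r.data() for r in session.run(SEARCH, q=q, k=k)]

print(search("What is our laptop provisioning SLA?"))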

If you want, I can generate the FastAPI version next (same schema, plus a /search and /upsert API).


Provide python and cypher code to scan the current knowledge graph to "see where the new information fits" so that the appropriate relationships can also be added

Got it. Below is a practical auto-linker that scans what you’ve already ingested, figures out “where the new info fits,” and adds appropriate relationships—using both vector similarity (from your stored Doc_Chunk.embedding) and exact/normalized name mentions.

You’ll get:

  1. Cypher utilities (constraints + helper indexes)

  2. A Python script that:

    • computes a centroid embedding for the new entity’s chunks,

    • finds the closest existing entities via the Neo4j vector index,

    • detects mentions of known entity names in the new chunks,

    • MERGEs relationships with evidence and scores, or stores reviewable proposals.


0) Assumptions (matches your ingestor)

  • Node labels you want to “link into”: Company|Department|Team|Employee|Product|Service|Project|Process|Policy|Technology|Supplier|Customer.

  • New entity already exists (e.g., (:Department {id:"ENG"})) and has (:Document)-[:HAS_CHUNK]->(:Doc_Chunk {embedding:[…]}).

  • You already created the vector index (chunk_embedding_idx) on Doc_Chunk.embedding.

You can adjust labels and thresholds in the Python file near the top.


1) Cypher: constraints + helper (optional but recommended)

// Optional: unique ids per common entity types (edit keys to yours)
CREATE CONSTRAINT dept_id IF NOT EXISTS FOR (d:Department) REQUIRE d.id IS UNIQUE;
CREATE CONSTRAINT team_id IF NOT EXISTS FOR (t:Team) REQUIRE t.id IS UNIQUE;
CREATE CONSTRAINT comp_id IF NOT EXISTS FOR (c:Company) REQUIRE c.id IS UNIQUE;
CREATE CONSTRAINT prod_id IF NOT EXISTS FOR (p:Product) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT serv_id IF NOT EXISTS FOR (s:Service) REQUIRE s.id IS UNIQUE;
CREATE CONSTRAINT proj_id IF NOT EXISTS FOR (p:Project) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT proc_id IF NOT EXISTS FOR (p:Process) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT pol_id IF NOT EXISTS FOR (p:Policy) REQUIRE p.id IS UNIQUE;
CREATE CONSTRAINT tech_id IF NOT EXISTS FOR (t:Technology) REQUIRE t.id IS UNIQUE;
CREATE CONSTRAINT supp_id IF NOT EXISTS FOR (s:Supplier) REQUIRE s.id IS UNIQUE;
CREATE CONSTRAINT cust_id IF NOT EXISTS FOR (c:Customer) REQUIRE c.id IS UNIQUE;

// Optional: a place to stash reviewable link suggestions
CREATE CONSTRAINT lp_id IF NOT EXISTS FOR (l:LinkProposal) REQUIRE l.proposal_id IS UNIQUE;

Relationship types the script will create (you can rename):

  • :RELATED_TO (generic semantic fit, weighted by vector similarity)

  • :MENTIONS (Doc_Chunk → Entity, counted up)

  • :ABOUT (NewEntity → Entity if mention-density passes threshold)


2) Python: auto-linker (autolink.py)

#!/usr/bin/env python3 """ Auto-link new/updated entity into the business KG. Usage examples: python autolink.py \ --label Department --key id --value ENG \ --mode create # directly creates relationships python autolink.py \ --label Product --key id --value "PRD-42" \ --mode propose # writes LinkProposal nodes instead of edges (review first) """ import argparse, statistics, re, datetime from typing import List, Dict, Tuple from neo4j import GraphDatabase import yaml import numpy as np # ----------------------------- # Config # ----------------------------- ENTITY_TARGET_LABELS = [ "Company","Department","Team","Employee", "Product","Service","Project","Process", "Policy","Technology","Supplier","Customer" ] # Thresholds (tune these): TOPK_VECTOR = 60 # how many nearest chunks to pull globally AGG_MIN_MATCH_CHUNKS = 5 # min # of matched chunks pointing to a candidate entity SIM_MIN = 0.25 # min similarity (higher is closer; Neo4j returns cosine similarity if configured) ABOUT_MIN_SCORE = 0.35 # weighted score threshold to assert :ABOUT MENTION_MIN_COUNT = 3 # minimum chunk-mentions to assert :ABOUT via mentions # Mention detection normalization WORD_BOUNDARY = re.compile(r"\b", re.UNICODE) # ----------------------------- # Helpers # ----------------------------- def now_iso(): return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat() def avg_vec(vectors: List[List[float]]) -> List[float]: if not vectors: return [] arr = np.array(vectors, dtype=float) # normalize each then average -> robust centroid for cosine # (if your embeddings are already normalized, simple mean is fine) norms = np.linalg.norm(arr, axis=1, keepdims=True) norms[norms==0] = 1.0 arr = arr / norms centroid = arr.mean(axis=0) # return normalized centroid c_norm = np.linalg.norm(centroid) if c_norm > 0: centroid = centroid / c_norm return centroid.tolist() def parse_props_to_key(entity) -> Tuple[str,str]: # Try to discover a likely unique key among common names for k in ["id","employee_id","sku","name","key","code"]: if k in entity: return k, str(entity[k]) # Fallback return "internal_id", str(entity.id) # ----------------------------- # Neo4j # ----------------------------- class Neo: def __init__(self, uri, user, password, database="neo4j"): self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.database = database def close(self): self.driver.close() def run(self, cypher, **params): with self.driver.session(database=self.database) as s: return list(s.run(cypher, **params)) def get_entity_and_chunks(self, label, key, value): cypher = f""" MATCH (e:{label} {{ {key}: $value }})-[:HAS_DOCUMENT]->(:Document)-[:HAS_CHUNK]->(c:Doc_Chunk) RETURN e AS e, c.embedding AS emb, c.chunk_id AS chunk_id, c.text AS text """ rows = self.run(cypher, value=value) if not rows: return None, [] entity = rows[0]["e"] chunks = [{"embedding": r["emb"], "chunk_id": r["chunk_id"], "text": r["text"]} for r in rows] return entity, chunks def vector_neighbors(self, centroid: List[float], k: int): cypher = """ CALL db.index.vector.queryNodes('chunk_embedding_idx', $k, $q) YIELD node, score MATCH (node)<-[:HAS_CHUNK]-(d:Document)<-[:HAS_DOCUMENT]-(ent) RETURN ent AS entity, labels(ent) AS labels, id(ent) AS ent_id, d.title AS doc, node.chunk_id AS chunk_id, score AS sim """ return self.run(cypher, q=centroid, k=k) def catalog_entity_names(self, exclude_ent_id: int) -> Dict[str, List[Dict]]: # Map normalized names to candidate entities (to support collisions) cypher = f""" MATCH (ent) WHERE id(ent) <> $eid AND size([x 
IN labels(ent) WHERE x IN $labels]) > 0 WITH ent, labels(ent) AS labs WITH ent, labs, coalesce(ent.name, ent.title, ent.id, ent.code, ent.employee_id) AS nm WHERE nm IS NOT NULL RETURN id(ent) AS eid, labs AS labels, nm AS name """ rows = self.run(cypher, eid=exclude_ent_id, labels=ENTITY_TARGET_LABELS) index = {} for r in rows: nm = str(r["name"]).strip() if not nm: continue key = nm.lower() index.setdefault(key, []).append({"eid": r["eid"], "labels": r["labels"], "name": nm}) return index def add_related_edge(self, src_id: int, tgt_id: int, method: str, score: float, top_evidence: List[str], mode: str): if mode == "propose": cypher = """ MATCH (s) WHERE id(s)=$sid MATCH (t) WHERE id(t)=$tid MERGE (p:LinkProposal {proposal_id: apoc.create.uuid()}) SET p.created_ts = datetime($now), p.method = $method, p.score = $score, p.evidence = $evidence MERGE (s)-[:PROPOSES]->(p)-[:TO]->(t) RETURN p """ else: cypher = """ MATCH (s) WHERE id(s)=$sid MATCH (t) WHERE id(t)=$tid MERGE (s)-[r:RELATED_TO]->(t) ON CREATE SET r.first_seen = datetime($now) SET r.method = $method, r.last_seen = datetime($now), r.score = coalesce(r.score,0.0) + $score, r.evidence = $evidence RETURN r """ self.run(cypher, sid=src_id, tid=tgt_id, now=now_iso(), method=method, score=float(score), evidence=top_evidence) def add_about_edge(self, src_id: int, tgt_id: int, reason: str, score: float, mode: str): if mode == "propose": cypher = """ MATCH (s) WHERE id(s)=$sid MATCH (t) WHERE id(t)=$tid MERGE (p:LinkProposal {proposal_id: apoc.create.uuid()}) SET p.created_ts = datetime($now), p.method = $reason, p.score = $score MERGE (s)-[:PROPOSES]->(p)-[:TO_ABOUT]->(t) """ else: cypher = """ MATCH (s) WHERE id(s)=$sid MATCH (t) WHERE id(t)=$tid MERGE (s)-[r:ABOUT]->(t) ON CREATE SET r.first_seen = datetime($now) SET r.reason = $reason, r.score = $score, r.last_seen = datetime($now) """ self.run(cypher, sid=src_id, tid=tgt_id, now=now_iso(), reason=reason, score=float(score)) def chunk_mentions_edge(self, chunk_id: str, tgt_id: int, count: int, mode: str): if mode == "propose": cypher = """ MATCH (c:Doc_Chunk {chunk_id:$cid}) MATCH (t) WHERE id(t)=$tid MERGE (p:LinkProposal {proposal_id: apoc.create.uuid()}) SET p.created_ts = datetime($now), p.method = 'mention', p.mentions = $count, p.chunk_id = $cid MERGE (c)-[:PROPOSES]->(p)-[:TO]->(t) """ else: cypher = """ MATCH (c:Doc_Chunk {chunk_id:$cid}) MATCH (t) WHERE id(t)=$tid MERGE (c)-[r:MENTIONS]->(t) ON CREATE SET r.first_seen=datetime($now), r.count=$count SET r.last_seen=datetime($now), r.count = coalesce(r.count,0) + $count """ self.run(cypher, cid=chunk_id, tid=tgt_id, now=now_iso(), count=int(count)) # ----------------------------- # Mention detection (simple exact name match, case-insensitive) # ----------------------------- def find_mentions(text: str, entity_name: str) -> int: # approximate word-boundary match; tweak for punctuation/camel-case as needed # quick and safe: count occurrences of the lowercased name in lowercased text with boundaries t = text.lower() n = entity_name.lower() # strict token boundary pattern = r'(?<!\w)' + re.escape(n) + r'(?!\w)' return len(re.findall(pattern, t)) # ----------------------------- # Main pipeline # ----------------------------- def main(): ap = argparse.ArgumentParser() ap.add_argument("--label", required=True) ap.add_argument("--key", required=True) ap.add_argument("--value", required=True) ap.add_argument("--mode", choices=["create","propose"], default="create") ap.add_argument("--config", default="config.yaml") args = ap.parse_args() 
with open(args.config,"r",encoding="utf-8") as f: cfg = yaml.safe_load(f) neo = Neo(cfg["neo4j"]["uri"], cfg["neo4j"]["user"], cfg["neo4j"]["password"], cfg["neo4j"].get("database","neo4j")) try: # 1) Load new entity and its chunks entity, chunks = neo.get_entity_and_chunks(args.label, args.key, args.value) if entity is None: raise SystemExit(f"No entity found: {args.label} {{{args.key}: {args.value}}}") src_id = entity.id if not chunks: print("[INFO] Entity has no chunks; nothing to link.") return # 2) Build centroid & pull nearest neighbors globally centroid = avg_vec([c["embedding"] for c in chunks if c["embedding"]]) if not centroid: print("[WARN] No embeddings found on chunks.") return rows = neo.vector_neighbors(centroid, TOPK_VECTOR) # Group by target entity agg: Dict[int, Dict] = {} for r in rows: ent_id = r["ent_id"] if ent_id == src_id: # ignore self continue agg.setdefault(ent_id, {"labels": r["labels"], "hits": []}) agg[ent_id]["hits"].append({"chunk_id": r["chunk_id"], "sim": float(r["sim"]), "doc": r["doc"]}) # 3) Score candidates (vector) vector_candidates = [] for eid, data in agg.items(): sims = [h["sim"] for h in data["hits"] if h["sim"] >= SIM_MIN] if len(sims) < AGG_MIN_MATCH_CHUNKS: continue score = float(np.mean(sorted(sims, reverse=True)[:10])) # top-10 average top_evidence = [h["chunk_id"] for h in sorted(data["hits"], key=lambda x: x["sim"], reverse=True)[:5]] vector_candidates.append((eid, score, top_evidence)) vector_candidates.sort(key=lambda x: x[1], reverse=True) # 4) Mentions-based detection name_index = neo.catalog_entity_names(exclude_ent_id=src_id) mention_counts: Dict[int,int] = {} # Build a compact index for fast lookup for ch in chunks: text = ch["text"] or "" if not text.strip(): continue # very fast path: try exact names in our catalog for nm_lower, ents in name_index.items(): c = find_mentions(text, nm_lower) # counts are small; acceptable if c > 0: for ent in ents: mention_counts[ent["eid"]] = mention_counts.get(ent["eid"], 0) + c neo.chunk_mentions_edge(ch["chunk_id"], ent["eid"], c, args.mode) # 5) Create/Propose edges based on scores # a) RELATED_TO via vector similarity for eid, score, evidence in vector_candidates: neo.add_related_edge(src_id, eid, method="vector", score=score, top_evidence=evidence, mode=args.mode) # b) ABOUT if strong vector score OR enough mentions # A simple fusion rule; tune as you like. # Build a quick dict for vector scores: vec_score_by_id = {eid:score for (eid,score,_) in vector_candidates} target_ids = set(vec_score_by_id.keys()).union(mention_counts.keys()) for eid in target_ids: vscore = vec_score_by_id.get(eid, 0.0) mcount = mention_counts.get(eid, 0) # weighted condition fused = 0.7*vscore + 0.3*min(1.0, mcount/10.0) if fused >= ABOUT_MIN_SCORE or mcount >= MENTION_MIN_COUNT: reason = f"fused={fused:.3f}; v={vscore:.3f}; m={mcount}" neo.add_about_edge(src_id, eid, reason=reason, score=fused, mode=args.mode) print(f"[DONE] Linked/proposed {len(vector_candidates)} vector-related targets; {len(mention_counts)} mention targets.") finally: neo.close() if __name__ == "__main__": main()

3) Optional: Review + apply proposals (if you ran with --mode propose)

List proposals for a given source entity

MATCH (s {id:$srcId})-[:PROPOSES]->(p:LinkProposal)-[rel]->(t)
RETURN id(p) AS pid, type(rel) AS to_type, labels(t) AS tgt_labels,
       coalesce(t.id, t.name) AS tgt_key,
       p.method, p.score, p.evidence, p.mentions, p.chunk_id, p.created_ts
ORDER BY p.score DESC, p.created_ts DESC;

Accept a proposal (turn into an edge and delete the proposal)

// Example for RELATED_TO proposal
MATCH (s {id:$srcId})-[:PROPOSES]->(p:LinkProposal)-[:TO]->(t)
WITH s, t, p
MERGE (s)-[r:RELATED_TO]->(t)
ON CREATE SET r.first_seen = datetime()
SET r.method = p.method,
    r.score = coalesce(r.score,0) + coalesce(p.score,0),
    r.last_seen = datetime(),
    r.evidence = p.evidence
DETACH DELETE p;

Accept an ABOUT proposal

MATCH (s {id:$srcId})-[:PROPOSES]->(p:LinkProposal)-[:TO_ABOUT]->(t)
MERGE (s)-[r:ABOUT]->(t)
ON CREATE SET r.first_seen = datetime()
SET r.reason = p.method,
    r.score = coalesce(p.score,0),
    r.last_seen = datetime()
DETACH DELETE p;

4) What relationships does this actually add?

  • Chunk-level:
    (:Doc_Chunk)-[:MENTIONS]->(:Entity) when an entity’s name appears in the text.

  • Entity-level:
    (:NewEntity)-[:RELATED_TO]->(:ExistingEntity) when centroid-based vector matches are strong across multiple chunks (evidence and score recorded).
    (:NewEntity)-[:ABOUT]->(:ExistingEntity) when the fused (vector + mentions) strength indicates “this node is mainly about that target.”

You can trivially split :RELATED_TO into more specific ones (e.g., SUPPLIER_OF, PART_OF, USES, DEPENDS_ON) once you add lightweight pattern rules (filename heuristics, keyword lists, or a classifier). If you want, I can add a rule pack (YAML) to map keywords/labels to specific relationship types.


Fast tweaks you might want

  • Swap the name-matcher for Aho-Corasick (rapid dictionary matches) if you have thousands of entities (see the sketch after this list).

  • Add role-specific rules (e.g., if NewEntity:Department and target is Process, create (:Department)-[:OWNS]->(:Process) instead of :RELATED_TO).

  • Include time-bounded edges with valid_from/valid_to for process changes.
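
If you go the Aho-Corasick route, here's a minimal sketch with the pyahocorasick package (an assumption; any dictionary matcher works) as a drop-in for find_mentions:

import ahocorasick  # pip install pyahocorasick

def build_automaton(name_index):
    """name_index: {lowercased_name: [candidate entities]} as built by catalog_entity_names."""
    automaton = ahocorasick.Automaton()
    for name, entities in name_index.items():
        automaton.add_word(name, (name, entities))
    automaton.make_automaton()
    return automaton

def count_mentions(text, automaton):
    """Return {name: count} for every catalog name found in one pass over the chunk text."""
    counts = {}
    # Note: unlike the regex version, this has no word-boundary check; filter short/ambiguous names.
    for _end, (name, _entities) in automaton.iter(text.lower()):
        counts[name] = counts.get(name, 0) + 1
    return counts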

If you tell me your preferred relationship vocabulary (e.g., OWNS, USES, RESPONSIBLE_FOR, SUPPLIER_OF, SERVED_BY…), I’ll plug it into the auto-linker so it creates typed edges instead of a generic RELATED_TO.


Provide a step-by-step guide for building a business knowledge graph for small businesses and large

Below is a practical, end-to-end playbook for building a business knowledge graph (KG). It’s split into two tracks—Small Business (SB) and Large/Enterprise (ENT)—with shared steps and scale-specific guidance, so you can start lean and grow without rework.


0) Outcomes first (define why the KG exists)

Pick 3–5 priority use cases. Examples:

  • Revenue: unify customers → deals → invoices → churn signals

  • Operations: supplier → parts → equipment → downtime root-cause

  • Employee Experience: onboarding touchpoints → IT tickets → sentiment

  • Compliance: policies → processes → systems → evidence docs

For each use case, write a 1-sentence value statement + 3 KPIs (e.g., “reduce onboarding wait time by 30%”, “cut time-to-answer from 2h to 10m”).

SB: 1–2 use cases, aim for a useful demo in 2 weeks.
ENT: 3–5 use cases, each owned by a business sponsor.


1) Choose a minimal, scalable tech stack

  • Graph DB: Neo4j (developer-friendly, mature tooling).

  • Embeddings: SentenceTransformers or Ollama for local; can swap later.

  • Pipelines: Python for ETL/upserts; Watchdog for file watching.

  • Search: Neo4j native vector index (or pgvector if you co-store in SQL).

  • Optional UI: Neo4j Bloom, GraphApp (GraphQL), or a small FastAPI + React.

SB: Single Neo4j instance + Python scripts.
ENT: Neo4j AuraDS / Enterprise cluster, SSO, secrets manager, CI/CD, lineage.


2) Canonical model (start small, extend safely)

Core node labels (minimum viable):

  • Company, Department, Team, Employee

  • Customer, Supplier, Product, Service, Project, Process, Policy

  • Document, Doc_Chunk (text chunks with embedding)

  • (Optional early) Touchpoint, ExperienceEvent, DigitalTool, ITIncident, Skill

Core relationships (verbs):

  • (:Employee)-[:MEMBER_OF]->(:Team)

  • (:Team)-[:PART_OF]->(:Department)

  • (:Department)-[:PART_OF]->(:Company)

  • (:Customer)-[:PURCHASED]->(:Product|:Service)

  • (:Supplier)-[:PROVIDES]->(:Product|:Service)

  • (:Department)-[:OWNS]->(:Process)

  • (:Process)-[:USES]->(:DigitalTool)

  • (:Policy)-[:GOVERNS]->(:Process)

  • (:Entity)-[:HAS_DOCUMENT]->(:Document)-[:HAS_CHUNK]->(:Doc_Chunk)

Key properties:

  • Every entity gets a stable unique key (e.g., id, employee_id, sku, name if unique).

  • Timestamps: created_ts, updated_ts (UTC).

  • Text chunks: chunk_id, ord, text, sha256, embedding:[…].


3) Initialize the graph (constraints + indexes)

// Uniques (extend as you add labels)
CREATE CONSTRAINT company_id IF NOT EXISTS FOR (n:Company) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT dept_id IF NOT EXISTS FOR (n:Department) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT team_id IF NOT EXISTS FOR (n:Team) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT emp_eid IF NOT EXISTS FOR (n:Employee) REQUIRE n.employee_id IS UNIQUE;
CREATE CONSTRAINT cust_id IF NOT EXISTS FOR (n:Customer) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT supp_id IF NOT EXISTS FOR (n:Supplier) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT prod_id IF NOT EXISTS FOR (n:Product) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT serv_id IF NOT EXISTS FOR (n:Service) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT proc_id IF NOT EXISTS FOR (n:Process) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT doc_id IF NOT EXISTS FOR (d:Document) REQUIRE d.doc_id IS UNIQUE;
CREATE CONSTRAINT chunk_id IF NOT EXISTS FOR (c:Doc_Chunk) REQUIRE c.chunk_id IS UNIQUE;

// Vector index (adjust dimension to your model, e.g., 384 for all-MiniLM-L6-v2)
CALL db.index.vector.createNodeIndex('chunk_embedding_idx','Doc_Chunk','embedding',384,'cosine');

4) Stand up ingestion (structured + unstructured)

You already have two solid building blocks (from our prior steps):

  1. ingest_kg.py – upserts entities and attaches documents → chunks → embeddings → Doc_Chunk.

  2. autolink.py – finds where new info fits via vector neighbors + name mentions and adds RELATED_TO, MENTIONS, ABOUT.

SB workflow (manual-first, then semi-auto):

  • Export CSVs from CRM/ERP/HRIS → Python loader writes Company/Customer/Product etc. (a loader sketch follows this list).

  • Put PDFs/Docs in a watched folder; ingest_kg.py watch attaches them automatically.

  • Run autolink.py after each batch to stitch new facts to existing nodes.
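
Here's a minimal sketch of such a CSV loader; the column names are assumptions about your CRM export, so adjust the mapping to your schema:

import csv
from neo4j import GraphDatabase

UPSERT_CUSTOMER = """
MERGE (c:Customer {id: $id})
ON CREATE SET c.created_ts = datetime()
SET c.name = $name, c.segment = $segment, c.updated_ts = datetime()
"""

driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session, open("customers.csv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f):  # expects columns: id, name, segment
        session.run(UPSERT_CUSTOMER, id=row["id"], name=row["name"], segment=row["segment"])
driver.close()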

ENT workflow (pipeline-first):

  • Set up nightly ELT (dbt/Airflow) to curated tables; Python readers upsert via MERGE.

  • Connect DMS/CDC streams for near-real-time deltas (e.g., Kafka → Python consumer; a consumer sketch follows this list).

  • Scan DMS events and call autolink.py on affected entities.
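
And a rough sketch of the CDC hook, assuming kafka-python and a JSON change event that carries the affected entity's label and key (both assumptions about your stream format):

import json, subprocess
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "crm.customers.changes",                 # illustrative topic name
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)

for msg in consumer:
    change = msg.value                       # e.g. {"label": "Customer", "key": "id", "value": "C-001"}
    # Re-run the auto-linker only for the entity the change touched.
    subprocess.run([
        "python", "autolink.py",
        "--label", change["label"], "--key", change["key"], "--value", change["value"],
        "--mode", "propose",
    ], check=True)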


5) Map data sources to the model (quick catalog)

For each source, the entities it populates and the relationships it implies:

  • CRM (e.g., customers, deals) → Entities: Customer, Company, Product; Relationships: Customer-PURCHASED->Product, Customer-PART_OF->Company

  • ERP/Inventory → Entities: Product, Supplier; Relationships: Supplier-PROVIDES->Product

  • HRIS → Entities: Employee, Department, Team, Policy; Relationships: Employee-MEMBER_OF->Team, Team-PART_OF->Department, Policy-GOVERNS->Process

  • ITSM/DEX → Entities: DigitalTool, ITIncident; Relationships: Process-USES->DigitalTool, ExperienceEvent-AT_TOUCHPOINT->Touchpoint

  • Docs (contracts, SOPs) → Entities: Document, Doc_Chunk; Relationships: Entity-HAS_DOCUMENT->Document

Start with 1 table + 1 folder and grow.


6) Auto-stitch new info (“see where it fits”)

  • Compute a centroid embedding for the new node’s chunks.

  • Query the vector index for nearest chunks across the graph.

  • Aggregate by target entity → score + evidence.

  • Scan chunks for name mentions of known entities.

  • Create edges:

    • :MENTIONS at chunk level,

    • :RELATED_TO (semantic proximity),

    • :ABOUT when fused score passes threshold.

(This is exactly what autolink.py does; tune thresholds to your data.)


7) Useful queries (show value fast)

A) “What do we know about ACME Robotics?”

MATCH (c:Company {id:$id})
OPTIONAL MATCH (c)-[:PART_OF|OWNS|GOVERNS|USES|RELATED_TO|ABOUT*1..2]-(x)
OPTIONAL MATCH (c)-[:HAS_DOCUMENT]->(d:Document)
RETURN c,
       collect(DISTINCT labels(x) + [coalesce(x.id, x.name)]) AS neighborhood,
       count(DISTINCT d) AS docs;

B) Top related suppliers to a product line (via semantic + mentions)

MATCH (p:Product {id:$pid})-[r:RELATED_TO|ABOUT]->(s:Supplier)
RETURN s.id AS supplier, s.name AS name, coalesce(r.score,0) AS score
ORDER BY score DESC
LIMIT 10;

C) RAG-style search across the business

// Assume you computed $queryEmbedding in your app
CALL db.index.vector.queryNodes('chunk_embedding_idx', 8, $queryEmbedding) YIELD node, score
MATCH (node)<-[:HAS_CHUNK]-(d:Document)<-[:HAS_DOCUMENT]-(e)
RETURN e, d.title AS doc, node.ord AS chunk_no, score, node.text
ORDER BY score DESC;

D) Employee onboarding friction (last 90 days)

MATCH (x:ExperienceEvent)-[:AT_TOUCHPOINT]->(tp:Touchpoint)<-[:HAS_TOUCHPOINT]-(:LifecycleStage {name:'Onboard'})
WHERE x.ts >= datetime() - duration('P90D')
RETURN tp.name, count(*) AS events, round(avg(x.experience_score),3) AS avg_score
ORDER BY avg_score ASC
LIMIT 10;

8) Data governance, security, and PII

  • PII isolation: Store sensitive text in separate nodes (e.g., Sentiment), and secure edges using role-based access (ABAC where possible).

  • Provenance: For every node/edge from ETL, stamp source_system, source_key, ingested_ts, hash.

  • Versioning (ENT): Soft-version entities with valid_from/valid_to if policies/processes change over time.

  • Quality: Add is_canonical flags and a golden record process for dedupe (e.g., Customer<-[:SAME_AS]-Aliases).


9) Maturity path (keep the footprint small, add layers intentionally)

M0 (Week 1–2):

  • One entity family (e.g., Customer/Product) + documents.

  • Ingest + vector search + 5 hero queries.

M1 (Month 1–2):

  • Add Supplier/Process/Policy.

  • Autolink semantic edges, add simple dashboards.

  • Start capturing KPIs for your use cases.

M2 (Quarter 2):

  • Introduce Employee Experience or Compliance slice.

  • Add change data capture (CDC) + lineage + testable ETL.

M3 (Quarter 3+):

  • Entity linking at scale (Aho-Corasick/dictionary + classifier).

  • Role-specific edges (OWNS, RESPONSIBLE_FOR, PROVIDES, DEPENDS_ON) via rule packs.

  • ABAC/SSO, automated term-of-use, GDPR workflows.


10) Side-by-side build plan (Small vs Enterprise)

Small Business (team of 1–3)

Week 0–1

  1. Pick one use case (e.g., “faster onboarding answers”).

  2. Install Neo4j Desktop, clone minimal scripts.

  3. Create constraints + vector index (above).

  4. Ingest one CSV (customers) + one folder of SOP PDFs.

Week 2
5. Run autolink.py after each ingest; verify edges.
6. Add 5–10 Cypher queries; show a mini “Answers” page.
7. Start a watch folder for new docs.

Month 2
8. Add a second data source (ERP or HRIS).
9. Add 2–3 dashboards (KPIs for the chosen use case).
10. Document a repeatable “import checklist”.

Ops footprint: single VM, daily backup, manual secrets.


Enterprise (team of 4–10)

Phase 1 (0–6 weeks)

  1. Charter + use-case owners + data stewards.

  2. Stand up Neo4j Enterprise/AuraDS, SSO, secrets.

  3. Create base schema + naming conventions + code repo.

  4. Build dbt/Airflow ELT to curated tables (CRM, ERP, HRIS).

  5. Ingest docs from SharePoint/S3; run auto-link nightly.

  6. Publish read-only GraphQL for consumers; onboard 1–2 apps.

Phase 2 (7–12 weeks)
7. Add CDC (Kafka) for near-real-time updates.
8. Introduce rule packs (keyword → relationship type).
9. Security tiers (PII rings), audit logging, lineage/Atlas.
10. SLA dashboards (data freshness, link accuracy, query latency).

Phase 3 (Quarter 2+)
11. Expand to compliance or product lifecycle.
12. Add model-assisted mapping (NER/classifier), human review queue.
13. Cost controls: archiving, TTL on low-value chunks, compaction jobs.


11) Naming, IDs, and conventions (avoid messy rework)

  • IDs: snake_case, immutable, system-of-record prefix (e.g., crm:12345).

  • Labels: singular, PascalCase (Customer, not Customers).

  • Properties: snake_case, explicit types (date, datetime).

  • Edges: verbs in ALL_CAPS (PURCHASED, OWNS, USES, GOVERNS, PROVIDES, DEPENDS_ON).

  • Timestamps: always UTC ISO-8601 (e.g., 2025-09-01T17:04:03Z).


12) Example upsert patterns (copy-paste)

Upsert a Department and link to Company

MERGE (co:Company {id:$company_id})
  ON CREATE SET co.name = $company_name, co.created_ts = datetime()
SET co.updated_ts = datetime()
MERGE (d:Department {id:$dept_id})
  ON CREATE SET d.name = $dept_name, d.created_ts = datetime()
SET d.updated_ts = datetime()
MERGE (d)-[:PART_OF]->(co);

Attach a document and chunks (from your Python ETL)

MERGE (e:Process {id:$proc_id})
MERGE (doc:Document {doc_id:$doc_id})
  ON CREATE SET doc.title=$title, doc.path=$path, doc.created_ts=datetime()
SET doc.ext=$ext, doc.sha256=$sha, doc.updated_ts=datetime()
MERGE (e)-[:HAS_DOCUMENT]->(doc)
WITH doc
UNWIND $chunks AS ch
MERGE (c:Doc_Chunk {chunk_id: ch.chunk_id})
  ON CREATE SET c.created_ts=datetime()
SET c.ord=ch.ord, c.text=ch.text, c.sha256=ch.sha256, c.embedding=ch.embedding, c.updated_ts=datetime()
MERGE (doc)-[:HAS_CHUNK]->(c);

13) Pro tips (that save you pain)

  • Evidence on edges: store score, method, top_chunk_ids—it makes audits easy.

  • Don’t over-model early: start with 10–15 labels; add more after queries demand them.

  • Test data quality as code: tiny PyTest suite that runs Cypher checks (e.g., “no orphan Team nodes”); a sketch follows this list.

  • Keep embeddings small first: all-MiniLM (384-D) is fast, good enough for doc stitching.

  • Refresh cadence: nightly compaction of duplicate chunks + recalculated centroids.

  • Bench latency: vector top-k ≤ 100; paginate; pre-compute centroids per entity.
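
For example, a tiny pytest sketch for those quality checks; it reuses the ingestor's config.yaml, and the specific assertions are assumptions to adapt:

# test_kg_quality.py
import yaml
from neo4j import GraphDatabase

def run_cypher(cypher):
    with open("config.yaml", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)["neo4j"]
    driver = GraphDatabase.driver(cfg["uri"], auth=(cfg["user"], cfg["password"]))
    with driver.session(database=cfg.get("database", "neo4j")) as session:
        rows = [r.data() for r in session.run(cypher)]
    driver.close()
    return rows

def test_no_orphan_teams():
    rows = run_cypher("MATCH (t:Team) WHERE NOT (t)-[:PART_OF]->(:Department) RETURN count(t) AS n")
    assert rows[0]["n"] == 0

def test_every_chunk_has_an_embedding():
    rows = run_cypher("MATCH (c:Doc_Chunk) WHERE c.embedding IS NULL RETURN count(c) AS n")
    assert rows[0]["n"] == 0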


Want me to package this as a repo?

I can generate a ready-to-run folder with:

  • schema.cypher (all constraints/indexes)

  • ingest_kg.py and autolink.py

  • load_structured.py (CSV → nodes/edges)

  • config.yaml templates (SB vs ENT)

  • examples/queries.cypher (dashboards & RAG)

Or, tell me your first target use case and I’ll tailor the minimal schema + ingest scripts to it.


https://chatgpt.com/c/68b526b0-68e8-8323-a356-f065f1c4f701

From <https://chatgpt.com/>  ChatGPT 5
