Skip to content

Graph Store

The knowledge graph storage and query layer.

GraphStore

GraphStore

In-memory knowledge graph with event emission.

Events are dispatched through an EventBus (preferred) or a legacy asyncio.Queue. Since v0.1.0, NeuroWeave is fully async — the agent, extraction pipeline, and visualization server all share the same asyncio event loop.

node_count property

node_count: int

edge_count property

edge_count: int

add_node

add_node(node: Node) -> Node

Add a node to the graph. If a node with the same ID exists, update it.

add_edge

add_edge(edge: Edge) -> Edge

Add a directed edge between two existing nodes.

Raises:

Type Description
KeyError

If source or target node doesn't exist.

get_node

get_node(node_id: str) -> dict[str, Any] | None

Return node data dict or None if not found.

to_dict

to_dict() -> dict[str, Any]

Serialize the full graph to a dict suitable for JSON/visualization.

set_event_bus

set_event_bus(bus: Any) -> None

Attach an EventBus to receive graph mutation events (preferred).

When an EventBus is set, events are emitted through it. The legacy asyncio.Queue is still supported as a fallback.

set_event_queue

set_event_queue(q: Queue[GraphEvent]) -> None

Attach an asyncio queue to receive graph mutation events (legacy).

Query Engine

query_subgraph

query_subgraph async

query_subgraph(store: Any, *, entities: list[str] | None = None, relations: list[str] | None = None, min_confidence: float = 0.0, max_hops: int = 1) -> QueryResult

Query the graph for a filtered subgraph.

The query works in two phases:

  1. Seed resolution — find nodes matching the entities names (case-insensitive). If no entities are specified, all nodes are seeds (whole-graph query).

  2. Hop traversal — starting from seed nodes, walk max_hops hops through the graph (following edges in either direction). Collect all reachable nodes.

After traversal, edges are filtered: - Only edges between collected nodes are included. - If relations is specified, only edges with matching relation types are included. - If min_confidence > 0, only edges meeting the threshold are included.

Parameters:

Name Type Description Default
store Any

The graph store to query (async interface).

required
entities list[str] | None

Entity names to start from (case-insensitive match). None = all nodes.

None
relations list[str] | None

Relation types to include. None = all relations.

None
min_confidence float

Minimum edge confidence threshold (0.0 to 1.0).

0.0
max_hops int

How many hops to traverse from seed nodes (0 = seeds only).

1

Returns:

Type Description
QueryResult

QueryResult with matching nodes, edges, and metadata.

QueryResult

QueryResult dataclass

Result of a structured graph query.

Attributes:

Name Type Description
nodes list[dict[str, Any]]

List of matching node dicts (id, name, node_type, properties, ...).

edges list[dict[str, Any]]

List of matching edge dicts (id, source_id, target_id, relation, confidence, ...).

seed_node_ids list[str]

The node IDs that matched the initial entity filter (before hop traversal).

hops_traversed int

The actual max_hops value used.

query_params dict[str, Any]

The original query parameters for transparency/debugging.

nodes class-attribute instance-attribute

nodes: list[dict[str, Any]] = field(default_factory=list)

edges class-attribute instance-attribute

edges: list[dict[str, Any]] = field(default_factory=list)

node_count property

node_count: int

edge_count property

edge_count: int

is_empty property

is_empty: bool

node_names

node_names() -> set[str]

Convenience: set of all node names in the result.

relation_types

relation_types() -> set[str]

Convenience: set of all relation types in the result.

to_dict

to_dict() -> dict[str, Any]

Serialize for JSON / API responses.

NL Query Planner

NLQueryPlanner

NLQueryPlanner

Translates natural language questions into structured graph queries.

Uses an LLM to interpret the question against the current graph schema, producing a QueryPlan that maps to query_subgraph() parameters.

Falls back to a broad whole-graph search if the LLM response is unparseable or the call fails.

Usage

planner = NLQueryPlanner(llm_client, graph_store) plan = await planner.plan("what does my wife like?") result = planner.execute(plan)

plan async

plan(question: str) -> QueryPlan

Translate a natural language question into a QueryPlan.

Parameters:

Name Type Description Default
question str

Free-text question about the knowledge graph.

required

Returns:

Type Description
QueryPlan

QueryPlan ready for execution. On LLM failure, returns a

QueryPlan

broad fallback plan (whole-graph search).

execute async

execute(plan: QueryPlan) -> QueryResult

Execute a QueryPlan against the graph store.

This is a thin wrapper around query_subgraph() that maps the plan's fields to the function's parameters.

Parameters:

Name Type Description Default
plan QueryPlan

A QueryPlan from plan().

required

Returns:

Type Description
QueryResult

QueryResult from the structured query engine.

query async

query(question: str) -> QueryResult

Convenience: plan + execute in one call.

Parameters:

Name Type Description Default
question str

Natural language question.

required

Returns:

Type Description
QueryResult

QueryResult from executing the generated plan.

QueryPlan

QueryPlan dataclass

A structured query plan produced by the NL planner.

This maps directly to the parameters of query_subgraph().

Attributes:

Name Type Description
entities list[str]

Entity names to start traversal from.

relations list[str] | None

Relation types to filter on (None = all).

min_confidence float

Minimum edge confidence threshold.

max_hops int

How many hops to traverse from seed entities.

reasoning str

LLM's brief explanation of its interpretation.

raw_response str

The raw LLM output (for debugging).

duration_ms float

Time taken for the LLM call.

is_broad_search: bool

True if this plan has no entity filter (whole-graph search).

Ingestion

ingest_extraction

ingest_extraction async

ingest_extraction(store: Any, result: ExtractionResult) -> dict[str, int]

Materialize an ExtractionResult into the graph store.

  • Entities are deduplicated by lowercase name (cross-session via store lookup).
  • When an existing node is reused, its properties are merged (new wins on conflict).
  • Relations are added as edges between resolved entity nodes.
  • Entities referenced in relations but missing from the entities list are auto-created as CONCEPT nodes (handles inconsistent LLM output).

Parameters:

Name Type Description Default
store Any

The graph store to write to (async interface).

required
result ExtractionResult

Extraction result from the pipeline.

required

Returns:

Type Description
dict[str, int]

Dict with counts: {"nodes_added": N, "edges_added": N, "edges_skipped": N}

Data Factories

make_node

make_node(name: str, node_type: NodeType, *, node_id: str | None = None, **properties: Any) -> Node

Convenience factory for creating a Node with an auto-generated ID.

make_edge

make_edge(source_id: str, target_id: str, relation: str, confidence: float, *, edge_id: str | None = None, **properties: Any) -> Edge

Convenience factory for creating an Edge with an auto-generated ID.