Pipeline
Pipeline-topology export — derive a static PipelineGraph from RAGSpine's real wiring and render it as Mermaid, DOT, or JSON, via the agent / retriever / service builders and the topology.py CLI.
The pipeline domain (src/ragspine/pipeline/) is RAGSpine's code-first answer to the one
thing graph frameworks (Dify, LangGraph) give you that a plain-Python engine otherwise
doesn't: a visual diagram of the pipeline. Instead of a graph DSL you author and then
hope matches reality, RAGSpine derives a static PipelineGraph from the real wiring and
emits it as Mermaid, DOT, or JSON.
It is small, leaf-level, and strictly read-only: it imports nothing from the orchestrator
(all introspection is getattr duck-typing), so it can never perturb the system it
describes.
Layout
graph.py is the zero-dependency value layer (frozen dataclasses + exporters);
topology.py holds the three builders that read live composition. The public API
re-exports six names: Node, Edge, PipelineGraph, agent_topology,
retriever_topology, service_topology.
The value model
graph.py defines three frozen dataclasses: Node, Edge, and PipelineGraph.
PipelineGraph carries three exporters and a combiner:
| Method | Returns | Output |
|---|---|---|
| to_mermaid(*, direction="TD") | str | A flowchart; node shape is chosen by kind (gate {}, store [(...)], channel ([...]), otherwise a rectangle). |
| to_dot() | str | A Graphviz digraph (rankdir=TB). |
| to_dict() | dict | JSON-round-trippable {title, nodes[...], edges[...]}. |
| merge(other, *, group=None) | PipelineGraph | Dedupes nodes by id (first wins) and keeps all edges; optionally tags the added nodes with domain=group. |
Exports are deterministic and byte-identical across runs (stable declared order). The domain
grouping round-trips through to_dict, but in v1 to_mermaid / to_dot do not emit
subgraph/cluster blocks.
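To make the value model concrete, here is a minimal, self-contained sketch of how frozen dataclasses and these exporters could fit together. It is not RAGSpine's graph.py: only title/nodes/edges, the gate/store/channel kinds, and the domain tag come from this page. The field names label, source and target, the default kind "step", and the _SHAPES table are assumptions for illustration.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, replace

# Sketch only: field names other than title/nodes/edges, kind and domain are assumed.


@dataclass(frozen=True)
class Node:
    id: str
    label: str
    kind: str = "step"         # "gate" / "store" / "channel" get special shapes
    domain: str | None = None  # grouping tag, set by merge(..., group=...)


@dataclass(frozen=True)
class Edge:
    source: str
    target: str
    label: str | None = None


# Mermaid (open, close) delimiters per kind; anything else renders as a rectangle.
_SHAPES = {"gate": ("{", "}"), "store": ("[(", ")]"), "channel": ("([", "])")}


@dataclass(frozen=True)
class PipelineGraph:
    title: str
    nodes: tuple[Node, ...] = ()
    edges: tuple[Edge, ...] = ()

    def to_mermaid(self, *, direction: str = "TD") -> str:
        lines = [f"flowchart {direction}"]
        for n in self.nodes:  # declared order, so output is byte-identical across runs
            open_, close = _SHAPES.get(n.kind, ("[", "]"))
            lines.append(f'    {n.id}{open_}"{n.label}"{close}')  # no escaping in this sketch
        for e in self.edges:
            arrow = f" -->|{e.label}| " if e.label else " --> "
            lines.append(f"    {e.source}{arrow}{e.target}")
        return "\n".join(lines)

    def to_dot(self) -> str:
        lines = ["digraph pipeline {", "  rankdir=TB;"]
        lines += [f'  {n.id} [label="{n.label}"];' for n in self.nodes]
        for e in self.edges:
            attr = f' [label="{e.label}"]' if e.label else ""
            lines.append(f"  {e.source} -> {e.target}{attr};")
        return "\n".join(lines + ["}"])

    def to_dict(self) -> dict:
        return {
            "title": self.title,
            "nodes": [asdict(n) for n in self.nodes],
            "edges": [asdict(e) for e in self.edges],
        }

    def merge(self, other: PipelineGraph, *, group: str | None = None) -> PipelineGraph:
        seen = {n.id for n in self.nodes}
        added = []
        for n in other.nodes:
            if n.id in seen:  # dedupe by id: the first occurrence wins
                continue
            seen.add(n.id)
            added.append(replace(n, domain=group) if group is not None else n)
        # all edges from both graphs are kept; a new graph is returned
        return PipelineGraph(self.title, self.nodes + tuple(added), self.edges + other.edges)


demo = PipelineGraph(
    "demo",
    nodes=(Node("route", "route", kind="gate"), Node("docs", "docs", kind="store")),
    edges=(Edge("route", "docs"),),
)
print(demo.to_mermaid())
# flowchart TD
#     route{"route"}
#     docs[("docs")]
#     route --> docs
assert json.loads(json.dumps(demo.to_dict())) == demo.to_dict()  # JSON round-trip
```

Emitting nodes and edges strictly in declared order (never iterating a set or dict of unknown order) is what keeps the output byte-stable, and merge builds a new graph instead of mutating either input, which the frozen dataclasses enforce anyway.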
The three builders
topology.py derives a graph from live, duck-typed composition — a node appears only when
its component is actually wired.
agent_topology(*, narrative_retriever=None)
The full request flow: parse_intent → clarify_scope (consults SecurityGate via a data edge) → route diamond → structured / narrative / composite branches. The narrative nodes appear only when a narrative_retriever is injected.
retriever_topology(retriever)
The HybridRetriever sub-pipeline: prefilter → BM25 [+vector] [+multi-query] → RRF → top_k. The vector node appears iff retriever.embedding_backend is set; the multi-query node iff query_rewriter is set. Rerank is deliberately not in this subgraph.
service_topology(app)
The service topology: FAQ short-circuit upstream of the agent, plus the async ingestion path (routes → queue → jobs). Duck-typed on app.state.faq_cache and app.state.queue.
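The rule that a node appears only when its component is wired comes down to guarded getattr probes. The sketch below is hypothetical: it returns flat lists of stage names rather than a PipelineGraph, and it reads "is set" as "is not None", which may differ from RAGSpine's actual check. It does show why the builders need no import of HybridRetriever or of the FastAPI app.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def retriever_steps(retriever: Any) -> list[str]:
    """Hypothetical: HybridRetriever stages that are actually wired, in order."""
    steps = ["prefilter", "bm25"]
    # getattr with a default never raises and never mutates the object it reads.
    if getattr(retriever, "embedding_backend", None) is not None:
        steps.append("vector")
    if getattr(retriever, "query_rewriter", None) is not None:
        steps.append("multi_query")
    return steps + ["rrf", "top_k"]  # rerank deliberately left out


def service_steps(app: Any) -> list[str]:
    """Hypothetical: FAQ short-circuit and ingestion path, probed via app.state."""
    state = getattr(app, "state", None)  # getattr(None, name, default) is also safe
    steps = []
    # "is not None" so an empty (possibly falsy) FAQ cache still counts as wired
    if getattr(state, "faq_cache", None) is not None:
        steps.append("faq_short_circuit")
    steps.append("agent")
    if getattr(state, "queue", None) is not None:
        steps += ["routes", "queue", "jobs"]
    return steps


bm25_only = SimpleNamespace(embedding_backend=None, query_rewriter=None)
print(retriever_steps(bm25_only))  # ['prefilter', 'bm25', 'rrf', 'top_k']
app = SimpleNamespace(state=SimpleNamespace(faq_cache={}, queue=None))
print(service_steps(app))  # ['faq_short_circuit', 'agent']
```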
HybridRetriever.topology() (in the retrieval domain) is a thin
delegator into retriever_topology — so a configured retriever can render itself without the
pipeline package importing the orchestrator.
The CLI — scripts/topology.py
scripts/topology.py renders any of the three topologies offline and deterministically (it
builds default/mock assemblies — no Redis, no API key). Run it from the project root:
```bash
python scripts/topology.py                 # agent → Mermaid → stdout
python scripts/topology.py --which retriever --of dot
python scripts/topology.py --of json --out docs/generated/topology.json
```

Flags:
- --of {mermaid,dot,json}: output format (default mermaid).
- --which {agent,retriever,service}: which topology to render (default agent).
- --out PATH: write to a file (parent directories are created); omit it to print to stdout. Writing into the git-ignored docs/generated/ keeps regenerated diagrams out of version-control diffs.
Under the hood, the CLI builds each topology from a default, offline assembly:
- retriever: retriever_topology(HybridRetriever([])), a pure-BM25 skeleton over an empty corpus;
- service: create_app(...) with MockProvider, FakeQueue, and an empty FAQCache, then service_topology(app);
- agent: agent_topology(narrative_retriever=object()), where the sentinel makes the narrative/composite branches show up.
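For readers who want the same shape in their own tooling, here is a hedged argparse sketch of the three flags and the dispatch. The helper names parse_args, render, emit and main, and the builders mapping, are hypothetical; the real scripts/topology.py may be organized differently.

```python
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any, Callable, Mapping


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Render a pipeline topology (sketch).")
    p.add_argument("--of", choices=["mermaid", "dot", "json"], default="mermaid")
    p.add_argument("--which", choices=["agent", "retriever", "service"], default="agent")
    p.add_argument("--out", type=Path, default=None, help="omit to print to stdout")
    return p.parse_args(argv)


def render(graph: Any, fmt: str) -> str:
    # Any object exposing the three exporters works (duck-typed, like the builders).
    if fmt == "mermaid":
        return graph.to_mermaid()
    if fmt == "dot":
        return graph.to_dot()
    return json.dumps(graph.to_dict(), ensure_ascii=False, indent=2)


def emit(text: str, out: Path | None) -> None:
    if out is None:
        print(text)
        return
    out.parent.mkdir(parents=True, exist_ok=True)  # --out creates parent dirs
    out.write_text(text + "\n", encoding="utf-8")


def main(argv: list[str] | None, builders: Mapping[str, Callable[[], Any]]) -> None:
    # Hypothetical wiring; in the real script these would be the offline assemblies, e.g.
    #   "agent": lambda: agent_topology(narrative_retriever=object()),
    #   "retriever": lambda: retriever_topology(HybridRetriever([])),
    args = parse_args(argv)
    emit(render(builders[args.which](), args.of), args.out)
```

Passing the builders in as zero-argument factories keeps the sketch free of any ragspine import, so it only constructs the topology that was actually requested.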
Python API
```python
import json

from ragspine.pipeline import agent_topology

graph = agent_topology(narrative_retriever=object())
print(graph.to_mermaid())  # Mermaid flowchart string
print(graph.to_dot())      # Graphviz digraph string
print(json.dumps(graph.to_dict(), ensure_ascii=False, indent=2))
```

See also
Service
The HTTP + async-queue layer — a FastAPI app factory with dependency injection, the RQ task queue behind a TaskQueue Protocol, worker-owned ingestion jobs, the FAQ short-circuit cache, and ServiceConfig (env RAGSPINE_*).
Common
Cross-cutting primitives shared by every domain — the configurable company/domain profile, deterministic sensitivity classification, the dimension glossary, privacy-aware observability, and global path constants.