Service

The HTTP + async-queue layer — a FastAPI app factory with dependency injection, the RQ task queue behind a TaskQueue Protocol, worker-owned ingestion jobs, the FAQ short-circuit cache, and ServiceConfig (env RAGSPINE_*).

The service domain (src/ragspine/service/) wraps the engine in an HTTP API and an async ingestion pipeline. It owns five things: ServiceConfig (env-driven), the FastAPI app, the RQ task queue, worker-owned ingestion jobs, and the FAQ short-circuit cache.

One invariant governs the whole layer:

The FAQ cache sits in front of the anti-fabrication guard — so it must conservatively exclude anything a vetted short-circuit could get wrong: structured-numeric / competitor / real-time / expired / disabled / RESTRICTED questions never short-circuit. See FAQ short-circuit.

ServiceConfig

config.py assembles a frozen ServiceConfig from the environment via ServiceConfig.from_env(). Every value is optional; the defaults run the offline, deterministic path with no API key. The full variable table lives in Configuration; the most-used:

| Field | Env var | Default |
| --- | --- | --- |
| db_path | RAGSPINE_DB_PATH | data/fact_metric.db |
| chunk_db_path | RAGSPINE_CHUNK_DB_PATH | None |
| provider_type | RAGSPINE_PROVIDER | mock (one of: mock, anthropic) |
| redis_url | RAGSPINE_REDIS_URL | redis://localhost:6379/0 |
| faq_source | RAGSPINE_FAQ_SOURCE | None (→ empty cache) |
| allowed_upload_root | RAGSPINE_ALLOWED_UPLOAD_ROOT | None |
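
To make the env-to-field mapping concrete, here is a minimal sketch of a frozen config built from the environment. It is not the project's code: the class name ServiceConfigSketch, the `env` parameter, the empty-string fallback, and the provider validation are all assumptions made for illustration. Only the field names, variable names, and defaults come from the table above.

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ServiceConfigSketch:
    """Illustrative stand-in for ServiceConfig (only the fields in the table)."""

    db_path: str = "data/fact_metric.db"
    chunk_db_path: Optional[str] = None
    provider_type: str = "mock"
    redis_url: str = "redis://localhost:6379/0"
    faq_source: Optional[str] = None
    allowed_upload_root: Optional[str] = None

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "ServiceConfigSketch":
        # `env` is a hypothetical hook so the sketch can be exercised without
        # touching the real process environment.
        env = os.environ if env is None else env

        def read(name: str, default: Optional[str]) -> Optional[str]:
            # Assumption: unset and empty variables both fall back to the default.
            return env.get("RAGSPINE_" + name) or default

        provider = read("PROVIDER", "mock")
        if provider not in ("mock", "anthropic"):
            # Assumption: unknown providers are rejected early.
            raise ValueError(f"unsupported provider: {provider!r}")
        return cls(
            db_path=read("DB_PATH", "data/fact_metric.db"),
            chunk_db_path=read("CHUNK_DB_PATH", None),
            provider_type=provider,
            redis_url=read("REDIS_URL", "redis://localhost:6379/0"),
            faq_source=read("FAQ_SOURCE", None),
            allowed_upload_root=read("ALLOWED_UPLOAD_ROOT", None),
        )
```

With an empty environment the sketch yields the offline defaults (mock provider, no FAQ source), which mirrors the "every value is optional" behaviour described above.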

config.py also provides build_provider(config) (mock → MockProvider, anthropic → AnthropicProvider), the context managers open_fact_store(config) and open_narrative_retriever(config, provider), and validate_ingest_path(path, config, *, suffixes) — which enforces the allowed_upload_root containment (rejecting path traversal) and a suffix whitelist, raising PathNotAllowedError on violation.
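
The containment and suffix checks can be sketched as follows. This is a hedged illustration, not the real validate_ingest_path: the function name, taking `allowed_root` directly instead of a config object, the ValueError base class, and the choice to skip containment when no root is set are assumptions.

```python
from pathlib import Path


class PathNotAllowedError(ValueError):
    """Stand-in for the service's error type (the base class is an assumption)."""


def validate_ingest_path_sketch(path, allowed_root, *, suffixes):
    """Return the resolved path, or raise PathNotAllowedError."""
    # Resolving first collapses ".." segments and symlinks, so a traversal
    # attempt is judged by where it really points.
    resolved = Path(path).resolve()

    # Suffix whitelist, compared case-insensitively.
    if resolved.suffix.lower() not in suffixes:
        raise PathNotAllowedError(f"suffix {resolved.suffix!r} is not allowed")

    # Assumption: with no upload root configured, containment is not checked.
    if allowed_root is not None:
        root = Path(allowed_root).resolve()
        try:
            resolved.relative_to(root)
        except ValueError:
            raise PathNotAllowedError(f"{resolved} is outside {root}") from None
    return resolved
```

Checking the resolved path rather than the raw string is what defeats inputs such as uploads/../secret.pdf.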

FastAPI app — factory + dependency injection

api/app.py exposes an app factory:

```python
def create_app(
    config: ServiceConfig | None = None,
    *,
    provider: LLMProvider | None = None,
    queue: TaskQueue | None = None,
    faq_cache: FAQCache | None = None,
) -> FastAPI: ...
```

Anything not passed is built from config: ServiceConfig.from_env(), build_provider, RQQueue(config.redis_url), and FAQCache.from_file(config.faq_source), falling back to FAQCache.empty() when no FAQ source is configured. The assembled instances are stored on app.state, and api/dependencies.py exposes them as FastAPI dependencies (get_config, get_provider, get_queue, get_faq_cache), which tests can replace through app.dependency_overrides. api/routes.py wires them as Annotated aliases (ConfigDep, ProviderDep, QueueDep, FAQCacheDep).
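
The "explicit argument wins, otherwise build from config" rule can be shown without FastAPI. The sketch below is stdlib-only and entirely illustrative: ConfigSketch, create_state_sketch, and the placeholder dictionaries standing in for the real provider, queue, and cache are assumptions, and the returned namespace plays the role of app.state.

```python
from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Optional


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical stand-in for ServiceConfig."""

    provider_type: str = "mock"
    redis_url: str = "redis://localhost:6379/0"
    faq_source: Optional[str] = None


def create_state_sketch(
    config: Optional[ConfigSketch] = None,
    *,
    provider: Any = None,
    queue: Any = None,
    faq_cache: Any = None,
) -> SimpleNamespace:
    """Mimic the factory's fallback order; the result stands in for app.state."""
    config = config if config is not None else ConfigSketch()
    if provider is None:
        # Placeholder for build_provider(config).
        provider = {"kind": config.provider_type}
    if queue is None:
        # Placeholder for RQQueue(config.redis_url).
        queue = {"kind": "rq", "url": config.redis_url}
    if faq_cache is None:
        # Placeholder for FAQCache.from_file(...) or FAQCache.empty().
        faq_cache = {"source": config.faq_source}
    return SimpleNamespace(config=config, provider=provider, queue=queue, faq_cache=faq_cache)
```

A test would pass its own fake queue and rely on everything else being built from defaults, which is the property that lets the suite run without Redis or an API key.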

Endpoints

All routes live on a module-level router in api/routes.py; this page refers to /v1/ask, /readyz, and the two ingest routes.

Request/response models (api/schemas.py, Pydantic v2) include AskRequest(question, reference_date), AskResponse(request_id, answer, route, answer_kind, clarification, sources, tool_status_summary, cache), the two ingest-job request models, JobSubmitResponse(job_id), and JobStatusResponse(id, status, result, error).

The two ingest routes only accept whitelisted suffixes — structured: .xlsx / .xlsm / .pptx / .pdf; narrative: .pptx / .pdf.

Task queue — the TaskQueue Protocol

tasks/task_queue.py defines the queue seam:

```python
class TaskQueue(Protocol):
    def enqueue(self, func_path: str, payload: dict, *, job_id=None,
                timeout=None, max_retries=0, result_ttl=None,
                failure_ttl=None) -> str: ...
    def get(self, job_id: str) -> JobStatus | None: ...
```

FakeQueue

Synchronous, in-memory. enqueue runs the job inline (failures recorded as JOB_FAILED, not re-raised). Used in tests and offline demos — no Redis needed.

RQQueue

Production queue over RQ + Redis (both lazy-imported). enqueue maps to rq.Queue.enqueue; get fetches via rq.job.Job.fetch and maps status. Adds ping() for /readyz.

Status constants: JOB_QUEUED, JOB_STARTED, JOB_FINISHED, JOB_FAILED. A JobStatus carries id, status, result, error (error shape {type, message, stage, retryable}).
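
A synchronous in-memory queue in the spirit of FakeQueue might look like the sketch below. It is an approximation under stated assumptions: the class names, the string values of the status constants, calling the job with the payload as a single positional argument, and reading `stage` / `retryable` off the exception are all guesses made for illustration.

```python
from __future__ import annotations

import importlib
import uuid
from dataclasses import dataclass
from typing import Any, Optional

# Assumption: the real constants may use different string values.
JOB_FINISHED = "finished"
JOB_FAILED = "failed"


@dataclass
class JobStatusSketch:
    id: str
    status: str
    result: Any = None
    error: Optional[dict] = None


class InlineQueueSketch:
    """Runs each job inline at enqueue time and remembers the outcome."""

    def __init__(self) -> None:
        self._jobs: dict = {}

    def enqueue(self, func_path: str, payload: dict, *, job_id=None,
                timeout=None, max_retries=0, result_ttl=None,
                failure_ttl=None) -> str:
        job_id = job_id or uuid.uuid4().hex
        module_name, _, func_name = func_path.rpartition(".")
        try:
            # Resolve the dotted path, then call the job with its payload.
            func = getattr(importlib.import_module(module_name), func_name)
            status = JobStatusSketch(job_id, JOB_FINISHED, result=func(payload))
        except Exception as exc:
            # Failures are recorded, not re-raised.
            status = JobStatusSketch(job_id, JOB_FAILED, error={
                "type": type(exc).__name__,
                "message": str(exc),
                "stage": getattr(exc, "stage", None),
                "retryable": getattr(exc, "retryable", False),
            })
        self._jobs[job_id] = status
        return job_id

    def get(self, job_id: str) -> Optional[JobStatusSketch]:
        return self._jobs.get(job_id)
```

Because the sketch accepts the same keyword arguments as the Protocol, it can be swapped in wherever a TaskQueue is expected; the timeout, retry, and TTL arguments are simply ignored here.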

Ingestion jobs — worker-owned stores

tasks/jobs.py holds the two job functions the queue resolves by dotted path (run_structured_ingest_job, run_narrative_ingest_job). Each one is self-contained: it builds a ServiceConfig from its payload, defensively re-validates the file path(s) (a violation raises JobError(stage="validation")), opens its own SQLite stores, runs the ingest, and closes everything in finally.

Jobs never reuse the caller's database connections — they own their stores so they can run out-of-process in a worker. Their report serializers (ingest_report_to_dict, narrative_report_to_dict) emit counts and statuses only, never raw fact values or chunk text.
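
The worker-owned pattern (validate again, open your own store, close it in finally, report counts only) is sketched below with sqlite3. Everything specific is hypothetical: the function name, the payload keys db_path and file_path, the table schema, the placeholder rows, and the JobError constructor signature.

```python
import sqlite3


class JobError(Exception):
    """Stand-in for the service's job error (the signature is an assumption)."""

    def __init__(self, message, *, stage, retryable=False):
        super().__init__(message)
        self.stage = stage
        self.retryable = retryable


def run_ingest_job_sketch(payload: dict) -> dict:
    # Hypothetical payload keys; the real jobs build a ServiceConfig instead.
    db_path = payload.get("db_path", ":memory:")
    file_path = payload.get("file_path", "")

    # Defensive re-validation inside the job, independent of the API layer.
    if not file_path.lower().endswith((".pptx", ".pdf")):
        raise JobError(f"unsupported file: {file_path!r}", stage="validation")

    # The job opens its own connection and never borrows the caller's.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS chunks (source TEXT, body TEXT)")
        # Placeholder rows standing in for real parsing output.
        rows = [(file_path, f"chunk {i}") for i in range(3)]
        conn.executemany("INSERT INTO chunks VALUES (?, ?)", rows)
        conn.commit()
        # Counts and statuses only: no chunk text leaves the job.
        return {"status": "ok", "chunks_written": len(rows)}
    finally:
        conn.close()
```

Keeping the connection local to the function is what lets the same code run unchanged in a separate worker process.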

FAQ short-circuit cache

faq/faq_cache.py is the SME-vetted Q→A cache that can bypass the LLM entirely. FAQCache builds a normalized index over questions and aliases (NFKC + casefold + whitespace fold + trailing-punctuation strip); construct it with FAQCache.from_file(path) or FAQCache.empty(). The core method is a pure function:

```python
def lookup(self, question: str, *, reference_date=None) -> FAQHit | None: ...
```

Its exclusion rules are front-loaded: if any rule below fires, lookup returns None (a miss). They are checked in this order:

1. Out-of-scope / competitor: clarify_scope mode is out_of_scope_entity, or intent.external_entity is set.

2. Structured-numeric: intent.route == "structured", or any of metric / entity / period (single or multi-value) is present.

3. Real-time: the normalized question contains a real-time cue (e.g. time words, “股价” ("stock price"), “price now”).

4. No exact match: after all the above, an exact normalized lookup misses.

5. Expired / disabled: the item is disabled or outside its [valid_from, valid_until] window.

6. RESTRICTED: the item's sensitivity (case-insensitively) equals RESTRICTED.

Only when none of these fire does lookup return a FAQHit (item_id, version, answer, source, cache_type="faq"). In routes.py, /v1/ask runs the FAQ lookup first; a hit returns immediately with route="faq" and never touches the provider, fact store, or retriever — so it sits cleanly in front of the anti-fabrication guard, and its conservative exclusions are what keep that safe.
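
The normalization and the ordering of the exclusions can be approximated as below. This is a sketch under assumptions, not FAQCache itself: the names FAQItemSketch, lookup_sketch, enabled, and the default sensitivity value are invented, the trailing-punctuation set is a guess, the cue tuple contains only the two cues quoted above, and the intent-based rules are reduced to a hypothetical `looks_structured` flag because they depend on the engine's intent parser.

```python
from __future__ import annotations

import re
import unicodedata
from dataclasses import dataclass
from datetime import date
from typing import Optional

REAL_TIME_CUES = ("股价", "price now")  # only the cues named in the text


def normalize(text: str) -> str:
    """NFKC, casefold, whitespace fold, then strip trailing punctuation."""
    text = unicodedata.normalize("NFKC", text).casefold()
    text = re.sub(r"\s+", " ", text).strip()
    return text.rstrip("?!.。 ")  # assumption: the real punctuation set may differ


@dataclass(frozen=True)
class FAQItemSketch:
    item_id: str
    question: str
    answer: str
    enabled: bool = True
    valid_from: Optional[date] = None
    valid_until: Optional[date] = None
    sensitivity: str = "PUBLIC"


def lookup_sketch(items, question, *, reference_date=None, looks_structured=False):
    # Rules 1-2 (intent-based) are collapsed into a caller-supplied flag here.
    if looks_structured:
        return None
    key = normalize(question)
    # Rule 3: real-time cues never short-circuit.
    if any(cue in key for cue in REAL_TIME_CUES):
        return None
    # Rule 4: exact match on the normalized question.
    item = {normalize(i.question): i for i in items}.get(key)
    if item is None:
        return None
    # Rule 5: disabled, or outside the inclusive validity window.
    today = reference_date or date.today()
    if not item.enabled:
        return None
    if item.valid_from and today < item.valid_from:
        return None
    if item.valid_until and today > item.valid_until:
        return None
    # Rule 6: RESTRICTED items, compared case-insensitively.
    if item.sensitivity.casefold() == "restricted":
        return None
    return item
```

The function stays pure as long as the caller supplies reference_date; the date.today() fallback is a convenience of the sketch rather than a claim about the real method.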

Run it

```shell
# API server
RAGSPINE_DB_PATH=data/fact_metric.db \
RAGSPINE_CHUNK_DB_PATH=data/narrative.db \
.venv/bin/python scripts/run_server.py --host 0.0.0.0 --port 8000

# Worker: needs Redis; --queue defaults to "ragspine-ingest" (matches RQQueue)
RAGSPINE_REDIS_URL=redis://localhost:6379/0 .venv/bin/python scripts/run_worker.py
```

Both require the [service] extra (pip install -e ".[service]"). The server-side enqueue queue name and the worker-side consume queue name share the same default (ragspine-ingest), so they agree out of the box.
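
Once the server is up, a client call to /v1/ask could be prepared as in the sketch below. Several details are assumptions rather than documented behaviour: that the route accepts POST with a JSON body, that reference_date is sent as an ISO date string and may be omitted, and the helper name build_ask_request. The host and port follow the run command above.

```python
import json
import urllib.request
from typing import Optional


def build_ask_request(question: str, reference_date: Optional[str] = None,
                      base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build (but do not send) a request for /v1/ask."""
    body = {"question": question}
    if reference_date is not None:
        # Assumption: ISO 8601 date string, e.g. "2024-01-31".
        body["reference_date"] = reference_date
    return urllib.request.Request(
        base_url + "/v1/ask",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",  # assumption: the route is a POST endpoint
    )


# Sending requires a running server:
# with urllib.request.urlopen(build_ask_request("What is RAGSpine?")) as resp:
#     print(json.load(resp))
```

Separating request construction from sending keeps the sketch checkable offline; the commented lines show where the network call would go.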
