RAGSpine
Guides

Ingestion

IR/text → stores. Structured fact ingestion with a batch manifest ledger, narrative chunk ingestion, and an SME human review-queue state machine — all idempotent.

The ingestion domain takes extraction output (StyledGrid IR, narrative text) and writes it into the stores. It has three lanes: structured (numeric facts), narrative (document chunks), and review (the human-in-the-loop queue that catches everything the deterministic path is unsure about). Every lane is idempotent — re-running an ingest must not double-write.

Package: src/ragspine/ingestion/. Contract: src/ragspine/ingestion/CLAUDE.md.

Layout

Structured ingestion

structured/ingestion.py orchestrates one document end to end: extract → normalize (glossary) → color tags → upsert. There are two entry points:

  • ingest_excel(path, store, registry, queue, *, dry_run=False, extractor_version="xlsx_styled@1", manifest=None, batch_id=None) — xlsx-only.
  • ingest_file(path, store, registry, queue, *, dry_run=False, manifest=None, batch_id=None, valid_as_of=None, grid_extractor=None) — the unified multi-format dispatcher: it routes by suffix to the right extractor (xlsx / xlsm / pptx, or PDF via the router) and reuses the shared ingest logic.

Both return an IngestReport (a counts object, not raw numbers):

Prop

Type

Inside, each grid is color-tagged via the active mapping (apply_mapping), turned into Fact objects, and written with store.upsert_facts(...). Facts are stamped with their lineage — source_doc_id, source_locator, source_file_hash, extractor_version, mapping_version, and review_status=REVIEW_AUTO_APPROVED.

from ragspine.ingestion.structured.ingestion import ingest_file

report = ingest_file("report.xlsx", store, registry, queue)
print(report.n_facts_ingested, report.n_enqueued_review)

When ingestion enqueues review instead

The structured path is conservative. A file routes to the review queue rather than auto-ingesting when:

  • no grid resolves an entity yet the file contains extractable data — reason "实体无法解析,需人工指认" (entity unresolvable, needs human identification);
  • the file has colored cells but the scope has no active color mapping — reason "颜色映射未确认,需 SME 确认图例" (color mapping unconfirmed, SME must confirm the legend);
  • a PDF looks like a PowerPoint export (ask for the pptx source) or is a scan needing OCR.

With dry_run=True, extraction and reporting run fully but n_facts_ingested and n_enqueued_review stay 0 — the store and queue are untouched.

The batch manifest ledger

structured/ingestion_manifest.py records what ran. ManifestStore (sqlite, manifest_batch + manifest_input tables) opens a batch, logs each input file, and closes the batch with a final status and duration. Each batch is a ManifestRecord:

fieldmeaning
batch_idcaller-supplied, or auto batch-{uuid4 hex[:12]}
statusrunningdone / failed
inputsper-file {path, hash, format, …} rows
n_facts · n_warnings · n_failedaggregate counts
duration_s · failurestiming + per-file errors

API: open_batch(batch_id=None), record_input(...), close_batch(batch_id, status="done"), get_batch(id), list_batches(). Two observability helpers ride alongside: compute_metrics(manifest_store, queue, store) (fact totals, review backlog, confidence buckets, warning rate) and list_versions(store, registry) (active extractor versions + color mappings).

Where idempotency actually lives. The contract calls the manifest "the guard," and it is the audit ledger of every run (path / hash / counts / failures). But the literal no-double-write guarantee comes from the fact store's unique-key upsert (store.upsert_facts, keyed on dim_key): re-running a batch re-extracts and re-upserts, and the unique key keeps the store from growing. batch_id is not content-derived — it is caller-supplied or a random uuid.

Narrative ingestion

The narrative lane is two modules with a clean split:

Pure, deterministic text extraction — zero OCR, zero LLM, no store. extract_narrative(path) dispatches by suffix ({.pptx, .pdf}) to extract_pptx_narrative / extract_pdf_narrative and returns a NarrativeDoc: doc_id, file_hash, a list of NarrativeSegment (text + source_locator), skipped_pages, and warnings. Locators look like 'slide={N},frame={M}', 'slide={N},notes', or 'page={N}'. NarrativeDoc.to_text() joins segments with blank lines — that string is the chunking input contract.

Batch orchestration: extract → chunk → write chunk store, idempotent and dry-runnable. ingest_narrative(inputs, store, *, meta_by_doc=None, dry_run=False) accepts a folder, a file, or a list, and returns a NarrativeIngestReport (a list of per-file FileReport, plus counts()). Each file is chunked via chunk_document(doc.to_text(), doc_meta) and written with store.replace_doc_chunks(...) into the ChunkStore.

Per-file status is one of ingested / skipped / failed / no_text. Idempotency uses a narrative_doc table (doc_id → file_hash) in the same sqlite DB: if the recorded hash matches the file, the file is skipped without re-extracting. meta_by_doc keys are validated against ALLOWED_META_KEYS (title, topic, entity, geography, period, language, sensitivity, valid_as_of) — unknown fields raise ValueError. The period is taken from metadata or inferred from the filename via period_from_filename(name).

Sensitivity is applied here: an explicit meta["sensitivity"] wins, otherwise classify_sensitivity(...) from common runs. This is what later lets retrieval enforce RESTRICTED isolation.

The review queue

review/review_queue.py is the SME human-review state machine that catches everything the deterministic path is unsure about — low-confidence OCR, cross-channel conflicts, unconfirmed color mappings, unresolvable entities. It is sqlite-backed (same DB as the fact store, different tables: review_item + an append-only review_audit).

The state machine has three string states and two transitions:

pending ──approve──▶ approved   (terminal)
pending ──reject───▶ rejected   (terminal)

STATUS_PENDING = "pending", STATUS_APPROVED = "approved", STATUS_REJECTED = "rejected". Approved and rejected are terminal — re-processing a terminal item (or acting on a non-existent one) raises IllegalTransitionError.

A ReviewItem carries reason, payload (JSON), locator, priority (default 100, lower = reviewed sooner), id, status, actor, note, and corrected_value.

API:

methodeffect
enqueue(reason, payload, locator, priority=100) -> intinsert a pending item + write an enqueue audit row
list_pending() -> list[ReviewItem]pending items, ordered priority ASC, id ASC
approve(item_id, actor, note=None)→ approved
reject(item_id, actor, note=None, corrected_value=None)→ rejected (optionally record a correction)
get(item_id) · audit_trail(item_id)fetch item / append-only AuditRecord history

Every transition appends an AuditRecord (enqueue / approve / reject) — the trail is append-only and never mutated, so review history is fully reconstructable. See the review queue concept.

Invariants this domain upholds

  • Idempotent ingestion — structured re-runs upsert on dim_key; narrative re-runs skip on matching file_hash. Re-ingesting never doubles the store.
  • Provenance preserved — every fact and chunk keeps its source_doc_id + locator.
  • Conservative auto-ingest — anything ambiguous (entity / mapping / confidence / conflict) goes to a human, not silently into the store.
  • Append-only audit — review transitions are recorded, never overwritten.

On this page