Normal

Setup

Build a knowledge graph with Refinery normalisation using NormalPipeline.

NormalPipeline sits between StructurePipeline (no Refinery) and StandardPipeline (adds Document parsing for binary files). Use it when raw connector output needs normalisation before chunking: stripping HTML, flattening JSON, or cleaning mixed text/code sources.

Flow: Connector → Refinery → Chunking → Wrapper → Assembler → Loader

Pipeline Comparison

- StructurePipeline — no Refinery (code sources, already clean)
- NormalPipeline — adds Refinery before Chunking
- StandardPipeline — adds Document parsing for binary files (PDF, DOCX, …)
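
For orientation, the sibling pipelines would be imported analogously. The module paths below are assumptions inferred from NormalPipeline's own path rather than confirmed API:

Python
# Hypothetical module paths, mirroring sayou.brain.pipelines.normal
from sayou.brain.pipelines.structure import StructurePipeline  # no Refinery stage
from sayou.brain.pipelines.standard import StandardPipeline  # adds Document parsing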

Python
import re
from unittest.mock import MagicMock, patch

from sayou.core.schemas import SayouChunk, SayouNode, SayouOutput, SayouPacket
from sayou.brain.pipelines.normal import NormalPipeline

Sample HTML Packets

These packets simulate a Connector that fetches HTML documentation pages. They need the Refinery (html strategy) to strip tags before chunking.

Python
html_packets = [
    SayouPacket(
        data="<h1>Introduction</h1><p>Sayou Fabric is a modular pipeline.</p>",
        success=True,
        meta={"filename": "intro.html"},
    ),
    SayouPacket(
        data="<h2>Architecture</h2><p>Eight libraries coordinate via Brain.</p>",
        success=True,
        meta={"filename": "architecture.html"},
    ),
    SayouPacket(
        data="<h2>Chunking</h2><p>Splits blocks into retrieval-ready pieces.</p>",
        success=True,
        meta={"filename": "chunking.html"},
    ),
]

Stage Stubs

Each sub-pipeline stub returns realistic Sayou schema objects so the output is meaningful without a real installation.

Python
refinery_log = []


def refinery_side(data, **kw):
    # Mimic the Refinery's html strategy: replace tags with a space and
    # collapse whitespace so adjacent elements don't glue together.
    clean = re.sub(r"\s+", " ", re.sub(r"<[^>]+>", " ", str(data))).strip()
    refinery_log.append(clean[:50])
    blk = MagicMock()
    blk.type = "text"
    blk.content = clean
    blk.metadata = {}
    return [blk]


chunk_idx = [0]
chunking_log = []


def chunking_side(blocks, **kw):
    # Split each block roughly in half by word count, yielding two
    # retrieval-sized SayouChunks per block.
    result = []
    for blk in blocks:
        words = blk.content.split()
        mid = max(1, len(words) // 2)
        for part in [words[:mid], words[mid:]]:
            chunk_idx[0] += 1
            result.append(
                SayouChunk(
                    content=" ".join(part),
                    metadata={"chunk_id": f"c{chunk_idx[0]:03d}"},
                )
            )
    chunking_log.append(len(result))
    return result


node_idx = [0]
wrapper_log = []


def wrapper_side(chunks, **kw):
    # Wrap each chunk in a SayouNode carrying its text and a sequential id.
    nodes = []
    for ch in chunks:
        node_idx[0] += 1
        nodes.append(
            SayouNode(
                node_id=f"sayou:doc:n{node_idx[0]:03d}",
                node_class="sayou:TextFragment",
                attributes={"schema:text": ch.content},
            )
        )
    wrapper_log.append(len(nodes))
    return SayouOutput(nodes=nodes)


# Fixed graph dict returned by the mock Assembler and handed to the Loader.
assembly_result = {"nodes": [], "edges": [], "metadata": {}}
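
The stubs can also be chained by hand on one packet as a quick sanity check. This snippet is an addition to the walkthrough; it resets the shared counters and logs afterwards so the pipeline run below starts clean.

Python
# Chain the stubs manually on the first packet.
blocks = refinery_side(html_packets[0].data)
chunks = chunking_side(blocks)
output = wrapper_side(chunks)
print(blocks[0].content)  # Introduction Sayou Fabric is a modular pipeline.
print(len(chunks), len(output.nodes))  # 2 2

# Reset shared state so the real run starts from a clean slate.
refinery_log.clear()
chunking_log.clear()
wrapper_log.clear()
chunk_idx[0] = 0
node_idx[0] = 0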

Run the Pipeline

Python
# One mock per stage: side_effect hooks route through the stubs above,
# return_value covers the fixed outputs.
mock_connector = MagicMock()
mock_connector.run.return_value = iter(html_packets)
mock_refinery = MagicMock()
mock_refinery.run.side_effect = refinery_side
mock_chunking = MagicMock()
mock_chunking.run.side_effect = chunking_side
mock_wrapper = MagicMock()
mock_wrapper.run.side_effect = wrapper_side
mock_assembler = MagicMock()
mock_assembler.run.return_value = assembly_result
mock_loader = MagicMock()
mock_loader.run.return_value = True

pipeline = NormalPipeline()
# Parenthesised context managers (Python 3.10+) keep the six patches readable.
with (
    patch.object(pipeline, "connector", mock_connector),
    patch.object(pipeline, "refinery", mock_refinery),
    patch.object(pipeline, "chunking", mock_chunking),
    patch.object(pipeline, "wrapper", mock_wrapper),
    patch.object(pipeline, "assembler", mock_assembler),
    patch.object(pipeline, "loader", mock_loader),
):
    stats = pipeline.ingest(
        "https://docs.example.com/",
        destination="bolt://localhost:7687",
        strategies={
            "connector": "http",
            "refinery": "html",
            "chunking": "markdown",
            "wrapper": "document_chunk",
            "assembler": "GraphBuilder",
            "loader": "Neo4jWriter",
        },
    )

print("=== NormalPipeline Run ===")
print(f"  Packets extracted : {stats['extracted']}")
print(f"  Total chunks      : {stats['chunks']}")
print(f"  Nodes collected   : {stats['nodes']}")
print(f"  Edges assembled   : {stats['edges']}")
print(f"  Saved             : {stats['saved']}")

Per-packet Stage Trace

Each packet flows through Refinery → Chunking → Wrapper independently. Nodes are accumulated in memory before Assembler runs once globally.

Python
print("\n=== Per-packet Stage Trace ===")
for i, (pkt, refined, n_chunks, n_nodes) in enumerate(
    zip(html_packets, refinery_log, chunking_log, wrapper_log)
):
    fname = pkt.meta["filename"]
    print(
        f"  [{i+1}] {fname:22s}"
        f"  refinery→ {refined!r:38s}"
        f"  chunks={n_chunks}  nodes={n_nodes}"
    )
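
The call counts on the mocks confirm this contract directly. Assuming ingest drives the stages exactly as documented above, the per-packet stages run once per packet while Connector, Assembler, and Loader run once in total:

Python
assert mock_connector.run.call_count == 1
assert mock_refinery.run.call_count == len(html_packets)
assert mock_chunking.run.call_count == len(html_packets)
assert mock_wrapper.run.call_count == len(html_packets)
assert mock_assembler.run.call_count == 1
assert mock_loader.run.call_count == 1
print("call counts match the per-packet / global split")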

Global Assembly

After all packets are processed, the accumulated SayouNodes are passed to AssemblerPipeline as a single SayouOutput. This enables cross-document edge resolution that a per-packet assembly loop could not perform.

Python
asm_call = mock_assembler.run.call_args
asm_input = asm_call.args[0] if asm_call.args else asm_call.kwargs.get("input_data")
print("\n=== Global Assembly ===")
print(f"  AssemblerPipeline.run() called once")
print(
    f"  Input: SayouOutput with {len(asm_input.nodes)} node(s) "
    f"(accumulated from all {len(html_packets)} files)"
)
print(f"  Strategy: GraphBuilder → Loader receives assembled graph dict")

Pipeline Comparison

Python
print("\n=== Pipeline Comparison ===")
rows = [
    ("Connector", "yes", "yes", "yes"),
    ("Document", "—", "—", "yes"),
    ("Refinery", "—", "yes", "yes"),
    ("Chunking", "yes", "yes", "yes"),
    ("Wrapper", "yes", "yes", "yes"),
    ("Assembler", "yes", "yes", "yes"),
    ("Loader", "yes", "yes", "yes"),
]
print(f"  {'Stage':12s}  {'Structure':10s}  {'Normal':10s}  Standard")
print(f"  {'-'*48}")
for stage, s, n, std in rows:
    print(f"  {stage:12s}  {s:10s}  {n:10s}  {std}")