Building Pipelines
The pattern
flowchart LR
  inbox["inbox"]
  v["validator-c"]
  d["double-c\nmap-c (* 2)"]
  out["doubled results"]
  errs["errors"]
  inbox --> v
  v -->|"outbox.right\n[3, 4, 7]"| d --> out
  v -->|"outbox.left"| errs
  style errs fill:none,stroke:#aaa,color:#888
Validate → transform
inherit (dnzl) actor reply;
inherit (dnzl.ned) st map-c;

validator = n: if n > 0 then reply.right n else reply.left "non-positive: ${toString n}";

validator-c = actor validator;
double-c = map-c (n: n * 2); # stream transformer, not a cycle-c

validated = validator-c { inbox = st 3 (-1) 4 0 7; };
# outbox.right = ST [3, 4, 7]: bare values, tags stripped
doubled = double-c validated.outbox.right;
doubled.toList
# → [ 6 8 14 ]

outbox.right yields bare success values, so there is no { right = n } wrapper to unwrap.
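To make the data flow concrete, the same validate-then-transform split can be modelled on plain Nix lists. This is a minimal sketch using only builtins; it is not how dnzl implements `actor`, `st`, or `map-c`, and the names `validate`, `tagged`, `rights`, and `lefts` are illustrative.

```nix
# Plain-list model of validate → transform (builtins only, no dnzl).
let
  # Same rule as the validator above, returning a tagged attrset.
  validate = n:
    if n > 0
    then { right = n; }
    else { left = "non-positive: ${toString n}"; };

  tagged = map validate [ 3 (-1) 4 0 7 ];

  # Split by tag and strip the wrapper, mirroring outbox.right / outbox.left.
  rights = map (r: r.right) (builtins.filter (r: r ? right) tagged);
  lefts = map (r: r.left) (builtins.filter (r: r ? left) tagged);
in {
  doubled = map (n: n * 2) rights;
  errors = lefts;
}
# → { doubled = [ 6 8 14 ]; errors = [ "non-positive: -1" "non-positive: 0" ]; }
```

Saved to a file, the expression can be checked with `nix-instantiate --eval --strict`.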
Counter chain
Two counters in series: each reply from A triggers one "inc" in B.

a = counter-c { inbox = st "inc" "inc" "inc"; };
b = counter-c { inbox = a.outbox.right (st.map (_: "inc")); };
b.outbox.toList
# → [ { right = 1; } { right = 2; } { right = 3; } ]

a.outbox.right is bare ints [1, 2, 3]. st.map (_: "inc") substitutes "inc" for each.
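The chaining step can be traced by hand with a plain-list model. The sketch below assumes a counter that starts at 0, increments on "inc", and replies with its current count after every message; `step` and `run` are hypothetical helpers built on `builtins.foldl'`, not part of dnzl.

```nix
# Plain-list model of two counters in series (builtins only, no dnzl).
let
  # Assumed counter rule: "inc" bumps the state, anything else leaves it.
  step = state: msg: if msg == "inc" then state + 1 else state;

  # Run a list of messages and collect one reply per message.
  run = msgs:
    (builtins.foldl'
      (acc: msg:
        let next = step acc.state msg;
        in { state = next; replies = acc.replies ++ [ next ]; })
      { state = 0; replies = [ ]; }
      msgs).replies;

  a = run [ "inc" "inc" "inc" ];
  # One "inc" for B per reply from A, whatever the reply's value.
  b = run (map (_: "inc") a);
in { inherit a b; }
# → { a = [ 1 2 3 ]; b = [ 1 2 3 ]; }
```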
Type-based routing
Split one inbox across multiple actors by message type using when-c:
inherit (dnzl.ned) when-c;
inbox = st
  { type = "count"; cmd = "inc"; }
  { type = "div"; dividend = 10; divisor = 2; }
  { type = "count"; cmd = "inc"; }
  { type = "div"; dividend = 9; divisor = 3; }
  { type = "count"; cmd = "get"; };

counts = counter-c { inbox = when-c (m: m.type == "count") inbox (m: m.cmd); };
divides = div-c { inbox = when-c (m: m.type == "div") inbox (m: m); };

# concat outboxes in order: counts first, then divides
(counts.outbox (divides.outbox)).toList
# → [ { right = 1; } { right = 2; } { right = 2; }
#     { right = 5; } { right = 3; } ]

flowchart TB
  inbox["shared inbox"]
  wc1["when-c type==count → cmd"]
  wc2["when-c type==div → msg"]
  c["counter-c"]
  d["div-c"]
  inbox --> wc1 --> c
  inbox --> wc2 --> d
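Judging from the calls above, when-c takes a predicate, a source inbox, and a projection. On plain lists that is a filter followed by a map. The helper `when` below is a hypothetical stand-in written with builtins only; it keeps the same argument order but says nothing about how when-c works on streams.

```nix
# Plain-list stand-in for when-c: filter by predicate, then project.
let
  inbox = [
    { type = "count"; cmd = "inc"; }
    { type = "div"; dividend = 10; divisor = 2; }
    { type = "count"; cmd = "get"; }
  ];

  when = pred: msgs: project: map project (builtins.filter pred msgs);
in {
  countCmds = when (m: m.type == "count") inbox (m: m.cmd);
  divMsgs = when (m: m.type == "div") inbox (m: m);
}
# → { countCmds = [ "inc" "get" ];
#     divMsgs = [ { dividend = 10; divisor = 2; type = "div"; } ]; }
```

Each branch sees only its own messages, in their original relative order.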
Fan-in with merge
inherit (dnzl) merge actor reply;

cmds-a = st "inc" "inc";
cmds-b = st "inc" "get";

shared = counter-c { inbox = merge [ cmds-a cmds-b ]; };
shared.outbox.toList
# → [ { right = 1; } { right = 2; } { right = 3; } { right = 3; } ]

merge concatenates in list order: all of cmds-a before cmds-b.
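Because merge concatenates rather than interleaves, its effect on the command order can be pictured with `builtins.concatLists` on plain lists. This is only an analogy for the ordering, assuming nothing about merge's stream internals; `cmdsA` and `cmdsB` are illustrative names.

```nix
# List-order concatenation: everything from cmdsA, then everything from cmdsB.
let
  cmdsA = [ "inc" "inc" ];
  cmdsB = [ "inc" "get" ];
in
builtins.concatLists [ cmdsA cmdsB ]
# → [ "inc" "inc" "inc" "get" ]
```

Three "inc" messages followed by one "get" is what produces the replies 1, 2, 3, 3 shown above.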
Composite cycle
Wrap routing logic inside a named cycle-c:

# Caller sends typed envelopes; routing is hidden inside
typed-c = { inbox }:
  let
    counts = counter-c { inbox = when-c (m: m ? cmd) inbox (m: m.cmd); };
    divides = div-c { inbox = when-c (m: m ? dividend) inbox (m: m); };
  in { outbox = counts.outbox (divides.outbox); };

a = typed-c {
  inbox = st { cmd = "inc"; } { dividend = 10; divisor = 2; } { cmd = "get"; };
};
a.outbox.toList
# → [ { right = 1; } { right = 1; } { right = 5; } ]
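The shape of the composite, a function from `{ inbox }` to `{ outbox }` that hides its routing, can also be sketched on plain lists. Everything here is an assumption made for illustration: `runCounter` and `runDiv` are hypothetical stand-ins for counter-c and div-c, replies are bare values rather than `{ right = n; }`, and the division-by-zero path is left out.

```nix
# Plain-list model of a composite with hidden routing (builtins only, no dnzl).
let
  # Assumed counter: "inc" bumps the count; every message replies with it.
  runCounter = cmds:
    (builtins.foldl'
      (acc: cmd:
        let next = if cmd == "inc" then acc.count + 1 else acc.count;
        in { count = next; replies = acc.replies ++ [ next ]; })
      { count = 0; replies = [ ]; }
      cmds).replies;

  # Assumed divider: one integer quotient per message.
  runDiv = map (m: m.dividend / m.divisor);

  # Route by attribute presence, then concatenate: counts first, then divides.
  typed = { inbox }: {
    outbox =
      runCounter (map (m: m.cmd) (builtins.filter (m: m ? cmd) inbox))
      ++ runDiv (builtins.filter (m: m ? dividend) inbox);
  };
in
(typed {
  inbox = [
    { cmd = "inc"; }
    { dividend = 10; divisor = 2; }
    { cmd = "get"; }
  ];
}).outbox
# → [ 1 1 5 ]
```

The caller only ever sees `inbox` and `outbox`, so the routing predicates can change without touching call sites.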