Concepts
A Pipelogic component is a function. Pipelogic gives that function typed inputs from upstream components and serializes its outputs to downstream components. This page is the mental model.
The four pieces
Every Python component has four parts:
- Name
component.yml- Type
- metadata
- Description
Declares the component's name, runtime, typed I/O, configuration parameters, and tags.
- Name
src/main.py- Type
- implementation
- Description
Imports
pipelogic.worker.runand calls it with your worker function. The function takes one positional argument per input stream and returns one value per output stream.
- Name
src/requirements.txt- Type
- dependencies
- Description
Pinned exact versions. Never includes
numpy(pipelogic ships its own).
- Name
.pipecomponent- Type
- registration
- Description
Workspace-side metadata that ties the directory to your registered component. Created by
ppl init.
The type system
Pipelogic uses a pipelang type system that is enforced across components. Compatibility is checked when you wire two components together — you cannot connect a BoundingBox output to an Image input.
Atomic types
Bool, Char, Int32, Int64, UInt32, UInt64, Float, Double, String, Bytes. They round-trip to native Python with the help of small wrapper classes in pipelogic.types:
from pipelogic.types import Int32, String, Bytes
The default Python int maps to Int64; the default Python float maps to Double. To use Int32 or Float explicitly, pass a type string:
from pipelogic.types import List
List([1, 2, 3], '[Int32]')
Composite types
| Form | Example | Description |
|---|---|---|
| List | [Int32] | Variable-length sequence of one type |
| Tuple | (Float, Float) | Fixed positional product type |
| Record | {name: String, age: UInt32} | Named-field product type |
| Union | Int32 | Float | Sum / discriminated union type |
| Named | Person := {name: String, age: UInt32} | Aliases a type under a global name |
Named types are how Pipelogic introduces domain types like Image, BoundingBox, AudioFrame. You register them once at module import time:
from pipelogic import types
types.register_named([
("Person", "{name: String, age: UInt32}"),
])
PERSON_TYPE = types.get_type("Person")
The high-level pipelogic.cv package does this for Image, Tensor, AudioFrame, BoundingBox, etc., when you import it — you never need to register the built-in domain types yourself.
Type variables
Some components are generic. Their component.yml declares a placeholder — any lowercase identifier — as the input or output type:
worker:
input_type: t
output_type: t
When the component is wired into a backend, the platform binds t to whatever concrete type is connected. The same letter inside one component.yml always refers to the same bound type; in another component, the same letter is a fresh variable.
Repeat is the canonical example: it takes any t and emits the same t.
Maybe<t> and the Cond transformation
Maybe<t> is the platform's built-in option type — semantically Nothing | Just<t>. The Cond transformation produces it: when its predicate fails, Cond emits Nothing so the downstream stream keeps its cadence and timing-sensitive consumers don't desync.
A consumer that takes a plain t can still be wired to a Maybe<t> source — coercion strips the wrapper at wire time when routed through unpack_named / unpack_union.
In C++ the same type is exposed as ppl::Maybe<t>.
Variadic packs
Some components accept a variable number of typed inputs. The trailing $<name>... element captures all remaining inputs into a tuple, with each element's type bound to whatever the user wires:
worker:
input_types:
- Image
- $ts... # tuple of whatever the user connects after Image
visualize and evaluate-expression are real components that use this.
Domain types in pipelogic.cv
Prefer the Python wrappers — they handle field layout, named-type registration, and serialization for you. Drop down to raw dicts only when you have a reason the wrappers don't cover.
| Wrapper | Pipelang shape | Python factory |
|---|---|---|
Image | Image := {width: UInt64, height: UInt64, format: ..., data: Bytes} | Image(ndarray, ColorSpace.BGR) |
Tensor | Tensor := {shape: [Int64], data: [Float]} | Tensor(ndarray) |
AudioFrame | AudioFrame := {sample_rate: UInt64, data: Tensor} | AudioFrame(ndarray, sample_rate=16000) |
BoundingBox | {class: DetectedClass, rectangle: Rectangle<Double>} | BoundingBox.from_xyxy(x1, y1, x2, y2, class_id, confidence) |
Landmark | {point: Point<Double>, confidence: Double} | Landmark.from_xy(x, y, confidence) |
Mask | Mask := Tensor<Bool> | Mask(boolean_ndarray) |
See API: cv for the complete surface.
Streams
A stream is a typed sequence of messages flowing between two components. Streams are typed: the upstream's output_type must unify with the downstream's input_type.
Stream semantics you can rely on:
- Pull-based with explicit shifts. The consumer declares how many messages to skip / take / advance via
StreamPullShift. The runtime fires the worker once that many messages are available. - Per-edge FIFO ordering, single producer, single consumer. Fan-out and merging are graph-level, not queue-level.
- Backpressure is automatic. A slow consumer eventually blocks producers, or drops messages on
virtual_in.push(blocking=False). - At-most-once delivery. Use
initial_stateif you need state that survives container restarts.
A worker function gets one Python argument per declared input stream:
from pipelogic.worker import run
def double(x):
return x * 2
run(double)
The number and types of arguments are determined by component.yml's worker.input_type (single) or worker.input_types (list). Same for outputs.
Single output vs multi-output
A component with one output returns one value. A component with multiple outputs returns a tuple, in declared order:
worker:
input_type: Image
output_types:
- BoundingBox
- Image
In single-output mode, returning a tuple is not auto-unpacked — the runtime wraps your tuple as the single output. If you want multiple outputs, declare them in component.yml and return a tuple in declared order.
The run loop
pipelogic.worker.run(worker_function) is the entry point. It reads component.yml, materializes the input/output streams, and dispatches each tick to your function. The default cadence is one message in per input stream, one message out per output stream, per tick — your worker is just a regular function, called once with the tick's inputs and expected to return the tick's outputs.
Three extensions unlock more advanced patterns:
Stateful workers
Pass initial_state to run and accept a state keyword argument. Return {"output": ..., "state": ...} to update state. Pipelogic persists state across container restarts:
from pipelogic.worker import run
def counter(x, state):
new_state = state + 1
return {"output": new_state, "state": new_state}
run(counter, initial_state=0)
Virtual streams
A virtual input stream lets you push messages from outside the streaming graph (a webcam thread, a webhook handler). Declare it by giving your worker a virtual_input parameter:
from pipelogic.worker import run, virtual_in
import threading
def from_camera(virtual_input):
return virtual_input[0] # first message from the virtual queue
def producer():
while True:
frame = grab_frame()
virtual_in.push(frame, blocking=False)
threading.Thread(target=producer, daemon=True).start()
run(from_camera)
Symmetrically, set use_virtual_output=True to let your worker push messages from a side thread to a virtual output queue.
Streaming mode
Pass initial_pull_request to run and you opt into streaming mode: each input is now a list of messages, each output is a list of messages, and the worker decides how many to pull next via an InputsPullRequest. This is for components with asymmetric stream rates (a worker that pulls N seconds of audio at a time and emits one summary per batch).
See API: worker for the full surface.
How a component starts
The platform launches your component for you — you write main.py, declare typed I/O in component.yml, and ppl release ships the sources. The platform compiles, packages, and runs the container.
Configuration
component.yml declares a config_schema block. The runtime exposes those values as attributes on the singleton pipelogic.worker.config:
worker:
input_type: Image
output_type: Image
config_schema:
blur_radius:
type: UInt64
default: 5
description: "Gaussian blur radius in pixels."
The user can edit blur_radius from the App without touching code.
By default, config.<name> reads return the value as of the moment your component started — they don't auto-update when the user moves a slider in the App. To pick up live edits, call config.sync(). It pulls any pending updates from the platform, applies them so subsequent config.<name> reads see the new values, and returns a dict of the keys that changed since the previous sync (handy if you need to recompute something when a parameter flips):
def blur(image: Image) -> Image:
changed = config.sync()
if "blur_radius" in changed:
# re-derive anything that depends on the radius
...
radius = int(config.blur_radius) | 1
arr = cv2.GaussianBlur(image.to_bgr(), (radius, radius), 0)
return Image(arr, color_space=ColorSpace.BGR)
Calling config.sync() once at the top of each tick is the common pattern. Skip it if you want config to be effectively immutable for the lifetime of the component.
What's next
- Quickstart — write your first component end-to-end.
- API: types — the full type-system reference.
- API: worker — every kwarg of
run(), virtual streams, streaming mode. - API: cv — Image, Tensor, AudioFrame, geometry types.
- Recipes — patterns from real workers in the catalog.