Concepts

A Pipelogic component is a function. Pipelogic gives that function typed inputs from upstream components and serializes its outputs to downstream components. This page is the mental model.

The four pieces

Every Python component has four parts:

  • Name
    component.yml
    Type
    metadata
    Description

    Declares the component's name, runtime, typed I/O, configuration parameters, and tags.

  • Name
    src/main.py
    Type
    implementation
    Description

    Imports pipelogic.worker.run and calls it with your worker function. The function takes one positional argument per input stream and returns one value per output stream.

  • Name
    src/requirements.txt
    Type
    dependencies
    Description

    Pinned exact versions. Never includes numpy (pipelogic ships its own).

  • Name
    .pipecomponent
    Type
    registration
    Description

    Workspace-side metadata that ties the directory to your registered component. Created by ppl init.

The type system

Pipelogic uses a pipelang type system that is enforced across components. Compatibility is checked when you wire two components together — you cannot connect a BoundingBox output to an Image input.

Atomic types

Bool, Char, Int32, Int64, UInt32, UInt64, Float, Double, String, Bytes. They round-trip to native Python with the help of small wrapper classes in pipelogic.types:

from pipelogic.types import Int32, String, Bytes

The default Python int maps to Int64; the default Python float maps to Double. To use Int32 or Float explicitly, pass a type string:

from pipelogic.types import List
List([1, 2, 3], '[Int32]')

Composite types

FormExampleDescription
List[Int32]Variable-length sequence of one type
Tuple(Float, Float)Fixed positional product type
Record{name: String, age: UInt32}Named-field product type
UnionInt32 | FloatSum / discriminated union type
NamedPerson := {name: String, age: UInt32}Aliases a type under a global name

Named types are how Pipelogic introduces domain types like Image, BoundingBox, AudioFrame. You register them once at module import time:

from pipelogic import types

types.register_named([
    ("Person", "{name: String, age: UInt32}"),
])

PERSON_TYPE = types.get_type("Person")

The high-level pipelogic.cv package does this for Image, Tensor, AudioFrame, BoundingBox, etc., when you import it — you never need to register the built-in domain types yourself.

Type variables

Some components are generic. Their component.yml declares a placeholder — any lowercase identifier — as the input or output type:

worker:
  input_type: t
  output_type: t

When the component is wired into a backend, the platform binds t to whatever concrete type is connected. The same letter inside one component.yml always refers to the same bound type; in another component, the same letter is a fresh variable.

Repeat is the canonical example: it takes any t and emits the same t.

Maybe<t> and the Cond transformation

Maybe<t> is the platform's built-in option type — semantically Nothing | Just<t>. The Cond transformation produces it: when its predicate fails, Cond emits Nothing so the downstream stream keeps its cadence and timing-sensitive consumers don't desync.

A consumer that takes a plain t can still be wired to a Maybe<t> source — coercion strips the wrapper at wire time when routed through unpack_named / unpack_union.

In C++ the same type is exposed as ppl::Maybe<t>.

Variadic packs

Some components accept a variable number of typed inputs. The trailing $<name>... element captures all remaining inputs into a tuple, with each element's type bound to whatever the user wires:

worker:
  input_types:
    - Image
    - $ts...     # tuple of whatever the user connects after Image

visualize and evaluate-expression are real components that use this.

Domain types in pipelogic.cv

Prefer the Python wrappers — they handle field layout, named-type registration, and serialization for you. Drop down to raw dicts only when you have a reason the wrappers don't cover.

WrapperPipelang shapePython factory
ImageImage := {width: UInt64, height: UInt64, format: ..., data: Bytes}Image(ndarray, ColorSpace.BGR)
TensorTensor := {shape: [Int64], data: [Float]}Tensor(ndarray)
AudioFrameAudioFrame := {sample_rate: UInt64, data: Tensor}AudioFrame(ndarray, sample_rate=16000)
BoundingBox{class: DetectedClass, rectangle: Rectangle<Double>}BoundingBox.from_xyxy(x1, y1, x2, y2, class_id, confidence)
Landmark{point: Point<Double>, confidence: Double}Landmark.from_xy(x, y, confidence)
MaskMask := Tensor<Bool>Mask(boolean_ndarray)

See API: cv for the complete surface.

Streams

A stream is a typed sequence of messages flowing between two components. Streams are typed: the upstream's output_type must unify with the downstream's input_type.

Stream semantics you can rely on:

  • Pull-based with explicit shifts. The consumer declares how many messages to skip / take / advance via StreamPullShift. The runtime fires the worker once that many messages are available.
  • Per-edge FIFO ordering, single producer, single consumer. Fan-out and merging are graph-level, not queue-level.
  • Backpressure is automatic. A slow consumer eventually blocks producers, or drops messages on virtual_in.push(blocking=False).
  • At-most-once delivery. Use initial_state if you need state that survives container restarts.

A worker function gets one Python argument per declared input stream:

from pipelogic.worker import run

def double(x):
    return x * 2

run(double)

The number and types of arguments are determined by component.yml's worker.input_type (single) or worker.input_types (list). Same for outputs.

Single output vs multi-output

A component with one output returns one value. A component with multiple outputs returns a tuple, in declared order:

worker:
  input_type: Image
  output_types:
    - BoundingBox
    - Image

The run loop

pipelogic.worker.run(worker_function) is the entry point. It reads component.yml, materializes the input/output streams, and dispatches each tick to your function. The default cadence is one message in per input stream, one message out per output stream, per tick — your worker is just a regular function, called once with the tick's inputs and expected to return the tick's outputs.

Three extensions unlock more advanced patterns:

Stateful workers

Pass initial_state to run and accept a state keyword argument. Return {"output": ..., "state": ...} to update state. Pipelogic persists state across container restarts:

from pipelogic.worker import run

def counter(x, state):
    new_state = state + 1
    return {"output": new_state, "state": new_state}

run(counter, initial_state=0)

Virtual streams

A virtual input stream lets you push messages from outside the streaming graph (a webcam thread, a webhook handler). Declare it by giving your worker a virtual_input parameter:

from pipelogic.worker import run, virtual_in
import threading

def from_camera(virtual_input):
    return virtual_input[0]   # first message from the virtual queue

def producer():
    while True:
        frame = grab_frame()
        virtual_in.push(frame, blocking=False)

threading.Thread(target=producer, daemon=True).start()
run(from_camera)

Symmetrically, set use_virtual_output=True to let your worker push messages from a side thread to a virtual output queue.

Streaming mode

Pass initial_pull_request to run and you opt into streaming mode: each input is now a list of messages, each output is a list of messages, and the worker decides how many to pull next via an InputsPullRequest. This is for components with asymmetric stream rates (a worker that pulls N seconds of audio at a time and emits one summary per batch).

See API: worker for the full surface.

How a component starts

The platform launches your component for you — you write main.py, declare typed I/O in component.yml, and ppl release ships the sources. The platform compiles, packages, and runs the container.

Configuration

component.yml declares a config_schema block. The runtime exposes those values as attributes on the singleton pipelogic.worker.config:

worker:
  input_type: Image
  output_type: Image
  config_schema:
    blur_radius:
      type: UInt64
      default: 5
      description: "Gaussian blur radius in pixels."

The user can edit blur_radius from the App without touching code.

By default, config.<name> reads return the value as of the moment your component started — they don't auto-update when the user moves a slider in the App. To pick up live edits, call config.sync(). It pulls any pending updates from the platform, applies them so subsequent config.<name> reads see the new values, and returns a dict of the keys that changed since the previous sync (handy if you need to recompute something when a parameter flips):

def blur(image: Image) -> Image:
    changed = config.sync()
    if "blur_radius" in changed:
        # re-derive anything that depends on the radius
        ...
    radius = int(config.blur_radius) | 1
    arr = cv2.GaussianBlur(image.to_bgr(), (radius, radius), 0)
    return Image(arr, color_space=ColorSpace.BGR)

Calling config.sync() once at the top of each tick is the common pattern. Skip it if you want config to be effectively immutable for the lifetime of the component.

What's next

  • Quickstart — write your first component end-to-end.
  • API: types — the full type-system reference.
  • API: worker — every kwarg of run(), virtual streams, streaming mode.
  • API: cv — Image, Tensor, AudioFrame, geometry types.
  • Recipes — patterns from real workers in the catalog.

Was this page helpful?