Python component API

A Python Pipelogic component is, at its core, a process() function in src/main.py plus a component.yml describing its typed I/O and configuration. Everything else — class shape, lifecycle hooks, stateful patterns, virtual outputs, secret handling — is built on top of that one entrypoint.

This page is the full Python component API reference: what process() may accept and return, how stateful components keep per-stream state, how configuration values reach your code, and what the runtime guarantees about imports and threading. For the type-system side, read Pipelang type syntax and the type catalog.

For Python components (language: py), src/main.py is the entry point. The component shim provided by the runtime base imports your module, and your module hands control to the runtime by calling pipelogic.worker.run(your_function) at top level.

Minimal component

from pipelogic.worker import run, config

THRESHOLD = float(config.threshold)        # configs are read at module load

def process(message):
    return {"echoed": message}

run(process)

In-tree components call run(process) at module top level — there is no if __name__ == "__main__": guard. The component shim imports your module and run(...) takes over the message loop.

The function passed to run receives one positional argument per input stream and returns one value per output stream. The number of arguments and the types are determined by worker.input_type(s) and worker.output_type(s) in component.yml.

config — runtime parameters

from pipelogic.worker import config

threshold = float(config.threshold)
model_cfg = str(config.model_cfg)
hidden_dims = list(config.hidden_dims)     # for [Int64] etc.

config.<name> exposes every parameter declared in config_schema:. Read it at module load (top-level code that runs on import) or inside the component function. Both work, but reading at load time fails fast if the parameter is missing.

config values are typed based on the type: in config_schema. Cast explicitly if you want a builtin Python type:

type: in YAML              | Recommended Python cast
String                     | str(config.x)
Bool                       | bool(config.x)
UInt64 / Int64             | int(config.x)
Double                     | float(config.x)
(Double, Double, Double)   | tuple(float(v) for v in config.x)
[Double]                   | [float(v) for v in config.x]
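
The casts in the table can be tried without the runtime. The sketch below uses a types.SimpleNamespace as a stand-in for pipelogic.worker.config, an assumption made only so the snippet runs on its own; the parameter names (threshold, enabled, max_items, origin, weights) and their values are hypothetical.

```python
from types import SimpleNamespace

# Stand-in for pipelogic.worker.config so the sketch runs outside the runtime.
# Parameter names and values are hypothetical.
config = SimpleNamespace(
    threshold=0.5,            # declared as Double
    enabled=True,             # declared as Bool
    max_items=10,             # declared as UInt64
    origin=(0, 1, 2),         # declared as (Double, Double, Double)
    weights=[1, 2, 3],        # declared as [Double]
)

# Explicit casts give plain builtin values regardless of the wrapper type.
threshold = float(config.threshold)
enabled = bool(config.enabled)
max_items = int(config.max_items)
origin = tuple(float(v) for v in config.origin)
weights = [float(v) for v in config.weights]
```

In a real component the only change would be importing config from pipelogic.worker instead of building the stand-in.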

Mutable parameters

For parameters declared mutable: true in config_schema, the component MUST call config.sync() inside process() to receive updates. config.sync() returns the set of parameter names that changed since the previous sync (an empty set if nothing changed); after the call, subsequent config.<name> reads return the new values.

from pipelogic.worker import config

def process(message):
    changed = config.sync()                    # set[str] — names changed since last sync
    if "threshold" in changed:
        ...                                    # react to a fresh threshold value
    thr = float(config.threshold)              # always the latest synced value
    return {"out": ...}

Without config.sync(), mutable parameters appear stuck at the value captured at the previous sync (or at module load) — even after ppl backend change-parameter succeeds on the backend.
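
The "stuck value" behaviour is easier to see in a toy model. The sketch below is not the real pipelogic.worker.config: FakeConfig and its push_update hook are hypothetical, and they model only the contract described above (updates stay invisible until sync(), which returns the set of changed names).

```python
class FakeConfig:
    """Toy stand-in for the runtime config; models only the sync() contract."""

    def __init__(self, **values):
        self._live = dict(values)   # values visible to component code
        self._pending = {}          # updates waiting for the next sync()

    def __getattr__(self, name):
        # Reached only for names not found normally, i.e. parameter names.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self._live[name]
        except KeyError:
            raise AttributeError(name) from None

    def push_update(self, name, value):
        """Hypothetical hook simulating a backend-side parameter change."""
        self._pending[name] = value

    def sync(self):
        """Apply pending updates and return the names whose value changed."""
        changed = {k for k, v in self._pending.items() if self._live.get(k) != v}
        self._live.update(self._pending)
        self._pending.clear()
        return changed


config = FakeConfig(threshold=0.5)


def process(message):
    changed = config.sync()                 # pick up any pending updates first
    thr = float(config.threshold)           # latest synced value
    return {"passed": message >= thr, "changed": sorted(changed)}
```

Calling config.push_update("threshold", 0.9) leaves config.threshold at 0.5 until the next process() call runs sync().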

Function signature — single input / single output

def process(in_msg):
    out_msg = ...
    return out_msg

in_msg is a Python object whose shape matches the input type:

Input type expression | Type of in_msg
String                | str
Bool                  | bool
UInt64 / Int64        | int
Double                | float
Image                 | pipelogic.cv.Image (after wrapping; see below)
DepthImage            | pipelogic.cv.DepthImage (unverified-by-components; see component-api/python/image)
AudioFrame            | pipelogic.audio.AudioFrame
Tensor                | pipelogic.tensor.Tensor (also pipelogic.cv.Tensor)
[T]                   | list[T]
(T1, T2, …)           | tuple of T1, T2, …
{f1: T1, f2: T2}      | dict with string keys

The return value follows the output type expression analogously.
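
As a small illustration of the mapping, the sketch below assumes a hypothetical component whose input type is {name: String, scores: [Double]} and whose output type is (String, Double). The field names are invented for the example.

```python
def process(in_msg):
    # A record input arrives as a dict with string keys.
    scores = [float(s) for s in in_msg["scores"]]        # [Double] -> list of float
    mean = sum(scores) / len(scores) if scores else 0.0  # guard the empty list
    # A tuple output type is returned as a Python tuple.
    return in_msg["name"], mean
```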

Function signature — multiple inputs / outputs

For worker.input_types: [T1, T2], the function takes two positional arguments:

def process(image_in, mask_in):
    ...
    return result

For worker.output_types: [T1, T2], return a tuple:

def process(image_in):
    return aligned_image, metadata_string

Position matters — output 0 is the first tuple element, output 1 is the second.
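
A minimal sketch combining both cases, assuming a hypothetical component declared with input_types [String, Int64] and output_types [String, Bool]. The argument names are arbitrary; only their positions carry meaning.

```python
def process(text_in, limit_in):
    # Argument 0 is the String stream, argument 1 is the Int64 stream.
    truncated = text_in[:limit_in]
    was_cut = len(text_in) > limit_in
    # Tuple element 0 goes to output 0 (String), element 1 to output 1 (Bool).
    return truncated, was_cut
```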

Wrapping platform record types

Input objects of structured types (Image, AudioFrame, Tensor) arrive as raw record objects. To use them ergonomically, wrap them through the matching helper class:

from pipelogic.cv import Image, ColorSpace

def process(in_image):
    img = Image(in_image)             # accepts the raw pipelang record
    bgr = img.to_bgr()                # (h, w, 3) uint8 BGR ndarray
    ...
    out_arr = ...                     # (h, w, 3) uint8 BGR
    return Image(out_arr, ColorSpace.BGR)

Image(in_image) auto-detects the incoming color space from the pipelang record's tag. To normalise in-place use img.convert(ColorSpace.BGR); to peek without mutating use img.to_bgr() / img.to_rgb() / img.to_gray() (each returns a numpy view in the requested space).

See component-api/python/image for the full image API. Other wrappers live in sibling top-level modules:

Wrapper    | Import
Image      | from pipelogic.cv import Image
AudioFrame | from pipelogic.audio import AudioFrame
Tensor     | from pipelogic.tensor import Tensor (or from pipelogic.cv import Tensor)
Mesh       | from pipelogic.mesh import Mesh
Geometry   | from pipelogic.geometry import Polygon, BoundingBox, Point

DepthImage is also exported from pipelogic.cv, but its Python API is not exercised by any in-tree component — treat it as unverified-by-components (see component-api/python/image).

Returning generic values

For records and tuples, return a dict or tuple with the right shape:

# output_type: "BoundingBox"
return {"x": 10, "y": 20, "w": 30, "h": 40, "score": 0.9}

# output_type: "[BoundingBox]"
return [{"x": 10, "y": 20, "w": 30, "h": 40, "score": 0.9},
        {"x": 50, "y": 60, "w": 70, "h": 80, "score": 0.8}]

# output_type: "(Image, String)"
return img, "session-id"

The runtime serializes the return value to the platform protocol on your behalf.
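
For a [BoundingBox] output, the list is usually built from detector results rather than written by hand. The helper below is a hypothetical sketch: the field names come from the examples above, while the integer coordinates and the min_score filter are assumptions made for illustration.

```python
def to_bounding_boxes(detections, min_score=0.5):
    """Convert (x, y, w, h, score) tuples into dicts shaped like the examples above."""
    return [
        {"x": int(x), "y": int(y), "w": int(w), "h": int(h), "score": float(score)}
        for x, y, w, h, score in detections
        if score >= min_score   # drop low-confidence detections
    ]
```

A process function with output_type "[BoundingBox]" could then end with return to_bounding_boxes(detections).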

Stateful components

If your component needs to carry state across messages (a tracker, a running average, a session map), pass initial_state= to run:

def process(message, state):
    state["count"] = state.get("count", 0) + 1
    out = {"running_count": state["count"]}
    return {"output": out, "state": state}    # dict with output + state keys

run(process, initial_state={"count": 0})

The function receives the persisted state via the state= keyword argument and MUST return an object whose output key/attribute holds the message(s) for the output stream(s) and whose state key/attribute holds the new persisted state. Returning a bare value or a tuple does not work — the runtime looks up rets["output"] / rets.output and rets["state"] / rets.state and raises if state is missing.

For multi-output stateful components, output is a tuple matching the declared output stream count. For virtual-output stateful components add virtual_output; for streaming mode add pull_request.

The platform persists state across container restarts (graceful crash recovery). On the very first start, the state is the value passed to initial_state.
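
The return contract can be checked locally before deploying. The drive function below is a hypothetical stand-in for the runtime loop, written only from the description above: it passes state by keyword, reads the output and state keys, and feeds the returned state into the next call. It does not model persistence across restarts.

```python
def process(message, state):
    # Running mean over all messages seen so far.
    count = state.get("count", 0) + 1
    total = state.get("total", 0.0) + float(message)
    new_state = {"count": count, "total": total}
    return {"output": {"running_mean": total / count}, "state": new_state}


def drive(fn, messages, initial_state):
    """Hypothetical local driver that mimics the state hand-off between ticks."""
    state, outputs = initial_state, []
    for msg in messages:
        rets = fn(msg, state=state)
        if "state" not in rets:
            raise KeyError("stateful component must return a 'state' key")
        outputs.append(rets["output"])
        state = rets["state"]           # next tick sees the returned state
    return outputs, state
```

Running drive(process, [2, 4, 6], {"count": 0, "total": 0.0}) exercises three ticks in order, which is the ordering a stateful vertex relies on.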

Purity and scheduling

Do not use initial_state unless the component is actually history-dependent. Without persisted state, each call depends only on the selected inputs, the current config and files, and deterministic code. That is the shape the platform can safely parallelize, batch, or scale out when scheduling and scaling allow, because no previous tick has to be consulted before running the next one.

Once initial_state is set, the vertex is explicitly stateful. The runtime must preserve the returned state and feed it into the next tick, which makes ordering part of the contract. Use that for trackers, accumulators, session maps, windows, and other logic where the same input can produce a different output depending on previous messages.

Module globals are still useful for model handles, database clients, compiled regexes, and regenerable caches. They should not contain semantic state that changes the output history. If the state changes what the component means, put it in state; if the code talks to the outside world, put that interaction behind virtual input/output.
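
A sketch of a module global that stays on the safe side of this rule: a compiled regex built once at import. The tokenizer itself is a hypothetical example; the point is that the global never changes what a given input produces.

```python
import re

# Module global: built once at import and never mutated afterwards.
TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")


def process(text):
    # Same text in, same tokens out, no matter what was processed before.
    return TOKEN_RE.findall(text)
```

A counter or "last seen" value kept in a global would break that property and belongs in state instead.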

Streaming (advanced) — StreamPullShift

For components that need to handle non-1:1 input/output rates, pass initial_pull_request= to run(...). The function then receives sequences (one per input stream) and returns sequences plus a pull_request (singular).

A pull request can be one option or an ordered list of options. Each option contains one StreamPullShift per input stream. The runtime checks options in order and runs the component with the first option that is currently satisfiable.

from pipelogic.worker import run, StreamPullShift

N_INPUTS = 2
Z = StreamPullShift.from_fixed(0, 0, 0)
ONE = StreamPullShift.from_fixed(0, 1, 1)
TWO_PLUS = StreamPullShift.from_least(0, 2)

def process(*input_seqs):
    # input_seqs is N_INPUTS sequences (one per input stream).
    # ... compute outputs ...
    return {
        "output": (out_seq_0, out_seq_1),
        "pull_request": [
            [ONE, TWO_PLUS],
            [Z, ONE],
        ],
    }

run(
    process,
    initial_pull_request=[
        [ONE, TWO_PLUS],
        [Z, ONE],
    ],
)

StreamPullShift.from_fixed(preshift, size, postshift) skips preshift, exposes exactly size messages to process, then advances by postshift. StreamPullShift.from_least(preshift, size) skips preshift, then fires when at least size messages are available. initial_pull_request= seeds the first batch; after that, every streaming return must include pull_request.
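
The option-selection rule can be modelled in a few lines. This is a toy, not the runtime's implementation: the Shift class is hypothetical, and the satisfiability test (buffered messages must cover preshift plus size) is an assumption inferred from the description above. It ignores postshift and the fixed-versus-least distinction, which only affect what happens after an option fires.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Shift:
    """Toy model of one per-stream shift; not the real StreamPullShift."""
    preshift: int
    size: int

    def satisfiable(self, buffered):
        # Assumption: a shift can fire once the buffer covers the skipped
        # messages plus the requested window.
        return buffered >= self.preshift + self.size


def first_satisfiable(options, buffered_per_stream):
    """Return the index of the first option whose shifts all fit, else None."""
    for index, option in enumerate(options):
        if all(s.satisfiable(n) for s, n in zip(option, buffered_per_stream)):
            return index
    return None


Z, ONE, TWO_PLUS = Shift(0, 0), Shift(0, 1), Shift(0, 2)
OPTIONS = [[ONE, TWO_PLUS], [Z, ONE]]   # same ordering as the example above
```

With one message on stream 0 and two on stream 1 the first option wins; with only one message on stream 1 the model falls through to the second option.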

Virtual inputs / outputs

Virtual input/output is the side-effect boundary. Use it when a component needs a side thread, callback, HTTP/WebSocket listener, device reader, model stream, or external sender while keeping the graph-facing function as a scheduler-visible stream function.

Inbound side effects go through virtual_in.push(...). The process function receives them through a virtual_input keyword parameter when the pull request selects the virtual input. These payloads are internal to the component and bridge, not strict component.yml stream types; one virtual channel can carry multiple payload shapes, so component code should tag, validate, dispatch, and reject malformed payloads explicitly. Outbound side effects go through virtual_output: process returns work items, and a bridge callback registered with virtual_out.set_on_push(...) performs the actual IO.

from threading import Thread
from pipelogic.worker import run, StreamPullShift, virtual_in, virtual_out

Z = StreamPullShift.from_fixed(0, 0, 0)
ONE = StreamPullShift.from_fixed(0, 1, 1)

def side_reader():
    for event in read_external_http_or_device_events():
        virtual_in.push(event, blocking=False)
    virtual_in.close()

Thread(target=side_reader, daemon=True).start()

def process(graph_msgs, *, virtual_input):
    return {
        "output": transform(graph_msgs, virtual_input),
        "virtual_output": build_side_effect_work_items(graph_msgs),
        "pull_request": [
            [ONE, Z],  # virtual input first, then declared graph input
            [Z, ONE],
        ],
    }

def send_side_effect(work_item):
    deliver_to_external_system(work_item)

virtual_out.set_on_push(send_side_effect)

run(
    process,
    use_virtual_output=True,
    initial_pull_request=[
        [ONE, Z],
        [Z, ONE],
    ],
)

Two ordering rules matter:

  • If process declares virtual_input, each pull option has the virtual-input shift first, then one shift per declared graph input.
  • If use_virtual_output=True, process must return a virtual_output list. Stateful components also return state; streaming components also return pull_request.
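
Because one virtual channel can carry several payload shapes, it helps to tag and validate payloads before acting on them. The sketch below shows one way to do that in plain Python; the tags (tick, command), their fields, and the handler names are all hypothetical.

```python
def handle_tick(payload):
    return {"kind": "tick", "seq": payload["seq"]}


def handle_command(payload):
    return {"kind": "command", "name": payload["name"].lower()}


# Hypothetical tags, each with a handler and its required field types.
HANDLERS = {
    "tick": (handle_tick, {"seq": int}),
    "command": (handle_command, {"name": str}),
}


def dispatch(payload):
    """Validate a tagged payload and route it; raise ValueError if malformed."""
    if not isinstance(payload, dict) or "tag" not in payload:
        raise ValueError("payload must be a dict with a 'tag' key")
    tag = payload["tag"]
    if not isinstance(tag, str) or tag not in HANDLERS:
        raise ValueError(f"unknown tag: {tag!r}")
    handler, required = HANDLERS[tag]
    for field, expected in required.items():
        if not isinstance(payload.get(field), expected):
            raise ValueError(f"{tag}: field {field!r} must be {expected.__name__}")
    return handler(payload)
```

Inside process, dispatch would be applied to each item received through virtual_input, with the side thread pushing dicts such as {"tag": "tick", "seq": 3}.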

What the component shim does for you

You don't have to:

  • Open or close streams.
  • Handle (de)serialization of typed messages on the wire.
  • Manage shared memory between vertices on the same node.
  • Implement health checks or graceful shutdown.

You do need to:

  • Make the function deterministic enough that the platform can replay it on retry (no output-affecting mutable state in module globals; keep it in the state argument).
  • Use print() (or Python's logging module) for any component log output. print() is the canonical component logging path — the runtime captures stdout / stderr and surfaces it via ppl container logs.
  • Call config.sync() inside process() whenever you rely on mutable: true parameters; otherwise the values stay frozen at the previous sync.
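
If you prefer the logging module over print(), one option is to point a handler at stdout so records land in the captured stream. The sketch below uses only the standard library; the make_logger helper, the logger name, and the format string are illustrative choices, not something the runtime requires.

```python
import logging
import sys


def make_logger(name="component", stream=None):
    """Return a logger that writes one line per record to stdout (or a given stream)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:             # avoid duplicate handlers on re-import
        target = stream if stream is not None else sys.stdout
        handler = logging.StreamHandler(target)
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    logger.propagate = False            # keep records out of the root logger
    return logger


log = make_logger()
```

Inside process, log.info("processed %d items", n) then behaves like a formatted print().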

Imports cheat sheet

from pipelogic.worker import run, config, StreamPullShift, virtual_in, virtual_out
from pipelogic.cv import Image, ColorSpace, Tensor, IMAGE_TYPE
from pipelogic.audio import AudioFrame
from pipelogic.tensor import Tensor, TENSOR_TYPE
from pipelogic.geometry import Polygon, BoundingBox, Point
from pipelogic.mesh import Mesh
from pipelogic.chat import ChatBackend, Message, run_chat, VisionBackend, run_vision
from pipelogic.infer import HotSwapModel, hf_login, hf_snapshot_download, resolve_device
from pipelogic.cloud import PointCloud
from pipelogic.types import (
    Named, List, Union, Tuple,
    register_named, get_type, create_type,
)

pipelogic.types exposes the type-system constructors (Named, List, Union, Tuple) and the registry helpers (register_named, get_type, create_type). If a name isn't listed here, check the type catalog at type-api/catalog.

Common mistakes

  • Listing numpy in requirements.txt. Importing numpy directly is fine, but the runtime base already pins it, and pinning it again can produce ABI conflicts.
  • Don't construct platform records as raw dicts — for Image, always go through pipelogic.cv.Image(...). Same for AudioFrame, Tensor, etc. Hand-rolled dicts skip the validation that catches buffer-vs-shape mismatches at the boundary.
  • Don't call sys.exit() or os._exit() from the component; let exceptions propagate. The shim catches and reports them, whereas explicit exits look like crashes and confuse retry semantics. (sys.exit() raises SystemExit and os._exit() terminates the process immediately; both are wrong here.)
  • Forgetting config.sync() inside process() when reading mutable: true parameters. Without the call, the value stays frozen at the previous sync (or at module load), even after ppl backend change-parameter succeeds.
