pipelogic.worker
The entry point for every Python component.
from pipelogic.worker import run, config, virtual_in, virtual_out
run(worker_function, **options)
Registers your worker function with the runtime and blocks until the backend closes.
- Name: worker_function
- Type: callable
- Description: A function or lambda. The number of positional parameters must match the input streams declared in component.yml. Special parameter names (state, virtual_input) opt into stateful or virtual-input modes.
- Name: use_virtual_output
- Type: bool, default False
- Description: If True, the worker function must return {"output": ..., "virtual_output": ...}. The virtual output queue can be drained from a side thread via virtual_out.set_on_push(callback).
- Name: initial_state
- Type: Any | None
- Description: If provided, opts into stateful mode. The worker function must accept a state keyword argument and return {"output": ..., "state": ...}. The runtime persists state across container restarts.
- Name: initial_pull_request
- Type: InputsPullRequest | None
- Description: If provided, opts into streaming mode. Each input becomes a list of messages (variable length per tick) and each output must be a list. The worker must return {"output": ..., "pull_request": ...} so the runtime knows what to pull next.
Synchronous (default) mode
from pipelogic.worker import run

def double(x):
    return x * 2

run(double)
One message in per stream per tick, one message out per stream per tick. The function arguments are positional, in the order declared by component.yml.
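For a component with more than one input stream, the positional rule means the first parameter receives the first declared stream, the second parameter the second, and so on. A minimal sketch, assuming a hypothetical component that declares two numeric input streams (the function name and the averaging logic are illustrative only):

```python
def average(left, right):
    """Combine one message from each of two input streams into one output."""
    # `left` comes from the first declared input stream, `right` from the second.
    return (left + right) / 2


# Local check of a single tick. Inside a component you would instead pass
# the function to pipelogic.worker.run, e.g. run(average).
result = average(4, 10)
```

Because the worker is a plain function, it can be exercised like this in a unit test before it is deployed.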
Multi-output
A function whose component declares output_types returns a tuple in declared order:
worker:
  input_type: Image
  output_types:
    - BoundingBox
    - Image
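A matching worker for the declaration above might look like the following sketch. The function name is hypothetical and the dict is only a placeholder for whatever value the BoundingBox type actually expects; the point is the tuple order.

```python
def detect_and_passthrough(image):
    """Return one value per declared output, in declared order."""
    # Placeholder detection result; a real component would run a model here.
    bounding_box = {"x": 0, "y": 0, "w": 10, "h": 10}
    # First element goes to the BoundingBox stream, second to the Image stream.
    return bounding_box, image


# Local check of a single tick. Inside a component you would instead pass
# the function to pipelogic.worker.run, e.g. run(detect_and_passthrough).
box, passthrough = detect_and_passthrough("frame-0")
```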
Stateful
Pass initial_state and accept a state kwarg. Return {"output": ..., "state": ...}. State persists across restarts.
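from pipelogic.worker import run

def counter(message, state):
    # Count messages seen so far; the incoming message itself is ignored.
    new_state = state + 1
    return {"output": new_state, "state": new_state}

run(counter, initial_state=0)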
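To test a stateful worker without the runtime, the state hand-off can be imitated with a plain loop. The `simulate` helper below is hypothetical and only mirrors the contract described here (state in, a dict with "output" and "state" out); it does not reproduce persistence across restarts.

```python
def counter(message, state):
    new_state = state + 1
    return {"output": new_state, "state": new_state}


def simulate(worker, messages, initial_state):
    """Feed messages one tick at a time, threading state between calls."""
    state = initial_state
    outputs = []
    for message in messages:
        result = worker(message, state=state)
        outputs.append(result["output"])
        state = result["state"]  # the next tick sees the returned state
    return outputs, state


outputs, final_state = simulate(counter, ["a", "b", "c"], initial_state=0)
```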
Virtual input
Define a virtual_input parameter on your worker. The runtime detects the parameter name via inspect.signature and supplies a queue you can push to from another thread:
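import threading
from pipelogic.worker import run, virtual_in

def from_camera(virtual_input):
    return virtual_input[0]

def producer():
    while True:
        # grab_frame() is not defined here; it stands in for your capture code.
        # blocking=False drops frames when the queue is full instead of stalling.
        virtual_in.push(grab_frame(), blocking=False)

# Start the producer before run(), because run() blocks.
threading.Thread(target=producer, daemon=True).start()
run(from_camera)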
Virtual output
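from pipelogic.worker import run, virtual_out

def emit_externally(image):
    # some_payload stands in for whatever you want to emit on the side channel.
    return {"output": image, "virtual_output": [some_payload]}

def consumer(item):
    # upload_to_external_system stands in for your own sink.
    upload_to_external_system(item)

# Register the callback before run(), because run() blocks.
virtual_out.set_on_push(consumer)
run(emit_externally, use_virtual_output=True)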
Streaming mode
Pass initial_pull_request; the worker then receives lists of messages and must return a pull_request:
from pipelogic.worker import run
from pipelogic.types import InputsPullRequest, InputsPullOption, StreamPullShift

# One input stream: skip 0 messages, then pull exactly 1 (min_pull=1, max_pull=1).
INITIAL = InputsPullRequest(InputsPullOption([
    StreamPullShift.from_fixed(0, 1, 1),
]))

def chunked(messages):
    return {
        # process() stands in for your per-message logic.
        "output": [process(m) for m in messages],
        "pull_request": INITIAL,
    }

run(chunked, initial_pull_request=INITIAL)
StreamPullShift.from_fixed(skip, min_pull, max_pull) declares how many messages to skip and how many to pull afterwards (between min_pull and max_pull). Use streaming mode when stream rates are asymmetric, for example an audio summarizer that pulls one second of audio at a time and emits one summary every N seconds.
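One way to picture the three arguments is a small pure-Python model of a single stream's buffer. The `pull_window` function is hypothetical and is not how the runtime is implemented; in particular, waiting until `min_pull` messages are buffered is an assumption made for illustration.

```python
def pull_window(buffer, skip, min_pull, max_pull):
    """Return the messages a fixed shift would deliver, or None if not ready.

    Illustrative model only: drop `skip` messages from the front of the
    buffer, then take at least `min_pull` and at most `max_pull` messages.
    """
    available = max(0, len(buffer) - skip)
    if available < min_pull:
        return None  # assumed: not enough messages buffered yet
    count = min(available, max_pull)
    return buffer[skip:skip + count]


buffered = ["m0", "m1", "m2", "m3", "m4"]
one_at_a_time = pull_window(buffered, 0, 1, 1)
skip_two_take_up_to_four = pull_window(buffered, 2, 1, 4)
not_ready = pull_window(buffered, 3, 3, 3)
```

In this model, from_fixed(0, 1, 1) corresponds to the one-message-per-tick request used in the example above.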
config
Singleton exposing config_schema parameters from component.yml as attributes:
from pipelogic.worker import config
threshold = float(config.threshold)
mean = tuple(config.mean) # (Double, Double, Double) -> tuple of floats
language = str(config.language)
The runtime auto-converts each value to the wrapper type that matches the declared type: field in component.yml.
config.sync()
Picks up live updates pushed from the platform UI. Returns a dict of changed keys:
def expensive_worker(image):
    # reload_model() and run_inference() stand in for your own code.
    changed = config.sync()
    if "model_path" in changed:
        reload_model()
    return run_inference(image)
Most components don't need this — re-reading the value each tick is enough.
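When a value is expensive to act on, the reload-on-change pattern can be tried locally with a stand-in object. `FakeConfig` below is hypothetical and exists only so the sketch runs outside the runtime; it imitates attribute access and a sync() that returns the changed keys, and says nothing about how the real singleton is built.

```python
class FakeConfig:
    """Hypothetical stand-in for pipelogic.worker.config, for local tests only."""

    def __init__(self, **values):
        self.values = dict(values)
        self.pending = {}

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        values = self.__dict__.get("values", {})
        if name in values:
            return values[name]
        raise AttributeError(name)

    def update(self, **changes):
        """Imitate an update pushed from the platform UI."""
        self.pending.update(changes)

    def sync(self):
        """Apply pending updates and return them as a dict of changed keys."""
        changed, self.pending = self.pending, {}
        self.values.update(changed)
        return changed


config = FakeConfig(model_path="a.bin")
loaded_paths = []  # records every (simulated) model load


def reload_model():
    loaded_paths.append(config.model_path)


def expensive_worker(image):
    changed = config.sync()
    if "model_path" in changed:
        reload_model()  # reload only when the path actually changed
    return image


expensive_worker("frame-0")          # nothing changed, no reload
config.update(model_path="b.bin")
expensive_worker("frame-1")          # picks up the change, reloads once
expensive_worker("frame-2")          # no further change, no reload
```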
virtual_in: VirtualInputStream
Module-level singleton bound to the worker's virtual input queue.
- Name: push(*messages, blocking=True)
- Type: None
- Description: Push one or more messages onto the queue. With blocking=False, messages are dropped when the queue is full instead of blocking, which is the right choice for live data sources.
- Name: close()
- Type: None
- Description: Closes the queue. Once closed, the worker stops as soon as it tries to read past the last queued message.
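The drop-when-full behaviour of push(..., blocking=False) can be pictured with a bounded queue from the Python standard library. This is an analogy only, not the VirtualInputStream implementation; the helper name and the queue size are assumptions.

```python
import queue


def push_nonblocking(q, item):
    """Try to enqueue; return False (the item is dropped) if the queue is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False


frames = queue.Queue(maxsize=2)  # small bound so the drop is easy to see
accepted = [push_nonblocking(frames, f) for f in ("f0", "f1", "f2")]
```

For a live source such as a camera, dropping the newest frame when the consumer is behind keeps the producer thread from stalling, which is why the non-blocking form suits that case.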
virtual_out: VirtualOutputStream
Module-level singleton bound to the worker's virtual output queue.
- Name: set_on_push(callback)
- Type: None
- Description: Register a callable that the runtime invokes for each emitted virtual-output message. The callback runs on a runtime-managed thread.
- Name: ack(tag)
- Type: None
- Description: Acknowledge processing of a virtual-output message (used by external sinks that want backpressure).
- Name: stop()
- Type: None
- Description: Signal that no more virtual-output messages will be produced.
Streaming-mode primitives
from pipelogic.worker import (
    InputsPullRequest, InputsPullOption, StreamPullShift,
    VirtualInputStream, VirtualOutputStream,
)
- Name: StreamPullShift.from_fixed(skip, min_pull, max_pull)
- Type: StreamPullShift
- Description: Skip the first skip messages, then pull between min_pull and max_pull more.
- Name: InputsPullOption([shifts...], virtual_shift=None)
- Type: InputsPullOption
- Description: A single permissible pull configuration. List one shift per declared input stream, in order.
- Name: InputsPullRequest(option | [options...])
- Type: InputsPullRequest
- Description: Wrap one option, or a list of acceptable options. The runtime triggers the worker as soon as one is satisfied.
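How a request with several options might be evaluated can be sketched with plain tuples in place of the real classes. Everything here is a hypothetical model: each shift is written as a (skip, min_pull, max_pull) tuple, each option is a list with one shift per input stream, and checking options in list order is an assumption rather than documented behaviour.

```python
def shift_ready(buffered_count, shift):
    """A shift is satisfied once min_pull messages remain after skipping."""
    skip, min_pull, _max_pull = shift
    return buffered_count - skip >= min_pull


def first_satisfied(options, buffered_counts):
    """Return the index of the first option whose shifts are all satisfied."""
    for index, shifts in enumerate(options):
        if all(shift_ready(n, s) for n, s in zip(buffered_counts, shifts)):
            return index
    return None  # nothing to trigger yet


# Two input streams. Option 0 wants two messages from stream 0 and one from
# stream 1; option 1 wants one from stream 0 and two from stream 1.
options = [
    [(0, 2, 2), (0, 1, 1)],
    [(0, 1, 1), (0, 2, 2)],
]

waiting = first_satisfied(options, [1, 1])
fires_option_0 = first_satisfied(options, [2, 1])
fires_option_1 = first_satisfied(options, [1, 2])
```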