pipelogic.worker

The entry point for every Python component.

from pipelogic.worker import run, config, virtual_in, virtual_out

run(worker_function, **options)

Registers your worker function with the runtime and blocks until the backend closes.

  • Name
    worker_function
    Type
    callable
    Description

    A function or lambda. The number of positional parameters must match the number of input streams declared in component.yml. Two parameter names are reserved: state is used in stateful mode (see initial_state), and virtual_input opts into virtual-input mode.

  • Name
    use_virtual_output
    Type
    bool, default False
    Description

    If True, the worker function must return {"output": ..., "virtual_output": ...}. The virtual output queue can be drained from a side thread via virtual_out.set_on_push(callback).

  • Name
    initial_state
    Type
    Any | None
    Description

    If provided, opts into stateful mode. The worker function must accept a state keyword argument and return {"output": ..., "state": ...}. The runtime persists state across container restarts.

  • Name
    initial_pull_request
    Type
    InputsPullRequest | None
    Description

    If provided, opts into streaming mode. Each input becomes a list of messages (variable length per tick) and each output must be a list. The worker must return {"output": ..., "pull_request": ...} so the runtime knows what to pull next.

Synchronous (default) mode

from pipelogic.worker import run

def double(x):
    return x * 2

run(double)

Each tick delivers one message per input stream and expects one message per output stream. Arguments are passed positionally, in the order the input streams are declared in component.yml.
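As a sketch of the positional rule, assume a hypothetical component.yml that declares two input streams, a value followed by a weight. The worker below takes them in that order. The function name and the meaning of the streams are assumptions for illustration, and the function is called directly so the block runs without the runtime; inside a component it would be passed to run.

```python
def weighted(value, weight):
    # First parameter = first declared input stream, second = second stream.
    return value * weight

# Local check, standing in for one tick of the runtime loop.
print(weighted(3.0, 0.5))  # 1.5
```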

Multi-output

When component.yml declares output_types, the worker returns a tuple with one element per output, in the declared order:

worker:
  input_type: Image
  output_types:
    - BoundingBox
    - Image
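A worker matching that declaration might look like the following sketch. Plain dicts and nested lists stand in for the BoundingBox and Image wrappers, and find_box is a hypothetical placeholder for a real detector, so the block runs without the runtime.

```python
def find_box(image):
    # Hypothetical detector: returns a box covering the whole image.
    height, width = len(image), len(image[0])
    return {"x": 0, "y": 0, "width": width, "height": height}

def detect(image):
    box = find_box(image)
    # Tuple order follows output_types: BoundingBox first, then Image.
    return box, image

box, passthrough = detect([[0, 0, 0], [0, 0, 0]])
print(box)  # {'x': 0, 'y': 0, 'width': 3, 'height': 2}
```

In a component, the last two lines would be replaced by run(detect).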

Stateful

Pass initial_state and accept a state kwarg. Return {"output": ..., "state": ...}. State persists across restarts.

def counter(message, state):
    new_state = state + 1
    return {"output": new_state, "state": new_state}

run(counter, initial_state=0)
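The same contract covers richer state than a counter. The sketch below is a sliding-window smoother that keeps the last few values as its state. WINDOW, smooth, and the for loop are illustrative assumptions; the loop only mimics how the runtime would feed the returned state back into the next call, and in a component it would be replaced by run(smooth, initial_state=[]).

```python
WINDOW = 3  # assumed window size, chosen for illustration

def smooth(value, state):
    # Build a new list instead of mutating the incoming state,
    # keeping only the most recent WINDOW values.
    window = (state + [value])[-WINDOW:]
    return {"output": sum(window) / len(window), "state": window}

# Local stand-in for the runtime loop: feed each returned state back in.
state = []  # would be passed as initial_state=[]
outputs = []
for v in [3, 6, 9, 12]:
    result = smooth(v, state)
    state = result["state"]
    outputs.append(result["output"])

print(outputs)  # [3.0, 4.5, 6.0, 9.0]
```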

Virtual input

Define a virtual_input parameter on your worker. The runtime detects the parameter name via inspect.signature and supplies a queue you can push to from another thread:

import threading
from pipelogic.worker import run, virtual_in

def from_camera(virtual_input):
    return virtual_input[0]

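def producer():
    # grab_frame() is a placeholder for your own capture function.
    while True:
        # blocking=False drops frames when the queue is full instead of stalling capture.
        virtual_in.push(grab_frame(), blocking=False)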

threading.Thread(target=producer, daemon=True).start()
run(from_camera)

Virtual output

from pipelogic.worker import run, virtual_out

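def emit_externally(image):
    # some_payload is a placeholder for whatever you send to the external system.
    return {"output": image, "virtual_output": [some_payload]}

def consumer(item):
    # upload_to_external_system is a placeholder for your own sink.
    upload_to_external_system(item)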

virtual_out.set_on_push(consumer)
run(emit_externally, use_virtual_output=True)

Streaming mode

Pass initial_pull_request and the worker now receives lists of messages and must return a pull_request:

from pipelogic.worker import run
from pipelogic.types import InputsPullRequest, InputsPullOption, StreamPullShift

INITIAL = InputsPullRequest(InputsPullOption([
    StreamPullShift.from_fixed(0, 1, 1),
]))

def chunked(messages):
    return {
        "output": [process(m) for m in messages],
        "pull_request": INITIAL,
    }

run(chunked, initial_pull_request=INITIAL)

StreamPullShift.from_fixed(skip, min_pull, max_pull) declares how many messages to skip and how many to receive: at least min_pull and at most max_pull. Use streaming mode when stream rates are asymmetric, for example an audio summarizer that pulls one second of audio at a time and emits one summary every N seconds.
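To make the three arguments concrete, here is a pure-Python sketch of one reading of from_fixed. It is not the runtime's implementation: preview_pull is a hypothetical helper, and the assumption is that the first skip buffered messages are passed over, that the pull can only proceed once at least min_pull messages remain, and that at most max_pull of them are delivered.

```python
def preview_pull(buffered, skip, min_pull, max_pull):
    """Return the messages a (skip, min_pull, max_pull) shift would deliver,
    or None if too few messages are buffered. Assumed semantics only."""
    candidates = buffered[skip:]       # pass over the first `skip` messages
    if len(candidates) < min_pull:     # not enough buffered yet
        return None
    return candidates[:max_pull]       # deliver at most `max_pull`

print(preview_pull(list("abcde"), 0, 1, 1))  # ['a']
print(preview_pull(list("abcde"), 2, 1, 2))  # ['c', 'd']
print(preview_pull(list("ab"), 1, 2, 4))     # None
```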

config

Singleton exposing config_schema parameters from component.yml as attributes:

from pipelogic.worker import config

threshold = float(config.threshold)
mean = tuple(config.mean)             # (Double, Double, Double) -> tuple of floats
language = str(config.language)

The runtime auto-converts each value to the wrapper type that matches the declared type: field in component.yml.

config.sync()

Picks up live updates pushed from the platform UI. Returns a dict of changed keys:

def expensive_worker(image):
    changed = config.sync()
    if "model_path" in changed:
        reload_model()
    return run_inference(image)

Most components don't need this — re-reading the value each tick is enough.

virtual_in: VirtualInputStream

Module-level singleton bound to the worker's virtual input queue.

  • Name
    push(*messages, blocking=True)
    Type
    None
    Description

    Push one or more messages onto the queue. With blocking=False, drops messages when the queue is full instead of blocking — the right choice for live data sources.

  • Name
    close()
    Type
    None
    Description

    Closes the queue. Once closed, the worker stops as soon as it tries to read past the last queued message.
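The difference between the two push modes can be illustrated with a local stand-in built on the standard library's queue.Queue. DroppingQueue is a hypothetical class, not the runtime's VirtualInputStream; the queue size and the choice to drop the incoming message (rather than the oldest one) are assumptions.

```python
import queue

class DroppingQueue:
    """Bounded queue whose push() can drop instead of block (illustrative only)."""

    def __init__(self, maxsize):
        self._q = queue.Queue(maxsize=maxsize)
        self.dropped = 0

    def push(self, *messages, blocking=True):
        for message in messages:
            try:
                # block=False raises queue.Full immediately when there is no room.
                self._q.put(message, block=blocking)
            except queue.Full:
                self.dropped += 1  # drop the incoming message

    def drain(self):
        # Collect everything currently queued without blocking.
        items = []
        while True:
            try:
                items.append(self._q.get_nowait())
            except queue.Empty:
                return items

q = DroppingQueue(maxsize=2)
q.push("frame-1", "frame-2", "frame-3", blocking=False)
print(q.drain(), q.dropped)  # ['frame-1', 'frame-2'] 1
```

For a live camera this trade-off is usually the desired one: a stale frame is worth less than keeping the capture loop running.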

virtual_out: VirtualOutputStream

Module-level singleton bound to the worker's virtual output queue.

  • Name
    set_on_push(callback)
    Type
    None
    Description

    Register a callable that the runtime invokes for each emitted virtual-output message. The callback runs on a runtime-managed thread.

  • Name
    ack(tag)
    Type
    None
    Description

    Acknowledge processing of a virtual-output message (used by external sinks that want backpressure).

  • Name
    stop()
    Type
    None
    Description

    Signal that no more virtual-output messages will be produced.

Streaming-mode primitives

from pipelogic.worker import (
    InputsPullRequest, InputsPullOption, StreamPullShift,
    VirtualInputStream, VirtualOutputStream,
)

  • Name
    StreamPullShift.from_fixed(skip, min_pull, max_pull)
    Type
    StreamPullShift
    Description

    Skip the first skip messages, then pull between min_pull and max_pull more.

  • Name
    InputsPullOption([shifts...], virtual_shift=None)
    Type
    InputsPullOption
    Description

    A single permissible pull configuration. List one shift per declared input stream, in order.

  • Name
    InputsPullRequest(option | [options...])
    Type
    InputsPullRequest
    Description

    Wrap one option, or a list of acceptable options. The runtime triggers the worker as soon as one is satisfied.
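The way a request with several options resolves can be sketched in plain Python. This is an assumed model rather than the runtime's logic: each option is written as a list of (skip, min_pull, max_pull) tuples, one per input stream, and first_satisfied is a hypothetical helper that picks the first option in list order whose minimums are all met. Whether the runtime gives priority to list order is not stated in this page.

```python
def satisfied(option, buffers):
    # An option is met when every stream has at least min_pull
    # messages available after skipping `skip` of them.
    return all(
        len(buf) - skip >= min_pull
        for (skip, min_pull, _max_pull), buf in zip(option, buffers)
    )

def first_satisfied(options, buffers):
    # Return the index of the first option that can fire, or None.
    for index, option in enumerate(options):
        if satisfied(option, buffers):
            return index
    return None

options = [
    [(0, 2, 2), (0, 1, 1)],  # two messages from stream 0, one from stream 1
    [(0, 1, 1), (0, 2, 2)],  # one message from stream 0, two from stream 1
]

print(first_satisfied(options, [["a"], ["x", "y"]]))  # 1
print(first_satisfied(options, [["a"], ["x"]]))       # None
```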

What's next

  • Concepts — how the run loop fits into the broader system.
  • API: cv — the typed wrappers you'll most often return from a worker.
  • Recipes — stateful counter, sliding-window smoother, virtual-input camera.
