How to create your own custom workers

This guide explains how to build a custom worker that runs alongside beVault’s orchestrator (States) using the open-source Python library bevault-workers-python. The library provides BaseWorker and WorkerManager so you can focus on business logic while the runtime handles Step Functions activities, polling, heartbeats, and optional store configuration.

What you are building

A worker is a Python class that:

  1. Subclasses BaseWorker and sets a unique name (this becomes the logical activity name in the orchestration layer).

  2. Implements handle(self, input_data), where input_data is the JSON payload States sends to the activity (already parsed from the task input).

At runtime, WorkerManager discovers worker classes from a Python package you specify, registers AWS Step Functions activities (or compatible endpoints), and runs worker processes that poll for tasks and invoke handle.

Prerequisites

  • Python 3 environment.

  • Install the library (from PyPI or the repo):

    Bash
    pip install bevault_workers
    

    Development install from GitHub:

    Bash
    git clone https://github.com/depfac/bevault-workers-python.git
    cd bevault-workers-python
    pip install -e ".[dev]"
    
  • Configuration for States / Step Functions: the worker process uses environment variables (see the repo’s README and examples/.env.sample) for authentication, region, service URL, concurrency, heartbeats, and optional States store sync. Copy .env.sample to .env and adjust values for your environment.
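
    For example, from a checkout of the repository (paths will differ in your own project):

    Bash
    cp examples/.env.sample .env
    # then edit .env: credentials, region / service URL, concurrency, heartbeat settings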

Minimal project layout

A typical consumer project looks like this:

your-project/
├── main.py                 # constructs WorkerManager and calls start()
├── config.json             # optional: local data store definitions
├── logging_config.json     # optional: logging configuration
├── .env                    # Step Functions / States settings
├── requirements.txt        # pin bevault_workers
└── workers/                # or any importable package name you choose
    ├── __init__.py
    └── my_worker.py        # one module per worker (recommended)

The workers_module argument to WorkerManager must be the import name of that package (for example "workers").

Step 1 — Implement a worker class

Create a module under your workers package and define a class that subclasses BaseWorker:

Python
from bevault_workers import BaseWorker

class MyWorker(BaseWorker):
    name = "my_custom_worker"  # must be unique among discovered workers

    def handle(self, input_data):
        # input_data is a dict parsed from the activity task JSON input
        return {"status": "success", "result": input_data}

Required pieces

  • name: Identifies the worker. Used when registering activities and when States targets this activity.

  • handle(input_data): Your implementation; receives the task payload as a dict.

Return value contract (important)

The activity loop expects handle to return a dictionary that includes a status key. The runtime behaves roughly as follows:

  • status == "error" — the task is reported as failed to Step Functions; include an error_message string for operators.

  • status == "canceled" — treated as a cooperative cancel path (see cancellation below).

  • Any other status (commonly "success") — the full dict is serialized to JSON and sent as the successful task output.

So a robust pattern is:

Python
def handle(self, input_data):
    try:
        # ... work ...
        return {"status": "success", "result": ...}
    except Exception as e:
        return {"status": "error", "error_message": str(e)}

Omitting status or returning a non-dict can break the activity loop’s assumptions.

Logging

Use self.get_logger() to obtain a logger named for this worker process (see library examples).
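
A minimal sketch (the worker shown here is illustrative, not part of the library):

Python
from bevault_workers import BaseWorker

class LoggingWorker(BaseWorker):
    name = "logging_worker"

    def handle(self, input_data):
        logger = self.get_logger()  # logger named for this worker process
        logger.info("received task with keys: %s", sorted(input_data))
        return {"status": "success"}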

Optional: long-running work and cancellation

The base class exposes is_canceled() and internal cancellation signaling tied to heartbeat/task lifecycle. For long loops, periodically check is_canceled() and exit early, returning something like {"status": "canceled", "message": "..."} if appropriate.
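
A sketch of that pattern; the "items" payload field and the per-item work are hypothetical stand-ins for your own logic:

Python
from bevault_workers import BaseWorker

class LongRunningWorker(BaseWorker):
    name = "long_running_worker"

    def handle(self, input_data):
        items = input_data.get("items", [])  # hypothetical payload field
        processed = []
        for item in items:
            if self.is_canceled():  # cooperative cancellation check
                return {
                    "status": "canceled",
                    "message": f"stopped after {len(processed)} of {len(items)} items",
                }
            processed.append(item)  # replace with your real per-item work
        return {"status": "success", "result": processed}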

Step 2 — Wire up WorkerManager

Your application entry point constructs WorkerManager and calls start() (blocking until shutdown):

Python
from bevault_workers import WorkerManager

def main():
    manager = WorkerManager(
        config_path="config.json",   # optional; omit or use if you have local stores
        workers_module="workers",    # package that contains your worker modules
    )
    manager.start()

if __name__ == "__main__":
    main()

  • config_path: Path to JSON used to load local store definitions into StoreRegistry. Omit or adjust if you only use synced stores or no stores.

  • workers_module: Defaults to "workers" if omitted. Must match your Python package name.

The repository’s examples/main.py shows the same pattern; the root main.py uses dev_workers for in-repo development.

Step 3 — How discovery finds your classes

WorkerManager imports the package you named and walks immediate submodules (one level under that package). For each submodule, it collects every class that:

  • subclasses BaseWorker,

  • is not BaseWorker itself, and

  • is defined in that submodule (not merely imported from elsewhere).

So place each worker class in its own file under the workers package (e.g. workers/my_worker.py), not only in workers/__init__.py, unless your structure still satisfies the discovery rules.
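
To illustrate, the rules above behave roughly like this sketch (not the library’s actual code):

Python
import importlib
import inspect
import pkgutil

from bevault_workers import BaseWorker

def discover_workers(package_name):
    package = importlib.import_module(package_name)
    found = []
    # walk immediate submodules, one level under the package
    for info in pkgutil.iter_modules(package.__path__):
        module = importlib.import_module(f"{package_name}.{info.name}")
        for _, cls in inspect.getmembers(module, inspect.isclass):
            if (issubclass(cls, BaseWorker)
                    and cls is not BaseWorker
                    and cls.__module__ == module.__name__):  # defined here, not imported
                found.append(cls)
    return found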

Step 4 — Use data stores from workers (optional)

If your pipeline passes store names in the task input, resolve them with StoreRegistry.get("<Name>"). Store Name values come from config.json locally, or from States store sync when enabled (beVault 3.10+); local definitions override synced ones for the same name.

Example pattern (from the repo’s examples/workers/custom_worker.py):

Python
from bevault_workers import BaseWorker, StoreRegistry

class CustomWorker(BaseWorker):
    name = "my_custom_worker"

    def handle(self, input_data):
        store = StoreRegistry.get(input_data["outputStore"])
        result = store.execute(query="SELECT VERSION()")
        return {"status": "success", "result": result}

Configure matching stores in config.json (see examples/config.json.sample and the README Stores section).

Step 5 — Run and integrate with States

When start() runs, the manager loads stores (if configured), discovers workers, creates Step Functions activities for each worker name (with an optional environment prefix from settings), and spawns multiple processes per worker according to max concurrency in your environment settings.
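
With the layout shown earlier and a populated .env, starting the worker processes is a single command:

Bash
python main.py    # blocks until shutdown (e.g. Ctrl+C / SIGTERM)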

Your beVault / States workflow must reference the activity ARN (or equivalent) that corresponds to each worker. In practice, you coordinate naming between the worker’s name, the configured environment prefix, and the activity resources your orchestration expects. The library creates activities programmatically on startup, so make sure your deployment and IAM permissions align with that model.

Deployment

To run your custom worker next to beVault in a repeatable way, package your application, not only the library, as its own Docker image: include main.py, your workers/ package, requirements.txt (pinning bevault_workers), and any config.json / logging_config.json templates. Inject secrets and environment-specific values at runtime (for example via orchestrator secrets or mounted env files), and give the container the networking access and IAM permissions your States/Step Functions client needs.

That image can then be deployed on the same hosts or cluster as beVault, or on adjacent nodes in the same network, so upgrades to the library or your worker code stay isolated from the beVault stack while still sharing configuration patterns and store definitions. Treat the container as the unit you version, test in CI, and roll out: the worker application’s lifecycle stays separate from the beVault platform’s, but operationally aligned.
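
A minimal Dockerfile sketch for that approach, assuming the project layout shown earlier (base image and Python version are illustrative):

Dockerfile
FROM python:3.11-slim

WORKDIR /app

# install pinned dependencies first so Docker layer caching stays effective
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# application code and config templates
COPY main.py config.json logging_config.json ./
COPY workers/ ./workers/

# secrets and environment-specific values (.env contents) are injected at
# runtime (e.g. via orchestrator secrets or mounted env files), never baked in
CMD ["python", "main.py"]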