This guide explains how to build a custom worker that runs alongside beVault’s orchestrator (States) using the open-source Python library bevault-workers-python. The library gives you BaseWorker and WorkerManager so you focus on business logic while the runtime handles Step Functions activities, polling, heartbeats, and optional store configuration.
What you are building
A worker is a Python class that:

- Subclasses `BaseWorker` and sets a unique `name` (this becomes the logical activity name in the orchestration layer).
- Implements `handle(self, input_data)`, where `input_data` is the JSON payload States sends to the activity (already parsed from the task input).
At runtime, WorkerManager discovers worker classes from a Python package you specify, registers AWS Step Functions activities (or compatible endpoints), and runs worker processes that poll for tasks and invoke handle.
Prerequisites
- Python 3 environment.
- Install the library (from PyPI or the repo):

  ```bash
  pip install bevault_workers
  ```

  Development install from GitHub:

  ```bash
  git clone https://github.com/depfac/bevault-workers-python.git
  cd bevault-workers-python
  pip install -e ".[dev]"
  ```

- Configuration for States / Step Functions: the worker process uses environment variables (see the repo’s README and `examples/.env.sample`) for authentication, region, service URL, concurrency, heartbeats, and optional States store sync. Copy `.env.sample` to `.env` and adjust the values for your environment.
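If you want a local fallback that reads a `.env` file without extra dependencies (for example during development), a minimal loader could look like the sketch below. This is illustrative only and is not how the library itself loads its settings; many setups use python-dotenv or let the orchestrator inject the variables instead.

```python
import os


def load_env_file(path=".env"):
    """Minimal .env loader (illustrative only; real deployments often use
    python-dotenv or inject variables via the orchestrator instead)."""
    loaded = {}
    if not os.path.exists(path):
        return loaded
    with open(path) as fh:
        for raw in fh:
            line = raw.strip()
            # Skip blanks, comments, and lines without a KEY=VALUE shape.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            loaded[key.strip()] = value.strip()
            # Do not clobber values already set in the real environment.
            os.environ.setdefault(key.strip(), value.strip())
    return loaded
```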
Minimal project layout
A typical consumer project looks like this:

```text
your-project/
├── main.py               # constructs WorkerManager and calls start()
├── config.json           # optional: local data store definitions
├── logging_config.json   # optional: logging configuration
├── .env                  # Step Functions / States settings
├── requirements.txt      # pin bevault_workers
└── workers/              # or any importable package name you choose
    ├── __init__.py
    └── my_worker.py      # one module per worker (recommended)
```
The `workers_module` argument to `WorkerManager` must be the import name of that package (for example `"workers"`).
Step 1 — Implement a worker class
Create a module under your workers package and define a class that subclasses `BaseWorker`:

```python
from bevault_workers import BaseWorker


class MyWorker(BaseWorker):
    name = "my_custom_worker"  # must be unique among discovered workers

    def handle(self, input_data):
        # input_data is a dict parsed from the activity task JSON input
        return {"status": "success", "result": input_data}
```
Required pieces

| Piece | Purpose |
|---|---|
| `name` | Identifies the worker. Used when registering activities and when States targets this activity. |
| `handle(self, input_data)` | Your implementation; receives the task payload as a `dict`. |
Return value contract (important)
The activity loop expects `handle` to return a dictionary that includes a `status` key. The runtime behaves roughly as follows:

- `status == "error"` — the task is reported as failed to Step Functions; include an `error_message` string for operators.
- `status == "canceled"` — treated as a cooperative cancel path (see cancellation below).
- Any other `status` (commonly `"success"`) — the full dict is serialized to JSON and sent as the successful task output.
So a robust pattern is:

```python
try:
    # ... work ...
    return {"status": "success", "result": ...}
except Exception as e:
    return {"status": "error", "error_message": str(e)}
```
Omitting `status` or returning a non-dict can break the activity loop’s assumptions.
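Given that contract, a small defensive helper can guard against a worker accidentally returning a non-dict or omitting the status key. `normalize_result` is a hypothetical name and not part of the library:

```python
def normalize_result(result):
    """Hypothetical helper: coerce a handle() return value into the
    dict-with-status shape the activity loop expects."""
    if not isinstance(result, dict):
        return {
            "status": "error",
            "error_message": f"handle() returned {type(result).__name__}, expected dict",
        }
    # Default to success when the worker forgot to set a status.
    result.setdefault("status", "success")
    return result
```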
Logging
Use `self.get_logger()` to obtain a logger named for this worker process (see the library examples).
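In a worker this typically looks like the sketch below. To keep the snippet self-contained, `get_logger()` is stubbed with a plain `logging.getLogger` call standing in for the method the base class provides:

```python
import logging


class LoggingWorker:
    """Sketch only: stands in for a BaseWorker subclass so the snippet
    runs standalone; the real get_logger() comes from the base class."""
    name = "logging_worker"

    def get_logger(self):
        # Stub approximating a per-worker logger name.
        return logging.getLogger(f"worker.{self.name}")

    def handle(self, input_data):
        logger = self.get_logger()
        logger.info("received task with keys: %s", sorted(input_data))
        return {"status": "success"}
```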
Optional: long-running work and cancellation
The base class exposes `is_canceled()` and internal cancellation signaling tied to the heartbeat/task lifecycle. For long loops, periodically check `is_canceled()` and exit early, returning something like `{"status": "canceled", "message": "..."}` if appropriate.
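A cooperative loop along those lines might look like this sketch. `is_canceled()` is stubbed so the example runs standalone; in a real worker it comes from `BaseWorker`:

```python
class ChunkedWorker:
    """Sketch only: processes items in small units so cancellation
    can be observed between chunks."""
    name = "chunked_worker"

    def is_canceled(self):
        # Stub; the real check is tied to the heartbeat/task lifecycle.
        return False

    def handle(self, input_data):
        processed = []
        for item in input_data.get("items", []):
            if self.is_canceled():
                return {
                    "status": "canceled",
                    "message": f"stopped after {len(processed)} items",
                }
            processed.append(item * 2)  # placeholder unit of work
        return {"status": "success", "result": processed}
```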
Step 2 — Wire up WorkerManager
Your application entry point constructs `WorkerManager` and calls `start()` (which blocks until shutdown):

```python
from bevault_workers import WorkerManager


def main():
    manager = WorkerManager(
        config_path="config.json",  # optional; omit or use if you have local stores
        workers_module="workers",   # package that contains your worker modules
    )
    manager.start()


if __name__ == "__main__":
    main()
```
- `config_path`: path to a JSON file used to load local store definitions into `StoreRegistry`. Omit or adjust it if you only use synced stores or no stores.
- `workers_module`: defaults to `"workers"` if omitted. Must match your Python package name.
The repository’s `examples/main.py` shows the same pattern; the root `main.py` uses `dev_workers` for in-repo development.
Step 3 — How discovery finds your classes
WorkerManager imports the package you named and walks its immediate submodules (one level under that package). For each submodule, it collects every class that:

- subclasses `BaseWorker`,
- is not `BaseWorker` itself, and
- is defined in that submodule (not merely imported from elsewhere).
So place each worker class in its own file under the workers package (e.g. `workers/my_worker.py`) rather than only in `workers/__init__.py`, unless your structure still satisfies the discovery rules.
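The rules above can be illustrated with a simplified, stdlib-only sketch of one-level discovery. This is not the library’s actual implementation, just the general pattern:

```python
import importlib
import inspect
import pkgutil


def discover_workers(package_name, base_class):
    """Simplified sketch: find base_class subclasses defined in the
    immediate submodules of a package (one level deep)."""
    package = importlib.import_module(package_name)
    found = []
    for info in pkgutil.iter_modules(package.__path__):
        module = importlib.import_module(f"{package_name}.{info.name}")
        for _, cls in inspect.getmembers(module, inspect.isclass):
            if (
                issubclass(cls, base_class)
                and cls is not base_class
                and cls.__module__ == module.__name__  # defined here, not imported
            ):
                found.append(cls)
    return found
```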
Step 4 — Use data stores from workers (optional)
If your pipeline passes store names in the task input, resolve them with `StoreRegistry.get("<Name>")`. Store `Name` values come from `config.json` locally, or from States store sync when enabled (beVault 3.10+); local definitions override synced ones for the same name.
Example pattern (from the repo’s `examples/workers/custom_worker.py`):

```python
from bevault_workers import BaseWorker, StoreRegistry


class CustomWorker(BaseWorker):
    name = "my_custom_worker"

    def handle(self, input_data):
        store = StoreRegistry.get(input_data["outputStore"])
        result = store.execute(query="SELECT VERSION()")
        return {"status": "success", "result": result}
```
Configure matching stores in `config.json` (see `examples/config.json.sample` and the README Stores section).
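Because the store name arrives in the task input, it is worth validating it before use and returning a structured error rather than raising. In this sketch a plain dict stands in for `StoreRegistry`, and the store name is hypothetical:

```python
FAKE_REGISTRY = {"AnalyticsDB": object()}  # stand-in for StoreRegistry entries


class StoreBackedWorker:
    """Sketch only: resolve a store named in the task input defensively."""
    name = "store_backed_worker"

    def handle(self, input_data):
        store_name = input_data.get("outputStore")
        if store_name not in FAKE_REGISTRY:
            return {
                "status": "error",
                "error_message": f"unknown or missing store: {store_name!r}",
            }
        # With the real library this would be StoreRegistry.get(store_name)
        # followed by store.execute(...).
        return {"status": "success", "store": store_name}
```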
Step 5 — Run and integrate with States
When `start()` runs, the manager loads stores (if configured), discovers workers, creates Step Functions activities for each worker name (with an optional environment prefix from settings), and spawns multiple processes per worker, up to the maximum concurrency in your environment settings.
Your beVault / States workflow must reference the activity ARN (or equivalent) that corresponds to each worker. In practice you coordinate naming between the worker’s `name`, the configured environment prefix, and the activity resources your orchestration expects. The library creates activities programmatically on startup; ensure your deployment and IAM permissions align with that model.
Deployment
To run your custom worker next to beVault in a repeatable way, package your application, not only the library, as its own Docker image: put `main.py`, your `workers/` package, `requirements.txt` (pinning `bevault_workers`), and any `config.json` / `logging_config.json` templates in the image, and inject secrets and environment-specific values at runtime (for example via orchestrator secrets or mounted env files). The image must also satisfy the networking and IAM expectations of your States/Step Functions client.
That image can then be deployed on the same hosts or cluster as beVault, or on adjacent nodes in the same network, so upgrades to the library or your worker code stay isolated from the beVault stack while still sharing configuration patterns and store definitions. Treat the container as the unit you version, test in CI, and roll out: the worker app’s lifecycle stays separate from the beVault platform’s lifecycle while remaining operationally aligned.
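A minimal Dockerfile along those lines might look like the following sketch. The base image tag and file names are assumptions to adapt to your project:

```dockerfile
# Illustrative sketch only; adjust the Python version and paths to your project.
FROM python:3.12-slim

WORKDIR /app

# Install pinned dependencies first to benefit from layer caching.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application: entry point, worker package, and config templates.
COPY main.py config.json logging_config.json ./
COPY workers/ ./workers/

# Secrets and environment-specific values are injected at runtime,
# e.g. via orchestrator secrets or a mounted env file, not baked in.
CMD ["python", "main.py"]
```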
Reference links
- Repository: https://github.com/depfac/bevault-workers-python
- Contributing & tests: `CONTRIBUTING.md` in the repo
- Examples: `examples/` (minimal app layout, custom worker, optional custom stores)