Skip to content

Testing

testing

In-process test harness for workflow authoring.

:class:WorkflowEnvironment drives a workflow to completion in a single Python process, without a running server or worker. It reuses the same :func:durable_workflow.workflow.replay machinery the worker uses, but resolves yielded commands against user-registered activity mocks and auto-fires timers / side-effects / search-attribute upserts so tests do not need a real clock or Redis.

Typical use::

def test_my_workflow():
    env = WorkflowEnvironment()
    env.register_activity_result("charge_card", {"id": "ch_1"})
    env.register_activity_result("send_receipt", None)
    result = env.execute_workflow(OrderWorkflow, "order-1", {"amount": 42})
    assert result == {"status": "complete", "charge_id": "ch_1"}

When a workflow yields :class:~durable_workflow.workflow.ContinueAsNew, the harness records a WorkflowContinuedAsNew event on the completing run, resets history, and starts a new run with the command's arguments. :attr:WorkflowEnvironment.runs exposes the per-run record (input, workflow type, history events, and terminal command) so tests can assert on the whole chain. The chain length is bounded by continue_as_new_limit; exceeding it raises :exc:RuntimeError.

For regression-testing workflow code against production histories, use :func:replay_history — it hands the real durable history straight to the worker's replayer and surfaces any non-determinism as a raised exception.

WorkflowRunRecord dataclass

WorkflowRunRecord(workflow_type, input, history=list(), terminal=None)

One run within an execute_workflow chain.

A single-run workflow produces one record with :attr:terminal of type :class:~durable_workflow.workflow.CompleteWorkflow or :class:~durable_workflow.workflow.FailWorkflow. A workflow that yields :class:~durable_workflow.workflow.ContinueAsNew produces one record per link; the terminal of each non-final link is the ContinueAsNew command itself and the final link terminates as usual.

WorkflowEnvironment

WorkflowEnvironment(*, iteration_limit=1000, continue_as_new_limit=50)

Drives a workflow to completion against user-registered activity mocks.

:param iteration_limit: upper bound on per-run replay iterations (guard against workflows that never yield a terminal command). :param continue_as_new_limit: upper bound on the number of continue-as-new links in one execute_workflow chain. The first run counts as link 1; a workflow that continues five times runs six links. Exceeding the limit raises :exc:RuntimeError.

runs property

runs

Per-run records produced by the most recent execute_workflow call.

Empty before the first execute_workflow call. Reset at the start of each call.

run_count property

run_count

Number of runs in the most recent execute_workflow chain.

register_activity_result

register_activity_result(name, result)

Canned response: every call to name returns result.

register_activity

register_activity(name, fn)

Callable mock: fn(*arguments) is invoked for each scheduled call.

Use this when the test needs the mock to vary with arguments (e.g. look up by order id) or to capture invocations.

register_child_workflow_result

register_child_workflow_result(workflow_type, result)

Canned response for child workflow completions.

register_workflow

register_workflow(workflow_cls)

Make workflow_cls resolvable by name for continue-as-new.

continue_as_new(workflow_type="other-name") looks up the next workflow class by its @workflow.defn name. The class passed to :meth:execute_workflow is registered automatically; call this method for additional workflow classes the chain may continue into.

signal

signal(name, args=None, *, run=None)

Queue a signal to be delivered before the next iteration.

Signals are drained in the order they were queued and injected into the workflow history as SignalReceived events; the replayer dispatches each to its registered @workflow.signal handler.

Pass run to target a specific link in a continue-as-new chain (run=1 is the first run, run=2 the run that starts after the first ContinueAsNew, and so on). Without run the signal is consumed at the current front of the chain — on the first run before the first iteration, and on later iterations in whatever link happens to be running.

execute_workflow

execute_workflow(workflow_cls, *args, run_id='test-run')

Drive workflow_cls to a terminal state and return its result.

Raises :class:~durable_workflow.errors.WorkflowFailed if the workflow ended in the failed state. Activities that do not have a registered mock raise :class:KeyError so tests fail loudly on missing fixtures.

If the workflow yields :class:~durable_workflow.workflow.ContinueAsNew, the harness starts a new run with the new input and, if the command named a workflow_type, the matching registered workflow class. The chain is bounded by continue_as_new_limit and returns the terminal result of the final run.

replay_history

replay_history(workflow_cls, history_events, start_input=None, *, run_id='', payload_codec=None)

Replay a production history against current workflow code.

Hands the durable history directly to the worker's replayer. Raises any exception the workflow would raise during replay — for example a non-determinism failure when run yields a different command sequence from the one recorded in history.

This is the supported way to regression-test a workflow change against real production traffic: dump the history from Client.get_history, save the JSON, and replay it on every PR.

replay_history_file

replay_history_file(workflow_cls, path, start_input=None, *, run_id='', payload_codec=None)

Convenience wrapper: load a JSON history file and replay it.

Accepts either a list of events at the top level or a dict with an events key (matching the shape of Client.get_history).