
Workflow

Workflow retry and timeout settings are durable command budgets. Use ActivityRetryPolicy on ctx.schedule_activity(...) for activity attempts and ChildWorkflowRetryPolicy on ctx.start_child_workflow(...) for child workflow attempts. Use TransportRetryPolicy only for client HTTP retries.

workflow

Workflow authoring primitives: decorators, context, commands, and replayer.

A workflow is a Python class registered with defn. Its run method is a generator that yields command dataclasses (ScheduleActivity, StartTimer, StartChildWorkflow, …). The worker's replayer drives the generator forward by resolving each yielded command against the current history of the workflow run. Yield a list of commands to run them in parallel.
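The generator protocol described above can be illustrated without the SDK. The following is a minimal sketch, not the library's replayer: the two command classes are trimmed stand-ins, and `GreetingWorkflow` and `drive` are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass
from typing import Any, Generator, List, Tuple


# Stand-in command types for illustration; the SDK dataclasses have more fields.
@dataclass
class ScheduleActivity:
    activity_type: str
    arguments: List[Any]


@dataclass
class StartTimer:
    delay_seconds: int


class GreetingWorkflow:
    def run(self, ctx: Any, name: str) -> Generator[Any, Any, str]:
        # A single yielded command resolves to a single result.
        greeting = yield ScheduleActivity("greet", [name])
        # A yielded list resolves to a list of results, one per command.
        upper, _ = yield [ScheduleActivity("shout", [greeting]), StartTimer(1)]
        return upper


def drive(gen: Generator[Any, Any, Any], recorded: List[Any]) -> Tuple[str, Any]:
    """Toy driver: feed one recorded result per yield.

    Returns ("completed", value) when the generator returns, or
    ("blocked", pending_command) when the recorded results run out.
    """
    try:
        pending = next(gen)  # advance to the first yielded command
        for result in recorded:
            pending = gen.send(result)
    except StopIteration as stop:
        return ("completed", stop.value)
    return ("blocked", pending)
```

With a full set of recorded results the toy driver runs the workflow to completion; with a partial set it stops at the command that has no result yet, which is roughly the point where a real worker would hand new commands to the server.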

Determinism-sensitive helpers live on the WorkflowContext passed to run: WorkflowContext.random, WorkflowContext.uuid4, WorkflowContext.uuid7, WorkflowContext.now, WorkflowContext.patched, WorkflowContext.deprecate_patch, and WorkflowContext.side_effect all produce values that are recorded on first execution and replayed verbatim on every subsequent replay of the same history.
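The record-then-replay behaviour of these helpers can be sketched with a toy context. This is an illustration of the idea only: `RecordingContext` is a hypothetical class, it returns values synchronously, and it stores them in a plain list rather than in workflow history.

```python
import uuid
from typing import Any, Callable, List, Optional


class RecordingContext:
    """Toy model of record-on-first-execution, replay-afterwards."""

    def __init__(self, recorded: Optional[List[Any]] = None) -> None:
        self.recorded: List[Any] = list(recorded or [])
        self._cursor = 0

    def side_effect(self, fn: Callable[[], Any]) -> Any:
        if self._cursor < len(self.recorded):
            # Replay: return the stored value and do not call fn again.
            value = self.recorded[self._cursor]
        else:
            # First execution: call fn once and record what it produced.
            value = fn()
            self.recorded.append(value)
        self._cursor += 1
        return value

    def uuid4(self) -> str:
        return self.side_effect(lambda: str(uuid.uuid4()))
```

A second context built from the first one's recorded values returns the same identifiers in the same order, which is the property that keeps workflow code deterministic across replays.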

ActivityRetryPolicy dataclass

ActivityRetryPolicy(max_attempts=3, initial_interval_seconds=1.0, backoff_coefficient=2.0, maximum_interval_seconds=None, non_retryable_error_types=list(), backoff_seconds=None)

Retry policy applied to one scheduled activity call.

The policy is snapshotted onto the durable activity execution when the workflow task completes, so later code deploys do not change the retry budget for an already-scheduled activity. It is a server-side durable retry policy, not the SDK HTTP transport retry policy.

non_retryable_error_types names failure types that should bypass this retry budget. An activity worker can also report non_retryable=True on a failure to stop retrying that activity execution.
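To make the fields concrete, here is a minimal sketch of how such a budget could be interpreted. It assumes conventional exponential backoff (initial interval multiplied by the coefficient on each retry, capped at the maximum interval); the server's actual schedule is not documented here and may differ. `RetryPolicySketch`, `retry_delays`, and `should_retry` are hypothetical names, and the `backoff_seconds` field is left out because its meaning is not described above.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RetryPolicySketch:
    """Stand-in mirroring the documented defaults; not the SDK class."""
    max_attempts: int = 3
    initial_interval_seconds: float = 1.0
    backoff_coefficient: float = 2.0
    maximum_interval_seconds: Optional[float] = None
    non_retryable_error_types: List[str] = field(default_factory=list)


def retry_delays(policy: RetryPolicySketch) -> List[float]:
    """Delays between attempts, assuming plain exponential backoff."""
    delays = []
    for retry_index in range(max(policy.max_attempts - 1, 0)):
        delay = policy.initial_interval_seconds * policy.backoff_coefficient ** retry_index
        if policy.maximum_interval_seconds is not None:
            delay = min(delay, policy.maximum_interval_seconds)
        delays.append(delay)
    return delays


def should_retry(policy: RetryPolicySketch, attempt: int, error_type: str,
                 non_retryable: bool = False) -> bool:
    """Decide whether another attempt fits the budget after a failure."""
    # A worker-reported non_retryable flag or a listed type stops retrying.
    if non_retryable or error_type in policy.non_retryable_error_types:
        return False
    return attempt < policy.max_attempts
```

Under these assumptions the default policy allows three attempts separated by waits of 1.0 and 2.0 seconds.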

ChildWorkflowRetryPolicy dataclass

ChildWorkflowRetryPolicy(max_attempts=3, initial_interval_seconds=1.0, backoff_coefficient=2.0, maximum_interval_seconds=None, non_retryable_error_types=list(), backoff_seconds=None)

Bases: ActivityRetryPolicy

Retry policy applied to one started child workflow call.

This is recorded with the child workflow command and controls durable server-side child attempts. It is separate from SDK HTTP transport retry and from activity retry.

ScheduleActivity dataclass

ScheduleActivity(activity_type, arguments, queue=None, retry_policy=None, start_to_close_timeout=None, schedule_to_start_timeout=None, schedule_to_close_timeout=None, heartbeat_timeout=None)

Command requesting an activity task.

Timeout fields are activity budgets, not HTTP request timeouts: start_to_close_timeout limits one activity attempt, schedule_to_start_timeout limits queue wait before an attempt starts, schedule_to_close_timeout limits the whole activity execution including retries, and heartbeat_timeout limits the gap between activity heartbeats.
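The four budgets measure different intervals, which the sketch below spells out. It is an illustration only: `ActivityTimeouts` and `exceeded_budgets` are hypothetical, values are assumed to be seconds, and measuring the heartbeat gap from attempt start before the first heartbeat is an assumption rather than documented server behaviour.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ActivityTimeouts:
    """Stand-in for the four timeout fields; values assumed to be seconds."""
    start_to_close_timeout: Optional[float] = None
    schedule_to_start_timeout: Optional[float] = None
    schedule_to_close_timeout: Optional[float] = None
    heartbeat_timeout: Optional[float] = None


def exceeded_budgets(
    timeouts: ActivityTimeouts,
    *,
    now: float,
    first_scheduled_at: float,
    attempt_scheduled_at: float,
    attempt_started_at: Optional[float] = None,
    last_heartbeat_at: Optional[float] = None,
) -> List[str]:
    exceeded: List[str] = []

    def over(limit: Optional[float], since: float) -> bool:
        return limit is not None and now - since > limit

    if attempt_started_at is None:
        # Still queued: only the queue-wait budget applies to this attempt.
        if over(timeouts.schedule_to_start_timeout, attempt_scheduled_at):
            exceeded.append("schedule_to_start_timeout")
    else:
        if over(timeouts.start_to_close_timeout, attempt_started_at):
            exceeded.append("start_to_close_timeout")
        # Assumption: before the first heartbeat, measure from attempt start.
        last_beat = attempt_started_at if last_heartbeat_at is None else last_heartbeat_at
        if over(timeouts.heartbeat_timeout, last_beat):
            exceeded.append("heartbeat_timeout")
    # The overall budget spans all attempts, so it starts at the first schedule.
    if over(timeouts.schedule_to_close_timeout, first_scheduled_at):
        exceeded.append("schedule_to_close_timeout")
    return exceeded
```

Note that a fresh attempt can be well within its own start_to_close_timeout while the activity as a whole has already exhausted schedule_to_close_timeout.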

StartTimer dataclass

StartTimer(delay_seconds)

Command requesting a durable timer.

CompleteWorkflow dataclass

CompleteWorkflow(result)

Command completing a workflow with a payload result.

FailWorkflow dataclass

FailWorkflow(message, exception_type=None, non_retryable=False)

Command failing a workflow with diagnostic metadata.

CompleteUpdate dataclass

CompleteUpdate(update_id, result)

Worker command completing an accepted workflow update.

FailUpdate dataclass

FailUpdate(update_id, message, exception_type=None, exception_class=None, non_retryable=True)

Worker command failing an accepted workflow update.

ContinueAsNew dataclass

ContinueAsNew(workflow_type=None, arguments=list(), task_queue=None)

Workflow return value that starts a new run with fresh history.

RecordSideEffect dataclass

RecordSideEffect(result)

Command recording the result of a non-deterministic function.

StartChildWorkflow dataclass

StartChildWorkflow(workflow_type, arguments=list(), task_queue=None, parent_close_policy=None, retry_policy=None, execution_timeout_seconds=None, run_timeout_seconds=None)

Command requesting a child workflow run.

execution_timeout_seconds limits the overall child workflow execution. run_timeout_seconds limits one child run. These budgets are durable server-side workflow budgets and are separate from client HTTP timeouts.

RecordVersionMarker dataclass

RecordVersionMarker(change_id, version, min_supported, max_supported, result_kind='version')

Command recording a workflow code-version marker.

UpsertSearchAttributes dataclass

UpsertSearchAttributes(attributes)

Command updating workflow search attributes.

WaitCondition dataclass

WaitCondition(predicate, condition_key=None, condition_definition_fingerprint=None, timeout_seconds=None)

Command that yields execution until a workflow-defined predicate becomes true.

The replayer evaluates predicate locally against in-memory workflow state (typically mutated by signal/update handlers). The server records a ConditionWaitOpened history event and re-drives the workflow when any signal arrives or, if timeout_seconds is provided, when the timeout elapses (a TimerFired history event with timer_kind=condition_timeout).

WorkflowContext

WorkflowContext(*, run_id='', current_time=None)

Replay-safe helper surface passed to workflow run methods.

start_timer

start_timer(seconds)

Yield a durable timer that resolves after seconds seconds.

sleep

sleep(seconds)

Sleep for seconds seconds of durable wall time.

Sugar over start_timer that accepts a float and rounds up to the next whole second (the server stores timer deadlines as integer seconds). The call is still a single yield of a durable command: use yield ctx.sleep(60) from the workflow run method.
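The rounding rule amounts to a ceiling operation. A minimal sketch, assuming negative delays are rejected (that check is an assumption, and `timer_seconds` is a hypothetical helper, not an SDK function):

```python
import math


def timer_seconds(seconds: float) -> int:
    """Round a fractional delay up to the next whole second."""
    if seconds < 0:
        # Assumption for this sketch: negative delays are treated as an error.
        raise ValueError("delay must be non-negative")
    return math.ceil(seconds)
```

So a requested sleep of 1.5 seconds becomes a 2-second timer, and a whole-number delay such as 60 is unchanged.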

wait_condition

wait_condition(predicate, *, key=None, timeout=None)

Yield execution until predicate() returns truthy.

The predicate is evaluated against the workflow's in-memory state on every replay tick. That state is typically mutated by @signal / @update handlers as external events arrive. If timeout is provided and elapses before the predicate becomes true, the yield resolves to False (otherwise True). A fractional timeout is rounded up to the next whole second to match the server's integer-second timer resolution.
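The True/False outcome can be modelled with a small event loop. This is a sketch under simplifying assumptions: events are plain `(kind, payload)` tuples rather than real history events, and `ApprovalState` and `resolve_wait` are hypothetical names.

```python
from typing import Any, Callable, Iterable, Optional, Tuple


class ApprovalState:
    """Toy workflow state mutated by a signal handler."""

    def __init__(self) -> None:
        self.approved = False

    def on_approve(self, by: str) -> None:
        self.approved = True


def resolve_wait(
    predicate: Callable[[], bool],
    events: Iterable[Tuple[str, Any]],
    apply_signal: Callable[[Any], None],
) -> Optional[bool]:
    """Return True once the predicate holds, False on timeout, None if still waiting."""
    if predicate():
        return True
    for kind, payload in events:
        if kind == "signal":
            # A signal mutates state, then the predicate is re-evaluated.
            apply_signal(payload)
            if predicate():
                return True
        elif kind == "condition_timeout":
            # The timeout fired before the predicate became true.
            return False
    return None
```

A workflow would typically branch on the resolved value, for example escalating when the wait resolves to False.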

patched

patched(change_id)

Record or read a patch marker and resolve to True for patched runs.

New runs record version 1 for change_id and replay as True. Older runs that reached this code without a marker resolve to the legacy default version -1 and replay as False.
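The version rule can be written down in a few lines. The sketch below is a simplified model: markers live in a plain dictionary, and an `is_replaying` flag stands in for "history already passed this point without a marker". Both are assumptions made for illustration, and this `patched` function is not the SDK method.

```python
from typing import Dict

LEGACY_DEFAULT_VERSION = -1


def patched(change_id: str, markers: Dict[str, int], *, is_replaying: bool) -> bool:
    """Resolve a patch marker to True (patched run) or False (legacy run)."""
    if change_id in markers:
        # A marker was recorded earlier; reuse its version.
        version = markers[change_id]
    elif is_replaying:
        # History predates the patch, so fall back to the legacy default.
        version = LEGACY_DEFAULT_VERSION
    else:
        # First execution of new code: record version 1.
        version = 1
        markers[change_id] = version
    return version >= 1
```

Workflow code can then keep both branches side by side, taking the new path when the result is True and the original path when it is False.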

deprecate_patch

deprecate_patch(change_id)

Keep a patch marker alive after the old branch has been removed.

Replayer

Replayer(*, workflows)

Replay captured workflow history without a live server.

Register one or more workflow classes, then call replay with a server-exported history list or a dictionary containing an events key. If the history includes a WorkflowStarted event, the replayer can infer the workflow type and start input from that event.
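The two accepted input shapes and the WorkflowStarted inference might look like the sketch below. The event layout is an assumption: the keys `event_type`, `payload`, `workflow_type`, and `input` are hypothetical and may not match the server's export format, and `extract_events` and `infer_start` are not SDK functions.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

History = Union[List[Dict[str, Any]], Dict[str, Any]]


def extract_events(history: History) -> List[Dict[str, Any]]:
    """Accept either a bare event list or a dict with an "events" key."""
    if isinstance(history, dict):
        return list(history["events"])
    return list(history)


def infer_start(events: List[Dict[str, Any]]) -> Tuple[Optional[str], Any]:
    """Find the workflow type and start input, if a start event is present."""
    for event in events:
        # Hypothetical event shape, chosen for illustration only.
        if event.get("event_type") == "WorkflowStarted":
            payload = event.get("payload", {})
            return payload.get("workflow_type"), payload.get("input")
    return None, None
```

When no start event is present, the caller would need to supply the workflow type and input explicitly.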

defn

defn(*, name)

Register a class as a workflow type under a language-neutral name.

Scans the class for @signal, @query, and @update decorated methods and builds registries at decoration time so worker-side dispatch can use stable receiver names without re-inspecting the class on every history event or control-plane request.

signal

signal(name)

Mark a workflow method as the handler for an external signal.

Example:

@workflow.defn(name="approval")
class ApprovalWorkflow:
    def __init__(self) -> None:
        self.approved: bool = False

    @workflow.signal("approve")
    def on_approve(self, by: str) -> None:
        self.approved = True

The decorated method is called by the replayer when a matching SignalReceived history event is observed, with the signal's decoded arguments unpacked into positional parameters. Handler return values are ignored; to expose state back to the workflow's main run loop, mutate self.* attributes (as on_approve does above) and yield the usual commands from run().
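The dispatch step described above (unpack decoded arguments positionally, ignore the return value) can be sketched like this. The handler mapping and the decision to skip unknown signals are assumptions for illustration; `apply_signal` is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional


class ApprovalState:
    def __init__(self) -> None:
        self.approved = False
        self.approved_by: Optional[str] = None

    def on_approve(self, by: str) -> str:
        self.approved = True
        self.approved_by = by
        return "ignored"  # handler return values are not used


# Receiver name -> method name, as a decoration-time scan might produce.
SIGNAL_HANDLERS: Dict[str, str] = {"approve": "on_approve"}


def apply_signal(instance: Any, handlers: Dict[str, str],
                 signal_name: str, args: List[Any]) -> None:
    method_name = handlers.get(signal_name)
    if method_name is None:
        return  # assumption: unknown signals are skipped in this sketch
    # Decoded arguments are unpacked positionally; the result is dropped.
    getattr(instance, method_name)(*args)
```

Because the return value is dropped, the only way a handler influences the run loop is through the attributes it sets on the instance.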

query

query(name)

Mark a workflow method as a read-only query handler.

Query methods are invoked against replayed workflow state. They must not mutate self or perform I/O. The server-side worker query transport is still implemented separately; this decorator records the Python receiver metadata and is used by query_state.

update

update(name)

Mark a workflow method as an update handler.

The returned function also exposes .validator for the common pattern:

@workflow.update("approve")
def approve(self, approved: bool) -> dict: ...

@approve.validator
def validate_approve(self, approved: bool) -> None: ...

This release records receiver metadata only. The server-side Python update execution transport is tracked separately.
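One way a decorator can expose a `.validator` attribute is to attach a second decorator to the function it returns. The sketch below shows that mechanism only; the private attribute names are hypothetical and the SDK may record its metadata differently.

```python
from typing import Callable


def update(name: str) -> Callable:
    def mark(fn: Callable) -> Callable:
        fn._update_name = name

        def validator(validator_fn: Callable) -> Callable:
            # Tie the validator to the same update name as the handler.
            validator_fn._update_validator_name = name
            return validator_fn

        fn.validator = validator  # enables the @approve.validator form
        return fn
    return mark


class ApprovalWorkflow:
    def __init__(self) -> None:
        self.approved = False

    @update("approve")
    def approve(self, approved: bool) -> dict:
        self.approved = approved
        return {"approved": approved}

    @approve.validator
    def validate_approve(self, approved: bool) -> None:
        if not isinstance(approved, bool):
            raise ValueError("approved must be a bool")
```

Inside the class body, `approve` is still a plain function when `@approve.validator` is evaluated, which is why the attribute is reachable at that point.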

update_validator

update_validator(name)

Mark a workflow method as the validator for an update name.

registry

registry()

Return a copy of workflow types registered in this process.

commands_to_server_commands

commands_to_server_commands(commands, task_queue, *, payload_codec=serializer.AVRO_CODEC, size_warning=serializer.DEFAULT_PAYLOAD_SIZE_WARNING, warning_context=None, external_storage=None, external_storage_threshold_bytes=None)

Convert workflow commands to the server wire shape with batched payload encoding.

query_state

query_state(workflow_cls, history_events, start_input, query_name, args=None, *, workflow_id=None, run_id='', payload_codec=None, external_storage=None, external_storage_cache=None)

Replay a workflow to current state and invoke a registered query.

This is the Python-side core that a future server-routed query task can call after fetching durable history. Unknown query names and handler exceptions are normalized to durable_workflow.errors.QueryFailed.
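The error normalization described above can be sketched as a thin wrapper around the handler call. `QueryFailed` here is a local stand-in class, not the SDK exception, and `run_query` with its handler mapping is hypothetical; replaying history to rebuild state is left out.

```python
from typing import Any, Dict, List, Optional


class QueryFailed(Exception):
    """Local stand-in for the SDK's QueryFailed error."""


class ApprovalState:
    def __init__(self) -> None:
        self.approved = True

    def is_approved(self) -> bool:
        return self.approved

    def broken(self) -> None:
        raise RuntimeError("boom")


QUERIES: Dict[str, str] = {"is_approved": "is_approved", "broken": "broken"}


def run_query(instance: Any, queries: Dict[str, str], query_name: str,
              args: Optional[List[Any]] = None) -> Any:
    method_name = queries.get(query_name)
    if method_name is None:
        # Unknown names surface as the same error type as handler failures.
        raise QueryFailed(f"unknown query: {query_name}")
    try:
        return getattr(instance, method_name)(*(args or []))
    except Exception as exc:
        # Wrap any handler exception so callers handle one error type.
        raise QueryFailed(f"query {query_name!r} failed: {exc}") from exc
```

Callers then need a single except clause regardless of whether the name was unknown or the handler raised.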

apply_update

apply_update(workflow_cls, history_events, start_input, update_id, *, workflow_id=None, run_id='', payload_codec=None, external_storage=None, external_storage_cache=None)

Replay current workflow state and run one accepted update handler.

The server remains the durable authority: it accepts the update, sends a workflow task carrying workflow_update_id, and records UpdateApplied / UpdateCompleted when this helper's worker command is submitted. Python only reconstructs in-memory state and runs the registered receiver method for the accepted update.