Workflow

workflow

Workflow authoring primitives: decorators, context, commands, and replayer.

A workflow is a Python class registered with :func:defn. Its run method is a generator that yields command dataclasses (ScheduleActivity, StartTimer, StartChildWorkflow, …). The worker's replayer drives the generator forward by resolving each yielded command against the current history of the workflow run. To run several commands in parallel, yield them together as a list.
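The generator protocol described above can be illustrated with a toy driver. This is a minimal sketch, not the library's replayer: the command classes are cut-down stand-ins defined locally, and GreetingWorkflow, drive, and the activity names are hypothetical. It also assumes that run receives the context as its first argument and that a yielded list resolves to a list of results in the same order.

```python
from dataclasses import dataclass, field
from typing import Any, Generator, List


# Stand-in command types so the sketch runs on its own; the real
# dataclasses documented on this page take more fields.
@dataclass
class ScheduleActivity:
    activity_type: str
    arguments: List[Any] = field(default_factory=list)


@dataclass
class StartTimer:
    delay_seconds: int


class GreetingWorkflow:  # hypothetical workflow
    def run(self, ctx: Any, name: str) -> Generator:
        # One command: the generator pauses until a result is sent back in.
        greeting = yield ScheduleActivity("greet", [name])
        # A list of commands: assumed to resolve to a list of results.
        upper, _timer = yield [
            ScheduleActivity("uppercase", [greeting]),
            StartTimer(5),
        ]
        return upper


def drive(gen: Generator, recorded_results: List[Any]) -> Any:
    """Toy driver: feed one recorded result into the generator per yield."""
    try:
        next(gen)  # run to the first yielded command
        for value in recorded_results:
            gen.send(value)
    except StopIteration as done:
        return done.value
    raise RuntimeError("history ended before the workflow finished")


history = ["hello, Ada", ["HELLO, ADA", None]]
result = drive(GreetingWorkflow().run(None, "Ada"), history)
```

Because every result comes from the recorded list, running drive again with the same history yields the same return value, which is the property replay relies on.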

Determinism-sensitive helpers live on the :class:WorkflowContext passed to run: :meth:WorkflowContext.random, :meth:WorkflowContext.uuid4, :meth:WorkflowContext.now, and :meth:WorkflowContext.side_effect all produce values that are recorded on first execution and replayed verbatim on every subsequent replay of the same history.
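The record-then-replay behaviour can be sketched with a toy context. RecordingContext below is a hypothetical stand-in, not the real WorkflowContext: its constructor, its side_effect signature, and the in-memory list it records to are assumptions made only for illustration.

```python
import random
import uuid
from typing import Any, Callable, List, Optional


class RecordingContext:
    """Toy stand-in: record values on first execution, replay them afterwards."""

    def __init__(self, recorded: Optional[List[Any]] = None) -> None:
        self.replaying = recorded is not None
        self.recorded: List[Any] = list(recorded or [])
        self._cursor = 0

    def side_effect(self, fn: Callable[[], Any]) -> Any:
        if self.replaying:
            # Replay: return the stored value and never call fn again.
            value = self.recorded[self._cursor]
        else:
            # First execution: call fn once and store what it produced.
            value = fn()
            self.recorded.append(value)
        self._cursor += 1
        return value

    def uuid4(self) -> str:
        return self.side_effect(lambda: str(uuid.uuid4()))

    def random(self) -> float:
        return self.side_effect(random.random)


def workflow_logic(ctx: RecordingContext) -> tuple:
    return ctx.uuid4(), ctx.random()


first = RecordingContext()
first_values = workflow_logic(first)

# A later replay is handed the recorded values and reproduces them exactly.
replay = RecordingContext(recorded=first.recorded)
replay_values = workflow_logic(replay)
```

Calling uuid.uuid4() or random.random() directly inside run would instead produce a fresh value on each replay, so the workflow could diverge from its own history.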

ActivityRetryPolicy dataclass

ActivityRetryPolicy(max_attempts=3, initial_interval_seconds=1.0, backoff_coefficient=2.0, maximum_interval_seconds=None, non_retryable_error_types=list(), backoff_seconds=None)

Retry policy applied to one scheduled activity call.

The policy is snapshotted onto the durable activity execution when the workflow task completes, so later code deploys do not change the retry budget for an already-scheduled activity.
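As a rough illustration of how the policy fields might interact, the sketch below computes the wait before each retry. It assumes conventional exponential backoff (initial interval multiplied by the coefficient per retry, capped at the maximum interval); this page does not spell out the formula, so treat it as an assumption. RetryPolicySketch and retry_delays are hypothetical names, and backoff_seconds is left out because its semantics are not described here.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class RetryPolicySketch:
    """Local stand-in mirroring some of the documented field names and defaults."""
    max_attempts: int = 3
    initial_interval_seconds: float = 1.0
    backoff_coefficient: float = 2.0
    maximum_interval_seconds: Optional[float] = None


def retry_delays(policy: RetryPolicySketch) -> List[float]:
    """Delays before each retry, assuming plain exponential backoff."""
    delays = []
    # max_attempts counts the first try, so there are max_attempts - 1 retries.
    for retry in range(policy.max_attempts - 1):
        delay = policy.initial_interval_seconds * policy.backoff_coefficient ** retry
        if policy.maximum_interval_seconds is not None:
            delay = min(delay, policy.maximum_interval_seconds)
        delays.append(delay)
    return delays


default_delays = retry_delays(RetryPolicySketch())
capped_delays = retry_delays(
    RetryPolicySketch(max_attempts=5, maximum_interval_seconds=3.0)
)
```

The stand-in is frozen to echo the snapshot idea: once a policy has been attached to a scheduled activity, editing the workflow code should not alter it.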

ChildWorkflowRetryPolicy dataclass

ChildWorkflowRetryPolicy(max_attempts=3, initial_interval_seconds=1.0, backoff_coefficient=2.0, maximum_interval_seconds=None, non_retryable_error_types=list(), backoff_seconds=None)

Bases: ActivityRetryPolicy

Retry policy applied to one started child workflow call.

ScheduleActivity dataclass

ScheduleActivity(activity_type, arguments, queue=None, retry_policy=None, start_to_close_timeout=None, schedule_to_start_timeout=None, schedule_to_close_timeout=None, heartbeat_timeout=None)

Command requesting an activity task.

StartTimer dataclass

StartTimer(delay_seconds)

Command requesting a durable timer.

CompleteWorkflow dataclass

CompleteWorkflow(result)

Command completing a workflow with a payload result.

FailWorkflow dataclass

FailWorkflow(message, exception_type=None, non_retryable=False)

Command failing a workflow with diagnostic metadata.

ContinueAsNew dataclass

ContinueAsNew(workflow_type=None, arguments=list(), task_queue=None)

Workflow return value that starts a new run with fresh history.

RecordSideEffect dataclass

RecordSideEffect(result)

Command recording the result of a non-deterministic function.

StartChildWorkflow dataclass

StartChildWorkflow(workflow_type, arguments=list(), task_queue=None, parent_close_policy=None, retry_policy=None, execution_timeout_seconds=None, run_timeout_seconds=None)

Command requesting a child workflow run.

RecordVersionMarker dataclass

RecordVersionMarker(change_id, version, min_supported, max_supported)

Command recording a workflow code-version marker.

UpsertSearchAttributes dataclass

UpsertSearchAttributes(attributes)

Command updating workflow search attributes.

WorkflowContext

WorkflowContext(*, run_id='', current_time=None)

Replay-safe helper surface passed to workflow run methods.

start_timer

start_timer(seconds)

Yield a durable timer that resolves after the given number of seconds.

sleep

sleep(seconds)

Sleep for the given number of seconds of durable wall-clock time.

Sugar over :meth:start_timer that accepts a float and rounds up to the next whole second (the server stores timer deadlines as integer seconds). The call is still a single yield of a durable command: use yield ctx.sleep(60) from the workflow run method.
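The rounding rule can be sketched as follows. This is an illustration under assumptions: sleep is written as a free function rather than a context method, and it is assumed to produce a StartTimer command. The page only says that it is sugar over start_timer and rounds up to whole seconds.

```python
import math
from dataclasses import dataclass


@dataclass
class StartTimer:
    """Local stand-in for the documented command."""
    delay_seconds: int


def sleep(seconds: float) -> StartTimer:
    # Round up so the timer never fires earlier than requested.
    return StartTimer(delay_seconds=math.ceil(seconds))


short = sleep(0.2)
exact = sleep(60)
fractional = sleep(59.01)
```

Under this rounding, a sub-second sleep still costs a full one-second timer, which matters in tight polling loops.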

defn

defn(*, name)

Register a class as a workflow type under a language-neutral name.

Scans the class for @signal-decorated methods and builds a signal registry at decoration time so the replayer can dispatch incoming signals without re-inspecting the class on every history event.

signal

signal(name)

Mark a workflow method as the handler for an external signal.

Example::

@workflow.defn(name="approval")
class ApprovalWorkflow:
    def __init__(self) -> None:
        self.approved: bool = False

    @workflow.signal("approve")
    def on_approve(self, by: str) -> None:
        self.approved = True

The decorated method is called by the replayer when a matching SignalReceived history event is observed, with the signal's decoded arguments unpacked into positional parameters. Handler return values are ignored; to expose state back to the workflow's main run loop, mutate self.* attributes (as on_approve does above) and yield the usual commands from run().
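The dispatch step described above can be sketched with a toy dispatcher. The SignalReceived stand-in and its field names, the hand-written signal_handlers map, and dispatch_signal are assumptions for illustration only, and silently dropping unknown signals is a choice made by this sketch rather than documented behaviour.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class SignalReceived:
    """Stand-in for a history event; the field names are assumptions."""
    signal_name: str
    arguments: List[Any] = field(default_factory=list)


class ApprovalWorkflow:
    # Signal name -> method name, written by hand for this sketch.
    signal_handlers = {"approve": "on_approve"}

    def __init__(self) -> None:
        self.approved = False
        self.approved_by: Optional[str] = None

    def on_approve(self, by: str) -> str:
        self.approved = True
        self.approved_by = by
        return "ignored"  # the dispatcher discards return values


def dispatch_signal(instance: Any, event: SignalReceived) -> None:
    method_name = instance.signal_handlers.get(event.signal_name)
    if method_name is None:
        return  # unknown signal: this sketch drops it
    # Decoded arguments are unpacked into positional parameters.
    getattr(instance, method_name)(*event.arguments)


wf = ApprovalWorkflow()
dispatch_signal(wf, SignalReceived("approve", ["ada"]))
dispatch_signal(wf, SignalReceived("reject", []))
```

After dispatch, the main run loop can read wf.approved on its next step, which is how handler state reaches the rest of the workflow.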

registry

registry()

Return a copy of the workflow types registered in this process.
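Returning a copy means callers cannot alter the live registry by accident. The sketch below assumes the registry is a name-to-class dictionary; the real return type is not stated on this page, and _WORKFLOW_TYPES and register are hypothetical names.

```python
from typing import Dict

_WORKFLOW_TYPES: Dict[str, type] = {}  # hypothetical internal store


def register(name: str, cls: type) -> None:
    _WORKFLOW_TYPES[name] = cls


def registry() -> Dict[str, type]:
    # A shallow copy: the mapping is new, the classes are shared.
    return dict(_WORKFLOW_TYPES)


class ApprovalWorkflow:
    pass


register("approval", ApprovalWorkflow)

snapshot = registry()
snapshot["rogue"] = object  # mutating the copy leaves the store untouched
```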