Workflow¶
Workflow retry and timeout settings are durable command budgets. Use
ActivityRetryPolicy on ctx.schedule_activity(...) for activity attempts and
ChildWorkflowRetryPolicy on ctx.start_child_workflow(...) for child workflow
attempts. Use TransportRetryPolicy only for client HTTP retries.
workflow
¶
Workflow authoring primitives: decorators, context, commands, and replayer.
A workflow is a Python class registered with :func:defn. Its run method
is a generator that yields command dataclasses (ScheduleActivity,
StartTimer, StartChildWorkflow, …) — the worker's replayer drives the
generator forward by resolving each yielded command against the current
history of the workflow run. Yield a list of commands to run them in
parallel.
Determinism-sensitive helpers live on the :class:WorkflowContext passed to
run: :meth:WorkflowContext.random, :meth:WorkflowContext.uuid4,
:meth:WorkflowContext.uuid7, :meth:WorkflowContext.now,
:meth:WorkflowContext.patched, :meth:WorkflowContext.deprecate_patch, and
:meth:WorkflowContext.side_effect all produce values that are recorded on
first execution and replayed verbatim on every subsequent replay of the same
history.
ActivityRetryPolicy
dataclass
¶
ActivityRetryPolicy(max_attempts=3, initial_interval_seconds=1.0, backoff_coefficient=2.0, maximum_interval_seconds=None, non_retryable_error_types=list(), backoff_seconds=None)
Retry policy applied to one scheduled activity call.
The policy is snapped onto the durable activity execution when the workflow task completes, so later code deploys do not change the retry budget for an already-scheduled activity. It is a server-side durable retry policy, not the SDK HTTP transport retry policy.
non_retryable_error_types names failure types that should bypass this
retry budget. An activity worker can also report non_retryable=True on
a failure to stop retrying that activity execution.
ChildWorkflowRetryPolicy
dataclass
¶
ChildWorkflowRetryPolicy(max_attempts=3, initial_interval_seconds=1.0, backoff_coefficient=2.0, maximum_interval_seconds=None, non_retryable_error_types=list(), backoff_seconds=None)
Bases: ActivityRetryPolicy
Retry policy applied to one started child workflow call.
This is recorded with the child workflow command and controls durable server-side child attempts. It is separate from SDK HTTP transport retry and from activity retry.
ScheduleActivity
dataclass
¶
ScheduleActivity(activity_type, arguments, queue=None, retry_policy=None, start_to_close_timeout=None, schedule_to_start_timeout=None, schedule_to_close_timeout=None, heartbeat_timeout=None)
Command requesting an activity task.
Timeout fields are activity budgets, not HTTP request timeouts:
start_to_close_timeout limits one activity attempt,
schedule_to_start_timeout limits queue wait before an attempt starts,
schedule_to_close_timeout limits the whole activity execution including
retries, and heartbeat_timeout limits the gap between activity
heartbeats.
CompleteWorkflow
dataclass
¶
Command completing a workflow with a payload result.
FailWorkflow
dataclass
¶
Command failing a workflow with diagnostic metadata.
CompleteUpdate
dataclass
¶
Worker command completing an accepted workflow update.
FailUpdate
dataclass
¶
Worker command failing an accepted workflow update.
ContinueAsNew
dataclass
¶
Workflow return value that starts a new run with fresh history.
RecordSideEffect
dataclass
¶
Command recording the result of a non-deterministic function.
StartChildWorkflow
dataclass
¶
StartChildWorkflow(workflow_type, arguments=list(), task_queue=None, parent_close_policy=None, retry_policy=None, execution_timeout_seconds=None, run_timeout_seconds=None)
Command requesting a child workflow run.
execution_timeout_seconds limits the overall child workflow execution.
run_timeout_seconds limits one child run. These budgets are durable
server-side workflow budgets and are separate from client HTTP timeouts.
RecordVersionMarker
dataclass
¶
Command recording a workflow code-version marker.
UpsertSearchAttributes
dataclass
¶
Command updating workflow search attributes.
WaitCondition
dataclass
¶
WaitCondition(predicate, condition_key=None, condition_definition_fingerprint=None, timeout_seconds=None)
Command that yields execution until a workflow-defined predicate becomes true.
The replayer evaluates predicate locally against in-memory workflow state
(typically mutated by signal/update handlers). The server records a
ConditionWaitOpened history event and re-drives the workflow when any
signal arrives or, if timeout_seconds is provided, when the timeout
elapses (a TimerFired history event with timer_kind=condition_timeout).
WorkflowContext
¶
Replay-safe helper surface passed to workflow run methods.
sleep
¶
Sleep for seconds seconds of durable wall time.
Sugar over :meth:start_timer that accepts a float and rounds up to
the next whole second (the server stores timer deadlines as integer
seconds). The call is still a single yield of a durable command —
use yield ctx.sleep(60) or bare yield ctx.sleep(60) from the
workflow run method.
wait_condition
¶
Yield execution until predicate() returns truthy.
The predicate is evaluated against the workflow's in-memory state on
every replay tick — typically mutated by @signal / @update
handlers as external events arrive. If timeout is provided and
elapses before the predicate becomes true, the yield resolves to
False (otherwise True). The fractional timeout is rounded
up to the next whole second to match the server's integer-second
timer resolution.
patched
¶
Record or read a patch marker and resolve to True for patched runs.
New runs record version 1 for change_id and replay as True.
Older runs that reached this code without a marker resolve the legacy
default version -1 and replay as False.
deprecate_patch
¶
Keep a patch marker alive after the old branch has been removed.
Replayer
¶
Replay captured workflow history without a live server.
Register one or more workflow classes, then call :meth:replay with a
server-exported history list or a dictionary containing an events key.
If the history includes a WorkflowStarted event, the replayer can infer
the workflow type and start input from that event.
defn
¶
Register a class as a workflow type under a language-neutral name.
Scans the class for @signal, @query, and @update decorated
methods and builds registries at decoration time so worker-side dispatch
can use stable receiver names without re-inspecting the class on every
history event or control-plane request.
signal
¶
Mark a workflow method as the handler for an external signal.
Example::
@workflow.defn(name="approval")
class ApprovalWorkflow:
def __init__(self) -> None:
self.approved: bool = False
@workflow.signal("approve")
def on_approve(self, by: str) -> None:
self.approved = True
The decorated method is called by the replayer when a matching
SignalReceived history event is observed, with the signal's
decoded arguments unpacked into positional parameters. Handler return
values are ignored; to expose state back to the workflow's main run
loop, mutate self.* attributes (as on_approve does above) and
yield the usual commands from run().
query
¶
Mark a workflow method as a read-only query handler.
Query methods are invoked against replayed workflow state. They must not
mutate self or perform I/O. The server-side worker query transport is
still implemented separately; this decorator records the Python receiver
metadata and is used by :func:query_state.
update
¶
Mark a workflow method as an update handler.
The returned function also exposes .validator for the common pattern::
@workflow.update("approve")
def approve(self, approved: bool) -> dict: ...
@approve.validator
def validate_approve(self, approved: bool) -> None: ...
This release records receiver metadata only. The server-side Python update execution transport is tracked separately.
update_validator
¶
Mark a workflow method as the validator for an update name.
commands_to_server_commands
¶
commands_to_server_commands(commands, task_queue, *, payload_codec=serializer.AVRO_CODEC, size_warning=serializer.DEFAULT_PAYLOAD_SIZE_WARNING, warning_context=None, external_storage=None, external_storage_threshold_bytes=None)
Convert workflow commands to the server wire shape with batched payload encoding.
query_state
¶
query_state(workflow_cls, history_events, start_input, query_name, args=None, *, workflow_id=None, run_id='', payload_codec=None, external_storage=None, external_storage_cache=None)
Replay a workflow to current state and invoke a registered query.
This is the Python-side core that a future server-routed query task can
call after fetching durable history. Unknown query names and handler
exceptions are normalized to :class:~durable_workflow.errors.QueryFailed.
apply_update
¶
apply_update(workflow_cls, history_events, start_input, update_id, *, workflow_id=None, run_id='', payload_codec=None, external_storage=None, external_storage_cache=None)
Replay current workflow state and run one accepted update handler.
The server remains the durable authority: it accepts the update, sends a
workflow task carrying workflow_update_id, and records
UpdateApplied / UpdateCompleted when this helper's worker command
is submitted. Python only reconstructs in-memory state and runs the
registered receiver method for the accepted update.