Workflow message passing - Python SDK

A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Python SDK.

Write message handlers

info

The code that follows is part of a working message passing sample.

Follow these guidelines when writing your message handlers:

  • Message handlers are defined as methods on the Workflow class, using one of the three decorators: @workflow.query, @workflow.signal, and @workflow.update.
  • The parameters and return values of handlers and the main Workflow function must be serializable.
  • Prefer data classes to multiple input parameters. Data class parameters allow you to add fields without changing the calling signature.
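
The last guideline can be illustrated without any Temporal code. In the minimal sketch below, `comment` is a hypothetical field added to the ApproveInput data class after callers were already written; because it has a default value, older call sites keep working. The `approve` function is a plain stand-in for a handler, not SDK code.

```python
from dataclasses import dataclass


@dataclass
class ApproveInput:
    name: str
    # Hypothetical field added in a later release. The default value keeps
    # existing call sites, which pass only `name`, valid.
    comment: str = ""


def approve(input: ApproveInput) -> str:
    """Plain-Python stand-in for a message handler (not Temporal SDK code)."""
    suffix = f" ({input.comment})" if input.comment else ""
    return f"approved by {input.name}{suffix}"


# A caller written before `comment` existed still works unchanged.
old_style = approve(ApproveInput(name="me"))
# A newer caller can opt in to the extra field.
new_style = approve(ApproveInput(name="me", comment="LGTM"))
```

Had `approve` taken `name` as a bare positional parameter, adding a second parameter would have changed the signature that every sender calls.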

Query handlers

A Query is a synchronous operation that retrieves state from a Workflow Execution:

from dataclasses import dataclass
from enum import IntEnum

from temporalio import workflow


class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3


@dataclass
class GetLanguagesInput:
    include_unsupported: bool


@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        # Keys must match the member names defined on Language above.
        self.greetings = {
            Language.Chinese: "你好,世界",
            Language.English: "Hello, world",
        }

    @workflow.query
    def get_languages(self, input: GetLanguagesInput) -> list[Language]:
        # 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
        if input.include_unsupported:
            return list(Language)
        else:
            return list(self.greetings)

  • The Query decorator can accept arguments. Refer to the API docs: @workflow.query.

  • A Query handler uses def, not async def. You can't perform async operations like executing an Activity in a Query handler.

Signal handlers

A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:

@dataclass
class ApproveInput:
    name: str


@workflow.defn
class GreetingWorkflow:
    ...

    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        # 👉 A Signal handler mutates the Workflow state but cannot return a value.
        self.approved_for_release = True
        self.approver_name = input.name

  • The Signal decorator can accept arguments. Refer to the API docs: @workflow.signal.

  • The handler should not return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.

  • Signal (and Update) handlers can be async def. This allows you to use Activities, Child Workflows, durable asyncio.sleep Timers, workflow.wait_condition conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Signal and Update handlers.

Update handlers and validators

An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:

class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3


@workflow.defn
class GreetingWorkflow:
    ...

    @workflow.update
    def set_language(self, language: Language) -> Language:
        # 👉 An Update handler can mutate the Workflow state and return a value.
        previous_language, self.language = self.language, language
        return previous_language

    @set_language.validator
    def validate_language(self, language: Language) -> None:
        if language not in self.greetings:
            # 👉 In an Update validator you raise any exception to reject the Update.
            raise ValueError(f"{language.name} is not supported")

  • The Update decorator accepts arguments (such as name, dynamic, and unfinished_policy), as described in the API reference docs for workflow.update.

  • About validators:

    • Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
    • The SDK automatically provides a validator decorator named @<update-handler-name>.validator. The validator must accept the same argument types as the handler and return None.
  • Accepting and rejecting Updates with validators:

    • To reject an Update, raise an exception of any type in the validator.
    • Without a validator, Updates are always accepted.
  • Validators and Event History:

    • The WorkflowExecutionUpdateAccepted event is written into the History whether the acceptance was automatic or programmatic.
    • When a Validator raises an error, the Update is rejected and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
  • Use workflow.current_update_info to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.

  • Update (and Signal) handlers can be async def, letting them use Activities, Child Workflows, durable asyncio.sleep Timers, workflow.wait_condition conditions, and more. See Async handlers and Workflow message passing for safe usage guidelines.
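
The deduplication idea mentioned above can be sketched in plain Python. The class below is a hypothetical stand-in for Workflow state, not a Temporal Workflow: it records the result of each Update by ID and returns the recorded result when the same ID arrives again. In a real handler the ID would come from workflow.current_update_info(); here it is passed in explicitly so that the sketch runs on its own.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class CounterModel:
    """Plain-Python model of Workflow state (illustration only)."""

    total: int = 0
    # Result recorded for each Update ID that has already been applied.
    results_by_update_id: Dict[str, int] = field(default_factory=dict)

    def add(self, update_id: str, amount: int) -> int:
        # Assumption: in a real Update handler, update_id would be read
        # from workflow.current_update_info() rather than passed in.
        if update_id in self.results_by_update_id:
            # Duplicate delivery: return the earlier result, change nothing.
            return self.results_by_update_id[update_id]
        self.total += amount
        self.results_by_update_id[update_id] = self.total
        return self.total


model = CounterModel()
first = model.add("update-1", 5)
repeat = model.add("update-1", 5)  # same ID, so it is not applied twice
second = model.add("update-2", 3)
```

For this to help across Continue-As-New, the record of seen IDs would presumably have to be passed to the next run as part of the Workflow input; the linked guide covers the details.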

Send messages

To send Queries, Signals, or Updates, you call methods on a WorkflowHandle object. For example, start_workflow starts a Workflow and returns its handle:

client = await Client.connect("localhost:7233")
workflow_handle = await client.start_workflow(
    GreetingWorkflow.run, id="greeting-workflow-1234", task_queue="my-task-queue"
)

To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.

Send a Query

Use WorkflowHandle.query to send a Query to a Workflow Execution:

# GetLanguagesInput has a single field, include_unsupported.
supported_languages = await workflow_handle.query(
    GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)

  • Sending a Query doesn’t add events to a Workflow's Event History.

  • You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.

  • A Worker must be online and polling the Task Queue to process a Query.

Send a Signal

You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.

Send a Signal from a Client

Use WorkflowHandle.signal to send a Signal:

await workflow_handle.signal(GreetingWorkflow.approve, ApproveInput(name="me"))

  • The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.

  • The WorkflowExecutionSignaled Event appears in the Workflow's Event History.

Send a Signal from a Workflow

A Workflow can send a Signal to another Workflow, known as an External Signal. You'll need a Workflow handle for the external Workflow. Use get_external_workflow_handle_for:

# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")

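When an External Signal is sent:

  • A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
  • A WorkflowExecutionSignaled Event appears in the recipient's Event History.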

Signal-With-Start

Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running. To use Signal-With-Start, call the start_workflow method and pass the start_signal argument with the name of your Signal:

from temporalio.client import Client
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )

Send an Update

An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.

A client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.

  • WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation.
  • WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.

To send an Update to a Workflow Execution, you can:

  • Call execute_update and wait for the Update to complete. This code fetches an Update result:

    previous_language = await workflow_handle.execute_update(
        GreetingWorkflow.set_language, Language.Chinese
    )

  • Call start_update to receive an UpdateHandle as soon as the Update is accepted or rejected.

    • Use this UpdateHandle later to fetch your results.
    • async def Update handlers normally perform long-running asynchronous operations, such as executing an Activity.
    • start_update only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.

    For example:

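    # Wait until the update is accepted
    update_handle = await workflow_handle.start_update(
        HelloWorldWorkflow.set_greeting,
        HelloWorldInput("World"),
        # Recent SDK releases require wait_for_stage
        # (WorkflowUpdateStage is imported from temporalio.client).
        wait_for_stage=WorkflowUpdateStage.ACCEPTED,
    )
    # Wait until the update is completed
    update_result = await update_handle.result()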

    For more details, see the "Async handlers" section.

To obtain an Update handle, you can:

  • Use start_update to start an Update and return the handle, as shown in the preceding example.
  • Use get_update_handle_for to fetch a handle for an in-progress Update using the Update ID.

SEND MESSAGES WITHOUT TYPE SAFETY

In real-world development, you may be unable to import Workflow Definition method signatures. When you don't have access to the Workflow Definition, or it isn't written in Python, you can still use non-type-safe APIs and dynamic method invocation. Pass method names instead of method objects to:

  • Client.start_workflow
  • WorkflowHandle.query
  • WorkflowHandle.signal
  • WorkflowHandle.execute_update
  • WorkflowHandle.start_update

Use these non-type-safe APIs:

  • get_workflow_handle
  • get_external_workflow_handle

Message handler patterns

This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.

tip

For additional information, see Inject work into the main Workflow, Ensuring your messages are processed exactly once, and this sample demonstrating safe async message handling.

Use async handlers

Signal and Update handlers can be async def as well as def. Using async def allows you to use await with Activities, Child Workflows, asyncio.sleep Timers, workflow.wait_condition conditions, etc. This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls. It's essential to understand the things that could go wrong in order to use async def handlers safely. See Workflow message passing for guidance on safe usage of async Signal and Update handlers, the Safe message handlers sample, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.

The following code executes an Activity that makes a network call to a remote service. It modifies the Update handler from earlier on this page, turning it into an async def:

@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
    await asyncio.sleep(0.2)  # Pretend that we are calling a remote service.
    greetings = {
        Language.Arabic: "مرحبا بالعالم",
        Language.Chinese: "你好,世界",
        Language.English: "Hello, world",
        Language.French: "Bonjour, monde",
        Language.Hindi: "नमस्ते दुनिया",
        Language.Spanish: "Hola mundo",
    }
    return greetings.get(to_language)


@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.lock = asyncio.Lock()
        ...

    ...

    @workflow.update
    async def set_language(self, language: Language) -> Language:
        if language not in self.greetings:
            # 👉 Use a lock here to ensure that multiple calls to set_language are processed in order.
            async with self.lock:
                greeting = await workflow.execute_activity(
                    call_greeting_service,
                    language,
                    start_to_close_timeout=timedelta(seconds=10),
                )
                if greeting is None:
                    # 👉 An update validator cannot be async, so cannot be used to check that the remote
                    # call_greeting_service supports the requested language. Raising ApplicationError
                    # will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
                    # added to history.
                    raise ApplicationError(
                        f"Greeting service does not support {language.name}"
                    )
                self.greetings[language] = greeting
        previous_language, self.language = self.language, language
        return previous_language

After updating the code to use an async def, your Update handler can schedule an Activity and await the result. Although an async def Signal handler can also execute an Activity, using an Update handler allows the Client to receive a result or error once the Activity completes. This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.

Add wait conditions to block

Sometimes, async def Signal or Update handlers need to meet certain conditions before they should continue. You can use workflow.wait_condition to prevent the code from proceeding until a condition is true. You specify the condition by passing a function that returns True or False, and you can optionally set a timeout. This is an important feature that helps you control your handler logic.

Here are three important use cases for workflow.wait_condition:

  • Wait for a Signal or Update to arrive.
  • Wait in a handler until it's appropriate to continue.
  • Wait in the main Workflow until all active handlers have finished.

Wait for a Signal or Update to arrive

It's common to use workflow.wait_condition to wait for a particular Signal or Update to be sent by a Client:

@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.approved_for_release = False
        self.approver_name: Optional[str] = None

    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        self.approved_for_release = True
        self.approver_name = input.name

    @workflow.run
    async def run(self) -> str:
        await workflow.wait_condition(lambda: self.approved_for_release)
        ...
        return self.greetings[self.language]
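
To make the waiting behavior concrete, here is a minimal model in plain asyncio. `wait_condition_model` is a hypothetical helper that imitates the contract described on this page for workflow.wait_condition: block until the predicate returns True, or give up when the optional timeout expires. It polls, it is not how the SDK implements the call, and it must not be used inside a Workflow; `ApprovalModel` is likewise a plain class, not a Workflow Definition.

```python
import asyncio
from typing import Callable, Optional, Tuple


async def wait_condition_model(
    predicate: Callable[[], bool], timeout: Optional[float] = None
) -> None:
    """Toy stand-in for workflow.wait_condition (illustration only)."""

    async def poll() -> None:
        while not predicate():
            # Yield to the event loop so other coroutines can change state.
            await asyncio.sleep(0)

    # Raises asyncio.TimeoutError if the predicate is still False in time.
    await asyncio.wait_for(poll(), timeout)


class ApprovalModel:
    def __init__(self) -> None:
        self.approved = False
        self.approver = ""

    def approve(self, name: str) -> None:  # plays the role of the Signal handler
        self.approved = True
        self.approver = name

    async def run(self) -> str:  # plays the role of the main Workflow method
        await wait_condition_model(lambda: self.approved)
        return f"approved by {self.approver}"


async def demo() -> Tuple[bool, str, bool]:
    wf = ApprovalModel()
    main = asyncio.create_task(wf.run())
    await asyncio.sleep(0.01)
    was_blocked = not main.done()  # run() is parked on the condition
    wf.approve("me")               # the "Signal" arrives
    result = await main
    timed_out = False
    try:
        await wait_condition_model(lambda: False, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    return was_blocked, result, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The model shows the shape of the pattern: the handler only flips state, and the main method decides when that state is sufficient to proceed.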

Use wait conditions in handlers

It's common to use a Workflow wait condition to wait until a handler should start. You can also use wait conditions anywhere else in the handler to wait for a specific condition to become True. This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become True.

Consider a ready_for_update_to_execute method that runs before your Update handler executes. The workflow.wait_condition method waits until your condition is met:

@workflow.update
async def my_update(self, update_input: UpdateInput) -> str:
    await workflow.wait_condition(
        lambda: self.ready_for_update_to_execute(update_input)
    )

Ensure your handlers finish before the Workflow completes

Workflow wait conditions can ensure your handler completes before a Workflow finishes. When your Workflow uses async def Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result. The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying to retrieve Update results. Use workflow.wait_condition and all_handlers_finished to address this problem and allow your Workflow to end smoothly:

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        ...
        await workflow.wait_condition(workflow.all_handlers_finished)
        return "workflow-result"

By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions. You can silence these warnings on a per-handler basis by passing the unfinished_policy argument to the @workflow.signal or @workflow.update decorator:

@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
async def my_update(self) -> None:
    ...

See Finishing handlers before the Workflow completes for more information.

Use @workflow.init to operate on Workflow input before any handler executes

Normally, your Workflow __init__ method won't have any parameters. However, if you use the @workflow.init decorator on your __init__ method, you can give it the same Workflow parameters as your @workflow.run method. The SDK will then ensure that your __init__ method receives the Workflow input arguments that the Client sent. (The Workflow input arguments are also passed to your @workflow.run method -- that always happens, whether or not you use the @workflow.init decorator.) This is useful if you have message handlers that need access to workflow input: see Initializing the Workflow first.

Here's an example. Notice that __init__ and get_greeting must have the same parameters, with the same type annotations:

@dataclass
class MyWorkflowInput:
    name: str


@workflow.defn
class WorkflowRunSeesWorkflowInitWorkflow:
    @workflow.init
    def __init__(self, workflow_input: MyWorkflowInput) -> None:
        self.name_with_title = f"Sir {workflow_input.name}"
        self.title_has_been_checked = False

    @workflow.run
    async def get_greeting(self, workflow_input: MyWorkflowInput) -> str:
        await workflow.wait_condition(lambda: self.title_has_been_checked)
        return f"Hello, {self.name_with_title}"

    @workflow.update
    async def check_title_validity(self) -> bool:
        # 👉 The handler is now guaranteed to see the workflow input
        # after it has been processed by __init__.
        is_valid = await workflow.execute_activity(
            check_title_validity,
            self.name_with_title,
            schedule_to_close_timeout=timedelta(seconds=10),
        )
        self.title_has_been_checked = True
        return is_valid

Use asyncio.Lock to prevent concurrent handler execution

Concurrent processes can interact in unpredictable ways. Message-handling code that is not written with concurrency in mind may misbehave when multiple handler instances run simultaneously. Here's an example of a pathological case:

@workflow.defn
class MyWorkflow:

    @workflow.signal
    async def bad_async_handler(self):
        data = await workflow.execute_activity(
            fetch_data, start_to_close_timeout=timedelta(seconds=10)
        )
        self.x = data.x
        # 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
        # there may be times when the Workflow has self.x from one Activity execution and self.y from another.
        await asyncio.sleep(1)  # or await anything else
        self.y = data.y

Coordinating access using asyncio.Lock corrects this code. Locking makes sure that only one handler instance can execute a specific section of code at any given time:

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        ...
        self.lock = asyncio.Lock()
        ...

    @workflow.signal
    async def safe_async_handler(self):
        async with self.lock:
            data = await workflow.execute_activity(
                fetch_data, start_to_close_timeout=timedelta(seconds=10)
            )
            self.x = data.x
            # ✅ OK: the scheduler may switch now to a different handler execution, or to the main workflow
            # method, but no other execution of this handler can run until this execution finishes.
            await asyncio.sleep(1)  # or await anything else
            self.y = data.y
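
The interleaving problem and its fix can be reproduced outside Temporal with plain asyncio. The sketch below is an assumption-laden model, not Workflow code: `StateModel` and `run_two_handlers` are hypothetical names, and the sleep durations are chosen only so that the second handler finishes its pause before the first one does.

```python
import asyncio
from typing import Optional, Tuple


class StateModel:
    """Plain-asyncio model of two fields that should always match."""

    def __init__(self, lock: Optional[asyncio.Lock] = None) -> None:
        self.x = 0
        self.y = 0
        self.lock = lock

    async def _write(self, value: int, pause: float) -> None:
        self.x = value
        await asyncio.sleep(pause)  # another handler may run during this await
        self.y = value

    async def handler(self, value: int, pause: float) -> None:
        if self.lock is None:
            await self._write(value, pause)   # unsafe: writes can interleave
        else:
            async with self.lock:             # safe: one writer at a time
                await self._write(value, pause)


async def run_two_handlers(use_lock: bool) -> Tuple[int, int]:
    state = StateModel(asyncio.Lock() if use_lock else None)
    # Handler 1 pauses longer than handler 2, so without a lock handler 2
    # writes both fields in the middle of handler 1's two writes.
    await asyncio.gather(state.handler(1, 0.05), state.handler(2, 0.01))
    return state.x, state.y


if __name__ == "__main__":
    print("without lock:", asyncio.run(run_two_handlers(False)))
    print("with lock:   ", asyncio.run(run_two_handlers(True)))
```

Without the lock the model ends with x taken from one handler and y from the other; with the lock each handler's pair of writes is applied as a unit.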

Message handler troubleshooting

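When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:

  • The Client can't contact the server: You'll receive a temporalio.service.RPCError exception on which the status attribute is RPCStatusCode UNAVAILABLE.

  • The Workflow does not exist: You'll receive a temporalio.service.RPCError exception on which the status attribute is RPCStatusCode NOT_FOUND.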

See Exceptions in message handlers for a non–Python-specific discussion of this topic.

Problems when sending a Signal

When using Signal, the only exceptions that will result from your requests during its execution are the RPCErrors described above.

For Queries and Updates, the Client waits for a response from the Worker, and therefore additional errors may occur during the handler Execution by the Worker.

Problems when sending an Update

When working with Updates, in addition to the RPCErrors described above, you may encounter these errors:

  • No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely. You can use asyncio.timeout to impose a timeout. This raises a temporalio.client.WorkflowUpdateRPCTimeoutOrCancelledError exception.

  • Update failed: You'll receive a temporalio.client.WorkflowUpdateFailedError exception. There are two ways this can happen:

    • The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.

    • The Update failed after having been accepted.

    Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler, such as a failed Activity or Child Workflow, or an ApplicationError raised by the Workflow author.

  • The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:

    • If the request hasn't been accepted by the server, you receive a FAILED_PRECONDITION temporalio.service.RPCError exception.
    • If the request has been accepted, it is durable. Once the Workflow is healthy again after a code deploy, use an UpdateHandle to fetch the Update result.
  • The Workflow finished while the Update handler execution was in progress: You'll receive a temporalio.service.RPCError exception with a status attribute of RPCStatusCode NOT_FOUND. This can happen, for example, when the Workflow is canceled or fails, or when it completes or continues-as-new without waiting for its handlers to finish.

Problems when sending a Query

When working with Queries, in addition to the RPCErrors described above, you may encounter these errors:

  • There is no Workflow Worker polling the Task Queue: You'll receive a temporalio.service.RPCError exception on which the status attribute is RPCStatusCode FAILED_PRECONDITION.

  • Query failed: You'll receive a temporalio.client.WorkflowQueryFailedError exception if something goes wrong during a Query. Any exception in a Query handler will trigger this error. This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead.

  • The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.

Dynamic components

A dynamic Workflow, Activity, Signal, Update, or Query is a kind of unnamed item. Normally, these items are registered by name with the Worker and invoked at runtime. When an unregistered or unrecognized Workflow, Activity, or message request arrives with a recognized method signature, the Worker can use a pre-registered dynamic stand-in.

For example, you might send a request to start a Workflow named "MyUnknownWorkflow". After receiving a Workflow Task, the Worker may find that there are no registered Workflow Definitions of that type. It then checks whether a dynamic Workflow is registered. If the dynamic Workflow signature matches the incoming Workflow signature, the Worker invokes it just as it would invoke a statically named Workflow.

By registering dynamic versions of your Temporal components, the Worker can fall back to these alternate implementations for name mismatches.

caution

Use dynamic elements judiciously and as a fallback mechanism, not a primary design. They can introduce long-term maintainability and debugging issues. Reserve dynamic invocation use for cases where a name is not or can't be known at compile time.

Set a dynamic Signal, Query, or Update handler

A dynamic Signal, Query, or Update refers to a special stand-in handler. It's used when an unregistered handler request arrives.

Consider a Signal, where you might send something like workflow.signal(MyWorkflow.my_signal_method, my_arg). This is a type-safe compiler-checked approach that guarantees a method exists. There's also a non-type-safe string-based form: workflow.signal('some-name', my_arg). When sent to the server, the name is checked only after arriving at the Worker. This is where "dynamic handlers" come in.

After failing to find a handler with a matching name and type, the Worker checks for a registered dynamic stand-in handler. If found, the Worker uses that instead.

You must opt handlers into dynamic access. Add dynamic=True to the handler decorator (for example, @workflow.signal(dynamic=True)) to make a handler dynamic. The handler's signature must accept (self, name: str, args: Sequence[RawValue]). Use a payload_converter function to convert RawValue objects to your required type. For example:

from typing import Sequence

from temporalio.common import RawValue
...

@workflow.signal(dynamic=True)
async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
    ...

This sample creates a dynamic_signal Signal. When an unregistered or unrecognized Signal arrives with a matching signature, dynamic assignment uses this handler to manage the Signal. It is responsible for transforming the sequence contents into usable data in a form that the method's logic can process and act on.
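
The lookup order described in this section (a handler registered under the name first, then the dynamic stand-in) can be modeled in a few lines of plain Python. `SignalRouterModel` is a hypothetical class written for illustration; it is not the Worker's implementation, and raising LookupError when nothing matches is a choice made for this model only.

```python
from typing import Callable, Dict, List, Optional, Sequence, Tuple


class SignalRouterModel:
    """Toy model of name-based dispatch with a dynamic fallback."""

    def __init__(self) -> None:
        self.named: Dict[str, Callable[[Sequence[object]], None]] = {}
        self.dynamic: Optional[Callable[[str, Sequence[object]], None]] = None

    def dispatch(self, name: str, args: Sequence[object]) -> str:
        handler = self.named.get(name)
        if handler is not None:
            handler(args)
            return "named"
        if self.dynamic is not None:
            # The dynamic handler also receives the name it was invoked
            # under, mirroring the (self, name, args) signature above.
            self.dynamic(name, args)
            return "dynamic"
        raise LookupError(f"no handler for {name!r}")  # model-only behavior


log: List[Tuple[str, List[object]]] = []
router = SignalRouterModel()
router.named["approve"] = lambda args: log.append(("approve", list(args)))
router.dynamic = lambda name, args: log.append((f"dynamic:{name}", list(args)))

first = router.dispatch("approve", ["me"])           # exact name match
second = router.dispatch("some-name", ["payload"])   # falls back to dynamic
```

Because the fallback sees every unrecognized name, a dynamic handler typically needs to branch on `name` and convert `args` itself, which is part of why the caution above recommends using it sparingly.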

Set a dynamic Workflow

A dynamic Workflow refers to a special stand-in Workflow Definition. It's used when an unknown Workflow Execution request arrives.

Consider the "MyUnknownWorkflow" example described earlier. The Worker may find there are no registered Workflow Definitions of that name or type. After failing to find a Workflow Definition with a matching type, the Worker looks for a dynamic stand-in. If found, it invokes that instead.

To participate, your Workflow must opt into dynamic access. Adding dynamic=True to the @workflow.defn decorator makes the Workflow Definition eligible to participate in dynamic invocation. You must register the Workflow with the Worker before it can be invoked.

The Workflow Definition's primary Workflow method must accept a single argument of type Sequence[temporalio.common.RawValue]. Use a payload_converter function to convert RawValue objects to your required type. For example:

# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return await workflow.execute_activity(
            default_greeting,
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

This Workflow converts the first Sequence element to a string, and uses that to execute an Activity.

Set a dynamic Activity

A dynamic Activity is a stand-in implementation. It's used when an Activity Task with an unknown Activity type is received by the Worker.

To participate, your Activity must opt into dynamic access. Adding dynamic=True to the @activity.defn decorator makes the Activity Definition eligible to participate in dynamic invocation. You must register the Activity with the Worker before it can be invoked.

The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. Use a payload_converter function to convert RawValue objects to your required types. For example:

# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
    arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
    return (
        f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
    )
# ...
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            "unregistered_activity",
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

This example invokes an unregistered Activity by name. The Worker resolves it using the registered dynamic Activity instead. When possible, prefer to use compiler-checked type-safe arguments rather than Activity name strings.