| name | durable-task-python |
| description | Build durable, fault-tolerant workflows in Python using the Durable Task SDK with Azure Durable Task Scheduler. Use when creating orchestrations, activities, entities, or implementing patterns like function chaining, fan-out/fan-in, human interaction, or stateful agents. Applies to any Python application requiring durable execution, state persistence, or distributed transactions without Azure Functions dependency. |
Durable Task Python SDK with Durable Task Scheduler
Build fault-tolerant, stateful workflows in Python applications using the Durable Task SDK connected to Azure Durable Task Scheduler.
Quick Start
Required Packages
pip install durabletask durabletask-azuremanaged azure-identity
Or add to requirements.txt:
durabletask
durabletask-azuremanaged
azure-identity
Minimal Worker + Client Setup
import os
from azure.identity import DefaultAzureCredential
from durabletask import task
from durabletask.client import OrchestrationStatus
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
# Activity: a unit of work that may perform I/O
def hello(ctx: task.ActivityContext, name: str) -> str:
    return f"Hello {name}!"
# Orchestrator: a generator that yields durable tasks
def my_orchestration(ctx: task.OrchestrationContext, input: str):
    result = yield ctx.call_activity(hello, input=input)
    return result
taskhub = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
# The local emulator uses plain HTTP and no credential; Azure requires TLS and Entra ID auth
is_local = endpoint == "http://localhost:8080"
secure_channel = not is_local
credential = None if is_local else DefaultAzureCredential()
with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub,
    token_credential=credential
) as worker:
    worker.add_orchestrator(my_orchestration)
    worker.add_activity(hello)
    worker.start()
    # The worker must still be running while the orchestration executes,
    # so create the client and wait for the result inside the `with` block
    dts_client = DurableTaskSchedulerClient(
        host_address=endpoint,
        secure_channel=secure_channel,
        taskhub=taskhub,
        token_credential=credential
    )
    instance_id = dts_client.schedule_new_orchestration(my_orchestration, input="World")
    state = dts_client.wait_for_orchestration_completion(instance_id, timeout=60)
    if state and state.runtime_status == OrchestrationStatus.COMPLETED:
        print(f"Result: {state.serialized_output}")
Pattern Selection Guide
| Pattern | Use When |
|---|---|
| Function Chaining | Sequential steps where each depends on the previous |
| Fan-Out/Fan-In | Parallel processing with aggregated results |
| Human Interaction | Workflow pauses for external input/approval |
| Durable Entities | Stateful objects with operations (counters, accounts) |
| Sub-Orchestrations | Reusable workflow components or version isolation |
| Eternal Orchestrations | Long-running background processes with continue_as_new |
| Monitoring | Periodic polling with configurable timeouts |
See references/patterns.md for detailed implementations.
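The following sketch models the fan-out/fan-in control flow in plain Python before you read the full pattern. It is a toy, not the SDK. In a real orchestrator you would typically collect tasks from `ctx.call_activity` and `yield task.when_all(tasks)`.

Here, the stand-ins are as follows:
- A yielded list of `(activity_name, input)` pairs plays the role of `when_all`.
- A thread pool plays the role of the workers.
- `drive`, `word_count_orchestrator` and the activity names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Generator, List


def drive(orchestrator: Callable[[Any], Generator], arg: Any,
          activities: Dict[str, Callable[[Any], Any]]) -> Any:
    """Toy runtime: a yielded list of calls is a fan-out, answered with a list of results."""
    gen = orchestrator(arg)
    with ThreadPoolExecutor(max_workers=4) as pool:
        try:
            request = next(gen)
            while True:
                if isinstance(request, list):
                    # Fan-out: run every (name, input) call concurrently
                    futures = [pool.submit(activities[name], value) for name, value in request]
                    # Fan-in: collect results in call order, not completion order
                    request = gen.send([f.result() for f in futures])
                else:
                    name, value = request
                    request = gen.send(activities[name](value))
        except StopIteration as stop:
            return stop.value


def word_count_orchestrator(lines: List[str]):
    counts = yield [("count_words", line) for line in lines]  # fan-out
    total = yield ("sum", counts)  # fan-in: aggregate in one more step
    return total


activities = {"count_words": lambda line: len(line.split()), "sum": sum}
```

Collecting `f.result()` in submission order keeps the aggregated results aligned with the inputs, whichever call finishes first.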
Orchestration Structure
Basic Orchestrator
def my_orchestration(ctx: task.OrchestrationContext, input: str):
    """Orchestrator function - MUST be deterministic"""
    step1 = yield ctx.call_activity(step1_activity, input=input)
    step2 = yield ctx.call_activity(step2_activity, input=step1)
    return step2
Basic Activity
def my_activity(ctx: task.ActivityContext, input: str) -> str:
    """Activity function - can have side effects, I/O, non-determinism"""
    print(f"Processing: {input}")
    return f"Processed: {input}"
Registering with Worker
with DurableTaskSchedulerWorker(...) as worker:
    worker.add_orchestrator(my_orchestration)
    worker.add_activity(step1_activity)
    worker.add_activity(step2_activity)
    worker.start()
Critical Rules
Orchestration Determinism
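Orchestrations replay from history, so all orchestrator code MUST be deterministic.

An orchestration resumes when an activity completes, a timer fires, an event arrives or the worker restarts. Each time, the orchestrator function runs again from the beginning, and every durable call that already completed is answered from the recorded history instead of being executed again.

Non-deterministic code can therefore request different work on replay than the history recorded, which causes replay failures or incorrect results.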
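The toy model below makes the replay rule concrete. It is a sketch, not the SDK's implementation. It re-runs a generator orchestrator against a recorded history of `(activity_name, result)` pairs, and raises an error when the replayed code asks for a different activity than the history recorded.

The mutable `wall_clock` dict is a hypothetical stand-in for `datetime.now()`. `run_orchestrator` and `bad_orchestrator` are likewise illustrative names.

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

# Toy history: (activity_name, result) pairs in the order they first ran
History = List[Tuple[str, Any]]


def run_orchestrator(orchestrator: Callable[[], Generator],
                     history: History,
                     activities: Dict[str, Callable[[Any], Any]]) -> Any:
    """Run (or replay) a generator orchestrator against a recorded history."""
    gen = orchestrator()
    step = 0
    try:
        name, arg = next(gen)  # first durable call requested by the code
        while True:
            if step < len(history):
                # Replaying: the code must ask for the same call that was recorded
                recorded_name, result = history[step]
                if recorded_name != name:
                    raise RuntimeError(
                        f"non-deterministic replay: history has {recorded_name!r}, "
                        f"code requested {name!r}")
            else:
                # New work: execute the activity and record its result
                result = activities[name](arg)
                history.append((name, result))
            step += 1
            name, arg = gen.send(result)  # resume the orchestrator with the result
    except StopIteration as stop:
        return stop.value  # the orchestrator's return value


wall_clock = {"hour": 9}  # stands in for datetime.now(); changes between runs


def bad_orchestrator():
    # Branches on a value that is NOT recorded in the history
    activity = "morning" if wall_clock["hour"] < 12 else "evening"
    greeting = yield (activity, None)
    return greeting


activities = {"morning": lambda _: "good morning", "evening": lambda _: "good evening"}

if __name__ == "__main__":
    history: History = []
    print(run_orchestrator(bad_orchestrator, history, activities))  # first execution
    wall_clock["hour"] = 15  # the worker restarts and replays in the afternoon
    try:
        run_orchestrator(bad_orchestrator, history, activities)
    except RuntimeError as err:
        print(err)
```

Real Durable Task runtimes detect and report such mismatches in their own ways. The sketch only shows the underlying requirement: replayed code must make the same durable calls in the same order.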
NEVER do inside orchestrations:
- datetime.now(), datetime.utcnow() → Use ctx.current_utc_datetime
- uuid.uuid4() → Use ctx.new_uuid()
- random.random() → Pass random values from activities
- Direct I/O, HTTP calls, database access → Move to activities
- time.sleep(), asyncio.sleep() → Use ctx.create_timer()
- Environment variables that may change → Pass as input or use activities
- Global mutable state → Pass state through activity results
ALWAYS use:
- yield ctx.call_activity() - Call activities
- yield ctx.call_sub_orchestrator() - Call sub-orchestrations
- yield ctx.create_timer() - Durable delays
- yield ctx.wait_for_external_event() - Wait for events
- ctx.current_utc_datetime - Current time
- ctx.new_uuid() - Generate GUIDs
- ctx.set_custom_status() - Set status
Non-Determinism Patterns (WRONG vs CORRECT)
Getting Current Time
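from datetime import datetime
# WRONG: datetime.now() returns a different value on every replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    current_time = datetime.now()
    if current_time.hour < 12:
        yield ctx.call_activity(morning_activity)
# CORRECT: ctx.current_utc_datetime is replay-safe (note that it is in UTC)
def good_orchestration(ctx: task.OrchestrationContext, _):
    current_time = ctx.current_utc_datetime
    if current_time.hour < 12:
        yield ctx.call_activity(morning_activity)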
Generating UUIDs/Random Values
import uuid
# WRONG: uuid4() produces a new value on every replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    order_id = str(uuid.uuid4())
    yield ctx.call_activity(create_order, input=order_id)
# CORRECT: ctx.new_uuid() returns the same value on every replay
def good_orchestration(ctx: task.OrchestrationContext, _):
    order_id = str(ctx.new_uuid())
    yield ctx.call_activity(create_order, input=order_id)
Random Numbers
import random
from datetime import timedelta
# WRONG: a different delay is chosen on each replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    delay = random.randint(1, 10)
    yield ctx.create_timer(timedelta(seconds=delay))
# CORRECT: generate the value in an activity; its result is recorded in history
def get_random_delay(ctx: task.ActivityContext, _) -> int:
    return random.randint(1, 10)
def good_orchestration(ctx: task.OrchestrationContext, _):
    delay = yield ctx.call_activity(get_random_delay)
    yield ctx.create_timer(timedelta(seconds=delay))
Sleeping/Delays
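import time
from datetime import timedelta
# WRONG: time.sleep() blocks the worker and is not durable across restarts
def bad_orchestration(ctx: task.OrchestrationContext, _):
    yield ctx.call_activity(step1)
    time.sleep(60)
    yield ctx.call_activity(step2)
# CORRECT: a durable timer survives restarts and frees the worker while waiting
def good_orchestration(ctx: task.OrchestrationContext, _):
    yield ctx.call_activity(step1)
    yield ctx.create_timer(timedelta(seconds=60))
    yield ctx.call_activity(step2)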
HTTP Calls and I/O
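# WRONG: the HTTP request is repeated on every replay
def bad_orchestration(ctx: task.OrchestrationContext, url: str):
    import requests
    response = requests.get(url)
    return response.json()
# CORRECT: do the I/O in an activity; the orchestrator sees the recorded result
def fetch_data(ctx: task.ActivityContext, url: str) -> dict:
    import requests
    response = requests.get(url)
    response.raise_for_status()  # report HTTP errors as an activity failure
    return response.json()
def good_orchestration(ctx: task.OrchestrationContext, url: str):
    data = yield ctx.call_activity(fetch_data, input=url)
    return data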
Database Access
import sqlite3
# WRONG: the query runs again on every replay and may return different data
def bad_orchestration(ctx: task.OrchestrationContext, user_id: str):
    conn = sqlite3.connect('db.sqlite')
    cursor = conn.execute("SELECT * FROM users WHERE id=?", (user_id,))
    user = cursor.fetchone()
# CORRECT: query the database in an activity
def get_user(ctx: task.ActivityContext, user_id: str) -> dict:
    conn = sqlite3.connect('db.sqlite')
    conn.row_factory = sqlite3.Row  # rows can then be converted with dict()
    try:
        row = conn.execute("SELECT * FROM users WHERE id=?", (user_id,)).fetchone()
        return dict(row) if row else {}
    finally:
        conn.close()
def good_orchestration(ctx: task.OrchestrationContext, user_id: str):
    user = yield ctx.call_activity(get_user, input=user_id)
    return user
Environment Variables
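# WRONG: the variable may differ between the first run and a replay
def bad_orchestration(ctx: task.OrchestrationContext, _):
    api_endpoint = os.getenv("API_ENDPOINT")
    yield ctx.call_activity(call_api, input=api_endpoint)
# CORRECT: pass configuration in as orchestration input
def good_orchestration(ctx: task.OrchestrationContext, config: dict):
    api_endpoint = config["api_endpoint"]
    yield ctx.call_activity(call_api, input=api_endpoint)
# Activities may read the environment directly (here as a fallback)
def call_api(ctx: task.ActivityContext, api_endpoint: str) -> str:
    endpoint = api_endpoint or os.getenv("API_ENDPOINT")
    # ... call the API at `endpoint` here ...
    return endpoint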
Conditional Logic Based on External State
# WRONG: the file may exist on one replay and not on the next
def bad_orchestration(ctx: task.OrchestrationContext, path: str):
    if os.path.exists(path):
        yield ctx.call_activity(process_file, input=path)
# CORRECT: check in an activity so the answer is recorded in history
def check_file_exists(ctx: task.ActivityContext, path: str) -> bool:
    return os.path.exists(path)
def good_orchestration(ctx: task.OrchestrationContext, path: str):
    exists = yield ctx.call_activity(check_file_exists, input=path)
    if exists:
        yield ctx.call_activity(process_file, input=path)
Dictionary/Set Iteration Order
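# RISKY: the order depends on how the input was built; for sets of strings it can
# even differ between worker processes because string hashing is randomized
def risky_orchestration(ctx: task.OrchestrationContext, items: dict):
    for key in items:
        yield ctx.call_activity(process, input=key)
# CORRECT: sort to make the scheduling order explicit and stable
def good_orchestration(ctx: task.OrchestrationContext, items: dict):
    for key in sorted(items.keys()):
        yield ctx.call_activity(process, input=key)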
Thread-Local or Global State
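# WRONG: module-level state is not part of the history and changes on every replay
counter = 0
def bad_orchestration(ctx: task.OrchestrationContext, _):
    global counter
    counter += 1
    yield ctx.call_activity(process, input=counter)
# CORRECT: carry state in the orchestration input
def good_orchestration(ctx: task.OrchestrationContext, counter: int):
    counter += 1
    yield ctx.call_activity(process, input=counter)
    # Restart with the updated value as the new input (an eternal orchestration)
    ctx.continue_as_new(counter)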
Using yield
In Python, orchestrator functions use yield to await durable operations:
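# CORRECT: yield hands the task to the runtime and resumes with the activity's result
result = yield ctx.call_activity(my_activity, input="data")
# WRONG: without yield, `result` is the pending task object, not the activity's output
result = ctx.call_activity(my_activity, input="data")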
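A toy runtime shows why `yield` matters. In this sketch, `call_activity` only returns a description of the work, and the runtime supplies the result by resuming the generator. Without `yield`, the orchestrator keeps the task object itself.

`FakeTask`, `FakeContext` and `drive` are hypothetical stand-ins, not the SDK's classes.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class FakeTask:
    """Hypothetical stand-in for a durable task: it only describes the work."""
    activity: str
    payload: Any


class FakeContext:
    """Hypothetical context whose call_activity runs nothing by itself."""

    def call_activity(self, activity: str, input: Any = None) -> FakeTask:
        return FakeTask(activity, input)


def drive(orchestrator: Callable, ctx: FakeContext,
          activities: Dict[str, Callable[[Any], Any]]) -> Any:
    """Toy runtime: completes each yielded task and sends its result back in."""
    gen = orchestrator(ctx)
    if not inspect.isgenerator(gen):
        return gen  # plain function: nothing was yielded, so nothing was awaited
    try:
        pending = next(gen)
        while True:
            result = activities[pending.activity](pending.payload)
            pending = gen.send(result)  # becomes the value of the `yield` expression
    except StopIteration as stop:
        return stop.value


def with_yield(ctx: FakeContext):
    result = yield ctx.call_activity("shout", input="data")
    return result


def without_yield(ctx: FakeContext):
    result = ctx.call_activity("shout", input="data")
    return result


activities = {"shout": str.upper}
```

Driving `with_yield` returns `"DATA"`. Driving `without_yield` returns the `FakeTask` object, because nothing ever waited for the activity.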
Error Handling
def orchestrator_with_error_handling(ctx: task.OrchestrationContext, input: str):
    try:
        result = yield ctx.call_activity(risky_activity, input=input)
        return result
    except task.TaskFailedError as e:
        # An exception raised inside the activity surfaces here as TaskFailedError
        ctx.set_custom_status({"error": str(e)})
        yield ctx.call_activity(compensation_activity, input=input)
        return "Compensated"
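The `try`/`except` above works because a failed activity is reported back into the orchestrator at the `yield` where it was awaited. The toy driver below models that with `generator.throw()`.

`ActivityFailed`, `drive` and the activity names are hypothetical stand-ins; the SDK raises `task.TaskFailedError`, and its internals may differ.

```python
from typing import Any, Callable, Dict, Generator, List


class ActivityFailed(Exception):
    """Hypothetical stand-in for task.TaskFailedError."""


def drive(orchestrator: Callable[[Any], Generator], arg: Any,
          activities: Dict[str, Callable[[Any], Any]]) -> Any:
    """Toy runtime: send results in, or throw failures in at the pending yield."""
    gen = orchestrator(arg)
    try:
        name, payload = next(gen)
        while True:
            try:
                result = activities[name](payload)
            except Exception as exc:
                # Raise the failure inside the orchestrator, at its `yield` expression
                name, payload = gen.throw(ActivityFailed(str(exc)))
            else:
                name, payload = gen.send(result)
    except StopIteration as stop:
        return stop.value  # an uncaught ActivityFailed propagates instead


calls: List[str] = []


def risky(order_id):
    calls.append("risky")
    raise ValueError(f"cannot process {order_id}")


def compensate(order_id):
    calls.append("compensate")


def orchestrator(order_id):
    try:
        return (yield ("risky", order_id))
    except ActivityFailed:
        yield ("compensate", order_id)
        return "Compensated"


activities = {"risky": risky, "compensate": compensate}
```

Running `drive(orchestrator, "order-1", activities)` returns `"Compensated"` after calling `risky` and then `compensate`.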
Retry Policies
from datetime import timedelta
from durabletask.task import RetryPolicy
# Intervals and timeouts are timedelta values
retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=5),
    max_number_of_attempts=3,
    backoff_coefficient=2.0,
    max_retry_interval=timedelta(seconds=60),
    retry_timeout=timedelta(seconds=300)
)
def orchestrator(ctx: task.OrchestrationContext, _):
    result = yield ctx.call_activity(
        unreliable_activity,
        input="data",
        retry_policy=retry_policy
    )
    return result
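To estimate how long a retrying activity can take, the helper below lists the waits between attempts. It rests on two assumptions:
- The backoff rule commonly used by Durable Task SDKs applies: the delay before retry n is `first_retry_interval * backoff_coefficient ** (n - 1)`, capped at `max_retry_interval`, and no new retry starts once `retry_timeout` has elapsed.
- Every attempt fails instantly.

`retry_delays` is a hypothetical helper, not part of the SDK.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(first_retry_interval: timedelta,
                 max_number_of_attempts: int,
                 backoff_coefficient: float = 1.0,
                 max_retry_interval: Optional[timedelta] = None,
                 retry_timeout: Optional[timedelta] = None) -> List[timedelta]:
    """Waits before each retry, assuming every attempt fails instantly."""
    delays: List[timedelta] = []
    elapsed = timedelta(0)
    # Attempt 1 is the original call; attempts 2..N are retries
    for attempt in range(1, max_number_of_attempts):
        if retry_timeout is not None and elapsed >= retry_timeout:
            break  # the retry window is exhausted
        delay = first_retry_interval * (backoff_coefficient ** (attempt - 1))
        if max_retry_interval is not None:
            delay = min(delay, max_retry_interval)  # cap exponential growth
        delays.append(delay)
        elapsed += delay
    return delays
```

For the policy above (5 s first interval, 3 attempts, coefficient 2.0), this gives two retries, after 5 s and then 10 s. In a real run, the activity's own execution time also counts against `retry_timeout`.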
Working with Custom Types
Inputs and outputs are serialized as JSON. In addition to JSON-compatible built-in types, the SDK handles dataclasses and namedtuples:
from dataclasses import dataclass
@dataclass
class Order:
    product: str
    quantity: int
    cost: float
def process_order(ctx: task.ActivityContext, order: Order) -> str:
    return f"Processed {order.quantity}x {order.product}"
def order_workflow(ctx: task.OrchestrationContext, order: Order):
    result = yield ctx.call_activity(process_order, input=order)
    return result
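Because inputs cross a JSON boundary, an activity may not receive an instance of the original class. The stdlib sketch below approximates that round trip. It assumes the payload is decoded into a `types.SimpleNamespace`; that is our understanding of how the SDK rebuilds dataclasses, and it may change between versions.

`to_wire`, `from_wire` and `describe_order` are hypothetical helpers. Code that relies only on attribute access, as `process_order` above does, works either way.

```python
import json
from dataclasses import asdict, dataclass
from types import SimpleNamespace


@dataclass
class Order:
    product: str
    quantity: int
    cost: float


def to_wire(order: Order) -> str:
    # Serialize the dataclass as a plain JSON object
    return json.dumps(asdict(order))


def from_wire(payload: str) -> SimpleNamespace:
    # Rebuild an attribute-accessible object; it is NOT an Order instance
    return json.loads(payload, object_hook=lambda d: SimpleNamespace(**d))


def describe_order(order) -> str:
    # Works for Order and SimpleNamespace alike because it only reads attributes
    return f"Processed {order.quantity}x {order.product}"
```

If an activity needs the real class (for methods or `isinstance` checks), it can rebuild the object with `Order(**vars(received))`.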
Connection & Authentication
Local Emulator (Default)
taskhub = "default"
endpoint = "http://localhost:8080"
credential = None
secure_channel = False
Azure with DefaultAzureCredential
from azure.identity import DefaultAzureCredential
taskhub = "my-taskhub"
endpoint = "https://my-scheduler.region.durabletask.io"
credential = DefaultAzureCredential()
secure_channel = True
Authentication Helper
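import os
from azure.identity import DefaultAzureCredential
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
def get_connection_config():
    endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
    taskhub = os.getenv("TASKHUB", "default")
    # Only the local emulator runs without TLS and without a credential
    is_local = endpoint == "http://localhost:8080"
    return {
        "host_address": endpoint,
        "taskhub": taskhub,
        "secure_channel": not is_local,
        "token_credential": None if is_local else DefaultAzureCredential()
    }
config = get_connection_config()
worker = DurableTaskSchedulerWorker(**config)
client = DurableTaskSchedulerClient(**config)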
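The helper above treats only the exact string `http://localhost:8080` as local. Variants such as a trailing slash or `127.0.0.1` would therefore be treated as remote, which turns on TLS and credential lookup.

The sketch below parses the URL instead. It assumes the emulator is always reached over plain HTTP on a loopback host. `is_local_endpoint`, `connection_settings` and the `use_credential` key are hypothetical; the caller would map `use_credential=True` to `token_credential=DefaultAzureCredential()`.

```python
import os
from typing import Mapping, Optional
from urllib.parse import urlparse

LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


def is_local_endpoint(endpoint: str) -> bool:
    # Treat any plain-HTTP loopback address as the emulator
    parsed = urlparse(endpoint)
    return parsed.scheme == "http" and parsed.hostname in LOCAL_HOSTS


def connection_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    env = os.environ if env is None else env
    endpoint = env.get("ENDPOINT", "http://localhost:8080")
    local = is_local_endpoint(endpoint)
    return {
        "host_address": endpoint,
        "taskhub": env.get("TASKHUB", "default"),
        "secure_channel": not local,
        "use_credential": not local,  # hypothetical flag: create a credential when True
    }
```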
Local Development with Emulator
docker pull mcr.microsoft.com/dts/dts-emulator:latest
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
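Before starting a worker against the emulator, it can help to confirm that the mapped port accepts connections.

The stdlib sketch below does this. `emulator_reachable` is a hypothetical helper. An open TCP port only shows that something is listening, not that the scheduler's service is ready.

```python
import socket


def emulator_reachable(host: str = "localhost", port: int = 8080, timeout: float = 2.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```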
Client Operations
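# Schedule a new instance with an auto-generated instance ID
instance_id = client.schedule_new_orchestration(my_orchestration, input="data")
# Schedule with a caller-chosen instance ID
instance_id = client.schedule_new_orchestration(
    my_orchestration,
    input="data",
    instance_id="my-custom-id"
)
# Block until the instance finishes (timeout in seconds)
state = client.wait_for_orchestration_completion(instance_id, timeout=60)
# Read the current state without waiting
state = client.get_orchestration_state(instance_id)
# Deliver an event to an orchestration waiting in ctx.wait_for_external_event()
approval_data = {"approved": True}  # hypothetical event payload
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)
# Stop an instance and record the given output
client.terminate_orchestration(instance_id, output="User cancelled")
# Pause an instance, then let it continue
client.suspend_orchestration(instance_id)
client.resume_orchestration(instance_id)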
References
- patterns.md - Detailed pattern implementations (Fan-Out/Fan-In, Human Interaction, Entities, Sub-Orchestrations)
- setup.md - Azure Durable Task Scheduler provisioning and deployment