| name | adding-orchestrator-workflow |
| description | Adds new workflow endpoints to the QuAIA orchestrator. Use when creating new FastAPI endpoints that coordinate agent tasks, handle webhooks, or expose new API functionality. Use when this capability is needed. |
| metadata | {"author":"partarstu"} |
Adding a New Orchestrator Workflow
This skill provides a comprehensive guide for adding new workflow endpoints to the QuAIA™ orchestrator. Workflows are FastAPI endpoints that trigger and coordinate agent tasks.
Overview
The orchestrator (orchestrator/main.py) exposes HTTP endpoints that:
- Receive external requests (webhooks, API calls)
- Route tasks to appropriate agents
- Coordinate multi-agent workflows
- Handle results and trigger follow-up actions
Workflow Architecture
A typical orchestrator workflow:
- Receives a request (POST/GET endpoint)
- Extracts relevant data from the request
- Sends task(s) to agent(s) using
_send_task_to_agent()
- Parses the agent response using helper functions
- Optionally triggers follow-up workflows
- Returns the result to the caller
Step-by-Step Instructions
Step 1: Define the Request Model (if needed)
If your endpoint accepts structured input, create a request model in common/models.py:
📄 Template: resources/models_template.py
Step 2: Define the Response Model (if needed)
If the workflow returns structured data beyond simple status messages, add a response model (also in the template above).
Step 3: Create the Endpoint Function
Add your endpoint in orchestrator/main.py:
📄 Template: resources/endpoint_template.py
Step 4: Helper Functions Reference
The orchestrator provides these helper functions for working with agent tasks:
Sending Tasks to Agents
completed_task = await _send_task_to_agent(
task_content: str,
task_description: str
) -> Task
completed_task = await _send_task_to_agent_with_message(
message: Message,
task_description: str
) -> Task
Parsing Agent Responses
_validate_task_status(task: Task, task_description: str)
artifacts = _get_artifacts_from_task(task: Task, task_description: str) -> list[Artifact]
text_parts = _get_text_content_from_artifacts(
artifacts: list[Artifact],
task_description: str,
any_content_expected: bool = True
) -> list[str]
result = _get_model_from_artifacts(
artifacts: list[Artifact],
task_description: str,
model_type: type[T]
) -> T | AgentExecutionError | None
files = _get_file_contents_from_artifacts(artifacts: list[Artifact]) -> list[FileWithBytes]
Error Handling
_handle_exception(error_message: str, status_code: int = 500)
Step 5: Multi-Agent Workflows
For workflows that involve multiple agents in sequence:
📄 Example: examples/multi_agent_workflow.py
Step 6: Parallel Agent Execution
For workflows that can process items in parallel:
📄 Example: examples/parallel_execution.py
Step 7: Using Execution Lock (Optional)
For workflows that should not run concurrently (e.g., test execution):
@orchestrator_app.post("/exclusive-workflow")
async def exclusive_workflow(request: Request, api_key: str = Depends(_validate_api_key)):
"""Workflow that requires exclusive access."""
async with execution_lock:
return {"message": "Exclusive workflow completed"}
Step 8: Add Webhook URL Configuration (Optional)
If the endpoint will be called via webhooks, add the URL to config.py:
<WORKFLOW_NAME>_WEBHOOK_URL = f"{ORCHESTRATOR_URL}/<endpoint-path>"
Step 9: Update README Documentation
Add documentation for the new endpoint in README.md under "Invoking Orchestrator Workflows":
### <Workflow Name>
Description of what this workflow does.
* **Endpoint:** `POST /<endpoint-path>`
Example payload:
```json
{
"field_name": "value"
}
Response:
{
"message": "Workflow completed successfully",
"result": { ... }
}
### Step 10: Create Unit Tests
Create test cases in `tests/orchestrator/test_endpoints.py` or a new file:
📄 **Example:** [examples/test_endpoint_example.py](examples/test_endpoint_example.py)
## Complete Workflow Example
For a full example including models and endpoint:
📄 **Example:** [examples/complete_workflow.py](examples/complete_workflow.py)
## Verification Checklist
After adding the workflow, verify:
- [ ] Request model (if any) added to `common/models.py`
- [ ] Response model (if any) added to `common/models.py`
- [ ] Endpoint function follows the standard pattern
- [ ] Proper error handling with `_handle_exception()`
- [ ] API key validation via `Depends(_validate_api_key)`
- [ ] Logging at key points (start, completion, errors)
- [ ] Unit tests cover success and failure cases
- [ ] Documentation updated in README.md
- [ ] Tests pass: `pytest tests/orchestrator/ -v`
- [ ] Endpoint accessible: `curl -X POST http://localhost:8000/<endpoint-path> -d '...'`
---
> Converted and distributed by [TomeVault](https://tomevault.io/claim/partarstu) — claim your Tome and manage your conversions.
<!-- tomevault:4.0:skill_md:2026-04-11 -->