| name | copilot-sdk-python |
| description | Guide for building Python applications using the GitHub Copilot SDK. Use when integrating GitHub Copilot into Python apps: creating CopilotClient sessions, defining custom tools, handling streaming responses, managing session lifecycle, Pydantic parameter validation, BYOK (Bring Your Own Key) provider configuration, or any GitHub Copilot SDK API pattern. Requires Python 3.9+, asyncio, and GitHub Copilot CLI in PATH. |
Core Principles
- The SDK is in technical preview and may have breaking changes
- Requires Python 3.9 or later
- Requires GitHub Copilot CLI installed and in PATH
- Uses async/await patterns throughout (asyncio)
- Supports both async context managers and manual lifecycle management
- Type hints provided for better IDE support
Installation
Install with pip, Poetry, or uv:
pip install github-copilot-sdk
poetry add github-copilot-sdk
uv add github-copilot-sdk
Client Initialization
Basic Client Setup
from copilot import CopilotClient, PermissionHandler
import asyncio

async def main():
    async with CopilotClient() as client:
        pass

asyncio.run(main())
Client Configuration Options
When creating a CopilotClient, use a dict with these keys:
cli_path - Path to CLI executable (default: "copilot" from PATH or COPILOT_CLI_PATH env var)
cli_url - URL of existing CLI server (e.g., "localhost:8080"). When provided, client won't spawn a process
port - Server port (default: 0 for random)
use_stdio - Use stdio transport instead of TCP (default: True)
log_level - Log level (default: "info")
auto_start - Auto-start server (default: True)
auto_restart - Auto-restart on crash (default: True)
cwd - Working directory for the CLI process (default: os.getcwd())
env - Environment variables for the CLI process (dict)
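As a minimal sketch, the options above can be assembled in a plain dict before being passed to CopilotClient. The helper name build_client_options and the particular values chosen are illustrative assumptions, not part of the SDK; only the key names come from the list above.

```python
import os
from typing import Optional


def build_client_options(cli_url: Optional[str] = None) -> dict:
    """Assemble a CopilotClient options dict (hypothetical helper, not an SDK function)."""
    if cli_url:
        # Connecting to an existing server: the client will not spawn a CLI process.
        return {"cli_url": cli_url, "log_level": "info"}
    return {
        # Mirrors the documented default lookup for cli_path.
        "cli_path": os.environ.get("COPILOT_CLI_PATH", "copilot"),
        "use_stdio": True,
        "log_level": "info",
        "auto_restart": True,
        "cwd": os.getcwd(),
    }


options = build_client_options()
# client = CopilotClient(options)  # requires the SDK and the Copilot CLI
```

Building the dict in one place keeps the spawn-a-process and connect-to-existing-server cases from being mixed in the same configuration.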
Manual Server Control
For explicit control:
from copilot import CopilotClient
import asyncio

async def main():
    client = CopilotClient({"auto_start": False})
    await client.start()
    await client.stop()

asyncio.run(main())
Use force_stop() when stop() takes too long.
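One way to act on this is to bound stop() with a timeout and fall back to force_stop(). The sketch below is an assumption-laden illustration: stop_client and its 5-second default are hypothetical, and because this guide does not say whether force_stop() is a coroutine, the code handles both cases.

```python
import asyncio
import inspect


async def stop_client(client, timeout: float = 5.0) -> bool:
    """Try a graceful stop(); fall back to force_stop() after `timeout` seconds.

    Returns True if the graceful stop finished in time.
    Hypothetical helper, not part of the SDK.
    """
    try:
        await asyncio.wait_for(client.stop(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # Assumption: force_stop() may be sync or async, so handle both.
        result = client.force_stop()
        if inspect.isawaitable(result):
            await result
        return False
```

The boolean return value lets the caller log when a forced shutdown was needed.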
Session Management
Creating Sessions
Use a dict for SessionConfig:
session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
    "streaming": True,
    "tools": [...],
    "system_message": { ... },
    "available_tools": ["tool1", "tool2"],
    "excluded_tools": ["tool3"],
    "provider": { ... }
})
Session Config Options
session_id - Custom session ID (str)
model - Model name ("gpt-5", "claude-sonnet-4.5", etc.)
tools - Custom tools exposed to the CLI (list[Tool])
system_message - System message customization (dict)
available_tools - Allowlist of tool names (list[str])
excluded_tools - Blocklist of tool names (list[str])
provider - Custom API provider configuration (BYOK) (ProviderConfig)
streaming - Enable streaming response chunks (bool)
mcp_servers - MCP server configurations (list)
custom_agents - Custom agent configurations (list)
config_dir - Config directory override (str)
skill_directories - Skill directories (list[str])
disabled_skills - Disabled skills (list[str])
on_permission_request - Permission request handler (callable)
Resuming Sessions
session = await client.resume_session("session-id", {
    "on_permission_request": PermissionHandler.approve_all,
    "tools": [my_new_tool]
})
Session Operations
session.session_id - Get session identifier (str)
await session.send({"prompt": "...", "attachments": [...]}) - Send message, returns str (message ID)
await session.send_and_wait({"prompt": "..."}, timeout=60.0) - Send and wait for idle, returns SessionEvent | None
await session.abort() - Abort current processing
await session.get_messages() - Get all events/messages, returns list[SessionEvent]
await session.destroy() - Clean up session
Event Handling
Event Subscription Pattern
ALWAYS use an asyncio.Event or asyncio.Future to wait for session events:
import asyncio

done = asyncio.Event()

def handler(event):
    if event.type == "assistant.message":
        print(event.data.content)
    elif event.type == "session.idle":
        done.set()

session.on(handler)
await session.send({"prompt": "..."})
await done.wait()
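The same pattern can be written with an asyncio.Future, which carries a result or an exception instead of a bare flag. This is a sketch under stated assumptions: ask is a hypothetical helper, it relies only on session.on() and session.send() as described here, and it assumes handlers are invoked on the event loop thread.

```python
import asyncio


async def ask(session, prompt: str, timeout: float = 60.0):
    """Send `prompt` and return the last assistant message text (or None).

    Hypothetical helper, not an SDK function.
    """
    loop = asyncio.get_running_loop()
    finished = loop.create_future()
    last_content = None

    def handler(event):
        nonlocal last_content
        if finished.done():
            return  # ignore events that arrive after completion
        if event.type == "assistant.message":
            last_content = event.data.content
        elif event.type == "session.idle":
            finished.set_result(last_content)
        elif event.type == "session.error":
            finished.set_exception(RuntimeError(event.data.message))

    unsubscribe = session.on(handler)
    try:
        await session.send({"prompt": prompt})
        return await asyncio.wait_for(finished, timeout=timeout)
    finally:
        unsubscribe()  # always detach the handler, even on timeout or error
```

With a Future, a session.error surfaces as an exception at the await site rather than needing a separate flag.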
Unsubscribing from Events
The on() method returns a function that unsubscribes:
unsubscribe = session.on(lambda event: print(event.type))
unsubscribe()
Event Types
Use attribute access for event type checking:
def handler(event):
    if event.type == "user.message":
        pass
    elif event.type == "assistant.message":
        print(event.data.content)
    elif event.type == "tool.executionStart":
        pass
    elif event.type == "tool.executionComplete":
        pass
    elif event.type == "session.start":
        pass
    elif event.type == "session.idle":
        pass
    elif event.type == "session.error":
        print(f"Error: {event.data.message}")

session.on(handler)
Streaming Responses
Enabling Streaming
Set streaming: True in SessionConfig:
session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
    "streaming": True
})
Handling Streaming Events
Handle both delta events (incremental) and final events:
import asyncio

done = asyncio.Event()

def handler(event):
    if event.type == "assistant.message.delta":
        print(event.data.delta_content, end="", flush=True)
    elif event.type == "assistant.reasoning.delta":
        print(event.data.delta_content, end="", flush=True)
    elif event.type == "assistant.message":
        print("\n--- Final ---")
        print(event.data.content)
    elif event.type == "assistant.reasoning":
        print("--- Reasoning ---")
        print(event.data.content)
    elif event.type == "session.idle":
        done.set()

session.on(handler)
await session.send({"prompt": "Tell me a story"})
await done.wait()
Note: Final events (assistant.message, assistant.reasoning) are ALWAYS sent regardless of streaming setting.
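Because the final event arrives in both modes, a handler that prints deltas and the final content will show the text twice when streaming is on. The sketch below avoids that; ResponseCollector is a hypothetical helper, and it uses only the event types and data attributes shown in this section.

```python
class ResponseCollector:
    """Collect assistant text whether or not streaming is enabled (illustrative helper)."""

    def __init__(self):
        self.chunks = []       # delta_content pieces for the current message
        self.final = None      # content of the most recent assistant.message
        self.streamed = False  # True once a delta arrived for the current message

    def handle(self, event):
        if event.type == "assistant.message.delta":
            self.streamed = True
            self.chunks.append(event.data.delta_content)
            print(event.data.delta_content, end="", flush=True)
        elif event.type == "assistant.message":
            self.final = event.data.content
            if self.streamed:
                print()  # deltas already printed the text; just end the line
            else:
                print(self.final)  # non-streaming session: print once here
            self.streamed = False  # reset for the next message
```

Register it with session.on(collector.handle); the final content stays available as collector.final in either mode.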
Custom Tools
Defining Tools with define_tool
Use define_tool for tool definitions:
from copilot import define_tool

async def fetch_issue(issue_id: str):
    return {"id": issue_id, "status": "open"}

session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
    "tools": [
        define_tool(
            name="lookup_issue",
            description="Fetch issue details from tracker",
            parameters={
                "type": "object",
                "properties": {
                    "id": {"type": "string", "description": "Issue ID"}
                },
                "required": ["id"]
            },
            handler=lambda args, inv: fetch_issue(args["id"])
        )
    ]
})
Using Pydantic for Parameters
The SDK works well with Pydantic models:
from copilot import define_tool
from pydantic import BaseModel, Field

class WeatherArgs(BaseModel):
    location: str = Field(description="City name")
    units: str = Field(default="fahrenheit", description="Temperature units")

async def get_weather(args: WeatherArgs, inv):
    return {"temperature": 72, "units": args.units}

session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "tools": [
        define_tool(
            name="get_weather",
            description="Get weather for a location",
            # The JSON schema is generated from the Pydantic model.
            parameters=WeatherArgs.model_json_schema(),
            # Validate the raw dict into WeatherArgs before calling the handler.
            handler=lambda args, inv: get_weather(WeatherArgs(**args), inv)
        )
    ]
})
Tool Return Types
- Return any JSON-serializable value (automatically wrapped)
- Or return a ToolResult dict for full control:
{
    "text_result_for_llm": str,
    "result_type": "success" | "failure",
    "error": str,
    "tool_telemetry": dict
}
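Small constructors keep the ToolResult keys consistent across handlers. The function names tool_success and tool_failure are hypothetical; the dict keys are the ones listed above, and treating error and tool_telemetry as optional is an assumption.

```python
from typing import Optional


def tool_success(text: str, telemetry: Optional[dict] = None) -> dict:
    """Build a successful ToolResult dict (hypothetical helper)."""
    result = {"text_result_for_llm": text, "result_type": "success"}
    if telemetry:
        result["tool_telemetry"] = telemetry
    return result


def tool_failure(text: str, error: str) -> dict:
    """Build a failed ToolResult dict (hypothetical helper)."""
    return {
        "text_result_for_llm": text,  # what the model sees
        "result_type": "failure",
        "error": error,               # detail for the caller
    }
```

A handler can then return tool_failure("Issue not found", "404 from tracker") instead of raising.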
Tool Handler Signature
Tool handlers receive two arguments:
args (dict) - The tool arguments passed by the LLM
invocation (ToolInvocation) - Metadata about the invocation
invocation.session_id - Session ID
invocation.tool_call_id - Tool call ID
invocation.tool_name - Tool name
invocation.arguments - Same as args parameter
Tool Execution Flow
When Copilot invokes a tool, the client automatically:
- Runs your handler function
- Serializes the return value
- Responds to the CLI
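For unit-testing a handler without a running CLI, the first two steps can be approximated locally. This is only a sketch of the flow described above, not the SDK's implementation: run_tool is a hypothetical name, and the way exceptions and plain values are wrapped here is an assumption.

```python
import asyncio
import inspect
import json


async def run_tool(handler, args: dict, invocation=None) -> dict:
    """Call a tool handler and normalize its result (illustrative, not SDK code)."""
    try:
        value = handler(args, invocation)
        if inspect.isawaitable(value):
            # Lambdas that wrap async functions return a coroutine.
            value = await value
    except Exception as exc:
        return {
            "text_result_for_llm": "Tool execution failed",
            "result_type": "failure",
            "error": str(exc),
        }
    if isinstance(value, dict) and "text_result_for_llm" in value:
        return value  # already a ToolResult-shaped dict
    text = value if isinstance(value, str) else json.dumps(value)
    return {"text_result_for_llm": text, "result_type": "success"}
```

For example, asyncio.run(run_tool(my_handler, {"id": "7"})) exercises a handler with the same (args, invocation) call shape used in this guide.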
System Message Customization
Append Mode (Default - Preserves Guardrails)
session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
    "system_message": {
        "mode": "append",
        "content": """
<workflow_rules>
- Always check for security vulnerabilities
- Suggest performance improvements when applicable
</workflow_rules>
"""
    }
})
Replace Mode (Full Control - Removes Guardrails)
session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
    "system_message": {
        "mode": "replace",
        "content": "You are a helpful assistant."
    }
})
File Attachments
Attach files to messages:
await session.send({
    "prompt": "Analyze this file",
    "attachments": [
        {
            "type": "file",
            "path": "/path/to/file.py",
            "display_name": "My File"
        }
    ]
})
Message Delivery Modes
Use the mode key in message options:
"enqueue" - Queue message for processing
"immediate" - Process message immediately
await session.send({
    "prompt": "...",
    "mode": "enqueue"
})
Multiple Sessions
Sessions are independent and can run concurrently:
session1 = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
})
session2 = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "model": "claude-sonnet-4.5",
})

await asyncio.gather(
    session1.send({"prompt": "Hello from session 1"}),
    session2.send({"prompt": "Hello from session 2"})
)
Bring Your Own Key (BYOK)
Use custom API providers via provider:
session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "provider": {
        "type": "openai",
        "base_url": "https://api.openai.com/v1",
        "api_key": "your-api-key"
    }
})
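In practice the key should not be hard-coded. A minimal sketch, assuming the key lives in an environment variable: the helper name openai_provider_from_env and the OPENAI_API_KEY variable name are illustrative choices, while the dict keys match the provider example above.

```python
import os


def openai_provider_from_env(env_var: str = "OPENAI_API_KEY") -> dict:
    """Build a provider dict from an environment variable (hypothetical helper)."""
    api_key = os.environ.get(env_var)
    if not api_key:
        # Fail early with a clear message instead of sending an empty key.
        raise RuntimeError(f"Set {env_var} before creating a BYOK session")
    return {
        "type": "openai",
        "base_url": "https://api.openai.com/v1",
        "api_key": api_key,
    }
```

The returned dict can be passed as the "provider" value when creating a session.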
Session Lifecycle Management
Listing Sessions
sessions = await client.list_sessions()
for metadata in sessions:
    print(f"{metadata.session_id}: {metadata.summary}")
Deleting Sessions
await client.delete_session(session_id)
Getting Last Session ID
last_id = await client.get_last_session_id()
if last_id:
    # Pass the config as a dict, as in the Resuming Sessions example.
    session = await client.resume_session(last_id, {
        "on_permission_request": PermissionHandler.approve_all
    })
Checking Connection State
state = client.get_state()
Error Handling
Standard Exception Handling
try:
    session = await client.create_session({
        "on_permission_request": PermissionHandler.approve_all
    })
    await session.send({"prompt": "Hello"})
except Exception as e:
    print(f"Error: {e}")
Session Error Events
Monitor session.error event type for runtime errors:
def handler(event):
    if event.type == "session.error":
        print(f"Session Error: {event.data.message}")

session.on(handler)
Connectivity Testing
Use ping to verify server connectivity:
response = await client.ping("health check")
print(f"Server responded at {response['timestamp']}")
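When the CLI server may still be starting, a few spaced-out pings are more forgiving than a single one. The sketch below assumes only that client.ping() raises on failure; wait_until_reachable and its retry parameters are hypothetical.

```python
import asyncio


async def wait_until_reachable(client, attempts: int = 3, delay: float = 0.5) -> bool:
    """Ping the server up to `attempts` times (hypothetical helper, not an SDK function)."""
    for attempt in range(1, attempts + 1):
        try:
            await client.ping("health check")
            return True
        except Exception:
            # Assumption: a failed ping surfaces as an exception.
            if attempt < attempts:
                await asyncio.sleep(delay)
    return False
```

Returning a boolean leaves the decision to restart, reconnect, or abort with the caller.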
Resource Cleanup
Automatic Cleanup with Context Managers
ALWAYS use async context managers for automatic cleanup:
async with CopilotClient() as client:
    async with await client.create_session({
        "on_permission_request": PermissionHandler.approve_all
    }) as session:
        await session.send({"prompt": "Hello"})
Manual Cleanup with Try-Finally
client = CopilotClient()
try:
    await client.start()
    session = await client.create_session({
        "on_permission_request": PermissionHandler.approve_all
    })
    try:
        pass  # use the session here
    finally:
        await session.destroy()
finally:
    await client.stop()
Best Practices
- Always use async context managers (async with) for automatic cleanup
- Use asyncio.Event or asyncio.Future to wait for session.idle event
- Handle session.error events for robust error handling
- Use if/elif chains for event type checking
- Enable streaming for better UX in interactive scenarios
- Use define_tool for tool definitions
- Use Pydantic models for type-safe parameter validation
- Unsubscribe from events (call the function returned by on()) when a handler is no longer needed
- Use system_message with mode: "append" to preserve safety guardrails
- Handle both delta and final events when streaming is enabled
- Use type hints for better IDE support and code clarity
Common Patterns
Simple Query-Response
from copilot import CopilotClient, PermissionHandler
import asyncio

async def main():
    async with CopilotClient() as client:
        async with await client.create_session({
            "on_permission_request": PermissionHandler.approve_all,
            "model": "gpt-5",
        }) as session:
            done = asyncio.Event()

            def handler(event):
                if event.type == "assistant.message":
                    print(event.data.content)
                elif event.type == "session.idle":
                    done.set()

            session.on(handler)
            await session.send({"prompt": "What is 2+2?"})
            await done.wait()

asyncio.run(main())
Multi-Turn Conversation
async def send_and_wait(session, prompt: str):
    done = asyncio.Event()
    result = []

    def handler(event):
        if event.type == "assistant.message":
            result.append(event.data.content)
            print(event.data.content)
        elif event.type == "session.idle":
            done.set()
        elif event.type == "session.error":
            result.append(None)
            done.set()

    unsubscribe = session.on(handler)
    try:
        await session.send({"prompt": prompt})
        await done.wait()
    finally:
        unsubscribe()  # detach the handler even if send() raises
    return result[0] if result else None

async with await client.create_session({
    "on_permission_request": PermissionHandler.approve_all
}) as session:
    await send_and_wait(session, "What is the capital of France?")
    await send_and_wait(session, "What is its population?")
SendAndWait Helper
async with await client.create_session({
    "on_permission_request": PermissionHandler.approve_all
}) as session:
    response = await session.send_and_wait(
        {"prompt": "What is 2+2?"},
        timeout=60.0
    )
    # send_and_wait returns SessionEvent | None, so check before reading it.
    if response and response.type == "assistant.message":
        print(response.data.content)
Tool with Dataclass Return Type
from dataclasses import dataclass, asdict
from copilot import define_tool

@dataclass
class UserInfo:
    id: str
    name: str
    email: str
    role: str

async def get_user(args, inv) -> dict:
    user = UserInfo(
        id=args["user_id"],
        name="John Doe",
        email="john@example.com",
        role="Developer"
    )
    return asdict(user)

session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "tools": [
        define_tool(
            name="get_user",
            description="Retrieve user information",
            parameters={
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "User ID"}
                },
                "required": ["user_id"]
            },
            handler=get_user
        )
    ]
})
Streaming with Progress
import asyncio

current_message = []
done = asyncio.Event()

def handler(event):
    if event.type == "assistant.message.delta":
        current_message.append(event.data.delta_content)
        print(event.data.delta_content, end="", flush=True)
    elif event.type == "assistant.message":
        print("\n\n=== Complete ===")
        print(f"Total length: {len(event.data.content)} chars")
    elif event.type == "session.idle":
        done.set()

unsubscribe = session.on(handler)
await session.send({"prompt": "Write a long story"})
await done.wait()
unsubscribe()
Error Recovery
def handler(event):
    if event.type == "session.error":
        print(f"Session error: {event.data.message}")

session.on(handler)

try:
    await session.send({"prompt": "risky operation"})
except Exception as e:
    print(f"Failed to send: {e}")
Using TypedDict for Type Safety
from typing import Callable, List, TypedDict

class MessageOptions(TypedDict, total=False):
    prompt: str
    attachments: List[dict]
    mode: str

class SessionConfig(TypedDict, total=False):
    # Declared so the config below passes type checking.
    on_permission_request: Callable
    model: str
    streaming: bool
    tools: List

options: MessageOptions = {
    "prompt": "Hello",
    "mode": "enqueue"
}
await session.send(options)

config: SessionConfig = {
    "on_permission_request": PermissionHandler.approve_all,
    "model": "gpt-5",
    "streaming": True
}
session = await client.create_session(config)
Async Generator for Streaming
import asyncio
from typing import AsyncGenerator

async def stream_response(session, prompt: str) -> AsyncGenerator[str, None]:
    """Stream response chunks as an async generator."""
    queue = asyncio.Queue()
    done = asyncio.Event()

    def handler(event):
        if event.type == "assistant.message.delta":
            queue.put_nowait(event.data.delta_content)
        elif event.type in ("session.idle", "session.error"):
            # Stop on errors too; otherwise the loop below would never end.
            done.set()

    unsubscribe = session.on(handler)
    try:
        await session.send({"prompt": prompt})
        while not done.is_set():
            try:
                # Short timeout so the done flag is re-checked regularly.
                chunk = await asyncio.wait_for(queue.get(), timeout=0.1)
                yield chunk
            except asyncio.TimeoutError:
                continue
        # Drain chunks that arrived just before the session went idle.
        while not queue.empty():
            yield queue.get_nowait()
    finally:
        # Runs even if the consumer stops iterating early.
        unsubscribe()

async for chunk in stream_response(session, "Tell me a story"):
    print(chunk, end="", flush=True)
Decorator Pattern for Tools
from typing import Callable, Any
from copilot import define_tool

def copilot_tool(
    name: str,
    description: str,
    parameters: dict
) -> Callable:
    """Decorator to convert a function into a Copilot tool."""
    def decorator(func: Callable) -> Any:
        return define_tool(
            name=name,
            description=description,
            parameters=parameters,
            # Unpack the argument dict into the function's keyword parameters.
            handler=lambda args, inv: func(**args)
        )
    return decorator

@copilot_tool(
    name="calculate",
    description="Perform a calculation",
    parameters={
        "type": "object",
        "properties": {
            "expression": {"type": "string", "description": "Math expression"}
        },
        "required": ["expression"]
    }
)
def calculate(expression: str) -> float:
    # WARNING: eval() executes arbitrary code, and the expression comes from
    # the model. Use a restricted evaluator in real applications.
    return eval(expression)
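Since the expression string is produced by the model, passing it to eval() lets it run arbitrary Python. A safer body for calculate can walk the parsed syntax tree and allow only arithmetic. This is a sketch using the standard ast module; safe_calculate and the set of permitted operators are illustrative choices, not part of the SDK.

```python
import ast
import operator

# Only these operators are permitted; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str):
    """Evaluate +, -, *, / over numeric literals without calling eval()."""

    def _eval(node):
        if isinstance(node, ast.Constant):
            # bool is a subclass of int, so exclude it explicitly.
            if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                return node.value
        elif isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    return _eval(ast.parse(expression, mode="eval").body)
```

Names, attribute access, and function calls all raise ValueError, so input such as __import__('os') is rejected instead of executed.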
session = await client.create_session({
    "on_permission_request": PermissionHandler.approve_all,
    "tools": [calculate]
})
Python-Specific Features
Async Context Manager Protocol
The client and session support the async context manager protocol through __aenter__ and __aexit__. Conceptually, they behave like this:
class CopilotClient:
    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()
        return False  # do not suppress exceptions

class CopilotSession:
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.destroy()
        return False  # do not suppress exceptions
Dataclass Support
Event data is available as attributes:
def handler(event):
    print(event.type)
    print(event.data.content)
    print(event.data.delta_content)