| name | aiohttp |
| description | Async HTTP server and client for Python with WebSocket support, middleware, streaming, and server-sent events |
| metadata | {"author":"OSS AI Skills","version":"1.0.0","tags":["python","http","async","server","websocket","sse"]} |
aiohttp
Asynchronous HTTP client/server framework for Python.
Overview
aiohttp is a powerful asynchronous HTTP client and server framework built on top of asyncio. It provides both a web server for building web applications and a client for making HTTP requests.
Key Features:
- Async web server and client
- WebSocket support (client and server)
- Server-Sent Events (SSE)
- Middleware system
- Request/response streaming
- Cookie handling
- File uploads
- Web server routing
- Connection keepalive
- Support for HTTP/1.1 (aiohttp does not implement HTTP/2)
Installation
pip install aiohttp
# Optional C-accelerated extras; quote the brackets so shells such as zsh do not expand them
pip install "aiohttp[speedups]"
Web Server
Basic Server
from aiohttp import web
async def handle_request(request):
    """Reply to any request with a fixed plain-text greeting."""
    greeting = "Hello, World!"
    return web.Response(text=greeting)


app = web.Application()
# Route table form of ``app.router.add_get('/', handle_request)``.
app.add_routes([web.get('/', handle_request)])

if __name__ == '__main__':
    # Listen on the loopback interface only.
    web.run_app(app, host='127.0.0.1', port=8080)
Running Server
import logging
import ssl

from aiohttp import web

app = web.Application()

# Logger that receives one access-log record per request.
logger = logging.getLogger('aiohttp.access')

# TLS context for serving HTTPS; the certificate paths are placeholders.
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('server.crt', 'server.key')

# NOTE: web.run_app() blocks until the server is stopped, so the three calls
# below are alternatives -- pick exactly one in a real program.

# Option 1: library defaults.
web.run_app(app)

# Option 2: explicit configuration.
web.run_app(
    app,
    host='0.0.0.0',
    port=8080,
    access_log=logger,
    shutdown_timeout=60,
    ssl_context=ssl_context,
    print=lambda x: print(x.strip()),
)


# Option 3: build the application inside a coroutine; run_app awaits it.
async def init_app():
    """Create and return the application from within the event loop."""
    app = web.Application()
    app.router.add_get('/', lambda request: web.Response(text="OK"))
    return app


web.run_app(init_app())
Application Factory
from aiohttp import web
def create_app():
    """Build a fully configured application (factory pattern).

    ``security_middleware``, ``api_handler`` and ``create_database_pool``
    are expected to be defined by the surrounding project.
    """
    application = web.Application(middlewares=[security_middleware])
    application.add_routes([web.get('/api', api_handler)])
    application['db'] = create_database_pool()
    return application


app = create_app()
web.run_app(app)
Routing
Basic Routes
from aiohttp import web
app = web.Application()


async def get_handler(request):
    """Handle GET /resource."""
    return web.Response(text="GET request")


async def post_handler(request):
    """Handle POST /resource with a form-encoded body."""
    data = await request.post()
    return web.json_response({"received": dict(data)})


async def put_handler(request):
    """Handle PUT /resource with a JSON body."""
    data = await request.json()
    return web.json_response({"updated": data})


async def delete_handler(request):
    """Handle DELETE /resource."""
    return web.Response(text="Deleted")


app.router.add_get('/resource', get_handler)
app.router.add_post('/resource', post_handler)
app.router.add_put('/resource', put_handler)
app.router.add_delete('/resource', delete_handler)


class ItemView(web.View):
    """Class-based view: one coroutine method per HTTP verb."""

    async def get(self):
        return web.json_response({"items": []})

    async def post(self):
        data = await self.request.json()
        return web.json_response({"created": data}, status=201)


# web.view(path, handler) builds a route definition; it is not a class
# decorator.  Register the view explicitly (or use RouteTableDef.view).
app.router.add_view('/items', ItemView)
Variable Routes
from aiohttp import web
app = web.Application()


# Handlers are defined before they are registered so the names exist when
# the add_* calls run.
async def get_user(request):
    """Return the user named by the ``user_id`` path segment."""
    user_id = request.match_info['user_id']
    return web.json_response({"id": user_id, "name": "John"})


async def create_post(request):
    """Create a post (JSON body) for the user in the path."""
    user_id = request.match_info['user_id']
    data = await request.json()
    return web.json_response({
        "user_id": user_id,
        "post": data,
    }, status=201)


async def get_user_by_id(request):
    """Return a user looked up by numeric id."""
    # match_info values are always strings; the \d+ route pattern ensures
    # the conversion cannot fail.
    user_id = int(request.match_info['user_id'])
    return web.json_response({"id": user_id})


async def get_file(request):
    """Echo the validated file name from the path."""
    return web.json_response({"filename": request.match_info['filename']})


async def get_default_user(request):
    """Serve the default user (id 1) when no id is given in the path."""
    return web.json_response({"id": 1})


# Variable parts take an optional *regular expression* after the colon;
# there are no named converters such as ``int``.  The constrained route is
# registered first because the first matching route wins.
app.router.add_get(r'/users/{user_id:\d+}', get_user_by_id)
app.router.add_get('/users/{user_id}', get_user)
app.router.add_post('/users/{user_id}/posts', create_post)
app.router.add_get(r'/files/{filename:[a-zA-Z0-9_.]+}', get_file)

# Path variables cannot declare default values; expose the default through
# a separate fixed route instead.
app.router.add_get('/users', get_default_user)
Resource Routes
from aiohttp import web
app = web.Application()

# A resource is a URL; routes attach HTTP methods to it.  Resources only
# offer add_route(method, handler) -- the add_get/add_post shortcuts live on
# the router, not on the resource.  ``get_handler`` and ``post_handler`` are
# assumed to be defined elsewhere.
resource = app.router.add_resource('/api', name='api')
resource.add_route('GET', get_handler)
resource.add_route('POST', post_handler)

# Reverse URL construction by resource name.
url = app.router['api'].url_for()
print(str(url))  # /api

resource = app.router.add_resource('/users/{user_id}', name='user_detail')
# Path substitutions are passed as strings.
url = app.router['user_detail'].url_for(user_id='42')
print(str(url))  # /users/42
Route Lifecycle
from aiohttp import web
async def on_request_start(request):
    """Called when request starts."""
    print(f"Request started: {request.method} {request.path}")


async def on_request_match(request, mapping):
    """Called when route is matched."""
    print(f"Matched route: {mapping}")


@web.middleware
async def request_lifecycle_middleware(request, handler):
    """Invoke the lifecycle hooks around every request.

    web.Application has no ``on_request_start`` / ``on_request_match``
    signals (``on_request_start`` is a *client* TraceConfig signal), so the
    hooks are driven from a middleware instead.  The route has already been
    resolved when middlewares run, so ``request.match_info`` is available.
    """
    await on_request_start(request)
    await on_request_match(request, request.match_info)
    return await handler(request)


app = web.Application(middlewares=[request_lifecycle_middleware])
Request Handling
Reading Request Data
from aiohttp import web
async def handle_request(request):
    """Show the common ways of reading data from an incoming request.

    Raises:
        web.HTTPBadRequest: if ``page`` is not an integer.
    """
    # --- Query string -----------------------------------------------------
    query = request.query
    try:
        page = int(query.get('page', '1'))
    except ValueError:
        raise web.HTTPBadRequest(text="page must be an integer")
    # getall() raises KeyError for a missing key unless a default is given.
    tags = query.getall('tag', [])

    # --- Headers and connection info ---------------------------------------
    auth_header = request.headers.get('Authorization')
    content_type = request.content_type
    remote = request.remote
    host = request.host
    user_id = request.match_info.get('user_id')

    # --- Body ----------------------------------------------------------------
    # A body has a single encoding, so pick the parser that matches the
    # Content-Type rather than calling post() and json() on the same request.
    if content_type == 'application/json':
        json_data = await request.json()
    elif content_type in ('application/x-www-form-urlencoded',
                          'multipart/form-data'):
        form = await request.post()
        username = form.get('username')
        password = form.get('password')
        # NOTE(review): uploaded file fields are not JSON-serializable;
        # filter them out before echoing the form back.
        json_data = dict(form)
    else:
        body = await request.read()  # raw bytes
        json_data = None

    return web.json_response({
        "query": dict(query),
        "data": json_data,
    })
Request Properties
from aiohttp import web
async def handler(request):
    """Print the most frequently used request attributes."""
    print(request.method)         # HTTP verb
    print(request.url)            # full yarl.URL
    print(request.url.path)       # decoded path
    print(request.url.query)      # parsed query string
    print(request.version)        # HTTP protocol version
    print(request.cookies)        # cookies sent by the client
    print(request.content_type)   # media type from the Content-Type header
    print(request.can_read_body)  # True when a body is available
    # The body stream is exposed as ``request.content``; there is no
    # ``request.payload`` attribute.
    print(request.content)
    return web.Response(text="OK")
Response
Basic Responses
from aiohttp import web
# Each response style gets its own handler: in the original single function
# everything after the first ``return`` was unreachable.

async def handlers(request):
    """Plain-text response with the default 200 status."""
    return web.Response(text="Hello")


async def created_handler(request):
    """Plain-text response with an explicit status code."""
    return web.Response(text="Created", status=201)


async def json_handler(request):
    """JSON response."""
    return web.json_response({"key": "value"})


async def json_with_headers_handler(request):
    """JSON response carrying an extra header."""
    return web.json_response(
        {"data": "test"},
        headers={"X-Custom": "value"},
    )


async def redirect_handler(request):
    """Redirect; HTTP status classes are exceptions and must be raised."""
    raise web.HTTPFound('/new-location')


async def unauthorized_handler(request):
    """Reject the request with a 401 and a Basic-auth challenge."""
    raise web.HTTPUnauthorized(
        headers={'WWW-Authenticate': 'Basic realm="Login"'}
    )
Response Types
import asyncio
import json

from aiohttp import web


async def text_response(request):
    """Return a plain-text body."""
    return web.Response(
        text="Plain text",
        content_type="text/plain",
    )


async def json_response(request):
    """Return pretty-printed JSON via a custom ``dumps`` callable."""
    return web.json_response(
        {"message": "JSON data"},
        dumps=lambda x: json.dumps(x, indent=2),
    )


async def bytes_response(request):
    """Return raw bytes."""
    return web.Response(
        body=b"Binary data",
        content_type="application/octet-stream",
    )


async def stream_response(request):
    """Streaming response for large files."""
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    # Headers are sent by prepare(); they cannot be changed afterwards.
    await response.prepare(request)
    for i in range(10):
        await response.write(f"Line {i}\n".encode())
        await asyncio.sleep(0.1)
    await response.write_eof()
    return response


async def file_response(request):
    """Serve a file."""
    response = web.FileResponse('path/to/file.txt')
    response.headers['Content-Disposition'] = 'attachment; filename="file.txt"'
    return response
WebSocket Response
from aiohttp import web, WSMsgType
async def websocket_handler(request):
    """Echo text and binary frames back to the peer until it disconnects."""
    socket = web.WebSocketResponse()
    await socket.prepare(request)  # performs the upgrade handshake

    try:
        async for frame in socket:
            kind = frame.type
            if kind == WSMsgType.TEXT:
                await socket.send_str(f"Echo: {frame.data}")
            elif kind == WSMsgType.BINARY:
                await socket.send_bytes(frame.data)
            elif kind == WSMsgType.ERROR:
                print(f"WebSocket error: {socket.exception()}")
    finally:
        # Always complete the closing handshake, even on errors.
        await socket.close()
    return socket


app.router.add_get('/ws', websocket_handler)
Server-Sent Events
import asyncio
import json

from aiohttp import web


async def sse_handler(request):
    """Server-Sent Events handler."""
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    await response.prepare(request)
    try:
        for i in range(10):
            data = json.dumps({"count": i})
            # write() is a coroutine and must be awaited; no separate
            # drain() call is needed.
            # Every SSE event is terminated by a blank line.
            await response.write(f"data: {data}\n\n".encode())
            await asyncio.sleep(1)
    finally:
        await response.write_eof()
    return response
Middleware
Creating Middleware
from aiohttp import web
@web.middleware
async def auth_middleware(request, handler):
    """Authentication middleware.

    Lets ``/public`` paths through untouched and requires a valid
    ``Authorization`` header everywhere else.  ``verify_token`` is assumed
    to be an async helper defined by the application.

    Raises:
        web.HTTPUnauthorized: no Authorization header was sent.
        web.HTTPForbidden: the token failed verification.
    """
    if request.path.startswith('/public'):
        return await handler(request)
    auth_header = request.headers.get('Authorization')
    # HTTP status classes are exceptions: raise them rather than returning
    # them (returning is deprecated).
    if not auth_header:
        raise web.HTTPUnauthorized(text="No auth header")
    if not await verify_token(auth_header):
        raise web.HTTPForbidden(text="Invalid token")
    return await handler(request)


# Single middleware.
app = web.Application(middlewares=[auth_middleware])

# Several middlewares: the first in the list is the outermost wrapper.
# ``logging_middleware`` and ``error_middleware`` are assumed to exist.
app = web.Application(
    middlewares=[
        logging_middleware,
        auth_middleware,
        error_middleware,
    ]
)
Common Middleware Patterns
from aiohttp import web
import time
@web.middleware
async def log_middleware(request, handler):
    """Log method, path, status and duration of every request."""
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments.
    start = time.perf_counter()
    try:
        response = await handler(request)
        status = response.status
        return response
    except web.HTTPException as exc:
        # Raised HTTP errors and redirects carry a status too; log them.
        status = exc.status
        raise
    finally:
        duration = time.perf_counter() - start
        # ``status`` is unset when the handler failed with a non-HTTP error.
        if 'status' in locals():
            print(f"{request.method} {request.path} - {status} - {duration:.3f}s")


@web.middleware
async def cors_middleware(request, handler):
    """Attach permissive CORS headers to every successful response."""
    response = await handler(request)
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
    return response


@web.middleware
async def rate_limit_middleware(request, handler):
    """Reject clients that exceed the rate limit.

    ``is_rate_limited`` and ``increment_rate_limit`` are assumed to be async
    helpers provided by the application.

    Raises:
        web.HTTPTooManyRequests: the client is over its limit.
    """
    ip = request.remote
    if await is_rate_limited(ip):
        # HTTP status classes are exceptions and must be raised.
        raise web.HTTPTooManyRequests(text="Rate limited")
    await increment_rate_limit(ip)
    return await handler(request)
Static Files
from aiohttp import web
app = web.Application()

# Minimal form: serve the directory under the /static/ prefix.
app.add_routes([web.static('/static/', 'path/to/static')])

# Same prefix with the optional switches spelled out.
static_options = {
    'show_index': True,       # render a directory listing
    'follow_symlinks': True,  # allow symlinks that leave the directory
    'append_version': True,   # add a version query parameter to built URLs
}
app.add_routes([web.static('/static/', 'path/to/static', **static_options)])

# One-off file served from a fixed path.
app.add_routes([web.get('/favicon.ico', lambda r: web.FileResponse('favicon.ico'))])
Templates
Jinja2 Integration
pip install aiohttp-jinja2 jinja2
from aiohttp import web
import aiohttp_jinja2
import jinja2

app = web.Application()

# aiohttp_jinja2 has no Environment class of its own: setup() creates the
# jinja2.Environment and forwards extra keyword arguments to it.
aiohttp_jinja2.setup(
    app,
    loader=jinja2.FileSystemLoader('templates'),
    autoescape=True,
    enable_async=True,  # NOTE(review): needs a recent aiohttp-jinja2 release
)


@aiohttp_jinja2.template('index.html')
async def index(request):
    """Render ``index.html``; the returned dict is the template context."""
    return {
        'title': 'My Page',
        'users': ['Alice', 'Bob', 'Charlie'],
    }
Template Filters
from aiohttp import web
import aiohttp_jinja2
import jinja2

app = web.Application()


def uppercase(s):
    """Template filter: ``{{ name|uppercase }}``."""
    return s.upper()


# Custom filters are passed to setup() through ``filters``; there is no
# ``template_filter`` decorator.
aiohttp_jinja2.setup(
    app,
    loader=jinja2.FileSystemLoader('templates'),
    filters={'uppercase': uppercase},
)
Client
Basic Client Usage
import aiohttp
import asyncio
async def fetch():
    """Issue one request per HTTP verb over a single shared session."""
    base = 'https://api.example.com'
    async with aiohttp.ClientSession() as http:
        # GET: decode the JSON body.
        async with http.get(base + '/data') as resp:
            data = await resp.json()
            print(data)

        # POST: ``json=`` serializes the payload as a JSON body.
        new_user = {'name': 'John', 'email': 'john@example.com'}
        async with http.post(base + '/users', json=new_user) as resp:
            result = await resp.json()

        # PUT: ``data=`` with a dict sends a form-encoded body.
        async with http.put(base + '/users/1', data={'name': 'Jane'}) as resp:
            pass

        # DELETE
        async with http.delete(base + '/users/1') as resp:
            pass


asyncio.run(fetch())
Client Configuration
import aiohttp
async def configured_client():
    """Show the main ClientSession configuration options."""
    import ssl

    # Base URL and default headers applied to every request.
    async with aiohttp.ClientSession(
        base_url='https://api.example.com',
        headers={'Authorization': 'Bearer token'},
    ) as session:
        async with session.get('/users/1') as response:
            pass

    # Session-wide timeouts, in seconds.
    timeout = aiohttp.ClientTimeout(
        total=30,
        connect=5,
        sock_read=10,
    )
    async with aiohttp.ClientSession(timeout=timeout) as session:
        pass

    # Cookies sent with every request.
    async with aiohttp.ClientSession(
        cookies={'session': 'abc123'}
    ) as session:
        pass

    # TLS settings belong to the connector (or to an individual request);
    # ClientSession itself takes no ``ssl`` argument.
    ssl_context = ssl.create_default_context()
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=ssl_context)
    ) as session:
        pass
Client Request Options
import aiohttp
async def client_options():
    """Show the per-request options of the client API."""
    # Relative URLs are only valid when the session has a base_url.
    async with aiohttp.ClientSession(
        base_url='https://api.example.com'
    ) as session:
        # Query parameters.
        async with session.get(
            '/search',
            params={'q': 'python', 'page': 1},
        ) as response:
            pass

        # Per-request headers.
        async with session.get(
            '/api',
            headers={'Authorization': 'Bearer token'},
        ) as response:
            pass

        # JSON body.
        async with session.post(
            '/users',
            json={'name': 'John'},
        ) as response:
            pass

        # Form-encoded body.
        async with session.post(
            '/login',
            data={'username': 'john', 'password': 'secret'},
        ) as response:
            pass

        # File upload: the ``with`` block closes the file once the request
        # has completed.
        with open('file.txt', 'rb') as upload:
            async with session.post(
                '/upload',
                data={'file': upload},
            ) as response:
                pass

        # Raw body: the content type is set through a header; requests do
        # not take a ``content_type`` keyword.
        async with session.post(
            '/data',
            data='raw string',
            headers={'Content-Type': 'text/plain'},
        ) as response:
            pass
Client Response
import aiohttp
async def handle_response():
    """Inspect the metadata and body of a client response."""
    async with aiohttp.ClientSession() as http:
        async with http.get('https://api.example.com') as resp:
            # Status line and headers.
            for meta in (resp.status, resp.headers, resp.content_type):
                print(meta)

            # Body decoded three ways: str, parsed JSON, raw bytes.
            text = await resp.text()
            json_data = await resp.json()
            content = await resp.read()

            # Cookies set by the server and the redirect chain followed.
            for extra in (resp.cookies, resp.history):
                print(extra)
Client WebSocket
import aiohttp
async def websocket_client():
    """Open a WebSocket, send two messages and read a single reply."""
    async with aiohttp.ClientSession() as http:
        async with http.ws_connect('wss://example.com/ws') as conn:
            await conn.send_str('Hello')
            await conn.send_json({'type': 'message', 'data': 'test'})

            # Read exactly one frame and branch on its type.
            frame = await conn.receive()
            kind = frame.type
            if kind == aiohttp.WSMsgType.TEXT:
                text = frame.data
            elif kind == aiohttp.WSMsgType.BINARY:
                data = frame.data
            elif kind == aiohttp.WSMsgType.ERROR:
                print(f"Error: {conn.exception()}")

            await conn.ping()
            await conn.close()
Advanced
Application Signals
from aiohttp import web
async def on_startup(app):
    """Startup hook: announce the start and open the database pool."""
    print("Application starting")
    pool = await create_db_pool()  # assumed to be defined by the project
    app['db'] = pool


async def on_cleanup(app):
    """Cleanup hook: announce the cleanup and close the database pool."""
    print("Application cleaning up")
    pool = app['db']
    await pool.close()


async def on_shutdown(app):
    """Shutdown hook: announce that the application is shutting down."""
    print("Application shutting down")


app = web.Application()
# Each signal is a list of coroutine callbacks.
for signal, callback in (
    (app.on_startup, on_startup),
    (app.on_cleanup, on_cleanup),
    (app.on_shutdown, on_shutdown),
):
    signal.append(callback)
Lifespan Context
from aiohttp import web
async def lifespan(app):
    """Cleanup context: code before ``yield`` runs at startup, code after it
    runs at cleanup.

    ``create_db_pool`` and ``create_cache`` are assumed to be async helpers
    defined by the application.
    """
    app['db'] = await create_db_pool()
    app['cache'] = await create_cache()
    yield
    # Tear down in reverse order of creation.
    await app['cache'].close()
    await app['db'].close()


# aiohttp has no ``lifespanContext`` decorator or ``lifespan=`` argument;
# async generators are registered on ``cleanup_ctx`` instead.
app = web.Application()
app.cleanup_ctx.append(lifespan)
Signals
from aiohttp import web

app = web.Application()

# The application signals are on_startup, on_shutdown, on_cleanup and
# on_response_prepare.  There is no ``app.signal(...)`` registry and no
# ``web.Signals`` enum; handlers are appended to the signal attributes.


async def on_prepare(request, response):
    """Runs just before response headers are sent; headers can be changed."""
    response.headers['X-Served-By'] = 'aiohttp'


app.on_response_prepare.append(on_prepare)


async def pre_signal_handler(app):
    """Runs first during shutdown, before cleanup handlers."""
    print("Pre-signal handler")


app.on_shutdown.append(pre_signal_handler)


async def post_signal_handler(app):
    """Runs last, when the application is being cleaned up."""
    print("Post-signal")


app.on_cleanup.append(post_signal_handler)

# run_app() blocks, so every handler must be registered before this call.
web.run_app(app, print=lambda x: None)
Testing
Test Client
import asyncio

from aiohttp import web
from aiohttp.test_utils import AioHTTPTestCase, TestClient, TestServer


async def test_handler(request):
    """Handler under test."""
    return web.json_response({"test": True})


app = web.Application()
app.router.add_get('/test', test_handler)


async def run_tests():
    """Exercise the app through an in-process test server."""
    # TestClient wraps a server object, not the application itself.
    async with TestClient(TestServer(app)) as client:
        async with client.get('/test') as response:
            assert response.status == 200
            data = await response.json()
            assert data == {"test": True}


asyncio.run(run_tests())


class MyTestCase(AioHTTPTestCase):
    """unittest-style test case; ``self.client`` is created from the app."""

    async def get_application(self):
        app = web.Application()
        app.router.add_get('/', lambda r: web.Response(text='OK'))
        return app

    # Async test methods run directly; the ``unittest_run_loop`` decorator
    # is deprecated and no longer needed.
    async def test_index(self):
        async with self.client.get('/') as response:
            text = await response.text()
            assert text == 'OK'
pytest-aiohttp
pip install pytest-aiohttp
import pytest
from aiohttp import web
@pytest.fixture
def app():
    """Application under test."""
    app = web.Application()
    app.router.add_get('/', lambda r: web.Response(text='OK'))
    return app


# NOTE: async tests need pytest-asyncio in auto mode (``asyncio_mode = auto``)
# or an explicit asyncio marker.
@pytest.mark.parametrize('path,expected', [
    ('/', 'OK'),
])
async def test_root(aiohttp_client, app, path, expected):
    """Each parametrized path returns the expected body."""
    # pytest-aiohttp provides the ``aiohttp_client`` factory fixture; there
    # is no ready-made ``client`` fixture.
    client = await aiohttp_client(app)
    async with client.get(path) as response:
        assert response.status == 200
        text = await response.text()
        assert text == expected
Performance
Keepalive
import aiohttp
from aiohttp import web

# Server side: keep-alive is configured when the server is started, not on
# web.Application (which has no ``client_max_cache_size`` argument).
app = web.Application()
# web.run_app(app, keepalive_timeout=75)


async def fetch_many():
    """Reuse pooled keep-alive connections across many requests."""
    # Client side: one session (and connector) shares a connection pool.
    connector = aiohttp.TCPConnector(keepalive_timeout=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        for _ in range(100):
            async with session.get('https://api.example.com/data') as response:
                # NOTE(review): consume the body so the connection can be
                # returned to the pool instead of being closed.
                await response.read()
Streaming
from aiohttp import web
async def upload_handler(request):
    """Handle streaming upload.

    The body is read in 1 MiB chunks so it never has to fit in memory, and
    each chunk is written in the default executor so disk I/O does not block
    the event loop.
    """
    import asyncio

    loop = asyncio.get_running_loop()
    with open('uploaded.file', 'wb') as f:
        async for chunk in request.content.iter_chunked(1024 * 1024):
            await loop.run_in_executor(None, f.write, chunk)
    return web.Response(text="Uploaded")
Compression
import aiohttp
from aiohttp import web


async def compressed_handler(request):
    """Server side: compress the response when the client supports it."""
    response = web.json_response({"data": "x" * 1000})
    # Picks an encoding based on the request's Accept-Encoding header.
    response.enable_compression()
    return response


async def fetch_compressed():
    """Client side: request a compressed body."""
    # ``async with`` is only valid inside a coroutine.
    async with aiohttp.ClientSession() as session:
        async with session.get(
            'https://api.example.com/data',
            headers={'Accept-Encoding': 'gzip, deflate'},
        ) as response:
            # The body is decompressed transparently while being read.
            return await response.read()
Security
CORS
from aiohttp import web
@web.middleware
async def cors_middleware(request, handler):
    """Answer CORS preflights directly and tag every response with CORS headers."""
    is_preflight = request.method == 'OPTIONS'
    # Preflight requests get an empty response without reaching the handler.
    response = web.Response() if is_preflight else await handler(request)
    response.headers.update({
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type, Authorization',
        'Access-Control-Max-Age': '3600',
    })
    return response


app = web.Application(middlewares=[cors_middleware])
Rate Limiting
from aiohttp import web
import time
from collections import defaultdict, deque


class RateLimiter:
    """In-memory sliding-window rate limiter keyed by client address.

    Args:
        max_requests: requests allowed per client within one window.
        window: window length in seconds.

    NOTE: a bucket is kept for every address ever seen; long-running
    services should evict idle addresses periodically.
    """

    def __init__(self, max_requests, window):
        self.max_requests = max_requests
        self.window = window
        # address -> timestamps of accepted requests, oldest first
        self.requests = defaultdict(deque)

    def is_allowed(self, ip):
        """Record a request from ``ip`` and report whether it is allowed.

        Returns:
            True if the request fits in the current window (and is counted),
            False if the client has used up its allowance.
        """
        # A monotonic clock keeps the window correct across wall-clock
        # adjustments (NTP, DST, manual changes).
        now = time.monotonic()
        bucket = self.requests[ip]
        # Timestamps are appended in order, so expired ones are all at the
        # left end; dropping them is cheaper than rebuilding the list.
        while bucket and now - bucket[0] >= self.window:
            bucket.popleft()
        if len(bucket) >= self.max_requests:
            return False
        bucket.append(now)
        return True
@web.middleware
async def rate_limit_middleware(request, handler):
    """Reject requests from clients that exceed the configured rate.

    Reads the shared limiter from ``request.app['rate_limiter']``.

    Raises:
        web.HTTPTooManyRequests: the client is over its limit.
    """
    ip = request.remote
    limiter = request.app['rate_limiter']
    if not limiter.is_allowed(ip):
        # HTTP status classes are exceptions: raise rather than return them.
        raise web.HTTPTooManyRequests(text="Rate limited")
    return await handler(request)


app = web.Application()
app['rate_limiter'] = RateLimiter(max_requests=100, window=60)
app.middlewares.append(rate_limit_middleware)
Common Issues
Memory Leaks
async def bad_request():
    """BAD: neither the session nor the response is ever closed."""
    leaked_session = aiohttp.ClientSession()
    leaked_response = await leaked_session.get('https://example.com')
    data = await leaked_response.json()


async def good_request():
    """GOOD: context managers release the response and close the session."""
    async with aiohttp.ClientSession() as http:
        async with http.get('https://example.com') as resp:
            data = await resp.json()
SSL Errors
async def insecure_session():
    """Disable certificate verification.

    WARNING: this removes protection against man-in-the-middle attacks.
    Use it only against trusted development endpoints, never in production.
    """
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        pass


async def verified_session():
    """Preferred fix: verify certificates with an explicit SSL context."""
    import ssl

    ssl_context = ssl.create_default_context()
    # For a private CA, trust its certificate instead of disabling checks:
    # ssl_context.load_verify_locations('path/to/ca.pem')
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=ssl_context)
    ) as session:
        pass
Timeouts
# 30 s for the whole request, at most 10 s of it to obtain a connection.
timeout = aiohttp.ClientTimeout(total=30, connect=10)


async def fetch_with_timeout():
    """Fetch a page, returning ``None`` when the timeout expires."""
    import asyncio

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get('https://example.com') as response:
                return await response.text()
    except asyncio.TimeoutError:
        # Exceeding any ClientTimeout limit raises asyncio.TimeoutError.
        return None
Best Practices
Session Management
# Upstream endpoint used by both handlers (placeholder).
url = 'https://api.example.com/data'


async def bad_handler(request):
    """BAD: opens a new session (and connection pool) on every request and
    never closes it."""
    session = aiohttp.ClientSession()
    async with session.get(url) as response:
        return web.Response(text=await response.text())


async def setup(app):
    """Create the shared client session once, at application startup."""
    app['session'] = aiohttp.ClientSession()


async def cleanup(app):
    """Close the shared client session when the application stops."""
    await app['session'].close()


app.on_startup.append(setup)
app.on_cleanup.append(cleanup)


async def good_handler(request):
    """GOOD: reuse the application-wide session."""
    session = request.app['session']
    async with session.get(url) as response:
        # A handler must return a web response, so read the upstream body
        # while the client response is still open and wrap it.
        return web.Response(text=await response.text())
Connection Settings
# Connection-pool tuning, gathered in one place.
pool_options = {
    'limit': 100,            # total simultaneous connections
    'limit_per_host': 30,    # simultaneous connections per endpoint
    'ttl_dns_cache': 300,    # seconds a DNS answer is cached
    'ssl': True,             # default certificate verification
    'keepalive_timeout': 30,  # seconds an idle connection is kept open
}
connector = aiohttp.TCPConnector(**pool_options)
session = aiohttp.ClientSession(connector=connector)
Error Handling
async def safe_request(url):
    """GET ``url`` and return the decoded JSON, or ``None`` on failure.

    Uses the module-level ``session`` and ``logger`` objects, which are
    assumed to be created elsewhere.
    """
    import asyncio

    try:
        async with session.get(url) as response:
            # Turn 4xx/5xx statuses into ClientResponseError.
            response.raise_for_status()
            return await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # Timeouts are not ClientError subclasses, so catch them explicitly.
        logger.error(f"Request failed: {e}")
        return None
Do:
- Reuse ClientSession (do not create one per request)
- Always use `async with` for responses
- Set timeouts on all requests
- Use `raise_for_status()` for HTTP errors
Don't:
- Create ClientSession in handler
- Forget to close sessions on shutdown
- Use sync I/O in handlers
- Store large data in memory (use streaming)
References