| name | rust-async-pattern |
| description | Advanced async patterns expert covering Stream implementation, zero-copy buffers, tokio::spawn lifetimes, plugin system scheduling, tonic streaming, and async lifetime management. |
| metadata | {"triggers":["async pattern","Stream","tokio::spawn","zero-copy","plugin system","tonic","streaming","BorrowedMessage","async scheduling","lifetime async"]} |
Solution Patterns
Pattern 1: Stream with Internal Buffer (Worker + Channel)
use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tonic::Status;

// The worker owns the growing buffer; consumers only ever see owned snapshots.
pub struct SessionWorker {
    rx_events: Receiver<Bytes>,
    tx_snapshots: Sender<SnapshotResponse>,
    buf: Vec<u8>,
    next_id: u64,
}

impl SessionWorker {
    pub async fn run(&mut self) {
        while let Some(event) = self.rx_events.recv().await {
            let snapshot = self.process_event(event);
            // Stop once the stream side has been dropped.
            if self.tx_snapshots.send(snapshot).await.is_err() {
                break;
            }
        }
    }

    fn process_event(&mut self, event: Bytes) -> SnapshotResponse {
        let start = self.buf.len();
        self.buf.extend_from_slice(&event);
        SnapshotResponse {
            id: self.next_id(),
            // Copy only the newly appended bytes into an owned payload.
            payload: Bytes::copy_from_slice(&self.buf[start..]),
        }
    }

    // Hands out monotonically increasing snapshot ids.
    fn next_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }
}

// The stream only forwards owned snapshots, so Item never borrows from self.
pub struct SessionStream {
    rx_snapshots: Receiver<SnapshotResponse>,
}

impl Stream for SessionStream {
    type Item = Result<SnapshotResponse, Status>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.rx_snapshots.poll_recv(cx) {
            Poll::Ready(Some(snapshot)) => Poll::Ready(Some(Ok(snapshot))),
            // The worker has finished and dropped its sender.
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[derive(Clone)]
pub struct SnapshotResponse {
    pub id: u64,
    pub payload: Bytes,
}
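The worker and channel split can be tried without any async runtime. The following is a minimal sketch that swaps in `std::thread` and `std::sync::mpsc` for the tokio task and channel, and `Arc<[u8]>` for `Bytes`; the names `Snapshot`, `Worker`, and `spawn_session` are hypothetical and not part of the pattern above. It only illustrates the ownership shape: the worker keeps the buffer, and the consumer receives owned snapshots.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::thread;

/// Owned snapshot: it holds its own reference-counted bytes, so it never
/// borrows from the worker's buffer.
#[derive(Clone, Debug, PartialEq)]
pub struct Snapshot {
    pub id: u64,
    pub payload: Arc<[u8]>,
}

/// Hypothetical stand-in for SessionWorker: sole owner of the growing buffer.
struct Worker {
    rx_events: Receiver<Vec<u8>>,
    tx_snapshots: SyncSender<Snapshot>,
    buf: Vec<u8>,
    next_id: u64,
}

impl Worker {
    fn run(mut self) {
        // The loop ends when every event sender has been dropped.
        while let Ok(event) = self.rx_events.recv() {
            let start = self.buf.len();
            self.buf.extend_from_slice(&event);
            let snapshot = Snapshot {
                id: self.next_id,
                // Copy only the newly appended bytes into the snapshot.
                payload: Arc::from(&self.buf[start..]),
            };
            self.next_id += 1;
            // Stop early if the consumer has gone away.
            if self.tx_snapshots.send(snapshot).is_err() {
                break;
            }
        }
    }
}

/// Spawns the worker and returns the event sender plus the snapshot receiver.
pub fn spawn_session(capacity: usize) -> (SyncSender<Vec<u8>>, Receiver<Snapshot>) {
    let (tx_events, rx_events) = sync_channel(capacity);
    let (tx_snapshots, rx_snapshots) = sync_channel(capacity);
    let worker = Worker { rx_events, tx_snapshots, buf: Vec::new(), next_id: 0 };
    thread::spawn(move || worker.run());
    (tx_events, rx_snapshots)
}
```

Dropping the event sender lets the worker finish, which in turn closes the snapshot channel; the async version behaves the same way when `recv` returns `None`.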
Pattern 2: tokio::spawn with Non-'static Lifetimes (Arena Pattern)
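use async_trait::async_trait;
use parking_lot::RwLock;
use std::sync::Arc;

// MessageMeta and HandlerError are application-defined types that are not shown
// here; HandlerError must implement Display for the logging below.

// Borrows from a caller's buffer, so it cannot be moved into tokio::spawn.
pub struct BorrowedMessage<'a> {
    pub raw: &'a [u8],
    pub meta: MessageMeta,
}

// Owns every buffer; tasks refer to messages by index instead of by reference.
pub struct MessageArena {
    buffers: Arc<RwLock<Vec<Arc<Vec<u8>>>>>,
}

impl MessageArena {
    pub fn new() -> Self {
        Self {
            buffers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    // Copies the data into the arena once and returns an owned handle to it.
    pub fn alloc(&self, data: &[u8]) -> MessageRef {
        let mut buffers = self.buffers.write();
        let idx = buffers.len();
        buffers.push(Arc::new(data.to_vec()));
        MessageRef {
            index: idx,
            arena: self.buffers.clone(),
        }
    }

    pub fn get(&self, msg_ref: &MessageRef) -> Option<Arc<Vec<u8>>> {
        let buffers = self.buffers.read();
        buffers.get(msg_ref.index).cloned()
    }
}

// Owned and 'static: an index plus a shared pointer to the arena storage.
#[derive(Clone)]
pub struct MessageRef {
    index: usize,
    arena: Arc<RwLock<Vec<Arc<Vec<u8>>>>>,
}

impl MessageRef {
    pub fn data(&self) -> Option<Arc<Vec<u8>>> {
        let buffers = self.arena.read();
        buffers.get(self.index).cloned()
    }
}

// A native async fn in a trait is not dyn-compatible, so async_trait is used
// here to make Arc<dyn Plugin> possible.
#[async_trait]
pub trait Plugin: Send + Sync {
    async fn handle(&self, msg: MessageRef) -> Result<(), HandlerError>;
}

fn dispatch_to_plugins(plugins: &[Arc<dyn Plugin>], msg: MessageRef) {
    for plugin in plugins {
        // Each task gets its own owned clones, so the future is 'static.
        let plugin = Arc::clone(plugin);
        let msg = msg.clone();
        tokio::spawn(async move {
            if let Err(e) = plugin.handle(msg).await {
                log::error!("Plugin error: {}", e);
            }
        });
    }
}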
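`std::thread::spawn` has the same `'static` requirement as `tokio::spawn`, so the arena idea can be checked with the standard library alone. The sketch below is an illustration under that substitution, not the arena defined above: `Arena`, `Handle`, and `fan_out` are hypothetical names, and `std::sync::RwLock` replaces `parking_lot::RwLock`.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Arena that owns every buffer; handles refer to buffers by index.
#[derive(Clone, Default)]
pub struct Arena {
    buffers: Arc<RwLock<Vec<Arc<[u8]>>>>,
}

/// Owned, cloneable handle. It has no lifetime parameter, so it is 'static.
#[derive(Clone)]
pub struct Handle {
    index: usize,
    arena: Arena,
}

impl Arena {
    /// Copies `data` into the arena once and returns a handle to it.
    pub fn alloc(&self, data: &[u8]) -> Handle {
        let mut buffers = self.buffers.write().unwrap();
        buffers.push(Arc::from(data));
        Handle { index: buffers.len() - 1, arena: self.clone() }
    }
}

impl Handle {
    /// Returns a shared pointer to the bytes; cloning the Arc copies no data.
    pub fn data(&self) -> Option<Arc<[u8]>> {
        self.arena.buffers.read().unwrap().get(self.index).cloned()
    }
}

/// Fans one message out to `workers` threads and returns the byte count each saw.
pub fn fan_out(handle: &Handle, workers: usize) -> Vec<usize> {
    let joins: Vec<_> = (0..workers)
        .map(|_| {
            // The clone is moved into the thread, satisfying the 'static bound.
            let handle = handle.clone();
            thread::spawn(move || handle.data().map_or(0, |d| d.len()))
        })
        .collect();
    joins.into_iter().map(|j| j.join().unwrap()).collect()
}
```

Passing `&[u8]` into the closure instead of a `Handle` would be rejected by the compiler for the same reason `BorrowedMessage<'a>` cannot cross a spawn boundary.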
Pattern 3: Plugin System with Actor Pattern (Event Loop)
use std::sync::Arc;
use tokio::sync::mpsc;

// One actor per plugin: it owns the plugin and drains its own bounded queue.
struct PluginActor<P: Plugin> {
    plugin: P,
    queue: mpsc::Receiver<PluginMsg>,
    arena: Arc<MessageArena>,
}

enum PluginMsg {
    Process(MessageRef),
    Shutdown,
}

impl<P: Plugin> PluginActor<P> {
    pub async fn run(&mut self) {
        // Messages are handled one at a time, in arrival order.
        while let Some(msg) = self.queue.recv().await {
            match msg {
                PluginMsg::Process(msg_ref) => {
                    if let Err(e) = self.plugin.handle(msg_ref).await {
                        log::error!("Plugin error: {}", e);
                    }
                }
                PluginMsg::Shutdown => break,
            }
        }
    }
}

pub struct PluginDispatcher {
    actors: Vec<mpsc::Sender<PluginMsg>>,
}

impl PluginDispatcher {
    pub async fn dispatch(&self, msg: MessageRef) {
        for actor in &self.actors {
            // send waits while an actor's queue is full, which provides backpressure.
            // An error only means that actor has already stopped.
            let _ = actor.send(PluginMsg::Process(msg.clone())).await;
        }
    }
}
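The dispatcher above has no constructor or shutdown path. One way these could look is sketched below using only the standard library, with a thread per actor and `sync_channel` as the bounded mailbox in place of tokio tasks and `mpsc::channel`. The names `ActorMsg`, `Dispatcher`, `new`, and `shutdown` are assumptions for illustration, and each actor simply counts the bytes it processes.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

pub enum ActorMsg {
    Process(Arc<[u8]>),
    Shutdown,
}

pub struct Dispatcher {
    actors: Vec<SyncSender<ActorMsg>>,
    joins: Vec<JoinHandle<usize>>,
}

impl Dispatcher {
    /// Starts `actor_count` actor threads, each with a bounded mailbox.
    pub fn new(actor_count: usize, capacity: usize) -> Self {
        let mut actors = Vec::new();
        let mut joins = Vec::new();
        for _ in 0..actor_count {
            let (tx, rx) = sync_channel::<ActorMsg>(capacity);
            actors.push(tx);
            joins.push(thread::spawn(move || {
                let mut bytes_seen = 0usize;
                // One message at a time, in arrival order.
                while let Ok(msg) = rx.recv() {
                    match msg {
                        ActorMsg::Process(payload) => bytes_seen += payload.len(),
                        ActorMsg::Shutdown => break,
                    }
                }
                bytes_seen
            }));
        }
        Self { actors, joins }
    }

    /// Sends the same shared payload to every actor; blocks while a mailbox is full.
    pub fn dispatch(&self, payload: &Arc<[u8]>) {
        for actor in &self.actors {
            // A send error only means that actor has already stopped.
            let _ = actor.send(ActorMsg::Process(Arc::clone(payload)));
        }
    }

    /// Asks every actor to stop and returns the byte count each one processed.
    pub fn shutdown(self) -> Vec<usize> {
        for actor in &self.actors {
            let _ = actor.send(ActorMsg::Shutdown);
        }
        self.joins.into_iter().map(|j| j.join().unwrap()).collect()
    }
}
```

Because `Shutdown` travels through the same mailbox as `Process`, every message queued before it is still handled before the actor exits.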
Pattern 4: Zero-Copy with Owned Snapshots
use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct ZeroCopyBuffer {
    data: Bytes,
}

impl ZeroCopyBuffer {
    pub fn new(data: Vec<u8>) -> Self {
        Self {
            // Takes ownership of the Vec's allocation instead of copying it.
            data: Bytes::from(data),
        }
    }

    // Returns a view that shares the underlying allocation (reference-counted).
    pub fn slice(&self, range: std::ops::Range<usize>) -> Bytes {
        self.data.slice(range)
    }
}

pub struct ZeroCopyStream {
    buffer: ZeroCopyBuffer,
    position: usize,
    chunk_size: usize,
}

impl Stream for ZeroCopyStream {
    type Item = Result<Bytes, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.position >= self.buffer.data.len() {
            return Poll::Ready(None);
        }
        // Advance by at least one byte so a chunk_size of 0 cannot loop forever.
        let step = self.chunk_size.max(1);
        let end = (self.position + step).min(self.buffer.data.len());
        let chunk = self.buffer.slice(self.position..end);
        self.position = end;
        Poll::Ready(Some(Ok(chunk)))
    }
}
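What "zero-copy via reference counting" means can be made visible with a small std-only model. The sketch below is not how `bytes::Bytes` is implemented; it is a simplified stand-in (hypothetical type `SharedSlice`) that pairs an `Arc<[u8]>` with a range, so that chunking only clones the pointer and every chunk can be shown to share one allocation.

```rust
use std::ops::Range;
use std::sync::Arc;

/// A view into shared bytes: cloning or slicing bumps a reference count
/// instead of copying the payload.
#[derive(Clone, Debug)]
pub struct SharedSlice {
    data: Arc<[u8]>,
    range: Range<usize>,
}

impl SharedSlice {
    /// Moves the bytes into a shared allocation (a one-time copy in this model).
    pub fn new(data: Vec<u8>) -> Self {
        let range = 0..data.len();
        Self { data: Arc::from(data), range }
    }

    pub fn as_bytes(&self) -> &[u8] {
        &self.data[self.range.clone()]
    }

    /// Splits the view into chunks of at most `chunk_size` bytes.
    pub fn chunks(&self, chunk_size: usize) -> Vec<SharedSlice> {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        let mut out = Vec::new();
        let mut pos = self.range.start;
        while pos < self.range.end {
            let end = (pos + chunk_size).min(self.range.end);
            // Only the Arc pointer is cloned here; the bytes stay where they are.
            out.push(SharedSlice { data: Arc::clone(&self.data), range: pos..end });
            pos = end;
        }
        out
    }

    /// True when both views point at the same underlying allocation.
    pub fn shares_allocation_with(&self, other: &SharedSlice) -> bool {
        Arc::ptr_eq(&self.data, &other.data)
    }
}
```

The trade-off listed for this pattern follows directly: the allocation lives until the last chunk is dropped, even if that chunk covers only a single byte.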
Pattern 5: Tonic Streaming with Snapshot Pattern
use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};

// MyServiceTrait and StreamRequest stand for the tonic-generated service trait
// and request message; their definitions are not shown here.
pub struct MyService {
    arena: Arc<MessageArena>,
}

#[tonic::async_trait]
impl MyServiceTrait for MyService {
    type StreamResponse = Pin<Box<dyn Stream<Item = Result<SnapshotResponse, Status>> + Send>>;

    async fn stream_data(
        &self,
        _request: Request<StreamRequest>,
    ) -> Result<Response<Self::StreamResponse>, Status> {
        let (tx, rx) = mpsc::channel(100);
        let _arena = self.arena.clone();

        // The producer task owns its buffer and sends owned snapshots only.
        tokio::spawn(async move {
            let mut buffer = Vec::new();
            for i in 0..100 {
                let data = format!("chunk {}", i);
                buffer.extend_from_slice(data.as_bytes());
                let snapshot = SnapshotResponse {
                    id: i,
                    // Each snapshot is a full copy of the buffer so far.
                    payload: Bytes::copy_from_slice(&buffer),
                };
                // The client disconnected; stop producing.
                if tx.send(Ok(snapshot)).await.is_err() {
                    break;
                }
            }
        });

        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(stream)))
    }
}
Architecture Patterns
When to Use Each Pattern
| Pattern | Use Case | Pros | Cons |
|---|---|---|---|
| Worker + Channel | Stream with internal state | Clean separation, no lifetime issues | Extra allocation for snapshots |
| Arena + Index | Plugin systems, tokio::spawn | Can spawn tasks, zero-copy possible | Complex lifecycle management |
| Actor Event Loop | Coordinated scheduling | No per-message spawn, backpressure control | Each actor handles messages one at a time |
| Bytes (Arc) | Network buffers | True zero-copy via reference counting | Reference counting overhead |
| Owned Snapshots | API boundaries | Simple, always works | Copies data |
Workflow
Step 1: Identify Lifetime Constraint
Check for:
- Stream returning borrowed data? Use Worker + Channel
- tokio::spawn with <'a> lifetimes? Use Arena or owned data
- Self-referential struct? Redesign with indices
- API boundary (GraphQL, gRPC)? Use owned DTOs
Step 2: Choose Architecture
Decision tree:
- Need true zero-copy? Use Bytes (Arc-based)
- Need to spawn tasks? Use Arena or owned data
- Plugin system? Use Actor pattern or Arena
- Simple API? Use owned snapshots (accept copy cost)
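The decision tree can be written down as a small function, which is sometimes handy in design notes or review tooling. This is only a sketch: the `Needs` and `Architecture` types are hypothetical, and treating the questions as an ordered, first-match-wins list is an assumption, since the list above does not say how to combine several "yes" answers.

```rust
/// Answers to the questions in the decision tree (hypothetical type).
#[derive(Debug, Clone, Copy, Default)]
pub struct Needs {
    pub true_zero_copy: bool,
    pub spawns_tasks: bool,
    pub plugin_system: bool,
}

/// The architecture each branch of the tree points to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Architecture {
    BytesArc,
    ArenaOrOwnedData,
    ActorOrArena,
    OwnedSnapshots,
}

/// Walks the questions top to bottom; the first "yes" decides (assumption).
pub fn choose(needs: Needs) -> Architecture {
    if needs.true_zero_copy {
        Architecture::BytesArc
    } else if needs.spawns_tasks {
        Architecture::ArenaOrOwnedData
    } else if needs.plugin_system {
        Architecture::ActorOrArena
    } else {
        // Simple API: accept the copy cost.
        Architecture::OwnedSnapshots
    }
}
```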
Step 3: Validate Constraints
Verify:
- All spawned tasks are 'static
- Stream::Item doesn't borrow self
- No self-referential pointers
- API types are Send + Sync + 'static
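Several of these checks can be pushed to compile time with empty generic functions whose only job is to carry a trait bound. The helpers below are a sketch with hypothetical names (`assert_spawnable`, `assert_api_type`, `require_spawnable`, `OwnedSnapshot`); they use only the standard library and do nothing at run time.

```rust
use std::sync::Arc;

/// Compiles only if `T` can be moved into a spawned task.
pub fn assert_spawnable<T: Send + 'static>() {}

/// Compiles only if `T` satisfies the API-boundary bound from the checklist.
pub fn assert_api_type<T: Send + Sync + 'static>() {}

/// Returns `value` unchanged; compiles only if it is `Send + 'static`.
/// Useful for wrapping a future right before it is handed to a spawn call.
pub fn require_spawnable<T: Send + 'static>(value: T) -> T {
    value
}

/// Example owned DTO (hypothetical).
#[derive(Clone)]
pub struct OwnedSnapshot {
    pub id: u64,
    pub payload: Arc<[u8]>,
}

/// Each line is a compile-time check; the function body has no runtime effect.
pub fn check_types() {
    assert_api_type::<OwnedSnapshot>();
    assert_spawnable::<Vec<u8>>();
    assert_spawnable::<Arc<Vec<u8>>>();
    // These would fail to compile if uncommented:
    // assert_spawnable::<std::rc::Rc<u8>>();          // Rc is not Send
    // fn f<'a>() { assert_spawnable::<&'a [u8]>(); }  // 'a is not 'static
}
```

Calling `check_types` from a unit test keeps the bounds under continuous verification alongside `cargo check`.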
Review Checklist
When implementing advanced async patterns:
Verification Commands
cargo check
cargo expand --lib my_async_fn
cargo test --test stream_tests
cargo bench --bench buffer_bench
cargo clippy -- -W clippy::future_not_send
Common Pitfalls
1. Stream Returning Borrowed Data
Symptom: "hidden type captures lifetime that does not appear in bounds"
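// Bad: Item tries to borrow from a buffer the stream itself owns. This does not
// compile, because Stream::Item cannot be tied to the borrow of self in poll_next.
pub struct BadStream<'buf> {
    buf: Vec<u8>,
}

impl<'buf> Stream for BadStream<'buf> {
    type Item = &'buf [u8];

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(None)
    }
}

// Good: the stream yields owned, reference-counted Bytes.
pub struct GoodStream {
    rx: Receiver<Bytes>,
}

impl Stream for GoodStream {
    type Item = Bytes;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx)
    }
}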
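To see why owned items sidestep the problem, a stream can be polled by hand and then dropped while its items stay usable. The sketch below avoids external crates by declaring a local trait, `MiniStream`, with the same `poll_next` shape as `futures::Stream`; that trait, `OwnedChunks`, `NoopWake`, and `drain` are all assumptions made for illustration, and `Arc<[u8]>` stands in for `Bytes`.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local mirror of the `futures::Stream` method shape, kept here so the
/// sketch needs no crates.
pub trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields owned, reference-counted chunks.
pub struct OwnedChunks {
    queue: VecDeque<Arc<[u8]>>,
}

impl OwnedChunks {
    pub fn new(chunks: &[&[u8]]) -> Self {
        Self { queue: chunks.iter().map(|c| Arc::<[u8]>::from(*c)).collect() }
    }
}

impl MiniStream for OwnedChunks {
    // No lifetime appears here: each item owns its bytes.
    type Item = Arc<[u8]>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(self.queue.pop_front())
    }
}

/// Waker that does nothing; enough for a stream that is always ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream to completion, drops it, and returns the items.
pub fn drain(mut stream: OwnedChunks) -> Vec<Arc<[u8]>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    // The items remain valid after the stream itself is gone.
    drop(stream);
    items
}
```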
2. Spawning Non-'static Future
Symptom: "borrowed data escapes outside of method"
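// `process` is a placeholder for an async fn that takes &[u8].

// Bad: the task may outlive `data`, so the future is not 'static.
async fn bad_spawn(data: &[u8]) {
    tokio::spawn(async move {
        process(data).await;
    });
}

// Good: copy into an owned Vec before spawning.
async fn good_spawn(data: &[u8]) {
    let owned = data.to_vec();
    tokio::spawn(async move {
        process(&owned).await;
    });
}

// Better: take shared ownership via Arc, so nothing is copied at the spawn site.
async fn better_spawn(data: Arc<Vec<u8>>) {
    tokio::spawn(async move {
        process(&data).await;
    });
}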
3. Blocking Operations in Async
Symptom: Task blocks event loop, other tasks can't progress
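// Bad: std::fs blocks the runtime worker thread until the read completes.
async fn bad_async() {
    let data = std::fs::read("file.txt").unwrap();
}

// Good: tokio::fs yields to the runtime while the read is in progress.
async fn good_async() {
    let data = tokio::fs::read("file.txt").await.unwrap();
}

// Also fine: move unavoidable blocking work onto the blocking thread pool.
async fn blocking_async() {
    let data = tokio::task::spawn_blocking(|| {
        std::fs::read("file.txt").unwrap()
    }).await.unwrap();
}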
Decision Matrix
When to Spawn vs Event Loop
| Scenario | Approach |
|---|---|
| Independent parallel tasks | tokio::spawn |
| Coordinated scheduling | Event loop |
| Plugin system | Actor pattern |
| Long-running stateful | Actor |
| Short-lived tasks | spawn |
| Need backpressure | Channel + actor |
| Complex lifecycle | Actor with supervision |
Related Skills
- rust-async - Async/await fundamentals
- rust-lifetime-complex - Advanced lifetime patterns
- rust-pin - Pin and self-referential types
- rust-actor - Actor model patterns
- rust-concurrency - Concurrency primitives
- rust-performance - Zero-copy optimization
Localized Reference
- Chinese version: SKILL_ZH.md - Complete Chinese version, including all content