| name | rust-async |
| description | Advanced async patterns expert. Handles Stream processing, backpressure control, select/join operations, cancellation, Future trait implementation, and async runtime optimization. |
| metadata | {"triggers":["async","Stream","backpressure","select","Future","tokio","async-std","cancellation","poll","async trait"]} |
Solution Patterns
Pattern 1: Stream Processing
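use futures::stream::{Stream, StreamExt}; // `chunks` and `for_each` are futures::StreamExt methods

async fn process_stream(stream: impl Stream<Item = Data>) {
    stream
        .chunks(100) // group up to 100 items into one Vec<Data>
        .for_each(|batch| async move {
            process_batch(batch).await;
        })
        .await;
}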
When to use: Processing continuous data flows (websockets, file streams, API pagination).
Key insight: Streams are async iterators - pull-based and lazy, so no work happens until a consumer polls them.
Pattern 2: Backpressure Control
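use futures::stream::{self, StreamExt}; // `buffer_unordered` is a futures::StreamExt method
use std::sync::Arc;
use tokio::sync::Semaphore;

let semaphore = Arc::new(Semaphore::new(10)); // at most 10 `process` calls run at once
let stream = stream::iter(0..1000)
    .map(|i| {
        let permit = semaphore.clone().acquire_owned();
        async move {
            // Hold the permit for the whole call. `?` assumes the error type
            // returned by `process` converts from tokio's AcquireError.
            let _permit = permit.await?;
            process(i).await
        }
    })
    .buffer_unordered(100); // up to 100 futures in flight, most of them waiting for a permit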
When to use: Prevent overwhelming downstream systems or resource exhaustion.
Trade-offs: Adds latency but prevents overload.
Pattern 3: Select Multiplexing
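use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

// `Message` stands in for the application's message type.
async fn multiplex(mut receiver: mpsc::Receiver<Message>) {
    loop {
        select! {
            msg = receiver.recv() => {
                if let Some(msg) = msg {
                    handle(msg).await;
                } else {
                    break; // channel closed: every sender was dropped
                }
            }
            // A fresh sleep is created on every iteration, so this branch
            // fires only after 5 s without a message.
            _ = sleep(Duration::from_secs(5)) => {
                check_health().await;
            }
            else => break,
        }
    }
}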
When to use: Waiting on multiple async operations, first-to-complete wins.
Gotcha: When one branch completes, select! drops the futures of the other branches, so every branch must be cancellation-safe.
Pattern 4: Task Cancellation
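use std::time::Duration;
use tokio::time::timeout;

async fn with_timeout() -> Result<Value, TimeoutError> {
    // `timeout` yields Err(Elapsed) if the deadline passes first.
    // Assumes `long_operation` returns a plain `Value`.
    timeout(Duration::from_secs(5), long_operation())
        .await
        .map_err(|_| TimeoutError)
}

let task = tokio::spawn(async move {
    loop {
        tokio::task::yield_now().await; // yield point where an abort can take effect
        if work().await.is_err() {
            return;
        }
    }
});
task.abort(); // request cancellation
let _ = task.await; // typically an Err(JoinError) reporting the cancellation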
When to use: Operations with time limits or user-requested cancellation.
Key insight: Cancellation is cooperative - requires yield points.
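The yield-point rule can be observed without a runtime. The sketch below uses only the standard library and polls a future by hand; `YieldOnce`, `NoopWaker`, `Cleanup`, `work`, and `cancel_after_first_poll` are hypothetical names made up for this illustration. It assumes that dropping a future is a fair model of what happens to an aborted task at its next yield point.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Future that returns Pending on its first poll and Ready on the second,
/// giving the surrounding async fn exactly one yield point.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Guard whose Drop impl records that cleanup ran.
struct Cleanup(Arc<AtomicUsize>);

impl Drop for Cleanup {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

async fn work(steps: Arc<AtomicUsize>, cleanups: Arc<AtomicUsize>) {
    let _guard = Cleanup(cleanups);
    steps.fetch_add(1, Ordering::SeqCst); // step 1 runs during the first poll
    YieldOnce(false).await; // the only point where cancellation can land
    steps.fetch_add(1, Ordering::SeqCst); // step 2 never runs if cancelled above
}

/// Polls `work` once, then cancels it by dropping the future.
/// Returns (steps completed, cleanups run).
pub fn cancel_after_first_poll() -> (usize, usize) {
    let steps = Arc::new(AtomicUsize::new(0));
    let cleanups = Arc::new(AtomicUsize::new(0));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(work(steps.clone(), cleanups.clone()));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // cancellation: the future is dropped at its yield point

    (steps.load(Ordering::SeqCst), cleanups.load(Ordering::SeqCst))
}
```

Calling `cancel_after_first_poll()` returns `(1, 1)`: the code before the `.await` ran, the code after it did not, and the `Drop` guard still performed its cleanup. This is why synchronous cleanup belongs in `Drop` when a task may be cancelled.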
Workflow
Step 1: Choose Stream vs Iterator
Sync data source?
→ Use Iterator (more efficient)
Async data source (network, DB)?
→ Use Stream
Need backpressure?
→ Definitely Stream
Step 2: Design Concurrency Strategy
Sequential processing?
→ for_each / fold
Limited concurrency?
→ buffer_unordered(N) + Semaphore
Unlimited (dangerous)?
→ Use with extreme caution
Step 3: Handle Cancellation
Long-running task?
→ Add timeout wrapper
User-initiated?
→ Implement abort signal
Resource cleanup?
→ Use Drop or explicit cleanup
Join vs Try_Join
Join - Wait for All
use tokio::join;

let (a, b, c) = join!(
    fetch_user(),
    fetch_posts(),
    fetch_comments()
);
Use when: All results needed regardless of individual failures.
Try_Join - Fail Fast
use tokio::try_join;

let (a, b) = try_join!(
    async_op_a(),
    async_op_b()
)?;
Use when: All operations must succeed, fail fast on errors.
Combined Pattern
async fn fetch_dashboard() -> Result<Dashboard, Error> {
    let (user, posts, comments) = try_join!(
        fetch_user(),
        fetch_posts(),
        fetch_comments()
    )?;
    Ok(Dashboard { user, posts, comments })
}
Common Errors & Solutions
| Error | Cause | Solution |
|---|---|---|
| .await forgotten | Future not polled | Check all async fn calls have .await |
| Cancellation unhandled | Task aborted mid-operation | Implement cooperative cancellation |
| Missing backpressure | Unbounded concurrency | Use Semaphore or buffer_unordered |
| Deadlock | Lock held across .await | Minimize lock scope, drop before await |
| Async drop unsupported | Drop in async context | Use spawn for cleanup or blocking drop |
Backpressure Strategies
Strategy 1: Semaphore-Based
let sem = Arc::new(Semaphore::new(10));
stream
    .map(|item| {
        let sem = sem.clone();
        async move {
            let _permit = sem.acquire().await?;
            process(item).await
        }
    })
    .buffer_unordered(10)
Pros: Precise control, easy to reason about
Cons: Semaphore overhead
Strategy 2: Buffered Stream
stream
    .chunks(100)
    .for_each_concurrent(5, |batch| async move {
        process_batch(batch).await
    })
    .await
Pros: Simple, built-in to StreamExt
Cons: Less fine-grained control
Strategy 3: Channel-Based
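use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(100); // bounded: at most 100 queued items

// Producer side: `send` waits while the channel is full.
tx.send(item).await?;

// Consumer side: `recv` returns None once every sender has been dropped.
while let Some(item) = rx.recv().await {
    process(item).await;
}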
Pros: Natural backpressure from bounded channel
Cons: Extra copy/move overhead
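The backpressure that a bounded channel provides can be seen with the standard library alone. The sketch below swaps tokio's async channel for `std::sync::mpsc::sync_channel` and OS threads, so it is an analogue rather than the async pattern itself: where tokio's `send(...).await` suspends the task, the std `send` blocks the thread. The function names `accepted_before_full` and `pipeline` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Fills a bounded channel of capacity `cap` with no consumer running and
/// returns how many items were accepted before the channel reported "full".
pub fn accepted_before_full(cap: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel is not disconnected.
    let (tx, _rx) = sync_channel::<u32>(cap);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

/// Producer and consumer on separate threads: the blocking `send` makes the
/// producer wait whenever `cap` items are already queued.
pub fn pipeline(cap: usize, items: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(cap);
    let producer = thread::spawn(move || {
        for i in 0..items {
            tx.send(i).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the consumer's iteration below.
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```

With a capacity of 3, `accepted_before_full(3)` returns 3: the fourth item is refused until a consumer makes room. `pipeline(2, 10)` delivers all ten items in order even though no more than two are ever queued at once.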
Performance Tips
| Pattern | Performance Insight |
|---|---|
| select! | More lightweight than multiple tokio::spawn |
| buffer_unordered | More flexible than for_each_concurrent |
| .chunks() | Reduces per-item overhead for bulk operations |
| Lock-free at await | Never hold locks across .await points |
| spawn_blocking | Use for CPU-bound work in async context |
Advanced: Future Trait
Implementing Future
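use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // Wakes immediately, so the executor re-polls in a busy loop until
            // the deadline; a real timer would store the waker and wake later.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}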
When to implement: Custom async primitives, advanced control flow.
Gotcha: A future that returns Poll::Pending must arrange for its waker to be called later; otherwise the executor never polls it again.
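The Delay above calls the waker on every poll, which keeps the executor spinning until the deadline. One way to handle the wake-up properly is to store the waker and call it once, from a timer thread, when the deadline passes. The sketch below follows that idea using only the standard library; `ThreadDelay`, `ThreadWaker`, and `block_on` are hypothetical names, and spawning one thread per timer is an assumption made for brevity, not how a production runtime schedules timers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical timer future that wakes its task once, at the deadline.
pub struct ThreadDelay {
    when: Instant,
    /// Waker shared with the timer thread; None until the first poll.
    waker: Option<Arc<Mutex<Waker>>>,
}

impl ThreadDelay {
    pub fn new(dur: Duration) -> Self {
        ThreadDelay { when: Instant::now() + dur, waker: None }
    }
}

impl Future for ThreadDelay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed because every field is Unpin
        if Instant::now() >= this.when {
            return Poll::Ready(());
        }
        match this.waker.clone() {
            // Polled again before the deadline: keep the stored waker current,
            // because the task may now be driven with a different waker.
            Some(shared) => {
                *shared.lock().unwrap() = cx.waker().clone();
            }
            // First poll: store the waker and start a thread that wakes the
            // task once the deadline has passed.
            None => {
                let shared = Arc::new(Mutex::new(cx.waker().clone()));
                this.waker = Some(shared.clone());
                let when = this.when;
                thread::spawn(move || {
                    thread::sleep(when.saturating_duration_since(Instant::now()));
                    shared.lock().unwrap().wake_by_ref();
                });
            }
        }
        Poll::Pending
    }
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: parks the thread until a wake-up arrives.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until `wake` unparks us
        }
    }
}
```

Running `block_on(ThreadDelay::new(Duration::from_millis(50)))` returns after at least 50 ms. In the usual case the future is polled twice, once to register the waker and once after the wake-up, instead of continuously.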
Review Checklist
When reviewing async code:
Verification Commands
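cargo check  # type-check without producing a binary
cargo test  # run the test suite
cargo clippy -- -W clippy::await_holding_lock  # warn on non-async lock guards held across .await
RUSTFLAGS="--cfg tokio_unstable" cargo run  # enable tokio's unstable APIs (needed by tokio-console)
cargo flamegraph --bin your-app  # CPU profile; requires the cargo-flamegraph tool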
Common Pitfalls
1. Forgotten Await
Symptom: Future never executes, unexpected behavior
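// Bad: the future is created but never polled, so fetch_data never runs
async fn bad() {
    fetch_data();
}

// Good: awaiting drives the future to completion
async fn good() {
    fetch_data().await;
}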
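The reason the un-awaited call does nothing is that a Rust future is inert until it is polled. The sketch below shows this with the standard library only, polling by hand in place of a runtime; this `fetch_data` is a stand-in that merely counts how often its body runs, and `NoopWaker` and `forgotten_vs_polled` are hypothetical names.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for a real `fetch_data`: records each time its body runs.
async fn fetch_data(calls: &AtomicUsize) {
    calls.fetch_add(1, Ordering::SeqCst);
}

/// Returns the call count after a forgotten await and after a real poll.
pub fn forgotten_vs_polled() -> (usize, usize) {
    let calls = AtomicUsize::new(0);

    // "Forgotten await": the future is built and dropped without being polled.
    drop(fetch_data(&calls));
    let after_forgotten = calls.load(Ordering::SeqCst);

    // Polling is what `.await` does under the hood; one poll finishes this future.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fetch_data(&calls));
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Ready(())));
    let after_polled = calls.load(Ordering::SeqCst);

    (after_forgotten, after_polled)
}
```

`forgotten_vs_polled()` returns `(0, 1)`: the body did not run for the dropped future and ran exactly once for the polled one. The compiler's `unused_must_use` warning on a bare `fetch_data();` statement is the usual early signal for this mistake.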
2. Unbounded Concurrency
Symptom: Resource exhaustion, system overload
// Bad: join_all drives every request at the same time
use futures::future::join_all;

let futures: Vec<_> = urls.iter()
    .map(|url| fetch(url))
    .collect();
let results = join_all(futures).await;

// Good: at most 10 requests in flight
use futures::stream::{self, StreamExt};

let results = stream::iter(urls)
    .map(|url| fetch(url))
    .buffer_unordered(10)
    .collect::<Vec<_>>()
    .await;
3. Lock Across Await
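Symptom: Deadlock, or "future cannot be sent between threads safely" when the guard is a std::sync::MutexGuard
// Bad: the guard stays held while some_async_op is pending
let guard = mutex.lock().await;
some_async_op().await;
drop(guard);

// Good: copy the data out and release the lock before awaiting
let value = {
    let guard = mutex.lock().await;
    guard.clone()
};
some_async_op().await;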
4. Async Drop
Symptom: Cannot await in Drop impl
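// Bad: does not compile, because Drop::drop is synchronous and cannot .await
impl Drop for Resource {
    fn drop(&mut self) {
        self.cleanup().await;
    }
}

// Option 1: explicit async cleanup that callers invoke before dropping
impl Resource {
    async fn cleanup(self) {
        // release resources here
    }
}

// Option 2: hand the cleanup to a spawned task
impl Drop for Resource {
    fn drop(&mut self) {
        let handle = self.handle.take();
        // tokio::spawn panics if no Tokio runtime is running at this point
        tokio::spawn(async move {
            if let Some(h) = handle {
                h.cleanup().await;
            }
        });
    }
}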
Related Skills
- rust-concurrency - Thread safety, Send/Sync basics
- rust-async-pattern - Async architecture patterns
- rust-ownership - Lifetime issues in async contexts
- rust-pin - Pin and self-referential types
- rust-performance - Async performance optimization
- rust-web - Async web frameworks (axum, actix)