ワンクリックで
rust-dpdk
用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡
Codex または Claude でインストール この Prompt をコピーして Codex、Claude、または他のアシスタントに貼り付けると、Skill ページを確認してインストールできます。
メニュー
用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡
Codex または Claude でインストール この Prompt をコピーして Codex、Claude、または他のアシスタントに貼り付けると、Skill ページを確認してインストールできます。
SOC 職業分類に基づく
Actor model expert covering message passing, state isolation, supervision trees, deadlock prevention, fault tolerance, Actix framework, and Erlang-style concurrency patterns.
Rust anti-patterns and common mistakes expert. Handles code review issues with clone abuse, unwrap in production, String misuse, index loops, and refactoring guidance.
Advanced async patterns expert covering Stream implementation, zero-copy buffers, tokio::spawn lifetimes, plugin system scheduling, tonic streaming, and async lifetime management.
Advanced async patterns expert. Handles Stream processing, backpressure control, select/join operations, cancellation, Future trait implementation, and async runtime optimization.
Authentication and authorization expert covering JWT, API keys, OAuth, RBAC, password hashing, distributed token storage, and session management patterns.
Caching and distributed storage expert covering Redis, connection pools, TTL strategies, cache patterns (Cache-Aside, Write-Through), invalidation, and performance optimization.
| name | rust-dpdk |
| description | 用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡--- |
| 特性 | 内核网络栈 | DPDK |
|---|---|---|
| 上下文切换 | 每次包都切换 | 轮询模式,无切换 |
| 内存拷贝 | 多次拷贝 | 零拷贝 |
| 中断 | 频繁中断 | 轮询 (poll mode driver) |
| 延迟 | 较高 | 微秒级 |
| 吞吐量 | 万级 PPS | 百万级 PPS |
| CPU 利用率 | 较低但有开销 | 高但高效 |
// DPDK 核心结构
struct DpdkContext {
memory_pool: Mempool, // 内存池
ports: Vec<Port>, // 网卡端口
rx_queues: Vec<RxQueue>, // 接收队列
tx_queues: Vec<TxQueue>, // 发送队列
cpu_cores: Vec<Core>, // CPU 核心分配
}
struct Port {
port_id: u16,
mac_addr: [u8; 6],
link_speed: u32,
max_queues: u16,
}
struct Mempool {
name: String,
buffer_size: usize,
cache_size: usize,
total_buffers: u32,
}
// 创建 DPDK 内存池
fn create_mempool() -> Result<Mempool, DpdkError> {
let mempool = unsafe {
rte_mempool_create(
b"packet_pool\0".as_ptr() as *const c_char,
NUM_BUFFERS as u32, // 缓冲区数量
BUFFER_SIZE as u16, // 每个缓冲区大小
CACHE_SIZE as u32, // CPU 缓存大小
0, // 私有数据大小
Some(rte_pktmbuf_pool_init), // 初始化函数
std::ptr::null(), // 初始化参数
Some(rte_pktmbuf_init), // 对象初始化函数
std::ptr::null(), // 对象参数
rte_socket_id() as i32, // 内存所在 Socket
0, // 标志位
)
};
if mempool.is_null() {
Err(DpdkError::MempoolCreateFailed)
} else {
Ok(Mempool { inner: mempool })
}
}
// 分配缓冲区
fn alloc_mbuf(mempool: &Mempool) -> Option<*mut rte_mbuf> {
unsafe {
let mbuf = rte_pktmbuf_alloc(mempool.inner);
if mbuf.is_null() {
None
} else {
Some(mbuf)
}
}
}
// 零拷贝接收数据包
fn process_packets(
port_id: u16,
queue_id: u16,
bufs: &mut [*mut rte_mbuf; MAX_BURST_SIZE],
) -> usize {
let num_received = unsafe {
rte_eth_rx_burst(
port_id,
queue_id,
bufs.as_mut_ptr(),
MAX_BURST_SIZE as u16,
)
};
// 直接处理 mbuf,不需要拷贝
for i in 0..num_received {
let mbuf = bufs[i];
// 访问数据(零拷贝)
let data_ptr = unsafe {
rte_pktmbuf_mtod(mbuf, *const u8)
};
let data_len = unsafe {
rte_pktmbuf_pkt_len(mbuf)
};
// 处理数据包
process_packet(data_ptr, data_len);
// 释放 mbuf 回内存池
unsafe {
rte_pktmbuf_free(mbuf);
}
}
num_received
}
// 批量发送数据包
fn transmit_packets(
port_id: u16,
queue_id: u16,
packets: &[Packet],
) -> usize {
let mut mbufs: Vec<*mut rte_mbuf> = packets
.iter()
.map(|p| p.to_mbuf())
.collect();
let sent = unsafe {
rte_eth_tx_burst(
port_id,
queue_id,
mbufs.as_mut_ptr(),
mbufs.len() as u16,
)
};
// 释放未发送的 mbuf
for i in sent..mbufs.len() {
unsafe {
rte_pktmbuf_free(mbufs[i]);
}
}
sent
}
// 配置 RSS (Receive Side Scaling)
fn configure_rss(port_id: u16) -> Result<(), DpdkError> {
let mut port_info: rte_eth_dev_info = unsafe { std::mem::zeroed() };
unsafe {
rte_eth_dev_info_get(port_id, &mut port_info);
}
// 配置 RSS 哈希
let mut rss_conf: rte_eth_rss_conf = unsafe { std::mem::zeroed() };
rss_conf.rss_key_len = 40;
rss_conf.rss_hf = RTE_ETH_RSS_TCP | RTE_ETH_RSS_UDP | RTE_ETH_RSS_IPV4;
unsafe {
let ret = rte_eth_dev_rss_hash_conf_update(
port_id,
&rss_conf,
);
if ret < 0 {
return Err(DpdkError::RssConfigFailed);
}
}
Ok(())
}
// 根据哈希值分配队列
fn get_queue_by_hash(hash: u32, num_queues: u16) -> u16 {
// 使用简单的取模分发
(hash % num_queues as u32) as u16
}
// 配置多队列
fn configure_multi_queue(port_id: u16, num_queues: u16) -> Result<(), DpdkError> {
let mut port_conf: rte_eth_conf = unsafe { std::mem::zeroed() };
port_conf.rxmode.split_hdr_size = 0;
port_conf.rxmode.mq_mode = rte_eth_mq_mode::ETH_MQ_RX_RSS;
port_conf.txmode.mq_mode = rte_eth_mq_mode::ETH_MQ_TX_NONE;
// 配置 RX 队列
let mut rx_conf: rte_eth_rxconf = unsafe { std::mem::zeroed() };
rx_conf.rx_free_thresh = 32;
rx_conf.rx_drop_en = 0;
// 配置 TX 队列
let mut tx_conf: rte_eth_txconf = unsafe { std::mem::zeroed() };
tx_conf.tx_free_thresh = 32;
// 分配 RX 队列
for queue in 0..num_queues {
unsafe {
let ret = rte_eth_rx_queue_setup(
port_id,
queue,
1024, // 队列深度
rte_socket_id() as u32,
&rx_conf,
mempool.inner,
);
if ret < 0 {
return Err(DpdkError::QueueSetupFailed);
}
}
}
// 分配 TX 队列
for queue in 0..num_queues {
unsafe {
let ret = rte_eth_tx_queue_setup(
port_id,
queue,
1024,
rte_socket_id() as u32,
&tx_conf,
);
if ret < 0 {
return Err(DpdkError::QueueSetupFailed);
}
}
}
Ok(())
}
use std::os::raw::c_int;
use std::thread;
fn set_cpu_affinity(core_id: u32) -> Result<(), DpdkError> {
let mut cpuset: cpu_set_t = unsafe { std::mem::zeroed() };
unsafe {
CPU_SET(core_id as usize, &mut cpuset);
let ret = pthread_setaffinity_np(
pthread_self(),
std::mem::size_of::<cpu_set_t>(),
&cpuset,
);
if ret != 0 {
return Err(DpdkError::AffinitySetFailed);
}
}
Ok(())
}
// 为每个 RX 队列分配专用核心
fn allocate_cores_for_queues(num_queues: u16) {
for queue in 0..num_queues {
thread::spawn(move || {
set_cpu_affinity(queue as u32).unwrap();
process_queue(queue);
});
}
}
| 优化点 | 方法 |
|---|---|
| 内存对齐 | 缓存行对齐 (64 字节) |
| 无锁队列 | 使用 SPSC 队列 |
| 批处理 | 批量收发减少系统调用 |
| CPU 亲和性 | 核心绑定减少上下文切换 |
| Hugepages | 2MB/1GB 大页减少 TLB miss |
rust-dpdk
│
├─► rust-performance → 性能优化
├─► rust-embedded → no_std 环境
└─► rust-concurrency → 并发模型