| name | rust-dpdk |
| description | 用户态网络专家。处理 DPDK, 用户态驱动, 高性能网络, packet processing, 零拷贝, RSS 负载均衡--- |
DPDK vs 内核网络栈
| 特性 | 内核网络栈 | DPDK |
|---|
| 上下文切换 | 每次包都切换 | 轮询模式,无切换 |
| 内存拷贝 | 多次拷贝 | 零拷贝 |
| 中断 | 频繁中断 | 轮询 (poll mode driver) |
| 延迟 | 较高 | 微秒级 |
| 吞吐量 | 万级 PPS | 百万级 PPS |
| CPU 利用率 | 较低但有开销 | 高但高效 |
核心组件
struct DpdkContext {
memory_pool: Mempool,
ports: Vec<Port>,
rx_queues: Vec<RxQueue>,
tx_queues: Vec<TxQueue>,
cpu_cores: Vec<Core>,
}
struct Port {
port_id: u16,
mac_addr: [u8; 6],
link_speed: u32,
max_queues: u16,
}
struct Mempool {
name: String,
buffer_size: usize,
cache_size: usize,
total_buffers: u32,
}
内存池管理
fn create_mempool() -> Result<Mempool, DpdkError> {
let mempool = unsafe {
rte_mempool_create(
b"packet_pool\0".as_ptr() as *const c_char,
NUM_BUFFERS as u32,
BUFFER_SIZE as u16,
CACHE_SIZE as u32,
0,
Some(rte_pktmbuf_pool_init),
std::ptr::null(),
Some(rte_pktmbuf_init),
std::ptr::null(),
rte_socket_id() as i32,
0,
)
};
if mempool.is_null() {
Err(DpdkError::MempoolCreateFailed)
} else {
Ok(Mempool { inner: mempool })
}
}
fn alloc_mbuf(mempool: &Mempool) -> Option<*mut rte_mbuf> {
unsafe {
let mbuf = rte_pktmbuf_alloc(mempool.inner);
if mbuf.is_null() {
None
} else {
Some(mbuf)
}
}
}
零拷贝接收
fn process_packets(
port_id: u16,
queue_id: u16,
bufs: &mut [*mut rte_mbuf; MAX_BURST_SIZE],
) -> usize {
let num_received = unsafe {
rte_eth_rx_burst(
port_id,
queue_id,
bufs.as_mut_ptr(),
MAX_BURST_SIZE as u16,
)
};
for i in 0..num_received {
let mbuf = bufs[i];
let data_ptr = unsafe {
rte_pktmbuf_mtod(mbuf, *const u8)
};
let data_len = unsafe {
rte_pktmbuf_pkt_len(mbuf)
};
process_packet(data_ptr, data_len);
unsafe {
rte_pktmbuf_free(mbuf);
}
}
num_received
}
批量发送
fn transmit_packets(
port_id: u16,
queue_id: u16,
packets: &[Packet],
) -> usize {
let mut mbufs: Vec<*mut rte_mbuf> = packets
.iter()
.map(|p| p.to_mbuf())
.collect();
let sent = unsafe {
rte_eth_tx_burst(
port_id,
queue_id,
mbufs.as_mut_ptr(),
mbufs.len() as u16,
)
};
for i in sent..mbufs.len() {
unsafe {
rte_pktmbuf_free(mbufs[i]);
}
}
sent
}
RSS 负载均衡
fn configure_rss(port_id: u16) -> Result<(), DpdkError> {
let mut port_info: rte_eth_dev_info = unsafe { std::mem::zeroed() };
unsafe {
rte_eth_dev_info_get(port_id, &mut port_info);
}
let mut rss_conf: rte_eth_rss_conf = unsafe { std::mem::zeroed() };
rss_conf.rss_key_len = 40;
rss_conf.rss_hf = RTE_ETH_RSS_TCP | RTE_ETH_RSS_UDP | RTE_ETH_RSS_IPV4;
unsafe {
let ret = rte_eth_dev_rss_hash_conf_update(
port_id,
&rss_conf,
);
if ret < 0 {
return Err(DpdkError::RssConfigFailed);
}
}
Ok(())
}
fn get_queue_by_hash(hash: u32, num_queues: u16) -> u16 {
(hash % num_queues as u32) as u16
}
多队列配置
fn configure_multi_queue(port_id: u16, num_queues: u16) -> Result<(), DpdkError> {
let mut port_conf: rte_eth_conf = unsafe { std::mem::zeroed() };
port_conf.rxmode.split_hdr_size = 0;
port_conf.rxmode.mq_mode = rte_eth_mq_mode::ETH_MQ_RX_RSS;
port_conf.txmode.mq_mode = rte_eth_mq_mode::ETH_MQ_TX_NONE;
let mut rx_conf: rte_eth_rxconf = unsafe { std::mem::zeroed() };
rx_conf.rx_free_thresh = 32;
rx_conf.rx_drop_en = 0;
let mut tx_conf: rte_eth_txconf = unsafe { std::mem::zeroed() };
tx_conf.tx_free_thresh = 32;
for queue in 0..num_queues {
unsafe {
let ret = rte_eth_rx_queue_setup(
port_id,
queue,
1024,
rte_socket_id() as u32,
&rx_conf,
mempool.inner,
);
if ret < 0 {
return Err(DpdkError::QueueSetupFailed);
}
}
}
for queue in 0..num_queues {
unsafe {
let ret = rte_eth_tx_queue_setup(
port_id,
queue,
1024,
rte_socket_id() as u32,
&tx_conf,
);
if ret < 0 {
return Err(DpdkError::QueueSetupFailed);
}
}
}
Ok(())
}
CPU 亲和性
use std::os::raw::c_int;
use std::thread;
fn set_cpu_affinity(core_id: u32) -> Result<(), DpdkError> {
let mut cpuset: cpu_set_t = unsafe { std::mem::zeroed() };
unsafe {
CPU_SET(core_id as usize, &mut cpuset);
let ret = pthread_setaffinity_np(
pthread_self(),
std::mem::size_of::<cpu_set_t>(),
&cpuset,
);
if ret != 0 {
return Err(DpdkError::AffinitySetFailed);
}
}
Ok(())
}
fn allocate_cores_for_queues(num_queues: u16) {
for queue in 0..num_queues {
thread::spawn(move || {
set_cpu_affinity(queue as u32).unwrap();
process_queue(queue);
});
}
}
性能优化
| 优化点 | 方法 |
|---|
| 内存对齐 | 缓存行对齐 (64 字节) |
| 无锁队列 | 使用 SPSC 队列 |
| 批处理 | 批量收发减少系统调用 |
| CPU 亲和性 | 核心绑定减少上下文切换 |
| Hugepages | 2MB/1GB 大页减少 TLB miss |
与其他技能关联
rust-dpdk
│
├─► rust-performance → 性能优化
├─► rust-embedded → no_std 环境
└─► rust-concurrency → 并发模型