rust-distributed
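Distributed systems expert. Covers Raft, 2PC, consensus algorithms, distributed transactions, distributed consistency, and distributed coordination.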
| name | rust-distributed |
| description | Distributed systems expert. Covers Raft, 2PC, consensus algorithms, distributed transactions, distributed consistency, and distributed coordination. |
┌─────────────────────────────────────────────────────┐
│                    Raft cluster                     │
├─────────────────────────────────────────────────────┤
│                                                     │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐       │
│   │ Leader  │ ◄──►│ Follower│ ◄──►│ Follower│       │
│   │  node   │     │  node   │     │  node   │       │
│   └────┬────┘     └─────────┘     └─────────┘       │
│        │                                            │
│   - Handles client requests                         │
│   - Replicates the log to Followers                 │
│   - Manages heartbeats and elections                │
└─────────────────────────────────────────────────────┘
// Raft node state
use std::time::{Duration, Instant};

enum RaftState {
    Follower,
    Candidate,
    Leader,
}

struct RaftNode {
    state: RaftState,
    current_term: u64,
    voted_for: Option<u64>,
    log: Vec<LogEntry>,
    commit_index: usize,
    last_applied: usize,
    // Election state
    election_timeout: Duration,
    last_heartbeat: Instant,
    // Cluster configuration
    node_id: u64,
    peers: Vec<u64>,
}

// Clone is needed so entries can be copied into AppendEntries requests.
#[derive(Clone)]
struct LogEntry {
    term: u64,
    index: usize,
    command: Vec<u8>,
}
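impl RaftNode {
    // Leader side: send a Follower every entry after the last one it is known to hold.
    fn replicate_log(&mut self, peer: u64) {
        let prev_log_index = self.get_last_log_index_for(peer);
        let prev_log_term = self.get_last_log_term_for(peer);
        // Assumes 1-based log indices, so the entry with index i is stored at
        // log[i - 1] and the entries after prev_log_index start at position
        // prev_log_index. `get` yields None instead of panicking when the
        // peer's index is past the end of the log.
        let entries: Vec<LogEntry> = self.log
            .get(prev_log_index..)
            .map(|tail| tail.to_vec())
            .unwrap_or_default();
        let rpc = AppendEntriesRequest {
            term: self.current_term,
            leader_id: self.node_id,
            prev_log_index,
            prev_log_term,
            entries,
            leader_commit: self.commit_index,
        };
        self.send_append_entries(peer, rpc);
    }
}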
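The receiving side of log replication can be sketched as follows. This is a minimal illustration, not a complete Raft implementation: the `Follower` struct and `handle_append_entries` are hypothetical names, and the sketch assumes 1-based log indices with the entry of index `i` stored at `log[i - 1]`.

```rust
#[derive(Clone, Debug, PartialEq)]
struct LogEntry {
    term: u64,
    index: usize, // 1-based (assumption of this sketch)
    command: Vec<u8>,
}

struct AppendEntriesRequest {
    term: u64,
    prev_log_index: usize,
    prev_log_term: u64,
    entries: Vec<LogEntry>,
    leader_commit: usize,
}

// Hypothetical follower state, reduced to what the check needs.
struct Follower {
    current_term: u64,
    log: Vec<LogEntry>,
    commit_index: usize,
}

impl Follower {
    /// Returns true if the request was accepted.
    fn handle_append_entries(&mut self, req: AppendEntriesRequest) -> bool {
        // Reject requests from a leader with a stale term.
        if req.term < self.current_term {
            return false;
        }
        self.current_term = req.term;

        // Consistency check: the entry preceding the new ones must exist
        // locally and carry the same term.
        if req.prev_log_index > 0 {
            match self.log.get(req.prev_log_index - 1) {
                Some(entry) if entry.term == req.prev_log_term => {}
                _ => return false,
            }
        }

        let last_new_index = req.prev_log_index + req.entries.len();
        for entry in req.entries {
            let pos = entry.index - 1;
            let conflict = match self.log.get(pos) {
                Some(existing) if existing.term == entry.term => continue, // already stored
                Some(_) => true,
                None => false,
            };
            if conflict {
                // Drop the conflicting entry and everything after it.
                self.log.truncate(pos);
            }
            self.log.push(entry);
        }

        // Advance the commit index, but never past the last entry just received.
        if req.leader_commit > self.commit_index {
            self.commit_index = req.leader_commit.min(last_new_index);
        }
        true
    }
}
```

When the check fails, the leader would typically retry with an earlier `prev_log_index` until the two logs agree.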
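impl RaftNode {
    fn start_election(&mut self) {
        self.state = RaftState::Candidate;
        self.current_term += 1;
        self.voted_for = Some(self.node_id);
        let mut votes = 1; // the candidate votes for itself
        // Copy the peer list so the loop does not hold a borrow of `self`
        // while `send_request_vote` and `become_leader` are called.
        let peers = self.peers.clone();
        // `peers` excludes this node, so the cluster has peers.len() + 1 members.
        let cluster_size = peers.len() + 1;
        // Request a vote from every peer
        for peer in peers {
            let request = RequestVoteRequest {
                term: self.current_term,
                candidate_id: self.node_id,
                last_log_index: self.log.len(),
                last_log_term: self.get_last_log_term(),
            };
            if let Some(response) = self.send_request_vote(peer, request) {
                if response.vote_granted {
                    votes += 1;
                    // Strict majority of the whole cluster, not just of the peers
                    if votes > cluster_size / 2 {
                        self.become_leader();
                        return;
                    }
                }
            }
        }
        // Election lost: fall back to Follower
        self.state = RaftState::Follower;
    }
}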
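An election is won with a strict majority of the whole cluster, and the candidate itself counts as a member. The small sketch below makes that arithmetic explicit. `quorum` and `has_majority` are hypothetical helper names, and `peer_count` is assumed to exclude the candidate.

```rust
/// Votes needed to win in a cluster of `cluster_size` nodes.
fn quorum(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

/// `peer_count` excludes the candidate, so the cluster has `peer_count + 1` nodes.
fn has_majority(votes: usize, peer_count: usize) -> bool {
    votes >= quorum(peer_count + 1)
}
```

With three peers (a four-node cluster), two votes are not enough and three are required. This is why even-sized clusters tolerate no more failures than the next smaller odd size.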
struct TwoPhaseCommitCoordinator {
    transaction_id: u128,
    participants: Vec<Participant>,
    state: TwoPCState,
}

enum TwoPCState {
    Init,
    WaitingPrepare,
    WaitingCommit,
    Committed,
    Aborted,
}

impl TwoPhaseCommitCoordinator {
    pub fn start_transaction(&mut self) {
        self.state = TwoPCState::WaitingPrepare;
        // Phase 1: send prepare to every participant
        for participant in &self.participants {
            participant.send(PrepareMessage {
                transaction_id: self.transaction_id,
            });
        }
    }

    pub fn handle_prepare_response(&mut self, response: PrepareResponse) {
        if response.vote == Vote::Abort {
            // A single Abort vote aborts the whole transaction
            self.abort();
        } else if self.all_prepared() {
            self.state = TwoPCState::WaitingCommit;
            // Phase 2: send commit to every participant
            for participant in &self.participants {
                participant.send(CommitMessage {
                    transaction_id: self.transaction_id,
                });
            }
        }
    }
}
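struct Participant {
    transaction_manager: TransactionManager,
    state: ParticipantState,
}

enum ParticipantState {
    Init,
    Prepared,
    Committed,
    Aborted,
}

impl Participant {
    pub fn handle_prepare(&mut self, msg: PrepareMessage) {
        // Run the local transaction work
        let result = self.transaction_manager.execute();
        match result {
            Ok(_) => {
                self.state = ParticipantState::Prepared;
                // PrepareResponse is a different type from PrepareMessage, so the
                // transaction ID is copied explicitly rather than with `..msg`.
                self.send(PrepareResponse {
                    vote: Vote::Commit,
                    transaction_id: msg.transaction_id,
                });
            }
            Err(_) => {
                // Record the local abort before voting
                self.state = ParticipantState::Aborted;
                self.send(PrepareResponse {
                    vote: Vote::Abort,
                    transaction_id: msg.transaction_id,
                });
            }
        }
    }
}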
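The two phases can be exercised end to end without a network. The sketch below is an in-memory simplification with hypothetical names (`Participant` as a trait, `MockParticipant`, `run_two_phase_commit`). It calls participants synchronously, whereas a real coordinator would exchange messages and persist its decision.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Vote {
    Commit,
    Abort,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Decision {
    Commit,
    Abort,
}

trait Participant {
    /// Phase 1: do the local work and vote.
    fn prepare(&mut self) -> Vote;
    /// Phase 2: apply the coordinator's decision.
    fn finish(&mut self, decision: Decision);
}

// Test double whose vote is fixed up front.
struct MockParticipant {
    can_commit: bool,
    outcome: Option<Decision>,
}

impl Participant for MockParticipant {
    fn prepare(&mut self) -> Vote {
        if self.can_commit { Vote::Commit } else { Vote::Abort }
    }

    fn finish(&mut self, decision: Decision) {
        self.outcome = Some(decision);
    }
}

fn run_two_phase_commit<P: Participant>(participants: &mut [P]) -> Decision {
    // Phase 1: collect votes. `all` stops at the first Abort vote.
    let all_yes = participants
        .iter_mut()
        .all(|p| p.prepare() == Vote::Commit);
    let decision = if all_yes { Decision::Commit } else { Decision::Abort };

    // Phase 2: every participant learns the same decision.
    for p in participants.iter_mut() {
        p.finish(decision);
    }
    decision
}
```

A single `Abort` vote is enough to abort everyone, including participants that were never asked to prepare.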
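| Problem | Cause | Solution |
|---|---|---|
| Blocking | Coordinator failure | Timeouts, backup coordinator |
| Single point of failure | Dependence on the coordinator | Distributed coordinator (etcd/ZooKeeper) |
| Performance | Multiple network round trips | Batched commits, tuned timeouts |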
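The timeout mitigation from the table can be sketched on the coordinator side: wait for votes until a deadline and treat silence as an abort. This is an illustrative sketch using a standard-library channel in place of the network. `collect_votes` and `Outcome` are hypothetical names.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum Vote {
    Commit,
    Abort,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Commit,
    Abort,
}

/// Waits for `expected` votes, all of which must arrive before `timeout` elapses.
fn collect_votes(rx: &Receiver<Vote>, expected: usize, timeout: Duration) -> Outcome {
    let deadline = Instant::now() + timeout;
    for _ in 0..expected {
        // One shared deadline for the whole phase, not a fresh timeout per vote.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(Vote::Commit) => continue,
            Ok(Vote::Abort) => return Outcome::Abort,
            // Timed out or the sender went away: a missing vote counts as Abort.
            Err(_) => return Outcome::Abort,
        }
    }
    Outcome::Commit
}
```

Aborting on a missing vote is safe only during the prepare phase. Once a participant has voted to commit, it cannot decide unilaterally, which is the blocking problem the table refers to.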
// Eventual consistency
trait EventuallyConsistent {
    fn put(&self, key: &str, value: &str);
    fn get(&self, key: &str) -> Option<String>;
}

// Strong consistency (linearizability)
trait Linearizable {
    fn put(&self, key: &str, value: &str) -> Result<()>;
    fn get(&self, key: &str) -> Result<String>;
}

// Sequential consistency
trait SequentialConsistent {
    fn put(&self, key: &str, value: &str);
    fn get(&self, key: &str) -> Vec<String>; // returns the version history
}
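As a point of reference for the linearizable interface, a single node that serializes every operation behind one lock is trivially linearizable. The sketch below shows that degenerate case only and does not replicate anything. `SingleNodeStore`, `StoreError`, and the `StoreResult` alias are hypothetical names standing in for the `Result` type used above.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
enum StoreError {
    NotFound,
    LockPoisoned,
}

type StoreResult<T> = Result<T, StoreError>;

trait Linearizable {
    fn put(&self, key: &str, value: &str) -> StoreResult<()>;
    fn get(&self, key: &str) -> StoreResult<String>;
}

// One mutex orders all reads and writes, so each operation appears to
// take effect at a single point in time.
struct SingleNodeStore {
    data: Mutex<HashMap<String, String>>,
}

impl SingleNodeStore {
    fn new() -> Self {
        Self { data: Mutex::new(HashMap::new()) }
    }
}

impl Linearizable for SingleNodeStore {
    fn put(&self, key: &str, value: &str) -> StoreResult<()> {
        let mut data = self.data.lock().map_err(|_| StoreError::LockPoisoned)?;
        data.insert(key.to_string(), value.to_string());
        Ok(())
    }

    fn get(&self, key: &str) -> StoreResult<String> {
        let data = self.data.lock().map_err(|_| StoreError::LockPoisoned)?;
        data.get(key).cloned().ok_or(StoreError::NotFound)
    }
}
```

The fallible return types matter more in a replicated setting, where an operation may fail because a quorum could not be reached.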
// Snowflake algorithm
struct SnowflakeGenerator {
    worker_id: u64,
    datacenter_id: u64,
    sequence: u64,
    last_timestamp: u64,
}

impl SnowflakeGenerator {
    // Note: a clock that jumps backwards by more than a moment is not handled here.
    pub fn generate(&mut self) -> u64 {
        let mut timestamp = current_timestamp();
        if timestamp == self.last_timestamp {
            self.sequence = (self.sequence + 1) & 0xFFF; // 12 bits
            if self.sequence == 0 {
                // Sequence exhausted: wait for the next millisecond and use the
                // new timestamp, otherwise the ID would repeat.
                while timestamp <= self.last_timestamp {
                    timestamp = current_timestamp();
                }
            }
        } else {
            self.sequence = 0;
        }
        self.last_timestamp = timestamp;
        (timestamp << 22)                  // 41-bit timestamp
            | (self.datacenter_id << 17)   // 5-bit datacenter ID
            | (self.worker_id << 12)       // 5-bit worker ID
            | self.sequence                // 12-bit sequence number
    }
}
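The bit layout used by the generator (41-bit timestamp, 5-bit datacenter, 5-bit worker, 12-bit sequence) can be checked by packing and unpacking an ID. The following is a small sketch. `compose`, `decompose`, and `SnowflakeParts` are hypothetical helpers, and the timestamp is taken as raw milliseconds with no custom epoch.

```rust
const SEQUENCE_BITS: u64 = 12;
const WORKER_BITS: u64 = 5;
const DATACENTER_BITS: u64 = 5;

const WORKER_SHIFT: u64 = SEQUENCE_BITS;
const DATACENTER_SHIFT: u64 = SEQUENCE_BITS + WORKER_BITS;
const TIMESTAMP_SHIFT: u64 = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS;

#[derive(Debug, PartialEq)]
struct SnowflakeParts {
    timestamp: u64,
    datacenter_id: u64,
    worker_id: u64,
    sequence: u64,
}

/// Packs the fields into one ID, masking each to its bit width.
fn compose(timestamp: u64, datacenter_id: u64, worker_id: u64, sequence: u64) -> u64 {
    (timestamp << TIMESTAMP_SHIFT)
        | ((datacenter_id & ((1 << DATACENTER_BITS) - 1)) << DATACENTER_SHIFT)
        | ((worker_id & ((1 << WORKER_BITS) - 1)) << WORKER_SHIFT)
        | (sequence & ((1 << SEQUENCE_BITS) - 1))
}

/// Splits an ID back into its fields.
fn decompose(id: u64) -> SnowflakeParts {
    SnowflakeParts {
        timestamp: id >> TIMESTAMP_SHIFT,
        datacenter_id: (id >> DATACENTER_SHIFT) & ((1 << DATACENTER_BITS) - 1),
        worker_id: (id >> WORKER_SHIFT) & ((1 << WORKER_BITS) - 1),
        sequence: id & ((1 << SEQUENCE_BITS) - 1),
    }
}
```

Because the timestamp occupies the high bits, IDs from a later millisecond always compare greater than IDs from an earlier one, regardless of which worker produced them.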
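// Assumes the `etcd-client` crate. `LockError` is assumed to implement
// `From<etcd_client::Error>` so that `?` can convert errors.
use etcd_client::{Client, Compare, CompareOp, PutOptions, Txn, TxnOp};
use std::time::Duration;

struct DistributedLock {
    key: String,
    ttl: Duration,
    owner: u64,
}

impl DistributedLock {
    fn path(&self) -> String {
        format!("/lock/{}", self.key)
    }

    // etcd-based distributed lock
    pub async fn try_lock(&self, client: &mut Client, owner: u64, ttl: Duration) -> Result<bool, LockError> {
        // The TTL comes from a lease: the key disappears if the holder dies.
        let lease = client.lease_grant(ttl.as_secs() as i64, None).await?;
        // A plain put would overwrite the current holder, so the key is written
        // in a transaction that succeeds only if it does not exist yet
        // (create_revision == 0).
        let txn = Txn::new()
            .when(vec![Compare::create_revision(self.path(), CompareOp::Equal, 0)])
            .and_then(vec![TxnOp::put(
                self.path(),
                owner.to_string(),
                Some(PutOptions::new().with_lease(lease.id())),
            )]);
        Ok(client.txn(txn).await?.succeeded())
    }

    pub async fn unlock(&self, client: &mut Client, owner: u64) -> Result<(), LockError> {
        // Only the holder may release the lock: the owner check and the delete
        // run in one transaction so no other client can slip in between.
        let txn = Txn::new()
            .when(vec![Compare::value(self.path(), CompareOp::Equal, owner.to_string())])
            .and_then(vec![TxnOp::delete(self.path(), None)]);
        client.txn(txn).await?;
        Ok(())
    }
}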
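The lock semantics (first writer wins, expiry after a TTL, release only by the owner) can be illustrated without etcd. The sketch below is a single-process stand-in with hypothetical names (`LockTable`). The current time is passed in explicitly so that expiry can be exercised deterministically.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// key -> (owner, expiry time)
struct LockTable {
    locks: HashMap<String, (u64, Instant)>,
}

impl LockTable {
    fn new() -> Self {
        Self { locks: HashMap::new() }
    }

    /// Acquires the lock unless another unexpired holder exists.
    fn try_lock(&mut self, key: &str, owner: u64, ttl: Duration, now: Instant) -> bool {
        let held = matches!(self.locks.get(key), Some((_, expires)) if *expires > now);
        if held {
            return false;
        }
        // Free or expired: take (or take over) the lock.
        self.locks.insert(key.to_string(), (owner, now + ttl));
        true
    }

    /// Releases the lock only if `owner` is the current holder.
    fn unlock(&mut self, key: &str, owner: u64) -> bool {
        let is_owner = matches!(self.locks.get(key), Some((holder, _)) if *holder == owner);
        if is_owner {
            self.locks.remove(key);
        }
        is_owner
    }
}
```

Expiry also means a slow holder can lose the lock without noticing, so work done under a TTL lock should be safe to repeat or be guarded by an additional check.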
// Event sourcing pattern
trait EventSourced {
    type Event;
    fn apply(&mut self, event: Self::Event);
    fn snapshot(&self) -> Self;
}

struct Aggregate {
    version: u64,
    events: Vec<Event>,
    state: AggregateState,
}

impl Aggregate {
    pub fn new() -> Self {
        Self {
            version: 0,
            events: Vec::new(),
            state: AggregateState::Init,
        }
    }

    // Update the state first, then record the event and bump the version
    pub fn apply_event(&mut self, event: Event) {
        self.state.transition(&event);
        self.events.push(event);
        self.version += 1;
    }

    // Capture the current state so replay can start from here instead of from event 0
    pub fn snapshot(&self) -> EventSourcedSnapshot {
        EventSourcedSnapshot {
            version: self.version,
            state: self.state.clone(),
        }
    }
}
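The point of a snapshot is that replaying the events recorded after it yields the same state as replaying the full history. The sketch below demonstrates that property on a toy domain. `Account` and `AccountEvent` are hypothetical examples, not types from this skill.

```rust
#[derive(Clone, Debug, PartialEq)]
enum AccountEvent {
    Deposited(u64),
    Withdrawn(u64),
}

#[derive(Clone, Debug, Default, PartialEq)]
struct Account {
    balance: u64,
    version: u64, // number of events applied so far
}

impl Account {
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Deposited(amount) => self.balance += amount,
            AccountEvent::Withdrawn(amount) => {
                self.balance = self.balance.saturating_sub(*amount)
            }
        }
        self.version += 1;
    }

    /// Rebuilds state starting from a snapshot plus the events recorded after it.
    fn replay_from(snapshot: Account, events: &[AccountEvent]) -> Account {
        let mut account = snapshot;
        for event in events {
            account.apply(event);
        }
        account
    }

    /// Rebuilds state from the full event history.
    fn replay(events: &[AccountEvent]) -> Account {
        Self::replay_from(Account::default(), events)
    }
}
```

The `version` field tells the caller which suffix of the event log still needs to be applied on top of a given snapshot.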
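| Problem | Cause | Solution |
|---|---|---|
| Split brain | Network partition | Quorum, term mechanism |
| Livelock | Colliding election timeouts | Randomized timeouts |
| Data inconsistency | Concurrent writes | Conflict resolution strategy |
| Performance bottleneck | Single write point | Sharding, replication |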
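Randomized timeouts, the livelock remedy in the table, can be sketched with the standard library alone. `random_election_timeout` is a hypothetical helper. It borrows the random seed of `RandomState` as a cheap entropy source, which is an assumption of convenience here. A real node would more likely use a proper random number generator such as the `rand` crate.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::Duration;

/// Picks a timeout in `[min_ms, max_ms)` so that nodes rarely time out together.
fn random_election_timeout(min_ms: u64, max_ms: u64) -> Duration {
    assert!(min_ms < max_ms, "the range must not be empty");
    // Each RandomState carries randomly seeded keys, so hashing nothing
    // still yields a different value per call. Not cryptographically meaningful.
    let random = RandomState::new().build_hasher().finish();
    Duration::from_millis(min_ms + random % (max_ms - min_ms))
}
```

A range such as 150 to 300 ms is a commonly cited starting point. The spread should be large relative to the network round-trip time so that one candidate usually finishes its election before another one starts.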
rust-distributed
│
├─► rust-concurrency → concurrency control
├─► rust-performance → performance optimization
└─► rust-async → asynchronous communication