在Lab3中,需要使用前面实现好的Raft
来构建一个K/V数据库。客户端(client
)和服务端(server
)进行交互。整体难度肯定比之前Raft
低多了。服务器就相当于Raft
中的每个节点,
0 前言 & 要求
该Lab3分为两部分:第一部分需要实现一个支持GET
,PUT
,APPEND
的K/V数据库,这不需要考虑进行日志压缩/快照。而在第二部分则需要开始考虑进行日志压缩。实验代码在src/kvraft
中。
1 实验分析
1.1 总览图
KVServer(server.go)是State Machine,每个KVServer对应一个Raft peer node,KVServer之间只通过Raft 服务来达成共识,不会直接交互。 在实现时,KVServer需要知道每个请求的客户端以及请求状态。所以使用nRand
为每个客户端生成一个唯一的ID,并且每个请求需要有一个唯一ID,保证请求唯一性。
用文字说明下收到一次请求的流程:
- Server收到客户端的
Command
- 启动
Start
将Command
提交给Leader,然后开启一个resultChannel
等待返回的Response
- 底层共识达成后,Raft会将这个
Command
进行Apply
,在Server中会开启一个协程Applier
监听Apply Channel
中的Meesage
- 一旦收到,则
message
中的command
执行(get全部执行,put,append则需要判重)
- 然后,将构造
response
传入resultChannel
中
- 返回结果给
Client
1.2 将请求路由到Leader
因为由于一开始Client无法保证百分百就与Follower进行通信,所以需要一种方法得到Leader的位置。论文中提出了两种方法:Follwer拒绝请求并返回Leader的ID;使用Follwer进行请求代理。为了实现简单,我采用了轮询的方法。
1.3 Duplicate Request的处理
在论文中称为实现线性化语义,简而言之就是过滤掉重复的请求。核心思想就是:由服务端保存客户端每次操作的结果,并使用它们跳过重复的请求。也就是说,维护一个preOperations
记录每次执行成功的操作,调用HandleCommand
时进行去重即可。需要注意的是,对于GET
这种幂等操作,不对它进行防重。
(线性一致性:来自客户端的多个请求,可以按照一个顺序进行排列,且排列顺序与客户端请求的实际时间相符合)
1 2 3 4 5 6 7 8
| func (kv *KVServer) isDuplicateRequest(clientId int64, commandId int64) bool { if context, ok := kv.preOperations[clientId]; ok { if context.MaxAppliedCommandId >= commandId { return true } } return false }
|
1.4 日志快照
这里主要考虑的两个问题是:
对于问题一,在实验说明中已经说明,创建一个kvserver的时候会传入一个maxraftstate
,表示Raft状态日志最大允许大小,通过使用persister.RaftStateSize()
进行比较,如果超过,则建立快照。
对于问题二,在hint中说了,kvserver在建立checkpoints的时候还是要能够检测哪些重复的请求,也就是说要将之前的请求preOperations
保存到快照中;还有,需要保存MemoryKV
,下次启动进行恢复
2 实验过程
2.1 Part 1
客户端
原本的代码框架将putAndAppend
和GET
分成两种RPC,我参照别人文档,感觉一个Command RPC
比较简洁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| type Clerk struct { servers []*labrpc.ClientEnd leaderId int64 clientId int64 commandId int64 }
func (ck *Clerk) Command(request *CommandRequest) string { request.ClientId, request.CommandId = ck.clientId, ck.commandId for { var response CommandResponse if !ck.servers[ck.leaderId].Call("KVServer.HandleCommand", request, &response) || response.Err == ErrWrongLeader || response.Err == ErrTimeout { ck.leaderId = (ck.leaderId + 1) % int64(len(ck.servers)) continue } ck.commandId++ return response.Value } }
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13
| type KVServer struct { mu sync.RWMutex rf *raft.Raft applyCh chan raft.ApplyMsg dead int32
maxRaftState int lastApplied int
resultChans map[int]chan *CommandResponse preOperations map[int64]OperationContext stateMachine KVStateMachine }
|
处理模型
客户端请求到来后,将日志放到raft层进行同步并且注册一个channel去阻塞等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (kv *KVServer) HandleCommand(request *CommandRequest, response *CommandResponse) { kv.mu.RLock() if request.Op != GET && kv.isDuplicateRequest(request.ClientId, request.CommandId) { lastResponse := kv.preOperations[request.ClientId].LastResponse response.Value, response.Err = lastResponse.Value, lastResponse.Err kv.mu.RUnlock() return } kv.mu.RUnlock()
index, _, isLeader := kv.rf.Start(Command{request}) if !isLeader { response.Err = ErrWrongLeader return }
kv.mu.Lock() ch := kv.getResultChan(index) kv.mu.Unlock()
DPrintf("请求节点:%v, 请求:%v", request.ClientId, request)
select { case result := <-ch: response.Value, response.Err = result.Value, result.Err case <-time.After(ExecuteTimeout): response.Err = ErrTimeout }
go func() { kv.mu.Lock() delete(kv.resultChans, index) kv.mu.Unlock() }() }
|
然后开启一个applier
协程监控applyCh
,得到已经commit的日志后,将其apply
到状态机中,根据command index
得到对应的channel
,最后将状态机执行的结果 push 到 channel 中,客户端得以解除阻塞并回复结果给客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
func (kv *KVServer) applier() { for kv.killed() == false { select { case message := <-kv.applyCh: if message.CommandValid { kv.mu.Lock() if message.CommandIndex <= kv.lastApplied { kv.mu.Unlock() continue } kv.lastApplied = message.CommandIndex
var response *CommandResponse command := message.Command.(Command) if command.Op != GET && kv.isDuplicateRequest(command.ClientId, command.CommandId) { response = kv.preOperations[command.ClientId].LastResponse } else { response = kv.applyLogToStateMachine(command) if command.Op != GET { kv.preOperations[command.ClientId] = OperationContext{command.CommandId, response} } } DPrintf("{节点 %v} 构造响应: %v 命令操作:%v", kv.rf.GetMe(), response, command.Op) if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm { ch := kv.getResultChan(message.CommandIndex) ch <- response }
if ok := kv.needSnapshot(); ok { kv.takeSnapshot(message.CommandIndex) }
kv.mu.Unlock() } else if message.SnapshotValid { } else { DPrintf("错误的message:%v", message) } } } }
|
2.2 Part 2
通过引入快照,需要在applier
中对每次保存数据大小进行判断,如果超过了maxraftstate
,则需要建立快照,然后调用Snapshot
将过期的log entry
删除,接着底层的raft将snapshot写入到applyCh中。
在applier
还需要判断收到是否是快照,如果是,则调用CondInstallSnapshot
来判断该快照能否应用,可以则调用restoreSnapshot
恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
func (kv *KVServer) takeSnapshot(index int) { buffer := new(bytes.Buffer) encode := labgob.NewEncoder(buffer) encode.Encode(kv.stateMachine) encode.Encode(kv.preOperations) kv.rf.Snapshot(index, buffer.Bytes()) }
func (kv *KVServer) restoreSnapshot(snapshot []byte) { if snapshot == nil || len(snapshot) == 0 { return } buffer := bytes.NewBuffer(snapshot) decoder := labgob.NewDecoder(buffer) var stateMachine MemoryKV var preOperations map[int64]OperationContext if decoder.Decode(&stateMachine) != nil || decoder.Decode(&preOperations) != nil { DPrintf("{节点 %v} 恢复快照错误", kv.rf.GetMe()) } kv.stateMachine = &stateMachine kv.preOperations = preOperations }
|
3 总结
该部分难度并不高,其实就是实现一个raft层上面的存储层,唯一值得一提的是为了保证线性一致性,我们需要保证写请求是不能重复的,所以需要为每个客户端维护之前的操作ID,以此用来排重。