6.824-lab3:Fault-tolerant Key/Value Service

​ 在Lab3中,需要使用前面实现好的Raft来构建一个K/V数据库。客户端(client)和服务端(server)进行交互。整体难度肯定比之前Raft低多了。服务器就相当于Raft中的每个节点,

0 前言 & 要求

​ 该Lab3分为两部分:第一部分需要实现一个支持GETPUTAPPEND的K/V数据库,这不需要考虑进行日志压缩/快照。而在第二部分则需要开始考虑进行日志压缩。实验代码在src/kvraft中。

1 实验分析

1.1 总览图

总览图

​ KVServer(server.go)是State Machine,每个KVServer对应一个Raft peer node,KVServer之间只通过Raft 服务来达成共识,不会直接交互。 在实现时,KVServer需要知道每个请求的客户端以及请求状态。所以使用nRand为每个客户端生成一个唯一的ID,并且每个请求需要有一个唯一ID,保证请求唯一性。

交互流程图

​ 用文字说明下收到一次请求的流程:

  1. Server收到客户端的Command
  2. 启动StartCommand提交给Leader,然后开启一个resultChannel等待返回的Response
  3. 底层共识达成后,Raft会将这个Command进行Apply,在Server中会开启一个协程Applier监听Apply Channel中的Meesage
  4. 一旦收到,则message中的command执行(get全部执行,put,append则需要判重)
  5. 然后,将构造response传入resultChannel
  6. 返回结果给Client

1.2 将请求路由到Leader

​ 因为由于一开始Client无法保证百分百就与Follower进行通信,所以需要一种方法得到Leader的位置。论文中提出了两种方法:Follwer拒绝请求并返回Leader的ID;使用Follwer进行请求代理。为了实现简单,我采用了轮询的方法。

1.3 Duplicate Request的处理

​ 在论文中称为实现线性化语义,简而言之就是过滤掉重复的请求。核心思想就是:由服务端保存客户端每次操作的结果,并使用它们跳过重复的请求。也就是说,维护一个preOperations记录每次执行成功的操作,调用HandleCommand时进行去重即可。需要注意的是,对于GET这种幂等操作,不对它进行防重。

​ (线性一致性:来自客户端的多个请求,可以按照一个顺序进行排列,且排列顺序与客户端请求的实际时间相符合)

1
2
3
4
5
6
7
8
func (kv *KVServer) isDuplicateRequest(clientId int64, commandId int64) bool {
if context, ok := kv.preOperations[clientId]; ok {
if context.MaxAppliedCommandId >= commandId {
return true
}
}
return false
}

1.4 日志快照

​ 这里主要考虑的两个问题是:

  • 快照什么时候被创建
  • 快照需要保存什么

​ 对于问题一,在实验说明中已经说明,创建一个kvserver的时候会传入一个maxraftstate,表示Raft状态日志最大允许大小,通过使用persister.RaftStateSize() 进行比较,如果超过,则建立快照。

​ 对于问题二,在hint中说了,kvserver在建立checkpoints的时候还是要能够检测哪些重复的请求,也就是说要将之前的请求preOperations保存到快照中;还有,需要保存MemoryKV,下次启动进行恢复

2 实验过程

2.1 Part 1

客户端

​ 原本的代码框架将putAndAppendGET分成两种RPC,我参照别人文档,感觉一个Command RPC比较简洁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Clerk struct {
servers []*labrpc.ClientEnd
leaderId int64
clientId int64 // 客户端ID
commandId int64 // 该client发送的请求的序列号
}

func (ck *Clerk) Command(request *CommandRequest) string {
request.ClientId, request.CommandId = ck.clientId, ck.commandId
for {
var response CommandResponse
if !ck.servers[ck.leaderId].Call("KVServer.HandleCommand", request, &response) || response.Err == ErrWrongLeader || response.Err == ErrTimeout {
ck.leaderId = (ck.leaderId + 1) % int64(len(ck.servers))
continue
}
ck.commandId++
return response.Value
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
type KVServer struct {
mu sync.RWMutex
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32

maxRaftState int // 需要保存快照的阈值,当persister.RaftStateSize()大于时,进行snapshots
lastApplied int // 记录当前server处理的最大apply请求,防止由于旧请求影响

resultChans map[int]chan *CommandResponse // 记录每个kv操作的返回值
preOperations map[int64]OperationContext // 记录每个clerk之前请求,防重
stateMachine KVStateMachine
}

处理模型

​ 客户端请求到来后,将日志放到raft层进行同步并且注册一个channel去阻塞等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (kv *KVServer) HandleCommand(request *CommandRequest, response *CommandResponse) {
kv.mu.RLock()
if request.Op != GET && kv.isDuplicateRequest(request.ClientId, request.CommandId) { // 去重
lastResponse := kv.preOperations[request.ClientId].LastResponse
response.Value, response.Err = lastResponse.Value, lastResponse.Err
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()

index, _, isLeader := kv.rf.Start(Command{request})
if !isLeader {
response.Err = ErrWrongLeader
return
}

kv.mu.Lock()
ch := kv.getResultChan(index)
kv.mu.Unlock()

DPrintf("请求节点:%v, 请求:%v", request.ClientId, request)

select {
case result := <-ch:
response.Value, response.Err = result.Value, result.Err
case <-time.After(ExecuteTimeout): // 不能无限的阻塞,需要加入超时取消调用
response.Err = ErrTimeout
}

go func() {
kv.mu.Lock()
delete(kv.resultChans, index)
kv.mu.Unlock()
}()
}

​ 然后开启一个applier协程监控applyCh,得到已经commit的日志后,将其apply到状态机中,根据command index得到对应的channel,最后将状态机执行的结果 push 到 channel 中,客户端得以解除阻塞并回复结果给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// applier
// @Description: 将提交的日志构造为响应传入通道
// @receiver: kv
func (kv *KVServer) applier() {
for kv.killed() == false {
select {
case message := <-kv.applyCh:
if message.CommandValid {
kv.mu.Lock()
if message.CommandIndex <= kv.lastApplied {
kv.mu.Unlock()
continue
}
kv.lastApplied = message.CommandIndex

var response *CommandResponse
command := message.Command.(Command)
if command.Op != GET && kv.isDuplicateRequest(command.ClientId, command.CommandId) {
response = kv.preOperations[command.ClientId].LastResponse
} else {
response = kv.applyLogToStateMachine(command)
if command.Op != GET {
kv.preOperations[command.ClientId] = OperationContext{command.CommandId, response}
}
}
DPrintf("{节点 %v} 构造响应: %v 命令操作:%v", kv.rf.GetMe(), response, command.Op)
if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
ch := kv.getResultChan(message.CommandIndex)
ch <- response
}

if ok := kv.needSnapshot(); ok {
kv.takeSnapshot(message.CommandIndex)
}

kv.mu.Unlock()
} else if message.SnapshotValid {
} else {
DPrintf("错误的message:%v", message)
}
}
}
}

2.2 Part 2

​ 通过引入快照,需要在applier中对每次保存数据大小进行判断,如果超过了maxraftstate,则需要建立快照,然后调用Snapshot将过期的log entry删除,接着底层的raft将snapshot写入到applyCh中。

​ 在applier还需要判断收到是否是快照,如果是,则调用CondInstallSnapshot来判断该快照能否应用,可以则调用restoreSnapshot恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// takeSnapshot
// @Description: 执行快照
// @receiver: kv
// @param: index
func (kv *KVServer) takeSnapshot(index int) {
buffer := new(bytes.Buffer)
encode := labgob.NewEncoder(buffer)
encode.Encode(kv.stateMachine)
encode.Encode(kv.preOperations)
kv.rf.Snapshot(index, buffer.Bytes())
}

// restoreSnapshot
// @Description: 恢复快照
// @receiver: kv
// @param: snapshot
func (kv *KVServer) restoreSnapshot(snapshot []byte) {
if snapshot == nil || len(snapshot) == 0 {
return
}
buffer := bytes.NewBuffer(snapshot)
decoder := labgob.NewDecoder(buffer)
var stateMachine MemoryKV
var preOperations map[int64]OperationContext
if decoder.Decode(&stateMachine) != nil || decoder.Decode(&preOperations) != nil {
DPrintf("{节点 %v} 恢复快照错误", kv.rf.GetMe())
}
kv.stateMachine = &stateMachine
kv.preOperations = preOperations
}

3 总结

​ 该部分难度并不高,其实就是实现一个raft层上面的存储层,唯一值得一提的是为了保证线性一致性,我们需要保证写请求是不能重复的,所以需要为每个客户端维护之前的操作ID,以此用来排重。


6.824-lab3:Fault-tolerant Key/Value Service
https://2w1nd.github.io/2022/05/07/MIT6.824/6-824-lab3:Fault-tolerant-Key-Value-Service/
作者
w1nd
发布于
2022年5月7日
许可协议