6.824 Lab 4: Sharded Key/Value Service

​ 该部分要求我们实现ShardKV。在前面Lab中,我们实现一个构成基础分布式数据库的框架,支持多节点的数据一致性,CURD,日志快照。但在单一集群中,这样的实现会导致所有的请求都要由Leader来处理,当数据增长到一定程度,会导致请求响应时间变长。解决办法就是,将数据按某种方式分开存储到不同的集群,将不同的请求发送到不同的集群中

​ Lab还是偏难的,论文中也没有具体去写这一部分,很多需要参照网上资料去完成。

0 前言 & 要求

​ Lab4分为两部分:第一部分实现一个ShardCtrler,用于将Client请求定位到Group;第二部分实现一个ShardKV,从整体来看,它维护了许多raft组(Group),通过与ShardCtriler进行交互更新配置(Config),并且提供如Lab3实现的读写服务。难点在于实现时要考虑不同raft组之间分片迁移以及如何进行非阻塞式的配置更新,还有对删除分片进行清理,空日志检测。这部分直接看的Lab4-2B这位大佬的,感觉如果自己去想的话,估计得花上许久,先借鉴下别人的做法,后续有时间想到新的思路再回来修改。

1 实验分析

1.1 整体架构

ShardKV整体架构图(图片来自网络)

​ 可以看出,ShardCtrler作为一个控制器的角色,需要获得Client的请求Key,找到对应Shard,通过Shard再找到相应GroupGroup还会映射相应server,需要将这些信息返回给Client,才能发送请求到正确的集群中。

符号解释

Shard:互不相交且组成完整数据的每一个数据子集。例如以a开头的字母可构成一个Shard

Config:在某一阶段,Shard与集群的对应关系。里面维护了Shard->gidgid->server的信息。

1.2 服务端提供的功能

​ 读写服务:如Lab3实现的一样,应该要提供对数据PutGetAppend操作。不同的是,在一个Group中,会存在不同的分片,而我们的取值应该是在不同的分片数组进行,所以需要维护的是一组分片状态机

​ 配置更新:在2A中除Query以外的操作(Move,Join,Leave)都会使得生成新的配置,Server需要定期地拉取和提交新的配置

​ 分片迁移:Move操作会使得分片向其他的Group进行迁移,我们也需要定期的检测哪些分片需要进行迁移

​ 分片清理:Leave操作会删除某个Group,这样会导致有部分分片需要被回收,需要定期检测

​ 空日志检测:

1.3 Challenge

​ 实验要求提出有两个Challenge需要解决,才能供生产使用

  1. 如何处理分片删除的状态转移?。假设我们有两个组,G1 和 G2,并且有一个新配置 C 将分片 S 从 G1 移动到 G2。如果 G1 在转移到 C 时从其数据库中删除了 S 中的所有键,那么当 G2 试图转移到 C 时,如何获取 S 的数据?也就是说,我们应该要保证及时清理不再属于本分片的数据
  2. 如何处理配置更改期间的客户端请求?我们不可能每次进行配置更新都阻塞客户端操作,这显然是生产不可用的。并且要考虑,当如果G3进行配置更新时,需要来自G1的分片S1,G2的分片S2,要保证收到必要的分片就可以立即提供服务,而不是等待所有分片到达。也就是说,不仅要保证分片迁移时不影响未迁移分片的读写服务,还要不同的分片数据能够独立迁移

1.4 分片状态

​ 维护Shard的状态原因如下:

  1. 防止分片Shard中间状态被覆盖,从而导致任务被丢弃。只有当分片Shard的状态都为默认状态才允许拉取最新配置
  2. 由于challenge2要求不能阻塞客户端的操作,并且要保证配置更新和分片状态变化(状态变化意味着是否可提供读写服务)彼此独立。所以需要将不同raft Group的分片数据独立起来,分别提交多条日志来维护状态。

​ 每个分片有四种状态:

  • Serving:分片的默认状态,如果当前raft Group在当前config下负责管理此分片,则该分片可以提供读写服务,否则该分片暂不可以提供读写服务,但不会阻塞配置更新协程拉取新配置
  • Pulling:表示当前 raft Group 在当前 config 下负责管理此分片,暂不可以提供读写服务,需要当前 raft Group 从上一个配置该分片所属 raft Group 拉数据过来之后才可以提供读写服务,系统会有一个分片迁移协程检测所有分片的 Pulling 状态,接着以 raft Group为单位去对应 raft Group 拉取数据,接着尝试重放该分片的所有数据到本地并将分片状态置为 Serving,以继续提供服务。
  • BePulling:表示当前 raft Group 在当前 config 下不负责管理此分片,不可以提供读写服务,但当前 raft Group 在上一个 config 时复制管理此分片,因此当前 config 下负责管理此分片的 raft Group 拉取完数据后会向本 raft 组发送分片清理的 rpc,接着本 raft 组将数据清空并重置为 serving 状态即可。
  • GCing:表示当前 raft Group在当前 config 下负责管理此分片,可以提供读写服务,但需要清理掉上一个配置该分片所属 raft Group 的数据。系统会有一个分片清理协程检测所有分片的 GCing 状态,接着以 raft Group 为单位去对应 raft Group 删除数据,一旦远程 raft Group 删除数据成功,则本地会尝试将相关分片的状态置为 Serving

​ 还有一件事,如何考虑更新Shard状态的时机

​ 如果在 apply 协程更新配置的时候由 leader 异步启动对应的协程,让其独立的根据 raft 组为粒度拉取数据,这会出现以下问题:leader apply 了新配置后便挂了,然后此时 follower 也 apply 了该配置但并不会启动该任务,在该 raft 组的新 leader 选出来后,该任务已经无法被执行了。

​ 因此,不能在 apply 配置的时候启动异步任务,而是应该只更新 shard 的状态,由单独的协程去异步的执行分片迁移,分片清理等任务

1.5 日志类型

​ 同样的,在Lab3,我们封装了Get,Put,Append的请求指令,在KVServer中,我们又增添了一些操作,所以需要增加对应的请求类型,以用作raft层的日志存储。

  • Operation:客户端传来的读写操作日志,有 PutGetAppend 等请求。
  • Configuration:配置更新日志,包含一个配置。
  • InsertShards:分片更新日志,包含至少一个分片的数据和配置版本。
  • DeleteShards:分片删除日志,包含至少一个分片的 id 和配置版本。
  • EmptyEntry:空日志,Data 为空,使得状态机达到最新。

1.6 设计总览

image-20220519134739421.png

​ 该设计会发现和Lab3的大致设计一致,不同的是,多了4个协程来保证分片的状态转移。

幂等性

​ 这里幂等性指的是不同协程做的操作不会影响数据的正确性。

​ 考虑第一种情况,某个raft Group集体被kill并重启时,这时候配置可能都更新过好几次了,然后重启后raft Group首先需要重放所有日志(在raft层进行日志同步,应用到上层),在这个过程,由于分片迁移协程和apply协程是并行的,迁移协程可能检测到某个中间状态从而发起数据的迁移,这是如果对数据进行覆盖则会破坏正确性。所以,为每个分片迁移和清理请求带上一个配置版本,只有版本相同才可以覆盖,

​ 考虑第二种情况,当前raft Group更新了配置,迁移协程根据配置从其他raft Group拉取分片数据和去重表进行提交,系统依旧对外提供服务,迁移协程不会重复拉该分片的数据。然后这时整个raft Group被kill并重启时,重放日志到更新配置的日志时,迁移协程恰好捕捉到最新配置的中间状态,并再次向其他 raft 组拉数据和去重表并尝试提交,这样在 apply 该分片更新日志时两者版本一致,从而进行了直接覆盖。(如果在 raft 回放完毕和迁移协程拉到数据并提交日志中间该 raft 组又执行了该分片的写操作,那么直接覆盖就会导致这些改动丢失)。所以,版本比较还不够完备,需要判断分片的状态。

2 实验过程

2.1 ShardCtrler的四种操作

​ 整体实现和Lab3大同小异,由于Config包含信息较小,所以不需要snapshot。客户端会发送RPC进行一些操作,每当shard->gid的对应关系被改变时,都需要创建一个新Config来保存。ShardCtrler需要支持JoinLeaveMoveQuery四种操作。

RPC结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type CommandRequest struct {
Servers map[int][]string // For Join
GIDs []int // For Leave
Shard int // For Move
GID int // For Move
Num int // For Query
Op OperationOp
ClientId int64
CommandId int64
}

type CommandResponse struct {
Err Err
Config Config
}

Query

​ 查询指定版本的Config

1
2
3
4
5
6
func (cf *MemoryConfigStateMachine) Query(num int) (Config, Err) {
if num < 0 || num >= len(cf.Configs) {
return cf.Configs[len(cf.Configs)-1], OK
}
return cf.Configs[num], OK
}

Move

​ 将指定的Shard交由新的Group负责

1
2
3
4
5
6
7
8
9
10
11
12
func (cf *MemoryConfigStateMachine) Move(shard, gid int) Err {
length := len(cf.Configs)
lastConfig := cf.Configs[length-1]
newConfig := Config{
Num: length,
Shards: lastConfig.Shards,
Groups: deepCopy(lastConfig.Groups),
}
newConfig.Shards[shard] = gid
cf.Configs = append(cf.Configs, newConfig)
return OK
}

Join

​ 为当前Config添加一些server,这些server有可能是加入到现有Group中,也可能是新增的Group

​ 自平衡方案很简单:每次将shard数量最多的Group分配给shard数量最少的Group,直到差值小于等于1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (cf *MemoryConfigStateMachine) Join(groups map[int][]string) Err {
length := len(cf.Configs)
lastConfig := cf.Configs[length-1]
newConfig := Config{
Num: length,
Shards: lastConfig.Shards,
Groups: deepCopy(lastConfig.Groups),
}
// 为新Config加入之前没有的Group
for gid, servers := range groups {
if _, ok := newConfig.Groups[gid]; !ok {
newServers := make([]string, len(servers))
copy(newServers, servers)
newConfig.Groups[gid] = newServers
}
}
// 得到每个Group对应的Shards Map
s2g := Group2Shards(newConfig)
// 进行Shard平衡
for {
source, target := GetGIDWithMaximumShards(s2g), GetGIDWithMinimumShards(s2g)
if source != 0 && len(s2g[source])-len(s2g[target]) <= 1 {
break
}
s2g[target] = append(s2g[target], s2g[source][0])
s2g[source] = s2g[source][1:]
}
// 一个Group中会有可能包含许多Shard
var newShards [NShards]int
for gid, shards := range s2g {
for _, shard := range shards {
newShards[shard] = gid
}
}
newConfig.Shards = newShards
cf.Configs = append(cf.Configs, newConfig)
return OK
}

Leave

​ 删除Group。如果删除之后还有Group,则需要将Shrad均匀分配给所有Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (cf *MemoryConfigStateMachine) Leave(gIds []int) Err {
length := len(cf.Configs)
lastConfig := cf.Configs[length-1]
newConfig := Config{
Num: length,
Shards: lastConfig.Shards,
Groups: deepCopy(lastConfig.Groups),
}
s2g := Group2Shards(newConfig)
orphanShards := make([]int, 0)
// 删除Group,并将删除的shards保存在orphanShards中
for _, gid := range gIds {
if _, ok := newConfig.Groups[gid]; ok {
delete(newConfig.Groups, gid)
}
if shards, ok := s2g[gid]; ok {
orphanShards = append(orphanShards, shards...)
delete(s2g, gid)
}
}
var newShards [NShards]int
// 如果还有Group,则将删除Group中的shard分配到最少shard的Group
if len(newConfig.Groups) != 0 {
for _, shard := range orphanShards {
target := GetGIDWithMinimumShards(s2g)
s2g[target] = append(s2g[target], shard)
}
for gid, shards := range s2g {
for _, shard := range shards {
newShards[shard] = gid
}
}
}
newConfig.Shards = newShards
cf.Configs = append(cf.Configs, newConfig)
return OK
}

2.2 ShardKV客户端

客户端执行逻辑如下:

  1. 使用key2shard()ShardCtrler找到一个key对应哪个分片shard
  2. 根据shard从当前配置获取到gid
  3. 根据gid获取到servers信息
  4. 循环查找到leaderid,直到返回请求成功、ErrWrongGroup或整个 group 都遍历请求过
  5. Query 最新的配置,回到步骤1循环重复;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (ck *Clerk) Command(request *CommandRequest) string {
request.ClientId, request.CommandId = ck.clientId, ck.commandId
for {
shard := key2shard(request.Key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
if _, ok = ck.leaderIds[gid]; !ok {
ck.leaderIds[gid] = 0
}
oldLeaderId := ck.leaderIds[gid]
newLeaderId := oldLeaderId
for {
var response CommandResponse
ok := ck.make_end(servers[newLeaderId]).Call("ShardKV.Command", request, &response)
if ok && (response.Err == OK || response.Err == ErrNoKey) {
ck.commandId ++
return response.Value
} else if ok && response.Err == ErrWrongGroup {
break
} else {
newLeaderId = (newLeaderId + 1) % len(servers) // 找Leader
if newLeaderId == oldLeaderId {
break
}
continue
}
}
}
time.Sleep(100 * time.Millisecond)
ck.config = ck.sm.Query(-1)
}
}

2.3 ShardKV服务端

​ 这部分是最难的,首先明确系统整体的交互方式:

  • 服务需要创建多个raft Group来承载所有分片的读写任务
  • 服务端需要完成配置更新分片迁移分片清理空日志检测等功能

2.3.1 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
type ShardKV struct {
mu sync.RWMutex
dead int32
rf *raft.Raft
applyCh chan raft.ApplyMsg

makeEnd func(string) *labrpc.ClientEnd
gid int
sc *shardctrler.Clerk

maxRaftState int // 执行快照的阈值
lastApplied int

// 相对于单集群多出的字段
lastConfig shardctrler.Config
currentConfig shardctrler.Config

stateMachines map[int]*Shard // server数据存储
lastOperations map[int64]OperationContext // 通过记录最后一个commandId和clientId对应的response来判断log是否重复
notifyChans map[int]chan *CommandResponse // 记录每个kv操作的返回值
}

// 用于提交日志
func (kv *ShardKV) applier() {
for kv.killed() == false {
select {
case message := <-kv.applyCh:
if message.CommandValid {
kv.mu.Lock()
if message.CommandIndex <= kv.lastApplied {
kv.mu.Unlock()
continue
}
kv.lastApplied = message.CommandIndex

var response *CommandResponse
command := message.Command.(Command)
switch command.Op {
case Operation:
operation := command.Data.(CommandRequest)
response = kv.applyOperation(&message, &operation)
case Configuration:
nextConfig := command.Data.(shardctrler.Config)
response = kv.applyConfiguration(&nextConfig)
case InsertShards:
shardsInfo := command.Data.(ShardOperationResponse)
response = kv.applyInsertShards(&shardsInfo)
case DeleteShards:
shardsInfo := command.Data.(ShardOperationRequest)
response = kv.applyDeleteShards(&shardsInfo)
case EmptyEntry:
response = kv.applyEmptyEntry()
}

// 仅当节点为领导者时才通知 currentTerm 日志的相关通道
if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
ch := kv.getNotifyChan(message.CommandIndex)
ch <- response
}

needSnapshot := kv.needSnapshot()
if needSnapshot {
kv.takeSnapshot(message.CommandIndex)
}

kv.mu.Unlock()
} else if message.SnapshotValid {
kv.mu.Lock()
if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
kv.restoreSnapshot(message.Snapshot)
kv.lastApplied = message.SnapshotIndex
}
kv.mu.Unlock()
} else {
panic(fmt.Sprintf("unexpected Message %v", message))
}
}
}
}

2.3.2 读写服务

​ 根据1.4阐述,只有当分片状态为ServingGCing,当前raft Group在当前config下负责管理此分片,本raft Group才可以为该分片提供读写服务,否则返回 ErrWrongGroup 让客户端重新拉取最新的 config 并重试即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (kv *ShardKV) Command(request *CommandRequest, response *CommandResponse) {
kv.mu.RLock()
// 重复请求直接返回结果,不参与raft层
if request.Op != Get && kv.isDuplicateRequest(request.ClientId, request.CommandId) {
lastResponse := kv.lastOperations[request.ClientId].LastResponse
response.Value, response.Err = lastResponse.Value, lastResponse.Err
kv.mu.RUnlock()
return
}

// 直接返回 ErrWrongGroup 让客户端获取最新配置,如果此分片当前无法提供此key,则执行重试
if !kv.canServe(key2shard(request.Key)){
response.Err = ErrWrongGroup
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()
kv.Execute(NewOperationCommand(request), response)
}

func (kv *ShardKV) Execute(command Command, response *CommandResponse) {
// 不持有锁以提高吞吐量
// 当 KVServer 持有锁进行快照时,底层 raft 仍然可以提交 raft 日志
index, _, isLeader := kv.rf.Start(command)
if !isLeader {
response.Err = ErrWrongLeader
return
}
kv.mu.Lock()
ch := kv.getNotifyChan(index)
kv.mu.Unlock()
select {
case result := <-ch:
response.Value, response.Err = result.Value, result.Err
case <-time.After(ExecuteTimeout):
response.Err = ErrTimeout
}

// 释放 notifyChan 以减少内存占用
// 为什么是异步的? 为了提高吞吐量,这里不需要阻塞客户端请求
go func() {
kv.mu.Lock()
kv.removeOutdatedNotifyChan(index)
kv.mu.Unlock()
}()
}

func (kv *ShardKV) applyOperation(message *raft.ApplyMsg, operation *CommandRequest) *CommandResponse {
var response *CommandResponse
shardID := key2shard(operation.Key)
if kv.canServe(shardID) {
if operation.Op != Get && kv.isDuplicateRequest(operation.ClientId, operation.CommandId) {
return kv.lastOperations[operation.ClientId].LastResponse
} else {
response = kv.applyLogToStateMachine(operation, shardID)
if operation.Op != Get {
kv.lastOperations[operation.ClientId] = OperationContext{operation.CommandId, response}
}
return response
}
}
return &CommandResponse{
Err: ErrWrongGroup,
Value: "",
}
}

func (kv *ShardKV) canServe(shardID int) bool {
return kv.currentConfig.Shards[shardID] == kv.gid && (kv.stateMachines[shardID].Status == Serving || kv.stateMachines[shardID].Status == GCing)
}

2.3.3 配置更新

​ 配置更新协程负责定时检测所有分片的状态,一旦存在至少一个分片的状态不为默认状态,则预示其他协程仍然还没有完成任务,那么此时需要阻塞新配置的拉取和提交。

apply 配置更新日志时需要保证幂等性:

  • 不同版本的配置更新日志:apply 时仅可逐步递增的去更新配置,否则返回ErrOutDated
  • 相同版本的配置更新日志:由于配置更新日志仅由配置更新协程提交,而配置更新协程只有检测到比本地更大地配置时才会提交配置更新日志,所以该情形不会出现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (kv *ShardKV) configureAction() {
canPerformNextConfig := true
kv.mu.RLock()
for _, shard := range kv.stateMachines {
if shard.Status != Serving {
canPerformNextConfig = false
break
}
}
currentConfigNum := kv.currentConfig.Num
kv.mu.RUnlock()
if canPerformNextConfig {
nextConfig := kv.sc.Query(currentConfigNum + 1)
if nextConfig.Num == currentConfigNum + 1 {
kv.Execute(NewConfigurationCommand(&nextConfig), &CommandResponse{})
}
}
}

func (kv *ShardKV) applyConfiguration(nextConfig *shardctrler.Config) *CommandResponse {
if nextConfig.Num == kv.currentConfig.Num+1 {
kv.updateShardStatus(nextConfig)
kv.lastConfig = kv.currentConfig
kv.currentConfig = *nextConfig
return &CommandResponse{OK, ""}
}
return &CommandResponse{ErrOutDated, ""}
}

2.3.4 分片迁移

​ 分片迁移协程会定时的检测分片的Pulling状态,利用lastConfig计算出相应的servers和要拉取的分片,然后并行的去拉取数据。在该Action中,需要使用waitGroup来保证独立地任务完成后才会进行下一次任务,并且,wg.Wait一定要在释放锁之后,否则就会造成分片串行迁移,不能满足要求。

​ 在执行拉取分片的Handler中,首先要保证是由Leader处理该请求,其次如果发现请求的Config大于当前版本,那就说明拉取的是未来的数据,则返回ErrNotReady让其稍后重试,否则将分片数据和去重表都深拷贝到response即可

​ 在apply分片迁移日志时需要保证幂等性:

  • 不同版本的配置更新日志:仅可执行与当前配置版本相同的分片更新日志,否则返回 ErrOutDated
  • 相同版本的配置更新日志:仅在对应分片状态为 Pulling 时为第一次应用,此时覆盖状态机即可并修改状态为 GCing,以让分片清理协程检测到 GCing 状态并尝试删除远端的分片。否则说明已经应用过,直接 break 即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (kv *ShardKV) migrationAction() {
kv.mu.RLock()
gid2shardIDs := kv.getShardIDsByStatus(Pulling)
var wg sync.WaitGroup
for gid, shardIDs := range gid2shardIDs {
wg.Add(1)
go func(servers []string, configNum int, shardIDs []int) {
defer wg.Done()
pullTaskRequest := ShardOperationRequest{configNum, shardIDs}
for _, server := range servers {
var pullTaskResponse ShardOperationResponse
srv := kv.makeEnd(server)
if srv.Call("ShardKV.GetShardsData", &pullTaskRequest, &pullTaskResponse) && pullTaskResponse.Err == OK {
kv.Execute(NewInsertShardsCommand(&pullTaskResponse), &CommandResponse{})
}
}
}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
}
kv.mu.RUnlock()
wg.Wait()
}

func (kv *ShardKV) GetShardsData(request *ShardOperationRequest, response *ShardOperationResponse) {
if _, isLeader := kv.rf.GetState(); !isLeader {
response.Err = ErrWrongLeader
return
}
kv.mu.RLock()
defer kv.mu.RUnlock()

if kv.currentConfig.Num < request.ConfigNum { // 拉取的版本过高
response.Err = ErrNotReady
return
}

response.Shards = make(map[int]map[string]string)
for _, shardID := range request.ShardIDs {
response.Shards[shardID] = kv.stateMachines[shardID].deepCopy()
}

response.LastOperations = make(map[int64]OperationContext)
for clientID, operation := range kv.lastOperations {
response.LastOperations[clientID] = operation.deepCopy()
}

response.ConfigNum, response.Err = request.ConfigNum, OK
}s

func (kv *ShardKV) applyInsertShards(shardsInfo *ShardOperationResponse) *CommandResponse {
if shardsInfo.ConfigNum == kv.currentConfig.Num {
for shardId, shardData := range shardsInfo.Shards {
shard := kv.stateMachines[shardId]
if shard.Status == Pulling {
for key, value := range shardData {
shard.KV[key] = value
}
shard.Status = GCing
} else {
break
}
}
for clientId, operationContext := range shardsInfo.LastOperations {
if lastOperation, ok := kv.lastOperations[clientId]; !ok || lastOperation.MaxAppliedCommandId < operationContext.MaxAppliedCommandId {
kv.lastOperations[clientId] = operationContext
}
}
return &CommandResponse{OK, ""}
}
return &CommandResponse{OK, ""}
}

2.3.5 分片清理

​ 分片清理协程会定时的检测分片的GCing状态,利用lastConfig计算出相应的servers和要拉取的分片,然后并行的去清理分片。

​ 在清理分片的handler中,首先仅由Leader处理该请求,其次如果发现请求中的配置版本小于本地的版本,那说明该请求已经执行过,此时直接返回 OK 即可,否则在本地提交一个删除分片的日志。

​ 在 apply 分片清理日志时需要保证幂等性:

  • 不同版本的配置更新日志:仅可执行与当前配置版本相同地分片删除日志,否则已经删除过,直接返回 OK 即可。
  • 相同版本的配置更新日志:如果分片状态为 GCing,说明是本 raft Group 已成功删除远端 raft Group 的数据,现需要更新分片状态为默认状态以支持配置的进一步更新;否则如果分片状态为 BePulling,则说明本 raft Group 第一次删除该分片的数据,此时直接重置分片即可。否则说明该请求已经应用过,直接 break 返回 OK 即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (kv *ShardKV) gcAction() {
kv.mu.RLock()
gid2shardIDs := kv.getShardIDsByStatus(GCing)
var wg sync.WaitGroup
for gid, shardIDs := range gid2shardIDs {
wg.Add(1)
go func(servers []string, configNum int, shardIDs []int) {
defer wg.Done()
gcTaskRequest := ShardOperationRequest{configNum, shardIDs}
for _, server := range servers {
var gcTaskResponse ShardOperationResponse
srv := kv.makeEnd(server)
if srv.Call("ShardKV.DeleteShardsData", &gcTaskRequest, &gcTaskResponse) && gcTaskResponse.Err == OK {
kv.Execute(NewDeleteShardsCommand(&gcTaskRequest), &CommandResponse{})
}
}
}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
}
kv.mu.RUnlock()
wg.Wait()
}

func (kv *ShardKV) DeleteShardsData(request *ShardOperationRequest, response *ShardOperationResponse) {
// only delete shards when role is leader
if _, isLeader := kv.rf.GetState(); !isLeader {
response.Err = ErrWrongLeader
return
}

kv.mu.RLock()
if kv.currentConfig.Num > request.ConfigNum {
response.Err = OK
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()

var commandResponse CommandResponse
kv.Execute(NewDeleteShardsCommand(request), &commandResponse)

response.Err = commandResponse.Err
}

func (kv *ShardKV) applyDeleteShards(shardsInfo *ShardOperationRequest) *CommandResponse {
if shardsInfo.ConfigNum == kv.currentConfig.Num {
for _, shardId := range shardsInfo.ShardIDs {
shard := kv.stateMachines[shardId]
if shard.Status == GCing {
shard.Status = Serving
} else if shard.Status == BePulling {
kv.stateMachines[shardId] = NewShard()
} else {
break
}
}
return &CommandResponse{OK, ""}
}
return &CommandResponse{OK, ""}
}

2.3.6 空日志检测

​ 空日志检测协程会定时检测raft 层的 leader 是否拥有当前 term 的日志,如果没有则提交一条空日志,这使得新 leader 的状态机能够迅速达到最新状态,从而避免多 raft 组间的活锁状态。

1
2
3
4
5
6
7
8
9
func (kv *ShardKV) checkEntryInCurrentTermAction() {
if !kv.rf.HasLogInCurrentTerm() {
kv.Execute(NewEmptyEntryCommand(), &CommandResponse{})
}
}

func (kv *ShardKV) applyEmptyEntry() *CommandResponse {
return &CommandResponse{OK, ""}
}

3 总结

​ 对于这种mutil-raft架构,难点在于如何既对内保证分片之间状态变化,又对外保证客户端请求的正确响应。对外提供读写服务,侧重于维持每个请求的线性一致性,也就是服务端要保存客户端操作的结果,使用其跳过重复的请求。对内,开启一个协程不断接收已经同步完成的Command,然后运用到状态机中,除此之外,还需要开启配置更新,分片迁移,分片清理,空日志检测四个协程定时处理相关分片,以此维持系统安全。

参考

mit-6.824 2021 Lab4:ShardKV

MIT6.824-2021/lab4


6.824 Lab 4: Sharded Key/Value Service
https://2w1nd.github.io/2022/05/18/MIT6.824/6-824-Lab4:Sharded-Key-Value-Service/
作者
w1nd
发布于
2022年5月18日
许可协议