该部分要求我们实现ShardKV。在前面Lab中,我们实现一个构成基础分布式数据库的框架,支持多节点的数据一致性,CURD,日志快照。但在单一集群中,这样的实现会导致所有的请求都要由Leader来处理,当数据增长到一定程度,会导致请求响应时间变长。解决办法就是,将数据按某种方式分开存储到不同的集群,将不同的请求发送到不同的集群中 。
Lab还是偏难的,论文中也没有具体去写这一部分,很多需要参照网上资料去完成。
0 前言 & 要求 Lab4分为两部分:第一部分实现一个ShardCtrler
,用于将Client请求定位到Group;第二部分实现一个ShardKV
,从整体来看,它维护了许多raft组(Group),通过与ShardCtriler进行交互更新配置(Config),并且提供如Lab3实现的读写服务。难点在于实现时要考虑不同raft组之间分片迁移以及如何进行非阻塞式的配置更新,还有对删除分片进行清理,空日志检测。这部分直接看的Lab4-2B 这位大佬的,感觉如果自己去想的话,估计得花上许久,先借鉴抄下别人的做法,后续有时间想到新的思路再回来修改。
1 实验分析 1.1 整体架构
可以看出,ShardCtrler
作为一个控制器的角色,需要获得Client的请求Key
,找到对应Shard
,通过Shard
再找到相应Group
,Group
还会映射相应server
,需要将这些信息返回给Client,才能发送请求到正确的集群中。
符号解释
Shard
:互不相交且组成完整数据的每一个数据子集。例如以a
开头的字母可构成一个Shard
Config
:在某一阶段,Shard
与集群的对应关系。里面维护了Shard->gid
,gid->server
的信息。
1.2 服务端提供的功能 读写服务:如Lab3实现的一样,应该要提供对数据Put
,Get
,Append
操作。不同的是,在一个Group中,会存在不同的分片,而我们的取值应该是在不同的分片数组进行,所以需要维护的是一组分片状态机
配置更新:在2A中除Query以外的操作(Move,Join,Leave)都会使得生成新的配置,Server需要定期地拉取和提交新的配置
分片迁移:Move操作会使得分片向其他的Group进行迁移,我们也需要定期的检测哪些分片需要进行迁移
分片清理:Leave操作会删除某个Group,这样会导致有部分分片需要被回收,需要定期检测
空日志检测:
1.3 Challenge 实验要求提出有两个Challenge需要解决,才能供生产使用
如何处理分片删除的状态转移? 。假设我们有两个组,G1 和 G2,并且有一个新配置 C 将分片 S 从 G1 移动到 G2。如果 G1 在转移到 C 时从其数据库中删除了 S 中的所有键,那么当 G2 试图转移到 C 时,如何获取 S 的数据?也就是说,我们应该要保证及时清理不再属于本分片的数据
如何处理配置更改期间的客户端请求? 我们不可能每次进行配置更新都阻塞客户端操作,这显然是生产不可用的。并且要考虑,当如果G3进行配置更新时,需要来自G1的分片S1,G2的分片S2,要保证收到必要的分片就可以立即提供服务,而不是等待所有分片到达。也就是说,不仅要保证分片迁移时不影响未迁移分片的读写服务,还要不同的分片数据能够独立迁移
1.4 分片状态 维护Shard的状态原因如下:
防止分片Shard
中间状态被覆盖,从而导致任务被丢弃。只有当分片Shard
的状态都为默认状态才允许拉取最新配置
由于challenge2要求不能阻塞客户端的操作,并且要保证配置更新和分片状态变化(状态变化意味着是否可提供读写服务)彼此独立。所以需要将不同raft Group
的分片数据独立起来,分别提交多条日志来维护状态。
每个分片有四种状态:
Serving:分片的默认状态,如果当前raft Group
在当前config
下负责管理此分片,则该分片可以提供读写服务,否则该分片暂不可以提供读写服务,但不会阻塞配置更新协程拉取新配置
Pulling:表示当前 raft Group
在当前 config
下负责管理此分片,暂不可以提供读写服务,需要当前 raft Group
从上一个配置该分片所属 raft Group
拉数据过来之后才可以提供读写服务,系统会有一个分片迁移协程检测所有分片的 Pulling
状态,接着以 raft Group
为单位去对应 raft Group
拉取数据,接着尝试重放该分片的所有数据到本地并将分片状态置为 Serving
,以继续提供服务。
BePulling:表示当前 raft Group
在当前 config
下不负责管理此分片,不可以提供读写服务,但当前 raft Group
在上一个 config
时复制管理此分片,因此当前 config
下负责管理此分片的 raft Group
拉取完数据后会向本 raft
组发送分片清理的 rpc
,接着本 raft
组将数据清空并重置为 serving
状态即可。
GCing:表示当前 raft Group
在当前 config
下负责管理此分片,可以提供读写服务,但需要清理掉上一个配置该分片所属 raft Group
的数据。系统会有一个分片清理协程检测所有分片的 GCing
状态,接着以 raft Group
为单位去对应 raft Group
删除数据,一旦远程 raft Group
删除数据成功,则本地会尝试将相关分片的状态置为 Serving
。
还有一件事,如何考虑更新Shard状态的时机
如果在 apply 协程更新配置的时候由 leader 异步启动对应的协程,让其独立的根据 raft 组为粒度拉取数据,这会出现以下问题:leader apply 了新配置后便挂了,然后此时 follower 也 apply 了该配置但并不会启动该任务,在该 raft 组的新 leader 选出来后,该任务已经无法被执行了。
因此,不能在 apply 配置的时候启动异步任务,而是应该只更新 shard 的状态,由单独的协程去异步的执行分片迁移,分片清理等任务 。
1.5 日志类型 同样的,在Lab3,我们封装了Get,Put,Append的请求指令,在KVServer中,我们又增添了一些操作,所以需要增加对应的请求类型,以用作raft层的日志存储。
Operation:客户端传来的读写操作日志,有 Put
,Get
,Append
等请求。
Configuration:配置更新日志,包含一个配置。
InsertShards:分片更新日志,包含至少一个分片的数据和配置版本。
DeleteShards:分片删除日志,包含至少一个分片的 id 和配置版本。
EmptyEntry:空日志,Data
为空,使得状态机达到最新。
1.6 设计总览
该设计会发现和Lab3的大致设计一致,不同的是,多了4个协程来保证分片的状态转移。
幂等性
这里幂等性指的是不同协程做的操作不会影响数据的正确性。
考虑第一种情况,某个raft Group
集体被kill并重启时,这时候配置可能都更新过好几次了,然后重启后raft Group
首先需要重放所有日志(在raft层进行日志同步,应用到上层),在这个过程,由于分片迁移协程和apply协程是并行的,迁移协程可能检测到某个中间状态从而发起数据的迁移,这是如果对数据进行覆盖则会破坏正确性。所以,为每个分片迁移和清理请求带上一个配置版本,只有版本相同才可以覆盖,
考虑第二种情况,当前raft Group
更新了配置,迁移协程根据配置从其他raft Group
拉取分片数据和去重表进行提交,系统依旧对外提供服务,迁移协程不会重复拉该分片的数据。然后这时整个raft Group
被kill并重启时,重放日志到更新配置的日志时,迁移协程恰好捕捉到最新配置的中间状态,并再次向其他 raft 组拉数据和去重表并尝试提交,这样在 apply 该分片更新日志时两者版本一致,从而进行了直接覆盖。(如果在 raft 回放完毕和迁移协程拉到数据并提交日志中间该 raft 组又执行了该分片的写操作,那么直接覆盖就会导致这些改动丢失)。所以,版本比较还不够完备,需要判断分片的状态。
2 实验过程 2.1 ShardCtrler的四种操作 整体实现和Lab3大同小异,由于Config
包含信息较小,所以不需要snapshot
。客户端会发送RPC进行一些操作,每当shard->gid
的对应关系被改变时,都需要创建一个新Config
来保存。ShardCtrler
需要支持Join
,Leave
,Move
,Query
四种操作。
RPC结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type CommandRequest struct { Servers map [int ][]string GIDs []int Shard int GID int Num int Op OperationOp ClientId int64 CommandId int64 }type CommandResponse struct { Err Err Config Config }
Query
查询指定版本的Config
1 2 3 4 5 6 func (cf *MemoryConfigStateMachine) Query (num int ) (Config, Err) { if num < 0 || num >= len (cf.Configs) { return cf.Configs[len (cf.Configs)-1 ], OK } return cf.Configs[num], OK }
Move
将指定的Shard
交由新的Group
负责
1 2 3 4 5 6 7 8 9 10 11 12 func (cf *MemoryConfigStateMachine) Move (shard, gid int ) Err { length := len (cf.Configs) lastConfig := cf.Configs[length-1 ] newConfig := Config{ Num: length, Shards: lastConfig.Shards, Groups: deepCopy(lastConfig.Groups), } newConfig.Shards[shard] = gid cf.Configs = append (cf.Configs, newConfig) return OK }
Join
为当前Config
添加一些server
,这些server
有可能是加入到现有Group
中,也可能是新增的Group
。
自平衡方案很简单:每次将shard
数量最多的Group
分配给shard
数量最少的Group
,直到差值小于等于1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (cf *MemoryConfigStateMachine) Join (groups map [int ][]string ) Err { length := len (cf.Configs) lastConfig := cf.Configs[length-1 ] newConfig := Config{ Num: length, Shards: lastConfig.Shards, Groups: deepCopy(lastConfig.Groups), } for gid, servers := range groups { if _, ok := newConfig.Groups[gid]; !ok { newServers := make ([]string , len (servers)) copy (newServers, servers) newConfig.Groups[gid] = newServers } } s2g := Group2Shards(newConfig) for { source, target := GetGIDWithMaximumShards(s2g), GetGIDWithMinimumShards(s2g) if source != 0 && len (s2g[source])-len (s2g[target]) <= 1 { break } s2g[target] = append (s2g[target], s2g[source][0 ]) s2g[source] = s2g[source][1 :] } var newShards [NShards]int for gid, shards := range s2g { for _, shard := range shards { newShards[shard] = gid } } newConfig.Shards = newShards cf.Configs = append (cf.Configs, newConfig) return OK }
Leave
删除Group
。如果删除之后还有Group
,则需要将Shrad
均匀分配给所有Group
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (cf *MemoryConfigStateMachine) Leave (gIds []int ) Err { length := len (cf.Configs) lastConfig := cf.Configs[length-1 ] newConfig := Config{ Num: length, Shards: lastConfig.Shards, Groups: deepCopy(lastConfig.Groups), } s2g := Group2Shards(newConfig) orphanShards := make ([]int , 0 ) for _, gid := range gIds { if _, ok := newConfig.Groups[gid]; ok { delete (newConfig.Groups, gid) } if shards, ok := s2g[gid]; ok { orphanShards = append (orphanShards, shards...) delete (s2g, gid) } } var newShards [NShards]int if len (newConfig.Groups) != 0 { for _, shard := range orphanShards { target := GetGIDWithMinimumShards(s2g) s2g[target] = append (s2g[target], shard) } for gid, shards := range s2g { for _, shard := range shards { newShards[shard] = gid } } } newConfig.Shards = newShards cf.Configs = append (cf.Configs, newConfig) return OK }
2.2 ShardKV客户端 客户端执行逻辑如下:
使用key2shard()
去ShardCtrler
找到一个key
对应哪个分片shard
根据shard
从当前配置获取到gid
根据gid
获取到servers
信息
循环查找到leaderid
,直到返回请求成功、ErrWrongGroup
或整个 group 都遍历请求过
Query
最新的配置,回到步骤1 循环重复;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (ck *Clerk) Command (request *CommandRequest) string { request.ClientId, request.CommandId = ck.clientId, ck.commandId for { shard := key2shard(request.Key) gid := ck.config.Shards[shard] if servers, ok := ck.config.Groups[gid]; ok { if _, ok = ck.leaderIds[gid]; !ok { ck.leaderIds[gid] = 0 } oldLeaderId := ck.leaderIds[gid] newLeaderId := oldLeaderId for { var response CommandResponse ok := ck.make_end(servers[newLeaderId]).Call("ShardKV.Command" , request, &response) if ok && (response.Err == OK || response.Err == ErrNoKey) { ck.commandId ++ return response.Value } else if ok && response.Err == ErrWrongGroup { break } else { newLeaderId = (newLeaderId + 1 ) % len (servers) if newLeaderId == oldLeaderId { break } continue } } } time.Sleep(100 * time.Millisecond) ck.config = ck.sm.Query(-1 ) } }
2.3 ShardKV服务端 这部分是最难的,首先明确系统整体的交互方式:
服务需要创建多个raft Group
来承载所有分片的读写任务
服务端需要完成配置更新 ,分片迁移 ,分片清理 ,空日志检测 等功能
2.3.1 结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 type ShardKV struct { mu sync.RWMutex dead int32 rf *raft.Raft applyCh chan raft.ApplyMsg makeEnd func (string ) *labrpc .ClientEnd gid int sc *shardctrler.Clerk maxRaftState int lastApplied int lastConfig shardctrler.Config currentConfig shardctrler.Config stateMachines map [int ]*Shard lastOperations map [int64 ]OperationContext notifyChans map [int ]chan *CommandResponse }func (kv *ShardKV) applier () { for kv.killed() == false { select { case message := <-kv.applyCh: if message.CommandValid { kv.mu.Lock() if message.CommandIndex <= kv.lastApplied { kv.mu.Unlock() continue } kv.lastApplied = message.CommandIndex var response *CommandResponse command := message.Command.(Command) switch command.Op { case Operation: operation := command.Data.(CommandRequest) response = kv.applyOperation(&message, &operation) case Configuration: nextConfig := command.Data.(shardctrler.Config) response = kv.applyConfiguration(&nextConfig) case InsertShards: shardsInfo := command.Data.(ShardOperationResponse) response = kv.applyInsertShards(&shardsInfo) case DeleteShards: shardsInfo := command.Data.(ShardOperationRequest) response = kv.applyDeleteShards(&shardsInfo) case EmptyEntry: response = kv.applyEmptyEntry() } if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm { ch := kv.getNotifyChan(message.CommandIndex) ch <- response } needSnapshot := kv.needSnapshot() if needSnapshot { kv.takeSnapshot(message.CommandIndex) } kv.mu.Unlock() } else if message.SnapshotValid { kv.mu.Lock() if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) { kv.restoreSnapshot(message.Snapshot) kv.lastApplied = message.SnapshotIndex } kv.mu.Unlock() } else { panic (fmt.Sprintf("unexpected Message %v" , message)) } } } }
2.3.2 读写服务 根据1.4阐述,只有当分片状态为Serving
或GCing
,当前raft Group
在当前config
下负责管理此分片,本raft Group
才可以为该分片提供读写服务,否则返回 ErrWrongGroup
让客户端重新拉取最新的 config
并重试即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 func (kv *ShardKV) Command (request *CommandRequest, response *CommandResponse) { kv.mu.RLock() if request.Op != Get && kv.isDuplicateRequest(request.ClientId, request.CommandId) { lastResponse := kv.lastOperations[request.ClientId].LastResponse response.Value, response.Err = lastResponse.Value, lastResponse.Err kv.mu.RUnlock() return } if !kv.canServe(key2shard(request.Key)){ response.Err = ErrWrongGroup kv.mu.RUnlock() return } kv.mu.RUnlock() kv.Execute(NewOperationCommand(request), response) }func (kv *ShardKV) Execute (command Command, response *CommandResponse) { index, _, isLeader := kv.rf.Start(command) if !isLeader { response.Err = ErrWrongLeader return } kv.mu.Lock() ch := kv.getNotifyChan(index) kv.mu.Unlock() select { case result := <-ch: response.Value, response.Err = result.Value, result.Err case <-time.After(ExecuteTimeout): response.Err = ErrTimeout } go func () { kv.mu.Lock() kv.removeOutdatedNotifyChan(index) kv.mu.Unlock() }() }func (kv *ShardKV) applyOperation (message *raft.ApplyMsg, operation *CommandRequest) *CommandResponse { var response *CommandResponse shardID := key2shard(operation.Key) if kv.canServe(shardID) { if operation.Op != Get && kv.isDuplicateRequest(operation.ClientId, operation.CommandId) { return kv.lastOperations[operation.ClientId].LastResponse } else { response = kv.applyLogToStateMachine(operation, shardID) if operation.Op != Get { kv.lastOperations[operation.ClientId] = OperationContext{operation.CommandId, response} } return response } } return &CommandResponse{ Err: ErrWrongGroup, Value: "" , } }func (kv *ShardKV) canServe (shardID int ) bool { return kv.currentConfig.Shards[shardID] == kv.gid && (kv.stateMachines[shardID].Status == Serving || kv.stateMachines[shardID].Status == GCing) }
2.3.3 配置更新 配置更新协程负责定时检测所有分片的状态,一旦存在至少一个分片的状态不为默认状态 ,则预示其他协程仍然还没有完成任务,那么此时需要阻塞新配置的拉取和提交。
在 apply
配置更新日志时需要保证幂等性:
不同版本的配置更新日志:apply
时仅可逐步递增的去更新配置,否则返回ErrOutDated
。
相同版本的配置更新日志:由于配置更新日志仅由配置更新协程提交,而配置更新协程只有检测到比本地更大地配置时才会提交配置更新日志,所以该情形不会出现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (kv *ShardKV) configureAction () { canPerformNextConfig := true kv.mu.RLock() for _, shard := range kv.stateMachines { if shard.Status != Serving { canPerformNextConfig = false break } } currentConfigNum := kv.currentConfig.Num kv.mu.RUnlock() if canPerformNextConfig { nextConfig := kv.sc.Query(currentConfigNum + 1 ) if nextConfig.Num == currentConfigNum + 1 { kv.Execute(NewConfigurationCommand(&nextConfig), &CommandResponse{}) } } }func (kv *ShardKV) applyConfiguration (nextConfig *shardctrler.Config) *CommandResponse { if nextConfig.Num == kv.currentConfig.Num+1 { kv.updateShardStatus(nextConfig) kv.lastConfig = kv.currentConfig kv.currentConfig = *nextConfig return &CommandResponse{OK, "" } } return &CommandResponse{ErrOutDated, "" } }
2.3.4 分片迁移 分片迁移协程会定时的检测分片的Pulling
状态,利用lastConfig
计算出相应的servers
和要拉取的分片,然后并行的去拉取数据。在该Action中,需要使用waitGroup
来保证独立地任务完成后才会进行下一次任务,并且,wg.Wait
一定要在释放锁之后,否则就会造成分片串行迁移,不能满足要求。
在执行拉取分片的Handler中,首先要保证是由Leader
处理该请求,其次如果发现请求的Config
大于当前版本,那就说明拉取的是未来的数据,则返回ErrNotReady
让其稍后重试,否则将分片数据和去重表都深拷贝到response
即可
在apply
分片迁移日志时需要保证幂等性:
不同版本的配置更新日志:仅可执行与当前配置版本相同的分片更新日志,否则返回 ErrOutDated
。
相同版本的配置更新日志:仅在对应分片状态为 Pulling
时为第一次应用,此时覆盖状态机即可并修改状态为 GCing
,以让分片清理协程检测到 GCing
状态并尝试删除远端的分片。否则说明已经应用过,直接 break
即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 func (kv *ShardKV) migrationAction () { kv.mu.RLock() gid2shardIDs := kv.getShardIDsByStatus(Pulling) var wg sync.WaitGroup for gid, shardIDs := range gid2shardIDs { wg.Add(1 ) go func (servers []string , configNum int , shardIDs []int ) { defer wg.Done() pullTaskRequest := ShardOperationRequest{configNum, shardIDs} for _, server := range servers { var pullTaskResponse ShardOperationResponse srv := kv.makeEnd(server) if srv.Call("ShardKV.GetShardsData" , &pullTaskRequest, &pullTaskResponse) && pullTaskResponse.Err == OK { kv.Execute(NewInsertShardsCommand(&pullTaskResponse), &CommandResponse{}) } } }(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs) } kv.mu.RUnlock() wg.Wait() }func (kv *ShardKV) GetShardsData (request *ShardOperationRequest, response *ShardOperationResponse) { if _, isLeader := kv.rf.GetState(); !isLeader { response.Err = ErrWrongLeader return } kv.mu.RLock() defer kv.mu.RUnlock() if kv.currentConfig.Num < request.ConfigNum { response.Err = ErrNotReady return } response.Shards = make (map [int ]map [string ]string ) for _, shardID := range request.ShardIDs { response.Shards[shardID] = kv.stateMachines[shardID].deepCopy() } response.LastOperations = make (map [int64 ]OperationContext) for clientID, operation := range kv.lastOperations { response.LastOperations[clientID] = operation.deepCopy() } response.ConfigNum, response.Err = request.ConfigNum, OK }sfunc (kv *ShardKV) applyInsertShards (shardsInfo *ShardOperationResponse) *CommandResponse { if shardsInfo.ConfigNum == kv.currentConfig.Num { for shardId, shardData := range shardsInfo.Shards { shard := kv.stateMachines[shardId] if shard.Status == Pulling { for key, value := range shardData { shard.KV[key] = value } shard.Status = GCing } else { break } } for clientId, operationContext := range shardsInfo.LastOperations { if lastOperation, ok := kv.lastOperations[clientId]; !ok || lastOperation.MaxAppliedCommandId < operationContext.MaxAppliedCommandId { kv.lastOperations[clientId] = operationContext } } return &CommandResponse{OK, "" } } return &CommandResponse{OK, "" } }
2.3.5 分片清理 分片清理协程会定时的检测分片的GCing
状态,利用lastConfig
计算出相应的servers
和要拉取的分片,然后并行的去清理分片。
在清理分片的handler
中,首先仅由Leader
处理该请求,其次如果发现请求中的配置版本小于本地的版本,那说明该请求已经执行过,此时直接返回 OK 即可,否则在本地提交一个删除分片的日志。
在 apply
分片清理日志时需要保证幂等性:
不同版本的配置更新日志:仅可执行与当前配置版本相同地分片删除日志,否则已经删除过,直接返回 OK 即可。
相同版本的配置更新日志:如果分片状态为 GCing
,说明是本 raft Group
已成功删除远端 raft Group
的数据,现需要更新分片状态为默认状态以支持配置的进一步更新;否则如果分片状态为 BePulling
,则说明本 raft Group
第一次删除该分片的数据,此时直接重置分片即可。否则说明该请求已经应用过,直接 break
返回 OK 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 func (kv *ShardKV) gcAction () { kv.mu.RLock() gid2shardIDs := kv.getShardIDsByStatus(GCing) var wg sync.WaitGroup for gid, shardIDs := range gid2shardIDs { wg.Add(1 ) go func (servers []string , configNum int , shardIDs []int ) { defer wg.Done() gcTaskRequest := ShardOperationRequest{configNum, shardIDs} for _, server := range servers { var gcTaskResponse ShardOperationResponse srv := kv.makeEnd(server) if srv.Call("ShardKV.DeleteShardsData" , &gcTaskRequest, &gcTaskResponse) && gcTaskResponse.Err == OK { kv.Execute(NewDeleteShardsCommand(&gcTaskRequest), &CommandResponse{}) } } }(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs) } kv.mu.RUnlock() wg.Wait() }func (kv *ShardKV) DeleteShardsData (request *ShardOperationRequest, response *ShardOperationResponse) { if _, isLeader := kv.rf.GetState(); !isLeader { response.Err = ErrWrongLeader return } kv.mu.RLock() if kv.currentConfig.Num > request.ConfigNum { response.Err = OK kv.mu.RUnlock() return } kv.mu.RUnlock() var commandResponse CommandResponse kv.Execute(NewDeleteShardsCommand(request), &commandResponse) response.Err = commandResponse.Err }func (kv *ShardKV) applyDeleteShards (shardsInfo *ShardOperationRequest) *CommandResponse { if shardsInfo.ConfigNum == kv.currentConfig.Num { for _, shardId := range shardsInfo.ShardIDs { shard := kv.stateMachines[shardId] if shard.Status == GCing { shard.Status = Serving } else if shard.Status == BePulling { kv.stateMachines[shardId] = NewShard() } else { break } } return &CommandResponse{OK, "" } } return &CommandResponse{OK, "" } }
2.3.6 空日志检测 空日志检测协程会定时检测raft
层的 leader
是否拥有当前 term
的日志,如果没有则提交一条空日志,这使得新 leader
的状态机能够迅速达到最新状态,从而避免多 raft
组间的活锁状态。
1 2 3 4 5 6 7 8 9 func (kv *ShardKV) checkEntryInCurrentTermAction () { if !kv.rf.HasLogInCurrentTerm() { kv.Execute(NewEmptyEntryCommand(), &CommandResponse{}) } }func (kv *ShardKV) applyEmptyEntry () *CommandResponse { return &CommandResponse{OK, "" } }
3 总结 对于这种mutil-raft架构,难点在于如何既对内保证分片之间状态变化,又对外保证客户端请求的正确响应。对外提供读写服务,侧重于维持每个请求的线性一致性,也就是服务端要保存客户端操作的结果,使用其跳过重复的请求。对内,开启一个协程不断接收已经同步完成的Command,然后运用到状态机中,除此之外,还需要开启配置更新,分片迁移,分片清理,空日志检测四个协程定时处理相关分片,以此维持系统安全。
参考 mit-6.824 2021 Lab4:ShardKV
MIT6.824-2021/lab4