6.824 Lab1:MapReduce

0 前言&要求

​ 该部分是要求构建一个MapReduce System。需要实现一个Worker可以具有MapReduce函数来处理和写入文件,并且还要实现一个Coordinator来对任务进行分配以及处理失败的Worker。另外,WorkerCoordinator之间通过RPC进行交互,实现时还需要自定义一些请求参数和应答参数。通过实现以上代码来完成分布式统计多篇文章单词出现次数

​ 在实现代码前,可以先看看已经提供的非分布式的实现方式,在src/main/mrsequential.go中。并且在运行前,要先build下提供好的插件,通过go自带的plugins进行打包的,里面提供了基础的MapReduce函数用于统计单个文章的单词出现次数。实现很简单,就是通过Map将每个单词次数都设置为1,并添加到一个哈希桶,然后对key进行排序,通过Reduce来计算出总次数即可。

image-20220524212329822

​ 通过依照课程提供文档命令执行后,会得到一个mr-out-0,里面就包含了单词和出现次数,也是本lab要完成的标准。

​ 那么在分布式下,也就要求我们执行许多Worker来完成MapReduce操作。

MapReduce WordCount(图片来自网络)

1 实验分析

1.1 架构设计

​ 只要讲到MapReduce,一定会搬出这张经典图片,该图片源自论文MapReduce,也是Google的三架马车之一,后面两架后续会谈到

Execution Overview

​ 上图反映了MapReduce的程序流程:

  1. MapReduce将输入文件分成几份
  2. 通过master来进行任务的分配给worker
  3. 某个worker会执行map从分片中读取内容,然后解析出key/value对(可以认为wc.gomap进行解析),然后传递给用户定义的Map函数,将产生的中间key/value缓存到内存
  4. 每过一段时间,会将缓存的数据写到本地磁盘,数据位置传递给mastermaster再将其传递给reduce worker
  5. 执行reduceworker收到后,通过RPC从磁盘读取中间key/value。读取完之后,根据中间key进行排序,然后将其聚集到一起
  6. 读取完之后,将key和对应的中间value集合传递给用户自定义Reduce函数,其输出会写到一个文件中。
  7. 完成

​ 这里简单阐述论文中的逻辑,后续实现和这种还有些出入。

1.2 数据结构

​ 在设计程序前,要先抽象几个数据结构

Task

1
2
3
4
5
6
type Task struct {
fileName string // 文章名
id int // 任务ID
startTime time.Time // 开始时间
status TaskStatus // 状态
}

worker需要得到Task才能执行,而Coordinater则是进行任务分配的,对于每个Task,需要有filename对应其记录文件的位置,通过ID来识别每个任务。另外,由于要考虑到超时的问题,所以需要记录任务的开始时间;并且对于每个任务,需要有个状态来标识。

任务状态

1
2
3
4
5
const (
Idle TaskStatus = iota
Working
Finished
)

Coordinator

1
2
3
4
5
6
7
8
9
10
11
type Coordinator struct {
files []string // 文章列表
nReduce int // Reduce的任务数
nMap int // Map的任务数
phase SchedulePhase // 阶段
tasks []Task // 任务组

heartbeatCh chan heartbeatMsg // 心跳信息管道
reportCh chan reportMsg // 回复信息管道
doneCh chan struct{} // 完成管道
}

Coordinator需要对任务进行管理和分配。所以需要维护一个[]Taskfiles。由于在MapReduce要记录数量,所以还需要两个int型。并且还需要一个调度阶段记录当前Coordinator执行到哪一个阶段了。

​ 通过管道来进行消息的监听和阻塞。

HeartbeatResponse

1
2
3
4
5
6
7
type HeartbeatResponse struct {
FilePath string // 文章路径
JobType JobType // 任务类型
NReduce int // Reduce的数量
NMap int // Map的数量
Id int
}

​ 当worker请求时,Coordinator需要返回心跳信息,里面包含了任务的执行文章路径,id,Reduce数量,Map数量。

​ 注意JobTask的区别:

  • Job。整个MapReduce计算称为Job。
  • Task。每一次MapReduce调用称为Task。

ReportRequest

1
2
3
4
type ReportRequest struct {
Id int
Phase SchedulePhase
}

​ 该结构体用于workerCoordinator返回处理信息。

1.3 测试文件

​ 2021的lab1测试一共有6个:

wc test:最基本的测试,只要实现单词计数的功能即可。

indexer test:会开启多个worker进行工作

map/reduce parallelism:会在worker进行Map时创建一个mr-worker + PID的文件,然后统计几个特定文件名数量,判断同时运行的worker数量

job count test:对任务进行计数

early test:是否有worker在全部任务完成前退出

crash test:需要保证该系统是具有容错能力的,如果分配的任务没有在规定时间完成,则认定任务失败

2 实验过程

2.1 Worker

​ 在论文中,是由coodinater主动分配任务给worker进行实现的,而这里的实现是workre通过RPC发送请求获得response进行处理。对于每个worker,轮询的去做任务即可,主要逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Worker(mapF func(string, string) []KeyValue,
reduceF func(string, []string) string) {

for {
response := doHeartbeat()
log.Printf("Worker: receive coordinator's heartbeat %v \n", response)
switch response.JobType { // 判断请求到的任务类型
case MapJob:
doMapTask(mapF, response)
case ReduceJob:
doReduceTask(reduceF, response)
case WaitJob:
time.Sleep(1 * time.Second)
case CompleteJob:
return
default:
panic(fmt.Sprintf("unexpected jobType %v", response.JobType))
}
}
}

​ 实现对应的doMapTaskdoReduceTask即可。实现doMapTask要注意将中间量分别存到文件中,并且使用go自带的json包进行键值存储。还要注意以下两点

原子写入文件

​ 实现时需要保证文件的原子性写入,避免异常情况导致文件受损,所以可以生成一个临时文件再利用OS.Rename进行原子性替换。

中间态的存储

​ 在mrsequential的实现中,将所有的kva[]KeyValue)都存到一个intermediate中,但在这里实现是,使用一个[][]KeyValue的哈希表进行存储的,同一个keykv将会放在一起,后续实现reduce只需要遍历该哈希表即可。

2.2 Coodinator

​ 这里实现的主要有两个点:监听通道进行调度轮询选择任务进行分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 调度函数
func (c *Coordinator) schedule() {
c.initMapPhase()
for {
select {
// 收到请求
case msg := <-c.heartbeatCh:
if c.phase == CompletePhase {
msg.response.JobType = CompleteJob
} else if c.selectTask(msg.response) {
switch c.phase {
case MapPhase:
...
case ReducePhase:
...
case CompletePhase:
...
}
}
msg.ok <- struct{}{}
// 收到通知
case msg := <-c.reportCh:
...
msg.ok <- struct{}{}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 选择任务进行分配
func (c *Coordinator) selectTask(response *HeartbeatResponse) bool {
allFinished, hasNewJob := true, false
for id, task := range c.tasks {
switch task.status {
case Idle:
...
case Working:
...
case Finished:
}
...
}
...
return allFinished
}

状态维护

​ 这里Coodinator主要维护的是task的状态,当请求到来时,会对所有的任务进行轮询,只有当任务状态为IdleWorking下超时了,才会被分配。

管道

​ 在coordinator开启一个协程不断监控heartbeatChreportCh中的数据并进行处理。并且,管道可以很方便的控制并发,只有当msg.ok可输出时,才可以返回。这里需要考虑channel传值时的一些情况(直接引用了):

  • channel 传 struct{}:对于仅需要传输 happens-before 语义不需要传输数据的场景,创建的 channel 应该是 struct{} 类型,go 对其做了特别优化可以不耗费内存。
  • channel 传指针:对于 report handler,其往 reportCh 中 send msg 时只需要传输 request 的指针,等 coordinator 的 schedule 协程处理完毕后即可返回,这里并没有什么问题。对于 heartbeat handler,其会相对复杂些,因为其往 heartbeatCh 中 send msg 时传输了 response 的指针,coordinator 的 schedule 协程需要对该指针对应的数据做处理后再返回,那么此时 rpc 协程在返回时是否能够看到另一个 goroutine 对其的修改呢?对于这种场景,如果协程间满足 happens-before 语义的话是可以的,如果不满足则不一定可以。那么是否满足 happens-before 语义呢?很多人都知道对于无 buffer 的 channel,其 receive 是 happens-before send 的,那么似乎就无法判断其是否满足 happens-before 语义了。实际上,sendsend 完成是有区别的,可以参考此博客,严格来说:对于无 buffer 的 channel,其 send start happens-before receive complete happens-before send complete,因此有了这个更清晰的语义,我们很显然可以得到 schedule 协程修改 response happens-before worker rpc 协程返回 response,因此这样写应该是没有问题的,我的 race detector 也没有报任何错误。

2.3 RPC

​ RPC的作用有两点:worker请求任务,worker处理完任务进行汇报

worker

1
2
3
4
5
6
7
8
9
10
11
// 请求
func doHeartbeat() *HeartbeatResponse {
response := HeartbeatResponse{}
call("Coordinator.Heartbeat", &HeartbeatRequest{}, &response)
return &response
}

// 汇报
func doReport(id int, phase SchedulePhase) {
call("Coordinator.Report", &ReportRequest{id, phase}, &ReportResponse{})
}

coodinator

1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *Coordinator) Heartbeat(request *HeartbeatRequest, response *HeartbeatResponse) error {
msg := heartbeatMsg{response, make(chan struct{})}
c.heartbeatCh <- msg
<-msg.ok
return nil
}

func (c *Coordinator) Report(request *ReportRequest, response *ReportResponse) error {
msg := reportMsg{request, make(chan struct{})}
c.reportCh <- msg
<-msg.ok
return nil
}

3 总结

​ 整体实现还是参照着github一个文档,感觉写的不错。重要的内容的理解上要花上些时间,自己又好久没写过go了,花了几个小时看了下go的文档才上手。

​ 梳理下实现流程基本就是:worker通过RPC向coodinator请求任务,coodinator监听管道收到后,则开始轮询维护好的任务组,根据不同的任务状态对response进行修改,然后返回;worker收到后判断是什么类型的任务:map,对文章进行内容读取,然后使用哈希表保存中间态,每个写入到一个临时文件中,然后发送汇报信息;reduce:根据生成filePath进行读取,也就是取出kva,然后存到一个哈希表map[string][]string中,送给reduceF处理即可。

参考

MIT6.824-2022/lab1.md at practice · 2w1nd/MIT6.824-2022 (github.com)


6.824 Lab1:MapReduce
https://2w1nd.github.io/2022/04/16/MIT6.824/6-824-Lab1:MapReduce/
作者
w1nd
发布于
2022年4月16日
许可协议