6.824 Lab1:MapReduce
0 前言&要求
该部分是要求构建一个MapReduce System。需要实现一个Worker
可以具有Map
和Reduce
函数来处理和写入文件,并且还要实现一个Coordinator
来对任务进行分配以及处理失败的Worker
。另外,Worker
和Coordinator
之间通过RPC进行交互,实现时还需要自定义一些请求参数和应答参数。通过实现以上代码来完成分布式统计多篇文章单词出现次数。
在实现代码前,可以先看看已经提供的非分布式的实现方式,在src/main/mrsequential.go
中。并且在运行前,要先build
下提供好的插件,通过go自带的plugins
进行打包的,里面提供了基础的Map
和Reduce
函数用于统计单个文章的单词出现次数。实现很简单,就是通过Map
将每个单词次数都设置为1,并添加到一个哈希桶,然后对key
进行排序,通过Reduce
来计算出总次数即可。
通过依照课程提供文档命令执行后,会得到一个mr-out-0
,里面就包含了单词和出现次数,也是本lab要完成的标准。
那么在分布式下,也就要求我们执行许多Worker
来完成Map
和Reduce
操作。
1 实验分析
1.1 架构设计
只要讲到MapReduce,一定会搬出这张经典图片,该图片源自论文MapReduce,也是Google的三架马车之一,后面两架后续会谈到
上图反映了MapReduce的程序流程:
MapReduce
将输入文件分成几份- 通过
master
来进行任务的分配给worker
- 某个
worker
会执行map
从分片中读取内容,然后解析出key/value
对(可以认为wc.go
的map
进行解析),然后传递给用户定义的Map
函数,将产生的中间key/value
缓存到内存 - 每过一段时间,会将缓存的数据写到本地磁盘,数据位置传递给
master
,master
再将其传递给reduce worker
- 执行
reduce
的worker
收到后,通过RPC从磁盘读取中间key/value
。读取完之后,根据中间key
进行排序,然后将其聚集到一起 - 读取完之后,将
key
和对应的中间value
集合传递给用户自定义Reduce
函数,其输出会写到一个文件中。 - 完成
这里简单阐述论文中的逻辑,后续实现和这种还有些出入。
1.2 数据结构
在设计程序前,要先抽象几个数据结构
Task
1 |
|
worker
需要得到Task
才能执行,而Coordinater
则是进行任务分配的,对于每个Task
,需要有filename
对应其记录文件的位置,通过ID
来识别每个任务。另外,由于要考虑到超时的问题,所以需要记录任务的开始时间;并且对于每个任务,需要有个状态来标识。
任务状态
1 |
|
Coordinator
1 |
|
Coordinator
需要对任务进行管理和分配。所以需要维护一个[]Task
和files
。由于在Map
和Reduce
要记录数量,所以还需要两个int型。并且还需要一个调度阶段记录当前Coordinator
执行到哪一个阶段了。
通过管道来进行消息的监听和阻塞。
HeartbeatResponse
1 |
|
当worker
请求时,Coordinator
需要返回心跳信息,里面包含了任务的执行文章路径,id,Reduce数量,Map数量。
注意Job
和Task
的区别:
- Job。整个MapReduce计算称为Job。
- Task。每一次MapReduce调用称为Task。
ReportRequest
1 |
|
该结构体用于worker
向Coordinator
返回处理信息。
1.3 测试文件
2021的lab1测试一共有6个:
wc test
:最基本的测试,只要实现单词计数的功能即可。
indexer test
:会开启多个worker
进行工作
map/reduce parallelism
:会在worker
进行Map
时创建一个mr-worker
+ PID
的文件,然后统计几个特定文件名数量,判断同时运行的worker
数量
job count test
:对任务进行计数
early test
:是否有worker
在全部任务完成前退出
crash test
:需要保证该系统是具有容错能力的,如果分配的任务没有在规定时间完成,则认定任务失败
2 实验过程
2.1 Worker
在论文中,是由coodinater
主动分配任务给worker
进行实现的,而这里的实现是workre
通过RPC
发送请求获得response
进行处理。对于每个worker
,轮询的去做任务即可,主要逻辑如下:
1 |
|
实现对应的doMapTask
和doReduceTask
即可。实现doMapTask
要注意将中间量分别存到文件中,并且使用go自带的json
包进行键值存储。还要注意以下两点
原子写入文件
实现时需要保证文件的原子性写入,避免异常情况导致文件受损,所以可以生成一个临时文件再利用OS.Rename
进行原子性替换。
中间态的存储
在mrsequential
的实现中,将所有的kva
([]KeyValue
)都存到一个intermediate
中,但在这里实现是,使用一个[][]KeyValue
的哈希表进行存储的,同一个key
的kv
将会放在一起,后续实现reduce
只需要遍历该哈希表即可。
2.2 Coodinator
这里实现的主要有两个点:监听通道进行调度和轮询选择任务进行分配
1 |
|
1 |
|
状态维护
这里Coodinator
主要维护的是task
的状态,当请求到来时,会对所有的任务进行轮询,只有当任务状态为Idle
或Working
下超时了,才会被分配。
管道
在coordinator
开启一个协程不断监控heartbeatCh
和reportCh
中的数据并进行处理。并且,管道可以很方便的控制并发,只有当msg.ok
可输出时,才可以返回。这里需要考虑channel
传值时的一些情况(直接引用了):
- channel 传 struct{}:对于仅需要传输 happens-before 语义不需要传输数据的场景,创建的 channel 应该是 struct{} 类型,go 对其做了特别优化可以不耗费内存。
- channel 传指针:对于 report handler,其往 reportCh 中 send msg 时只需要传输 request 的指针,等 coordinator 的 schedule 协程处理完毕后即可返回,这里并没有什么问题。对于 heartbeat handler,其会相对复杂些,因为其往 heartbeatCh 中 send msg 时传输了 response 的指针,coordinator 的 schedule 协程需要对该指针对应的数据做处理后再返回,那么此时 rpc 协程在返回时是否能够看到另一个 goroutine 对其的修改呢?对于这种场景,如果协程间满足 happens-before 语义的话是可以的,如果不满足则不一定可以。那么是否满足 happens-before 语义呢?很多人都知道对于无 buffer 的 channel,其
receive
是 happens-beforesend
的,那么似乎就无法判断其是否满足 happens-before 语义了。实际上,send
与send 完成
是有区别的,可以参考此博客,严格来说:对于无 buffer 的 channel,其send start
happens-beforereceive complete
happens-beforesend complete
,因此有了这个更清晰的语义,我们很显然可以得到schedule 协程修改 response
happens-beforeworker rpc 协程返回 response
,因此这样写应该是没有问题的,我的 race detector 也没有报任何错误。
2.3 RPC
RPC的作用有两点:worker
请求任务,worker
处理完任务进行汇报
worker
1 |
|
coodinator
1 |
|
3 总结
整体实现还是参照着github一个文档,感觉写的不错。重要的内容的理解上要花上些时间,自己又好久没写过go了,花了几个小时看了下go的文档才上手。
梳理下实现流程基本就是:worker
通过RPC向coodinator
请求任务,coodinator
监听管道收到后,则开始轮询维护好的任务组,根据不同的任务状态对response
进行修改,然后返回;worker
收到后判断是什么类型的任务:map
,对文章进行内容读取,然后使用哈希表保存中间态,每个写入到一个临时文件中,然后发送汇报信息;reduce
:根据生成filePath
进行读取,也就是取出kva
,然后存到一个哈希表map[string][]string
中,送给reduceF
处理即可。
参考
MIT6.824-2022/lab1.md at practice · 2w1nd/MIT6.824-2022 (github.com)