6.824 Lab1:MapReduce
0 前言&要求
    该部分是要求构建一个MapReduce System。需要实现一个Worker可以具有Map和Reduce函数来处理和写入文件,并且还要实现一个Coordinator来对任务进行分配以及处理失败的Worker。另外,Worker和Coordinator之间通过RPC进行交互,实现时还需要自定义一些请求参数和应答参数。通过实现以上代码来完成分布式统计多篇文章单词出现次数。
    在实现代码前,可以先看看已经提供的非分布式的实现方式,在src/main/mrsequential.go中。并且在运行前,要先build下提供好的插件,通过go自带的plugins进行打包的,里面提供了基础的Map和Reduce函数用于统计单个文章的单词出现次数。实现很简单,就是通过Map将每个单词次数都设置为1,并添加到一个哈希桶,然后对key进行排序,通过Reduce来计算出总次数即可。

    通过依照课程提供文档命令执行后,会得到一个mr-out-0,里面就包含了单词和出现次数,也是本lab要完成的标准。
    那么在分布式下,也就要求我们执行许多Worker来完成Map和Reduce操作。

1 实验分析
1.1 架构设计
 只要讲到MapReduce,一定会搬出这张经典图片,该图片源自论文MapReduce,也是Google的三架马车之一,后面两架后续会谈到

 上图反映了MapReduce的程序流程:
- MapReduce将输入文件分成几份
- 通过master来进行任务的分配给worker
- 某个worker会执行map从分片中读取内容,然后解析出key/value对(可以认为wc.go的map进行解析),然后传递给用户定义的Map函数,将产生的中间key/value缓存到内存
- 每过一段时间,会将缓存的数据写到本地磁盘,数据位置传递给master,master再将其传递给reduce worker
- 执行reduce的worker收到后,通过RPC从磁盘读取中间key/value。读取完之后,根据中间key进行排序,然后将其聚集到一起
- 读取完之后,将key和对应的中间value集合传递给用户自定义Reduce函数,其输出会写到一个文件中。
- 完成
 这里简单阐述论文中的逻辑,后续实现和这种还有些出入。
1.2 数据结构
 在设计程序前,要先抽象几个数据结构
Task
| 1 |  | 
    worker需要得到Task才能执行,而Coordinater则是进行任务分配的,对于每个Task,需要有filename对应其记录文件的位置,通过ID来识别每个任务。另外,由于要考虑到超时的问题,所以需要记录任务的开始时间;并且对于每个任务,需要有个状态来标识。
任务状态
| 1 |  | 
Coordinator
| 1 |  | 
    Coordinator需要对任务进行管理和分配。所以需要维护一个[]Task和files。由于在Map和Reduce要记录数量,所以还需要两个int型。并且还需要一个调度阶段记录当前Coordinator执行到哪一个阶段了。
 通过管道来进行消息的监听和阻塞。
HeartbeatResponse
| 1 |  | 
    当worker请求时,Coordinator需要返回心跳信息,里面包含了任务的执行文章路径,id,Reduce数量,Map数量。
    注意Job和Task的区别:
- Job。整个MapReduce计算称为Job。
- Task。每一次MapReduce调用称为Task。
ReportRequest
| 1 |  | 
    该结构体用于worker向Coordinator返回处理信息。
1.3 测试文件
 2021的lab1测试一共有6个:
wc test:最基本的测试,只要实现单词计数的功能即可。
indexer test:会开启多个worker进行工作
map/reduce parallelism:会在worker进行Map时创建一个mr-worker + PID的文件,然后统计几个特定文件名数量,判断同时运行的worker数量
job count test:对任务进行计数
early test:是否有worker在全部任务完成前退出
crash test:需要保证该系统是具有容错能力的,如果分配的任务没有在规定时间完成,则认定任务失败
2 实验过程
2.1 Worker
    在论文中,是由coodinater主动分配任务给worker进行实现的,而这里的实现是workre通过RPC发送请求获得response进行处理。对于每个worker,轮询的去做任务即可,主要逻辑如下:
| 1 |  | 
    实现对应的doMapTask和doReduceTask即可。实现doMapTask要注意将中间量分别存到文件中,并且使用go自带的json包进行键值存储。还要注意以下两点
原子写入文件
    实现时需要保证文件的原子性写入,避免异常情况导致文件受损,所以可以生成一个临时文件再利用OS.Rename进行原子性替换。
中间态的存储
    在mrsequential的实现中,将所有的kva([]KeyValue)都存到一个intermediate中,但在这里实现是,使用一个[][]KeyValue的哈希表进行存储的,同一个key的kv将会放在一起,后续实现reduce只需要遍历该哈希表即可。
2.2 Coodinator
 这里实现的主要有两个点:监听通道进行调度和轮询选择任务进行分配
| 1 |  | 
| 1 |  | 
状态维护
    这里Coodinator主要维护的是task的状态,当请求到来时,会对所有的任务进行轮询,只有当任务状态为Idle或Working下超时了,才会被分配。
管道
    在coordinator开启一个协程不断监控heartbeatCh和reportCh中的数据并进行处理。并且,管道可以很方便的控制并发,只有当msg.ok可输出时,才可以返回。这里需要考虑channel传值时的一些情况(直接引用了):
- channel 传 struct{}:对于仅需要传输 happens-before 语义不需要传输数据的场景,创建的 channel 应该是 struct{} 类型,go 对其做了特别优化可以不耗费内存。
- channel 传指针:对于 report handler,其往 reportCh 中 send msg 时只需要传输 request 的指针,等 coordinator 的 schedule 协程处理完毕后即可返回,这里并没有什么问题。对于 heartbeat handler,其会相对复杂些,因为其往 heartbeatCh 中 send msg 时传输了 response 的指针,coordinator 的 schedule 协程需要对该指针对应的数据做处理后再返回,那么此时 rpc 协程在返回时是否能够看到另一个 goroutine 对其的修改呢?对于这种场景,如果协程间满足 happens-before 语义的话是可以的,如果不满足则不一定可以。那么是否满足 happens-before 语义呢?很多人都知道对于无 buffer 的 channel,其
receive是 happens-beforesend的,那么似乎就无法判断其是否满足 happens-before 语义了。实际上,send与send 完成是有区别的,可以参考此博客,严格来说:对于无 buffer 的 channel,其send starthappens-beforereceive completehappens-beforesend complete,因此有了这个更清晰的语义,我们很显然可以得到schedule 协程修改 responsehappens-beforeworker rpc 协程返回 response,因此这样写应该是没有问题的,我的 race detector 也没有报任何错误。
2.3 RPC
    RPC的作用有两点:worker请求任务,worker处理完任务进行汇报
worker
| 1 |  | 
coodinator
| 1 |  | 
3 总结
 整体实现还是参照着github一个文档,感觉写的不错。重要的内容的理解上要花上些时间,自己又好久没写过go了,花了几个小时看了下go的文档才上手。
    梳理下实现流程基本就是:worker通过RPC向coodinator请求任务,coodinator监听管道收到后,则开始轮询维护好的任务组,根据不同的任务状态对response进行修改,然后返回;worker收到后判断是什么类型的任务:map,对文章进行内容读取,然后使用哈希表保存中间态,每个写入到一个临时文件中,然后发送汇报信息;reduce:根据生成filePath进行读取,也就是取出kva,然后存到一个哈希表map[string][]string中,送给reduceF处理即可。
参考
MIT6.824-2022/lab1.md at practice · 2w1nd/MIT6.824-2022 (github.com)