MapReduce论文读后感
在MIT6.824的第一个Lab中,就是要我们实现一个MapReduce系统,当时只是粗略看了一下其基本的实现,再参照网上思路完成了该Lab。今天面试被问到该论文,很多问题都没答好,看来论文的阅读还是十分重要。🤔
0 前言
首先从摘要出发,摘要首先基本说明MapReduce大致流程,其实现需要注意的点以及其作用。
MapReduce
是用于处理和生成超大数据集的算法模型的一个模型。通常流程是:用户用户首先创建一个 Map
函数处理一个基于 key/value pair
的数据集合,输出中间的基于 key/value pair
的数据集合(shuffle
);然后再创建一个 Reduce
函数用来合并所有的具有相同中间 key
值的中间 value
值
实现该模型需要注意的是
- 如何分割输入数据
- 在大量计算机组成的集群上调度
- 集群中计算机的错误处理
- 管理集群中计算之间必要的通信
在初期,该模型是Google用来处理海量的原始数据,比如文档抓取,Web请求日志等等;还有处理各种类型的衍生数据,比如倒排索引,Web文档的图结构的各种表示形式。这些数据处理很容易理解,但由于数据量太大,需要在多台主机上并行计算,但如何分发数据,处理错误?为此诞生了MapRedece
模型。
1 编程模型
论文在第二部分讲了该模型的大致框架以及应用,感觉框架很容易理解,这里贴张图即可。
1 |
|
有趣的栗子
- 计算URL访问频率:
Map
函数处理日志中web
页面请求的记录,然后输出(URL,1)
。Reduce
函数把相同URL
的value
值都累加起来,产生(URL,记录总数)
结果 - 倒转网络链接图:
Map
函数在源页面(source)
中搜索所有的链接目标(target)
并输出为(target,source)
。Reduce 函数把给定链接目标(target)
的链接组合成一个列表,输出(target,list(source))
。 - 倒排索引:
Map
函数分析每个文档输出一个(词,文档号)
的列表,Reduce
函数的输入是一个给定词的所有(词,文档号)
,排序所有的文档号,输出(词,list(文档号))
。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
2 实现
这一部分简单聊了集群计算环境的MapReduce
实现。有一句话我觉得很有意思,大概是说MapReduce
有许多实现方式,取决于具体环境,比如,用于小型的共享内存方式的机器,另外一种实现方式则适用于大型 NUMA 架构的多处理器的主机,还有适合大型的网络连接集群。
这里我觉得有意思得是NUMA,之前从来没接触过,上网查阅资料发现,这是一种非均匀内存访问架构,指多处理器系统中,内存的访问时间是依赖于处理器和内存之间的相对位置的。这种设计里存在和处理器相对近的内存,通常被称作本地内存;还有和处理器相对远的内存, 通常被称为非本地内存。
这和分布式有点扯远,具体可以看这篇博文:【计算机体系结构】NUMA架构详解
2.1 执行概括
这在我Lab1的实现提到了,6.824 Lab1:MapReduce
这里不多BB
2.2 Master数据结构
论文中说要维护的数据结构有如下
- 每个
Map
和Reduce
任务的状态 Worker
机器的标识(非空闲)- 中间文件存储区域的大小和位置
在做Lab时没根据这里来做,这里就不多说吧
2.3 容错
谈到分布式就不得不谈到容错,一个好的分布式系统一定能有个优秀的容错机制
worker故障
解决这种故障通过:
master
周期性的ping
每个worker
。如果在一个约定的时间范围没有收到worker
返回的信息,master
就将这个worker
标记为失效。所有由这个失效的 worker
完成的 Map
任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的 worker
。同样的,worker
失效时正在运行的 Map
或 Reduce
任务也将被重新置为空闲状态,等待重新调度。
当 worker
故障时,由于已经完成的 Map
任务的输出存储在这台机器上,Map
任务的输出已不可访问了,因此必须重新执行。而已经完成的 Reduce
任务的输出存储在全局文件系统上,因此不需要再次执行。
当一个 Map
任务首先被 worker A
执行,之后由于 worker A
失效了又被调度到 worker B
执行,这个“新执行”的动作会被通知给所有执行 Reduce
任务的 worker
。任何还没有从 worker A
读取数据的 Reduce
任将从 worker B
读取数据。
master故障
最简单的解决方法就是实现落库,让 master
周期性的将上面描述的数据结构的写入磁盘,即检查点(checkpoint)。如果这个 master
任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master
进程。但在Lab的实现是挂了就寄了= =,毕竟只有一个master
进程恢复比较麻烦
在失效方面的处理机制
这里的翻译很奇怪,原文是semantics in the presence of failures
。我不太理解为什么这样叫,但这一部分还是挺重要的。
它规定了该模型依赖于Map
和 Reduce
任务的输出是原子提交的,这样才能保证任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。
实现的原理是:每个工作中的任务把它的输出写到私有的临时文件中。
当一个 Map
任务完成的时,worker
发送一个包含 R 个临时文件名的完成消息给 master
。如果 master
从一个已经完成的 Map 任务再次接收到到一个完成消息,master
将忽略这个消息;否则,master
将这 R 个文件的名字记录在数据结构里。
当 Reduce
任务完成时,Reduce worker
进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个 Reduce
任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个 Reduce
任务产生的数据。
这一部分论文重要的是指出原子写入这个事,只有保证了操作是确定性的,才可以使用合理的处理机制。
存储位置
这一部分讲了咋在多台机器上分配资源。感觉用处不是很大,不细说了
任务粒度
通常,将把 Map
拆分成了 M
个片段、把 Reduce
拆分成 R
个片段执行。理想情况下,M
和 R
应当比集群中 worker
的机器数量要多得多。在每台 worker
机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量 Map
任务都可以分布到所有其他的 worker
机器上去执行。
但是实际上,在我们的具体实现中对 M
和 R
的取值都有一定的客观限制,因为 master
必须执行 O(M+R)
次调度,并且在内存中保存 O(M*R)个状态(对影响内存使用的因素还是比较小的:O(M*R)块状态,大概每对 Map
任务/Reduce
任务 1 个字节就可以了)。
更进一步,R 值通常是由用户指定的,因为每个 Reduce 任务最终都会生成一个独立的输出文件。实际使用时我们也倾向于选择合适的 M 值,以使得每一个独立任务都是处理大约 16M 到 64M 的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效),另外,我们把 R 值设置为我们想使用的 worker 机器数量的小的倍数。我们通常会用这样的比例来执行 MapReduce:M=200000,R=5000,使用 2000 台 worker 机器。
备用任务
面试就被问到这里,当时在Lab实现是有的任务10s没完成直接分配给别的worker
,结果被问到到论文是咋解决的😢
因为在运算过程中,如果有一台机器花了很长的时间才完成最后几个 Map 或 Reduce 任务,导致 MapReduce 操作总的执行时间超过预期。出现“落伍者”的原因非常多。
论文中解决方案
当一个 MapReduce
操作接近完成的时候,master
调度备用(backup
)任务进程来执行剩下的、处于处理中状态(in-progress
)的任务。无论是最初的执行进程、还是备用(backup
)任务进程完成了任务,我们都把这个任务标记成为已经完成。
……..
后续就是该模型一些扩展,优化,以及性能分析等了,感觉现在还不太会用到,就不细说了。
3 总结
整篇论文看下来还是很清晰的,这种分布式系统的实现可以算是一个里程碑,模型简单容易理解,并且可以解决大多数的问题。在技术实现上,MapReduce 封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得 MapReduce库易于使用。并且,让性能优化提升到了仅仅通过增加机器就可以实现的地步,而不是花高价去雇人来优化。
论文的最后,指出:
- 约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境;
- 网络带宽是稀有资源。大量的系统优化是针对减少网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽
- 多次执行相同的任务可以减少性能缓慢的机器带来的负面影响,同时解决了由于机器失效导致的数据丢失问题。