Mapreduce Combiner合并
一:Combiner合并概述(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行
Reducer是接收全局所有Mapper的输出结果
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv 应该跟Reducer的输入kv类型要对应起来。以下求平均值就不能够使用 Combiner;
二:自定义 Combiner 实现步骤2.1 自定义一个 Combiner 继承 Reducer,重写 Reduce 方法1234```## 2.2 在 Job 驱动类中设置
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 ...
Mapreduce WritableComparable排序
一:排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
二:排序分类(1)部分排序MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序最终输 ...
MapReduce Partition分区
一:问题引出要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机 归属地不同省份输出到不同文件中(分区)
二:默认Partitioner分区12345public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }}
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个 key存储到哪个分区。
三:自定义Partitioner 步骤3.1 自定义类继承Partitioner,重写getPartition()方法12345678public class CustomPartitioner extends Partitioner<Text ...
MapReduce Shuffle机制
一:Shuffle 机制
Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。
MapReduce 工作流程
一:MapReduce详细工作流程图
上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第 16 步结束,具体 Shuffle 过程详解,如下:
(1)MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
(5)ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
(6)ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)
注意:
(1)Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执 ...
MapReduce 切片原理
一:MapReduce 框架原理
二:切片与 MapTask 并行度决定机制2.1 问题引出
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数 据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因 素影响了 MapTask 并行度?
2.2 MapTask 并行度决定机制
数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行 存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。
2.3 数据切片与MapTask并行度决定机制
三:Job 提交流程源码和切片源码详解3.1 Job 提交流程源码详解1234567891011121314151617181920212223242526272829303132333435363738394 ...
Hadoop 序列化
一:序列化概述1.1 什么是序列化?
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁 盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换 成内存中的对象。
1.2 为什么要序列化?一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能 由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的” 对象,可以将“活的”对象发送到远程计算机。
1.3 为什么不用 Java 的序列化?Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带 很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以, Hadoop 自己开发了一套序列化机制(Writable)。
1.4 Hadoop 序列化特点(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互。
二:自定义 bean 对象实现序列化接口(Writable)在企业开发中往往常用的基本序列化类型不能满足 ...
MapReduce WordCount案例
一:需求对本地 WordCount.txt 文件进行词频统计,其内容均按 “,”分割,将统计结果输出到 outputDir 目录下;
二:代码实现2.1 环境引入(1)Maven 依赖1234567891011121314151617<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> ...
MapReduce 概述
一:MapReduce 简介MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
二:MapReduce 优缺点2.1 优点(1)MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量 廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一 样的。就是因为这个特点使得 MapReduce 编程变得非常流行。
(2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高 的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
(4)适合 PB 级 ...
HDFS DataNode
一:DataNode 工作机制
(1)一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据 本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(2)DataNode 启动后向 NameNode 注册,通过后,周期性(6 小时)的向 NameNode 上 报所有的块信息。
DN 向 NN 汇报当前解读信息的时间间隔,默认 6 小时;
12345<property> <name>dfs.blockreport.intervalMsec</name> <value>21600000</value> <description>Determines block reporting interval in milliseconds.</description></property>
DN 扫描自己节点块信息列表的时间,默认 6 小时;
12345<property> <name>dfs.datanode.directorys ...


