Spark 相关术语
一:RDD (Resilient Distributed DataSet)
弹性分布式数据集,是对数据集在spark存储和计算过程中的一种抽象。
是一组只读、可分区的分布式数据集合。
一个RDD 包含多个分区Partition(类似于MapReduce中的InputSplit中的block),分区是依照一定的规则的,将具有相同规则的属性的数据记录放在一起。
横向上可切分并行计算,以分区Partition为切分后的最小存储和计算单元。
纵向上可进行内外存切换使用,即当数据在内存不足时,可以用外存磁盘来补充。
二:Partition (分区)
Partition类似hadoop的Split中的block,计算是以partition为单位进行的,提供了一种划分数据的方式。
Partition的划分依据有很多,常见的有Hash分区、范围分区等,也可以自己定义的,像HDFS文件,划分的方式就和MapReduce一样,以文件的block来划分不同的partition。
一个Partition交给一个Task去计算处理。
三:算子
英文简称:Operator,简称op
广义上讲,对任何函数进行某 ...
Spark 架构设计
一:架构总览
二:角色作用
Client:面向用户,对外提供接口,提交代码的入口。
Driver Program:驱动器程序,用于解耦客户端和内部实际操作,将用户程序转化为任务。
SparkContent:Spark 上下文,承接作用,用于配置上下文环境。
Cluster Manager(Resource Manager):集群资源管理器,统一资源管理与任务调度。
Application Master:任务的执行,调度指挥者。
Worker Node:工作节点,任务的实际执行者。
三:角色间关系
客户端接收到用户指令、代码;
驱动器服务于客户端,承接指令传达给集群资源管理器;
集群资源管理器根据当前情况,进行资源调度,生成一个任务调度者 AM(Application Master)
AM 给相应的工作节点分配任务;
工作节点执行任务,执行完毕,结果返回给 AM,并向资源管理器汇报自身资源情况,任务已完成当前空闲状态。
AM 接收到计算结果进行汇总,返回给客户端。
四:工作特性
内存计算
多线程
缓存
每一个 AM 都有一批专享的 Executor,以多线程方式启动多个 Task 任 ...
Hadoop 数据压缩
一:概述1.1 优缺点
压缩的优点:以减少磁盘 IO、减少磁盘存储空间。
压缩的缺点:增加 CPU 开销。
1.2 压缩原则
运算密集型的 Job,少用压缩
IO 密集型的 Job,多用压缩
二:MR 支持的压缩编码2.1 压缩算法对比介绍
压缩格式
Hadoop 是否自带
算法
文件扩展名
是否可切片
换成压缩格式后,原来的程序是否需要修改
DEFLATE
是,直接使用
DEFLATE
.deflate
否
和文本处理一样,不需要修改
Gzip
是,直接使用
DEFLATE
.gz
否
和文本处理一样,不需要修改
bzip2
是,直接使用
bzip2
.bz2
是
和文本处理一样,不需要修改
LZO
否,需要安装
LZO
.lzo
是
需要建索引,还需要指定 输入格式
Snappy
是,直接使用
Snappy
.snappy
否
和文本处理一样,不需要修改
2.2 压缩性能的比较
压缩算法
原始文件大小
压缩文件大小
压缩速度
解压速度
gzip
8.3GB
1.8GB
17.5MB/s
58MB/s
bzip2
8 ...
MapReduce 开发总结
一:输入数据接口:InputFormat
默认使用的实现类是:TextInputFormat
TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为 key,行内容作为 value 返回。
CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。
二:逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
三:Partitioner 分区
有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个 分区号;key.hashCode()&Integer.MAXVALUE % numReduces
如果业务上有特别的需求,可以自定义分区。
四:Comparable 排序
当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接 口,重写其中的 compareTo()方法。
部分排序:对最终输出的每一个文件进行内部排序。
全排序:对所有数据进行排序,通常只有一 ...
MapReduce-ETL数据清洗
一:数据清洗(ETL)“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取 (Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL 一词较常用在数据仓库,但其对象并不限于数据仓库。
在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户 要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
二:案例分析2.1 需求去除日志中字段个数小于等于 11 的日志。
(1)输入数据:web.log
(2)期望输出数据:每行字段长度都大于 11。
2.2 需求分析在 Map 阶段对输入的数据根据规则进行过滤清洗。
2.3 代码实现(1)编写 WebLogMapper 类12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849package cn.aiyingke.mapreduce.ETL;import org.apache.h ...
MapReduce Join 应用
一:Reduce Join Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记 录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。
Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要 在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进 行合并就 ok 了。
二:Reduce Join 案例实操2.1 需求订单数据表
商品信息表
将商品信息表中数据根据商品 pid 合并到订单数据表中。
最终数据形式:
2.2 需求分析 通过将关联条件作为 Map 输出的 key,将两表满足 Join 条件的数据并携带数据所来源 的文件信息,发往同一个 ReduceTask,在 Reduce 中进行数据的串联。
2.3 Reduce端表合并(数据倾斜)
2.4 代码实现(1)创建商品和订单合并后的 TableBean 类1234567891011121314151617181920212223242526272 ...
MapReduce 内核源码解析
一:MapTask 工作机制
(1)Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中 解析出一个个 key/value。
(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并 产生一系列新的 key/value。
(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上, 生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序, ...
MapReduce OutputFormat数据输出
一:OutputFormat 接口实现类1.1 概述OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat 接口。下面我们介绍几种常见的OutputFormat实现类。
1.2 OutputFormat实现类
1.3 默认输出格式TextOutputFormat1.4 自定义OutputFormat(1) 应用场景例如:输出数据到MySQL/HBase/Elasticsearch等存储框架中。
(2)自定义OutputFormat步骤
自定义一个类继承FileOutputFormat
改写RecordWriter,具体改写输出数据的方法write()
二:自定义 OutputFormat 案例实操2.1 需求过滤输入的 log 日志,包含 aiyingke 的网站输出到 e:/aiyingke.log,不包含 aiyingke 的网站输出到 e:/other.log;
(1)输入数据
log.txt
(2)期望输出数据
aiyingke.log
other.log
2.2 需 ...
Mapreduce Combiner合并
一:Combiner合并概述(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行
Reducer是接收全局所有Mapper的输出结果
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv 应该跟Reducer的输入kv类型要对应起来。以下求平均值就不能够使用 Combiner;
二:自定义 Combiner 实现步骤2.1 自定义一个 Combiner 继承 Reducer,重写 Reduce 方法1234```## 2.2 在 Job 驱动类中设置
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 ...
Mapreduce WritableComparable排序
一:排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按 照key进行排序。该操作属于 Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完 毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
二:排序分类(1)部分排序MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序最终输 ...



