一:写数据流程

(1)客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。

(2)NameNode 返回是否可以上传。

(3)客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。

(4)NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。

(5)客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。

(6)dn1、dn2、dn3 逐级应答客户端。

(7)客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存), 以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。

(8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务 器。(重复执行 3-7 步)。

二:读数据流程

(1)客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。

(2)挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。

(3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位 来做校验)。

(4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

三:网络拓扑-节点距离计算

在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的 DataNode 接 收数据。那么这个最近距离怎么计算呢?

节点距离:两个节点到达最近的共同祖先的距离总和。

假设有数据中心 d1 机架 r1 中的节点 n1。该节点可以表示为/d1/r1/n1。

两个数据中心依靠外部的网络连接,数据中心可看作一个机房。

四:机架感知(副本存储节点选择)

4.1 机架感知说明

Apache Hadoop 3.3.4 – HDFS Architecture

对于常见情况,当复制因子为3时,HDFS的放置策略是,如果编写器在datanode上,则将一个副本放在本地计算机上,否则放在与编写器在同一机架中的随机datanode上,另一个副本放在不同(远程)机架中的节点上,最后一个放在同一远程机架中的不同节点上。

此策略减少了机架间的写入流量,从而总体上提高了写入性能。机架故障的几率远小于节点故障的几率;该策略不影响数据可靠性和可用性保证。但是,它不会减少读取数据时使用的聚合网络带宽,因为数据块仅放置在两个不同的机架中,而不是三个。

使用这种策略,数据块的副本不会均匀分布在机架上。两个副本位于一个机架的不同节点上,另一个副本位于另一个机架的节点上。该策略提高了写入性能,而不影响数据可靠性或读取性能。

4.2 源码说明

Crtl + n 查找 BlockPlacementPolicyDefault,在该类中查找 chooseTargetInOrder 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

/**
* Calculate the maximum number of replicas to allocate per rack. It also
* limits the total number of replicas to the total number of nodes in the
* cluster. Caller should adjust the replica count to the return value.
*
* @param numOfChosen The number of already chosen nodes.
* @param numOfReplicas The number of additional nodes to allocate.
* @return integer array. Index 0: The number of nodes allowed to allocate
* in addition to already chosen nodes.
* Index 1: The maximum allowed number of nodes per rack. This
* is independent of the number of chosen nodes, as it is calculated
* using the target number of replicas.
*/
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
int clusterSize = clusterMap.getNumOfLeaves();//datanode的数量
int totalNumOfReplicas = numOfChosen + numOfReplicas;//总共的副本数量
if (totalNumOfReplicas > clusterSize) { //如果总的数量大于集群datanode的数量
numOfReplicas -= (totalNumOfReplicas-clusterSize); //修正副本的数量
totalNumOfReplicas = clusterSize; //总的副本的数量等于集群datanode的数量
}
// No calculation needed when there is only one rack or picking one node.//如果只有一个rack或者只选取一个结点,那么没有必要计算。
int numOfRacks = clusterMap.getNumOfRacks();
if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
return new int[] {numOfReplicas, totalNumOfReplicas};
}

int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;//如果totalNumOfReplicas-1 < numOfRacks,那么maxNodesPerRack为2。
//如果totalNumOfReplicas - 1 = numOfRacks 那么 maxNodesPerRack=3,如果totalNumOfReplicas-1 > numOfRacks,那么就在各机架平均分配值再加2。
// At this point, there are more than one racks and more than one replicas
// to store. Avoid all replicas being in the same rack.
//
// maxNodesPerRack has the following properties at this stage.
// 1) maxNodesPerRack >= 2
// 2) (maxNodesPerRack-1) * numOfRacks > totalNumOfReplicas
// when numOfRacks > 1
//
// Thus, the following adjustment will still result in a value that forces
// multi-rack allocation and gives enough number of total nodes.
if (maxNodesPerRack == totalNumOfReplicas) {
maxNodesPerRack--;
}
return new int[] {numOfReplicas, maxNodesPerRack};
}

五:副本节点选择