关于HDFS的数据可见性

以前写的一篇文章。

以前一直知道,写入hdfs的数据不会马上可见。
稍微看了些代码,总结下。

关于HDFS写

单一写,并发读

传统的文件系统是允许对一个文件并发写入的,只是如果不同步的话,文件内容会乱掉。http://blog.chinaunix.net/uid-11452714-id-3771084.html
HDFS不允许并发写,但可以并发读:http://www.cnblogs.com/ZisZ/p/3253570.html
大多数分布式文件系统都不允许并发写,代价太大。

如果多线程试图同时写一个文件,只有一个线程可以正常写,其他线程会抛出AlreadyBeingCreatedException异常:

1
2
3
4
5
6
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /tmp/appendTest for DFSClient_NONMAPREDUCE_-427798443_10 on client 172.31.132.146 because current leaseholder is trying to recreate file.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2275)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2153)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2386)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2347)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)

如果一个客户端A获取了lease,但写数据时意外退出,文件没有close,lease不会自己释放(正常close的话lease是会释放的)。
只能等时间超过soft limit后,另一个客户端B尝试写同一个文件,NN回收lease;或者时间超过hard limit后lease被NN的一个后台线程回收。

所以如果客户端B尝试写同一个文件,如果还没超出hard limit,第一次尝试必定会失败的,因为同一个文件的lease还被占用着:

1
2
3
4
5
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.RecoveryInProgressException): Failed to close file /tmp/appendTest. Lease recovery is in progress. Try again later.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2310)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2153)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2386)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2347)

再尝试时,如果时间已经超过soft limit,才能成功获得lease;否则必定继续失败,只有等超过soft limit后,NN才会把lease分给新的客户端。
所以写数据时一般要加上重试机制。
可以自己写个程序验证下,用以下命令可以看到正在写还没有close的文件:

1
hadoop fsck -openforwrite /tmp

写数据机制

其实hadoop权威指南已经讲得比较清楚了,这里结合代码复述下。相关的类主要是DFSClient和DFSOutputStream。这块还有点复杂,我也没完全看懂。

我们一般通过FileSystem.create或FileSystem.append方法获得output stream(其实是DFSOutputStream),然后write(byte[])写数据。
注意客户端写数据是直接和datanode交互的,只有申请新block时才需要和namenode交互。
如果是create,客户端通过RPC协议(ClientProtocol.addBlock方法,这个类名字和mapreduce的RPC重复了。。。)向NN申请一个新block,开始写数据。
如果是append,客户端会先判断目标文件的最后一个block是否写满,如果已满就申请新的block,否则就在最后一个block上追加。
无论如何,客户端会得到一个目标block用于写入。

NN在分配一个block时还会返回对应的pipeline。如果副本数设置为3,那么pipeline就是3个节点。如果是create,NN挑3个节点组成pipeline(这里有规则的,但对我们这种单机房单机架的来说,就是随机)。如果客户端同时也是DN,那么必定有一个副本同时在当前节点上(这个没从代码上求证过)。如果是append,并且在原来的block上追加数据,那返回的pipeline就是原来的3个节点。

我们调用write(byte[])方法时,数据并没有马上写入pipeline。DFSOutputStream会暂时缓存数据。
数据的发送是以packet为单位的,一个packet大小默认64K(dfs.client-write-packet-size,默认65536)。DFSOutputStream内部有两个queue:dataQueue和ackQueue。待写入的数据达到64K时,DFSOutputStream将数据包装成一个packet并放入dataQueue,等待一个守护线程DataStreamer去消费。
DataStreamer从dataQueue中取出packet,发到pipeline,将packet加入ackQueue。pipiline中的所有节点都将数据写入后,DataStreamer会收到ack消息,并将packet从ackQueue中移除。这样才算是数据真正写入完毕。

其实一个packet不全是数据。DFSOutputStream会将数据组合成一个个chunk(dfs.bytes-per-checksum,默认512),每一个chunk加一个校验值。默认的校验(dfs.checksum.type,默认CRC32C)需要占用4个字节(见DataChecksum类),也就是说每个chunk实际占用516个字节。一个packet最多存储65536/516=127个chunk。所以,一个packet的实际大小只有127*516=65532字节,其中只有65024个字节是真正的数据。这个计算逻辑见DFSOutputStream:

1
2
3
4
5
6
7
8
9
10
11
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize(); // 512+4
chunksPerPacket = Math.max(psize/chunkSize, 1); // 65536/516=127
packetSize = chunkSize*chunksPerPacket; // 127*516=65532
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize +
", chunksPerPacket=" + chunksPerPacket +
", packetSize=" + packetSize);
}
}

DataStreamer每次写到pipeline的数据也不一定是64K。上面说过,一个满的packet只有65024个字节,而且write时还会生成一个header信息,先写出header,再写出packet本身。如果是最后一个packet,还可能凑不满127个chunk,就更小了。数据也不一定能被512字节整除。还要考虑到写入的数据不能超过block size(block size必须是chunk的整数倍,否则会报错),也会对packet大小做一些调整。

Packet的结构见DFSOutputStream.Packet类。

写数据时,其实是先写到一个512字节的buffer里,写满了就调用flushBuffer()方法计算checksum,将checksum和数据写入currentPacket。如果currentPacket已经写满了,就放入dataQueue,这里会阻塞,因为缓存的packet有个最大值,默认80个(这个80是写死在程序里的,不知为何)。如果已经写到block的最后,还会发送一个空的packet对象,要求DN将数据持久化(这个机制见下面的分析)。之后通过和NN的RPC协议申请一个新的block继续写。

数据可见性

http://www.cnblogs.com/ZisZ/p/3253354.html(只能参考。原作者的hadoop版本比较老。)

客户端写入hdfs的数据不是立即可见的。

以前一直以为正在写的整个block都不可见,其实不是。只要写入pipeline并且ack的数据,都是可见的。

只有缓存在写客户端的数据,对其他读客户端才是不可见的。根据上面的描述,客户端最多缓存80个packet(dataQueue和ackQueue的size之和,这个80是写死在程序里的),每个packet大概64K,所以总共有大概5M的数据不可见。之所以是“大概”,因为packet中有校验数据,而且有一个currentPacket不在queue里。准确的值是65024*81=5.023M,差不多。
如果客户端意外挂掉,缓存的数据会完全丢失,也就是说最多丢5M的数据。

但是,写到pipeline的数据虽然能看到,但不能保证不丢失。因为DN端也会将数据缓存(这个缓存机制还不太明白,没看过代码),而不是立即写到磁盘。极端情况下,pipeline里的3个节点都挂掉,写入pipeline的数据也会丢。
只有写满一个block时,客户端才会发送一个空的packet,这个packet的header有个特殊的标志位,要求DN将当前block的数据刷到磁盘。
所以极端情况下,可能会丢一个block的数据(这是某些资料的说法,我没看DN的代码求证过。感觉上有点问题,难道整个block都缓存在内存里?只有DN内存里的数据会丢吧,如果blocksize设的很大,岂不是很耗内存。所以感觉不太可能丢整个block,应该也是有一个buffer之类的)。不过3个节点一起挂掉的概率很小吧。

虽然写入pipeline的数据对客户端可见了(去读这个文件的话可以读到)。但如果看hadoop fs -ls看这个文件,会发现这个文件的大小没有变化,可能还是0字节。

因为客户端写入数据时只需要和DN交互,NN只知道这个文件有哪个block在写,但写入的数据量是不知道的。客户端读的时候也是直接读DN上的block,所以可以读到pipeline中的数据。
只有等一个block写完或者客户端主动close,NN那边才能看到大小的变化(只有这时才会与NN交互)。
如果客户端意外挂掉,等超过1个小时(hard limit)文件大小也会变化。
如果客户端意外挂掉,另一个客户端1分钟(soft limit)后重新获取lease并且append,上一个客户端写入pipeline的数据也还在的。

hflush和hsync

hflush要求客户端将所有buffer里的数据写入pipeline。之后数据对所有客户端可见。本质就是阻塞所有写入,将currentPacket加入dataQueue(即使currentPacket还没满),然后等待queue中的所有数据都ack。
hsync在hflush的基础上,会将currentPacket的isSync标识设为true,DN收到这样一个packet后,会将数据刷入磁盘。即使没有数据要flush,也会新建一个空的packet对象,设置isSync并发送。

这个两个方法都是为了防止数据丢失的。hflush防止客户端缓存的数据丢失,hsync防止客户端和pipeline缓存的数据丢失。
即使是hsync,也只是保证数据刷到磁盘,但可能在磁盘的缓存里。所以没有绝对的安全的。
而且这两个方法会影响写入的效率。

感觉上,如果对数据可见性有要求,可以定期hflush;客户端挂掉最多丢5M数据,不能接受这种情况,也要定期hflush;其他情况都没必要hflush。
hsync完全没必要,写到pipeline的数据已经很安全了。

上面说过写入pipeline的数据不会立即让NN端的文件大小改变。其实hsync时可以强制更新文件大小。

1
2
3
4
FileSystem fs = FileSystem.get(conf);
// 这里要强制转换下。HdfsDataOutputStream才有对应的hsync(EnumSet)方法,普通的DFSOutputStream没有
HdfsDataOutputStream out = (HdfsDataOutputStream) fs.create(new Path("/tmp/bigBlock2"));
out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

不用想就知道,肯定对性能影响非常大。

关于HDFS读

其实和本文关系不大,只是顺便整理下。

简单整理下HDFS读数据的机制。

  • 网络读。很好理解。客户端直接连DataNode,通过网络传输数据。最常见,适用于各种情况。
  • 本地socket读。如果客户端同时是DataNode,并且要读的数据就在本地,可以省掉网络传输的过程。这也是MapReduce计算本地性的基本原理。“带宽是最宝贵的资源”
  • 本地磁盘读。即Short-Circuit Local Reads。当要读的数据在本地时,可以不走socket,直接用系统调用读磁盘上的文件。效率更高,但需要编译对应系统的native lib。
  • 内存读。即缓存机制。hadoop 2.3.0新增了DataNode端的缓存机制,可以将一些block缓存到内存中。是否适用看应用场景吧。好处是效率高,坏处是额外占用内存,而且这些内存是off-heap的,不受GC管理。

关于Short-Circuit Local Reads

关于这个配置还有些问题。

网上有很多文档给出的配置是要dfs.block.local-path-access.user属性的,只有特定的用户才能使用local read。实际上那是老的实现方法(legacy),基于HDFS-2246
这种方法配置麻烦,配置项很多,并且有安全隐患。
在hadoop 2.1.0以后的版本已经有了新的实现,基于HDFS-347

新的实现需要libhadoop.so,要在不同系统上分别编译。只需要如下两个配置即可:

1
2
3
4
5
6
7
8
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>

hadoop 2.5.2的文档中已经给出了2种实现的配置(2.2.0的文档中只有新的实现)。

org.apache.hadoop.hdfs.BlockReaderLocalLegacy类的注释:

1
2
3
4
* This is the legacy implementation based on HDFS-2246, which requires
* permissions on the datanode to be set so that clients can directly access the
* blocks. The new implementation based on HDFS-347 should be preferred on UNIX
* systems where the required native code has been implemented.<br>

参考:
http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/
http://www.importnew.com/6151.html