HDFS读文件过程分析:读取文件的Block数据

我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示:
public abstract int read() throws IOException;
Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。
HDFS读文件过程分析:获取文件对应的Block列表(http://shiyanjun.cn/archives/925.html)中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。

Client从Datanode读取文件的一个字节

下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始:

1 @Override
2 public synchronized int read() throws IOException {
3 int ret = read( oneByteBuf, 0, 1 );
4 return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
5 }

上面调用read(oneByteBuf, 0, 1)读取一个字节到单字节缓冲区oneByteBuf中,具体实现见如下方法:

01 @Override
02 public synchronized int read(byte buf[], int off, int len) throws IOException {
03 checkOpen(); // 检查Client是否正在运行
04 if (closed) {
05 throw new IOException("Stream closed");
06 }
07 failures = 0;
08
09 if (pos < getFileLength()) { // getFileLength()获取文件所包含的总字节数,pos表示读取当前文件的第(pos+1)个字节
10 int retries = 2;
11 while (retries > 0) {
12 try {
13 if (pos > blockEnd) { // blockEnd表示文件的长度(字节数)
14 currentNode = blockSeekTo(pos); // 找到第pos个字节数据所在的Datanode(实际根据该字节数据所在的block元数据来定位)
15 }
16 int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
17 int result = readBuffer(buf, off, realLen); // 读取一个字节到缓冲区中
18
19 if (result >= 0) {
20 pos += result; // 每成功读取result个字节,pos增加result
21 } else {
22 // got a EOS from reader though we expect more data on it.
23 throw new IOException("Unexpected EOS from the reader");
24 }
25 if (stats != null && result != -1) {
26 stats.incrementBytesRead(result);
27 }
28 return result;
29 } catch (ChecksumException ce) {
30 throw ce;
31 } catch (IOException e) {
32 if (retries == 1) {
33 LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
34 }
35 blockEnd = -1;
36 if (currentNode != null) { addToDeadNodes(currentNode); }
37 if (--retries == 0) {
38 throw e;
39 }
40 }
41 }
42 }
43 return -1;
44 }

读取文件数据的一个字节,具体过程如下:

  1. 检查流对象是否处于打开状态(前面已经获取到文件对应的block列表的元数据,并打开一个InputStream对象)
  2. 从文件的第一个block开始读取,首先需要找到第一个block对应的数据块所在的Datanode,可以从缓存的block列表中查询到(如果查找不到,则会与Namenode进行一次RPC通信请求获取到)
  3. 打开一个到该读取的block所在Datanode节点的流,准备读取block数据
  4. 建立了到Datanode的连接后,读取一个字节数据到字节缓冲区中,返回读取的字节数(1个字节)

在读取的过程中,以字节为单位,通过判断某个偏移位置的字节属于哪个block(根据block元数据所限定的字节偏移范围),在根据这个block去定位某一个Datanode节点,这样就可连续地读取一个文件的全部数据(组成文件的、连续的多个block数据块)。

查找待读取的一个字节所在的Datanode节点

上面public synchronized int read(byte buf[], int off, int len) throws IOException方法,调用了blockSeekTo方法来获取,文件某个字节索引位置的数据所在的Datanode节点。其实,很容易就能想到,想要获取到数据所在的Datanode节点,一定是从block元数据中计算得到,然后根据Client缓存的block映射列表,找到block对应的Datanode列表,我们看一下blockSeekTo方法的代码实现:

01 private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
02 ... ...
03
04 DatanodeInfo chosenNode = null;
05 int refetchToken = 1; // only need to get a new access token once
06 while (true) {
07 LocatedBlock targetBlock = getBlockAt(target, true); // 获取字节偏移位置为target的字节数据所在的block元数据对象
08 assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
09 long offsetIntoBlock = target - targetBlock.getStartOffset();
10
11 DNAddrPair retval = chooseDataNode(targetBlock); // 选择一个Datanode去读取数据
12 chosenNode = retval.info;
13 InetSocketAddress targetAddr = retval.addr;
14
15 // 先尝试从本地读取数据,如果数据不在本地,则正常去读取远程的Datanode节点
16 Block blk = targetBlock.getBlock();
17 Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
18 if (shouldTryShortCircuitRead(targetAddr)) {
19 try {
20 blockReader = getLocalBlockReader(conf, src, blk, accessToken,
21 chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock); // 创建一个用来读取本地数据的BlockReader对象
22 return chosenNode;
23 } catch (AccessControlException ex) {
24 LOG.warn("Short circuit access failed ", ex);
25 //Disable short circuit reads
26 shortCircuitLocalReads = false;
27 } catch (IOException ex) {
28 if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
29 /* Get a new access token and retry. */
30 refetchToken--;
31 fetchBlockAt(target);
32 continue;
33 } else {
34 LOG.info("Failed to read " + targetBlock.getBlock()
35 + " on local machine" + StringUtils.stringifyException(ex));
36 LOG.info("Try reading via the datanode on " + targetAddr);
37 }
38 }
39 }
40
41 // 本地读取失败,按照更一般的方式去读取远程的Datanode节点来获取数据
42 try {
43 s = socketFactory.createSocket();
44 LOG.debug("Connecting to " + targetAddr);
45 NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
46 s.setSoTimeout(socketTimeout);
47 blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
48 accessToken,
49 blk.getGenerationStamp(),
50 offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
51 buffersize, verifyChecksum, clientName); // 创建一个远程的BlockReader对象
52 return chosenNode;
53 } catch (IOException ex) {
54 if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
55 refetchToken--;
56 fetchBlockAt(target);
57 } else {
58 LOG.warn("Failed to connect to " + targetAddr
59 + ", add to deadNodes and continue" + ex);
60 if (LOG.isDebugEnabled()) {
61 LOG.debug("Connection failure", ex);
62 }
63 // Put chosen node into dead list, continue
64 addToDeadNodes(chosenNode); // 读取失败,会将选择的Datanode加入到Client的dead node列表,为下次读取选择合适的Datanode读取文件数据提供参考元数据信息
65 }
66 if (s != null) {
67 try {
68 s.close();
69 } catch (IOException iex) { }
70 }
71 s = null;
72 }
73 }
74 }

上面代码中,主要包括如下几个要点:

  • 选择合适的Datanode节点,提高读取效率

在读取文件的时候,首先会从Namenode获取文件对应的block列表元数据,返回的block列表是按照Datanode的网络拓扑结构进行排序过的(本地节点优先,其次是同一机架节点),而且,Client还维护了一个dead node列表,只要此时bock对应的Datanode列表中节点不出现在dead node列表中就会被返回,用来作为读取数据的Datanode节点。

  • 如果Client为集群Datanode节点,尝试从本地读取block

通过调用chooseDataNode方法返回一个Datanode结点,通过判断,如果该节点地址是本地地址,并且该节点上对应的block元数据信息的状态不是正在创建的状态,则满足从本地读取数据块的条件,然后会创建一个LocalBlockReader对象,直接从本地读取。在创建LocalBlockReader对象的过程中,会先从缓存中查找一个本地Datanode相关的LocalDatanodeInfo对象,该对象定义了与从本地Datanode读取数据的重要信息,以及缓存了待读取block对应的本地路径信息,可以从LocalDatanodeInfo类定义的属性来说明:

1 private ClientDatanodeProtocol proxy = null;
2 private final Map<Block, BlockLocalPathInfo> cache;

如果缓存中存在待读取的block的相关信息,可以直接进行读取;否则,会创建一个proxy对象,以及计算待读取block的路径信息BlockLocalPathInfo,最后再加入到缓存,为后续可能的读取加速。我们看一下如果没有从缓存中找到LocalDatanodeInfo信息(尤其是BlockLocalPathInfo),则会执行如下逻辑:

1 // make RPC to local datanode to find local pathnames of blocks
2 pathinfo = proxy.getBlockLocalPathInfo(blk, token);

上面proxy为ClientDatanodeProtocol类型,Client与Datanode进行RPC通信的协议,RPC调用getBlockLocalPathInfo获取block对应的本地路径信息,可以在Datanode类中查看具体实现,如下所示:

1 BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);

Datanode调用FSDataset(实现接口FSDatasetInterface)的getBlockLocalPathInfo,如下所示:

1 @Override //FSDatasetInterface
2 public BlockLocalPathInfo getBlockLocalPathInfo(Block block)
3 throws IOException {
4 File datafile = getBlockFile(block); // 获取本地block在本地Datanode文件系统中的文件路径
5 File metafile = getMetaFile(datafile, block); // 获取本地block在本地Datanode文件系统中的元数据的文件路径
6 BlockLocalPathInfo info = new BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath());
7 return info;
8 }

接着可以直接去读取该block文件(如果需要检查校验和文件,会读取block的元数据文件metafile):

01 ... // BlockReaderLocal类的newBlockReader静态方法
02 // get a local file system
03 File blkfile = new File(pathinfo.getBlockPath());
04 dataIn = new FileInputStream(blkfile);
05
06 if (!skipChecksum) { // 如果检查block的校验和
07 // get the metadata file
08 File metafile = new File(pathinfo.getMetaPath());
09 checksumIn = new FileInputStream(metafile);
10
11 // read and handle the common header here. For now just a version
12 BlockMetadataHeader header = BlockMetadataHeader.readHeader(newDataInputStream(checksumIn));
13 short version = header.getVersion();
14 if (version != FSDataset.METADATA_VERSION) {
15 LOG.warn("Wrong version (" + version + ") for metadata file for " + blk + " ignoring ...");
16 }
17 DataChecksum checksum = header.getChecksum();
18 localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, checksumIn);
19 } else {
20 localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
21 }

在上面代码中,返回了BlockLocalPathInfo,但是很可能在这个过程中block被删除了,在删除block的时候,Namenode会调度指派该Datanode删除该block,恰好在这个时间间隔内block对应的BlockLocalPathInfo信息已经失效(文件已经被删除),所以上面这段代码再try中会抛出异常,并在catch中捕获到IO异常,会从缓存中再清除掉失效的block到BlockLocalPathInfo的映射信息。

  • 如果Client非集群Datanode节点,远程读取block

如果Client不是Datanode本地节点,则只能跨网络节点远程读取,首先创建Socket连接:

1 s = socketFactory.createSocket();
2 LOG.debug("Connecting to " + targetAddr);
3 NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
4 s.setSoTimeout(socketTimeout);

建立Client到目标Datanode(targetAddr)的连接,然后同样也是创建一个远程BlockReader对象RemoteBlockReader来辅助读取block数据。创建RemoteBlockReader过程中,首先向目标Datanode发送RPC请求:

01 // in and out will be closed when sock is closed (by the caller)
02 DataOutputStream out = new DataOutputStream(newBufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
03
04 //write the header.
05 out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
06 out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
07 out.writeLong( blockId ); // block ID
08 out.writeLong( genStamp ); // 时间戳信息
09 out.writeLong( startOffset ); // block起始偏移量
10 out.writeLong( len ); // block长度
11 Text.writeString(out, clientName); // 客户端标识
12 accessToken.write(out);
13 out.flush();

然后获取到DataInputStream对象来读取Datanode的响应信息:

1 DataInputStream in = new DataInputStream(
2 new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));

最后,返回一个对象RemoteBlockReader:

1 return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);

借助BlockReader来读取block字节

我们再回到blockSeekTo方法中,待读取block所在的Datanode信息、BlockReader信息都已经具备,接着就可以从包含输入流(InputStream)对象的BlockReader中读取数据块中一个字节数据:

1 int result = readBuffer(buf, off, realLen);

将block数据中一个字节读取到buf中,如下所示:

01 private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
02 IOException ioe;
03 boolean retryCurrentNode = true;
04
05 while (true) {
06 // retry as many times as seekToNewSource allows.
07 try {
08 return blockReader.read(buf, off, len); // 调用blockReader的read方法读取字节数据到buf中
09 } catch ( ChecksumException ce ) {
10 LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode.getName() + " at " + ce.getPos());
11 reportChecksumFailure(src, currentBlock, currentNode);
12 ioe = ce;
13 retryCurrentNode = false; // 只尝试读取当前选择的Datanode一次,失败的话就会被加入到Client的dead node列表中
14 } catch ( IOException e ) {
15 if (!retryCurrentNode) {
16 LOG.warn("Exception while reading from " + currentBlock + " of " + src + " from " + currentNode + ": " + StringUtils.stringifyException(e));
17 }
18 ioe = e;
19 }
20 boolean sourceFound = false;
21 if (retryCurrentNode) {
22 /* possibly retry the same node so that transient errors don't
23 * result in application level failures (e.g. Datanode could have
24 * closed the connection because the client is idle for too long).
25 */
26 sourceFound = seekToBlockSource(pos);
27 } else {
28 addToDeadNodes(currentNode); // 加入到Client的dead node列表中
29 sourceFound = seekToNewSource(pos); // 从当前选择的Datanode上读取数据失败,会再次选择一个Datanode,这里seekToNewSource方法内部调用了blockSeekTo方法去选择一个Datanode
30 }
31 if (!sourceFound) {
32 throw ioe;
33 }
34 retryCurrentNode = false;
35 }
36 }

通过BlockReaderLocal或者RemoteBlockReader来读取block数据,逻辑非常类似,主要是控制读取字节的偏移量,记录偏移量的状态信息,详细可以查看它们的源码。

DataNode节点处理读文件Block请求

我们可以在DataNode端看一下,如何处理一个读取Block的请求。如果Client与DataNode不是同一个节点,则为远程读取文件Block,首先Client需要发送一个请求头信息,代码如下所示:

01 //write the header.
02 out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // Client与Datanode之间传输数据的版本号
03 out.write( DataTransferProtocol.OP_READ_BLOCK ); // 传输操作类型:读取block
04 out.writeLong( blockId ); // block ID
05 out.writeLong( genStamp ); // 时间戳信息
06 out.writeLong( startOffset ); // block起始偏移量
07 out.writeLong( len ); // block长度
08 Text.writeString(out, clientName); // 客户端标识
09 accessToken.write(out);
10 out.flush();

DataNode节点端通过验证数据传输版本号(DataTransferProtocol.DATA_TRANSFER_VERSION)一致以后,会判断传输操作类型,如果是读操作DataTransferProtocol.OP_READ_BLOCK,则会通过Client建立的Socket来创建一个OutputStream对象,然后通过BlockSender向Client发送Block数据,代码如下所示:

查看源代码打印帮助

1 try {
2 blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); // 创建BlockSender对象
3 } catch(IOException e) {
4 out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
5 throw e;
6 }
7
8 out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // 回复一个响应Header信息:成功状态
9 long read = blockSender.sendBlock(out, baseStream, null); // 发送请求的Block数据
时间: 2024-09-17 12:31:51

HDFS读文件过程分析:读取文件的Block数据的相关文章

android从资源文件中读取文件流并显示的方法_Android

本文实例讲述了android从资源文件中读取文件流并显示的方法.分享给大家供大家参考.具体如下: 在android中,假如有的文本文件,比如TXT放在raw下,要直接读取出来,放到屏幕中显示,可以这样: private void doRaw(){ InputStream is = this.getResources().openRawResource(R.raw.ziliao); try{ doRead(is); }catch(IOException e){ e.printStackTrace(

flex+java选择本地文件然后读取文件内容并导入到数据库中,怎么实现啊,求高手指点,,

问题描述 flex+java选择本地文件然后读取文件内容并导入到数据库中,怎么实现啊,求高手指点,,需要实现的功能是:选择本地文件txt或xls的,然后把文件内容导入数据库中,, 解决方案 解决方案二:问错版块了解决方案三:引用1楼p2227的回复: 问错版块了 !!!!!!那应该算哪个版块的,java+flex解决方案四:flex有个块的http://forum.csdn.net/SList/Flex/java一个大版块了,你展开再自己决定啰

写文件-Java读取文件然后再修改回去

问题描述 Java读取文件然后再修改回去 有一个文件存着很多对象,现在读取其中的一个对象,然后 将其修改,最终再将这个对象再保存回原文件.这个怎么实现.C语言由于可以控制读文件指针,所以可以定位指正到指定的位置,可是Java怎么修改文件指针? 解决方案 可以试试RandomAccessFile类. 解决方案二: 在Java下使用DOM来读取/修改Xml文件java 修改 读取properties文件 解决方案三: 可以试试RandomAccessFile类. 解决方案四: 可以试试RandomA

c++ 读取文件-c++读取文件时怎么读取行数和列数建立动态数组

问题描述 c++读取文件时怎么读取行数和列数建立动态数组 现有一个txt文件,里面全是数字,c++读取时怎么自动判断其行数和列数,并根据行数列数建立动态数组小白求解 解决方案 行数 是通过换行字符来进行判断列数 计算两个换行符之间的字符位置来得到 通过对文件内容的按字符遍历来实现统计的功能 解决方案二: 通过换行符确定行数,通过最长一行中的字符数确定列数 解决方案三: #include <sstream>#include <string>std::string line;std::

关于数据的归档存入文件和读取文件

需求:我们都知道NSArry中如果存放的是普通的字符串类型,是很容易存入到plist文件中,也很容易从文件中读取出来,那如果NSArray中存放的是自定义的Person对象呢?该如何存入文件中去呢? 下面我来简单写一个NSArray中继承了NSCoding协议的自定义Person对象,将这个array数组存入到plist文件中,并且能够实现读取文件中的数据的Demo 实现步骤: 1.创建一个SingleView的项目命名为test,然后创建一个自定义的Person对象,具有NSString *n

HDFS读文件过程分析:获取文件对应的Block列表

在使用Java读取一个文件系统中的一个文件时,我们会首先构造一个DataInputStream对象,然后就能够从文件中读取数据.对于存储在HDFS上的文件,也对应着类似的工具类,但是底层的实现逻辑却是非常不同的.我们先从使用DFSClient.DFSDataInputStream类来读取HDFS上一个文件的一段代码来看,如下所示: 01 package org.shirdrn.hadoop.hdfs; 02 03 import java.io.BufferedReader; 04 import

为什么有时候读取文件,atime不更新

在linux中,使用stat foo.txt 命令可以看到文件foo.txt的三个时间: atime:access time,访问时间 mtime:modify time,修改时间,文件内容有修改 ctime:change time,create time,改变时间,文件的索引节点发生变化,具体的情况有:1.文件内容有修改:2.文件权限有修改:3.inode变了:4.重命名(重命名不会导致inode改变) PS: 1.如果用vi去修改某个文件,可能会发现这三个时间都被更新了,因为vi使用了临时文

从Java的jar文件中读取数据的方法

  这篇文章主要介绍了从Java的jar文件中读取数据的方法,实例分析了java档案文件的相关操作技巧,需要的朋友可以参考下 本文实例讲述了从Java的jar文件中读取数据的方法.分享给大家供大家参考.具体如下: Java 档案 (Java Archive, JAR) 文件是基于 Java 技术的打包方案.它们允许开发人员把所有相关的内容 (.class.图片.声音和支持文件等) 打包到一个单一的文件中.JAR 文件格式支持压缩.身份验证和版本,以及许多其它特性. 从 JAR 文件中得到它所包含

解决-从linux服务器中读取文件数据

问题描述 从linux服务器中读取文件数据 现有一个需求: 在一台服务器上写日志文件,每当日志文件写到一定大小时,比如是1G,会将这个日志文件改名成另一个名字,并新建一个与原文件名相同的日志文件,再往这个新建的日志文件里写数据:要求写一个程序能实时地读取日志文件中的内容,并且不能写日志操作.重命名操作.不能修改日志文件的任何数据,保持日志文件的完整性. 首先,这个问题在windows下几乎无解,因为一个程序打开了一个文件,再要对文件重命名是不可能的:而在Linux下,可以得到完美解决.因为Lin

Java 读取文件方法大全_java

1.按字节读取文件内容 public class ReadFromFile { public static void readFileByBytes(String fileName) { File file = new File(fileName); InputStream in = null; try { System.out.println("以字节为单位读取文件内容,一次读一个字节:"); // 一次读一个字节 in = new FileInputStream(file); in