Hadoop读写文件时内部工作机制是怎样的?

读文件

&">nbsp;  读文件时内部工作机制参看下图:

  客户端通过调用FileSystem对象(对应于HDFS文件系统,调用DistributedFileSystem对象)的open()方法来打开文件(也即图中的第一步),DistributedFileSystem通过RPC(Remote Procedure Call)调用询问NameNode来得到此文件最开始几个block的文件位置(第二步)。对每一个block来说,namenode返回拥有此block备份的所有namenode的地址信息(按集群的拓扑网络中与客户端距离的远近排序,关于在Hadoop集群中如何进行网络拓扑请看下面介绍)。如果客户端本身就是一个datanode(如客户端是一个mapreduce任务)并且此datanode本身就有所需文件block的话,客户端便从本地读取文件。
  以上步骤完成后,DistributedFileSystem会返回一个FSDataInputStream(支持文件seek),客户端可以从FSDataInputStream中读取数据。FSDataInputStream包装了一个DFSInputSteam类,用来处理namenode和datanode的I/O操作。
  客户端然后执行read()方法(第三步),DFSInputStream(已经存储了欲读取文件的开始几个block的位置信息)连接到第一个datanode(也即最近的datanode)来获取数据。通过重复调用read()方法(第四、第五步),文件内的数据就被流式的送到了客户端。当读到该block的末尾时,DFSInputStream就会关闭指向该block的流,转而找到下一个block的位置信息然后重复调用read()方法继续对该block的流式读取。这些过程对于用户来说都是透明的,在用户看来这就是不间断的流式读取整个文件。
  当真个文件读取完毕时,客户端调用FSDataInputSteam中的close()方法关闭文件输入流(第六步)。
  如果在读某个block是DFSInputStream检测到错误,DFSInputSteam就会连接下一个datanode以获取此block的其他备份,同时他会记录下以前检测到的坏掉的datanode以免以后再无用的重复读取该datanode。DFSInputSteam也会检查从datanode读取来的数据的校验和,如果发现有数据损坏,它会把坏掉的block报告给namenode同时重新读取其他datanode上的其他block备份。
  这种设计模式的一个好处是,文件读取是遍布这个集群的datanode的,namenode只是提供文件block的位置信息,这些信息所需的带宽是很少的,这样便有效的避免了单点瓶颈问题从而可以更大的扩充集群的规模。

Hadoop中的网络拓扑

在Hadoop集群中如何衡量两个节点的远近呢?要知道,在高速处理数据时,数据处理速率的唯一限制因素就是数据在不同节点间的传输速度:这是由带宽的可怕匮乏引起的。所以我们把带宽作为衡量两个节点距离大小的标准。
但是计算两个节点之间的带宽是比较复杂的,而且它需要在一个静态的集群下才能衡量,但Hadoop集群一般是随着数据处理的规模动态变化的(且两两节点直接相连的连接数是节点数的平方)。于是Hadoop使用了一个简单的方法来衡量距离,它把集群内的网络表示成一个树结构,两个节点之间的距离就是他们离共同祖先节点的距离之和。树一般按数据中心(datacenter),机架(rack),计算节点(datanode)的结构组织。计算节点上的本地运算速度最快,跨数据中心的计算速度最慢(现在跨数据中心的Hadoop集群用的还很少,一般都是在一个数据中心内做运算的)。
假如有个计算节点n1处在数据中心c1的机架r1上,它可以表示为/c1/r1/n1,下面是不同情况下两个节点的距离:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
如下图所示:

  

写文件

  现在我们来看一下Hadoop中的写文件机制解析,通过写文件机制我们可以更好的了解一下Hadoop中的一致性模型。

  上图为我们展示了一个创建一个新文件并向其中写数据的例子。
  首先客户端通过DistributedFileSystem上的create()方法指明一个欲创建的文件的文件名(第一步),DistributedFileSystem再通过RPC调用向NameNode申请创建一个新文件(第二步,这时该文件还没有分配相应的block)。namenode检查是否有同名文件存在以及用户是否有相应的创建权限,如果检查通过,namenode会为该文件创建一个新的记录,否则的话文件创建失败,客户端得到一个IOException异常。DistributedFileSystem返回一个FSDataOutputStream以供客户端写入数据,与FSDataInputStream类似,FSDataOutputStream封装了一个DFSOutputStream用于处理namenode与datanode之间的通信。
  当客户端开始写数据时(第三步),DFSOutputStream把写入的数据分成包(packet), 放入一个中间队列——数据队列(data queue)中去。DataStreamer从数据队列中取数据,同时向namenode申请一个新的block来存放它已经取得的数据。namenode选择一系列合适的datanode(个数由文件的replica数决定)构成一个管道线(pipeline),这里我们假设replica为3,所以管道线中就有三个datanode。DataSteamer把数据流式的写入到管道线中的第一个datanode中(第四步),第一个datanode再把接收到的数据转到第二个datanode中(第四步),以此类推。
  DFSOutputStream同时也维护着另一个中间队列——确认队列(ack queue),确认队列中的包只有在得到管道线中所有的datanode的确认以后才会被移出确认队列(第五步)。
  如果某个datanode在写数据的时候当掉了,下面这些对用户透明的步骤会被执行:
  1)管道线关闭,所有确认队列上的数据会被挪到数据队列的首部重新发送,这样可以确保管道线中当掉的datanode下流的datanode不会因为当掉的datanode而丢失数据包。
  2)在还在正常运行的datanode上的当前block上做一个标志,这样当当掉的datanode重新启动以后namenode就会知道该datanode上哪个block是刚才当机时残留下的局部损坏block,从而可以把它删掉。
  3)已经当掉的datanode从管道线中被移除,未写完的block的其他数据继续被写入到其他两个还在正常运行的datanode中去,namenode知道这个block还处在under-replicated状态(也即备份数不足的状态)下,然后他会安排一个新的replica从而达到要求的备份数,后续的block写入方法同前面正常时候一样。
  有可能管道线中的多个datanode当掉(虽然不太经常发生),但只要dfs.replication.min(默认为1)个replica被创建,我们就认为该创建成功了。剩余的replica会在以后异步创建以达到指定的replica数。
  当客户端完成写数据后,它会调用close()方法(第六步)。这个操作会冲洗(flush)所有剩下的package到pipeline中,等待这些package确认成功,然后通知namenode写入文件成功(第七步)。这时候namenode就知道该文件由哪些block组成(因为DataStreamer向namenode请求分配新block,namenode当然会知道它分配过哪些blcok给给定文件),它会等待最少的replica数被创建,然后成功返回。

replica是如何分布的

Hadoop在创建新文件时是如何选择block的位置的呢,综合来说,要考虑以下因素:带宽(包括写带宽和读带宽)和数据安全性。如果我们把三个备份全部放在一个datanode上,虽然可以避免了写带宽的消耗,但几乎没有提供数据冗余带来的安全性,因为如果这个datanode当机,那么这个文件的所有数据就全部丢失了。另一个极端情况是,如果把三个冗余备份全部放在不同的机架,甚至数据中心里面,虽然这样数据会安全,但写数据会消耗很多的带宽。Hadoop 0.17.0给我们提供了一个默认replica分配策略(Hadoop 1.X以后允许replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默认分配策略是把第一个备份放在与客户端相同的datanode上(如果客户端在集群外运行,就随机选取一个datanode来存放第一个replica),第二个replica放在与第一个replica不同机架的一个随机datanode上,第三个replica放在与第二个replica相同机架的随机datanode上。如果replica数大于三,则随后的replica在集群中随机存放,Hadoop会尽量避免过多的replica存放在同一个机架上。选取replica的放置位置后,管道线的网络拓扑结构如下所示:

总体来说,上述默认的replica分配策略给了我们很好的可用性(blocks放置在两个rack上,较为安全),写带宽优化(写数据只需要跨越一个rack),读带宽优化(你可以从两个机架中选择较近的一个读取)。

一致性模型

  HDFS某些地方为了性能可能会不符合POSIX(是的,你没有看错,POSIX不仅仅只适用于linux/unix,Hadoop 使用了POSIX的设计来实现对文件系统文件流的读取),所以它看起来可能与你所期望的不同,要注意。
  创建了一个文件以后,它是可以在命名空间(namespace)中可以看到的:
    Path p = new Path("p");
    fs.create(p);
    assertThat(fs.exists(p), is(true));
  但是任何向此文件中写入的数据并不能保证是可见的,即使你flush了已经写入的数据,此文件的长度可能仍然为零:
    Path p = new Path("p");
    OutputStream out = fs.create(p);
    out.write("content".getBytes("UTF-8"));
    out.flush();
    assertThat(fs.getFileStatus(p).getLen(), is(0L));
  这是因为,在Hadoop中,只有满一个block数据量的数据被写入文件后,此文件中的内容才是可见的(即这些数据会被写入到硬盘中去),所以当前正在写的block中的内容总是不可见的。
  Hadoop提供了一种强制使buffer中的内容冲洗到datanode的方法,那就是FSDataOutputStream的sync()方法。调用了sync()方法后,Hadoop保证所有已经被写入的数据都被冲洗到了管道线中的datanode中,并且对所有读者都可见了:
    Path p = new Path("p");
    FSDataOutputStream out = fs.create(p);
    out.write("content".getBytes("UTF-8"));
    out.flush();
    out.sync();
    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
  这个方法就像POSIX中的fsync系统调用(它冲洗给定文件描述符中的所有缓冲数据到磁盘中)。例如,使用java API写一个本地文件,我们可以保证在调用flush()和同步化后可以看到已写入的内容:
    FileOutputStream out = new FileOutputStream(localFile);
    out.write("content".getBytes("UTF-8"));
    out.flush(); // flush to operating system
    out.getFD().sync(); // sync to disk(getFD()返回与该流所对应的文件描述符)
    assertThat(localFile.length(), is(((long) "content".length())));
  在HDFS中关闭一个流隐式的调用了sync()方法:
    Path p = new Path("p");
    OutputStream out = fs.create(p);
    out.write("content".getBytes("UTF-8"));
    out.close();
    assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

  由于Hadoop中的一致性模型限制,如果我们不调用sync()方法的话,我们很可能会丢失多大一个block的数据。这是难以接受的,所以我们应该使用sync()方法来确保数据已经写入磁盘。但频繁调用sync()方法也是不好的,因为会造成很多额外开销。我们可以再写入一定量数据后调用sync()方法一次,至于这个具体的数据量大小就要根据你的应用程序而定了,在不影响你的应用程序的性能的情况下,这个数据量应越大越好。

转载请注明出处:http://www.cnblogs.com/beanmoon/archive/2012/12/17/2821548.html

猜您喜欢:

  1.使用Linux和Hadoop进行分布式计算

  2.Hadoop架构设计、运行原理详解

  3.Hadoop 2.3.0解决了哪些问题

时间: 2024-09-26 11:31:46

Hadoop读写文件时内部工作机制是怎样的?的相关文章

hadoop拷贝文件时 org.apache.hadoop.ipc.RemoteException异常的解决

1.系统或hdfs是否有空间 2.datanode数是否正常 3.是否在safemode 4.防火墙关闭 5.配置方面 6.把NameNode的tmp文件清空,然后重新格式化NameNode

Hadoop的namenode的管理机制,工作机制和datanode的工作原理

HDFS前言: 1) 设计思想 分而治之:将大文件.大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析: 2)在大数据系统中作用: 为各类分布式运算框架(如:mapreduce,spark,tez,--)提供数据存储服务 3)重点概念:文件切块,副本存放,元数据 1:分布式文件系统(Distributed File System): (1):数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需

深刻理解HDFS工作机制

深入理解一个技术的工作机制是灵活运用和快速解决问题的根本方法,也是唯一途径.对于HDFS来说除了要明白它的应用场景和用法以及通用分布式架构之外更重要的是理解关键步骤的原理和实现细节.本篇博文首先对HDFS的重要特性和使用场景做一个简要说明,之后对HDFS的数据读写.元数据管理以及NameNode.SecondaryNamenode的工作机制进行深入分析.过程中也会对一些配置参数做一个说明. 一.HDFS的重要特性 First. HDFS是一个文件系统,用于存储和管理文件,通过统一的命名空间(类似

HDFS的工作机制,HDFS写数据流程,HDFS读数据流程(来自学习资料)

4.hdfs的工作机制 (工作机制的学习主要是为加深对分布式系统的理解,以及增强遇到各种问题时的分析解决能力,形成一定的集群运维能力)   注:很多不是真正理解hadoop技术体系的人会常常觉得HDFS可用于网盘类应用,但实际并非如此.要想将技术准确用在恰当的地方,必须对技术有深刻的理解 4.1概述 1.        HDFS集群分为两大角色:NameNode.DataNode  (Secondary Namenode) 2.        NameNode负责管理整个文件系统的元数据 3. 

C语言 以字符形式读写文件详解及示例代码_C 语言

在C语言中,读写文件比较灵活,既可以每次读写一个字符,也可以读写一个字符串,甚至是任意字节的数据(数据块).本节介绍以字符形式读写文件. 以字符形式读写文件时,每次可以从文件中读取一个字符,或者向文件中写入一个字符.主要使用两个函数:fgetc()和fputc(). 字符读取函数 fgetc fgetc 是 file get char 的缩写,意思是从指定的文件中读取一个字符.它的原型为: int fgetc (FILE *fp); fp 为文件指针.fgetc() 读取成功时返回读取到的字符,

Java读写文件方法总结(推荐)_java

Java的读写文件方法在工作中相信有很多的用处的,本人在之前包括现在都在使用Java的读写文件方法来处理数据方面的输入输出,确实很方便.奈何我的记性实在是叫人着急,很多时候既然都会想不起来怎么写了,不过我的Java代码量也实在是少的可怜,所以应该多多练习.这里做一个总结,集中在一起方面今后查看. Java读文件 package 天才白痴梦; import java.io.BufferedReader; import java.io.File; import java.io.FileInputSt

HBase读写路径的工作机制

HBase 写路径工作机制 在HBase 中无论是增加新行还是修改已有的行,其内部流程都是相同的.HBase 接到命令后存下变化信息,或者写入失败抛出异常.默认情况下,执行写入时会写到两个地方:预写式日志(write-ahead log,也称HLog)和MemStore.HBase 的默认方式是把写入动作记录在这两个地方,以保证数据持久化.只有当这两个地方的变化信息都写入并确认后,才认为写动作完成. MemStore 是内存里的写入缓冲区,HBase 中数据在永久写入硬盘之前在这里累积.当Mem

Hadoop工作机制

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:      1.JobClient 写代码,配置作业,提交作业.      2.JobTracker:初始化作业,分配作业,协调作业运行.这是一个java程序,主类是JobTracker.      3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务.      4.HDFS:保存作业数据.配置信息等,保存作业结果. Map/Re

[C#]解决读写包含汉字的txt文件时乱码的问题

汉字|解决|问题 作者:袁晓辉(版权所有)时间:2005-8-8   当我们用System.IO.StreamReader读取包含汉字的txt文件时,经常会读出乱码(StreamWriater写文本文件也有类似的问题),原因很简单,就是文件的编码(encoding)和StreamReader/Writer的encoding不对应.    为了解决这个问题,我写了一个类,来取得一个文本文件的encoding,这样我们就可以创建对应的StreamReader和StreamWriter来读写,保证不会