Hadoop 2.3.0解决了哪些问题

  Hadoop 2.3.0已经发布了,其中最大的亮点就是集中式的缓存管理(HDFS centralized cache management)。这个功能对于提升Hadoop系统和上层应用的执行效率与实时性有很大帮助,本文从原理、架构和代码剖析三个角度来探讨这一功能。

  主要解决了哪些问题

  用户可以根据自己的逻辑指定一些经常被使用的数据或者高优先级任务对应的数据,让他们常驻内存而不被淘汰到磁盘。例如在Hive或 Impala构建的数据仓库应用中fact表会频繁地与其他表做JOIN,显然应该让fact常驻内存,这样DataNode在内存使用紧张的时候也不会把这些数据淘汰出去,同时也实现了对于 mixed workloads的SLA。

  centralized cache是由NameNode统一管理的,那么HDFS client(例如MapReduce、Impala)就可以根据block被cache的分布情况去调度任务,做到memory-locality。

  HDFS原来单纯靠DataNode的OS buffer cache,这样不但没有把block被cache的分布情况对外暴露给上层应用优化任务调度,也有可能会造成cache浪费。例如一个block的三个 replica分别存储在三个DataNote 上,有可能这个block同时被这三台DataNode的OS buffer cache,那么从HDFS的全局看就有同一个block在cache中存了三份,造成了资源浪费。

  加快HDFS client读速度。过去NameNode处理读请求时只根据拓扑远近决定去哪个DataNode读,现在还要加入speed的因素。当HDFS client和要读取的block被cache在同一台DataNode的时候,可以通过zero-copy read直接从内存读,略过磁盘I/O、checksum校验等环节。

  即使数据被cache的DataNode节点宕机,block移动,集群重启,cache都不会受到影响。因为cache被 NameNode统一管理并被被持久化到FSImage和EditLog,如果cache的某个block的DataNode宕机,NameNode会调度其他存储了这个replica的DataNode,把它cache到内存。

  基本概念

  cache directive: 表示要被cache到内存的文件或者目录。

  cache pool: 用于管理一系列的cache directive,类似于命名空间。同时使用UNIX风格的文件读、写、执行权限管理机制。命令例子:

  hdfs cacheadmin -addDirective -path /user/hive/warehouse/fact.db/city -pool financial -replication 1

  以上代码表示把HDFS上的文件city(其实是Hive上的一个fact表)放到HDFS centralized cache的financial这个cache pool下,而且这个文件只需要被缓存一份。

  系统架构与原理

  用户可以通过hdfs cacheadmin命令行或者HDFS API显式指定把HDFS上的某个文件或者目录放到HDFS centralized cache中。这个centralized cache由分布在每个DataNode节点的off-heap内存组成,同时被NameNode统一管理。每个DataNode节点使用 mmap/mlock把存储在磁盘文件中的HDFS block映射并锁定到off-heap内存中。

  DFSClient读取文件时向NameNode发送getBlockLocations RPC请求。NameNode会返回一个LocatedBlock列表给DFSClient,这个LocatedBlock对象里有这个block的 replica所在的DataNode和cache了这个block的DataNode。可以理解为把被cache到内存中的replica当做三副本外的一个高速的replica。

  注:centralized cache和distributed cache的区别:

  distributed cache将文件分发到各个DataNode结点本地磁盘保存,并且用完后并不会被立即清理的,而是由专门的一个线程根据文件大小限制和文件数目上限周期性进行清理。本质上distributed cache只做到了disk locality,而centralized cache做到了memory locality。

  实现逻辑与代码剖析

  HDFS centralized cache涉及到多个操作,其处理逻辑非常类似。为了简化问题,以addDirective这个操作为例说明。

  1.NameNode处理逻辑

  NameNode内部主要的组件如图所示。FSNamesystem里有个CacheManager是centralized cache在NameNode端的核心组件。我们都知道BlockManager负责管理分布在各个DataNode上的block replica,而CacheManager则是负责管理分布在各个DataNode上的block cache。

  DFSClient给NameNode发送名为addCacheDirective的RPC, 在ClientNamenodeProtocol.proto这个文件中定义相应的接口。

  NameNode接收到这个RPC之后处理,首先把这个需要被缓存的Path包装成CacheDirective加入CacheManager所管理的directivesByPath中。这时对应的File/Directory并没有被cache到内存。

  一旦CacheManager那边添加了新的CacheDirective,触发 CacheReplicationMonitor.rescan()来扫描并把需要通知DataNode做cache的block加入到 CacheReplicationMonitor. cachedBlocks映射中。这个rescan操作在NameNode启动时也会触发,同时在NameNode运行期间以固定的时间间隔触发。

  Rescan()函数主要逻辑如下:

  rescanCacheDirectives()->rescanFile():依次遍历每个等待被cache的directive(存储在 CacheManager. directivesByPath里),把每个等待被cache的directive包含的block都加入到 CacheReplicationMonitor.cachedBlocks集合里面。

  rescanCachedBlockMap():调用 CacheReplicationMonitor.addNewPendingCached()为每个等待被cache的block选择一个合适的 DataNode去cache(一般是选择这个block的三个replica所在的DataNode其中的剩余可用内存最多的一个),加入对应的 DatanodeDescriptor的pendingCached列表。

  2.NameNode与DataNode的RPC逻辑

  DataNode定期向NameNode发送heartbeat RPC用于表明它还活着,同时DataNode还会向NameNode定期发送block report(默认6小时)和cache block(默认10秒)用于同步block和cache的状态。

  NameNode会在每次处理某一DataNode的heartbeat RPC时顺便检查该DataNode的pendingCached列表是否为空,不为空的话发送DatanodeProtocol.DNA_CACHE命令给具体的DataNode去cache对应的block replica。

  3.DataNode处理逻辑

  DataNode内部主要的组件如图所示。DataNode启动的时候只是检查了一下dfs.datanode.max.locked.memory是否超过了OS的限制,并没有把留给Cache使用的内存空间锁定。

  在DataNode节点上每个BlockPool对应有一个BPServiceActor线程向NameNode发送heartbeat、接收 response并处理。如果接收到来自NameNode的RPC里面的命令是DatanodeProtocol.DNA_CACHE,那么调用 FsDatasetImpl.cacheBlock()把对应的block cache到内存。

  这个函数先是通过RPC传过来的blockId找到其对应的FsVolumeImpl (因为执行cache block操作的线程cacheExecutor是绑定在对应的FsVolumeImpl里的);然后调用 FsDatasetCache.cacheBlock()把这个block封装成MappableBlock加入到mappableBlockMap里统一管理起来,然后向对应的FsVolumeImpl.cacheExecutor线程池提交一个CachingTask异步任务(cache的过程是异步执行的)。

  FsDatasetCache有个成员mappableBlockMap(HashMap)管理着这台DataNode的所有的 MappableBlock及其状态(caching/cached/uncaching)。目前DataNode中”哪些block被cache到内存里了”也是只保存了soft state(和NameNode的block map一样),是DataNode向NameNode 发送heartbeat之后从NameNode那问回来的,没有持久化到DataNode本地硬盘。

  CachingTask的逻辑: 调用MappableBlock.load()方法把对应的block从DataNode本地磁盘通过mmap映射到内存中,然后通过mlock锁定这块内存空间,并对这个映射到内存的block做checksum检验其完整性。这样对于memory-locality的DFSClient就可以通过 zero-copy直接读内存中的block而不需要校验了。

  4.DFSClient读逻辑:

  HDFS的读主要有三种: 网络I/O读 -> short circuit read -> zero-copy read。网络I/O读就是传统的HDFS读,通过DFSClient和Block所在的DataNode建立网络连接传输数据。

  当DFSClient和它要读取的block在同一台DataNode时,DFSClient可以跨过网络I/O直接从本地磁盘读取数据,这种读取数据的方式叫short circuit read。目前HDFS实现的short circuit read是通过共享内存获取要读的block在DataNode磁盘上文件的file descriptor(因为这样比传递文件目录更安全),然后直接用对应的file descriptor建立起本地磁盘输入流,所以目前的short circuit read也是一种zero-copy read。

  增加了Centralized cache的HDFS的读接口并没有改变。DFSClient通过RPC获取LocatedBlock时里面多了个成员表示哪个DataNode把这个 block cache到内存里面了。如果DFSClient和该block被cache的DataNode在一起,就可以通过zero-copy read大大提升读效率。而且即使在读取的过程中该block被uncache了,那么这个读就被退化成了本地磁盘读,一样能够获取数据。

  对上层应用的影响

  对于HDFS上的某个目录已经被addDirective缓存起来之后,如果这个目录里新加入了文件,那么新加入的文件也会被自动缓存。这一点对于Hive/Impala式的应用非常有用。

  HBase in-memory table:可以直接把某个HBase表的HFile放到centralized cache中,这会显著提高HBase的读性能,降低读请求延迟。

  和Spark RDD的区别:多个RDD的之间的读写操作可能完全在内存中完成,出错就重算。HDFS centralized cache中被cache的block一定是先写到磁盘上的,然后才能显式被cache到内存。也就是说只能cache读,不能cache写。

  目前的centralized cache不是DFSClient读了谁就会把谁cache,而是需要DFSClient显式指定要cache谁,cache多长时间,淘汰谁。目前也没有类似LRU的置换策略,如果内存不够用的时候需要client显式去淘汰对应的directive到磁盘。

  现在还没有跟YARN整合,需要用户自己调整好留给DataNode用于cache的内存和NodeManager的内存使用。

  参考文献

  http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html

  https://issues.apache.org/jira/browse/HDFS-4949

  作者简介

  梁堰波,北京航空航天大学计算机硕士,美团网资深工程师,曾在法国电信、百度和VMware工作和实习过,这几年一直在折腾Hadoop/HBase/Impala和数据挖掘相关的东西,新浪微博 @DataScientist 。

时间: 2024-10-05 07:46:18

Hadoop 2.3.0解决了哪些问题的相关文章

eclipse/intellij idea 远程调试hadoop 2.6.0

很多hadoop初学者估计都我一样,由于没有足够的机器资源,只能在虚拟机里弄一个linux安装hadoop的伪分布,然后在host机上win7里使用eclipse或Intellj idea来写代码测试,那么问题来了,win7下的eclipse或intellij idea如何远程提交map/reduce任务到远程hadoop,并断点调试? 一.准备工作 1.1 在win7中,找一个目录,解压hadoop-2.6.0,本文中是D:\yangjm\Code\study\hadoop\hadoop-2.

mac OS X Yosemite 上编译hadoop 2.6.0/2.7.0及TEZ 0.5.2/0.7.0 注意事项

1.jdk 1.7问题 hadoop 2.7.0必须要求jdk 1.7.0,而oracle官网已经声明,jdk 1.7 以后不准备再提供更新了,所以趁现在还能下载,赶紧去down一个mac版吧 http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html 应该选择mac ox 64位的版本 http://download.oracle.com/otn-pub/java/jdk/7u79-b1

package-centOS 6.4+hadoop 2.5.0编译出错

问题描述 centOS 6.4+hadoop 2.5.0编译出错 使用命令:mvn package -Pdist,native -DskipTests -Dtar时出现以上错误,已经卡了三天了,请教大神帮忙看下,谢谢谢 解决方案 hadoop auth examples编译失败 参考:http://blog.csdn.net/w13770269691/article/details/16883663/ 解决方案二: 这个完全不一样?可否帮我再看看,可以加我QQ:937038088...如能帮我解

Hadoop 2.4.0新特性介绍

在http://www.aliyun.com/zixun/aggregation/33721.html">2014年4月7日,Apache发布了Hadoop 2.4.0 .相比于hadoop 2.3.0,这个版本有了一定的改进,突出的变化可以总结为下列几点(官方文档说明): 1 支持HDFS访问控制列表(ACL,Access Control Lists) 这个特性解决了在一定情况下,文件权限访问的权限问题.其机制是基于Linux文件访问权限的特征,如果你熟悉Linux的文件访问机制,你就不

发布Apache Hadoop 2.6.0——异构存储,长时间运行的服务与滚动升级支持

发布Apache Hadoop 2.6.0--异构存储,长时间运行的服务与滚动升级支持 我很高兴地宣布,在Apache的Hadoop社区已经发布的Apache Hadoop的2.6.0:http://markmail.org/message/gv75qf3orlimn6kt! 特别是,我们很高兴在此版本中相关的三个主要片:异构存储在HDFS使用SSD和内存层,支持长时间运行在YARN服务和滚动升级,将升级您的集群软件,然后重新启动升级的节点而无需关闭群集或丢失正在进行的工作.YARN作为其架构中

Caused by: org.xml.sax.SAXParseException; systemId: file:/home/hadoop/hive-0.12.0/conf/hive-site.xml; lineNumber: 5; columnNumber: 2; The markup in th

1:Hive安装的过程(Hive启动的时候报的错误),贴一下错误,和为什么错,以及解决方法: 1 [root@master bin]# ./hive 2 17/12/11 23:22:56 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive 3 17/12/11 23:22

win7 msconfig最大内存设置为0解决方法

win7 msconfig最大内存设置为0解决方法 Windows 最大内存 本来想写写前因后果的,想想如果你现在也是这个问题的话,一定没时间看完整个始末. 我就说说我的解决方法吧. 进入 启动修复 的 命令提示符(最好是使用有管理员权限的,不过普通用户我也每试过), 使用 bcdedit 命令来查看. 可以查看到你的启动参数. 确认 truncatememory 是否为 0x10000000(我的是这个). 然后执行下面的命令 bcdedit /deletevalue {default} tr

将Spark部署到Hadoop 2.2.0上

本文介绍的是如何将http://www.aliyun.com/zixun/aggregation/14417.html">Apache Spark部署到Hadoop 2.2.0上,如果你们的Hadoop是其他版本,比如CDH4,可直接参考官方说明操作. 需要注意两点:(1)使用的Hadoop必须是2.0系列,比如0.23.x,2.0.x,2.x.x或CDH4.CDH5等,将Spark运行在 Hadoop上,本质上是将Spark运行在Hadoop YARN上,因为Spark自身只提供了作业管

Unsupported major.minor version 51.0解决办法

Unsupported major.minor version 51.0解决办法 我使用的是Eclipse-jee-indigo + JDK 1.6.23环境,结果使用时出现Unsupported major.minor version 51.0错误提示,下面我来介绍Unsupported major.minor version 51.0错误的解决办法   今天偶然间同事遇到一个问题,也加深了自己对eclipse中build path和java compiler compliance level