最近分布式系统开发小结: Slave模块Executors设计

更新一段我在linkedin上对这个项目的描述,目前项目已经开发完在使用了。本文并不是最新的设计。

背景
解决HDFS/Hive/RDBMS/FTP/MongoDB等数据源之间的批量数据同步问题

特性
跨机房场景下的链路优化;多路输入和输出的任务模型;数据容错和可持久化;任务失败恢复

任务调度
把任务配置解析为物理执行计划,Master控制任务的调度和失败恢复,基于Mesos完成资源分配和任务调度。Slave分布在各个数据中心,具体传输任务的调起做到链路优化选择。高并发场景下,增加Mesos Slave节点来保证可扩展性(CPU和MEM资源),Master将元数据记录在ZK上,并通过争抢ZK锁实现互备。

数据传输
传输组件分为Input、Cache和Output三种Executor,各自进程内通过双队列优化传输速度。数据以bundle为单位传输,通常上百行为一个bundle且可压缩,Netty作网络通信。Input端异步备份一份数据在BookKeeper,Cache使用Beanstalkd做消息队列,Output端处理Bundle成功或失败,会有守护线程异步删除或更新beanstalkd内的Message(类似Storm Topology里的Ack),Executors会把bundle传输状态更新在ZookKeeper上,某一Executor挂掉都可以在一台slave上重新调起并恢复任务继续进行。Input和Output端的Reader和Writer是插件化的。

====================================  我是更新线 ====================================

之前在最近分布式系统开发小结里,提到了一个在开发中的系统的大致设计,本文是我负责部分的一个详细设计。在阅读本文前可以先浏览下之前那篇文章,对于系统的功能和概况有个基本了解。

1. Slave总体设计

Slave模块主要需要实现不同的Mesos Executors,包括Input, MemoryStorage和Output三种Executor。每个Dpump任务会由Scheduler Manager经过逻辑执行计划和物理执行计划的拆分,从Knowledge Center获取知识,最终将切分后的Task分配给相应的Slaves执行,并通过Mesos Master,分配资源并调起Slave上的各自的Executor。三种Executors的执行逻辑图如下。

数据通过Bundle形式在三种Executor之间的流通,每个Bundle有唯一ID、一个String[]、以及一个Index。Index用于标记每个Bundle最后数据输出的最新成功行,即我们容错粒度控制在行级别。对Input、Cache、Output作一个简单介绍:

  •  Input,也叫Reader。每个Task内只有一个Input Executor,负责从数据源(HDFS、FTP、MySQL、MongoDB等)读出数据,将数据经过切分、处理、压缩后通过Netty流式传输给MemoryStorage。
  • Cache,也叫MemoryStorage。每个Task内只有一个Cache Executor,负责从Input端接收Bundle,将Bundle存取往一个队列内,当有Output连接的时候,将Bundle取出输送给Output
  • Output,也叫Writer。每个Task可能有多个Output Executor,负责将数据最终输出到数据目的源。Output从Cache端得到Bundle的过程也是流式的。

整个Task的流通都是流式的,且Slave之间的网络通信使用的是Netty这个NIO框架,传输过程中还涉及到Bundle高效的正反序列化和压缩、解压缩。最重要的一点是Input、Cache、Output三个部分各自都有容错设计,其中Input和Output通过向Zookeeper记录和获取Bundle状态保证处理Bundle的不重不漏,而Cache通过对队列内消息内容的钝化,保证自身已保存的Bundle不丢失,并能在新的Cache Executor起来后,可以继续为Output提供Bundle输出。

2. Slave 详细设计

下面详细介绍三种Executor的设计,阅读过程中请参考这张Task进程图。

2.1 Input设计

2.1.1 数据流通

每个Input负责一次Job(每个Job对应多个Tasks)内最小粒度的文件块读取,比如可能是一个HDFS Block,一张Hive表的一个分区甚至是一张MySQL表。

Input内还分有Writer、Buffer(双队列)和Reader。Writer是一个单线程,从数据源获取数据并切分好Bundle,每个Bundle有唯一ID和定长的字符串数组,然后将Bundle存入双队列的输入头,在双队列的读出头有若干个Reader线程抢占Bundle,每个Reader获取到Bundle后释放锁并做二次处理、压缩,最终Reader通过Netty Client将Bundle包装成一个传输格式,以二进制流的方式通过Channel流向Cache。

2.1.2 容错

Writer端切分Bundle保证了从同一个数据源的同份文件块读取数据生成Bundle是有序的,每次Netty往Channel里写入一份Bundle的时候,会通过Companion线程异步更新此Task下znode内的Bitmap,该Bitmap标记每个Bundle在Input端是否被传输。每次Input启动的时候,Netty会读取znode上的Bitmap缓存在内存里,发送Bundle前根据id作一次校对。所以当Input挂掉或重启时,可以保证发送给Cache的Bundle不重不漏。

2.2 Cache设计

2.2.1 消息队列

Cache本身是一个Netty Server,接收Input和Output多个Netty Client的连接,并对不同的Channel做不同Event处理。Cache Executor需要一个多状态的消息队列,这里采用的是Beanstalkd队列,下图为该Beanstalkd内消息(job)的状态变化图。

 每次Cache将新的Bundle put进Beanstalkd的时候需要选择一个tube(管道),Beanstalkd可以开启多个独立的tube,tube内存放jobs,每个job有自己唯一的job id,而job消息体就是我们的bundleBytes(Bundle存入Cache直接存的就是序列化后的byte[])。

每个job存入queue后是ready状态,被reserve之后,就不能被客户端再次获取到,即Cache每次会从每个tube里按顺序reserve一个job,并发送消息体给Output(一个output对应一个tube),这个过程保证每个job被消费一次,且只能被一个Output消费。如果Output端消费成功,则该job会被delete掉;如果该job消费失败,则会被重新置为ready,重新置为ready可能是因为超时(每个job被reserve的时候都有一个Time-To-Run时间设置)了,也可以是客户端release掉该job。

2.2.2 Acker设计

这里,对于tube内job的后续处理交给Acker这个线程来做。Acker的设计灵感来源于Storm。Storm Topology内每个bolt对tuple的执行和处理最终都会给Spout一个ack响应,而拓扑过程中整棵Tuple树的成功/失败执行状态会由Acker守护进程进行跟踪,以此来保证每个tuple被完全处理,而acker对tuple的跟踪算法是Storm的主要突破之一。

Cache端的Acker线程会监听zookeeper上znode树上各个节点的事件变化,从而掌握被Output消费的所有Bundle的最后状态,对应地删除、释放,或者更新Queue里的job。需要注意的是这里还涉及到一个更新job的过程。前面提到Bundle内维护了一个index,而Output消费bundle的时候,如果是数据行写了一半出现了异常或者挂掉了,我们需要记录bundle内数据行的最新index并将此信息也记录在znode上。对于这种最坏情况,Acker负责将该fail的job从queue里delete掉,并更改job内bundle bytes内容,重置新的index,再把新的job put进queue里。这是我们最不希望看到的情况,同时也是我们对Bundle能做的最细粒度的容错设计。

2.2.3 容错

Beanstalkd启动之后可以打开binlog开关,binlog是Beanstalkd容错恢复的机制,将内存里的消息队列结构映射到硬盘上。对于Cache的容错设计,直观的办法在于将这份binlog存在NFS或HDFS上,来保证Cache挂掉重启后,能获取到之前保存的Bundle数据,继续提供服务。

2.3 Output设计

Output在最终的Bundle消费阶段,会把数据导向新的数据源。每个Output获取的Bundle来自于Cache里的一个tube,而每个Bundle的执行情况也会由Companion线程异步更新到Zookeeper上。

 对于Output来说,它只需要关心从Cache端获取的每个Bundle都照常处理就可以了,不需要关心这个Bundle之前是否被消费过,被消费到哪里。原因在于,Cache端的job状态的变更和job的更新可以由Acker保障,而Acker是从zk上得到这些job的状态并对Queue异步更新。如果Acker挂了,只要重新起一个线程获取znode上最新的状态就可以了。对于Output来说,能传过来的Bundle,对应到queue里就是ready状态的job,这个job可能被消费过了,但是他的index也因此得到了更新,Output端对于所有Bundle的处理是一致的,唯一需要关心的是Output需要把Bundle的信息异步更新给zk,如果Output挂了,重新起一个Output接着从Cache读Bundle就可以了。

3. Slave模块总结

Slave模块三种Executor的设计,主要考虑的是各个Executor挂掉之后,怎样保证数据处理的不重复和不遗漏。我们依赖Zookeeper的可靠性,记录、更新、判断Bundle的状态,做到Input、Cache、Output各司其职,最到最小粒度的容错。Executor本身的失败和重启则由Mesos保障,Mesos作为资源管理系统,由Master监控Slave上各个Executor的执行状况,通过回调,可以在合适的Slave上再次启动挂掉的Executor进程,保证业务Task的顺利进行。

(全文完)

时间: 2024-11-02 02:54:12

最近分布式系统开发小结: Slave模块Executors设计的相关文章

最近分布式系统开发小结

用最简单的语言梳理一下最近十天做的分布式系统模块的开发.这是一个还在开发中的项目,配图也是设计原图.希望能更多地从开源项目里汲取营养,一边实战,一边积累. 系统概述 最近在设计和开发一个分布式系统的流式处理模块,整个系统用于跨集群.跨机房搬运不同数据源内的数据到另一份或多份数据源上,包括HDFS.MySQl.MongoDB.FTP等.功能比较像Hadoop的Sqoop,但是能扩展支持更多的数据源,且本身是个集群部署,不像Sqoop需要依赖Hadoop的MR. 我们整个cluster的资源管理借助

使用Websharp Service Locator简化分布式系统开发

web|分布式 使用Websharp Service Locator 简化分布式系统开发 什么是Websharp Service Locator对于多层的应用系统来说,我们通常把它们划分成客户端.应用服务层和数据库.在应用服务层,我们需要考虑至少两个方面的问题: ü 如何实现业务逻辑 ü 如何向客户端提供服务. 我们可能使用多种技术来实现服务的提供:Webservice..Net Remoting.甚至EJB等.如此多的实现技术,带来的很大的灵活性,但同时也带来了问题,其中一个就是,有多少种服务

Android简明开发教程六:用户界面设计

Activity是Android应用用户界面的基本组成部件.但Activity本身并不提供用户界面(User Interface).从程序结构层次上 来说,一个Android应用是类android.app.Application的一个实例, Application中可以包含多个android.app.Activity实例. 每个Activity 带一个Window类,这个类在Android平台上没有提供太多功能,主要可以用来控制标题栏(屏幕顶端).比如设置 UI全屏显示可以使用如下代码: req

用户注册的邮箱激活模块的设计与实现

----------------------------------------------------------------------------------------------[版权申明:本文系作者原创,转载请注明出处] 文章出处:http://blog.csdn.net/sdksdk0/article/details/52144698作者:朱培      ID:sdksdk0      邮箱: zhupei@tianfang1314.cn    ------------------

开发一个地图模块,储存数据用什么数据库?

问题描述 开发一个地图模块,储存数据用什么数据库? 用C++开发游戏,想建一个数据库模块来存地图数据,数据类型基本上是一个坐标 对一个贴图名称 和一个贴图大小的简单数据,但是条目数量可能比较大,需要排序和检索功能.只安装在客户端本地,不需要联网.用什么数据库会比较合适?或者不用数据库的什么方法来储存数据?希望 在满足上诉简单要求的基础上尽可能轻量又高效. 用在安卓.win32.ISO,希望跨平台 解决方案 地图不建议用数据库,应该直接设计自己的数据结构,那样效率高很多. 解决方案二: 跨平台用x

剖析Go编写的Socket服务器模块解耦及基础模块的设计_Golang

Server的解耦-通过Router+Controller实现逻辑分发 在实际的系统项目工程中中,我们在写代码的时候要尽量避免不必要的耦合,否则你以后在更新和维护代码的时候会发现如同深陷泥潭,随便改点东西整个系统都要变动的酸爽会让你深切后悔自己当初为什么非要把东西都写到一块去(我不会说我刚实习的时候就是这么干的...) 所以这一篇主要说说如何设计Sever的内部逻辑,将Server处理Client发送信息的这部分逻辑与Sevrer处理Socket连接的逻辑进行解耦- 这一块的实现灵感主要是在读一

IM系统中聊天记录模块的设计与实现

看到很多开发IM系统的朋友都想实现聊天记录存储和查询这一不可或缺的功能,这里我就把自己前段时间为傲瑞通(OrayTalk)开发聊天记录模块的经验分享出来,供需要的朋友参考下. 一.总体设计 1.存储位置 从一开始我们就打算在服务端和客户端本地同时存储聊天记录,而且,在客户端查看聊天记录时,可以选择是从本地加载.还是从服务器加载.这样做的好处有两个: (1)从本地加载聊天记录速度非常快. (2)当更换了登录的机器,在任何地方任何时刻都可以从服务器加载完整的聊天记录,记录永远不会丢失. 2.存储方案

支持PaaS的CTS测试云平台部分模块的设计与实现

支持PaaS的CTS测试云平台部分模块的设计与实现 南京大学  朱珺辰 本文主要介绍与研究的主要是软件相关的云测试.本文实现的Cloud Test Service (简称CTS)平台云系统正是在这一背景下应运而生.CTS平台云目前通过Eucalyptus私有云框架整合底层的硬件资源,自身统一管理调度Eucalyptus提供的虚拟实例,通过提供了服务接口以及一套开发规范实现平台即服务,并提供了友好易用的交互界面面向用户.测试开发人员根据需求定制CTS平台上的应用云,称为测试服务,并注册到其上,交付

ASP用户登录模块的设计

ASP用户登录模块的设计 用户登录验证脚本,Chkpwd.asp <% '=======用户登录验证脚本======= '如果尚未定义Passed对象,则将其定义为false,表示没有通过验证 If IsEmpty(Session("Passed")) Then Session("Passed")=false End If   'Session("Passed")=False,表示尚未通过验证,则开始读取从表单传来的验证信息 If Sess