Flume数据传输事务分析

Flume数据传输事务分析

本文基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。一般情况下,用MemoryChannel就好了,我们公司用的就是这个,FileChannel速度慢,虽然提供日志级别的数据恢复,但是一般情况下,不断电MemoryChannel是不会丢数据的。

Flume提供事物操作,保证用户的数据的可靠性,主要体现在:

  • 数据在传输到下个节点时(通常是批量数据),如果接收节点出现异常,比如网络异常,则回滚这一批数据。因此有可能导致数据重发
  • 同个节点内,Source写入数据到Channel,数据在一个批次内的数据出现异常,则不写入到Channel。已接收到的部分数据直接抛弃,靠上一个节点重发数据。

编程模型

Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物开始
txn.begin();
try {

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  //往临时缓冲区Put数据
  ch.put(eventToStage);
  //或者ch.take()

  //将这些数据提交到channel中
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

Put事务流程

Put事务可以分为以下阶段:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据

我们从Source数据接收到写入Channel这个过程对Put事物进行分析。

ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

        //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
        getChannelProcessor().processEventBatch(flumeEvents);
        ...

      return Status.OK;
    }

事务逻辑都在processEventBatch这个方法里:

public void processEventBatch(List<Event> events) {
    ...
    //预处理每行数据,有人用来做ETL嘛
    events = interceptorChain.intercept(events);
    ...
    //分类数据,划分不同的channel集合对应的数据

    // Process required channels
    Transaction tx = reqChannel.getTransaction();
    ...
        //事务开始,tx即MemoryTransaction类实例
        tx.begin();
        List<Event> batch = reqChannelQueue.get(reqChannel);
        for (Event event : batch) {
          // 这个put操作实际调用的是transaction.doPut
          reqChannel.put(event);
        }
        //提交,将数据写入Channel的队列中
        tx.commit();
      } catch (Throwable t) {
        //回滚
        tx.rollback();
        ...
      }
    }
    ...
  }

每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.

那么,事务到底做了什么?

实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

  • putList
  • takeList

对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。

channel.put -> transaction.doPut:

    protected void doPut(Event event) throws InterruptedException {
      //计算数据字节大小
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
      //写入临时缓冲区putList
      if (!putList.offer(event)) {
        throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

transaction.commit:

@Override
    protected void doCommit() throws InterruptedException {
      //检查channel的队列剩余大小是否足够
      ...

      int puts = putList.size();
      ...
      synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            //写入到channel的队列
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        //清除临时队列
        putList.clear();
        ...
      }
      ...
    }

如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

@Override
    protected void doRollback() {
    ...
        //抛弃数据,没合并到channel的内存队列
        putList.clear();
      ...
    }

Take事务

Take事务分为以下阶段:

  • doTake:先将数据取到临时缓冲区takeList
  • 将数据发送到下一个节点
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

public Status process() throws EventDeliveryException {
    ...
    Transaction transaction = channel.getTransaction();
    ...
    //事务开始
    transaction.begin();
    ...
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        //take数据到临时缓冲区,实际调用的是transaction.doTake
        Event event = channel.take();
        if (event == null) {
          break;
        }
        ...
      //写数据到HDFS
      bucketWriter.append(event);
      ...
      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }
      //commit
      transaction.commit();
      ...
    } catch (IOException eIO) {
      transaction.rollback();
      ...
    } finally {
      transaction.close();
    }
  }

大致流程图:

接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

protected Event doTake() throws InterruptedException {
      ...
      //从channel内存队列取数据
      synchronized(queueLock) {
        event = queue.poll();
      }
      ...
      //将数据放到临时缓冲区
      takeList.put(event);
      ...
      return event;
    }

接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

protected void doCommit() throws InterruptedException {
    ...
    takeList.clear();
    ...
}

很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

protected void doRollback() {
      int takes = takeList.size();
      //检查内存队列空间大小,是否足够takeList写回去
      synchronized(queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        ...
      }
      ...
    }
时间: 2024-08-01 12:07:51

Flume数据传输事务分析的相关文章

IBM BigInsights Flume 轻松部署可扩展的实时日志收集系统

IBM BigInsights Flume 简介 Flume 是开源的海量日志收集系统,支持对日志的实时性收集.初始的 flume 版本是 flume OG(Flume original generation) 由 Cloudera 公司开发,叫做 Cloudera Flume:后来,cloudera 把 flume 贡献给 Apache,版本改为 FLUME NG(Flume next generation)现在称为 Apache Flume.最初始的 BigInsights 使用 flume

使用Apache Flume抓取数据(1)

使用Apache Flume抓取数据,怎么来抓取呢?不过,在了解这个问题之前,我们必须明确ApacheFlume是什么? 一.什么是Apache Flume Apache Flume是用于数据采集的高性能系统 ,名字来源于原始的近乎实时的日志数据采集工具,现在广泛用于任何流事件数据的采集,支持从很多数据源聚合数据到HDFS. 最初由Cloudera开发 ,在2011年贡献给了Apache基金会 ,在2012年变成了Apache的顶级项目,Flume OG升级换代成了Flume NG. Flume

Flume开源的海量日志收集系统使用指南

BigInsights 将实时日志收集体统 Flume 整合为产品的一部分,支持对 flume 极其相关组件 hadoop.zookeeper 的组合安装,用可视化界面为用户部署实时日志收集系统:另外 BigInsights flume 通过 flume runtime toolkit 支持快速的添加日志收集节点,无需配置,轻松实现日志收集系统的可扩展性. Flume 是开源的海量日志收集系统,支持对日志的实时性收集.初始的 flume 版本是 flume OG(Flume original g

《Flume日志收集与MapReduce模式》一第3章 通  道

第3章 通 道在Flume中,通道指的是位于源与接收器之间的构件.它为流动的事件提供了一个中间区域,从源中读取并且被写到数据处理管道中的接收器的事件处于这个区域中.本章将要介绍的两类通道分别是内存/非持久化通道与本地文件系统/持久化通道.持久化文件通道会在发送者接收到事件前将所有变化写到磁盘上.它要比非持久化的内存通道慢一些,不过可以在出现系统事件或是Flume代理重启时进行恢复.与之相反,内存通道要更快一些,不过在出现失败时会导致数据丢失,并且与拥有大量磁盘空间的文件通道相比,它的存储能力要低

阐述Flume OG到 Flume NG发生的革命性变化

但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,这点可以在 BigInsights http://www.aliyun.com/zixun/aggregation/11790.html">产品文档的 troubleshooting 板块发现.为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-

《Flume日志收集与MapReduce模式》一3.3 小结

3.3 小结 本章介绍了在数据处理管道中常用的两类通道.内存通道提供了更快的速度,这是以故障事件出现时数据丢失为代价的.此外,文件通道提供了更可靠的传输,因为它能容忍代理故障与重启,这是以牺牲性能为代价的.你需要确定哪种通道更适合于你的使用场景.在确定内存通道是否适合时,请问问自己丢失一些数据的经济上的代价如何.在考虑是否使用持久化通道时请衡量它与添加更多的硬件以弥补性能上的差异时的代价相比如何.另一个考虑就是数据问题了.写入到Hadoop中的数据不一定都来自于流式应用日志.如果接收的是每天的数

python网络编程之数据传输UDP实例分析

  本文实例讲述了python网络编程之数据传输UDP实现方法.分享给大家供大家参考.具体分析如下: 一.问题: 你觉得网络上像msn,qq之类的工具在多台机器之间互相传输数据神秘吗?你也想玩一下在两台机器之间传数据吗?今天让python告诉我们基本原理吧,当然只是做简单的了解,实际情况复杂的多. 我们今天用python实现一个简单的udp程序. 二.程序实现: 1) 使用模块 (socket)套接字模块: 套接字模块是一个非常简单的基于对象的接口,它提供对低层BSD套接字样式网络的访问 .使用

最小化数据传输——在客户端存储数据

客户端|数据 将程序输出为其他的语言是程序员喜爱的事情之一,在WEB上我们有 两个不同编程环境:客户端(浏览器)和服务器端,根据HTTP协议的定义, 我们可以在编写在客户端输出其他语言的服务端程序,我们选择了作为服 务端语言.javascript作为客户端输出.在本问中我们将向您演示这样用 该方案把数据存储在客户端,并且在诸如:聊天室.新闻系统或其他您想 实现的应用上达到服务端和客户端(浏览器)的最小的数据传输. 要求以下支持:     PHP4     JavaScript     Frame

fpga-两个FPGA芯片数据传输

问题描述 两个FPGA芯片数据传输 40C 求代码-加q. 1718686998用VHDL设计--------波特率是19200.....其他没有什么要求 解决方案 http://wenku.baidu.com/link?url=sEji1bEz4-dlWzFNL6GXgCDFX7muGgNKeiesv1LNqdbSSDhVCcHvRJZOoO3pXS2arTelYpK66X-pBK3IjpUE4DwfUwIYpUIJ_irDiIPZXkm