Flume-NG中的Channel与Transaction关系

在sink和source中(不管是内置还是自定义的),基本都有如下代码:

...
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status result = Status.READY;
    transaction.begin();
    ...
    event = channel.take();//getChannelProcessor().processEvent(event);,前者用于sink后者用于source
    ...
    transaction.commit();
    transaction.rollback()
    transaction.close();
    ...

那么有些人就要问了?从上述代码中似乎只需要获取channel就可以了,因为获取数据时只需要event = channel.take()或者

getChannelProcessor().processEvent(event)?这样对吗?你可以去掉transaction试试,结果显示是不行的,出错!

  那么为什么呢?这确实有点让人疑惑,但实际上channel.take()操作是transaction.doTake()。也就是实际的put和take等操作都是在transaction中进行的,因此要用channel必须要先创建transcation才可以使用。而channel.getTransaction()方法就是获取(已经创建)或创建(还没有)transcation,BasicChannelSemantics的相对应代码如下:  

@Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();//获取transcation
    if (transaction == null || transaction.getState().equals(//如果transaction不存在或者已关闭就创建
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();//创建
      currentTransaction.set(transaction);//赋值给currentTransaction
    }
    return transaction;
  }

本栏目更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/webkf/tools/

该方法在所有channel的父类BasicChannelSemantics中,然后在具体实现的channel类中需要实现protected abstract BasicTransactionSemantics createTransaction()这个抽象方法来获取相应的transaction对象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法进一步封装成take()和put(event)方法,这俩方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。

@Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

作者:cnblogs 玖疯

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索channel interleaving
, channel
, 方法
, event
, transactions
, transaction
, 大数据 flume ng
, transactional
, ''takes
, Transcation
Preconditions
flume ng filechannel、flume transaction、flume channel、flume filechannel、flume kafka channel,以便于您获取更多的相关知识。

时间: 2024-08-04 01:35:19

Flume-NG中的Channel与Transaction关系的相关文章

Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元,带有一个可选的消息头 Flow:Even

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl

Flume NG 简介及配置实战

    Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera

阐述Flume OG到 Flume NG发生的革命性变化

但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,这点可以在 BigInsights http://www.aliyun.com/zixun/aggregation/11790.html">产品文档的 troubleshooting 板块发现.为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-

html-CSS中,line-height与font-size关系

问题描述 CSS中,line-height与font-size关系 如果font-size大于line-height会怎么显示,这幅图该怎么理解 解决方案 font-size大于line-height会溢出容器显示,如果容器增加overflow:hidden就会隐藏溢出的内容 <div style="font-size:20px;line-height:12px">abb</div> <div style="font-size:20px;line

C++UDP组播编程中,组播地址(组播号)与组播地址的关系是什么?和TCP中IP和端口的关系一样吗?

问题描述 C++UDP组播编程中,组播地址(组播号)与组播地址的关系是什么?和TCP中IP和端口的关系一样吗? C++UDP组播编程中,组播地址(组播号)与组播地址的关系是什么?和TCP中IP和端口的关系一样吗?另外,一台计算机上(不经过路由器或交换机)只能有一个组播吗?急求!!! 解决方案 组播地址和多播地支的关系与TCP中IP和端口的关系不太一样. 首先,网卡查看由信道传送过来的帧,确定是否接收该帧,若接收后就将它传往设备驱动程序.通常网卡仅接收那些目的地址为网卡物理地址或广播地址的帧. 使

css-CSS中,line-height与font-size关系

问题描述 CSS中,line-height与font-size关系 html: <span class="span3">line-height</span> CSS: .span3 { font-size: 1.2em; border: 5px solid #ccc; padding: 5px; margin: 5px; } div { line-height: 50px; background-color: #efa; } 显示结果: 问题:为什么div显示的

javascript中基本类型和引用类型的关系是什么?

问题描述 javascript中基本类型和引用类型的关系是什么? 他们是谁包含谁的关系,还是相互独立的关系? 为什么说Number 对象是 Number 原始类型的引用类型? 解决方案 首先不要用面向对象的思考方式去看js 因为js是偏向函数式语言的. 因此所谓的基本类型和引用类型只是为了类比java等语言而来的东东. js中是函数当道,比如这个Number他实际上是一个函数名字,不是类型的概念. 函数和普通的变量都是对象.(具有属性和方法的封装体) js中的"继承"是通过原型链完成的

db2中的用户和schema关系是怎样的

问题描述 db2中的用户和schema关系是怎样的 db2中的用户和schema关系是怎样的 一个用户下面可以有很多schema? 解决方案 数据库中Schema有两种含义,一种是概念上的Schema,指的是一组DDL语句集,该语句集完整地描述了数据库的结构.还有一种是物理上的 Schema,指的是数据库中的一个名字空间,它包含一组表.视图和存储过程等命名对象.简单的说,Schema就是一个(数据库)用户所拥有的数据库的对象. 在一个数据库中可以有多个应用的数据表,这些不同应用的表可以放在不同的