Flume-ng ThriftSource原理分析

Thrift IDL

Flume Thrift IDL在client包里面,定义如下:

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}

注意:event在C#里面是关键字,所以利用Thrift编译器生成客户端的接口时,要把所有event关键字改成其他的、比如events.

Thrift Service

Flume的Source分两种:

  • 实现PollableSource接口
    通过SinkRunner管理Source
  • 实现EventDrivenSource接口
    可以自己接受数据、发送到channel。比如ThriftSource

Flume Thrift Service的实现类在core包

public class ThriftSource extends AbstractSource implements Configurable,
  EventDrivenSource {
  public static final String CONFIG_THREADS = "threads";
  public static final String CONFIG_BIND = "bind";
  public static final String CONFIG_PORT = "port";
  private Integer port;
  private String bindAddress;
  private int maxThreads = 0;
  private SourceCounter sourceCounter;
  private TServer server;
  private TServerTransport serverTransport;
  private ExecutorService servingExecutor;
  public void start() {
      //创建工作者线程池
      ...
      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());

      //ThriftSourceProtocol是Flume Thrift Service的真正实现
      args.processor(new ThriftSourceProtocol
        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
     /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });
    ...
  }

Flume Thrift Service真正的实现类是内部类ThriftSourceHandler

  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

    @Override
    public Status append(ThriftFlumeEvent event) throws TException {
      Event flumeEvent = EventBuilder.withBody(event.getBody(),
        event.getHeaders());

      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      try {
        //传给channel
        getChannelProcessor().processEvent(flumeEvent);
      } catch (ChannelException ex) {
        logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
        return Status.FAILED;
      }
      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
      return Status.OK;
    }

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

      try {
        getChannelProcessor().processEventBatch(flumeEvents);
      } catch (ChannelException ex) {
        logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
        return Status.FAILED;
      }

      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
      return Status.OK;
    }
  }
时间: 2024-08-02 08:01:00

Flume-ng ThriftSource原理分析的相关文章

Flume(NG)架构设计要点及配置实践

Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元,带有一个可选的消息头 Flow:Even

ASP组件上传的三种机制和实现原理分析

上传 ASP 组件 FILE对象 当前,基于浏览器/服务器模式的应用比较流行.当用户需要将文件传输到服务器上时,常用方法之一是运行FTP服务器并将每个用户的FTP默认目录设为用户的Web主目录,这样用户就能运行FTP客户程序并上传文件到指定的 Web目录.这就要求用户必须懂得如何使用FTP客户程序.因此,这种解决方案仅对熟悉FTP且富有经验的用户来说是可行的. 如果我们能把文件上传功能与Web集成,使用户仅用Web浏览器就能完成上传任务,这对于他们来说将是非常方便的.但是,一直以来,由于File

搜索引擎判断网站是否作弊的原理分析(三)

广州SEO陈永继续为大家讲解搜索引擎判断网站如何判断网站是否作弊的原理,上节讲解完TrustRank算法,这一节将详细讲解BadRank算法. BadRank据传是Google采用的反链接作弊算法.它是一种典型的不信任传播模型,即首先构建作弊网页集合,之后利用链接关系来讲这种不信任分值传递到其他网页. BadRank包含的基本假设是:如果一个网页将其链接指向作弊页面,则这个网页也很可能是作弊网页:而如果一个网页被作弊网页指向,则不能说明这个网页是有问题的,因为作弊网页也经常将其链接指向一些知名网

搜索引擎判断网站是否作弊的原理分析(二)

承接搜索引擎判断网站是否作弊的原理分析(一) 广州SEO陈永继续为大家分析信任传播模型.不信任传播模型及异常发现模型3个代表算法,它们分别是TrustRank算法.BadRank算法和SpamRank算法. 我们先详细介绍TrustRank算法 TrustRank算法属于信任传播模型,基本遵循信任传播模型的流程,即算法流程如下两个步骤组成. 步骤一:确定值得信任的网页集合 TrustRank算法需要靠人工审核来判断某个网页应该被放入网页集合,考虑到人工审核工作量大,所以提出了两种初选信任网页集合

IOS开发:Cocos2d触摸分发原理分析

  触摸是iOS程序的精髓所在,良好的触摸体验能让iOS程序得到非常好的效果,例如Clear.鉴于同学们只会用cocos2d的 CCTouchDispatcher 的 api 但并不知道工作原理,但了解触摸分发的过程是极为重要的.毕竟涉及到权限.两套协议等的各种分发. 本文以cocos2d-iphone源代码为讲解.cocos2d-x 于此类似,就不过多赘述了. 零.cocoaTouch的触摸 在讲解cocos2d触摸协议之前,我觉得我有必要提一下CocoaTouch那四个方法.毕竟cocos2

Photoshop图层与色彩原理分析

  PS入门教程 Photoshop图层与色彩原理分析   教程结束,以上就是Photoshop图层与色彩原理分析,希望大家看完这篇教程之后能有一定的帮助! 分类: PS入门教程

web上存漏洞及原理分析、防范方法(文件名检测漏洞)

我们通过前篇:<web上存漏洞及原理分析.防范方法(安全文件上存方法) >,已经知道后端获取服务器变量,很多来自客户端传入的.跟普通的get,post没有什么不同.下面我们看看,常见出现漏洞代码.1.检测文件类型,并且用用户上存文件名保存 复制代码 代码如下: if(isset($_FILES['img'])) { $file = save_file($_FILES['img']); if($file===false) exit('上存失败!'); echo "上存成功!"

色彩搭配原理分析及技巧

色彩搭配原理分析及技巧: 研究色彩是为了使用色彩,也就是说最大限度地发挥色彩的作用.色彩的意义与内容在艺术创造和表现方面是复杂多变的,但在欣赏和解释方面又有共通的国际特性,可见它在人们心目中不但是活的,也是一种很美的大众语言.所以,通过对色彩的各种心理分析,找出它们的各种特性,可以做到合理而有效地使用色彩. 一.红色系 红色系是从色相环上的红紫色到朱红色之间的色彩.它的刺激作用很大,具有很高的注目性和视认性.大红色是暖色系里温度最高的色彩,红色系原色彩对人的心理能产生很大的鼓舞作用. 1.纯色使

ToyBricks简介以及原理分析

ToyBricks背景 我始终认为,在高内聚,低耦合的原则下,进行组件化,模块化,插件化都是移动应用开发的趋势. 为什么这么说呢?下面我们举个栗子:大家都知道,以前Android应用开发中,可以使用HttpClient或者HttpUrlConnection来进行http访问.这里假设有一个耦合严重,但代码量巨大的项目,使用了基于HttpClient封装的loopj/android-async-http来进行http访问.但是,后来,Google明确支持使用HttpUrlConnection.此时