Flume-NG源码阅读:SourceRunner及选择器selector和拦截器interceptor的执行

在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRunner> sourceRunnerMap之中。相关代码如下:

Map<String, String> selectorConfig = context.getSubProperties(
              BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);

          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, context);
          source.setChannelProcessor(channelProcessor);
          sourceRunnerMap.put(sourceName,
              SourceRunner.forSource(source));

每个source都有selector。上述代码会获取配置文件中关于source的selector配置信息;然后构造ChannelSelector对象selector;并封装selector对象成ChannelProcessor对象channelProcessor;执行channelProcessor.configure方法进行配置;设置soure的channelprocessor,最后封装为sourceRunner和source名称一起放入sourceRunnerMap中。 

一、ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig)会根据配置文件中指定的类型实例化一个ChannelSelector(共两种ReplicatingChannelSelector复制和MultiplexingChannelSelector复用)如果没有指定类型默认是ReplicatingChannelSelector,也就是配置文件中不用配置selector会将每个event复制发送到多个channel;selector.setChannels(channels);对此slector进行配置configure(context)。这两中selector都实现了三个方法getRequiredChannels(Event event)、getOptionalChannels(Event event) 以及configure(Context context)。其实Event要发送到的channel有两种组成:RequiredChannels和OptionalChannels,对应两个方法。

(1)ReplicatingChannelSelector的configure(context)方法会获得通过"optional"在配置文件中指定的可选发送的channels(可以多个,通过空格分割);获取requiredChannels是此source对应的channel中可以活动的channel列表;然后获取所有channel的名字及其与channel的映射channelNameMap;然后将可选的channel加入optionalChannels并从requiredChannels去掉有对应的channel,在这里并没有检查可选channel的合法性以及可以配置此source指定的channel之外的channel,requiredChannels和optionalChannels不能有交集。getOptionalChannels方法就是直接返回optionalChannels列表。getRequiredChannels方法返回requiredChannels列表,如果requiredChannels为null,则返回全部的可以活动的channel列表。

(2)MultiplexingChannelSelector的configure(context)先获取要匹配的event的header,headerName;获得默认发送到的channel列表defaultChannels;获得mapping的各个子值,及对应的channel名称mapConfig;用来存储header不同的值及其对应的要发送到的channel列表,可以发送到多个channel的channelMapping;optionalChannels是配置的可选的发送channel,channelMapping中已经出现的channel不允许再次在optionalChannels出现,optionalChannels存储的是对应header的各个值及其等于该值的event要发送到的可选择的channel列表。getOptionalChannels(Event event)方法返回的是optionalChannels中该event的指定header对应的可选择的channel列表。getRequiredChannels(Event event)方法返回的是channelMapping中该event的指定header对应的channel列表,如果为null(表示由于该event的headers没有匹配的channel就发送到默认的channel中)就返回默认发送列表defaultChannels。

二、 ChannelProcessor channelProcessor = new ChannelProcessor(selector)这个是封装选择器构造channelprocessor。其构造方法会赋值selector并构造一个InterceptorChain对象interceptorChain。ChannelProcessor类负责管理选择器selector和拦截器interceptor。

三、执行channelProcessor.configure(Context)进行必要的配置,该方法会调用channelProcessor.configureInterceptors(context)对拦截器们进行获取和配置,configureInterceptors方法会先从配置文件中获取interceptor的组件名字interceptorNames[](可以多个),然后获取所有的“interceptors.”的配置信息interceptorContexts,然后遍历所有interceptorNames从配置文件中获取属于这个interceptor的配置信息及类型(type),根据类型构建相应的interceptor并进行配置configure,加入interceptors列表(用来存放实例化的interceptor);最后将列表传递给interceptorChain。关于更多interceptor的信息可以看这篇Flume-NG源码阅读之Interceptor(原创) 。  

四、source.setChannelProcessor(channelProcessor)赋值。各个source通过getChannelProcessor()方法获取processor调用其processEventBatch(events)或者processEvent(event)来将event送到channel中。

五、sourceRunnerMap.put(sourceName,SourceRunner.forSource(source))将source封装成SourceRunner放入sourceRunnerMap。SourceRunner.forSource会根据这个source所实现的接口封装成不同的Runner,有两种接口PollableSource和EventDrivenSource,前者是有自己线程来驱动的需要实现process方法,后者是没有单独的线程来驱动的没有process方法。

public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }

    return runner;
  }

(1)PollableSourceRunner的start()方法会获取source的ChannelProcessor,然后执行其initialize()方法,该方法会调用interceptorChain.initialize()方法对拦截器们进行初始化(遍历所有拦截器然后执行拦截器的initialize()方法);然后执行source.start()启动source;再启动一个线程PollingRunner,它的run方法会始终执行source.process()并根据返回的状态值做一些统计工作。

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索channel interleaving
, channel
, interceptor
, 方法
, 拦截器
, sources
, 配置
, selector
source
,以便于您获取更多的相关知识。

时间: 2024-10-16 03:24:17

Flume-NG源码阅读:SourceRunner及选择器selector和拦截器interceptor的执行的相关文章

【spring源码学习】springMVC之映射,拦截器解析,请求数据注入解析,DispatcherServlet执行过程

[一]springMVC之url和bean映射原理和源码解析 映射基本过程 (1)springMVC配置映射,需要在xml配置文件中配置<mvc:annotation-driven >  </mvc:annotation-driven> (2)配置后,该配置将会交由org.springframework.web.servlet.config.MvcNamespaceHandler处理,该类会转交给org.springframework.web.servlet.config.Anno

SpringMVC源码总结(十一)mvc:interceptors拦截器介绍

本文章针对mvc:interceptors标签进行介绍,它的注册过程以及在访问时的拦截过程.  首先说下接口HandlerInterceptor,它有如下三个方法:  ? 1 2 3 4 5 6 7 8 9 10 boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)         throws Exception;   void postHandle(       

Flume-NG源码阅读:HBaseSink

关于HBase的sink的所有内容均在org.apache.flume.sink.hbase包下. 每个sink包括自己定制的,都extends AbstractSink implements Configurable. 一.首先是configure(Context context)方法.该方法是对HBaseSink的参数初始化.主要包括以下几个: tableName:要写入的HBase数据表名,不能为空: columnFamily:数据表对应的列簇名,这个sink目前只支持一个列簇,不能为空:

Flume-NG源码阅读:AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

淘宝数据库OceanBase SQL编译器部分 源码阅读--Schema模式

淘宝数据库OceanBase SQL编译器部分 源码阅读--Schema模式 什么是Database,什么是Schema,什么是Table,什么是列,什么是行,什么是User?我们可以可以把Database看作是一个大仓库,仓库分了很多很多的房间,Schema就是其中的房间,一个Schema代表一个房间,Table可以看作是每个Schema中的柜子,行和列就是柜子中的格子.User就是房间的主人.简单来说,Schema是包括表,列,索引,视图等数据库对象的集合. OceanBase中的强Sche

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成物理查询计划

SQL编译解析三部曲分为:构建语法树,制定逻辑计划,生成物理执行计划.前两个步骤请参见我的博客<<淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树>>和<<淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划>>.这篇博客主要研究第三步,生成物理查询计划. 一. 什么是物理查询计划 与之前的阅读方法一致,这篇博客的两个主要问题是what 和how.那么什么是物理查询计划?物理查询计划能够直接执行并返回数据结果数

CI框架源码阅读笔记2 一切的入口 index.php

上一节(CI框架源码阅读笔记1 - 环境准备.基本术语和框架流程)中,我们提到了CI框架的基本流程,这里再次贴出流程图,以备参考: 作为CI框架的入口文件,源码阅读,自然由此开始.在源码阅读的过程中,我们并不会逐行进行解释,而只解释核心的功能和实现. 1. 设置应用程序环境 define("ENVIRONMENT", "development"); 这里的development可以是任何你喜欢的环境名称(比如dev,再如test),相对应的,你要在下面的switch

java源码阅读方法以及经验

问题描述 java源码阅读方法以及经验 如何更好的阅读java源码,更注重阅读哪些包里面的源码,当然连好的阅读源码的工具也说明一下更好了 解决方案 我在这里假设你在问怎么阅读jdk的源码,java源码这个名字有点奇怪. 你可以build 一个fast debug版本,然后使用debugger去调试你的程序,这样对程序是怎么调用的有很直观的视图. 其次,可以看看jdk里面的regression tests,里面有很多例子. 其次,openjdk提供了netbean的jdk project,你可以很

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划 SQL编译解析三部曲分为:构建语法树,生成逻辑计划,指定物理执行计划.第一步骤,在我的上一篇博客淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树里做了介绍,这篇博客主要研究第二步,生成逻辑计划. 一. 什么是逻辑计划? 我们已经知道,语法树就是一个树状的结构组织,每个节点代表一种类型的语法含义.如 update student set sex="M" where name ="