在AbstractConfigurationProvider类中loadSources方法会将所有的source进行封装成SourceRunner放到了Map<String, SourceRunner> sourceRunnerMap之中。相关代码如下:
Map<String, String> selectorConfig = context.getSubProperties( BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX); ChannelSelector selector = ChannelSelectorFactory.create( sourceChannels, selectorConfig); ChannelProcessor channelProcessor = new ChannelProcessor(selector); Configurables.configure(channelProcessor, context); source.setChannelProcessor(channelProcessor); sourceRunnerMap.put(sourceName, SourceRunner.forSource(source));
每个source都有selector。上述代码会获取配置文件中关于source的selector配置信息;然后构造ChannelSelector对象selector;并封装selector对象成ChannelProcessor对象channelProcessor;执行channelProcessor.configure方法进行配置;设置soure的channelprocessor,最后封装为sourceRunner和source名称一起放入sourceRunnerMap中。
一、ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig)会根据配置文件中指定的类型实例化一个ChannelSelector(共两种ReplicatingChannelSelector复制和MultiplexingChannelSelector复用)如果没有指定类型默认是ReplicatingChannelSelector,也就是配置文件中不用配置selector会将每个event复制发送到多个channel;selector.setChannels(channels);对此slector进行配置configure(context)。这两中selector都实现了三个方法getRequiredChannels(Event event)、getOptionalChannels(Event event) 以及configure(Context context)。其实Event要发送到的channel有两种组成:RequiredChannels和OptionalChannels,对应两个方法。
(1)ReplicatingChannelSelector的configure(context)方法会获得通过"optional"在配置文件中指定的可选发送的channels(可以多个,通过空格分割);获取requiredChannels是此source对应的channel中可以活动的channel列表;然后获取所有channel的名字及其与channel的映射channelNameMap;然后将可选的channel加入optionalChannels并从requiredChannels去掉有对应的channel,在这里并没有检查可选channel的合法性以及可以配置此source指定的channel之外的channel,requiredChannels和optionalChannels不能有交集。getOptionalChannels方法就是直接返回optionalChannels列表。getRequiredChannels方法返回requiredChannels列表,如果requiredChannels为null,则返回全部的可以活动的channel列表。
(2)MultiplexingChannelSelector的configure(context)先获取要匹配的event的header,headerName;获得默认发送到的channel列表defaultChannels;获得mapping的各个子值,及对应的channel名称mapConfig;用来存储header不同的值及其对应的要发送到的channel列表,可以发送到多个channel的channelMapping;optionalChannels是配置的可选的发送channel,channelMapping中已经出现的channel不允许再次在optionalChannels出现,optionalChannels存储的是对应header的各个值及其等于该值的event要发送到的可选择的channel列表。getOptionalChannels(Event event)方法返回的是optionalChannels中该event的指定header对应的可选择的channel列表。getRequiredChannels(Event event)方法返回的是channelMapping中该event的指定header对应的channel列表,如果为null(表示由于该event的headers没有匹配的channel就发送到默认的channel中)就返回默认发送列表defaultChannels。
二、 ChannelProcessor channelProcessor = new ChannelProcessor(selector)这个是封装选择器构造channelprocessor。其构造方法会赋值selector并构造一个InterceptorChain对象interceptorChain。ChannelProcessor类负责管理选择器selector和拦截器interceptor。
三、执行channelProcessor.configure(Context)进行必要的配置,该方法会调用channelProcessor.configureInterceptors(context)对拦截器们进行获取和配置,configureInterceptors方法会先从配置文件中获取interceptor的组件名字interceptorNames[](可以多个),然后获取所有的“interceptors.”的配置信息interceptorContexts,然后遍历所有interceptorNames从配置文件中获取属于这个interceptor的配置信息及类型(type),根据类型构建相应的interceptor并进行配置configure,加入interceptors列表(用来存放实例化的interceptor);最后将列表传递给interceptorChain。关于更多interceptor的信息可以看这篇Flume-NG源码阅读之Interceptor(原创) 。
四、source.setChannelProcessor(channelProcessor)赋值。各个source通过getChannelProcessor()方法获取processor调用其processEventBatch(events)或者processEvent(event)来将event送到channel中。
五、sourceRunnerMap.put(sourceName,SourceRunner.forSource(source))将source封装成SourceRunner放入sourceRunnerMap。SourceRunner.forSource会根据这个source所实现的接口封装成不同的Runner,有两种接口PollableSource和EventDrivenSource,前者是有自己线程来驱动的需要实现process方法,后者是没有单独的线程来驱动的没有process方法。
public static SourceRunner forSource(Source source) { SourceRunner runner = null; if (source instanceof PollableSource) { runner = new PollableSourceRunner(); ((PollableSourceRunner) runner).setSource((PollableSource) source); } else if (source instanceof EventDrivenSource) { runner = new EventDrivenSourceRunner(); ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source); } else { throw new IllegalArgumentException("No known runner type for source " + source); } return runner; }
(1)PollableSourceRunner的start()方法会获取source的ChannelProcessor,然后执行其initialize()方法,该方法会调用interceptorChain.initialize()方法对拦截器们进行初始化(遍历所有拦截器然后执行拦截器的initialize()方法);然后执行source.start()启动source;再启动一个线程PollingRunner,它的run方法会始终执行source.process()并根据返回的状态值做一些统计工作。
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索channel interleaving
, channel
, interceptor
, 方法
, 拦截器
, sources
, 配置
, selector
source
,以便于您获取更多的相关知识。