FileChannel是flume一个非常重要的channel组件,非常常用。这个channel非常复杂,涉及的文件更多涉及三个包:org.apache.flume.channel.file、org.apache.flume.channel.file.encryption(加密)、org.apache.flume.channel.file.proto共计40个源码文件。
一、configure(Context context)方法:
1、首先获取配置文件中的checkpointDir和dataDirs属性,这是存放检查点和数据的目录,默认使用$user.home/.flume/file-channel/checkpoint和$user.home/.flume/file-channel/data来;checkpointDir是一个目录,而dataDirs可以是多个以“,”分割;且这两个目录最好不要来回修改,因为里面存储着数据呢;
2、获取容量capacity,并做一些检查比如是否<0,是否是动态加载(有无变化?),默认1000000;这指的是checkpoint文件存放event信息的最大容量。
3、keepAlive超时时间,就是如果channel中没有数据最长等待时间,默认3s;
4、transactionCapacity事务的最大容量,默认1000;
5、checkpointInterval检查点写入间隔,默认30000ms;
6、maxFileSize,data文件大小的上限,用户设置的和1623195647 之间较小的那个;
7、最少需要多少空间minimumRequiredSpace,max((用户配置的,500M),1M);
8、useLogReplayV1,默认false;
9、useFastReplay,默认false;
10、encryptionActiveKey,加密密钥别名,默认为null;
11、encryptionContext加密配置信息;
12、encryptionCipherProvider加密密码提供者,缺省值为null;
13、encryptionKeyProviderName,加密密钥提供者,缺省值为null;
14、queueRemaining,队列是否有剩余空间信号量,初始化容量为capacity;
15、设置Log log的检查间隔checkpointInterval和maxFileSize最大文件大小。
16、是否新建一个计数器channelCounter。
二、start()方法。
1、通过Log.Builder()构建了一个builder对象,并设置了相应的参数,然后log = builder.build(),Log的构造方法会对checkpointDir及logDirs尝试获取锁操作,所以如果存在多个file channel则checkpointDir及logDirs最好配置在多个磁盘下或者多个目录下,否则只能一个获得初始化;Log用来将封装好的FlumeEvent写入磁盘并将指向这些event的指针存入一个内存队列queue。会创建一个线程工作内容就是每隔checkpointInterval毫秒,默认30s写一次检查点log.writeCheckpoint(),会将checpoint、inflightTakes、inflightPuts都刷新至磁盘,会先后将inflightPuts、inflightTakes、checkpoint.meta重建,更新checkpoint文件并刷新至磁盘,这些文件都在checkpointDir目录下;更新log-ID.meta文件;同时肩负起删除log文件及其对应的meta文件的责任。
2、log.replay(),一旦一个Log对象被创建,则需要调用replay()方法使用queue最新的检查点来调整磁盘上的write ahead log。会获取最大的fileID;然后读取log文件根据record的类型进行相应的操作,进行恢复;遍历所有的data目录,然后roll(index)创建LogFile.Writer(空的);然后将queue刷新到相关文件。
3、 open = true,表示channel打开;
4、depth = getDepth(),FlumeEventQueue的大小,然后需要判断一下queueRemaining是否有足够的剩余量queueRemaining.tryAcquire(depth);
5、如果open==true,计数器开始工作。
三、createTransaction()方法主要是构造一个FileBackedTransaction对象用来直接操作channel,并返回。
四、stop()停止channel,清理数据。