在sink和source中(不管是内置还是自定义的),基本都有如下代码:
... Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event = null; Status result = Status.READY; transaction.begin(); ... event = channel.take();//getChannelProcessor().processEvent(event);,前者用于sink后者用于source ... transaction.commit(); transaction.rollback() transaction.close(); ...
那么有些人就要问了?从上述代码中似乎只需要获取channel就可以了,因为获取数据时只需要event = channel.take()或者
getChannelProcessor().processEvent(event)?这样对吗?你可以去掉transaction试试,结果显示是不行的,出错!
那么为什么呢?这确实有点让人疑惑,但实际上channel.take()操作是transaction.doTake()。也就是实际的put和take等操作都是在transaction中进行的,因此要用channel必须要先创建transcation才可以使用。而channel.getTransaction()方法就是获取(已经创建)或创建(还没有)transcation,BasicChannelSemantics的相对应代码如下:
@Override public Transaction getTransaction() { if (!initialized) { synchronized (this) { if (!initialized) { initialize(); initialized = true; } } } BasicTransactionSemantics transaction = currentTransaction.get();//获取transcation if (transaction == null || transaction.getState().equals(//如果transaction不存在或者已关闭就创建 BasicTransactionSemantics.State.CLOSED)) { transaction = createTransaction();//创建 currentTransaction.set(transaction);//赋值给currentTransaction } return transaction; }
本栏目更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/webkf/tools/
该方法在所有channel的父类BasicChannelSemantics中,然后在具体实现的channel类中需要实现protected abstract BasicTransactionSemantics createTransaction()这个抽象方法来获取相应的transaction对象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法进一步封装成take()和put(event)方法,这俩方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。
@Override public void put(Event event) throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); transaction.put(event); } @Override public Event take() throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); return transaction.take(); }
作者:cnblogs 玖疯
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索channel interleaving
, channel
, 方法
, event
, transactions
, transaction
, 大数据 flume ng
, transactional
, ''takes
, Transcation
Preconditions
flume ng filechannel、flume transaction、flume channel、flume filechannel、flume kafka channel,以便于您获取更多的相关知识。