问题描述
- Storm + Kafka:Storm 会一直重复解析同一条数据,有时又解析不到数据
-
/**
 * Builds the Trident topology that reads string messages from the configured
 * Kafka topic and runs them through the log filter and the chain of analysis
 * functions.
 *
 * <p>Each statement is on its own line on purpose: a trailing {@code //}
 * comment comments out everything after it on the same physical line, which
 * previously hid the {@code startOffsetTime} and {@code scheme} assignments.
 *
 * @return the assembled topology
 */
protected StormTopology buildTopology() {
    // Broker lookup through ZooKeeper; brokerZkStr, brokerZkPath and topic are
    // defined outside this method.
    ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);

    TridentTopology tridentTopology = new TridentTopology();
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zkHosts, topic);

    // Used together with startOffsetTime: false by default, and has to be set
    // to true once startOffsetTime is set.
    spoutConf.forceFromStart = true;
    // -2 = start from the beginning of the Kafka log, -1 = start from the
    // latest offset, 0 = none (start from the offset kept in ZooKeeper).
    spoutConf.startOffsetTime = -1L;
    // Decode each message as a string; the pipeline below reads the "str" field.
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

    // Alternative spout from the original code, kept for reference:
    // new OpaqueTridentKafkaSpout(spoutConf)
    // NOTE(review): the reported symptom (the same record being parsed again
    // and again) may come from Trident replaying failed batches - check the
    // functions below for exceptions before changing the spout type.
    TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

    Stream opTrdtSptStream = tridentTopology.newStream("opTrdtSptStream", kafkaSpout);
    opTrdtSptStream.shuffle()
            .each(new Fields("str"), new Utils.FilterLogInfo())
            .each(new Fields("str"), new LogFilterUtil(), new Fields("analyzeResult"))
            .each(new Fields("analyzeResult"), new PropertyAnalyzeUtils(), new Fields("autoItem"))
            .each(new Fields("autoItem"), new AnsjIllegalWordsUtil(), new Fields("ansjIllegalWordsResult"))
            .each(new Fields("ansjIllegalWordsResult"), new AnsjKeyWordsUtil(), new Fields("ansjKeyWordsResult"))
            .each(new Fields("ansjKeyWordsResult"), new AnsjPicturesUtil(), new Fields("ansjPicturesResult"));

    return tridentTopology.build();
}
解决方案
[url=http://www.51zxw.net/study.asp?vip=12576487]51自学网-专业培训老师录制的视频教程,让学习变得很轻松[/url]
时间: 2024-11-26 17:59:17