问题描述
@Overridepublicvoidrun(){//TODOAuto-generatedmethodstubtry{Thread.sleep(10000);}catch(InterruptedExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}Propertiesprops=newProperties();props.put("auto.offset.reset","smallest");//必须要加,如果要读旧数据props.put("zookeeper.connect","Master:2181");props.put("zk.connectiontimeout.ms","10000");props.put("group.id","test-consumer-group");//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props);ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);Map<String,Integer>topicCountMap=newHashMap<String,Integer>();topicCountMap.put("test",1);//一次从主题中获取一个数据Map<String,List<KafkaStream<byte[],byte[]>>>messageStreams=consumerConnector.createMessageStreams(topicCountMap);KafkaStream<byte[],byte[]>stream=messageStreams.get("test").get(0);//获取每次接收到的这个数据ConsumerIterator<byte[],byte[]>iterator=stream.iterator();System.out.println("consummer...");while(iterator.hasNext()){Stringmessage=newString(iterator.next().message());System.out.println("接收到:"+message);}}
producer正常运行,但是consumer没有取到数据,"group.id","test-consumer-group"是从conf中的consumer来的,消费者还需要其他配置?程序卡在while(iterator.hasNext()){这一行了,怎么解决能让它继续运行?
解决方案
解决方案二:
helphelp~