Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

1.Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API     

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情

  • 一个消息读取多次
  • 在一个处理过程中只消费Partition其中的一部分消息
  • 添加事务管理机制以保证消息被处理且仅被处理一次

2.使用SimpleConsumer有哪些弊端呢?

  • 必须在程序中跟踪offset值
  • 必须找出指定Topic Partition中的lead broker
  • 必须处理broker的变动

3.使用SimpleConsumer的步骤

  1. 从所有活跃的broker中找出哪个是指定Topic Partition中的leader broker
  2. 找出指定Topic Partition中的所有备份broker
  3. 构造请求
  4. 发送请求查询数据
  5. 处理leader broker变更

4.代码实例

package bonree.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
	private List<String> m_replicaBrokers = new ArrayList<String>();

	public SimpleExample() {
		m_replicaBrokers = new ArrayList<String>();
	}

	public static void main(String args[]) {
		SimpleExample example = new SimpleExample();
		// 最大读取消息数量
		long maxReads = Long.parseLong("3");
		// 要订阅的topic
		String topic = "mytopic";
		// 要查找的分区
		int partition = Integer.parseInt("0");
		// broker节点的ip
		List<String> seeds = new ArrayList<String>();
		seeds.add("192.168.4.30");
		seeds.add("192.168.4.31");
		seeds.add("192.168.4.32");
		// 端口
		int port = Integer.parseInt("9092");
		try {
			example.run(maxReads, topic, partition, seeds, port);
		} catch (Exception e) {
			System.out.println("Oops:" + e);
			e.printStackTrace();
		}
	}

	public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
		// 获取指定Topic partition的元数据
		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
		if (metadata == null) {
			System.out.println("Can't find metadata for Topic and Partition. Exiting");
			return;
		}
		if (metadata.leader() == null) {
			System.out.println("Can't find Leader for Topic and Partition. Exiting");
			return;
		}
		String leadBroker = metadata.leader().host();
		String clientName = "Client_" + a_topic + "_" + a_partition;

		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
		long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
		int numErrors = 0;
		while (a_maxReads > 0) {
			if (consumer == null) {
				consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
			}
			FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
			FetchResponse fetchResponse = consumer.fetch(req);

			if (fetchResponse.hasError()) {
				numErrors++;
				// Something went wrong!
				short code = fetchResponse.errorCode(a_topic, a_partition);
				System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
				if (numErrors > 5)
					break;
				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
					// We asked for an invalid offset. For simple case ask for
					// the last element to reset
					readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
					continue;
				}
				consumer.close();
				consumer = null;
				leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
				continue;
			}
			numErrors = 0;

			long numRead = 0;
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
				long currentOffset = messageAndOffset.offset();
				if (currentOffset < readOffset) {
					System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
					continue;
				}

				readOffset = messageAndOffset.nextOffset();
				ByteBuffer payload = messageAndOffset.message().payload();

				byte[] bytes = new byte[payload.limit()];
				payload.get(bytes);
				System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
				numRead++;
				a_maxReads--;
			}

			if (numRead == 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		if (consumer != null)
			consumer.close();
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	/**
	 * @param a_oldLeader
	 * @param a_topic
	 * @param a_partition
	 * @param a_port
	 * @return String
	 * @throws Exception
	 *             找一个leader broker
	 */
	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
		for (int i = 0; i < 3; i++) {
			boolean goToSleep = false;
			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
			if (metadata == null) {
				goToSleep = true;
			} else if (metadata.leader() == null) {
				goToSleep = true;
			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
				// first time through if the leader hasn't changed give
				// ZooKeeper a second to recover
				// second time, assume the broker did recover before failover,
				// or it was a non-Broker issue
				//
				goToSleep = true;
			} else {
				return metadata.leader().host();
			}
			if (goToSleep) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		System.out.println("Unable to find new leader after Broker failure. Exiting");
		throw new Exception("Unable to find new leader after Broker failure. Exiting");
	}

	private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}
}
时间: 2024-09-27 20:10:14

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer的相关文章

Masonry自动布局详解五:比例(multipliedBy)

Masonry自动布局详解五:比例(multipliedBy) 说到iOS自动布局,有很多的解决办法.有的人使用xib/storyboard自动布局,也有人使用frame来适配.对于前者,笔者并不喜欢,也不支持.对于后者,更是麻烦,到处计算高度.宽度等,千万大量代码的冗余,对维护和开发的效率都很低. 笔者在这里介绍纯代码自动布局的第三方库:Masonry.这个库使用率相当高,在全世界都有大量的开发者在使用,其star数量也是相当高的. 支持原创,请阅读原文 效果图 本节详解Masonry的以动画

Android开发之对话框案例详解(五种对话框)

下面通过实例代码给大家分享5种android对话框,具体内容详情如下所示: 1 弹出普通对话框 --- 系统更新 2 自定义对话框-- 用户登录 3 时间选择对话框 -- 时间对话框 4 进度条对话框 -- 信息加载.. 5 popuWindow对话框 1 弹出普通对话框 --- 系统更新 //弹出普通对话框 public void showNormalDialog(View v) { AlertDialog.Builder builder = new Builder(this); //设置Di

kafka详解四:Kafka的设计思想、理念

     本节主要从整体角度介绍Kafka的设计思想,其中的每个理念都可以深入研究,以后我可能会发专题文章做深入介绍,在这里只做较概括的描述以便大家更好的理解Kafka的独特之处.本节主要涉及到如下主要内容: Kafka设计基本思想 Kafka中的数据压缩 Kafka消息转运过程中的可靠性 Kafka集群镜像复制 Kafka 备份机制 一.kafka由来      由于对JMS日常管理的过度开支和传统JMS可扩展性方面的局限,LinkedIn(www.linkedin.com)开发了Kafka以

kafka详解三:开发Kafka应用

一.整体看一下Kafka 我们知道,Kafka系统有三大组件:Producer.Consumer.broker . producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume). 二.开发一个Producer应用          Producers用来生产消息并把产生的消息推送到Kafka的Broker.Producers可以是各种应用,比如web应用,服务器端应用,代理应

kafka详解一、Kafka简介

背景:      当今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它        如何及时做到如上两点      以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统.      从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息. Kafka诞生

Kafka详解二、如何配置Kafka集群

Kafka集群配置比较简单,为了更好的让大家理解,在这里要分别介绍下面三种配置 单节点:一个broker的集群 单节点:多个broker的集群 多节点:多broker集群 一.单节点单broker实例的配置 1. 首先启动zookeeper服务      Kafka本身提供了启动zookeeper的脚本(在kafka/bin/目录下)和zookeeper配置文件(在kafka/config/目录下),首先进入Kafka的主目录(可通过 whereis kafka命令查找到):      [roo

Flash MX 新特性详解(五)应用开发和发布与重放

详解 一.应用开发 1.预制了用户界面的组件 预制一系列可定制的界面组件,包括:滚动条.多种文本域.输入按钮.和检查框,还有列表和组合箱,可以加速应用软件的开发.这些组件保证了普通用户用Flash MX来创建丰富的动画效果. 2.定制组件 定制一个更强的可以反复使用的组件来满足你变化的设计方案的需要.你可以将组件在不同的Falsh作品中使用或者在开发小组里相互组织.开发者可以拖动预制的组件到界面上.组建的方法可以在已经定义的APIs里整合和操作. 3.定制组件在界面上的设计时间 通过定制属性建材

hadoop详解(五) Archives

简介 我们在hadoop深入研究:(一)--hdfs介绍里已讲过,hdfs并不擅长存储小文件,因为每个文件最 少一个block,每个block的元数据都会在namenode节点占用内存,如果存在这样大量的小文件,它们会吃掉 namenode节点的大量内存. hadoop Archives可以有效的处理以上问题,他可以把多个文件归档成为一个文 件,归档成一个文件后还可以透明的访问每一个文件,并且可以做为mapreduce任务的输入. 用法 hadoop Archives可以使用archive工具创

CSS盒子模式详解五

在本人上一篇教程<彻底弄懂CSS盒子模式四(绝对定位和相对定位)>中最后有演示一个综合导航实例,那时因为时间关系,同时本人也觉得有必要将这实例分出来单独讲一下,所以没有把实例讲解教程写到上一篇教程中.这个教程可以作为CSS定位学习的强化练习,当然教程我也不只是单一的讲解做的步骤,还会和大家一起来分析一下设计思路,同时分享本人在做的过程中发现的一些问题供大家防范参考.为了兼顾一下没有来得及看我上一篇教程的网友,我再次给出代码运行框,大家可以先运行看看效果,不过建议最好先看一下本人上一篇教程,除非