Kafka JAVA客户端代码示例

介绍

     http://kafka.apache.org 
    kafka是一种高吞吐量的分布式发布订阅消息系统 
    kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态) 

    当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。 

高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理

测试环境

    kafka_2.10-0.8.1.1 3个节点做的集群

    zookeeper-3.4.5 一个实例节点

代码示例

消息生产者代码示例

import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 * @author Fung
 *
 */
public class ProducerDemo {
	public static void main(String[] args) {
		Random rnd = new Random();
		int events=100;

		// 设置配置属性
		Properties props = new Properties();
		props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		// key.serializer.class默认为serializer.class
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");
		// 可选配置,如果不配置,则使用默认的partitioner
		props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
		// 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
		// 值为0,1,-1,可以参考
		// http://kafka.apache.org/08/configuration.html
		props.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(props);

		// 创建producer
		Producer<String, String> producer = new Producer<String, String>(config);
		// 产生并发送消息
		long start=System.currentTimeMillis();
		for (long i = 0; i < events; i++) {
			long runtime = new Date().getTime();
			String ip = "192.168.2." + i;//rnd.nextInt(255);
			String msg = runtime + ",www.example.com," + ip;
			//如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
			KeyedMessage<String, String> data = new KeyedMessage<String, String>(
					"page_visits", ip, msg);
			producer.send(data);
		}
		System.out.println("耗时:" + (System.currentTimeMillis() - start));
		// 关闭producer
		producer.close();
	}
}

消息消费者代码示例

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 *
 */
public class ConsumerDemo {
	private final ConsumerConnector consumer;
	private final String topic;
	private ExecutorService executor;

	public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
		consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
		this.topic = a_topic;
	}

	public void shutdown() {
		if (consumer != null)
			consumer.shutdown();
		if (executor != null)
			executor.shutdown();
	}

	public void run(int numThreads) {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(numThreads));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

		// now launch all the threads
		executor = Executors.newFixedThreadPool(numThreads);

		// now create an object to consume the messages
		//
		int threadNumber = 0;
		for (final KafkaStream stream : streams) {
			executor.submit(new ConsumerMsgTask(stream, threadNumber));
			threadNumber++;
		}
	}

	private static ConsumerConfig createConsumerConfig(String a_zookeeper,
			String a_groupId) {
		Properties props = new Properties();
		props.put("zookeeper.connect", a_zookeeper);
		props.put("group.id", a_groupId);
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");

		return new ConsumerConfig(props);
	}

	public static void main(String[] arg) {
		String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
		String zooKeeper = args[0];
		String groupId = args[1];
		String topic = args[2];
		int threads = Integer.parseInt(args[3]);

		ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
		demo.run(threads);

		try {
			Thread.sleep(10000);
		} catch (InterruptedException ie) {

		}
		demo.shutdown();
	}
}

消息处理类

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerMsgTask implements Runnable {
	private KafkaStream m_stream;
	private int m_threadNumber;

	public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
		m_threadNumber = threadNumber;
		m_stream = stream;
	}

	public void run() {
		ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
		while (it.hasNext())
			System.out.println("Thread " + m_threadNumber + ": "
					+ new String(it.next().message()));
		System.out.println("Shutting down Thread: " + m_threadNumber);
	}
}

Partitioner类示例

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerDemo implements Partitioner {
	public PartitionerDemo(VerifiableProperties props) {

	}

	@Override
	public int partition(Object obj, int numPartitions) {
		int partition = 0;
		if (obj instanceof String) {
			String key=(String)obj;
			int offset = key.lastIndexOf('.');
			if (offset > 0) {
				partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
			}
		}else{
			partition = obj.toString().length() % numPartitions;
		}

		return partition;
	}

}

pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.xxx</groupId>
	<artifactId>kafka-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>kafka-demo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.1.1</version>
			<exclusions>
				<exclusion>
					<artifactId>jmxtools</artifactId>
					<groupId>com.sun.jdmk</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jmxri</artifactId>
					<groupId>com.sun.jmx</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jms</artifactId>
					<groupId>javax.jms</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.15</version>
			<exclusions>
				<exclusion>
					<artifactId>jmxtools</artifactId>
					<groupId>com.sun.jdmk</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jmxri</artifactId>
					<groupId>com.sun.jmx</groupId>
				</exclusion>
				<exclusion>
					<artifactId>jms</artifactId>
					<groupId>javax.jms</groupId>
				</exclusion>
				<exclusion>
					<artifactId>mail</artifactId>
					<groupId>javax.mail</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
</project>

参考

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

时间: 2024-12-26 20:25:15

Kafka JAVA客户端代码示例的相关文章

Kafka JAVA客户端代码示例--高级应用

什么时间使用高级应用? 针对一个消息读取多次 在一个process中,仅仅处理一个topic中的一组partitions 使用事务,确保每个消息只被处理一次 使用高级应用(调用较底层函数)的缺点?     SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要) 在应用程序中跟踪上次消息处理的offset 确定一个topic partition的lead broker 手工处理broker leander的改变 使用底层函数(SimpleConsumer)开发

kafka增加SSL认证的Producer客户端代码示例

kafka增加SSL认证的Producer客户端代码示例 package com.kafka.safe.ssl; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.commo

Solr JAVA客户端SolrJ 4.9使用示例教程

简介         SolrJ是操作Solr的JAVA客户端,它提供了增加.修改.删除.查询Solr索引的JAVA接口.SolrJ针对Solr提供了Rest 的HTTP接口进行了封装,SolrJ底层是通过使用httpClient中的方法来完成Solr的操作. SolrJ/Solr 跨版本兼容性     SolrJ通常向后保持兼容性,可以使用新版本的SolrJ访问较旧的Solr,反之亦然.建议使用同Solr server同版本的SolrJ,      详见http://wiki.apache.o

推荐Java五大微服务器及其代码示例教程

微服务越来越多地用于开发领域,因为开发人员致力于创建更大,更复杂的应用程序,这些应用程序作为较小的服务组合而更好地开发和管理,可将工作在一起,实现更大的应用程序范围的功能.工具正在上升,以满足使用逐块方法思考和构建应用程序的需求,与同时考虑整个应用程序相比.下面为大家讲述一下Java五大微服务器,使用这些功能的好处以及相关的代码示例. 什么是微服务 微服务是一种面向服务的架构风格(Java开发人员最重要的技能之一),其中应用程序被构建为不同的小型服务而不是整个应用程序的集合.您可以使用多个独立的

kafka linux下java客户端的详细编译步骤

问题描述 kafka linux下java客户端的详细编译步骤 求关于kafka在Linux 命令行下的java客户端详细编译步骤,网上直看到些程序代码但是具体编译过程很少

【智能合约】客户端和web端对智能合约的事件Event进行调用的代码示例

客户端和web端对智能合约的事件Event进行调用的代码示例 web truffle 按官网的例子 http://truffleframework.com/boxes/pet-shop truffle作为一个运行测试框架,用的也是web3对智能合约进行调用. 文件所在的位置src/js/app.js initWeb3: function() { // web3入口 if (typeof web3 !== 'undefined') { App.web3Provider = web3.current

java生成csv文件 自己定义表头、用sql查询内容,把内容和表头放到表里 , 最好有代码示例

问题描述 java生成csv文件 自己定义表头.用sql查询内容,把内容和表头放到表里 , 最好有代码示例 30C java生成csv文件 ,自己定义表头.用sql查询内容,把内容和表头放到表里 , 最好有代码示例 解决方案 csv文件最简单了,就是文本格式,逗号分割字段,换行分割记录.你自己sql循环,然后写文件,要表头的话,先输出一行作为表头就可以了. 解决方案二: 我要测试导出csv文件,response.getOutputStream()报空指针异常 解决方案三: 哪位大神有关于导出cs

谁有JAVA核心技术的示例代码?

问题描述 谁有JAVA核心技术的示例代码? 解决方案 解决方案二:何谓核心?解决方案三:引用楼主ybingxin1234的回复: 谁有JAVA核心技术的示例代码? 你要的核心技术的下载包里肯定有example解决方案四:<JAVA核心技术>里的示例代码?解决方案五:JDK的安装包里....解决方案六:JDK安装包中的demo中有源码

think in java interview-高级开发人员面试宝典代码示例

下载资源地址为: http://download.csdn.net/detail/lifetragedy/6379755 这是think in java interview中的代码示例,包括JAVA基础的数据结构,IO, 核心基础以及设计模式等. 因此我把它称为wallet工程(钱包工程),可以直接导入eclipse工程中去.