Kafka——使用java api进行pub & sub

       之前用过老的api,但是最近在写消费的时候,发现之前老的api很多方法都out了,又去官网看了下最新的0.10.x的api.

1,producer

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>
  public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.31:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("defaultTopic", Integer.toString(i), String.valueOf(i)));

        producer.close();
    }

2,Comsumer

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>


 public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.31:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("defaultTopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

    }

写完java api,可以try一下spring-intergeted,方便,一些池操作可以屏蔽掉,专注业务。

时间: 2024-09-18 02:55:48

Kafka——使用java api进行pub &amp; sub的相关文章

使用Java API处理WebSphere MQ大消息

WebSphere MQ 中处理大消息的方法 使用过 WebSphere MQ 的读者都知道,WebSphere MQ 对处理的单条消息的大小是有限制的,目前支持的最大消息是100M,而且,随着消息大小的增大,WebSphere MQ 处理的性能也会随之下降.从最佳实践来说,WebSphere MQ 传输大小为几K的消息其效率是最高的.那如何使 WebSphere MQ 能高效的处理大消息呢? WebSphere MQ 提供了处理大消息的两种方法:消息分片和消息分组.下面我们来看在使用 Java

DB2 NoSQL JSON 功能(三) 使用 Java API 编写应用程序

管理 JSON 文档 - 使用事务和不使用事务 概述 简介 DB2 JSON 是一个可用于 DB2 Linux, Unix and Windows 10.5 的 Technology Preview,提供了以下特性: 一个命令行 shell 用于管理和查询 JSON 数据 一个 Java API 用于应用程序开发 一个有线监听器用于接受和响应通过网络发送的请求. 图 1. DB2 JSON 组件 本文将介绍如何使用 Java 接口管理和查询 DB2 JSON 文档存储中的 JSON 文档.还将讨

用JAX-RPC构建RPC服务和客户机:使用Java API构建基于RPC的Web服务(一)

简介:远程过程调用(RPC)是基于 Simple Object Access Protocol(SOAP)或 Representational State Transfer(REST)的现代 Web 服务的前身.因为所有 Java 平台的 Web 服务 API 都构建 在从 RPC 引入的概念之上,所以要想用 Java 语言编写有效且高效的 Web 服务,理解 Java API for XML-Based RPC(JAX-RPC)几乎是必需的.本教程讲解如何获取.安装和配置 JAX-RPC 并构

使用IBM Rational ClearQuest Java API进行集成开发

通过 JNI 技术使用 ClearQuest Java API 实现与 Rational ClearQuest Test Manager(CQTM )系统的集成 简介:Rational ClearQuest 是一个缺陷和变更的管理系统,ClearQuest Test Manager (CQTM) 作为 一个模型运行在 ClearQuest V7 的顶层.其管理功能可覆盖测试的整个生命周期,包括测试计划.测试 编写.测试执行和测试报告.为了能够更好地使用CQTM提供的强大功能,自如地与其他应用系统

关于java问题-Java API 的了解与应用

问题描述 Java API 的了解与应用 Java 包里有好多类,也有好多接口,方法,但是这些方法作用和入口参数很多都不知道,怎么了解利用这些接口方法呢?求指教 解决方案 直接查找API帮助文档,因为所有的语法说明都有.你也可以上网搜索相应的文章 解决方案二: JAVA中API的各种应用 解决方案三: http://download.csdn.net/detail/u012505618/9175265 这是JAVA的中文API说明文档 解决方案四: 查API文档,就跟你学习汉字时查新华字典一样.

处理JSON的Java API :JSON的简介

原文链接  作者:Jitendra Kotamraju   译者:撒木 处理JSON的各种解析.生成.处理.转换和查询的JAVA API JSON (JavaScript Object Notation)是一种轻量级的.基于文本的.完全独立于语言的数据交换格式.它非常方便人们和机器的阅读和书写.JSON 有两种结构类型的表现方式:对象和数组.对象是名/值对的无序集合.数组是值(value)的有序集合.值的类型可以是字符串(在双引号中).数字(整数或浮点数).逻辑值(true或false).数组(

c语言-c通过jni调用 HBASE JAVA API

问题描述 c通过jni调用 HBASE JAVA API jvm.dll!6db71017() 未知[下面的框架可能不正确和/或缺失,没有为 jvm.dll 加载符号] jvm.dll!6db71086() 未知jvm.dll!6db710e0() 未知jvm.dll!6dde9d1d() 未知jvm.dll!6deaa65d() 未知jvm.dll!6ddfd291() 未知jvm.dll!6dd07d3d() 未知jvm.dll!6dd11511() 未知022003e4() 未知jvm.d

HBase Java API详解

[本文转自HBase Java API详解] HBase是Hadoop的数据库,能够对大数据提供随机.实时读写访问.他是开源的,分布式的,多版本的,面向列的,存储模型. 在讲解的时候我首先给大家讲解一下HBase的整体结构,如下图: HBase Master是服务器负责管理所有的HRegion服务器,HBase Master并不存储HBase服务器的任何数据,HBase逻辑上的表可能会划分为多个HRegion,然后存储在HRegion Server群中,HBase Master Server中存

sqoop client java api将mysql的数据导到hdfs

问题描述 sqoop client java api将mysql的数据导到hdfs package com.hadoop.recommend; import org.apache.sqoop.client.SqoopClient; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MJob; import org