kafka-Java-SpringBoot-consumer API开发

ConsumerAPI的开发逻辑和Product是一样的,只不过多了一项必填选项group_id.
属性:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1315:58
 */
@ConfigurationProperties(prefix = "Riven.kafka.consumer")
public class ConsumerConfiguration {
    //kafka服务器列表
    private String bootstrapServers;

    /**
     * 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率。
     * <p>
     * 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
     * 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
     */
    private Boolean enableAutoCommit = false;

    /**
     * 提交延迟毫秒数
     */
    private int autoCommitIntervalMs = 100;

    /**
     * 执行超时时间
     */
    private int sessionTimeoutMs = 15000;

    /**
     * 每次最少拉取多少数据
     */
    private int fetchMinBytes = 1;

    /**
     * 在单次调用中的最大返回
     */
    private int maxPollRecords = 300;

    /**
     * 该Consumer属于的组
     */
    private String groupId ;

    /**
     * 在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
     * 有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     * consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.
     */
    private String autoOffseReset = "latest";

    /**
     * 同一个组下 启动几个consumer来获取kafka的消息
     */
    private int consumerAmount = 3;

    /**
     * 设置启动的consumer多久超时
     */
    private int pollTimeout = 5000;

    private List<String> topics;

    private String keySerializer = StringDeserializer.class.getName();
    private String valueSerializer = StringDeserializer.class.getName();

}

配置类:

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1411:16
 * 配置Consumer选项
 * 初始化consumer_S
 */
@Configuration
@EnableKafka
@EnableConfigurationProperties(ConsumerConfiguration.class)
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers", "Riven.kafka.consumer.groupId"}, matchIfMissing = false)
public class ConsumerInitialize {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 初始化参数
     *
     * @param config
     * @return
     */
    private Map<String, Object> assembleProducer(ConsumerConfiguration config) {
        Map<String, Object> propsMap = new HashMap<>();
        if (StringUtils.isBlank(config.getBootstrapServers()))
            throw new RuntimeException("缺失kafka集群列表,初始化失败");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        //提交延迟毫秒数
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.getAutoCommitIntervalMs());
        //执行超时时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getSessionTimeoutMs());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueSerializer());
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
        //组ID
        if (StringUtils.isBlank(config.getGroupId()))
            throw new RuntimeException("缺失Consumer组信息,初始化失败");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffseReset());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
            (ConsumerConfiguration ver) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        try {
            factory.setConsumerFactory(consumerFactory(ver));
            factory.setConcurrency(ver.getConsumerAmount());//启动的consumer个数
            factory.getContainerProperties().setPollTimeout(ver.getPollTimeout());//consumer;连接超时时间ms
            logger.info("初始化Consumer_S完成,共启动 {} 个Consumer", ver.getConsumerAmount());
        } catch (Exception e) {
            logger.info("初始化Consumer_S失败!");
            e.printStackTrace();
        }
        return factory;
    }

    @org.jetbrains.annotations.NotNull
    private ConsumerFactory<String, String> consumerFactory(ConsumerConfiguration ver) {
        return new DefaultKafkaConsumerFactory<>(assembleProducer(ver));
    }

最后,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize
时间: 2024-07-30 09:00:43

kafka-Java-SpringBoot-consumer API开发的相关文章

Java使用新浪微博API开发微博应用的基本方法_java

新浪微博API现在运用比较广泛,做一个完整的开发流程Demo 1.第一步注册,就不多说了,注册帐号以及成为开发者帐号,这步操作不会的话请你马上砸掉电脑拔掉网线回家种田. 2.第二步创建应用,开发者帐号创建好了,打开新浪微博开发平台: http://open.weibo.com    上面菜单栏点击最后一个 管理中心 如果是web的应用的话选择创建网站接入的应用,然后根据新浪微博的要求balabalabala自己去搞定   应用创建完毕.点击应用跳转页面,点击查看应用参数,可以看到应用的相关参数,

Kafka Java API示例

        1.Producer端 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaProducer{ private final Producer<String, String> producer; public fi

java script-初学者,使用Java Script API开发一个网页需要多长时间?

问题描述 初学者,使用Java Script API开发一个网页需要多长时间? 网页开发初学者 基于百度地图JavaScript API开发一个类似新东方网站的网页,大概需要多长时间? 百度地图Java Scrip API : http://developer.baidu.com/map/index.php?title=首页 新东方网页:http://souke.xdf.cn/Campus.aspx 解决方案 熟悉百度javascript api 1-2天,只需要关注加载地图和标注这两个例子 新

kafka详解三:开发Kafka应用

一.整体看一下Kafka 我们知道,Kafka系统有三大组件:Producer.Consumer.broker . producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume). 二.开发一个Producer应用          Producers用来生产消息并把产生的消息推送到Kafka的Broker.Producers可以是各种应用,比如web应用,服务器端应用,代理应

JSR 310:一种新的Java日期/时间API

JSR 310 是一个用于执行与时间和日历有关的计算的 API,已经得到 Java SE 7 的推荐.该 API 的 目标是取代现有的两个构成 Java 的当前日期和时间 API 的类:java.util.Date 和 java.util.Calendar,同时仍然提供对这些旧有 API 的向后兼容访问.JSR 当前正在开发,并且该 API 有一个可用的试验性 Javadoc. 对 Java 6 日期/时间 API 的改进 JSR 310 日期/时间 API 试图通过提供更好的性能和易用性改进

使用Axis2的底层API开发Web Service

1.使用Axis2的底层API开发Web Service Server端 1.1创建一个WebService(取名为MyService) 在MyService中有两个operations,如下所示. public void ping(OMElement element){}//IN-ONLY模式.仅仅接收OMElement,并对 其处理. public OMElement echo(OMElement element){}//IN_OUT模式.接收OMElemen,并返回 OMElement.

Java桌面应用程序开发简介

Java对于服务器,个人电脑和移动设备来说是一项伟大的技术.由于需要java的跨平台的特性,因此java在服务器和移动设备方面的应用是非常成功的.但java在个人电脑应用方面的情况和在服务器及移动设备方面的应用有所不同,但是这很快就会有所改变,至少比你想象得要快.在这篇文章中,我会分析一下java在桌面环境中的应用将怎样得到提升,然后具体说一下java GUI(用户图形接口)的三个主要的工具:AWT, Swing, 和SWT..在下文中,我将会开发一个完整的java桌面应用程序. Java与桌面

你也可以玩转Skype -- 基于Skype API开发外壳程序入门

原文:你也可以玩转Skype -- 基于Skype API开发外壳程序入门 Skype是目前这个星球上最厉害的IM+VOIP软件,Skype现在已经改变了全球2.8亿人的生活方式.你,值得拥有! :) Skype中文官网:http://skype.tom.com/ Skype全球官网:http://www.skype.com/ Skype也是世界上最开放,最具创新意识的IM工具,他提供了Skype API, Skype4COM, Skype4Java几种形式的开发接口给Skype爱好者编写Sky

J-HI一款JAVA WEB应用软件快速开发开源平台

J-HI是一款JAVA WEB应用软件快速开发开源平台,主要服务于http://www.aliyun.com/zixun/aggregation/14750.html">软件企业和传统行业企事业单位信息中心的开发人员,为他们提供一套完整的一站式的JAVA WEB应用软件快速开发解决方案. 平台包括如下几个部分: 1.J-HI平台集成环境:J-HI团队开发了一个集成开发环境J-HI Studio,在此集成开发环境之上,开发人员能够快速搭建自己的开发环境,创建自己的模型,快速生成代码. 2.核

java学习-如何用Java进行高性能网站开发

1.生成对象时,合理分配空间和大小: Java中的很多类都有它的默认的空间分配大小,对于一些有大小的对象的初始化,应该预计对象的大小,然后使用进行初始化. 例如:我们在使用Vector,当声明Vector vect=new Vector()时,系统调用: public Vector() {// 缺省构造函数 this(10); // 容量是 10; } 缺省分配10个对象大小容量.当执行add方法时,可以看到具体实现为:.. public synchronized boolean add(Obj