kafka-Java-SpringBoot-listener API开发

listener开发过程是独立的,你也可以不开发,使用@KafkaListener注解来监听kafka的消息,我的方式是实现一个唯一方法的接口,然后在该方法里面进行消费,无需关心kafka的具体实现,只需要添加一个topics到配置值文件即可.
项目git地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API.git

接口:

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1414:52
 */
public interface IKafkaListener {
    void listener(ConsumerRecord<?, ?> record);
}

为实现IKafkaListener接口方注解.其中的TOPICS属性在ConsumerConfiguration中添加,即 private List topics;属性.

import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.NotFoundException;
import javassist.bytecode.AnnotationsAttribute;
import javassist.bytecode.ConstPool;
import javassist.bytecode.annotation.Annotation;
import javassist.bytecode.annotation.ArrayMemberValue;
import javassist.bytecode.annotation.MemberValue;
import javassist.bytecode.annotation.StringMemberValue;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.annotation.KafkaListener;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1415:18
 * 为实现了IKafkaListener接口方法listener追加注解
 */
public class KafkaListenerInitConfig implements BeanPostProcessor, ApplicationContextAware {

    private ApplicationContext applicationContext;
    private static final String TOPICS = "topics";
    private static final String LISTENER = "listener";
    @Autowired
    private ConsumerConfiguration consumerConfiguration;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        try {
            if (bean instanceof IKafkaListener)
                return injectBean(setAnnotation(bean));
            //设置参数注解:
            return bean;
        } catch (NotFoundException | CannotCompileException | IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
        }
        return bean;
    }

    /**
     * 添加Autowired注入的内容
     *
     * @param iKafkaListener
     * @return
     * @throws IllegalAccessException
     */
    private IKafkaListener injectBean(IKafkaListener iKafkaListener) throws IllegalAccessException {
        Field[] fields = iKafkaListener.getClass().getFields();
        Field[] declaredFields = iKafkaListener.getClass().getDeclaredFields();
        Field[] allFiles = ArrayUtils.addAll(fields, declaredFields);
        //判断是否有@Autowired注解
        for (Field field : allFiles) {
            if (field.getAnnotation(Autowired.class) == null)
                continue;
            field.setAccessible(true);
            field.set(iKafkaListener, this.applicationContext.getBean(field.getName()));
        }
        return iKafkaListener;
    }

    /**
     * 添加注解及其属性
     *
     * @param bean
     * @return
     * @throws NotFoundException
     * @throws CannotCompileException
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    private IKafkaListener setAnnotation(Object bean) throws NotFoundException, CannotCompileException, IllegalAccessException, InstantiationException {
        ClassPool classPool = ClassPool.getDefault();
        //获取当前Bean的常量池
        CtClass ctClass = classPool.getCtClass(bean.getClass().getName());
        ConstPool constPool = ctClass.getClassFile().getConstPool();
        //获取对应的注解内容
        List<StringMemberValue> list = new ArrayList<>(consumerConfiguration.getTopics().size());
        //创建注解属性
        consumerConfiguration.getTopics().forEach(topic -> list.add(new StringMemberValue(topic, constPool)));
        MemberValue[] memberValues = new MemberValue[list.size()];
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(constPool);
        arrayMemberValue.setValue(list.toArray(memberValues));
        //创建注解
        Annotation topics = new Annotation(KafkaListener.class.getName(), constPool);
        //为注解属性赋值
        topics.addMemberValue(TOPICS, arrayMemberValue);
        //创建注解容器
        AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
        annotationsAttribute.setAnnotation(topics);
        //把注解放到目标方法
        ctClass.getDeclaredMethod(LISTENER).getMethodInfo().addAttribute(annotationsAttribute);

        //生成一个全新的对象
        Class aClass = ctClass.toClass(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, null);
        return (IKafkaListener) aClass.newInstance();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * 获取SpringIOC
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

把KafkaConsumerListener注册到SpringIOC之中:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Author dw07-Riven770[wudonghua@gznb.com]
* @Date 2017/12/1416:04
* 把KafkaConsumerListener注册到SpringIOC之中
*/
@Configuration
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers","Riven.kafka.consumer.groupId"})
public class CreateKafkaListener {
   private Logger logger = LoggerFactory.getLogger(this.getClass());

   @Bean
   public KafkaListenerInitConfig init() {
       return new KafkaListenerInitConfig();
   }
}

同样的,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize,riven.kafka.api.listener.CreateKafkaListener
时间: 2024-10-01 07:18:59

kafka-Java-SpringBoot-listener API开发的相关文章

Java使用新浪微博API开发微博应用的基本方法_java

新浪微博API现在运用比较广泛,做一个完整的开发流程Demo 1.第一步注册,就不多说了,注册帐号以及成为开发者帐号,这步操作不会的话请你马上砸掉电脑拔掉网线回家种田. 2.第二步创建应用,开发者帐号创建好了,打开新浪微博开发平台: http://open.weibo.com    上面菜单栏点击最后一个 管理中心 如果是web的应用的话选择创建网站接入的应用,然后根据新浪微博的要求balabalabala自己去搞定   应用创建完毕.点击应用跳转页面,点击查看应用参数,可以看到应用的相关参数,

java script-初学者,使用Java Script API开发一个网页需要多长时间?

问题描述 初学者,使用Java Script API开发一个网页需要多长时间? 网页开发初学者 基于百度地图JavaScript API开发一个类似新东方网站的网页,大概需要多长时间? 百度地图Java Scrip API : http://developer.baidu.com/map/index.php?title=首页 新东方网页:http://souke.xdf.cn/Campus.aspx 解决方案 熟悉百度javascript api 1-2天,只需要关注加载地图和标注这两个例子 新

JSR 310:一种新的Java日期/时间API

JSR 310 是一个用于执行与时间和日历有关的计算的 API,已经得到 Java SE 7 的推荐.该 API 的 目标是取代现有的两个构成 Java 的当前日期和时间 API 的类:java.util.Date 和 java.util.Calendar,同时仍然提供对这些旧有 API 的向后兼容访问.JSR 当前正在开发,并且该 API 有一个可用的试验性 Javadoc. 对 Java 6 日期/时间 API 的改进 JSR 310 日期/时间 API 试图通过提供更好的性能和易用性改进

使用Axis2的底层API开发Web Service

1.使用Axis2的底层API开发Web Service Server端 1.1创建一个WebService(取名为MyService) 在MyService中有两个operations,如下所示. public void ping(OMElement element){}//IN-ONLY模式.仅仅接收OMElement,并对 其处理. public OMElement echo(OMElement element){}//IN_OUT模式.接收OMElemen,并返回 OMElement.

Java桌面应用程序开发简介

Java对于服务器,个人电脑和移动设备来说是一项伟大的技术.由于需要java的跨平台的特性,因此java在服务器和移动设备方面的应用是非常成功的.但java在个人电脑应用方面的情况和在服务器及移动设备方面的应用有所不同,但是这很快就会有所改变,至少比你想象得要快.在这篇文章中,我会分析一下java在桌面环境中的应用将怎样得到提升,然后具体说一下java GUI(用户图形接口)的三个主要的工具:AWT, Swing, 和SWT..在下文中,我将会开发一个完整的java桌面应用程序. Java与桌面

你也可以玩转Skype -- 基于Skype API开发外壳程序入门

原文:你也可以玩转Skype -- 基于Skype API开发外壳程序入门 Skype是目前这个星球上最厉害的IM+VOIP软件,Skype现在已经改变了全球2.8亿人的生活方式.你,值得拥有! :) Skype中文官网:http://skype.tom.com/ Skype全球官网:http://www.skype.com/ Skype也是世界上最开放,最具创新意识的IM工具,他提供了Skype API, Skype4COM, Skype4Java几种形式的开发接口给Skype爱好者编写Sky

J-HI一款JAVA WEB应用软件快速开发开源平台

J-HI是一款JAVA WEB应用软件快速开发开源平台,主要服务于http://www.aliyun.com/zixun/aggregation/14750.html">软件企业和传统行业企事业单位信息中心的开发人员,为他们提供一套完整的一站式的JAVA WEB应用软件快速开发解决方案. 平台包括如下几个部分: 1.J-HI平台集成环境:J-HI团队开发了一个集成开发环境J-HI Studio,在此集成开发环境之上,开发人员能够快速搭建自己的开发环境,创建自己的模型,快速生成代码. 2.核

java学习-如何用Java进行高性能网站开发

1.生成对象时,合理分配空间和大小: Java中的很多类都有它的默认的空间分配大小,对于一些有大小的对象的初始化,应该预计对象的大小,然后使用进行初始化. 例如:我们在使用Vector,当声明Vector vect=new Vector()时,系统调用: public Vector() {// 缺省构造函数 this(10); // 容量是 10; } 缺省分配10个对象大小容量.当执行add方法时,可以看到具体实现为:.. public synchronized boolean add(Obj

《Java数字图像处理:编程技巧与应用实践》——第1章 Java Graphics及其API简介 1.1 什么是Java图形设备Graphics

第1章 Java Graphics及其API简介 在开始本书内容之前,笔者假设你已经有了面向对象语言编程的基本概念,了解Java语言的基本语法与特征,原因在于本书的所有源代码都是基于Java语言实现的,而且是基于Java开发环境运行与演示所有图像处理算法的.本书第1章到第3章是为了帮助读者了解与掌握Java 图形与GUI编程的基本知识与概念而写的.本章主要介绍Java GUI编程中基本的图形知识,针对GUI编程,Java语言提供了两套几乎并行的API,分别是Swing与AWT.早期的Java G

基于swagger的RESTful API开发实践

前言 RESTful架构,是目前最流行的一种互联网软件架构.它结构清晰.符合标准.易于理解.扩展方便,所以正得到越来越多网站的采用.后端通过提供一套标准的RESTful API,让网站,移动端和第三方系统都可以基于API进行数据交互和对接,极大的提高系统的开发效率,也使得前后端分离架构成为可能. 因此,不同的测试,开发团队(前端,移动端,第三方接入者等)都需要围绕API进行开发工作,API的规范和文档对于团队开发,测试变得越来越重要.除了一份标准的文档,我们还希望API能够在线测试使用,从而有更