kafka练习

package com.ocean.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.swing.plaf.multi.MultiButtonUI;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MennuCommitConsumer {
    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public MennuCommitConsumer() {

        properties.setProperty("bootstrap.servers", "master:9092");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "java_group");
        // properties.setProperty("auto.offset.reset", "null");
        properties.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    public void subscirbleTopc() {
        List<String> topics = new ArrayList<String>();
        topics.add("b");
        topics.add("from-java");
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "key:"
                        + record.key() + "value:" + record.value());
            }
            // consumer.commitSync();
            // 这句话是为了提交数据 如果不写 则会在下次启动时 还会出现

        }
    }

    public void getOffset() {
        OffsetAndMetadata offsets = consumer.committed(new TopicPartition("b", 0));
        System.out.println("offsets:" + offsets.offset());
    }
    // 制定分区消费 指定从offset的值出开始消费
    // 对消费着topic的消费指定有两种方式
    // 1.consumer.subscribe(topics);

    // 2.consumer.assign(topicPartitions);
    public void sonsumerAssigned() {
        // List<String>topics= new ArrayList<String>();
        // topics.add("b");
        // consumer.subscribe(topics);
        // 指定分区
        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        topicPartitions.add(new TopicPartition("from-java", 0));
        consumer.assign(topicPartitions);
        // 指定分区的offset分区的位置
        consumer.seek(new TopicPartition("from-java", 0), 21);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(
                        "partition:" + record.partition() + "offset:" + record.offset() + "value:" + record.value());
            }
        }

    }

    public void setCommentOffset() {

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("from_java", 0), new OffsetAndMetadata(0));
        List<String> topics = new ArrayList<String>();

        topics.add("from_java");
        consumer.subscribe(topics);
        // 指定位置提交某个分区的offsets的值 这会在下一次拉取数据前生效
        consumer.commitSync(offsets);

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {

                if (record.partition() == 0) {
                    System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "value:"
                            + record.value());
                }
            }
        }

    }

    public void exactlyOnceConSumer(){
        //1.配置上参数
        properties.setProperty("enable.auto.commmit", "false");
        //2.订阅主题或者分区
        //consumer.subscribe(topics);
        //重设offset (offset)的值需要从mysql中获取
        //3.从mysql中获取
        //4.1 consumer.commitSync(offsets);
        //提交到kafka服务器中
        //或者使用
        //4.2 consumer.seek(new TopicPartition("from-java",0),0);
        //来指定要从kafka中高消费数据的初始值位置

        //订阅主题或分区
        //consumer.subscribe(topics);

        //5. poll数据
//      recordes =consumer.pool(1000)

        //6. 遍历参数值分析计算

        //7.计算结束之后使用consumer.committed(new TopicPartition("from-java",1))
        //获取当前消费的offset值

        //8.把计算结果和offset值 以原子操作(事物)的形式保存到mysql数据库

        //9.重新调到第五步循环执行 进行下一次pool和下一次计算
    }

    public static void main(String[] args) {
        MennuCommitConsumer mennuCommitConsumer = new MennuCommitConsumer();
        // mennuCommitConsumer.subscirbleTopc();
        // mennuCommitConsumer.getOffset();
        mennuCommitConsumer.sonsumerAssigned();
        mennuCommitConsumer.setCommentOffset();

    }

}

package com.ocean.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.pattern.PropertiesPatternConverter;

public class ProducerConsumer {

    private Properties properties = new Properties();
    private KafkaConsumer<String, String> consumer;

    public ProducerConsumer() {

        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.setProperty("group.id", "java-group");
        consumer = new KafkaConsumer<String, String>(properties);

    }

    public void subscribeTopic() {
        List<String> topics = new ArrayList<String>();
        topics.add("home-work_pic");
        consumer.subscribe(topics);
        // 循环从kafka中拉取数据
        while (true) {
            // 从kafka中拉取数据
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收信息:partition" + record.partition() + "offset:" + record.offset() + "key:"
                        + record.key() + "value:" + record.value());
            }
        }
    }

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        producerConsumer.subscribeTopic();

    }
}


package com.ocean.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerKafka {

    private KafkaProducer<String, String> producer;
    private Properties properties;

    public ProducerKafka() {

        properties=new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//      properties.put("acks",  "all");
//      properties.put("retries", 0);
//
        producer=new KafkaProducer<String, String>(properties);

    }
    public void assignPartitionSend(String key,String value){

        ProducerRecord<String, String>record =new ProducerRecord<String, String>("from-java", 0,key,value);
        producer.send(record);

    }
    public void sendRecorder(String key,String value){
        Logger logger=LoggerFactory.getLogger(ProducerKafka.class);
        ProducerRecord<String, String> record =new ProducerRecord<String, String>("from-java", key,value);
        producer.send(record);
    }
    public void getTopicPartitions(String topic){

        Logger logger=LoggerFactory.getLogger(ProducerKafka.class);
//      ProducerRecord<String, String> record =new ProducerRecord<String, String>("from-java", key,value);

        List<PartitionInfo> partitionInfos =producer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            System.out.println(partitionInfo);
        }

    }

    public void getMetrics(){
        @SuppressWarnings("unchecked")
        Map<MetricName, Metric> metrics =(Map<MetricName, Metric>) producer.metrics();
        for (MetricName name : metrics.keySet()) {
            System.out.println(name.name()+":"+metrics.get(name).value());
        }

    }

    public void sendRecorderWithCallback(String key,String value){
        final Logger logger=LoggerFactory.getLogger(ProducerKafka.class);
        ProducerRecord<String, String>record =new ProducerRecord<String, String>("from-java",key,value);
        Callback callback=new Callback() {
            //回掉方法
            public void onCompletion(RecordMetadata metadata, Exception exception) {

                if(exception==null){
                    logger.info("存储位置:partition:"+metadata.partition()+",offset:"+metadata.offset()+",ts:"+metadata.timestamp());
                }else{
                    logger.warn("服务端出现异常");
                    exception.printStackTrace();
                }

            }
        };
        producer.send(record,callback);
    }
    public void close(){
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) {

        ProducerKafka client =new ProducerKafka();
        for(int i=0;i<100;i++){
            client.sendRecorderWithCallback("Ckey"+i, "Cvalue"+i);
        }
//      client.getMetrics();
        client.close();
    }
}

时间: 2024-09-17 04:05:11

kafka练习的相关文章

kafka详解一、Kafka简介

背景:      当今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它        如何及时做到如上两点      以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统.      从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息. Kafka诞生

日志收集之kafka篇

日志收集     日志收集包括服务器日志收集和埋码日志收集两种.     服务器日志主要是nginx.tomcat等产生的访问和业务日志.     埋码收集主要是某些服务器无法收集,需要在前端进行收集的数据. 收集流程     日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题. 日志采集客户端,负责日志数据采集,定时写受写入Kafka队列: Kafka消息队列,负责日志数据的接收,存储和转发: 日志处理应用:订阅并消费kafka队列中的日志数据:       下

Kafka详解二、如何配置Kafka集群

Kafka集群配置比较简单,为了更好的让大家理解,在这里要分别介绍下面三种配置 单节点:一个broker的集群 单节点:多个broker的集群 多节点:多broker集群 一.单节点单broker实例的配置 1. 首先启动zookeeper服务      Kafka本身提供了启动zookeeper的脚本(在kafka/bin/目录下)和zookeeper配置文件(在kafka/config/目录下),首先进入Kafka的主目录(可通过 whereis kafka命令查找到):      [roo

Kafka实战-Flume到Kafka

1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面开始今天的分享内容. 2.数据来源 Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理).关于Flume集群的Agent部署,这里就不多做赘述了,不清楚的同学可以参

Kafka JAVA客户端代码示例--高级应用

什么时间使用高级应用? 针对一个消息读取多次 在一个process中,仅仅处理一个topic中的一组partitions 使用事务,确保每个消息只被处理一次 使用高级应用(调用较底层函数)的缺点?     SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要) 在应用程序中跟踪上次消息处理的offset 确定一个topic partition的lead broker 手工处理broker leander的改变 使用底层函数(SimpleConsumer)开发

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

1.Kafka提供了两套API给Consumer The high-level Consumer API The SimpleConsumer API      第一种高度抽象的Consumer API,它使用起来简单.方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情 一个消息读取多次 在一个处理过程中只消费Partition其中的一部分消息 添加事务管理机制以保证消息被处理且仅被处理一次 2.使用SimpleConsumer有哪些弊

Kafka - 消费接口分析

1.概述 在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API.在 <高级消费 API>一文中,介绍了其高级消费的 API 实现.今天给大家介绍另一种消费 API. 2.内容 在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API.那么,我们首先需要知道低级消费 API 的作用.它能帮助我们去做那些事情: 一个消息进行多次读取 在

kafka web console安装

貌似很多小伙伴都不能成功打包,共享下之前打包的文件: http://pan.baidu.com/s/1sjkE37J ======== kafka自己居然没有还一个Web管理界面.. 这里有个第三方的:  https://github.com/claudemamo/kafka-web-console 坑爹的是居然没有详细的安装步骤,只有一些简单的说明,对于不熟悉scala play开发的人来说,很蛋疼.下面记录详细的安装过程. 先下载安装scala的构建工具sbt,最新版本可以到官网查看: ht

Apache Kafka的代码实例

前提: 已经配置好kafka.若未安装,可以参照[Apache Kafka]安装升级指南 已在eclipse里面安装scala插件.Eclipse Kepler中在Help->Eclipse Markectplace中搜索Scala,然后安装即可. 使用maven构建kafka测试project在eclipse中. 创建topic:在kafka的安装目录下执行bin/kafka-create-topic.sh --zookeeper 192.168.20.99:2181 --replica 1

Kafka实战-简单示例

1.概述 上一篇博客<Kafka实战-Kafka Cluster>中,为大家介绍了Kafka集群的安装部署,以及对Kafka集群Producer/Consumer.HA等做了相关测试,今天我们来开发一个Kafka示例,练习如何在Kafka中进行编程,下面是今天的分享的目录结构: 开发环境 ConfigureAPI Consumer Producer 截图预览 下面开始今天的内容分享. 2.开发环境 在开发Kafka相关应用之前,我们得将Kafka得开发环境搭建完成,这里我所使用得开发环境如下所