Kafka Producer接口

参考,

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

http://kafka.apache.org/08/configuration.html , 0.8版本,关于producer,consumer,broker所有的配置

 

因为Producer相对于consumer比较简单,直接看代码,需要注意的点

1. 配置参数,详细参考上面链接 
    1.1 metadata.broker.list, 不同于0.7,不需要给出zk的地址,而是给出一些broker地址,不用全部,这里建议给两个防止一个不可用 
          Kafka会自己找到相应topic,partition的leader broker 
    1.2 serializer.class,需要给出message的序列化的encoder,这里使用的是简单的StringEncoder 
          并且对于key还可以单独的设定,"key.serializer.class"  
          注意,除非明确知道message编码,否则不要直接使用StringEncoder, 
          因为源码中的逻辑是如果没有在初始化时指定编码会默认按UTF8转码,会导致乱码 
          所以不明确的时候,不要指定serializer.class,默认的encoder逻辑是直接将byte[]放入broker,不会做转码 
    1.3 partitioner.class,可以不设置,默认就是random partition,当然这里可以自定义,如何根据key来选择partition 
    1.4 request.required.acks, 是否要求broker给出ack,如果不设置默认是'fire and forget', 会丢数据 
          默认为0,即和0.7一样,发完不会管是否成功,lowest latency but the weakest durability 
          1, 等待leader replica的ack,否则重发,折中的方案,当leader在同步数据前dead,会丢数据 
          -1,等待all in-sync replicas的ack,只要有一个replica活着,就不会丢数据 
    1.5 producer.type,  
         sync,单条发送 
         async,buffer一堆请求后,再一起发送 
         如果不是对丢数据非常敏感,请设为async,因为对throughput帮助很大,但是当client crash时,会丢数据 
    1.6 compression.codec 
         支持"none", "gzip" and "snappy" 
         可以通过,compressed.topics,来指定压缩的topic

    当producer.type选择async的时候,需要关注如下配置 
    queue.buffering.max.ms (5000), 最大buffer数据的时间,默认是5秒 
    batch.num.messages (200), batch发送的数目,默认是200,producer会等待buffer的messages数目达到200或时间超过5秒,才发送数据 
    queue.buffering.max.messages (10000), 最多可以buffer的message数目,超过要么producer block或把数据丢掉 
    queue.enqueue.timeout.ms (-1), 默认是-1,即达到buffer最大meessage数目时,producer会block 
                                                       设为0,达到buffer最大meessage数目时会丢掉数据

 

2. Producer发送的是kv数据 
无论Producer或KeyedMessage都是<String, String>的泛型,这里是指key和value的类型

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "host1:9092, host2:9092 "); //
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner"); //可以不设置
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
               long runtime = new Date().getTime();
               String ip = “192.168.2.” + rnd.nextInt(255);
               String msg = runtime + “,www.example.com,” + ip;
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); //指定topic,key,value
               producer.send(data);
        }
        producer.close();
    }
}

 

对于自定义partitioner也很简单,

对于partition,两个参数,key和partitions的数目 
所要完成的逻辑就是,如果根据key在partitions中挑选一个合适的partition

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}

2014-06-23
时间: 2024-12-24 08:19:24

Kafka Producer接口的相关文章

Kafka Producer拦截器

Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器.本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理. 使用Kafka Producer端的拦截器非常简单,主要是实现ProducerInterceptor接口,此接口包含4个方法: 1. ProducerRecord

Kafka Consumer接口

对于kafka的consumer接口,提供两种版本,   high-level 一种high-level版本,比较简单不用关心offset, 会自动的读zookeeper中该Consumer group的last offset  参考,https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example 不过要注意一些注意事项,对于多个partition和多个consumer  1. 如果consumer比partiti

Kafka 0.8 配置参数解析

Broker Configs 4个必填参数, broker.id Each broker is uniquely identified by a non-negative integer id  broker唯一标识,broker可以在不同的host或port,但必须保证id唯一 log.dirs (/tmp/kafka-logs) 日志文件存放的目录  可以用逗号隔开多个目录,当创建partitions时,会自动挑一个已创建partition最少的目录创建  因为Kafka必须充分利用磁盘资源

kafka各版本差异

kafka各版本差异. kafka-0.8.2 新特性 producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率.producer请求会返回一个应答对象,包括偏移量或者错误信.这种异步方地批量的发送消息到kafka broker节点,因而可以减少server端资源的开销.新的producer和所有的服务器网络通信都是异步地,在ack=-1模式下需要等待所有的replica副本完成复制时,可以大幅减少等待时间. 在0.8.2之前,kafka删

Apache Kafka是分布式发布-订阅消息系统

转自: http://www.infoq.com/cn/articles/apache-kafka?utm_source=infoq&utm_medium=popular_links_homepage 简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易

《kafka中文手册》- 构架设计(一)

4. DESIGN 设计 4.1 Motivation 目的 4.2 Persistence 存储 Don't fear the filesystem! 不要对文件系统感到恐惧 Constant Time Suffices 常量耗时需求 4.3 Efficiency 效率 End-to-end Batch Compression 端到端的数据压缩 4.4 The Producer 发布者 Load balancing 负载均衡 Asynchronous send 异步发送 4.5 The Con

kafka数据可靠性深度解读

1 概述 Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. Kafka凭借着自身的优势,越来越受到互联网企业的青睐,唯品会也采用Kafka作为其内部核心消息引擎之一.Kafka作为一个商业级消息中间件,消息可靠性的重要性可想而知.如何确保消息的精确传输?如何确保消息的准确存储?如何

Kafka 设计与原理详解

一.Kafka简介 本文综合了我之前写的kafka相关文章,可作为一个全面了解学习kafka的培训学习资料. 转载请注明出处 : 本文链接 1.1 背景历史 当今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通

《KAFKA官方文档》设计与实现(一)

5.设计与实现(IMPLEMENTATION) 5.1 API 设计 生产者 APIS 生产者API包含2个producers-kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.示例代码如下: class Producer { /* Sends the data, partitioned by key to the topic using either the */ /* synchronous or the async