Kafka 0.8 配置参数解析

Broker Configs

4个必填参数,

broker.id 
Each broker is uniquely identified by a non-negative integer id 
broker唯一标识,broker可以在不同的host或port,但必须保证id唯一

log.dirs (/tmp/kafka-logs) 
日志文件存放的目录 
可以用逗号隔开多个目录,当创建partitions时,会自动挑一个已创建partition最少的目录创建 
因为Kafka必须充分利用磁盘资源,所以要让partititons均匀分布在所有disks上,至少每个disk创建一个目录

port (6667) 
broker server所在端口

zookeeper.connect 
zk的地址,hostname1:port1,hostname2:port2

 

可选的重要参数,

advertised.host.name (null) 
设置暴露给其他Consumer,producer,broker的域名 
不设置会暴露默认域名,域名解析比较麻烦,这里可以设为ip

message.max.bytes (1000000) 
broker可以接收的message的最大size 
注意要和consumer的maximum fetch size相匹配

num.io.threads (8) 
处理I/O的线程数,即读写磁盘的线程数,所以至少等于磁盘数

queued.max.requests (500) 
可以queue的最大请求数,包括producer的写请求和consumer的读请求

log.segment.bytes (1024 * 1024 * 1024 
log.roll.hours 
上面2个配置用于设置,何时为partition产生新的segment文件 
为了防止partition文件过大,所以partition是由一组segment文件组成 
可以通过设置size或时间来决定何时roll

log.retention.{minutes,hours} (24 * 7) 
log.retention.bytes (-1) 
log.retention.check.interval.ms (5*60*1000, 5分钟) 
默认是7天数据 
当然你可以通过partition大小来设置删除threshold,默认是-1,即关掉的,注意这里设置的是partition大小,如果多个partition需要乘上partition数

log.cleanup.policy (delete or compact, default delete) 
log.cleaner.enable (false) 
log.cleaner.threads (1) 
当前retention支持两种类型,默认就是删除旧数据 
新加了compact模式,应用场景来源于Linkedin之前的Databus 
上面列出主要的相关配置,如果要使用compact 
log.cleanup.policy = compact 
log.cleaner.enable  = true 
http://kafka.apache.org/documentation.html#compaction 
这个场景,数据流中的key是有限重复的,只是在不断的更新value 
compact过程就是当触发retention条件时,删除相同key的旧值,只保留最新值

 
局限, 
a. 无法指定compact的范围,除了last segment(正在被写),其他所有的segments都可能被compact 
b. 不支持compress topics

 

log.index.size.max.bytes (10*1024*1024) 
log.index.interval.bytes (4096) 
这个由于0.8版本,做出妥协,用逻辑offset来替换原先的物理offset 
那就需要在逻辑offset和物理地址之间创建index 
log.index.size.max.bytes,一个segment最大的index的大小,超出会创建新的segment,无论segment本身的配置 
log.index.interval.bytes,做index的间隔,也是系统为了找到一个物理offset需要做linear scan的最大值

 

log.flush.interval.messages (None) 
log.flush.interval.ms (None) 
log.flush.scheduler.interval.ms (3000),多久check 
为了效率,broker会缓存部分log,然后再flush到磁盘 
这里可以通过message数或时间来控制flush策略

 

replica相关的参数,

default.replication.factor (1), The default replication factor for automatically created topics 
默认的副本数

replica.lag.time.max.ms (10000), follower的最大lag时间,即如果leader在这个时间内都没有收到follower的fetch请求,就认为它dead 
If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead. 
replica.lag.max.messages (4000), 最大lag消息数,超过这个消息数,leader会认为该follower dead 
If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead.

replica.socket.timeout.ms (30 * 1000), The socket timeout for network requests to the leader for replicating data. 
replica.socket.receive.buffer.bytes (64 * 1024), The socket receive buffer for network requests to the leader for replicating data.

replica.fetch.max.bytes (1024 * 1024), 一次fetch request最大可以fetch的数据size 
The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader. 
replica.fetch.wait.max.ms (500), fetch request的等待超时 
The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader. 
replica.fetch.min.bytes (1), Minimum bytes expected for each fetch response for the fetch requests from the replica to the leader. If not enough bytes, wait up to replica.fetch.wait.max.ms for this many bytes to arrive.

num.replica.fetchers (1), follower上用于同步leader数据的线程数 
Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.

replica.high.watermark.checkpoint.interval.ms (5000), checkpoint HW的interval 
The frequency with which each replica saves its high watermark to disk to handle recovery.

 

failover相关的参数,

controlled.shutdown.enable (false) 
用于Graceful shutdown,当设为true时,在用stop命令关闭broker, 
会做下面两点优化, 
a. sync all its logs to disk,这样可以加速重启,避免checksum校验等步骤 
b. migrate任意partitions的leader replica到其他的brokers,这样主动的迁移,比发现异常后被动迁移更快 
注意,设为true,只有当broker上所有的replica,不是该partition唯一replica时才能stop成功 
因为如果直接关掉,会导致该partition不work

auto.leader.rebalance.enable (false) 
leader.imbalance.per.broker.percentage (10) 
leader.imbalance.check.interval.seconds (300)
 
broker crash后,partition的leader replica会被迁到其他的broker 
但是当broker恢复后,上面的replica会成为follower,但不会自动切成leader 
这样会导致不均衡,所以当设为true时,会自动做leader的rebalance 
默认超过10%,认为不均衡,300秒check一次

 

Topic-level configuration

其中一些参数是可以按topic设置的,并且可以修改删除

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --deleteConfig max.message.bytes

上面分别是创建,修改和删除的命令 
可以按topic设置的参数包括,参考文档 
http://kafka.apache.org/documentation.html#topic-config

需要注意的是,这里的属性名和配置文件中的属性名是不一样的。。。

 

Producer Configs

参考,Kafka Producer接口

 

Consumer Configs

首先只有在用high level consumer时才需要管这个配置 
如果用low level,自己做代码里面写,这个配置不起作用的

2个必填参数,

group.id 
A string that uniquely identifies the group of consumer processes to which this consumer belongs.

zookeeper.connect 
zk地址,hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 
可以看到zk地址其实是可以设置到,目录级别的(比如用于一个zk管理多个kafka集群),但是如果不用默认目录,需要自己创建和维护这个zk目录 
并且必须要和broker中的zk配置保持一致

 

可选配置,

fetch.message.max.bytes (1024 * 1024) 
The number of byes of messages to attempt to fetch for each topic-partition in each fetch request 
为了效率,consumer从kafka fetch数据,也是批量fetch的,虽然你处理的时候是onebyone逻辑,但后台是预读的 
这个值至少要等于broker里面设置的maximum message size,否则有可能连一条message都取不下来 
默认是1m,设太大会爆内存,而且会增大读重的可能性,因为当consumer发生变化时,会发生rebalance,这时被新分配到这个partition的consumer仍然会读到预读但没有commit的数据

auto.commit.enable (true), auto.commit.interval.ms (60 * 1000) 
默认是会自动commit offset到zk的,如果要自己commit,设为false 
默认除非自动commit的时间是1分钟 
用手工commit的简单的理由是,防止consumer crash丢失数据,可以在确认数据被正确处理后,再手工commit offset

queued.max.message.chunks (10) 
上面说的fetch.message.max.bytes是针对一个partition的,但是一个consumer是可以同时读多个partition,所以对每个partition都可以读一个这样的chunk 
所以这个配置是,同时可以读多少大小为fetch.message.max.bytes的chunk 
默认是10,个人认为至少要大于读的partition个数吧,但避免耗尽内存

fetch.wait.max.ms (100), fetch.min.bytes (1) 
当取不到fetch.message.max.bytes时,最大block时间,并且返回fetch.min.bytes大小数据

auto.offset.reset 
* smallest : automatically reset the offset to the smallest offset 
* largest : automatically reset the offset to the largest offset 
* anything else: throw exception to the consumer.

consumer.timeout.ms (-1) 
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval 
默认是block的,当consumer取不到数据

本文章摘自博客园,原文发布日期:2014-09-04

时间: 2024-10-31 22:43:34

Kafka 0.8 配置参数解析的相关文章

对MySQL配置参数 my.ini/my.cnf的详细解析_Mysql

以下的文章主要描述的是对MySQL配置参数 my.ini/my.cnf的详细解析,我们主要是以实例演示的方式来对MySQL配置参数 my.ini/my.cnf的实际操作步骤进行说明,以下就是相关内容的具体描述. 1.获取当前配置参数 要优化MySQL配置参数,首先要了解当前的配置参数以及运行情况.使用下列命令可以获得目前服务器使用的配置参数: 复制代码 代码如下: mysqld –verbose –help mysqladmin variables extended-status –u root

thinkPHP5.0框架配置格式、加载解析与读取方法

本文实例讲述了thinkPHP5.0框架配置格式.加载解析与读取方法.分享给大家供大家参考,具体如下: ThinkPHP支持多种格式的配置格式,但最终都是解析为PHP数组的方式. PHP数组定义 返回PHP数组的方式是默认的配置定义格式,例如: //项目配置文件 return [ // 默认模块名 'default_module' => 'index', // 默认控制器名 'default_controller' => 'Index', // 默认操作名 'default_action' =

Hadoop-2.8.0集群搭建、hadoop源码编译和安装、host配置、ssh免密登录、hadoop配置文件中的参数配置参数总结、hadoop集群测试,安装过程中的常见错误

25.集群搭建 25.1 HADOOP集群搭建 25.1.1集群简介 HADOOP集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起 HDFS集群: 负责海量数据的存储,集群中的角色主要有NameNode / DataNode YARN集群: 负责海量数据运算时的资源调度,集群中的角色主要有 ResourceManager /NodeManager 25.1.2服务器准备 本案例使用虚拟机服务器来搭建HADOOP集群,所用软件及版本: ü Vmware 11.

如何在 CentOS 7.0 上配置 Ceph 存储

如何在 CentOS 7.0 上配置 Ceph 存储 Ceph 是一个将数据存储在单一分布式计算机集群上的开源软件平台.当你计划构建一个云时,你首先需要决定如何实现你的存储.开源的 Ceph 是红帽原生技术之一,它基于称为 RADOS 的对象存储系统,用一组网关 API 表示块.文件.和对象模式中的数据.由于它自身开源的特性,这种便携存储平台能在公有云和私有云上安装和使用.Ceph 集群的拓扑结构是按照备份和信息分布设计的,这种内在设计能提供数据完整性.它的设计目标就是容错.通过正确配置能运行于

程序员的量化交易之路(11)--命令参数解析库JCommonder学习

转载须注明出处:http://blog.csdn.net/minimicall?viewmode=contents,http://cloudtrade.top 在学习量化交易平台的过程中,接触到一个参数解析的库,JCommander.今天把它记录一下. 它的官网为:http://www.jcommander.org/ 1. 概述 Jcommander是一个非常小的框架,用于解析命令行参数. 可以通过注解来描述你的参数选项: import com.beust.jcommander.Paramete

Linux下Kafka单机安装配置方法(图文)_Linux

介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: •Kafka将消息以topic为单位进行归纳. •将向Kafka topic发布消息的程序成为producers. •将预订topics并消费消息的程序成为consumer. •Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker. producers通过网络将消息发送到Kafka集群,集群

一步一步自定义SpringMVC参数解析器

随心所欲,自定义参数解析器绑定数据. 题图:from Zoommy 干货 SpringMVC解析器用于解析request请求参数并绑定数据到Controller的入参上. 自定义一个参数解析器需要实现HandlerMethodArgumentResolver接口,重写supportsParameter和resolveArgument方法,配置文件中加入resolver配置. 如果需要多个解析器同时生效需要在一个解析器中对其他解析器做兼容 缘起 为什么要自定义一个解析器呢? 源于需要对前端请求参数

Linux系统下Kafka单机安装配置详解

说明:   操作系统:CentOS 6.x 64位   Kafka版本:kafka_2.11-0.8.2.1   实现目的:   单机安装配置kafka   具体操作:   一.关闭SELINUX.开启防火墙9092端口   1.关闭SELINUX   vi /etc/selinux/config   #SELINUX=enforcing #注释掉   #SELINUXTYPE=targeted #注释掉   SELINUX=disabled #增加   :wq! #保存退出   setenfo

Spark配置参数

以下是整理的Spark中的一些配置参数,官方文档请参考Spark Configuration. Spark提供三个位置用来配置系统: Spark属性:控制大部分的应用程序参数,可以用SparkConf对象或者Java系统属性设置 环境变量:可以通过每个节点的 conf/spark-env.sh脚本设置.例如IP地址.端口等信息 日志配置:可以通过log4j.properties配置 Spark属性 Spark属性控制大部分的应用程序设置,并且为每个应用程序分别配置它.这些属性可以直接在Spark