Kafka Tools

References:

https://cwiki.apache.org/confluence/display/KAFKA/System+Tools

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

http://kafka.apache.org/documentation.html#quickstart

http://kafka.apache.org/documentation.html#operations

 

For ease of use, Kafka ships with a fairly powerful set of tools; below is a round-up of the ones needed most often.

 

Starting and stopping the Kafka server

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh
JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server.properties &
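
With JMX enabled, any JMX client can attach to the broker to inspect its metrics; a minimal sketch, assuming the port 9999 set above:

jconsole localhost:9999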

 

Topic operations

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe a topic in detail:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Alter the number of partitions of a topic (it can only be increased):

 

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test

Deleting a topic is only officially supported from 0.8.2 on, which is in beta at the time of writing:

/usr/local/rds/kafka/bin/kafka-topics.sh --delete --topic topic_name --zookeeper localhost:2181

Note that the broker configuration must contain delete.topic.enable=true.
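
A minimal sketch of the corresponding entry in config/server.properties (deletion is disabled by default):

# allow kafka-topics.sh --delete to actually remove topics
delete.topic.enable=true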

 

List partitions that are in trouble (unavailable):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions --topic test

Per-topic config overrides:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --deleteConfig max.message.bytes
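
The overrides can be checked afterwards with --describe, whose output includes a Configs column listing the per-topic settings still in effect:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic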

 

Cluster expansion

Expanding the cluster is simple as far as brokers are concerned, but partitions of existing topics are not migrated automatically.
The migration has to be done by hand; fortunately Kafka provides a fairly convenient tool for it.

--generate: produce a candidate reassignment plan.
Given a list of topics and a list of brokers, the tool proposes a migration plan.

Moving topics entirely onto the new brokers:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

The tool prints both the current assignment and the proposed reassignment plan.

Save both of them: the current assignment can be used for a rollback later (see the sketch below).
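
A sketch of a rollback, assuming the printed "Current partition replica assignment" block was saved verbatim to rollback.json (the file name is illustrative): simply feed it back to --execute.

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file rollback.json --execute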

--execute: start the reassignment:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

--verify: check the current status of the reassignment:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

Migrating selected replicas of specific partitions

The following moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Decommissioning brokers

The current version cannot generate a decommissioning plan; that support only lands in 0.8.2. Decommissioning requires draining all replicas off the broker (see the sketch below).
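
Until then, the same reassignment tool can be used to drain a broker by hand. A sketch, assuming broker 7 is being decommissioned and topic foo has a replica there: write a plan whose replica lists omit broker 7, then execute it.

> cat drain-broker-7.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6]}]}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file drain-broker-7.json --execute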

Increasing the replication factor

Grow partition 0's replica count from 1 to 3. The existing replica sits on broker 5; replicas are added on brokers 6 and 7:

> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

 

Producer console

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

You can type messages freely afterwards; each line is sent to the topic on the broker.

 

Consumer console

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Reading the topic from the beginning, all of the data can be read over and over again.
I wondered why every run could replay; it turns out a random group id is generated each time:
consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))

 

Consumer Offset Checker

This displays the offset status of a consumer group. The --group argument is required; if --topic is not given, it defaults to all topics.

Displays the:  Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker

required argument: [group] 
Option Description 
------ ----------- 
--broker-info Print broker info 
--group Consumer group. 
--help Print this message. 
--topic Comma-separated list of consumer 
   topics (all topics if absent). 
--zkconnect ZooKeeper connect string. (default: localhost:2181)

Example,

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   21              21              0               none 
pv              page_visits                    1   19              19              0               none 
pv              page_visits                    2   20              20              0               none

 

 

Export Zookeeper Offsets

Dumps the offset information stored in ZooKeeper to a file, in the format shown below.

A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:

/consumers/group1/offsets/topic1/1-0:286894308 
/consumers/group1/offsets/topic1/2-0:284803985

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets

required argument: [zkconnect] 
Option Description 
------ ----------- 
--group Consumer group. 
--help Print this message. 
--output-file Output file 
--zkconnect ZooKeeper connect string. (default: localhost:2181)
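
Example, using only the options listed above (the group and output path are illustrative):

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181 --group pv --output-file /tmp/pv-offsets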

 

Update Offsets In Zookeeper

This one is quite useful for replays. The Kafka documentation is a bit of a letdown here; I couldn't figure out the usage from it and only worked it out by reading the source code.

A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

Example,

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties  page_visits

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   0               21              21              none 
pv              page_visits                    1   0               19              19              none 
pv              page_visits                    2   0               20              20              none

As you can see, the offsets have been reset to 0, so Lag = logSize.

 

A more direct approach is to look inside ZooKeeper itself.

Connect with zkCli.sh and browse with ls; the registry layout is listed below, with a sample session after it.

Broker Node Registry

/brokers/ids/[0...N] --> host:port (ephemeral node)

Broker Topic Registry

/brokers/topics/[topic]/[0...N] --> nPartitions (ephemeral node)

Consumer Id Registry

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Consumer Offset Tracking

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)

Partition Owner registry

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
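
A sample session, as a sketch assuming the pv group and page_visits topic from the earlier examples (zkCli.sh ships with ZooKeeper; node names follow the registry formats above):

bin/zkCli.sh -server localhost:2181
ls /brokers/ids
ls /brokers/topics
ls /consumers/pv/offsets/page_visits
get /consumers/pv/offsets/page_visits/<node>   # substitute a child node from the ls output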
