[喵咪KafKa(3)]PHP拓展See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa

前言

(Simple 简单 easy 容易 expand 的拓展)

KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同时能传输的数据量)并且高可用一个消息平台,它是分布式消息队列,分布式日志,数据传输通道的不二之选,但是可惜的时PHP的拓展实在不是很好用(php-kafka拓展已经长期不维护存在非常多的问题,rdkafkaC底层编写不利于使用),希望可以更加方便的来使用KafKa这块肥肉于是基于rdKafKa封装的一个简单舒适KafKa拓展诞生了!

附上:

GitHub地址:https://github.com/wenzhenxi/See-KafKa

rdkafka PHP拓展地址:https://github.com/arnaud-lb/php-rdkafka

服务底层依赖:https://github.com/edenhill/librdkafka

作者博客:http://w-blog.cn

1. 安装

(See-KafKa支持0.9~0.10版本,对0.8版本以及以前的版本协议不支持)

首先需要安装配置好zookeeper+KafKa:可以参考作者博客下的KafKa模块下的介绍安装,作者博客介绍是对于0.8.2.2的安装方式,但是和0.9和0.10的安装并没有区别,只需要去下载0.9和0.10的包即可

在使用之前需要按照顺序先安装librdkafka,在安装php-rdkafka:

# 安装librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install

# 安装php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
make install
# 在php.ini加入如下信息
vim /usr/local/php/etc/php.ini
extension=rdkafka.so  

这个时候使用php -m 可以看到拓展列表内存在 rdkafka这项证明拓展已经安装成功

2. 使用

See-KafKa完美支持PhalApi,只需要把去拓展库中获取kafka拓展即可,当然不是PhalApi的也可以使用只需要include文件下的kafka.php即可使用

2.1 Producer

KafKa最基础的两个角色其中一个就是Producer(可以参考作者博客介绍)

向KafKa中的一个Topic写入一条消息,需要写入多条可以多次使用setMassage

<?php
/**
 * See-kafka Producer例子
 * 循环写入1w条数据15毫秒
 */

// 配置KafKa集群(默认端口9092)通过逗号分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 设置一个Topic
$KafKa_Lite->setTopic("test");
// 单次写入效率ok  写入1w条15 毫秒
$Producer = $KafKa_Lite->newProducer();
// 参数分别是partition,消息内容,消息key(可选)
// partition:可以设置为KAFKA_PARTITION_UA会自动分配,比如有6个分区写入时会随机选择Partition
$Producer->setMessage(0, "hello");

2.2 Consumer

对于Consumer来说支持4种从offset的获取方式分别为:

  • KAFKA_OFFSET_STORED #通过group来获取消息的offset(必须设置group)
  • KAFKA_OFFSET_END #获取尾部的offset
  • KAFKA_OFFSET_BEGINNING #获取头部的offset
  • 手动指定offset开始值

2.2.1 例子1

此例子适合获取一段数据就结束的场景,每一次getMassage都会建立连接然后关闭连接,当循环使用getMassage会造成相对严重的效率问题

<?php
/**
 * See-kafka Consumer例子1
 */

// 配置KafKa集群(默认端口9092)通过逗号分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 设置一个Topic
$KafKa_Lite->setTopic("test");
// 设置Consumer的Group分组(不使用自动offset的时候可以不设置)
$KafKa_Lite->setGroup("test");
// 获取Consumer实例
$consumer = $KafKa_Lite->newConsumer();

// 获取一组消息参数分别为:Partition,maxsize最大返回条数,offset(可选)默认KAFKA_OFFSET_STORED
$rs = $consumer->getMassage(0,100);
//返回结果是一个数组,数组元素类型为Kafka_Message

2.2.1 例子2

例子2适合脚本队列任务

<?php
/**
 * See-kafka Consumer例子1
 * 889 毫秒 获取1w条
 */

// 配置KafKa集群(默认端口9092)通过逗号分隔
$KafKa_Lite = new KafKa_Lite("127.0.0.1,localhost");
// 设置一个Topic
$KafKa_Lite->setTopic("test");
// 设置Consumer的Group分组(不使用自动offset的时候可以不设置)
$KafKa_Lite->setGroup("test");

// 此项设置决定 在使用一个新的group时  是从 最小的一个开始 还是从最大的一个开始  默认是最大的(或尾部)
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');
// 此项配置决定在获取数据后回自动作为一家消费 成功 无需在 一定要 stop之后才会 提交 但是也是有限制的
// 时间越小提交的时间越快,时间越大提交的间隔也就越大 当获取一条数据之后就抛出异常时 更具获取之后的时间来计算是否算作处理完成
// 时间小于这个时间时抛出异常 则不会更新offset 如果大于这个时间则会直接更新offset 建议设置为 100~1000之间
$KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);

// 获取Consumer实例
$consumer = $KafKa_Lite->newConsumer();

// 开启Consumer获取,参数分别为partition(默认:0),offset(默认:KAFKA_OFFSET_STORED)
$consumer->consumerStart(0);

for ($i = 0; $i < 100; $i++) {
    // 当获取不到数据时会阻塞默认10秒可以通过$consumer->setTimeout()进行设置
    // 阻塞后由数据能够获取会立即返回,超过10秒回返回null,正常返回格式为Kafka_Message
    $message = $consumer->consume();
}

// 关闭Consumer(不关闭程序不会停止)
$consumer->consumerStop();

3. 配置文件

See-kafka提供两种配置文件的配置,分别传入key和value,具体配置项已经作用参看如下地址:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

配置文件说明:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

$KafKa_Lite->setTopicConf();
$KafKa_Lite->setKafkaConf();

在使用Consumer的Group(KAFKA_OFFSET_STORED)中需要注意以下配置项,否则你在使用一个新的group会从当前开始计算offset(根据场景):

// 此项设置决定 在使用一个新的group时  是从 最小的一个开始 还是从最大的一个开始  默认是最大的(或尾部)
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');

Consumer获取之后是需要提交告诉KafKa获取成功并且更新offset,但是如果中途报错没有提交offset则下次还是会从头获取,此项配置设置一个自动提交时间,当失败后之前处理的也会吧offset提交到KafKa:

// 此项配置决定在获取数据后回自动作为一家消费 成功 无需在 一定要 stop之后才会 提交 但是也是有限制的
// 时间越小提交的时间越快,时间越大提交的间隔也就越大 当获取一条数据之后就抛出异常时 更具获取之后的时间来计算是否算作处理完成
// 时间小于这个时间时抛出异常 则不会更新offset 如果大于这个时间则会直接更新offset 建议设置为 100~1000之间
$KafKa_Lite->setTopicConf('auto.commit.interval.ms', 1000);

4. 异常

在初始化KafKa_Lite会对集群端口进行验证,如果无任何一个可用的则会抛出一个No can use KafKa异常,也可以主动触发ping操作检查集群是否有有可用机器

当获取Consumer异常了会抛出一个KafKa_Exception_Base异常,异常有一个code号可参考,Exception/err.php文件,推荐使用try-catch进行处理

5. 总结

See-KafKa的宗旨是为了更加方便把KafKa和PHP相结合,并且能够方便的进行使用,如果大家感兴趣可以使用看看,有问题可以进行反馈,此拓展作者会长期维护下去!

官方交流群: 438882880

时间: 2024-08-22 14:37:15

[喵咪KafKa(3)]PHP拓展See-KafKa的相关文章

[喵咪KafKa(2)]单机模式运行KafKa

[喵咪KafKa(2)]单机模式运行KafKa 前言 在上节我们介绍完KafKa之后,今天我们来搭建KafKa三种模式(单机模式,伪集群,集群)中的一种单机模式的搭建,在正常的使用中我们一般吧单机模式作为开发环境的标配,今天就来和喵咪一同搭建一个KafKa的单机环境吧! 附上: 喵了个咪的博客:w-blog.cn KafKa官网地址:http://kafka.apache.org/ Git地址:https://github.com/apache/kafka 百度网盘 1. 安装zookeeper

[喵咪KafKa(1)]KafKa的介绍以及使用场景

[喵咪KafKa(1)]KafKa的介绍以及使用场景 前言 哈喽!大家好呀,真是一坑未平一坑又起,otter还在继续更新的同时,笔者也为大家带来了关于kafka相关的一系列博客,要说到kafka就离不开现在特别火热的大数据技术,了解的童鞋可能只要一些大数据的带名词比如Hadoop,spark,storm,包括最近很火的微服务,kafka也是其中一员,但是不同的是kafka并不负责处理数据,要给kafka一个定义的话应该是一个分布式发布订阅消息系统可以说是一个数据通道保证数据稳定传输,要是感兴趣就

[喵咪MQ(1)]RabbitMQ简单介绍准备工作

[喵咪MQ(1)]RabbitMQ简单介绍准备工作 前言 哈喽大家好呀! 看标题就知道我们这次要讲MQ,之前博客中有提到的KafKa理论上来说也是一个优秀的MQ队列软件,比较知名的MQ有:Go语言编写的 nsq , 阿里云的RocketMQ , 大名鼎鼎的KafKa 以及 redis(也可以做队列),不过我们这次的主角是RabbitMQ. 附上: 喵了个咪的博客:w-blog.cn RabbitMQ官网 :http://www.rabbitmq.com/ 1.队列做什么?RabbitMQ是什么?

[喵咪Liunx(4)Monit进程监控

[喵咪Liunx(4)Monit进程监控 前言 有一段时间没有更新博客了,最近因为公司项目全球化以及最近慢慢在偏向学习团队管理忙的有点不可开交了,不过这次要给大家带来两篇关于Liunx日常开发维护管理中非常好用的两款利器,其中一个就是Monit啦,Monit是什么呢?他是一个进程级别的一个监控软件,不卖关子我们就进入到今天的正文当中吧! 附上: 喵了个咪的博客:w-blog.cn Monit官网地址:https://www.mmonit.com/monit/documentation/monit

kafka详解四:Kafka的设计思想、理念

     本节主要从整体角度介绍Kafka的设计思想,其中的每个理念都可以深入研究,以后我可能会发专题文章做深入介绍,在这里只做较概括的描述以便大家更好的理解Kafka的独特之处.本节主要涉及到如下主要内容: Kafka设计基本思想 Kafka中的数据压缩 Kafka消息转运过程中的可靠性 Kafka集群镜像复制 Kafka 备份机制 一.kafka由来      由于对JMS日常管理的过度开支和传统JMS可扩展性方面的局限,LinkedIn(www.linkedin.com)开发了Kafka以

[喵咪的Liunx(1)]计划任务队列脚本后台进程Supervisor帮你搞定

喵咪的Liunx(1)]计划任务队列脚本后台进程Supervisor帮你搞定 前言 哈喽大家好啊,好久不见啊(都快一个月了),要问为什么没有更新博客呢只应为最近在录制PhalApi的视频教程时间比较少,作为弥补那么为大家带来一点干货Supervisor,话不多说那么就开始今天的分享把 附上: 喵了个咪的博客:w-blog.cn Supervisor官网地址:https://pypi.python.org/pypi/supervisor PhalApi官网地址:http://www.phalapi

[喵咪Golang(1)]Go语言开篇

[喵咪Golang(1)]Go语言开篇 前言 哈喽大家好啊!喵咪我今天又来开坑了,最近学习和使用了go语言了一段时间,也在组合一些好的组件编写phalgo开发框架.在想是不是可以来写一个套关于go语言的文章和一些好的组件的介绍使用,虽然自己也是半桶水但是我相信在编写的过程中能给自己很多收获,最好也能帮助大家了解和熟悉golang这门语言,话不多说那么就开始今天的go语言之旅吧~ 附上: 喵了个咪的博客:w-blog.cn phalgo地址:github.com/wenzhenxi/phalgo

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

1.Kafka提供了两套API给Consumer The high-level Consumer API The SimpleConsumer API      第一种高度抽象的Consumer API,它使用起来简单.方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情 一个消息读取多次 在一个处理过程中只消费Partition其中的一部分消息 添加事务管理机制以保证消息被处理且仅被处理一次 2.使用SimpleConsumer有哪些弊

[喵咪Linux(2)]环境变量的坑

[喵咪Linux(2)]环境变量的坑 前言 玩过linux的童鞋对环境变量都不陌生,我们在安装好一些软件,组件之后想要直接使用它都需要加入环境变量,并且比如java啊golang都必须依赖你设置的环境变量来运行,在最近笔者在日常工作中遇到了一些关于环境变量的坎,所以今天把这些"坑"分享出来和大家交流! 附上: 喵了个咪的博客:w-blog.cn 1. 两场景 笔者有个习惯就是喜欢追问场景,比如别人问我一个问题,token要怎么用sign要怎么玩,我都会先问你是什么场景,什么场景才使用什