PHP 处理kafka消息实例 | 大后端


本站和网页 http://www.dahouduan.com/2017/09/18/php-consume-kafka/ 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

PHP 处理kafka消息实例 | 大后端
大后端
分享与精进
首页
教程
课程
翻译
关于
登录
PHP 处理kafka消息实例
教程 shanhuhai
5年前 (2017-09-18)
28574℃
0评论
在安装php-kafka 扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式
低级方式(Low level)
这种方式没有消费组的概念
<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// 指定 broker 地址,多个地址用"," 分割
$rk->addBrokers("192.168.33.1:9092");
$topic = $rk->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
// 第一个参数是分区号
// 第二个参数是超时时间
$msg = $topic->consume(0, 1000);
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
高级方式 (High level)
这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区,
<?php
$conf = new RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
// 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
});
// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');
//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
// 让消费者订阅log 主题
$consumer->subscribe(['log']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
?>
转载请注明:大后端 &raquo; PHP 处理kafka消息实例
付费咨询
微信扫一扫立即添加咨询 4.9 3.9元/次
喜欢 (21)or分享 (0)
consumerkafkaphp消息处理
Laravel5.5 api token 用法
Golang 协程正确的使用方法
做为一个PHP 程序员为什么要学Java或者Go?Nginx 一个域名下部署多个网站运行多个PHP 版本Laravel 项目加速指南composer autoload 自动加载性能优化指南PHP 连接 Rabbitmq 实例代码PHP 安装 AMQP 扩展Oauth2.0 协议简介及 php实例代码Phpstorm 配置 Xdebug 断点调试教程
发表我的评论
取消评论
提交评论
表情
有人回复时邮件通知我
Hi,您需要填写昵称和邮箱!
昵称昵称 (必填)
邮箱邮箱 (必填)
网址网址
AD
最新文章
Java 基础 equals() 与 “==” 的区别
数组-前缀和技巧
Mac 解决 Nsurlsessionid 跑流量的问题
一文了解 Servlet/Tomcat/ Spring 之间的关系
做为一个PHP 程序员为什么要学Java或者Go?
对多态的理解(Java版本)
git 回退版本到指定记录,但是保留中间的某几条记录
python 动态生成期货价格+模拟网格交易过程
jQuery each 中使用 sleep 延迟
Nignx网站屏蔽指定路径的百度爬虫
如何写好代码(4)——函数篇其二
如何写好代码(3)——函数篇其一
如何写好代码(2)——变量篇
如何写好代码(1)——引子
软件项目管理(二)工时评估
架构设计之——域名设计
Nginx 一个域名下部署多个网站运行多个PHP 版本
软件项目管理(一)成本、产值及利润计算
用 webpack+vue+vue-router 搭建单页应用
用 git rabase 修改提交历史
更多标签php (45)Linux (21)教程 (20)centos (8)安装 (8)nginx (8)设计模式 (7)代码质量 (6)lua (6)java (5)shell (5)laravel (5)git (4)docker (4)前端 (4)模板 (4)扩展 (4)rabbitmq (4)代码片段 (3)mysql (3)
备案号:
京ICP备12039225号-5 ·
站点地图 · 大后端 © 2014-2017 · 友链交换加QQ 441358019