PHP操作kafka,实现生产消费与进程监听_wuliZs_的博客-CSDN博客


本站和网页 https://blog.csdn.net/qq_34284638/article/details/97643731 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

PHP操作kafka,实现生产消费与进程监听_wuliZs_的博客-CSDN博客
PHP操作kafka,实现生产消费与进程监听
wuliZs_
于 2019-07-29 15:35:18 发布
3267
收藏
分类专栏:
kafka
TCP
php
服务器
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_34284638/article/details/97643731
版权
kafka
同时被 3 个专栏收录
2 篇文章
0 订阅
订阅专栏
TCP
2 篇文章
0 订阅
订阅专栏
php
30 篇文章
0 订阅
订阅专栏
目录
一、PHP代码实现
kafka连接-设置初始参数生产者生产消息利用进程持续监听kafka,实时消费
二、逻辑思路
三、redis与kafka的区别
一、PHP代码实现
1. kafka连接-设置初始参数
<?php
/**
* Created by PhpStorm.
* User: wuliZs_
*/
/**
* 业务Kafka单件
*/
class App_Kafka
private static $producer = null;//生产者
/**
* 设置初始参数
* @return type
* @throws \Exception
*/
public static function getInstance()
$broker_list = ''; // kakfa连接
if (!isset(self::$producer)) {
try {
if (empty($broker_list)) {
throw new \Exception("broker_list is null", 1);
$rk = new \RdKafka\Producer();//创建生产者
if (!isset($rk)) {
throw new \Exception("create producer error", 1);
$rk->setLogLevel(LOG_DEBUG);//设置日志级别7
if (!$rk->addBrokers($broker_list)) {//设置kafka服务器
throw new \Exception("add producer error", 1);
self::$producer = $rk;
} catch (Exception $e) {
// 抛出异常,写入日志
App_Log::save('kafka_send_error', 'Message: ' . $e->getMessage());
return self::$producer;
} else {
return self::$producer;
2. 生产者生产消息
<?php
/**
* Created by PhpStorm.
* User: wuliZs_
*/
Class Lib_Kafka{
public static $partition = 0; //topic物理上的分区
/**
* 生产者生产消息
* @param array $message
* @return string
*/
public static function produce($topic, $message=''){
$producer = App_Kafka::getInstance();
$topic = $producer->newTopic($topic);//创建主题topic
if(strlen($message) > 1000000){//kafka限制了最大长度为1M
return false;
App_Log::save('kafka_send_error', '超过1m: ' . $message);
return $topic->produce(RD_KAFKA_PARTITION_UA, self::$partition, $message);//向指定的topic物理地址发消息
3. 利用进程持续监听kafka,实时消费【写入一个daemon,实时监听消费】
<?php
/**
* kafka监听消费
*/
require dirname(__DIR__) . '/script/shell.php';
class KafkaSendMsg{
public static function daemon(){
ini_set('default_socket_timeout', -1); //不超时
$kfk_conf = new Rdkafka\Conf();
$kfk_conf->set("group.id","");// 存放的分组名
$kfk_conf->set('metadata.broker.list','');// kafka连接
$kfk_conf->set('topic.metadata.refresh.interval.ms',60000);// Topic metadata 刷新间隔,毫秒。metadata 自动刷新错误和连接。设置为 -1 关闭刷新间隔。
$kfk_conf->set('socket.keepalive.enable',true);// Broker sockets 允许 TCP 保持活力
$kfk_conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
});
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.enable', true);
$kfk_conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($kfk_conf);
$consumer->subscribe(['kafka']);// kafka通道,进行监听
while (true) {
try {
$msg_kfk = $consumer->consume(10000); //获取队列并往下执行消息,设置timeout
if ($msg_kfk && !$msg_kfk->err) {
$message = json_decode($msg_kfk->payload, true);// 收到消息,进行处理
// 收到消息,调用处理方法
if(!empty($message)){
self::sendMsg($message);
}catch(Exception $e){
// 抛出异常,写入日志
/**
* 消息处理
* @param $message
*/
private static function sendMsg($message = null{
// 收到消息后处理逻辑的方法
KafkaSendMsg::daemon();
二、逻辑思路【用法跟redis的消费订阅差不多】:
连接kafka生产者生产消息,推送到指定的kafka通道开启一个进程监听kafka通道的动态消费者监听到通道有消息过来后,进行逻辑处理
三、redis与kafka的区别
注:以下内容转发至https://blog.csdn.net/qq_33594101/article/details/88044391
Kafka与Redis PUB/SUB之间较大的区别在于Kafka是一个完整的系统,而Redis PUB/SUB只是一个套件(utility)——没有冒犯Redis的意思,毕竟它的主要功能并不是PUB/SUB。 先说Redis吧,它首先是一个内存数据库,其提供的PUB/SUB功能把消息保存在内存中(基于channel),因此如果你的消息的持久性需求并不高且后端应用的消费能力超强的话,使用Redis PUB/SUB是比较合适的使用场景。比如官网说提供的一个网络聊天室的例子:模拟IRC,因为channel就是IRC中的服务器。用户发起连接,发布消息到channel,接收其他用户的消息。这些对于持久性的要求并不高,使用Redis PUB/SUB来做足矣。
而Kafka是一个完整的系统,它提供了一个高吞吐量、分布式的提交日志(由于提供了Kafka Connect和Kafka Streams,目前Kafka官网已经将自己修正为一个分布式的流式处理平台,这里也可以看出Kafka的野心:-)。除了p2p的消息队列,它当然提供PUB/SUB方式的消息模型。而且,Kafka默认提供了消息的持久化,确保消息的不丢失性(至少是大部分情况下)。另外,由于消费元数据是保存在consumer端的,所以对于消费而言consumer被赋予极大的自由度。consumer可以顺序地消费消息,也可以重新消费之前处理过的消息。这些都是Redis PUB/SUB无法做到的。
Redis PUB/SUB使用场景:
    消息持久性需求不高     吞吐量要求不高     可以忍受数据丢失     数据量不大
Kafka使用场景: 上面以外的其他场景:)
    高可靠性     高吞吐量     持久性高     多样化的消费处理模型  
希望可以帮助到有需要的同学!
wuliZs_
关注
关注
点赞
收藏
打赏
评论
PHP操作kafka,实现生产消费与进程监听
目录一、PHP代码实现kafka连接-设置初始参数生产者生产消息利用进程持续监听kafka,实时消费二、逻辑思路三、redis与kafka的区别一、PHP代码实现1. kafka连接-设置初始参数<?php/** * Created by PhpStorm. * User: wuliZs_ *//** * 业务Kafka单件 */...
复制链接
扫一扫
专栏目录
kafka_php_消费者_生产者--小记
dlh918的博客
06-14
266
<?php
$conf = new Rdkafka\Conf();
$conf->set('group.id', 0); // 设置组别id, 默认写0
$conf->set('metadata.broker.list', 'localhost:9092'); // 设置kafka地址及端口号 localhost可换成服>务器ip
$topicConf = new Rdkafka\topicConf();
$top.
php 消息队列_PHP处理kafka消息队列
weixin_39928686的博客
12-01
197
安装PHP—kafka扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式低级方式(Low level)这种方式没有消费组的概念<?php
$rk = new RdKafkaConsumer();
$rk->setLogLevel(LOG_DEBUG);
// 指定 broker 地址,多个地址用"," 分割
$rk->addB...
评论 4
您还未登录,请先
登录
后发表或查看评论
PHP swoole 监听端口进程
最新发布
荷逸的博客
10-03
46
swoole定时器监听端口
Kafka消费者订阅指定主题(subscribe)或分区(assign)详解
Jeremy
08-02
2329
Kafka订阅主题(subscribe)与分区(assign)方法详解
在 php 中简单调用 kafka
潘广宇的博客
03-12
3362
一、确保 kafka 已被安装在 Linux 服务器中
若未安装,查看此博客快速安装:
https://blog.csdn.net/panguangyuu/article/details/88408320
二、启动 kakfa 服务
# 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &...
php rdkafka实例,php使用rdkafka进行消费
weixin_42400669的博客
03-09
1058
如仅作为消费者或生产者,直接使用下面消费者或生产者的代码,并安装扩展即可。PHP要安装rdkafka扩展,而rdkafka又依赖librdkafka,因此你需要安装rdkafka和librdkafka,之后就可以与kafka服务器交互了。消费者/*** 代码中的输出注释都可以打开供调试使用* 对 中台生产的 用户信息 进行消费* Date: 2019/7/31*/// 设置将要消费消息的主题$t...
php消费kafka,php发送数据到kafka实现代码
weixin_35172279的博客
03-11
285
kafka仅仅是个小小的纽带。经常用于数据的发送及转移。在kafka官方的例子中,其实并没有php的相关实现版本。现在网上流传的kafka的相关php库,都是些编程爱好者们自己写的类库,所以就肯定不会有太统一的接口标准了。下面以某个类库为例,展示相关的kafka的php扩展库使用。综合比较了几家kafka的php库,苏南大叔觉得下面的这个开源类库,nmred/kafka-php ,比较简洁方便一些...
php kafka
u013713010的专栏
04-01
2913
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。
Kafka的特点:
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息
PHP操作Kafka
右手诗的博客
03-30
3421
本文对php操作kafka的方法做一个记录,备忘。
一、搭建kafka集群
下载kafka并解压:
tar -xzf kafka_2.13-2.7.0.tgz
搭建单机集群:
cd kafka_2.13-2.7.0
#创建两个broker配置并修改端口
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
编辑拷贝.
php rdkafka扩展发送和接收消息
u013713010的专栏
04-24
9338
发送消息
try {
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smal
librdkafka配置参数
热门推荐
02-23
1万+
librdkafka配置参数
以表格形式列出了librdkafka库中的控制参数信息
php 生产kafka 不生效问题
钚该钚想
04-21
733
记录一下php 调用 kafka 生产者代码但是消费者端口收不到数据的情况
PHP代码如下:
<?php
$objRdKafka = new RdKafka\Producer();
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("demo");
$oObjTopic-&g...
PHP处理kafka消息队列
weixin_33968104的博客
05-27
543
在安装php-kafka 扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式
低级方式(Low level)
这种方式没有消费组的概念
<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// 指定 broker 地址,多个地址用"," ...
kafka实战
weixin_30614109的博客
11-08
5089
1. kafka介绍
1.1. 主要功能
根据官网的介绍,ApacheKafka是一个分布式流媒体平台,它主要有3种功能:
  1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因
  2:It lets ...
php-rdkafka手动提交偏移量
moliyiran的专栏
02-24
1857
在项目中使用php-rdkafka的高级消费者时,发现设置了:
$topicConf->set('enable.auto.commit', 'false');
没有效果,还是会自动提交offset,查了各种资料,正确的应该是这样设置:
$conf->set('enable.auto.commit', 'false');
相关说明见文档:https://github.com/ed...
php多进程消费kafka消息高阶API封装
lc的大脑备份
03-06
1246
高阶API是指API侧提供了自动负载均衡,不需要业务去处理。
abstract class KafkaConsumerHighService extends KafkaConsumerBaseService
function consumer($partion_id, $worker)
$conf = new \RdKafka\Conf();
...
php kafka使用
lvfk
02-07
6049
一、安装以及使用shell命令终端操作kafka
环境配置
1、下载最新版本的kafka: kafka_2.11-1.0.0.tgz
http://mirrors.shu.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
2、配置,解压后进入config目录
2.1、配置zookeeper.properties
默认监听端口2181
rdkafka 使用案例
hintonic的专栏
07-30
2784
main.cpp
#include <iostream>
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include
kafka消费者监听数据原理
卡夫卡
10-16
1万+
kafka确实是一个很牛逼的消息中间件。基本上是消息中间件中数据最快吞吐量最高的分布式消息中间件了。
由于公司对kafka全封装了,直接调用api就可以了。但是本人对kafka很感兴趣,就先看了下kafka监听topic里的新增的消息。
看了下源码其实很简单。public class Consumer{ private static final KafkaConsumer<String,
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:像素格子
设计师:CSDN官方博客
返回首页
wuliZs_
CSDN认证博客专家
CSDN认证企业博客
码龄7年
企业员工
63
原创
2万+
周排名
96万+
总排名
24万+
访问
等级
2949
积分
1314
粉丝
369
获赞
178
评论
138
收藏
私信
关注
热门文章
UnicodeDecodeError: ‘ascii‘ codec can‘t decode byte 0xe6 in position解决方法
37312
go: cannot find main module; see 'go help modules' 解决方案
18907
php极光推送详解过程
18173
关于phpstudy访问localhost不出现目录的问题
15501
本地git仓库推送到服务器自建的git仓库实现目录文件同步教程[自整理]
13446
分类专栏
IT工程师语录
1篇
php
30篇
IDE
Golang
6篇
java
1篇
python实战
4篇
json
1篇
数据库
1篇
mongo
1篇
jquery
2篇
mysql
8篇
js
2篇
加密算法
2篇
加密
2篇
服务器压力
2篇
服务器
12篇
linux
5篇
apache
1篇
证书
1篇
https
1篇
git
1篇
centos
1篇
ssh
php,centos
svn
1篇
推送
1篇
Jpush
1篇
tags
alias
redis
redis,算法
php,html,多文件上传
1篇
php,算法
2篇
javascript
1篇
curl
2篇
请求
2篇
python
4篇
搜索引擎
1篇
pycharm
python编辑器
windows server
域控服务器
1篇
GD
1篇
合并图片
1篇
kafka
2篇
TCP
2篇
最新评论
UnicodeDecodeError: ‘ascii‘ codec can‘t decode byte 0xe6 in position解决方法
ljl151610:
那在哪里设置啊?求大佬解决
本地git仓库推送到服务器自建的git仓库实现目录文件同步教程[自整理]
NEAUZSY:
[code=plain]
echo "太强了"
[/code]
关于phpstudy访问localhost不出现目录的问题
RunningBComeOn:
谢谢。可以用。
UnicodeDecodeError: ‘ascii‘ codec can‘t decode byte 0xe6 in position解决方法
I小码哥:
谢谢分享
全国省市区三级Json数据
        .:
好人一生平安
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
几年未写,打个卡 ~
【十大IDE】 解决你不懂英文的痛苦
Golang 入门-Gin框架深入了解使用
2022年1篇
2020年7篇
2019年19篇
2018年7篇
2017年18篇
2016年16篇
目录
目录
分类专栏
IT工程师语录
1篇
php
30篇
IDE
Golang
6篇
java
1篇
python实战
4篇
json
1篇
数据库
1篇
mongo
1篇
jquery
2篇
mysql
8篇
js
2篇
加密算法
2篇
加密
2篇
服务器压力
2篇
服务器
12篇
linux
5篇
apache
1篇
证书
1篇
https
1篇
git
1篇
centos
1篇
ssh
php,centos
svn
1篇
推送
1篇
Jpush
1篇
tags
alias
redis
redis,算法
php,html,多文件上传
1篇
php,算法
2篇
javascript
1篇
curl
2篇
请求
2篇
python
4篇
搜索引擎
1篇
pycharm
python编辑器
windows server
域控服务器
1篇
GD
1篇
合并图片
1篇
kafka
2篇
TCP
2篇
目录
评论 4
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
wuliZs_
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值