五、消息订阅(Publish/Subscribe) | RabbitMQ 从入门到放弃


本站和网页 https://xueyuanjun.com/post/7420.html 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

五、消息订阅(Publish/Subscribe) | RabbitMQ 从入门到放弃
Laravel 学院
文档
Laravel 8.x 中文文档
Laravel 7.x 中文文档
Laravel 6.x 中文文档
Laravel 5.8 中文文档
Laravel 5.7 中文文档
Laravel 5.6 中文文档
Laravel 5.5 中文文档
Laravel 5.4 中文文档
Laravel 5.3 中文文档
Laravel 5.2 中文文档
Laravel 5.1 中文文档
Lumen 中文文档
全栈教程
PHP 全栈工程师指南
PHP 入门到实战
Laravel 入门到精通
Vue.js 入门到实战
玩转 PhpStorm 教程
Laravel 博客入门项目
Laravel 微信小程序项目
Laravel 前后端分离项目
Swoole 入门到实战
Eloquent 性能优化实战
Redis 高性能实战系列
Laravel 新版本特性
PHP 新特性与最佳实践
Golang
Go 入门教程
Go Web 编程
Gin 使用教程
微服务开发
内功修炼
数据结构与算法
网络协议
微服务从入门到实践
高性能 MySQL 实战
高性能 Redis 实战
Laravel 消息队列实战
Laravel 从学徒到工匠
PHP 设计模式系列
名企面试指南
资源库
Laravel 资源大全
Laravel 开源项目
Laravel 扩展包
Laravel 资源下载
更多
博客 & 新闻
问答 & 讨论
Leetcode 题解
学院君读书笔记系列
关于 Laravel 学院
Laravel 互助学习群
Golang 互助学习群
更多
Laravel 中文文档
Laravel 全栈教程
Laravel 学习路径
Go 入门教程
程序员内功修炼
博客
问答
搜索
注册
登录
Info
Content
章节导航
RabbitMQ 从入门到放弃
一、核心概念及术语
二、Mac 下搭建 PHP 开发环境
三、PHP 中实现消息发送和接收
四、消息分发机制
五、消息订阅(Publish/Subscribe)
六、消息路由
七、通过主题进行消息分发
八、远程调用(RPC)
图书
RabbitMQ 从入门到放弃
五、消息订阅(Publish/Subscribe)
五、消息订阅(Publish/Subscribe)
由 学院君 创建于5年前, 最后更新于 2年前
版本号 #2
12111 views
0 likes
0 collects
之前都是将消息发送到同一个 Consumer,而现在我们需将其发送到多个 Consumer。
我们将创建一个日志系统,它包含两个部分:第一个部分负责发出log(Producer),第二个部分负责接收并打印(Consumer)。我们将构建两个 Consumer,第一个将 log 写到物理磁盘上;第二个将 log 输出到屏幕。
"Fanout" not telling an exchange to distribute messages to different consumers, but to different queues. So, you need at least two queues binding to a "fanout" exchange. Then let your two consumers get message from those two queues, one consumer to one queue.
Exchange
Exchange 决定将 Message 发送到具体的 Queue,至于是发送给一个 Queue 还是多个 Queue,则需要通过 Exchange 的类型类决定。Exchange 分为三种类型:direct、topic 和 fanout。fanout 就是广播模式,会将 Message 都放到它所知道的所有 Queue 中:
$exchange->setType(AMQP_EX_TYPE_FANOUT);
现在我们可以直接通过 Exchange,而不需要 routing_key 来发送 Message 了:
$exchange->publish($message);
临时队列
截至现在,我们用的 Queue 都是有名字的。使用有名字的 Queue,使得在 Producer 和 Consumer 之前共享 Queue 成为可能。
在我们的日志系统中,不需要有名字的队列,要实现这个目标,需要在声明队列时不指定名称,而由系统随机分配:
$queue = new AMQPQueue($channel);
//$queue->setName($queueName);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
//$queue->bind($exchangeName, $routeKey);
这时,通过 $queue->getName() 获取到的队列名称是随机生成的。
绑定
建立 Exchange 与 Queue 之间的绑定:
$queue->bind($exchangeName);
演示代码
emit_logs.php
<?php
/**
* 发送消息
*/
$exchangeName = 'logs';
$message = 'Hello World!';
// 建立TCP连接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to the broker!\n");
try {
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->declareExchange();
echo 'Send Message: ' . $exchange->publish($message) . "\n";
echo "Message Is Sent: " . $message . "\n";
} catch (AMQPConnectionException $e) {
var_dump($e);
// 断开连接
$connection->disconnect();
receive_logs.php
<?php
/**
* 接收消息
*/
$exchangeName = 'logs';
// 建立TCP连接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'vhost' => '/',
'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
$queue->bind($exchangeName);
var_dump("Waiting for message...");
// 消费队列消息
while(TRUE) {
$queue->consume('processMessage');
// 断开连接
$connection->disconnect();
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
var_dump("Received: " . $msg);
$queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
演示流程
打开两个终端,一个消费者队列负责将日志写入文件:
php receive_logs.php > logs_from_rabbit.log
一个负责将日志输出到屏幕:
php receive_logs.php
然后再打开一个终端,将日志信息发送到所有队列:
php emit_logs.php
这样,会发现所有队列会同时接收到日志并进行相应的处理。
广播
订阅
PHP
RabbitMQ
Exchange
消息队列
Fanout
点赞
取消点赞
收藏
取消收藏
赞赏
分享到以下平台:
<< 上一篇:
四、消息分发机制
>> 下一篇:
六、消息路由
1 条评论
#1
kincae
评论于 1年前
正在删除评论...
win10 环境下如果文件写入不了日志,改成$ php.exe receive_logs.php > logs_from_rabbit.log
登录后即可添加评论
升级为学院君订阅用户(新年优惠🎁)
内容导航
Exchange
临时队列
绑定
演示代码
相关推荐
一、核心概念及术语
RabbitMQ 从入门到放弃
三、PHP 中实现消息发送和接收
RabbitMQ 从入门到放弃
六、消息路由
RabbitMQ 从入门到放弃
七、通过主题进行消息分发
RabbitMQ 从入门到放弃
八、远程调用(RPC)
RabbitMQ 从入门到放弃
回到顶部
2022 基于 Laravel 6 构建
关于学院
订阅服务
友情链接
站点地图
本站 CDN 加速服务由又拍云赞助