RabbitMQ基础使用
23/Jul 2019
简单地说,RabbitMQ是消息队列服务。对比Redis也提供List结构,也可以作为消息队列的实现。但是看过一些对比,貌似各位大佬还是更加支持使用RabbitMQ作为消息队列的实现。故我好奇,此处便来学习一下。
前言
简单地说,RabbitMQ是消息队列服务。对比Redis也提供List结构,也可以作为消息队列的实现。但是看过一些对比,貌似各位大佬还是更加支持使用RabbitMQ作为消息队列的实现。故我好奇,此处便来学习一下。
此demo为学习 RabbitMQ 官网入门教程RabbitMQ - RabbitMQ tutorial笔记。因官网文档内容为英文,又在segmentfault 网站中看到大佬翻译的该官网入门教程的翻译版本(虽然有点觉得也是机翻的xixixi),故两者结合观摩学习。
具体步骤在两份文档中已经十分详细,本文就作为学习RabbitMQ过程中,对每一部分的内容做些总结。又实际上该6篇教程中的使用代码结构差异不大,所以本文对比6篇入门教程,对RabbitMQ使用的不同之处,做一些简单的对比与总结。十分推荐阅读RabbitMQ 使用参考 - 邹业盛这位大佬的blog,和我这种目前半桶水都没有的相比hhhh,思路与概念讲述十分清晰。
环境准备
RabbitMQ安装
详细见于:RabbitMQ官网https://www.rabbitmq.com/download.html
PHP amqplib组件安装
通过composer,安装 PHP amqplib组件
composer require php-amqplib/php-amqplib
本demo中,通过 requrie_once 引入使用。
RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理(英语:Message broker)软件(亦称面向消息的中间件(英语:Message-oriented middleware))。——维基百科 - RabbitMQ词条
RabbitMQ默认服务监听在 5672 端口上。
通常队列服务都会有三个概念:
- 工作者(发消息者)
- 队列
- 消费者(收消息者)
流程上,工作者把消息放上队列中,然后消费者从队列中取出消息。
RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在 发消息者 和 队列 之间, 加入了 交换器(Exchange) . 这样 发消息者 和 队列 就没有直接联系, 转而变成 发消息者 把消息给 交换器 , 交换器 根据调度策略再把消息再给 队列 。
当然, 多一层抽象会增加复杂度, 但是同时, 功能上也更灵活. 事实上, 很多时候面对具体场景时, 在这种”四段式”的结构下, 你可选择的方案不止一种的. 不过也不必过于担心, 在一些自我规定的”原则”之下, “正确”的方案也不会那么纠结.
总结一下
4 + 1个概念, 或者说, 五种角色:
- Producing , 生产者, 产生消息的角色.
- Exchange , 交换器, 在得到生产者的消息后, 把消息扔到队列的角色.
- Queue , 队列, 消息暂时呆的地方.
- Consuming , 消费者, 把消息从队列中取出的角色.
- 消息 Message , RabbitMQ 中的消息有自己的一系列属性, 某些属性对信息流有直接影响.
——摘抄自RabbitMQ 使用参考 - 邹业盛
RabbitMQ官方教程文档代码文件见于:https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php。本人也记录一份于本人Github:https://github.com/LucienVen/php_demo/tree/master/rabbitmq_demo
一些术语与单词翻译
名词
| 名词 | 解释 | 备注 |
|---|---|---|
| 生成者 | 发送信息的程序 | |
| 消费者 | 等待消息然后处理的程序 | |
| 队列 | ||
| 管道 | ||
单词翻译
| 单词 | 翻译 | 备注(记录出处) |
|---|---|---|
| declare | 声明 | queue_declare, 队列声明的意思 |
| publish | 发布 | basic_publish,见于生产者程序 |
| channel | 渠道,管道,频道 | 常见于”频道“意思 |
| consume | 消耗,消费 | basic_consume,理解为消费的意思,对应basic_publish,见于消费者程序 |
一般思路综述
tutorial part1至part5的代码的结构差不多,按照个人的理解:
生产者:
- 建立连接 new AMQPStreamConnection
- 从连接获取一个channel,
$channel = $connection->channel() - 设置队列queue / 交换机exchange,
$channel->queue_declare() 或 exchange_declare() - 定义message接口,
$msg = new AMQPMessage - 频道channel发布 basic_publish,
$channel->queue_declare() - 关闭channel和connection
消费者:
- 建立连接 new AMQPStreamConnection
- 从连接获取channel
- 定义回调函数
- 声明需要的queue,
$channel->basic_consume() - 开始接受信息
注:
- 声明exchange与声明queue并不冲突,前面几篇教程为了简洁才不同时定义两者。
part1: Base usage
RabbitMQ的基础使用,了解如何构建一个简单的工作队列。
运行php send.php生产者程序脚本,会向名为前面声明的队列(假设为queue_name)发送信息。多次运行,会发送多条信息,若此时没有运行消费者程序(receive.php),消息会存在于队列中,待到消费者程序脚本运行时,一并提取。
运行php receive.php消费者程序脚本,会从声明为queue_name的队列中获取信息。终端运行 rabbitmqctl list_queues 命令,可列出队列。
如若未运行消费者程序(receive.php),而运行生成者程序两次,此时列出的队列名以及消息条数分别为:name => queue_name,message => 2。此时在运行消费者程序,则可见终端打印出两条message,再次运行rabbitmqctl list_queues 命令,可见name => queue_name,message => 0
part2: Work Queues
工作队列(又名任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它完成。相反,我们计划稍后完成任务。我们将任务封装为消息并将其发送到队列中。后台运行的一个工作进程将弹出任务并最终执行该任务。当你运行许多工人(消费者)时,任务将在他们之间分担。
part2 Demo允许任意的消息是从命令行发送。这一计划将任务分配给我们的工作队列。进行单worker简单运行,结果同part1。
循环调度
使用任务队列的一个优点是容易拥有并行工作的能力。如果我们积压了大量的工作,可以增加更多的worker。尝试同时运行两个worker.php脚本,然后继续使用php new_task.php 'new message'发送消息,可见每个worker程序依次打印出new_task发送的消息。默认情况下,RabbitMQ将会发送的每一条消息给下一个消费者,在序列。平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环轮询。试着用三个或更多的工人进行尝试。
消息确认
概念略。
通过设置basic_consume第四个参数为false,(basic_consume函数参见下文 *常见函数使用*),可知basic_consume第四个参数bool $no_ack = false,默认为false,默认消息确定。
part3:
待续
part4: routing
待续
part5: topic exchange
待续
part6: RPC
待续
常见函数使用
- rabbitmqctl list_queues 列出队列
Timeout: 60.0 seconds … Listing queues for vhost / …
| name 队列名 | message 现存消息条数 | | | ———– | ——————– | —- | | demo | 2 | | | | | | | | | |
- queue_declare
queue_declare(
string $queue = '', # 队列名称
bool $passive = false,
bool $durable = false, # 持久化
bool $exclusive = false,
bool $auto_delete = true,
bool $nowait = false,
array $arguments = array(),
int $ticket = null
)
exchange_declare
定义 AMQP 信息体
use PhpAmqpLib\Message\AMQPMessage;
$msg = new AMQPMessage('message body');
- basic_publish
// Publishes a message
$channel->basic_publish(
\PhpAmqpLib\Message\AMQPMessage $msg,
string $exchange = '',
string $routing_key = '',
bool $mandatory = false,
bool $immediate = false,
int $ticket = null
);
- base_consume
base_consume(
string $queue = '',
string $consumer_tag = '',
bool $no_local = false,
bool $no_ack = false, # 消息确认
bool $exclusive = false,
bool $nowait = false,
callable|null $callback = null,
int|null $ticket = null,
array $arguments = array()
)
- new AMQPMessage()
new AMQPMessage(string $body = '', array $properties = array())
常用命令
- rabbitmqctl list_bindings
输出:Listing bindings for vhost /…
| source_name | source_kind | destination_name | destination_kind | routing_key | arguments |
|---|---|---|---|---|---|
| 1 | exchange | demo | queue | demo | [] |
- rabbitmqctl list_queues 列出队列
Timeout: 60.0 seconds … Listing queues for vhost / …
| name 队列名 | message 现存消息条数 | | | :———: | :——————: | —- | | demo | 2 | | | | | | | | | |
参考
- RabbitMQ - RabbitMQ tutorial
- RabbitMQ - RabbitMQ tutorial - 翻译版本 - segmentfault
- RabbitMQ 使用参考 - 邹业盛
- RabbitMQ - RabbitMQ tutorial - 完整代码