RabbitMQ基础使用

简单地说,RabbitMQ是消息队列服务。对比Redis也提供List结构,也可以作为消息队列的实现。但是看过一些对比,貌似各位大佬还是更加支持使用RabbitMQ作为消息队列的实现。故我好奇,此处便来学习一下。

前言

简单地说,RabbitMQ是消息队列服务。对比Redis也提供List结构,也可以作为消息队列的实现。但是看过一些对比,貌似各位大佬还是更加支持使用RabbitMQ作为消息队列的实现。故我好奇,此处便来学习一下。

此demo为学习 RabbitMQ 官网入门教程RabbitMQ - RabbitMQ tutorial笔记。因官网文档内容为英文,又在segmentfault 网站中看到大佬翻译的该官网入门教程的翻译版本(虽然有点觉得也是机翻的xixixi),故两者结合观摩学习。

具体步骤在两份文档中已经十分详细,本文就作为学习RabbitMQ过程中,对每一部分的内容做些总结。又实际上该6篇教程中的使用代码结构差异不大,所以本文对比6篇入门教程,对RabbitMQ使用的不同之处,做一些简单的对比与总结。十分推荐阅读RabbitMQ 使用参考 - 邹业盛这位大佬的blog,和我这种目前半桶水都没有的相比hhhh,思路与概念讲述十分清晰。

环境准备

RabbitMQ安装

详细见于:RabbitMQ官网https://www.rabbitmq.com/download.html

PHP amqplib组件安装

通过composer,安装 PHP amqplib组件

composer require php-amqplib/php-amqplib

本demo中,通过 requrie_once 引入使用。

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理(英语:Message broker)软件(亦称面向消息的中间件(英语:Message-oriented middleware))。——维基百科 - RabbitMQ词条

RabbitMQ默认服务监听在 5672 端口上。

通常队列服务都会有三个概念:

  • 工作者(发消息者)
  • 队列
  • 消费者(收消息者)

流程上,工作者把消息放上队列中,然后消费者从队列中取出消息。

RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在 发消息者队列 之间, 加入了 交换器(Exchange) . 这样 发消息者队列 就没有直接联系, 转而变成 发消息者 把消息给 交换器 , 交换器 根据调度策略再把消息再给 队列

当然, 多一层抽象会增加复杂度, 但是同时, 功能上也更灵活. 事实上, 很多时候面对具体场景时, 在这种”四段式”的结构下, 你可选择的方案不止一种的. 不过也不必过于担心, 在一些自我规定的”原则”之下, “正确”的方案也不会那么纠结.

总结一下 4 + 1 个概念, 或者说, 五种角色:

  • Producing , 生产者, 产生消息的角色.
  • Exchange , 交换器, 在得到生产者的消息后, 把消息扔到队列的角色.
  • Queue , 队列, 消息暂时呆的地方.
  • Consuming , 消费者, 把消息从队列中取出的角色.
  • 消息 Message , RabbitMQ 中的消息有自己的一系列属性, 某些属性对信息流有直接影响.

——摘抄自RabbitMQ 使用参考 - 邹业盛

RabbitMQ官方教程文档代码文件见于:https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php。本人也记录一份于本人Github:https://github.com/LucienVen/php_demo/tree/master/rabbitmq_demo

一些术语与单词翻译

名词

名词 解释 备注
生成者 发送信息的程序
消费者 等待消息然后处理的程序
队列
管道

单词翻译

单词 翻译 备注(记录出处)
declare 声明 queue_declare, 队列声明的意思
publish 发布 basic_publish,见于生产者程序
channel 渠道,管道,频道 常见于”频道“意思
consume 消耗,消费 basic_consume,理解为消费的意思,对应basic_publish,见于消费者程序

一般思路综述

tutorial part1至part5的代码的结构差不多,按照个人的理解:

生产者:

  1. 建立连接 new AMQPStreamConnection
  2. 从连接获取一个channel,$channel = $connection->channel()
  3. 设置队列queue / 交换机exchange,$channel->queue_declare() 或 exchange_declare()
  4. 定义message接口,$msg = new AMQPMessage
  5. 频道channel发布 basic_publish,$channel->queue_declare()
  6. 关闭channel和connection

消费者:

  1. 建立连接 new AMQPStreamConnection
  2. 从连接获取channel
  3. 定义回调函数
  4. 声明需要的queue,$channel->basic_consume()
  5. 开始接受信息

注:

  • 声明exchange与声明queue并不冲突,前面几篇教程为了简洁才不同时定义两者。

part1: Base usage

RabbitMQ的基础使用,了解如何构建一个简单的工作队列。

运行php send.php生产者程序脚本,会向名为前面声明的队列(假设为queue_name)发送信息。多次运行,会发送多条信息,若此时没有运行消费者程序(receive.php),消息会存在于队列中,待到消费者程序脚本运行时,一并提取。

运行php receive.php消费者程序脚本,会从声明为queue_name的队列中获取信息。终端运行 rabbitmqctl list_queues 命令,可列出队列。

如若未运行消费者程序(receive.php),而运行生成者程序两次,此时列出的队列名以及消息条数分别为:name => queue_name,message => 2。此时在运行消费者程序,则可见终端打印出两条message,再次运行rabbitmqctl list_queues 命令,可见name => queue_name,message => 0

part2: Work Queues

工作队列(又名任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它完成。相反,我们计划稍后完成任务。我们将任务封装为消息并将其发送到队列中。后台运行的一个工作进程将弹出任务并最终执行该任务。当你运行许多工人(消费者)时,任务将在他们之间分担。

part2 Demo允许任意的消息是从命令行发送。这一计划将任务分配给我们的工作队列。进行单worker简单运行,结果同part1。

循环调度

使用任务队列的一个优点是容易拥有并行工作的能力。如果我们积压了大量的工作,可以增加更多的worker。尝试同时运行两个worker.php脚本,然后继续使用php new_task.php 'new message'发送消息,可见每个worker程序依次打印出new_task发送的消息。默认情况下,RabbitMQ将会发送的每一条消息给下一个消费者,在序列。平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环轮询。试着用三个或更多的工人进行尝试。

消息确认

概念略。

通过设置basic_consume第四个参数为false,(basic_consume函数参见下文 *常见函数使用*),可知basic_consume第四个参数bool $no_ack = false,默认为false,默认消息确定。

part3:

待续

part4: routing

待续

part5: topic exchange

待续

part6: RPC

待续

常见函数使用

  • rabbitmqctl list_queues 列出队列

Timeout: 60.0 seconds … Listing queues for vhost / …

| name 队列名 | message 现存消息条数 | | | ———– | ——————– | —- | | demo | 2 | | | | | | | | | |

  • queue_declare
   queue_declare(
       string $queue = '', 			# 队列名称
       bool $passive = false, 
       bool $durable = false, 			# 持久化
       bool $exclusive = false, 
       bool $auto_delete = true, 
       bool $nowait = false, 
       array $arguments = array(), 
       int $ticket = null
   )
  • exchange_declare

  • 定义 AMQP 信息体

   use PhpAmqpLib\Message\AMQPMessage;
   $msg = new AMQPMessage('message body');
  • basic_publish
   // Publishes a message
   $channel->basic_publish(
       \PhpAmqpLib\Message\AMQPMessage $msg, 
       string $exchange = '', 
       string $routing_key = '', 
       bool $mandatory = false, 
       bool $immediate = false, 
       int $ticket = null
   );
  • base_consume
   base_consume(
       string $queue = '', 
       string $consumer_tag = '', 
       bool $no_local = false, 
       bool $no_ack = false, 			# 消息确认
       bool $exclusive = false, 
       bool $nowait = false, 
       callable|null $callback = null, 
       int|null $ticket = null, 
       array $arguments = array()
   )
  • new AMQPMessage()
    new AMQPMessage(string $body = '', array $properties = array())
   

常用命令

  • rabbitmqctl list_bindings

输出:Listing bindings for vhost /…

source_name source_kind destination_name destination_kind routing_key arguments
1 exchange demo queue demo []
  • rabbitmqctl list_queues 列出队列

Timeout: 60.0 seconds … Listing queues for vhost / …

| name 队列名 | message 现存消息条数 | | | :———: | :——————: | —- | | demo | 2 | | | | | | | | | |

参考

Tags// , , ,
More Reading