初探 RabbitMQ 和 PHP 7.2 整合

RabbitMQ是流行的开源消息队列系统,用Erlang语言开发,是AMQP(高级消息队列协议)的标准实,支持多种客户端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特 性,例如更易于扩展,基于内容的路由。

在 CentOS 7 上安装的话(Ubuntu 上把 yum 替换为 apt 即可), 很简单:
# yum install rabbitmq-server librabbitmq-devel
以上命令会自动安装相关的依赖软件包,包括一大堆 erlang 的软件包。

然后 # systemctl enable rabbitmq-server; systemctl start rabbitmq-server
就可以把 RabbitMQ 服务启动起来了。

接下来我们把远程管理的插件启用:
# rabbitmq-plugins enable rabbitmq_management

我们要把 guest 账号远程登录开启:
# vim /etc/rabbitmq/rabbitmq.config
[{rabbit, [{loopback_users, []}]}].

接下来我们把 php 模块安装起来,
# apt install php-amqp

CentOS 7 上选择对应的软件包: php72-php-pecl-amqp

# apt install php7.2-amqp

我们用 # php -m 可以看到 amqp 已经在模块列表了。 (在 CentOS 上要重启 Apache 才能看到)

我们先写一段发送消息的代码:send.php

<?php
/*
PHP amqp(RabbitMQ) Demo – publisher
Author: Linvo
Date: 2012/7/30
*/
//配置信息
$conn_args = array(
‘host’ => ‘localhost’,
‘port’ => ‘5672’,
‘login’ => ‘guest’,
‘password’ => ‘guest’,
‘vhost’=>’/’
);
$e_name = ‘e_linvo’; //交换机名
//$q_name = ‘q_linvo’; //无需队列名
$k_route = ‘key_1’; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die(“Cannot connect to the broker!\n”);
}
$channel = new AMQPChannel($conn);
//消息内容
$message = “TEST MESSAGE! 测试消息!”;
//创建交换机对象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
//发送消息
//$channel->startTransaction(); //开始事务
for($i=0; $i<5; ++$i){ echo “Send Message:”.$ex->publish($message, $k_route).”\n”;
}
//$channel->commitTransaction(); //提交事务
$conn->disconnect();

然后我们再来写一段接收消息的代码:

[root@shanghai-yongjie demo]# cat receive.php

<?php
/*
PHP amqp(RabbitMQ) Demo – consumer
Author: Linvo
Date: 2012/7/30
*/
//配置信息
$conn_args = array(
‘host’ => ‘localhost’,
‘port’ => ‘5672’,
‘login’ => ‘guest’,
‘password’ => ‘guest’,
‘vhost’=>’/’
);
$e_name = ‘e_linvo’; //交换机名
$q_name = ‘q_linvo’; //队列名
$k_route = ‘key_1’; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die(“Cannot connect to the broker!\n”);
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo “Exchange Status:”.$ex->declare().”\n”;
//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo “Message Total:”.$q->declare().”\n”;
//绑定交换机与队列,并指定路由键
echo ‘Queue Bind: ‘.$q->bind($e_name, $k_route).”\n”;
//阻塞模式接收消息
echo “Message:\n”;
while(True){
$q->consume(‘processMessage’);
//$q->consume(‘processMessage’, AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();
/**
消费回调函数
处理消息
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg.”\n”; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

我们按次序运行这两段代码,就可以看到如下的输出:

下期,我们搞一把 RabbitMQ 的集群配置

作者: 甬洁网络

--移动互联网&物联网技术提供商