千家信息网

PHP如何实现php-amqplib/php-amqplib实例RabbitMq

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这篇文章主要为大家展示了"PHP如何实现php-amqplib/php-amqplib实例RabbitMq",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"P
千家信息网最后更新 2025年01月24日PHP如何实现php-amqplib/php-amqplib实例RabbitMq

这篇文章主要为大家展示了"PHP如何实现php-amqplib/php-amqplib实例RabbitMq",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"PHP如何实现php-amqplib/php-amqplib实例RabbitMq"这篇文章吧。

项目代码

https://gitee.com/owenzhang24/tp5

其他笔记:

1: 列出队列(Listing queues)

如果你想查看Rabbitmq队列,并且想知道有多少消息存在其中,你(作为特权用户)可以使用rabbitmqctl 工具:

rabbitmqctl list_queues。

在Windows中,省略sudo:

rabbitmqctl.bat list_queues

2: 工作队列

我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在window系统运行,去掉sudo:

$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

3: rabbitmqctl能够列出服务器上所有的交换器:

$ sudo rabbitmqctl list_exchanges

这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。

4:列出所有现存的绑定 rabbitmqctl list_bindings

5: 如果你想把日志保存到文件中,只需要打开控制台输入: ( receive_logs.php 源代码)

$ php receive_logs.php > logs_from_rabbit.log

如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:

$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

如果要触发一个error级别的日志,只需要输入:

$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

第一:安装RabbitMq环境

windows环境的rabbitmq安装与启动

https://my.oschina.net/owenzhang24/blog/5051652

第二:composer require php-amqplib/php-amqplib
第三:代码类
  1. rabbitMq实现的基础类:application/common/lib/classes/rabbitmq/RabbitMq.php

  2. 供外部调用的rabbitMq类:application/common/lib/classes/RabbitMqWork.php

  3. 测试发送消息到rabbitMq中的方法:application/index/controller/Index.php

  4. 添加php think命令实现接收rabbitMq中的消息:application/common/command/*.php

第四:使用说明
  1. 发送消息时直接在自己的方法中调用RabbitMqWork.php类中的几个送消息的方法即可。

  2. application/common/command/下的类都是实现添加php think命令的类,在configure方法中的setName()中设置命令名称,execute()方法是为了执行接收rabbitMq中的消息,同时在application/command.php中return添加设置的命令名称及对应的命令目录地址。

  3. 贡献文档

  4. RabbitMQ 中文文档-PHP版。https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/

  5. RabbitMQ官方文档。https://www.rabbitmq.com/getstarted.html

第五:源码

application/common/lib/classes/rabbitmq/RabbitMq.php

 'direct_exchange',        self::TOPIC => 'topic_exchange',        self::HEADERS => 'headers_exchange',        self::FANOUT => 'fanout_exchange',    ];    const SEVERITYS = [        'info',        'warning',        'error'    ];    static private $exchangeName = '';    /**     * RabbitMq constructor.     * @param $exchangeType     */    private function __construct($exchangeType)    {        self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');        self::$channel = self::$connection->channel();        if (!empty($exchangeType)) {            self::$exchangeName = self::$exchangeNames[$exchangeType];            self::$channel->exchange_declare(                self::$exchangeName, //交换机名称                $exchangeType, //路由类型                false, //don't check if a queue with the same name exists 是否检测同名队列                true, //the queue will not survive server restarts 是否开启队列持久化                false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列            );        }    }    /**     * 实例化     * @param string $exchangeType     * @return RabbitMq     */    public static function instance($exchangeType = '')    {        if (!self::$instance instanceof self) {            self::$instance = new self($exchangeType);        }        return self::$instance;    }    /**     * 防止被外部复制     */    private function __clone()    {    }    /**     * 简单的发送     */    public function send()    {        self::$channel->queue_declare('hello', false, false, false);        $msg = new AMQPMessage('Hello World!');        self::$channel->basic_publish($msg, '', 'hello');        echo "[X] Sent 'Hello World!'\n";    }    /**     * 简单的接收     * @param $queueName     * @param $callback     */    public function receive($callback)    {        self::$channel->queue_declare('hello', false, false, false, true);        echo "[*] Waiting for messages. To exit press CTRL+C\n";        self::$channel->basic_consume('hello', '', false, true, false, false, $callback);        while (count(self::$channel->callbacks)) {            self::$channel->wait();        }    }    /**     * 添加工作队列     * @param string $data     */    public function addTask($data = '')    {        self::$channel->queue_declare('task_queue', false, true, false, true);        if (empty($data)) $data = 'Hello World!';        $msg = new AMQPMessage(            $data,            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)        );        self::$channel->basic_publish($msg, '', 'task_queue');        echo "[x] Sent $data \n";    }    /**     * 执行工作队列     * @param $callback     */    public function workTask($callback)    {        self::$channel->queue_declare('task_queue', false, true, false, true);        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";        self::$channel->basic_qos(null, 1, null);        self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);        while (count(self::$channel->callbacks)) {            self::$channel->wait();        }    }    /**     * 发布     * @param string $data     */    public function sendQueue($data = '')    {        if (empty($data)) $data = 'info:Hello World!';        $msg = new AMQPMessage($data);        self::$channel->basic_publish($msg, self::$exchangeName);        echo "[x] Sent $data \n";    }    /**     * 订阅     * @param $callback     */    public function subscribeQueue($callback)    {        list($queue_name, ,) = self::$channel->queue_declare(            "", //队列名称            false, //don't check if a queue with the same name exists 是否检测同名队列            true, //the queue will not survive server restarts 是否开启队列持久化            true, //the queue might be accessed by other channels 队列是否可以被其他队列访问            false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列        );        self::$channel->queue_bind($queue_name, self::$exchangeName);        echo "[*] Waiting for logs. To exit press CTRL+C \n";        self::$channel->basic_consume($queue_name, '', false, true, false, false, $callback);        while (count(self::$channel->callbacks)) {            self::$channel->wait();        }    }    /**     * 发送(直接交换机)     * @param $routingKey     * @param string $data     */    public function sendDirect($routingKey, $data = '')    {        if (empty($data)) $data = "Hello World!";        $msg = new AMQPMessage($data);        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);        echo "[x] Sent $routingKey:$data \n";    }    /**     * 接收(直接交换机)     * @param \Closure $callback     * @param array $bindingKeys     */    public function receiveDirect(\Closure $callback, array $bindingKeys)    {        list($queue_namme, ,) = self::$channel->queue_declare('', false, true, true, false);        foreach ($bindingKeys as $bindingKey) {            self::$channel->queue_bind($queue_namme, self::$exchangeName, $bindingKey);        }        echo "[x] Waiting for logs. To exit press CTRL+C \n";        self::$channel->basic_consume($queue_namme, '', false, true, false, false, $callback);        while (count(self::$channel->callbacks)) {            self::$channel->wait();        }    }    /**     * 发送(主题交换机)     * @param $routingKey     * @param string $data     */    public function sendTopic($routingKey, $data = '')    {        if (empty($data)) $data = "Hello World!";        $msg = new AMQPMessage($data);        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);        echo " [x] Sent ", $routingKey, ':', $data, " \n";    }    /**     * 接收(主题交换机)     * @param \Closure $callback     * @param array $bindingKeys     */    public function receiveTopic(\Closure $callback, array $bindingKeys)    {        list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);        foreach ($bindingKeys as $bindingKey) {            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);        }        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);        while (count(self::$channel->callbacks)) {            self::$channel->wait();        }    }    /**     * 销毁     */    public function __destruct()    {        // TODO: Implement __destruct() method.        self::$channel->close();        self::$connection->close();    }}

application/common/lib/classes/RabbitMqWork.php

RabbitMq = RabbitMq::instance($exchageType);    }    /**     * 发送(普通)     */    public function send()    {        $this->RabbitMq->send();    }    /**     * 接收(普通)     * @param $callback     */    public function receive($callback)    {        $this->RabbitMq->receive($callback);    }    /**     * 发送(工作队列)     * @param $data     */    public function addTask($data)    {        $this->RabbitMq->addTask($data);    }    /**     * 接收(工作队列)     * @param $callback     */    public function workTask($callback)    {        $this->RabbitMq->workTask($callback);    }    /**     * 发布(扇形交换机)     * @param $data     */    public function sendQueue($data)    {        $this->RabbitMq->sendQueue($data);    }    /**     * 订阅(扇形交换机)     * @param $callback     */    public function subscribeQueue($callback)    {        $this->RabbitMq->subscribeQueue($callback);    }    /**     * 发送(直接交换机)     * @param $bindingKey     * @param $data     */    public function sendDirect($routingKey, $data)    {        $this->RabbitMq->sendDirect($routingKey, $data);    }    /**     * 接收(直接交换机)     * @param \Closure $callback     * @param array $bindingKeys     */    public function receiveDirect(\Closure $callback, array $bindingKeys)    {        $this->RabbitMq->receiveDirect($callback, $bindingKeys);    }    /**     * 发送(主题交换机)     * @param $routingKey     * @param $data     */    public function sendTopic($routingKey, $data)    {        $this->RabbitMq->sendTopic($routingKey, $data);    }    /**     * 接收(主题交换机)     * @param \Closure $callback     * @param array $bindingKeys     */    public function receiveTopic(\Closure $callback, array $bindingKeys)    {        $this->RabbitMq->receiveTopic($callback, $bindingKeys);    }}

application/index/controller/Index.php

send();//        $this->addTask();//        $this->sendQueue();//        $this->sendDirect();        $this->sendTopic();        var_dump(11);        die();    }    public function searchBlog()    {//        $id=1;//        $res = SyncBlog::getInstance()->syncBlog($id);        $search='11';        $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);        var_dump($res);        die();        var_dump(1111);        die();    }    /**     * 发送(普通)     */    public function send()    {        $RabbitMqWork = new RabbitMqWork();        $RabbitMqWork->send();    }    /**     * 发送(工作队列)     */    public function addTask()    {        $data = input('data', 'This is work task!');        $RabbitMqWork = new RabbitMqWork();        $RabbitMqWork->addTask($data);    }    /**     * 发送(扇形交换机)     */    public function sendQueue()    {        $data = input('data', 'This is send queue1');        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);        $RabbitMqWork->sendQueue($data);    }    /**     * 发送(直接交换机)     */    public function sendDirect()    {        $data = input('data', 'Hello World!');        $routingKey = input('routingKey', 'info');        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);        $RabbitMqWork->sendDirect($routingKey, $data);    }    /**     * 发送(主题交换机)     */    public function sendTopic()    {        $data = input('data', 'Hello World!');        $routingKey = input('routingKey', 'lazy.boy');        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);        $RabbitMqWork->sendTopic($routingKey, $data);    }}

application/command.php

// +----------------------------------------------------------------------return [    'simpleMq' => 'application\command\SimpleWork',    'workQueue' => 'application\command\WorkQueue',    'sendQueue' => 'application\command\SendQueue',    'directQueue' => 'application\command\DirectQueue',    'topicQueue' => 'application\command\TopicQueue',];

application/common/command/*.php

application/command/DirectQueue.php

setName('directQueue');    }    protected function execute(Input $input, Output $output)    {        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);        $callback = function ($msg){            echo "[x] ".$msg->delivery_info['routing_key'].":$msg->body \n";        };        $RabbitMqWork->receiveDirect($callback,RabbitMq::SEVERITYS);    }}

application/command/SendQueue.php

setName('sendQueue');    }    protected function execute(Input $input, Output $output)    {        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);        $callback = function ($msg) {            echo 'Receive:';            echo "Msg:$msg->body \n";            \Log::error("Msg:$msg->body");        };        $RabbitMqWork->subscribeQueue($callback);    }}

application/command/SimpleWork.php

setName('simpleMq');    }    protected function execute(Input $input, Output $output)    {        $RabbitMqWork= new RabbitMqWork();        $callback = function ($msg){            echo 'Receive:';            $queueName = $msg->delivery_info['routing_key'];            $msgData = $msg->body;            $isAck = true;            echo 'Msg:'.$msgData."\n";            echo 'QueueName:'.$queueName."\n";            if($isAck) {                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);            }        };        $RabbitMqWork->receive($callback);    }}

application/command/TopicQueue.php

setName('topicQueue');    }    protected function execute(Input $input, Output $output)    {        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);        $callback = function ($msg){            echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";        };        $bindingKeys = [            '*.orange.*',            '*.*.rabbit',            'lazy.#'        ];        $RabbitMqWork->receiveTopic($callback,$bindingKeys);    }}

application/command/WorkQueue.php

setName('workQueue');    }    protected function execute(Input $input, Output $output)    {        $RabbitMqWork = new RabbitMqWork();        $callback = function ($msg){            echo " [x] Received ", $msg->body, "\n";            sleep(substr_count($msg->body, '.'));            echo " [x] Done", "\n";            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);        };        $RabbitMqWork->workTask($callback);    }}

以上是"PHP如何实现php-amqplib/php-amqplib实例RabbitMq"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0