主页
<?php

namespace AppUtils;

use AppJobsFeiShuSendJob;
use AppRepositoriesOrderOrdersRepo;
use Exception;
use IlluminateSupportFacadesLog;
use PhpAmqpLibChannelAMQPChannel;
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibExchangeAMQPExchangeType;
use PhpAmqpLibWireAMQPTable;

/*
    @method: queue_declare
        name: $queue    // should be unique in fanout exchange. Let RabbitMQ create a queue name for us
        passive: false  // don't check if a queue with the same name exists
        durable: false  // the queue will not survive server restarts       是否持久化队列
        exclusive: true // the queue can not be accessed by other channels  是否是独占队列,不允许并发消费
        auto_delete: true // the queue will be deleted once the channel is closed.
*/

/**
 * fanout derict topic 延迟队列
 */
class RabbitmqUtil
{
    //配置数组
    public array $configs = [];

    //交换机名称
    public string $exchangeName = '';

    //队列名称
    public string $queueName = '';

    //路由名称
    public string $routeKey = '';

    //最后一次心跳时间
    public $lastHeartbeatAt = 0;

    //MQ链接
    public $connection = null;

    //MQ CHANNEL
    public $channel = null;

    //mq参数数组
    public $args = [
        'ask' => false
    ];


    /**
     * 初始化配置文件
     *
     * @param string $exchangeName
     *
     * @param string $queueName
      @param string $routeKey 支持普通路由和正侧路由键(user.  user.#等)
     * @param array $configs
     * @throws Exception
     */
    public function __construct(string $exchangeName,string $queueName = '', string $routeKey = '',array $configs=[])
    {
        if(!$configs){
            $configs = config('queue.connections.rabbitmq');
        }
        $this->setConfigs($configs);
        $this->exchangeName = $exchangeName;
        $this->queueName = $queueName;
        $this->routeKey = $routeKey;
    }


    /**
     * 设置配置文件
     *
     * @param  array  $configs
     *
     * @throws Exception
     */
    private function setConfigs(array $configs)
    {
        if (!($configs['host'] && $configs['port'] && $configs['user'] && $configs['password'])) {
            throw new Exception('configs is empty');
        }
        if (empty($configs['vhost'])) {
            $configs['vhost'] = '/';
        }
        $this->configs = $configs;
    }


    /**
     * 初始化MQ
     *
     * @return AMQPStreamConnection
     */
    public function initConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->configs['host'],
            $this->configs['port'],
            $this->configs['user'],
            $this->configs['password'],
            $this->configs['vhost'],
            false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0,
            null, false, $this->configs['heartbeat']);
    }

    /**
     * 初始化队列
     *
     * @param  AMQPChannel  $channel
     *
     * @return AMQPChannel
     */
    private function initQueue(AMQPChannel &$channel)
    {
        // 初始化队列
        if(!$this->queueName){
            //如果队列不存在,则自动生成一个队列
            list($this->queueName, ,) = $channel->queue_declare($this->queueName, false, true, false, false);
        }else{
            $channel->queue_declare($this->queueName, false, true, false, false);
        }

        return $channel;
    }
    /**
     * 初始化频道信息
     *
     * @param  AMQPStreamConnection  $connection
     *
     * @return mixed
     */
    private function initTopicChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
        if($opType == 'consume'){
            // 初始化队列
            $this->initQueue($channel);
            // 将队列与某个交换机进行绑定,并使用路由关键字
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }

        return $channel;
    }

    /**
     * 发布Topic消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushTopicMessage($message)
    {
        $connection = $this->initConnection();
        $channel = $this->initTopicChannel($connection);
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $channel->basic_publish($data, $this->exchangeName, $this->routeKey);

        $channel->close();
        $connection->close();
    }

    /**
     * 消费Topic消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeTopicMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::DIRECT);
            $this->consume($channel,$functionName);
        } catch (Exception $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));

            Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     *  初始化频道信息
     *
     * @param $connection
     *
     * @return AMQPChannel
     */
    public function initDirectChannel(AMQPStreamConnection $connection): mixed
    {
        $channel = $connection->channel();
        // 初始化队列
        $this->initQueue($channel);
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::DIRECT, false, true, false);
        // 将队列与某个交换机进行绑定,并使用路由关键字
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * 发布直连消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushDirectMessage($message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        $data = new AMQPMessage($message, $properties);
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        $this->channel->close();
        $this->connection->close();
    }

    public function pushDirectMessageArr(array $message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        foreach ($message as $info){
            $data = new AMQPMessage($info, $properties);
            $this->keeplive();
            $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }

        $this->channel->close();
        $this->connection->close();
    }

    /**
     * 发布直连消息 不关闭连接
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushDirectMessageNoClose($message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        $data = new AMQPMessage($message, $properties);
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
    }

    /**
     * 消费直连消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public  function consumeDirectMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        /**
         * @var AMQPChannel $channel
         */
        $channel = $this->keeplive(AMQPExchangeType::DIRECT);
        //公平调度
        $channel->basic_qos(null, 1, null);
        $this->consume($channel,$functionName);
    }

    /**
     * 初始化延迟channel信息
     *
     * @param  AMQPStreamConnection  $connection
     *
     * @return PhpAmqpLibChannelAbstractChannel|AMQPChannel
     */
    private function initDelayChannel(AMQPStreamConnection $connection)
    {
        $channel = $connection->channel();
        // 初始化队列
        $this->initQueue($channel);
        // 初始化交换机 new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT] 可以修改为其他模式
        $channel->exchange_declare($this->exchangeName, 'x-delayed-message', false, true, false, false, false, new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT]));
        // 将队列与某个交换机进行绑定,并使用路由关键字
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * 发布延迟消息
     *
     * @param  array  $messageData 要批量插入的消息数据
     * @param  string  $dateKey    计算延时的key值
     * @param  int  $delayTime     延迟时间(分钟)
     *
     * @throws Exception
     */
    public function pushDelayMessage(array $messageData, int $delayTime = 30, string $dateKey = 'created_at')
    {
        $connection = $this->initConnection();
        $channel = $this->initDelayChannel($connection);
        foreach ($messageData as $message) {
            if(isset($message[$dateKey])){
                //TODO 单个代报、转班需要补款的订单,待支付订单统一为24小时支付时长,用户自主下单30分钟倒计时规则不变
                if ((isset($message['orders_mode']) && in_array($message['orders_mode'], [OrdersRepo::ORDER_MODE_PROXY_SINGLE, OrdersRepo::ORDER_MODE_TRANSFER]))) {
                    $delayTime = OrdersRepo::ORDER_PROXY_SINGLE_OR_TRANSFER_CYCLE_TIME;
                }
                $delay = strtotime($message[$dateKey]) + $delayTime * 60 - time();
                $delay = max($delay, 0);
            }else{
                $delay = 0;
            }
            if (is_array($message)) {
                $message = json_encode($message);
            }
            $data = new AMQPMessage($message, [
                'delivery_mode'       => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new AMQPTable(['x-delay' => $delay * 1000]),
            ]);
            $channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }
        $channel->close();
        $connection->close();
    }

    /**
     * 消费延迟消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeDelayMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive('x-delayed-message');
            $this->consume($channel,$functionName);
        } catch (Throwable $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
            Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * 初始化频道信息
     *
     * @param  AMQPStreamConnection  $connection
     * @param  string                $opType  publish:发布,consume:消费
     *
     * @return mixed
     */
    private function initFanoutChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::FANOUT, false, true, false);

        if($opType == 'consume'){
            // 初始化队列
            $this->initQueue($channel);

            // 将队列与某个交换机进行绑定,并使用路由关键字
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }


        return $channel;
    }

    /**
     * 发布广播消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushFanoutMessage($message)
    {
        $connection = $this->initConnection();
        $channel = $this->initFanoutChannel($connection);
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $channel->basic_publish($data, $this->exchangeName);

        $channel->close();
        $connection->close();
    }

    /**
     * 消费广播消息消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeFanoutMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {

            /**
             * 开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::FANOUT);
            $this->consume($channel,$functionName);
        } catch (Throwable $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
            Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * 消费
     *
     * @param  AMQPChannel  $channel
     * @param  mixed       $functionName
     */
    public function consume(AMQPChannel $channel,mixed $functionName)
    {
        $consumerTag = 'consumer' . getmypid();
        //$channel->basic_qos(null,1,null); //无论有多少队列,每次只能消费1条数据
        //no_ask = true 自动应答,false 手动应答
        $channel->basic_consume($this->queueName, $consumerTag, true, $this->ask, false, false, $functionName);
        //消息是从服务器异步发送到客户端,当回调时我们的代码需要阻塞接收消息的channel,每当我们接收到一条消息,我们的回调函数将会被传递接收到消息
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $this->connection->close();
    }

    /**
     * channel 保活
     *
     * @param  string  $mqType
     *
     * @return null
     */
    public function keeplive($mqType = AMQPExchangeType::DIRECT)
    {
        //开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
        $needReconnect = isset($this->configs['heartbeat'])
            && $this->configs['heartbeat'] > 0
            && time() - $this->lastHeartbeatAt > $this->configs['heartbeat'] - 1;

        if ($needReconnect || !$this->connection) {
            $this->connection = $this->initConnection();
            switch ($mqType){
                case AMQPExchangeType::DIRECT:
                    $channelType = 'initDirectChannel';
                    break;
                case AMQPExchangeType::FANOUT:
                    $channelType = 'initFanoutChannel';
                    break;
                case AMQPExchangeType::TOPIC:
                    $channelType = 'initTopicChannel';
                    break;
                case 'x-delayed-message':
                    $channelType = 'initDelayChannel';
                    break;
            }
            $this->channel = $this->$channelType($this->connection,'consume');
            //记录心跳时间戳
            $this->lastHeartbeatAt = time();
        }

        return $this->channel;
    }

    /**
     * 设置MQ参数
     *
     * @param $name
     * @param $value
     */
    public function __set($name,$value)
    {
        $this->args[$name] = $value;
    }

    /**
     * 获取MQ参数
     *
     * @param $name
     *
     * @return false|mixed
     */
    public function __get($name)
    {
        return $this->args[$name] ?? false;
    }
}

使用方式:
1、config 已配置mq信息
2、引入类库
3、$mq = new RabbitmqUtil(self::DELETE_EXCHANG,self::DELETE_QUEUE,'');

版权属于:三分快乐,七纷幸福
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
0
查看目录

目录

来自 《推荐一个php rabbitmq类库 》
评论

三分快乐,七纷幸福
99 文章数
7 评论量
11 分类数
102 页面数
已在风雨中度过 2年199天23小时46分