<?php

namespace App\Utils;

use App\Jobs\FeiShuSendJob;
use App\Repositories\Order\OrdersRepo;
use Exception;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Throwable;

/*
 * Reference for AMQPChannel::queue_declare() arguments:
 *   name:        queue name; pass an empty name to let RabbitMQ generate one
 *                (should be unique in a fanout exchange)
 *   passive:     false = do not merely check whether the queue already exists
 *   durable:     whether the queue survives a server restart
 *   exclusive:   whether the queue is exclusive to this connection
 *                (an exclusive queue cannot be consumed concurrently)
 *   auto_delete: whether the queue is deleted once the channel is closed
 *
 * This class always declares queues with passive=false, durable=true,
 * exclusive=false, auto_delete=false (see initQueue()).
 */

/**
 * Thin wrapper around php-amqplib for publishing to and consuming from
 * fanout, direct, topic and delayed-message ("x-delayed-message") exchanges.
 *
 * Undeclared properties are stored in / read from $args through __set()/__get();
 * in particular `$mq->ask` is forwarded as the no_ack flag of basic_consume().
 */
class RabbitmqUtil
{
    /** @var array Connection settings (host, port, user, password, vhost, optional heartbeat). */
    public array $configs = [];

    /** @var string Exchange name. */
    public string $exchangeName = '';

    /** @var string Queue name; when empty the broker-generated name is stored here by initQueue(). */
    public string $queueName = '';

    /** @var string Routing key. */
    public string $routeKey = '';

    /** @var int Unix timestamp of the last (re)connect performed by keeplive(). */
    public $lastHeartbeatAt = 0;

    /** @var AMQPStreamConnection|null Shared connection managed by keeplive(). */
    public $connection = null;

    /** @var AMQPChannel|null Shared channel managed by keeplive(). */
    public $channel = null;

    /** @var array Extra MQ options exposed through __get()/__set(). */
    public $args = [
        'ask' => false,
    ];

    /**
     * Store the exchange/queue/routing names and the connection settings.
     *
     * @param string $exchangeName Exchange name.
     * @param string $queueName    Queue name (may be empty for a broker-generated queue).
     * @param string $routeKey     Routing key; plain keys and topic patterns (e.g. user.*, user.#).
     * @param array  $configs      Connection settings; when empty, config('queue.connections.rabbitmq') is used.
     *
     * @throws Exception When a required connection setting is missing.
     */
    public function __construct(string $exchangeName, string $queueName = '', string $routeKey = '', array $configs = [])
    {
        if (!$configs) {
            $configs = config('queue.connections.rabbitmq');
        }

        $this->setConfigs($configs);
        $this->exchangeName = $exchangeName;
        $this->queueName = $queueName;
        $this->routeKey = $routeKey;
    }

    /**
     * Validate and store the connection settings; vhost defaults to "/".
     *
     * @param array $configs
     *
     * @throws Exception When host, port, user or password is missing or empty.
     */
    private function setConfigs(array $configs)
    {
        // empty() instead of direct reads: a missing key must yield the exception below,
        // not an "undefined array key" warning.
        foreach (['host', 'port', 'user', 'password'] as $required) {
            if (empty($configs[$required])) {
                throw new Exception('configs is empty');
            }
        }

        if (empty($configs['vhost'])) {
            $configs['vhost'] = '/';
        }

        $this->configs = $configs;
    }

    /**
     * Open a new connection to the broker using the stored settings.
     *
     * @return AMQPStreamConnection
     */
    public function initConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->configs['host'],
            $this->configs['port'],
            $this->configs['user'],
            $this->configs['password'],
            $this->configs['vhost'],
            false,      // insist
            'AMQPLAIN', // login method
            null,       // login response
            'en_US',    // locale
            3.0,        // connection timeout (seconds)
            3.0,        // read/write timeout (seconds)
            null,       // stream context
            false,      // TCP keepalive
            // heartbeat is optional (keeplive() also treats it as optional); 0 disables it
            $this->configs['heartbeat'] ?? 0
        );
    }

    /**
     * Declare the queue (durable, non-exclusive, not auto-deleted).
     *
     * When no queue name is configured, the name generated by the broker is stored
     * in $queueName so later bind/consume calls can use it.
     *
     * @param AMQPChannel $channel
     *
     * @return AMQPChannel The same channel.
     */
    private function initQueue(AMQPChannel $channel)
    {
        $declared = $channel->queue_declare($this->queueName, false, true, false, false);

        if (!$this->queueName && is_array($declared)) {
            $this->queueName = $declared[0];
        }

        return $channel;
    }

    /**
     * Open a channel and declare the topic exchange; for consumers also declare and bind the queue.
     *
     * @param AMQPStreamConnection $connection
     * @param string               $opType     "publish" or "consume".
     *
     * @return mixed The channel.
     */
    private function initTopicChannel(AMQPStreamConnection $connection, string $opType = 'publish'): mixed
    {
        $channel = $connection->channel();
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);

        if ($opType == 'consume') {
            $this->initQueue($channel);
            // Bind with the routing key (pattern); without it the queue would only
            // receive messages published with an empty routing key.
            $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
        }

        return $channel;
    }

    /**
     * Publish one persistent message to the topic exchange on a dedicated connection.
     *
     * @param mixed $message Arrays are JSON-encoded; anything else is sent as given.
     *
     * @throws Exception
     */
    public function pushTopicMessage($message)
    {
        $connection = $this->initConnection();

        try {
            $channel = $this->initTopicChannel($connection);
            $data = new AMQPMessage(
                $this->encodePayload($message),
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );
            $channel->basic_publish($data, $this->exchangeName, $this->routeKey);
            $channel->close();
        } finally {
            // Always release the connection, even when declaring or publishing failed.
            $connection->close();
        }
    }

    /**
     * Consume messages from the topic exchange; failures are reported, not rethrown.
     *
     * @param array|string $functionName Consumer callback.
     *
     * @return false|void false when the callback or the queue name is empty.
     */
    public function consumeTopicMessage(array|string $functionName)
    {
        // Must be TOPIC: declaring the exchange as DIRECT here would conflict with the
        // topic exchange declared by pushTopicMessage().
        return $this->consumeSafely(AMQPExchangeType::TOPIC, $functionName);
    }

    /**
     * Open a channel, declare the queue and the direct exchange, and bind them with the routing key.
     *
     * @param AMQPStreamConnection $connection
     *
     * @return mixed The channel.
     */
    public function initDirectChannel(AMQPStreamConnection $connection): mixed
    {
        $channel = $connection->channel();
        $this->initQueue($channel);
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::DIRECT, false, true, false);
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * Publish one persistent message to the direct exchange, then close the shared connection.
     *
     * @param mixed  $message      Arrays are JSON-encoded; anything else is sent as given.
     * @param string $content_type Optional content_type message property.
     *
     * @throws Exception
     */
    public function pushDirectMessage($message, $content_type = '')
    {
        $data = new AMQPMessage($this->encodePayload($message), $this->buildProperties($content_type));
        $this->keeplive();

        try {
            $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        } finally {
            $this->closeSharedConnection();
        }
    }

    /**
     * Publish every element of $message as its own persistent message to the direct
     * exchange, then close the shared connection.
     *
     * @param array  $message      List of payloads; array elements are JSON-encoded individually.
     * @param string $content_type Optional content_type message property.
     *
     * @throws Exception
     */
    public function pushDirectMessageArr(array $message, $content_type = '')
    {
        $properties = $this->buildProperties($content_type);

        try {
            // Iterate the list itself; encoding the whole list first would leave a
            // string that cannot be iterated.
            foreach ($message as $info) {
                $data = new AMQPMessage($this->encodePayload($info), $properties);
                $this->keeplive();
                $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
            }
        } finally {
            $this->closeSharedConnection();
        }
    }

    /**
     * Publish one persistent message to the direct exchange and keep the shared connection open.
     *
     * @param mixed  $message      Arrays are JSON-encoded; anything else is sent as given.
     * @param string $content_type Optional content_type message property.
     *
     * @throws Exception
     */
    public function pushDirectMessageNoClose($message, $content_type = '')
    {
        $data = new AMQPMessage($this->encodePayload($message), $this->buildProperties($content_type));
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
    }

    /**
     * Consume messages from the direct exchange with a prefetch count of 1 (fair dispatch).
     *
     * Unlike the other consume* methods, exceptions are not caught here.
     *
     * @param array|string $functionName Consumer callback.
     *
     * @return false|void false when the callback or the queue name is empty.
     */
    public function consumeDirectMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }

        /** @var AMQPChannel $channel */
        $channel = $this->keeplive(AMQPExchangeType::DIRECT);
        // Fair dispatch: at most one unacknowledged message per consumer.
        $channel->basic_qos(null, 1, null);
        $this->consume($channel, $functionName);
    }

    /**
     * Open a channel, declare the queue and the delayed-message exchange, and bind them.
     *
     * The "x-delayed-type" argument selects how the exchange routes once the delay has
     * elapsed; it is DIRECT here and can be changed to another exchange type.
     *
     * @param AMQPStreamConnection $connection
     *
     * @return AMQPChannel
     */
    private function initDelayChannel(AMQPStreamConnection $connection)
    {
        $channel = $connection->channel();
        $this->initQueue($channel);
        $channel->exchange_declare(
            $this->exchangeName,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT])
        );
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * Publish a batch of delayed messages on a dedicated connection.
     *
     * For a message that has $dateKey, the delay is
     * strtotime($message[$dateKey]) + minutes * 60 - time(), floored at 0 seconds;
     * a message without $dateKey is published with no delay.
     *
     * @param array  $messageData Messages to publish.
     * @param int    $delayTime   Delay in minutes, applied to messages without a special order mode.
     * @param string $dateKey     Key of the timestamp the delay is counted from.
     *
     * @throws Exception
     */
    public function pushDelayMessage(array $messageData, int $delayTime = 30, string $dateKey = 'created_at')
    {
        $connection = $this->initConnection();

        try {
            $channel = $this->initDelayChannel($connection);

            foreach ($messageData as $message) {
                $delaySeconds = 0;

                if (isset($message[$dateKey])) {
                    // Per-message value: the override below must not leak into later messages.
                    $minutes = $delayTime;

                    // TODO: single proxy-registration orders and class-transfer orders that need a
                    // top-up payment use one shared payment window (24 hours per the original note;
                    // presumably the constant is expressed in minutes - confirm). Orders placed by
                    // the user keep the regular countdown ($delayTime).
                    $specialModes = [OrdersRepo::ORDER_MODE_PROXY_SINGLE, OrdersRepo::ORDER_MODE_TRANSFER];
                    if (isset($message['orders_mode']) && in_array($message['orders_mode'], $specialModes)) {
                        $minutes = OrdersRepo::ORDER_PROXY_SINGLE_OR_TRANSFER_CYCLE_TIME;
                    }

                    $delaySeconds = max(strtotime($message[$dateKey]) + $minutes * 60 - time(), 0);
                }

                $data = new AMQPMessage($this->encodePayload($message), [
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    // x-delay is expressed in milliseconds
                    'application_headers' => new AMQPTable(['x-delay' => $delaySeconds * 1000]),
                ]);
                $channel->basic_publish($data, $this->exchangeName, $this->routeKey);
            }

            $channel->close();
        } finally {
            // Always release the connection, even when declaring or publishing failed.
            $connection->close();
        }
    }

    /**
     * Consume messages from the delayed-message exchange; failures are reported, not rethrown.
     *
     * @param array|string $functionName Consumer callback.
     *
     * @return false|void false when the callback or the queue name is empty.
     */
    public function consumeDelayMessage(array|string $functionName)
    {
        return $this->consumeSafely('x-delayed-message', $functionName);
    }

    /**
     * Open a channel and declare the fanout exchange; for consumers also declare and bind the queue.
     *
     * @param AMQPStreamConnection $connection
     * @param string               $opType     "publish" or "consume".
     *
     * @return mixed The channel.
     */
    private function initFanoutChannel(AMQPStreamConnection $connection, string $opType = 'publish'): mixed
    {
        $channel = $connection->channel();
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::FANOUT, false, true, false);

        if ($opType == 'consume') {
            $this->initQueue($channel);
            // No routing key is passed for the fanout binding.
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }

        return $channel;
    }

    /**
     * Publish one persistent message to the fanout exchange on a dedicated connection.
     *
     * @param mixed $message Arrays are JSON-encoded; anything else is sent as given.
     *
     * @throws Exception
     */
    public function pushFanoutMessage($message)
    {
        $connection = $this->initConnection();

        try {
            $channel = $this->initFanoutChannel($connection);
            $data = new AMQPMessage(
                $this->encodePayload($message),
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );
            $channel->basic_publish($data, $this->exchangeName);
            $channel->close();
        } finally {
            // Always release the connection, even when declaring or publishing failed.
            $connection->close();
        }
    }

    /**
     * Consume messages from the fanout exchange; failures are reported, not rethrown.
     *
     * @param array|string $functionName Consumer callback.
     *
     * @return false|void false when the callback or the queue name is empty.
     */
    public function consumeFanoutMessage(array|string $functionName)
    {
        return $this->consumeSafely(AMQPExchangeType::FANOUT, $functionName);
    }

    /**
     * Register the callback on the queue and block until the channel has no consumers left,
     * then close the channel and the shared connection.
     *
     * @param AMQPChannel $channel
     * @param mixed       $functionName Consumer callback.
     */
    public function consume(AMQPChannel $channel, mixed $functionName)
    {
        $consumerTag = 'consumer' . getmypid();

        try {
            // 3rd argument: no_local = true.
            // 4th argument: no_ack, taken from $this->ask (true = automatic ack, false = manual ack).
            $channel->basic_consume($this->queueName, $consumerTag, true, $this->ask, false, false, $functionName);

            // Deliveries arrive asynchronously; wait() blocks and invokes the callback per message.
            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $channel->close();
            if ($this->connection) {
                $this->connection->close();
            }
        } finally {
            // Drop the references so the next keeplive() opens a fresh connection
            // instead of handing out a closed (or failed) channel.
            $this->channel = null;
            $this->connection = null;
        }
    }

    /**
     * Return the shared channel, (re)connecting first when there is no connection yet or
     * when a heartbeat is configured and more than (heartbeat - 1) seconds have passed
     * since the last connect.
     *
     * @param string $mqType One of the AMQPExchangeType DIRECT/FANOUT/TOPIC values or "x-delayed-message".
     *
     * @return AMQPChannel|null
     *
     * @throws Exception When a (re)connect is needed and $mqType is not supported.
     */
    public function keeplive($mqType = AMQPExchangeType::DIRECT)
    {
        $needReconnect = isset($this->configs['heartbeat'])
            && $this->configs['heartbeat'] > 0
            && time() - $this->lastHeartbeatAt > $this->configs['heartbeat'] - 1;

        if ($needReconnect || !$this->connection) {
            // Resolve the initializer before connecting so an unsupported type
            // does not leave a half-initialised connection behind.
            $initializer = match ($mqType) {
                AMQPExchangeType::DIRECT => 'initDirectChannel',
                AMQPExchangeType::FANOUT => 'initFanoutChannel',
                AMQPExchangeType::TOPIC => 'initTopicChannel',
                'x-delayed-message' => 'initDelayChannel',
                default => throw new Exception('Unsupported exchange type: ' . var_export($mqType, true)),
            };

            $this->connection = $this->initConnection();
            // 'consume' is only read by the fanout/topic initializers; the others ignore it.
            $this->channel = $this->$initializer($this->connection, 'consume');
            $this->lastHeartbeatAt = time();
        }

        return $this->channel;
    }

    /**
     * Store an MQ option (any undeclared property) in $args.
     *
     * @param string $name
     * @param mixed  $value
     */
    public function __set($name, $value)
    {
        $this->args[$name] = $value;
    }

    /**
     * Read an MQ option from $args.
     *
     * @param string $name
     *
     * @return false|mixed false when the option is not set.
     */
    public function __get($name)
    {
        return $this->args[$name] ?? false;
    }

    /**
     * JSON-encode array payloads; return anything else unchanged.
     *
     * @param mixed $message
     *
     * @return mixed
     */
    private function encodePayload($message)
    {
        return is_array($message) ? json_encode($message) : $message;
    }

    /**
     * Build the properties of a persistent message, adding content_type when given.
     *
     * @param mixed $content_type
     *
     * @return array
     */
    private function buildProperties($content_type): array
    {
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];

        if ($content_type) {
            $properties['content_type'] = $content_type;
        }

        return $properties;
    }

    /**
     * Close the shared channel and connection and forget them, so that the next
     * keeplive() reconnects instead of reusing closed objects.
     */
    private function closeSharedConnection(): void
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
            if ($this->connection) {
                $this->connection->close();
            }
        } finally {
            $this->channel = null;
            $this->connection = null;
        }
    }

    /**
     * Shared body of the consume*Message() methods that report failures instead of rethrowing.
     *
     * @param string       $mqType       Exchange type passed to keeplive().
     * @param array|string $functionName Consumer callback.
     *
     * @return false|void false when the callback or the queue name is empty.
     */
    private function consumeSafely(string $mqType, array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }

        try {
            /** @var AMQPChannel $channel */
            $channel = $this->keeplive($mqType);
            $this->consume($channel, $functionName);
        } catch (Throwable $e) {
            // Report through the FeiShuSendJob queue job and the application log.
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])
                ->onQueue('FeiShuSendJob_' . config('app.env'));
            Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }
}
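/*
 * Usage:
 * 1. Configure the RabbitMQ connection settings in the application config
 *    (read from `queue.connections.rabbitmq`).
 * 2. Import this class.
 * 3. $mq = new RabbitmqUtil(self::DELETE_EXCHANG,self::DELETE_QUEUE,'');
 */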