<?php
namespace AppUtils;
use AppJobsFeiShuSendJob;
use AppRepositoriesOrderOrdersRepo;
use Exception;
use IlluminateSupportFacadesLog;
use PhpAmqpLibChannelAMQPChannel;
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibExchangeAMQPExchangeType;
use PhpAmqpLibWireAMQPTable;
/*
@method: queue_declare
name: $queue // should be unique in fanout exchange. Let RabbitMQ create a queue name for us
passive: false // don't check if a queue with the same name exists
durable: false // the queue will not survive server restarts 是否持久化队列
exclusive: true // the queue can not be accessed by other channels 是否是独占队列,不允许并发消费
auto_delete: true // the queue will be deleted once the channel is closed.
*/
/**
* fanout derict topic 延迟队列
*/
class RabbitmqUtil
{
//配置数组
public array $configs = [];
//交换机名称
public string $exchangeName = '';
//队列名称
public string $queueName = '';
//路由名称
public string $routeKey = '';
//最后一次心跳时间
public $lastHeartbeatAt = 0;
//MQ链接
public $connection = null;
//MQ CHANNEL
public $channel = null;
//mq参数数组
public $args = [
'ask' => false
];
/**
* 初始化配置文件
*
* @param string $exchangeName
*
* @param string $queueName
@param string $routeKey 支持普通路由和正侧路由键(user. user.#等)
* @param array $configs
* @throws Exception
*/
public function __construct(string $exchangeName,string $queueName = '', string $routeKey = '',array $configs=[])
{
if(!$configs){
$configs = config('queue.connections.rabbitmq');
}
$this->setConfigs($configs);
$this->exchangeName = $exchangeName;
$this->queueName = $queueName;
$this->routeKey = $routeKey;
}
/**
* 设置配置文件
*
* @param array $configs
*
* @throws Exception
*/
private function setConfigs(array $configs)
{
if (!($configs['host'] && $configs['port'] && $configs['user'] && $configs['password'])) {
throw new Exception('configs is empty');
}
if (empty($configs['vhost'])) {
$configs['vhost'] = '/';
}
$this->configs = $configs;
}
/**
* 初始化MQ
*
* @return AMQPStreamConnection
*/
public function initConnection(): AMQPStreamConnection
{
return new AMQPStreamConnection(
$this->configs['host'],
$this->configs['port'],
$this->configs['user'],
$this->configs['password'],
$this->configs['vhost'],
false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0,
null, false, $this->configs['heartbeat']);
}
/**
* 初始化队列
*
* @param AMQPChannel $channel
*
* @return AMQPChannel
*/
private function initQueue(AMQPChannel &$channel)
{
// 初始化队列
if(!$this->queueName){
//如果队列不存在,则自动生成一个队列
list($this->queueName, ,) = $channel->queue_declare($this->queueName, false, true, false, false);
}else{
$channel->queue_declare($this->queueName, false, true, false, false);
}
return $channel;
}
/**
* 初始化频道信息
*
* @param AMQPStreamConnection $connection
*
* @return mixed
*/
private function initTopicChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
{
$channel = $connection->channel();
// 初始化交换机
$channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
if($opType == 'consume'){
// 初始化队列
$this->initQueue($channel);
// 将队列与某个交换机进行绑定,并使用路由关键字
$channel->queue_bind($this->queueName, $this->exchangeName);
}
return $channel;
}
/**
* 发布Topic消息
*
* @param $message
*
* @throws Exception
*/
public function pushTopicMessage($message)
{
$connection = $this->initConnection();
$channel = $this->initTopicChannel($connection);
if (is_array($message)) {
$message = json_encode($message);
}
$data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($data, $this->exchangeName, $this->routeKey);
$channel->close();
$connection->close();
}
/**
* 消费Topic消息
*
* @param array|string $functionName
* @param bool $ask
*
* @return false|void
*/
public function consumeTopicMessage(array|string $functionName)
{
if (!$functionName || !$this->queueName) {
return false;
}
try {
/**
* @var AMQPChannel $channel
*/
$channel = $this->keeplive(AMQPExchangeType::DIRECT);
$this->consume($channel,$functionName);
} catch (Exception $e) {
FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
Log::error('Rabbitmq', [$e->getMessage()]);
}
}
/**
* 初始化频道信息
*
* @param $connection
*
* @return AMQPChannel
*/
public function initDirectChannel(AMQPStreamConnection $connection): mixed
{
$channel = $connection->channel();
// 初始化队列
$this->initQueue($channel);
// 初始化交换机
$channel->exchange_declare($this->exchangeName, AMQPExchangeType::DIRECT, false, true, false);
// 将队列与某个交换机进行绑定,并使用路由关键字
$channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
return $channel;
}
/**
* 发布直连消息
*
* @param $message
*
* @throws Exception
*/
public function pushDirectMessage($message,$content_type='')
{
if (is_array($message)) {
$message = json_encode($message);
}
$properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
if($content_type){
$properties['content_type']=$content_type;
}
$data = new AMQPMessage($message, $properties);
$this->keeplive();
$this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
$this->channel->close();
$this->connection->close();
}
public function pushDirectMessageArr(array $message,$content_type='')
{
if (is_array($message)) {
$message = json_encode($message);
}
$properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
if($content_type){
$properties['content_type']=$content_type;
}
foreach ($message as $info){
$data = new AMQPMessage($info, $properties);
$this->keeplive();
$this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
}
$this->channel->close();
$this->connection->close();
}
/**
* 发布直连消息 不关闭连接
*
* @param $message
*
* @throws Exception
*/
public function pushDirectMessageNoClose($message,$content_type='')
{
if (is_array($message)) {
$message = json_encode($message);
}
$properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
if($content_type){
$properties['content_type']=$content_type;
}
$data = new AMQPMessage($message, $properties);
$this->keeplive();
$this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
}
/**
* 消费直连消息
*
* @param array|string $functionName
* @param bool $ask
*
* @return false|void
*/
public function consumeDirectMessage(array|string $functionName)
{
if (!$functionName || !$this->queueName) {
return false;
}
/**
* @var AMQPChannel $channel
*/
$channel = $this->keeplive(AMQPExchangeType::DIRECT);
//公平调度
$channel->basic_qos(null, 1, null);
$this->consume($channel,$functionName);
}
/**
* 初始化延迟channel信息
*
* @param AMQPStreamConnection $connection
*
* @return PhpAmqpLibChannelAbstractChannel|AMQPChannel
*/
private function initDelayChannel(AMQPStreamConnection $connection)
{
$channel = $connection->channel();
// 初始化队列
$this->initQueue($channel);
// 初始化交换机 new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT] 可以修改为其他模式
$channel->exchange_declare($this->exchangeName, 'x-delayed-message', false, true, false, false, false, new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT]));
// 将队列与某个交换机进行绑定,并使用路由关键字
$channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
return $channel;
}
/**
* 发布延迟消息
*
* @param array $messageData 要批量插入的消息数据
* @param string $dateKey 计算延时的key值
* @param int $delayTime 延迟时间(分钟)
*
* @throws Exception
*/
public function pushDelayMessage(array $messageData, int $delayTime = 30, string $dateKey = 'created_at')
{
$connection = $this->initConnection();
$channel = $this->initDelayChannel($connection);
foreach ($messageData as $message) {
if(isset($message[$dateKey])){
//TODO 单个代报、转班需要补款的订单,待支付订单统一为24小时支付时长,用户自主下单30分钟倒计时规则不变
if ((isset($message['orders_mode']) && in_array($message['orders_mode'], [OrdersRepo::ORDER_MODE_PROXY_SINGLE, OrdersRepo::ORDER_MODE_TRANSFER]))) {
$delayTime = OrdersRepo::ORDER_PROXY_SINGLE_OR_TRANSFER_CYCLE_TIME;
}
$delay = strtotime($message[$dateKey]) + $delayTime * 60 - time();
$delay = max($delay, 0);
}else{
$delay = 0;
}
if (is_array($message)) {
$message = json_encode($message);
}
$data = new AMQPMessage($message, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable(['x-delay' => $delay * 1000]),
]);
$channel->basic_publish($data, $this->exchangeName, $this->routeKey);
}
$channel->close();
$connection->close();
}
/**
* 消费延迟消息
*
* @param array|string $functionName
* @param bool $ask
*
* @return false|void
*/
public function consumeDelayMessage(array|string $functionName)
{
if (!$functionName || !$this->queueName) {
return false;
}
try {
/**
* @var AMQPChannel $channel
*/
$channel = $this->keeplive('x-delayed-message');
$this->consume($channel,$functionName);
} catch (Throwable $e) {
FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
Log::error('Rabbitmq', [$e->getMessage()]);
}
}
/**
* 初始化频道信息
*
* @param AMQPStreamConnection $connection
* @param string $opType publish:发布,consume:消费
*
* @return mixed
*/
private function initFanoutChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
{
$channel = $connection->channel();
// 初始化交换机
$channel->exchange_declare($this->exchangeName, AMQPExchangeType::FANOUT, false, true, false);
if($opType == 'consume'){
// 初始化队列
$this->initQueue($channel);
// 将队列与某个交换机进行绑定,并使用路由关键字
$channel->queue_bind($this->queueName, $this->exchangeName);
}
return $channel;
}
/**
* 发布广播消息
*
* @param $message
*
* @throws Exception
*/
public function pushFanoutMessage($message)
{
$connection = $this->initConnection();
$channel = $this->initFanoutChannel($connection);
if (is_array($message)) {
$message = json_encode($message);
}
$data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($data, $this->exchangeName);
$channel->close();
$connection->close();
}
/**
* 消费广播消息消息
*
* @param array|string $functionName
* @param bool $ask
*
* @return false|void
*/
public function consumeFanoutMessage(array|string $functionName)
{
if (!$functionName || !$this->queueName) {
return false;
}
try {
/**
* 开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
* @var AMQPChannel $channel
*/
$channel = $this->keeplive(AMQPExchangeType::FANOUT);
$this->consume($channel,$functionName);
} catch (Throwable $e) {
FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
Log::error('Rabbitmq', [$e->getMessage()]);
}
}
/**
* 消费
*
* @param AMQPChannel $channel
* @param mixed $functionName
*/
public function consume(AMQPChannel $channel,mixed $functionName)
{
$consumerTag = 'consumer' . getmypid();
//$channel->basic_qos(null,1,null); //无论有多少队列,每次只能消费1条数据
//no_ask = true 自动应答,false 手动应答
$channel->basic_consume($this->queueName, $consumerTag, true, $this->ask, false, false, $functionName);
//消息是从服务器异步发送到客户端,当回调时我们的代码需要阻塞接收消息的channel,每当我们接收到一条消息,我们的回调函数将会被传递接收到消息
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$this->connection->close();
}
/**
* channel 保活
*
* @param string $mqType
*
* @return null
*/
public function keeplive($mqType = AMQPExchangeType::DIRECT)
{
//开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
$needReconnect = isset($this->configs['heartbeat'])
&& $this->configs['heartbeat'] > 0
&& time() - $this->lastHeartbeatAt > $this->configs['heartbeat'] - 1;
if ($needReconnect || !$this->connection) {
$this->connection = $this->initConnection();
switch ($mqType){
case AMQPExchangeType::DIRECT:
$channelType = 'initDirectChannel';
break;
case AMQPExchangeType::FANOUT:
$channelType = 'initFanoutChannel';
break;
case AMQPExchangeType::TOPIC:
$channelType = 'initTopicChannel';
break;
case 'x-delayed-message':
$channelType = 'initDelayChannel';
break;
}
$this->channel = $this->$channelType($this->connection,'consume');
//记录心跳时间戳
$this->lastHeartbeatAt = time();
}
return $this->channel;
}
/**
* 设置MQ参数
*
* @param $name
* @param $value
*/
public function __set($name,$value)
{
$this->args[$name] = $value;
}
/**
* 获取MQ参数
*
* @param $name
*
* @return false|mixed
*/
public function __get($name)
{
return $this->args[$name] ?? false;
}
}
使用方式:
1、config 已配置mq信息
2、引入类库
3、$mq = new RabbitmqUtil(self::DELETE_EXCHANG,self::DELETE_QUEUE,'');