分类 php 下的文章

 false
    ];


    /**
     * 初始化配置文件
     *
     * @param string $exchangeName
     *
     * @param string $queueName
     * @param string $routeKey 支持普通路由和正侧路由键(user.*  user.#等)
     * @param array $configs
     * @throws Exception
     */
    public function __construct(string $exchangeName,string $queueName = '', string $routeKey = '',array $configs=[])
    {
        if(!$configs){
            $configs = config('queue.connections.rabbitmq');
        }
        $this->setConfigs($configs);
        $this->exchangeName = $exchangeName;
        $this->queueName = $queueName;
        $this->routeKey = $routeKey;
    }


    /**
     * 设置配置文件
     *
     * @param  array  $configs
     *
     * @throws Exception
     */
    private function setConfigs(array $configs)
    {
        if (!($configs['host'] && $configs['port'] && $configs['user'] && $configs['password'])) {
            throw new Exception('configs is empty');
        }
        if (empty($configs['vhost'])) {
            $configs['vhost'] = '/';
        }
        $this->configs = $configs;
    }


    /**
     * 初始化MQ
     *
     * @return AMQPStreamConnection
     */
    public function initConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->configs['host'],
            $this->configs['port'],
            $this->configs['user'],
            $this->configs['password'],
            $this->configs['vhost'],
            false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0,
            null, false, $this->configs['heartbeat']);
    }

    /**
     * 初始化队列
     *
     * @param  AMQPChannel  $channel
     *
     * @return AMQPChannel
     */
    private function initQueue(AMQPChannel &$channel)
    {
        // 初始化队列
        if(!$this->queueName){
            //如果队列不存在,则自动生成一个队列
            list($this->queueName, ,) = $channel->queue_declare($this->queueName, false, true, false, false);
        }else{
            $channel->queue_declare($this->queueName, false, true, false, false);
        }

        return $channel;
    }
    /**
     * 初始化频道信息
     *
     * @param  AMQPStreamConnection  $connection
     *
     * @return mixed
     */
    private function initTopicChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
        if($opType == 'consume'){
            // 初始化队列
            $this->initQueue($channel);
            // 将队列与某个交换机进行绑定,并使用路由关键字
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }

        return $channel;
    }

    /**
     * 发布Topic消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushTopicMessage($message)
    {
        $connection = $this->initConnection();
        $channel = $this->initTopicChannel($connection);
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $channel->basic_publish($data, $this->exchangeName, $this->routeKey);

        $channel->close();
        $connection->close();
    }

    /**
     * 消费Topic消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeTopicMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::DIRECT);
            $this->consume($channel,$functionName);
        } catch (\Exception $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));

            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     *  初始化频道信息
     *
     * @param $connection
     *
     * @return AMQPChannel
     */
    public function initDirectChannel(AMQPStreamConnection $connection): mixed
    {
        $channel = $connection->channel();
        // 初始化队列
        $this->initQueue($channel);
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::DIRECT, false, true, false);
        // 将队列与某个交换机进行绑定,并使用路由关键字
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * 发布直连消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushDirectMessage($message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        $data = new AMQPMessage($message, $properties);
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        $this->channel->close();
        $this->connection->close();
    }

    public function pushDirectMessageArr(array $message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        foreach ($message as $info){
            $data = new AMQPMessage($info, $properties);
            $this->keeplive();
            $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }

        $this->channel->close();
        $this->connection->close();
    }

    /**
     * 发布直连消息 不关闭连接
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushDirectMessageNoClose($message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        $data = new AMQPMessage($message, $properties);
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
    }

    /**
     * 消费直连消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public  function consumeDirectMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        /**
         * @var AMQPChannel $channel
         */
        $channel = $this->keeplive(AMQPExchangeType::DIRECT);
        //公平调度
        $channel->basic_qos(null, 1, null);
        $this->consume($channel,$functionName);
    }

    /**
     * 初始化延迟channel信息
     *
     * @param  AMQPStreamConnection  $connection
     *
     * @return \PhpAmqpLib\Channel\AbstractChannel|AMQPChannel
     */
    private function initDelayChannel(AMQPStreamConnection $connection)
    {
        $channel = $connection->channel();
        // 初始化队列
        $this->initQueue($channel);
        // 初始化交换机 new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT] 可以修改为其他模式
        $channel->exchange_declare($this->exchangeName, 'x-delayed-message', false, true, false, false, false, new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT]));
        // 将队列与某个交换机进行绑定,并使用路由关键字
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * 发布延迟消息
     *
     * @param  array  $messageData 要批量插入的消息数据
     * @param  string  $dateKey    计算延时的key值
     * @param  int  $delayTime     延迟时间(分钟)
     *
     * @throws Exception
     */
    public function pushDelayMessage(array $messageData, int $delayTime = 30, string $dateKey = 'created_at')
    {
        $connection = $this->initConnection();
        $channel = $this->initDelayChannel($connection);
        foreach ($messageData as $message) {
            if(isset($message[$dateKey])){
                //TODO 单个代报、转班需要补款的订单,待支付订单统一为24小时支付时长,用户自主下单30分钟倒计时规则不变
                if ((isset($message['orders_mode']) && in_array($message['orders_mode'], [OrdersRepo::ORDER_MODE_PROXY_SINGLE, OrdersRepo::ORDER_MODE_TRANSFER]))) {
                    $delayTime = OrdersRepo::ORDER_PROXY_SINGLE_OR_TRANSFER_CYCLE_TIME;
                }
                $delay = strtotime($message[$dateKey]) + $delayTime * 60 - time();
                $delay = max($delay, 0);
            }else{
                $delay = 0;
            }
            if (is_array($message)) {
                $message = json_encode($message);
            }
            $data = new AMQPMessage($message, [
                'delivery_mode'       => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new AMQPTable(['x-delay' => $delay * 1000]),
            ]);
            $channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }
        $channel->close();
        $connection->close();
    }

    /**
     * 消费延迟消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeDelayMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive('x-delayed-message');
            $this->consume($channel,$functionName);
        } catch (\Throwable $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * 初始化频道信息
     *
     * @param  AMQPStreamConnection  $connection
     * @param  string                $opType  publish:发布,consume:消费
     *
     * @return mixed
     */
    private function initFanoutChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::FANOUT, false, true, false);

        if($opType == 'consume'){
            // 初始化队列
            $this->initQueue($channel);

            // 将队列与某个交换机进行绑定,并使用路由关键字
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }


        return $channel;
    }

    /**
     * 发布广播消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushFanoutMessage($message)
    {
        $connection = $this->initConnection();
        $channel = $this->initFanoutChannel($connection);
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $channel->basic_publish($data, $this->exchangeName);

        $channel->close();
        $connection->close();
    }

    /**
     * 消费广播消息消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeFanoutMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {

            /**
             * 开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::FANOUT);
            $this->consume($channel,$functionName);
        } catch (\Throwable $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * 消费
     *
     * @param  AMQPChannel  $channel
     * @param  mixed       $functionName
     */
    public function consume(AMQPChannel $channel,mixed $functionName)
    {
        $consumerTag = 'consumer' . getmypid();
        //$channel->basic_qos(null,1,null); //无论有多少队列,每次只能消费1条数据
        //no_ask = true 自动应答,false 手动应答
        $channel->basic_consume($this->queueName, $consumerTag, true, $this->ask, false, false, $functionName);
        //消息是从服务器异步发送到客户端,当回调时我们的代码需要阻塞接收消息的channel,每当我们接收到一条消息,我们的回调函数将会被传递接收到消息
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $this->connection->close();
    }

    /**
     * channel 保活
     *
     * @param  string  $mqType
     *
     * @return null
     */
    public function keeplive($mqType = AMQPExchangeType::DIRECT)
    {
        //开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
        $needReconnect = isset($this->configs['heartbeat'])
            && $this->configs['heartbeat'] > 0
            && time() - $this->lastHeartbeatAt > $this->configs['heartbeat'] - 1;

        if ($needReconnect || !$this->connection) {
            $this->connection = $this->initConnection();
            switch ($mqType){
                case AMQPExchangeType::DIRECT:
                    $channelType = 'initDirectChannel';
                    break;
                case AMQPExchangeType::FANOUT:
                    $channelType = 'initFanoutChannel';
                    break;
                case AMQPExchangeType::TOPIC:
                    $channelType = 'initTopicChannel';
                    break;
                case 'x-delayed-message':
                    $channelType = 'initDelayChannel';
                    break;
            }
            $this->channel = $this->$channelType($this->connection,'consume');
            //记录心跳时间戳
            $this->lastHeartbeatAt = time();
        }

        return $this->channel;
    }

    /**
     * 设置MQ参数
     *
     * @param $name
     * @param $value
     */
    public function __set($name,$value)
    {
        $this->args[$name] = $value;
    }

    /**
     * 获取MQ参数
     *
     * @param $name
     *
     * @return false|mixed
     */
    public function __get($name)
    {
        return $this->args[$name] ?? false;
    }
}

使用方式:
1、config 已配置mq信息
2、引入类库
3、$mq = new RabbitmqUtil(self::DELETE_EXCHANG,self::DELETE_QUEUE,'');

问题说明:今天再做附件下载时,因为附件名称为中文,在苹果手机safari浏览器下载文件中文名称直接乱码,如图:
WX20231219-151526@2x.png
解决方法:
php代码:

public fuction downloadFile($fileurl, $filename, $charset = 'UTF-8'){
   if (preg_match("/MSIE/", $_SERVER["HTTP_USER_AGENT"])) {
      $filename = urlencode($filename);
      $filename = str_replace("+", "%20", $filename);// 替换空格
      $attachment = "attachment; filename=\"{$filename}\"; charset={$charset}"; 
   } else if (preg_match("/Firefox/", $_SERVER["HTTP_USER_AGENT"])) {
      $attachment = 'attachment; filename*=utf-8\'\'' . $filename; 
   } else if (preg_match("/Safari/", $_SERVER["HTTP_USER_AGENT"])) {
      $filename = rawurlencode($filename); // 注意:rawurlencode与urlencode的区别
      $attachment = 'attachment; filename*=utf-8\'\'' . $filename;
   } else {
      $filename = rawurlencode($filename);
      $attachment = "attachment; filename=\"{$filename}\"; charset={$charset}";
   }
   header('Pragma: public');
   header("Content-Transfer-Encoding: Binary");
   header('Content-Type: application/octet-stream');
   header("Content-disposition:".$attachment."");
   readfile($fileurl);  
}
      

示例:当执行

$data = $res->toArray();
dd($data);
//数据库日期格式:2023-09-04 15:41:17
//toArray后返回:2023-09-04T15:41:17.000000Z

官方文档说明:
Laravel 7 将使用新的日期序列化格式。为了格式化日期以进行序列化,Laravel 将会使用 Carbon 的 toJSON 方法,该方法将生成与 ISO-8601 兼容的日期,包括时区信息及小数秒。此外,该更改提供了更好的支持,并与客户端日期解析库集成。

此前,日期将序列化为以下格式:2020-03-04 16:11:00 。使用新格式进行序列化的日期将显示为:2020-03-04T20:01:00.283041Z

解决方式:
1、在Model中添加

protected $casts = [
        'created_at' => 'datetime:Y-m-d H:i:s',
        'updated_at' => 'datetime:Y-m-d H:i:s',
    ];

2、重写模型的 serializeDate 方法

/**
 * 为数组 / JSON 序列化准备日期。
 *
 * @param  \DateTimeInterface  $date
 * @return string
 */
protected function serializeDate(DateTimeInterface $date)
{
    return $date->format($this->dateFormat ?: 'Y-m-d H:i:s');
}

问题描述:
因为业务需要使用rabbitmq进行数据通信,所以使用laravel搭建了服务,但是在部署到测试服务器后,服务器开始报错:

dev.ERROR: AMQP error while attempting pop: ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile.(0, 0)

看错误描述是rabbitmq拒绝普通用户登录,但是我账号是管理员权限,没有理由被拒绝,于是开始排查
排查过程:
1、因为推送消息通过守护进程方式执行的,首先去掉自己的服务脚本,然后执行supervisorctl reload,确认自己脚本未执行,此时查看日志,发现还有报错
2、因为是测试环境,然后停止supervisor,继续查看日志,发现日志没有报错,确定是某个脚本报错,然后逐一排查,最后确定是一个脚本也是用queue队列,于是查看代码,发现当前脚本并没有使用mq,但是为什么触发报错
3、继续查看自己的代码,突然想起在.env里面里面配置了rabbitmq相关配置,突然发现一段代码

QUEUE_DRIVER=rabbitmq

,查看queue.php,里面默认是redis,到此,基本确认问题,于是注释掉QUEUE_DRIVER,然后重新supervisor,日志再没有报错信息
总结:
因为代码是参考网上示例,直接复制的,没有注意到消息队列被改为rabbitmq,导致其他消息队列报错,在此记录一下。从网上查找的资源,需要在校验是否可用的同时,涉及到环境变量的内容一定要特别注意