三分快乐,七纷幸福 发布的文章

脚本相关:

sudo su //使用supervisor需在管理员权限下修改
cd /etc/supervisord.d/  //脚本存放路径

配置示例:
command=php /var/www/yunying/artisan pull:gufen_mock_user //启动该程序时将运行的命令
process_name=%(program_name)s_%(process_num)d  //一个Python字符串表达式,用于组成此进程的Supervisor进程名
autostart=true  //如果为true,则该程序将在supervisord启动时自动启动,默认值是:true
autorestart=true //指定程序在RUNNING状态下退出后是否自动重启
user=admin //指示supervisord 使用此用户帐户作为运行程序的帐户
numprocs=2 //Supervisor 将启动由 numprocs 指定的多个该程序的实例
numprocs_start=0 //一个整数偏移量,用于计算 process_num 开始的编号,默认值是:0,例如改为1,则process_name显示如:foo_01、foo_02,那么就是从1开始编号
redirect_stderr=true //如果为true,进程的标准错误重定向到标准输出,默认值是:false,相当于:2>&1。
stdout_logfile=/home/admin/yunying/laravel_gufen_mock_user%(process_num)08d.log //日志存放地址
loglevel=info //日志记录级别,分别有:critical, error, warn, info, debug, trace, blather(注意:loglevel 属于 [supervisord] 段的配置项,写在 [program:x] 段中不会生效)


服务相关

supervisorctl stop program_name 停止某个进程
supervisorctl start program_name 启动某个进程
supervisorctl restart program_name 重启某个进程
supervisorctl stop all 停止全部进程
supervisorctl reload 重启 supervisord 服务本身并载入最新的配置文件,所有受管进程都会被停止,并按新的配置重新启动
supervisorctl update 根据最新的配置文件,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启

 false
    ];


    /**
     * Create the helper for one exchange / queue / routing-key combination.
     *
     * When $configs is empty, the connection settings are read from the
     * `queue.connections.rabbitmq` application config entry instead.
     *
     * @param string $exchangeName Name of the exchange to publish to / bind against.
     * @param string $queueName    Name of the queue (may be empty).
     * @param string $routeKey     Routing key; plain keys and pattern keys
     *                             (user.*, user.#, ...) are both accepted.
     * @param array  $configs      Connection settings; validated by setConfigs().
     *
     * @throws Exception When setConfigs() rejects the settings.
     */
    public function __construct(string $exchangeName,string $queueName = '', string $routeKey = '',array $configs=[])
    {
        // Explicit settings win; otherwise fall back to the application config.
        $this->setConfigs($configs ?: config('queue.connections.rabbitmq'));

        $this->routeKey = $routeKey;
        $this->queueName = $queueName;
        $this->exchangeName = $exchangeName;
    }


    /**
     * Validate and store the connection settings.
     *
     * `host`, `port`, `user` and `password` must all be present and non-empty.
     * A missing or empty `vhost` is replaced by the default virtual host "/".
     *
     * @param  array  $configs Connection settings.
     *
     * @throws Exception When a required setting is missing or empty.
     */
    private function setConfigs(array $configs)
    {
        // empty() covers both "key absent" and "key present but falsy", so an
        // incomplete array raises the intended exception rather than an
        // undefined-index warning.
        foreach (['host', 'port', 'user', 'password'] as $requiredKey) {
            if (empty($configs[$requiredKey])) {
                throw new Exception('configs is empty');
            }
        }

        if (empty($configs['vhost'])) {
            $configs['vhost'] = '/';
        }

        $this->configs = $configs;
    }


    /**
     * Open a new connection to the broker using the stored settings.
     *
     * Connection timeout and read/write timeout are both fixed at 3 seconds.
     * `heartbeat` is optional in the settings (keeplive() also treats it as
     * optional); when it is absent, 0 is passed, which is the library default.
     *
     * @return AMQPStreamConnection
     */
    public function initConnection(): AMQPStreamConnection
    {
        // Avoid an undefined-index warning when no heartbeat was configured.
        $heartbeat = $this->configs['heartbeat'] ?? 0;

        return new AMQPStreamConnection(
            $this->configs['host'],
            $this->configs['port'],
            $this->configs['user'],
            $this->configs['password'],
            $this->configs['vhost'],
            false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0,
            null, false, $heartbeat);
    }

    /**
     * Declare the queue on the given channel.
     *
     * The queue is declared as durable (non-passive, non-exclusive, no
     * auto-delete). When no queue name was configured, the name reported back
     * by the declare call is stored as the queue name.
     *
     * @param  AMQPChannel  $channel Channel to declare the queue on.
     *
     * @return AMQPChannel The same channel that was passed in.
     */
    private function initQueue(AMQPChannel &$channel)
    {
        $needsGeneratedName = !$this->queueName;

        $declareResult = $channel->queue_declare($this->queueName, false, true, false, false);

        if ($needsGeneratedName) {
            // First element of the declare result is the queue name.
            [$this->queueName] = $declareResult;
        }

        return $channel;
    }
    /**
     * Open a channel and declare the durable topic exchange.
     *
     * For consumers the queue is declared as well and bound to the exchange
     * with the configured routing key, so pattern keys such as `user.*` take
     * effect. Without the key, a topic binding only matches messages published
     * with an empty routing key.
     *
     * @param  AMQPStreamConnection  $connection Open connection.
     * @param  string                $opType     'publish' (default) or 'consume'.
     *
     * @return mixed The channel obtained from the connection.
     */
    private function initTopicChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // Declare the exchange.
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
        if ($opType == 'consume') {
            // Declare the queue.
            $this->initQueue($channel);
            // Bind the queue to the exchange using the routing key.
            $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
        }

        return $channel;
    }

    /**
     * Publish one persistent message to the topic exchange.
     *
     * A fresh connection is opened for the call and always closed again, even
     * when declaring the exchange or publishing fails.
     *
     * @param mixed $message Message body; arrays are JSON-encoded first.
     *
     * @throws Exception
     */
    public function pushTopicMessage($message)
    {
        $connection = $this->initConnection();
        try {
            $channel = $this->initTopicChannel($connection);
            if (is_array($message)) {
                $message = json_encode($message);
            }
            $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

            $channel->basic_publish($data, $this->exchangeName, $this->routeKey);

            $channel->close();
        } finally {
            // Release the socket on both the success and the failure path.
            $connection->close();
        }
    }

    /**
     * Consume messages from the topic exchange.
     *
     * Returns false immediately when no callback or no queue name is set.
     * Otherwise a topic channel is obtained through keeplive() and consume()
     * blocks on it. Any failure is reported via FeiShuSendJob and the log
     * instead of being rethrown.
     *
     * @param  array|string  $functionName Callback passed to basic_consume.
     *
     * @return false|void
     */
    public function consumeTopicMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * The exchange is declared as TOPIC by pushTopicMessage(), so the
             * consumer must request the topic channel as well.
             *
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::TOPIC);
            $this->consume($channel,$functionName);
        } catch (\Throwable $e) {
            // \Throwable matches the other consume*Message() methods.
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));

            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * Open a channel prepared for the direct exchange.
     *
     * In order: declares the queue, declares the durable direct exchange, and
     * binds the queue to the exchange with the configured routing key.
     *
     * @param AMQPStreamConnection $connection Open connection.
     *
     * @return mixed The channel obtained from the connection.
     */
    public function initDirectChannel(AMQPStreamConnection $connection): mixed
    {
        $directChannel = $connection->channel();

        // Queue first, then exchange, then the binding between the two.
        $this->initQueue($directChannel);
        $directChannel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );
        $directChannel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            $this->routeKey
        );

        return $directChannel;
    }

    /**
     * Publish one persistent message to the direct exchange, then close the
     * channel and the connection held by this instance.
     *
     * @param mixed  $message      Message body; arrays are JSON-encoded first.
     * @param string $content_type Optional content type; set on the message
     *                             only when non-empty.
     *
     * @throws Exception
     */
    public function pushDirectMessage($message,$content_type='')
    {
        $body = is_array($message) ? json_encode($message) : $message;

        $properties = $content_type
            ? ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'content_type' => $content_type]
            : ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];

        $amqpMessage = new AMQPMessage($body, $properties);

        // keeplive() returns the channel held by this instance.
        $channel = $this->keeplive();
        $channel->basic_publish($amqpMessage, $this->exchangeName, $this->routeKey);

        $channel->close();
        $this->connection->close();
    }

    /**
     * Publish every element of $message as its own persistent message to the
     * direct exchange, then close the channel and the connection.
     *
     * Each element is JSON-encoded individually when it is an array. The list
     * must not be encoded as a whole: iterating the resulting string would
     * publish nothing.
     *
     * @param array  $message      List of message bodies (arrays or strings).
     * @param string $content_type Optional content type; set on each message
     *                             only when non-empty.
     *
     * @throws Exception
     */
    public function pushDirectMessageArr(array $message,$content_type='')
    {
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if ($content_type) {
            $properties['content_type'] = $content_type;
        }

        // Make sure a channel exists even for an empty list, so the close
        // calls below never run against an unset channel.
        $this->keeplive();

        foreach ($message as $info) {
            if (is_array($info)) {
                $info = json_encode($info);
            }
            $data = new AMQPMessage($info, $properties);
            // Re-check per message so a long batch honours the heartbeat rule.
            $this->keeplive();
            $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }

        $this->channel->close();
        $this->connection->close();
    }

    /**
     * Publish one persistent message to the direct exchange and leave the
     * channel and connection open for further calls.
     *
     * @param mixed  $message      Message body; arrays are JSON-encoded first.
     * @param string $content_type Optional content type; set on the message
     *                             only when non-empty.
     *
     * @throws Exception
     */
    public function pushDirectMessageNoClose($message,$content_type='')
    {
        $body = is_array($message) ? json_encode($message) : $message;

        $properties = $content_type
            ? ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'content_type' => $content_type]
            : ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];

        $amqpMessage = new AMQPMessage($body, $properties);

        // keeplive() returns the channel held by this instance.
        $this->keeplive()->basic_publish($amqpMessage, $this->exchangeName, $this->routeKey);
    }

    /**
     * Consume messages from the direct exchange.
     *
     * Returns false immediately when no callback or no queue name is set.
     * Exceptions are not caught here; they propagate to the caller.
     *
     * @param  array|string  $functionName Callback passed to basic_consume.
     *
     * @return false|void
     */
    public  function consumeDirectMessage(array|string $functionName)
    {
        if (!($functionName && $this->queueName)) {
            return false;
        }

        /** @var AMQPChannel $directChannel */
        $directChannel = $this->keeplive(AMQPExchangeType::DIRECT);

        // Fair dispatch: prefetch count of 1.
        $directChannel->basic_qos(null, 1, null);

        $this->consume($directChannel, $functionName);
    }

    /**
     * Open a channel prepared for the delayed-message exchange.
     *
     * In order: declares the queue, declares a durable exchange of type
     * `x-delayed-message` whose `x-delayed-type` argument is direct, and binds
     * the queue to it with the configured routing key.
     *
     * @param  AMQPStreamConnection  $connection Open connection.
     *
     * @return \PhpAmqpLib\Channel\AbstractChannel|AMQPChannel
     */
    private function initDelayChannel(AMQPStreamConnection $connection)
    {
        $delayChannel = $connection->channel();

        $this->initQueue($delayChannel);

        // The x-delayed-type argument may be changed to another exchange type.
        $exchangeArguments = new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT]);
        $delayChannel->exchange_declare(
            $this->exchangeName,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            $exchangeArguments
        );

        $delayChannel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $delayChannel;
    }

    /**
     * Publish a batch of delayed messages.
     *
     * For a message containing $dateKey, the delay is
     * `strtotime($message[$dateKey]) + minutes * 60 - now`, floored at 0, and
     * is sent in the `x-delay` header in milliseconds. Messages without
     * $dateKey are sent with a delay of 0.
     *
     * The per-order override is applied to that message only. It must not
     * overwrite $delayTime, otherwise every later message in the batch would
     * inherit the overridden delay.
     *
     * @param  array   $messageData Messages to publish; arrays are JSON-encoded.
     * @param  int     $delayTime   Default delay in minutes.
     * @param  string  $dateKey     Key of the timestamp the delay is counted from.
     *
     * @throws Exception
     */
    public function pushDelayMessage(array $messageData, int $delayTime = 30, string $dateKey = 'created_at')
    {
        $connection = $this->initConnection();
        try {
            $channel = $this->initDelayChannel($connection);
            foreach ($messageData as $message) {
                $delay = 0;
                if (isset($message[$dateKey])) {
                    $delayMinutes = $delayTime;
                    // TODO (translated from the original note): single proxy
                    // orders and class-transfer orders that need a top-up
                    // payment share a 24-hour payment window; the 30-minute
                    // countdown for user-placed orders is unchanged.
                    // NOTE(review): the constant is used as minutes here —
                    // confirm its unit.
                    if ((isset($message['orders_mode']) && in_array($message['orders_mode'], [OrdersRepo::ORDER_MODE_PROXY_SINGLE, OrdersRepo::ORDER_MODE_TRANSFER]))) {
                        $delayMinutes = OrdersRepo::ORDER_PROXY_SINGLE_OR_TRANSFER_CYCLE_TIME;
                    }
                    $delay = max(strtotime($message[$dateKey]) + $delayMinutes * 60 - time(), 0);
                }
                if (is_array($message)) {
                    $message = json_encode($message);
                }
                $data = new AMQPMessage($message, [
                    'delivery_mode'       => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'application_headers' => new AMQPTable(['x-delay' => $delay * 1000]),
                ]);
                $channel->basic_publish($data, $this->exchangeName, $this->routeKey);
            }
            $channel->close();
        } finally {
            // Release the socket on both the success and the failure path.
            $connection->close();
        }
    }

    /**
     * Consume messages from the delayed-message exchange.
     *
     * Returns false immediately when no callback or no queue name is set.
     * Any failure is reported via FeiShuSendJob and the log instead of being
     * rethrown.
     *
     * @param  array|string  $functionName Callback passed to basic_consume.
     *
     * @return false|void
     */
    public function consumeDelayMessage(array|string $functionName)
    {
        if (!($functionName && $this->queueName)) {
            return false;
        }

        try {
            /** @var AMQPChannel $delayChannel */
            $delayChannel = $this->keeplive('x-delayed-message');
            $this->consume($delayChannel, $functionName);
        } catch (\Throwable $error) {
            $details = [$error->getMessage(), $error->getFile(), $error->getLine()];
            FeiShuSendJob::dispatch('Rabbitmq', $details)->onQueue('FeiShuSendJob_'.config('app.env'));
            \Log::error('Rabbitmq', [$error->getMessage()]);
        }
    }

    /**
     * Open a channel and declare the durable fanout exchange.
     *
     * For consumers the queue is additionally declared and bound to the
     * exchange (no routing key is passed to the binding).
     *
     * @param  AMQPStreamConnection  $connection Open connection.
     * @param  string                $opType     'publish' (default) or 'consume'.
     *
     * @return mixed The channel obtained from the connection.
     */
    private function initFanoutChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $fanoutChannel = $connection->channel();
        $fanoutChannel->exchange_declare($this->exchangeName, AMQPExchangeType::FANOUT, false, true, false);

        // Publishers only need the exchange.
        if ($opType !== 'consume') {
            return $fanoutChannel;
        }

        // Consumers also need the queue and its binding.
        $this->initQueue($fanoutChannel);
        $fanoutChannel->queue_bind($this->queueName, $this->exchangeName);

        return $fanoutChannel;
    }

    /**
     * Publish one persistent message to the fanout exchange.
     *
     * A fresh connection is opened for the call and always closed again, even
     * when declaring the exchange or publishing fails.
     *
     * @param mixed $message Message body; arrays are JSON-encoded first.
     *
     * @throws Exception
     */
    public function pushFanoutMessage($message)
    {
        $connection = $this->initConnection();
        try {
            $channel = $this->initFanoutChannel($connection);
            if (is_array($message)) {
                $message = json_encode($message);
            }
            $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

            $channel->basic_publish($data, $this->exchangeName);

            $channel->close();
        } finally {
            // Release the socket on both the success and the failure path.
            $connection->close();
        }
    }

    /**
     * Consume messages from the fanout exchange.
     *
     * Returns false immediately when no callback or no queue name is set.
     * Any failure is reported via FeiShuSendJob and the log instead of being
     * rethrown.
     *
     * @param  array|string  $functionName Callback passed to basic_consume.
     *
     * @return false|void
     */
    public function consumeFanoutMessage(array|string $functionName)
    {
        if (!($functionName && $this->queueName)) {
            return false;
        }

        try {
            // keeplive() reconnects when heartbeat is enabled and the configured
            // interval (minus one second) has elapsed since the last reconnect.
            /** @var AMQPChannel $fanoutChannel */
            $fanoutChannel = $this->keeplive(AMQPExchangeType::FANOUT);
            $this->consume($fanoutChannel, $functionName);
        } catch (\Throwable $error) {
            $details = [$error->getMessage(), $error->getFile(), $error->getLine()];
            FeiShuSendJob::dispatch('Rabbitmq', $details)->onQueue('FeiShuSendJob_'.config('app.env'));
            \Log::error('Rabbitmq', [$error->getMessage()]);
        }
    }

    /**
     * Register the callback on the queue and block until the channel has no
     * callbacks left, then close the channel and this instance's connection.
     *
     * The consumer tag is "consumer" followed by the current process id.
     * The fourth basic_consume argument (no_ack) comes from `$this->ask`:
     * true means automatic acknowledgement, false means manual.
     *
     * @param  AMQPChannel  $channel      Channel to consume on.
     * @param  mixed        $functionName Callback invoked for each message.
     */
    public function consume(AMQPChannel $channel,mixed $functionName)
    {
        $tag = 'consumer' . getmypid();

        // A per-consumer prefetch limit (basic_qos with count 1) is
        // intentionally not applied here.
        $channel->basic_consume(
            $this->queueName,
            $tag,
            true,
            $this->ask,
            false,
            false,
            $functionName
        );

        // Messages arrive asynchronously; wait() blocks and dispatches each
        // one to the callback.
        while (count($channel->callbacks) > 0) {
            $channel->wait();
        }

        $channel->close();
        $this->connection->close();
    }

    /**
     * Return a usable channel, (re)connecting first when necessary.
     *
     * A new connection and channel are created when any of these holds:
     *  - no connection exists yet;
     *  - the existing connection is no longer connected, e.g. because a push
     *    method closed it after publishing;
     *  - heartbeat is enabled (> 0) and more than `heartbeat - 1` seconds have
     *    passed since the last (re)connect.
     *
     * The channel initialiser is always called with the 'consume' op type.
     * Initialisers that declare only one parameter ignore the extra argument.
     *
     * @param  string  $mqType One of the AMQPExchangeType DIRECT / FANOUT /
     *                         TOPIC values, or 'x-delayed-message'.
     *
     * @return mixed The channel held by this instance.
     *
     * @throws \InvalidArgumentException When a reconnect is needed and $mqType
     *                                   is not one of the supported values.
     */
    public function keeplive($mqType = AMQPExchangeType::DIRECT)
    {
        $heartbeat = $this->configs['heartbeat'] ?? 0;
        // Reconnect slightly (one second) before the heartbeat interval elapses.
        $heartbeatExpired = $heartbeat > 0
            && time() - $this->lastHeartbeatAt > $heartbeat - 1;

        // A closed connection object is still truthy, so ask it explicitly.
        $connectionUsable = $this->connection && $this->connection->isConnected();

        if ($heartbeatExpired || !$connectionUsable) {
            // Resolve the initialiser before opening a socket so an unsupported
            // type fails fast with a clear message.
            $channelInitializer = match ($mqType) {
                AMQPExchangeType::DIRECT => 'initDirectChannel',
                AMQPExchangeType::FANOUT => 'initFanoutChannel',
                AMQPExchangeType::TOPIC  => 'initTopicChannel',
                'x-delayed-message'      => 'initDelayChannel',
                default => throw new \InvalidArgumentException('Unsupported exchange type: ' . var_export($mqType, true)),
            };

            $this->connection = $this->initConnection();
            $this->channel = $this->$channelInitializer($this->connection, 'consume');
            // Remember when this connection was established.
            $this->lastHeartbeatAt = time();
        }

        return $this->channel;
    }

    /**
     * Magic setter: store a value under $name in the `args` array.
     *
     * Invoked by PHP when an inaccessible or undeclared property is written.
     *
     * @param string $name  Property name being written.
     * @param mixed  $value Value to store.
     */
    public function __set($name,$value)
    {
        $this->args[$name] = $value;
    }

    /**
     * Magic getter: read a value previously stored in the `args` array.
     *
     * Invoked by PHP when an inaccessible or undeclared property is read.
     *
     * @param string $name Property name being read.
     *
     * @return false|mixed The stored value, or false when nothing non-null is
     *                     stored under $name.
     */
    public function __get($name)
    {
        if (isset($this->args[$name])) {
            return $this->args[$name];
        }

        return false;
    }
}

使用方式:
1、config 已配置mq信息
2、引入类库
3、$mq = new RabbitmqUtil(self::DELETE_EXCHANG,self::DELETE_QUEUE,'');

查询重复数据:

# Find every (mobile, activity_id) pair that occurs more than once in
# zwsearch_user, together with how many rows share it.
SELECT mobile, activity_id, COUNT(*) as count FROM zwsearch_user
GROUP BY mobile, activity_id HAVING COUNT(*) > 1;

删除重复数据:

# Delete duplicate rows: for each (activity_id, mobile) pair that occurs more
# than once, keep only the row with the smallest id and delete the rest.
DELETE t1 FROM zwsearch_user t1
INNER JOIN (
SELECT MIN(id) AS min_id,activity_id,mobile FROM zwsearch_user
GROUP BY activity_id, mobile HAVING COUNT(*) > 1
) t2
ON t1.activity_id = t2.activity_id AND t1.mobile = t2.mobile
WHERE t1.id > t2.min_id;

一、安装扩展

composer require yandex/clickhouse-pdo

二、配置env

DB_CONNECTION=clickhouse
DB_HOST=your-clickhouse-host
DB_PORT=8123
DB_DATABASE=your-clickhouse-database
DB_USERNAME=your-clickhouse-username
DB_PASSWORD=your-clickhouse-password

三、修改config/database.php

'connections' => [
    // ... other connections ...

    // ClickHouse connection; every value except the driver and options comes
    // from the DB_* environment variables, with the fallbacks shown.
    'clickhouse' => [
        'driver' => 'clickhouse',
        'host' => env('DB_HOST', 'localhost'),
        'port' => env('DB_PORT', 8123),
        'database' => env('DB_DATABASE', 'forge'),
        'username' => env('DB_USERNAME', 'forge'),
        'password' => env('DB_PASSWORD', ''),
        'options' => [
            'timeout' => 10,
            // or 'http'. NOTE(review): 8123 is ClickHouse's default plain-HTTP
            // port and HTTPS normally listens on 8443 — confirm that protocol
            // and port match the target server.
            'protocol' => 'https',
        ],
    ],
],

四、创建model

php artisan make:model ClickHouseModel

五、链接数据库

namespace App;

use Illuminate\Database\Eloquent\Model;

/**
 * Eloquent model that reads from the `clickhouse` database connection.
 */
class ClickHouseModel extends Model
{
    // Name of the database connection used by this model.
    protected $connection = 'clickhouse';
    // Table this model maps to.
    protected $table = 'clickhouse_table';
}