分类 linux 下的文章

1、查看当前用户的 crontab 文件:

crontab -l

2、编辑当前用户的 crontab 文件:

crontab -e

3、删除当前用户的 crontab 文件:

crontab -r

4、列出系统上所有用户的 crontab 任务:

sudo crontab -l

5、重启 cron 服务:

sudo service cron restart  或者  sudo systemctl restart cron

5、查看 cron 服务状态:

sudo service cron status  或者  sudo systemctl status cron

6、环境变量:
默认情况下,cron 任务不会加载用户的环境变量。如果需要环境变量,可以将其添加到 crontab 文件中,或者使用 env 命令将它们传递给 cron。

在编辑 crontab 文件时,每一行都代表一个计划任务,格式如下

*     *     *   *    *        command to be executed
-     -     -   -    -
|     |     |   |    |
|     |     |   |    +----- day of the week (0 - 6) (Sunday=0)
|     |     |   +------- month (1 - 12)
|     |     +--------- day of the month (1 - 31)
|     +----------- hour (0 - 23)
+------------- min (0 - 59)

例如,下面的 crontab 条目表示每天凌晨 1 点执行 /path/to/script.sh 脚本:

0 1 * * * /path/to/script.sh

使用 crontab 时,请确保正确设置时间,并且脚本有执行权限。

首页,创建服务,生成API_KEY
其次,请求向量api,生成向量
示例:

import os
from openai import AzureOpenAI

client = AzureOpenAI(
  api_key = os.getenv("AZURE_OPENAI_API_KEY"),  //API_KEY
  api_version = "2024-02-01",  //版本号,使用默认值即可
  azure_endpoint =os.getenv("AZURE_OPENAI_ENDPOINT")  //申请的域名地址
)

response = client.embeddings.create(
    input = "Your text string goes here", //生成向量内容,这里会按照内容生成指定向量值,后期用于匹配
    model= "text-embedding-ada-002"
)

print(response.model_dump_json(indent=2))

注:向量生成完后需要存储到向量库,这里使用的是milvus,milvus:vector长度需要与向量长度一致,比如都为256
官方文档:
AzureOpenAI:https://learn.microsoft.com/zh-cn/azure/ai-services/openai/how-to/embeddings?tabs=python-new
milvus:https://milvus.io/docs

脚本相关:

sudo su //使用supervisor需在管理员权限下修改
cd /etc/supervisord.d/  //脚本存放路径

配置示例:
command=php /var/www/yunying/artisan pull:gufen_mock_user //启动该程序时将运行的命令
process_name=%(program_name)s_%(process_num)d  //一个Python字符串表达式,用于组成此进程的Supervisor进程名
autostart=true  //如果为true,则该程序将在supervisord启动时自动启动,默认值是:true
autorestart=true //指定程序在RUNNING状态下退出后是否自动重启
user=admin //指示supervisord 使用此用户帐户作为运行程序的帐户
numprocs=2 //Supervisor 将启动由 numprocs 指定的多个该程序的实例
numprocs_start=0 //一个整数偏移量,用于计算 process_num 开始的编号,默认值是:0,例如改为1,则process_name显示如:foo_01、foo_02,那么就是从1开始编号
redirect_stderr=true //如果为true,进程的标准错误重定向到标准输出,默认值是:false,相当于:2>&1。
stdout_logfile=/home/admin/yunying/laravel_gufen_mock_user%(process_num)08d.log //日志存放地址
loglevel=info //日志记录级别,分别有:trace, debug, info, warn, error, critical


服务相关

supervisorctl stop program_name 停止某个进程
supervisorctl start program_name 启动某个进程
supervisorctl restart program_name 重启某个进程
supervisorctl stop all 停止全部进程
supervisorctl reload 载入最新的配置文件,停止原有进程并按新的配置启动、管理所有进程
supervisorctl update 根据最新的配置文件,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启

 false
    ];


    /**
     * 初始化配置文件
     *
     * @param string $exchangeName
     *
     * @param string $queueName
     * @param string $routeKey 支持普通路由和正侧路由键(user.*  user.#等)
     * @param array $configs
     * @throws Exception
     */
    public function __construct(string $exchangeName,string $queueName = '', string $routeKey = '',array $configs=[])
    {
        if(!$configs){
            $configs = config('queue.connections.rabbitmq');
        }
        $this->setConfigs($configs);
        $this->exchangeName = $exchangeName;
        $this->queueName = $queueName;
        $this->routeKey = $routeKey;
    }


    /**
     * 设置配置文件
     *
     * @param  array  $configs
     *
     * @throws Exception
     */
    private function setConfigs(array $configs)
    {
        if (!($configs['host'] && $configs['port'] && $configs['user'] && $configs['password'])) {
            throw new Exception('configs is empty');
        }
        if (empty($configs['vhost'])) {
            $configs['vhost'] = '/';
        }
        $this->configs = $configs;
    }


    /**
     * 初始化MQ
     *
     * @return AMQPStreamConnection
     */
    public function initConnection(): AMQPStreamConnection
    {
        return new AMQPStreamConnection(
            $this->configs['host'],
            $this->configs['port'],
            $this->configs['user'],
            $this->configs['password'],
            $this->configs['vhost'],
            false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0,
            null, false, $this->configs['heartbeat']);
    }

    /**
     * 初始化队列
     *
     * @param  AMQPChannel  $channel
     *
     * @return AMQPChannel
     */
    private function initQueue(AMQPChannel &$channel)
    {
        // 初始化队列
        if(!$this->queueName){
            //如果队列不存在,则自动生成一个队列
            list($this->queueName, ,) = $channel->queue_declare($this->queueName, false, true, false, false);
        }else{
            $channel->queue_declare($this->queueName, false, true, false, false);
        }

        return $channel;
    }
    /**
     * 初始化频道信息
     *
     * @param  AMQPStreamConnection  $connection
     *
     * @return mixed
     */
    private function initTopicChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
        if($opType == 'consume'){
            // 初始化队列
            $this->initQueue($channel);
            // 将队列与某个交换机进行绑定,并使用路由关键字
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }

        return $channel;
    }

    /**
     * 发布Topic消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushTopicMessage($message)
    {
        $connection = $this->initConnection();
        $channel = $this->initTopicChannel($connection);
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $channel->basic_publish($data, $this->exchangeName, $this->routeKey);

        $channel->close();
        $connection->close();
    }

    /**
     * 消费Topic消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeTopicMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::DIRECT);
            $this->consume($channel,$functionName);
        } catch (\Exception $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));

            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     *  初始化频道信息
     *
     * @param $connection
     *
     * @return AMQPChannel
     */
    public function initDirectChannel(AMQPStreamConnection $connection): mixed
    {
        $channel = $connection->channel();
        // 初始化队列
        $this->initQueue($channel);
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::DIRECT, false, true, false);
        // 将队列与某个交换机进行绑定,并使用路由关键字
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * 发布直连消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushDirectMessage($message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        $data = new AMQPMessage($message, $properties);
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        $this->channel->close();
        $this->connection->close();
    }

    public function pushDirectMessageArr(array $message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        foreach ($message as $info){
            $data = new AMQPMessage($info, $properties);
            $this->keeplive();
            $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }

        $this->channel->close();
        $this->connection->close();
    }

    /**
     * 发布直连消息 不关闭连接
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushDirectMessageNoClose($message,$content_type='')
    {
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if($content_type){
            $properties['content_type']=$content_type;
        }
        $data = new AMQPMessage($message, $properties);
        $this->keeplive();
        $this->channel->basic_publish($data, $this->exchangeName, $this->routeKey);
    }

    /**
     * 消费直连消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public  function consumeDirectMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        /**
         * @var AMQPChannel $channel
         */
        $channel = $this->keeplive(AMQPExchangeType::DIRECT);
        //公平调度
        $channel->basic_qos(null, 1, null);
        $this->consume($channel,$functionName);
    }

    /**
     * 初始化延迟channel信息
     *
     * @param  AMQPStreamConnection  $connection
     *
     * @return \PhpAmqpLib\Channel\AbstractChannel|AMQPChannel
     */
    private function initDelayChannel(AMQPStreamConnection $connection)
    {
        $channel = $connection->channel();
        // 初始化队列
        $this->initQueue($channel);
        // 初始化交换机 new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT] 可以修改为其他模式
        $channel->exchange_declare($this->exchangeName, 'x-delayed-message', false, true, false, false, false, new AMQPTable(["x-delayed-type" => AMQPExchangeType::DIRECT]));
        // 将队列与某个交换机进行绑定,并使用路由关键字
        $channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);

        return $channel;
    }

    /**
     * 发布延迟消息
     *
     * @param  array  $messageData 要批量插入的消息数据
     * @param  string  $dateKey    计算延时的key值
     * @param  int  $delayTime     延迟时间(分钟)
     *
     * @throws Exception
     */
    public function pushDelayMessage(array $messageData, int $delayTime = 30, string $dateKey = 'created_at')
    {
        $connection = $this->initConnection();
        $channel = $this->initDelayChannel($connection);
        foreach ($messageData as $message) {
            if(isset($message[$dateKey])){
                //TODO 单个代报、转班需要补款的订单,待支付订单统一为24小时支付时长,用户自主下单30分钟倒计时规则不变
                if ((isset($message['orders_mode']) && in_array($message['orders_mode'], [OrdersRepo::ORDER_MODE_PROXY_SINGLE, OrdersRepo::ORDER_MODE_TRANSFER]))) {
                    $delayTime = OrdersRepo::ORDER_PROXY_SINGLE_OR_TRANSFER_CYCLE_TIME;
                }
                $delay = strtotime($message[$dateKey]) + $delayTime * 60 - time();
                $delay = max($delay, 0);
            }else{
                $delay = 0;
            }
            if (is_array($message)) {
                $message = json_encode($message);
            }
            $data = new AMQPMessage($message, [
                'delivery_mode'       => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'application_headers' => new AMQPTable(['x-delay' => $delay * 1000]),
            ]);
            $channel->basic_publish($data, $this->exchangeName, $this->routeKey);
        }
        $channel->close();
        $connection->close();
    }

    /**
     * 消费延迟消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeDelayMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {
            /**
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive('x-delayed-message');
            $this->consume($channel,$functionName);
        } catch (\Throwable $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * 初始化频道信息
     *
     * @param  AMQPStreamConnection  $connection
     * @param  string                $opType  publish:发布,consume:消费
     *
     * @return mixed
     */
    private function initFanoutChannel(AMQPStreamConnection $connection,string $opType='publish'): mixed
    {
        $channel = $connection->channel();
        // 初始化交换机
        $channel->exchange_declare($this->exchangeName, AMQPExchangeType::FANOUT, false, true, false);

        if($opType == 'consume'){
            // 初始化队列
            $this->initQueue($channel);

            // 将队列与某个交换机进行绑定,并使用路由关键字
            $channel->queue_bind($this->queueName, $this->exchangeName);
        }


        return $channel;
    }

    /**
     * 发布广播消息
     *
     * @param $message
     *
     * @throws Exception
     */
    public function pushFanoutMessage($message)
    {
        $connection = $this->initConnection();
        $channel = $this->initFanoutChannel($connection);
        if (is_array($message)) {
            $message = json_encode($message);
        }
        $data = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        $channel->basic_publish($data, $this->exchangeName);

        $channel->close();
        $connection->close();
    }

    /**
     * 消费广播消息消息
     *
     * @param  array|string  $functionName
     * @param  bool          $ask
     *
     * @return false|void
     */
    public function consumeFanoutMessage(array|string $functionName)
    {
        if (!$functionName || !$this->queueName) {
            return false;
        }
        try {

            /**
             * 开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
             * @var AMQPChannel $channel
             */
            $channel = $this->keeplive(AMQPExchangeType::FANOUT);
            $this->consume($channel,$functionName);
        } catch (\Throwable $e) {
            FeiShuSendJob::dispatch('Rabbitmq', [$e->getMessage(), $e->getFile(), $e->getLine()])->onQueue('FeiShuSendJob_'.config('app.env'));
            \Log::error('Rabbitmq', [$e->getMessage()]);
        }
    }

    /**
     * 消费
     *
     * @param  AMQPChannel  $channel
     * @param  mixed       $functionName
     */
    public function consume(AMQPChannel $channel,mixed $functionName)
    {
        $consumerTag = 'consumer' . getmypid();
        //$channel->basic_qos(null,1,null); //无论有多少队列,每次只能消费1条数据
        //no_ask = true 自动应答,false 手动应答
        $channel->basic_consume($this->queueName, $consumerTag, true, $this->ask, false, false, $functionName);
        //消息是从服务器异步发送到客户端,当回调时我们的代码需要阻塞接收消息的channel,每当我们接收到一条消息,我们的回调函数将会被传递接收到消息
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $this->connection->close();
    }

    /**
     * channel 保活
     *
     * @param  string  $mqType
     *
     * @return null
     */
    public function keeplive($mqType = AMQPExchangeType::DIRECT)
    {
        //开启了心跳(heartbeat>0)且距离上一次心跳间隔时间超过预定心跳时间(-1稍微提前下),则需要重连
        $needReconnect = isset($this->configs['heartbeat'])
            && $this->configs['heartbeat'] > 0
            && time() - $this->lastHeartbeatAt > $this->configs['heartbeat'] - 1;

        if ($needReconnect || !$this->connection) {
            $this->connection = $this->initConnection();
            switch ($mqType){
                case AMQPExchangeType::DIRECT:
                    $channelType = 'initDirectChannel';
                    break;
                case AMQPExchangeType::FANOUT:
                    $channelType = 'initFanoutChannel';
                    break;
                case AMQPExchangeType::TOPIC:
                    $channelType = 'initTopicChannel';
                    break;
                case 'x-delayed-message':
                    $channelType = 'initDelayChannel';
                    break;
            }
            $this->channel = $this->$channelType($this->connection,'consume');
            //记录心跳时间戳
            $this->lastHeartbeatAt = time();
        }

        return $this->channel;
    }

    /**
     * 设置MQ参数
     *
     * @param $name
     * @param $value
     */
    public function __set($name,$value)
    {
        $this->args[$name] = $value;
    }

    /**
     * 获取MQ参数
     *
     * @param $name
     *
     * @return false|mixed
     */
    public function __get($name)
    {
        return $this->args[$name] ?? false;
    }
}

使用方式:
1、config 已配置mq信息
2、引入类库
3、$mq = new RabbitmqUtil(self::DELETE_EXCHANG,self::DELETE_QUEUE,'');