linux grep 查询文件内容
使用grep在指定文件中查找内容:
grep -nr 'wuliao' ./ | grep .php
附grep参数说明
-i:忽略大小写
-v:只显示不匹配的行
-n:显示匹配行的行号
-c:统计匹配的行数
-r:递归搜索子目录
-E:使用扩展正则表达式
-F:禁用正则表达式,使用固定字符串匹配
-w:只匹配整个单词,而不是单词的一部分
-A:显示匹配行之后的若干行
-B:显示匹配行之前的若干行
-C:显示匹配行前后的若干行
使用grep在指定文件中查找内容:
grep -nr 'wuliao' ./ | grep .php
附grep参数说明
-i:忽略大小写
-v:只显示不匹配的行
-n:显示匹配行的行号
-c:统计匹配的行数
-r:递归搜索子目录
-E:使用扩展正则表达式
-F:禁用正则表达式,使用固定字符串匹配
-w:只匹配整个单词,而不是单词的一部分
-A:显示匹配行之后的若干行
-B:显示匹配行之前的若干行
-C:显示匹配行前后的若干行
今天在supervisor添加一个服务脚本,在服务器启动一段时间后,自动退出,并报错FATAL Exited too quickly (process log may have details),查看日志时,发现日志并没有错误信息,于是手动执行php artisan 命令,没有报错信息,但是发现脚本在执行后立即就退出了,考虑到supervisor在监测服务异常或者退出后,会自动尝试重启服务,应该是重启次数超过startretries重启次数,导致进程终止。
注:出现这个错误,可自行先检查一下自己的脚本是否为常驻脚本,如果每次只去一次数据,建议修改为定时任务来处理
附supervisor子进程相关配置信息
[program:sync_merge_mobile] #
command=php /var/www/yunying/artisan consumer:merge-mobile-kafka #执行命令
process_name=%(program_name)s_%(process_num)d
autostart=true #自启动
autorestart=true #自动重启
user=admin #启动时用户权限
numprocs=2 #开启进程数
numprocs_start=0 #起始数字,默认为0
redirect_stderr=true #把 stderr 重定向到 stdout,默认 false
stdout_logfile=/home/admin/yunying/kafka_sync_merge_mobile_%(process_num)08d.log #日志位置
loglevel=info #日志级别,默认info,其它: debug,warn,trace
问题描述:
因为业务需要使用rabbitmq进行数据通信,所以使用laravel搭建了服务,但是在部署到测试服务器后,服务器开始报错:
dev.ERROR: AMQP error while attempting pop: ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile.(0, 0)
看错误描述是rabbitmq拒绝普通用户登录,但是我账号是管理员权限,没有理由被拒绝,于是开始排查
排查过程:
1、因为推送消息通过守护进程方式执行的,首先去掉自己的服务脚本,然后执行supervisorctl reload,确认自己脚本未执行,此时查看日志,发现还有报错
2、因为是测试环境,然后停止supervisor,继续查看日志,发现日志没有报错,确定是某个脚本报错,然后逐一排查,最后确定是一个脚本也是用queue队列,于是查看代码,发现当前脚本并没有使用mq,但是为什么触发报错
3、继续查看自己的代码,突然想起在.env里面里面配置了rabbitmq相关配置,突然发现一段代码
QUEUE_DRIVER=rabbitmq
,查看queue.php,里面默认是redis,到此,基本确认问题,于是注释掉QUEUE_DRIVER,然后重新supervisor,日志再没有报错信息
总结:
因为代码是参考网上示例,直接复制的,没有注意到消息队列被改为rabbitmq,导致其他消息队列报错,在此记录一下。从网上查找的资源,需要在校验是否可用的同时,涉及到环境变量的内容一定要特别注意
解决方法
删除:bootstarp/cache 所有文件
rm -rf bootstrap/cache/*
说明:这里使用的是:vladimir-yuldashev/laravel-queue-rabbitmq
1、下载laravel-queue-rabbitmq(需要根据当前laravel版本,选择适用的版本,否则下载失败)
composer require vladimir-yuldashev/laravel-queue-rabbitmq
2、打开config/queue.php connections添加配置信息
'rabbitmq' => [
'driver' => 'rabbitmq',
'queue' => env('RABBITMQ_QUEUE', 'default'),
'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
'hosts' => [
[
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
],
],
'options' => [
'ssl_options' => [
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
],
'queue' => [
'job' => \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
],
'exchange' => [
'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
]
],
/*
* Set to "horizon" if you wish to use Laravel Horizon.
*/
'worker' => env('RABBITMQ_WORKER', 'default'),
]
3、配置env,添加如下信息
# mq的ip地址 RABBITMQ_HOST='localhost' # mq的端口 RABBITMQ_PORT=5672 # mq的账号 RABBITMQ_USER='guest' # mq的密码 RABBITMQ_PASSWORD='guest' # 默认的虚拟主机 RABBITMQ_VHOST=/ # 默认队列名称 RABBITMQ_QUEUE=product # 交换机名称 RABBITMQ_EXCHANGE_TYPE=exc_product
4、修改config/app.php 增加
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
5、适用artisan命令创建jobs
php artisan make:Jobs UpdateProduct
6、修改app/Jobs/UpdateProduct.php
/**
* php artisan make:job UpdateProduct
*/
namespace App\Jobs;
use App\Services\RabbitmqService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class UpdateProduct implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $productKey;
/**
* UpdateProduct constructor.
* @param $data
* @throws \Exception
*/
public function __construct($data)
{
$this->productKey = "product::info::{$data->id}";
//服务生产者
RabbitmqService::push(env('RABBITMQ_QUEUE'),env('RABBITMQ_EXCHANGE_TYPE'),'pus_product',$data);
}
/**
* 服务消费者会走到这里,把消息消费掉
* @throws \Exception
*/
public function handle()
{
//我这里是通过任务调度来取数据的,没有直接输出
// RabbitmqService::pop('product',function ($message){
// print_r('消费者消费消息'.PHP_EOL);
// print_r(PHP_EOL);
// $key = $this->productKey . ':' . date('Y-m-d H:i:s');
// $input = serialize(json_decode($message,true));
// $product = app('redis')->set($key,$input);
// if($product){
// print_r('消息消费成功');
// return true;
// }else{
// print_r('消息消费失败');
// return false;
// }
// });
}
/**
* 异常扑获
* @param \Exception $exception
*/
public function failed(\Exception $exception){
print_r($exception->getMessage());
}
}
7、创建操作rabbitmq的service
namespace App\Services;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitmqService
{
private static function getConnect(){
$config = [
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
'exchange' => [
'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
]
];
return new AMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']);
}
/**
* 数据插入到mq队列中(生产者)
* @param $queue .队列名称
* @param $messageBody .消息体
* @param string $exchange .交换机名称
* @param string $routing_key .设置路由
* @throws \Exception
*/
public static function push($queue,$exchange,$routing_key,$messageBody){
//获取连接
$connection = self::getConnect();
//构建通道(mq的数据存储与获取是通过通道进行数据传输的)
$channel = $connection->channel();
//监听数据,成功
$channel->set_ack_handler(function (AMQPMessage $message){
dump("数据写入成功");
});
//监听数据,失败
$channel->set_nack_handler(function (AMQPMessage $message){
dump("数据写入失败");
});
//声明一个队列
$channel->queue_declare($queue,false,true,false,false);
//指定交换机,若是路由的名称不匹配不会把数据放入队列中
$channel->exchange_declare($exchange,'direct',false,true,false);
//队列和交换器绑定/绑定队列和类型
$channel->queue_bind($queue,$exchange,$routing_key);
$config = [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
//实例化消息推送类
$message = new AMQPMessage($messageBody,$config);
//消息推送到路由名称为$exchange的队列当中
$channel->basic_publish($message,$exchange,$routing_key);
//监听写入
$channel->wait_for_pending_acks();
dump('生产者已操作');
//关闭消息推送资源
$channel->close();
//关闭mq资源
$connection->close();
}
/**
* 消费者:取出消息进行消费,并返回
* @param $queue
* @param $callback
* @return bool
* @throws \Exception
*/
public static function pop($queue,$callback){
print_r('消费者中心'.PHP_EOL);
$connection = self::getConnect();
//构建消息通道
$channel = $connection->channel();
//从队列中取出消息,并且消费
$message = $channel->basic_get($queue);
if(!$message) return false;
//消息主题返回给回调函数
$res = $callback($message->body);
if($res){
print_r('ack验证'.PHP_EOL);
//ack验证,如果消费失败了,从新获取一次数据再次消费
$channel->basic_ack($message->getDeliveryTag());
}
print_r('ack消费完成'.PHP_EOL);
$channel->close();
$connection->close();
return true;
}
}
8、创建controller,推送数据到mq
public function pushRabbitmq(Request $request){
try{
$info = $request->input();
$productJob = new UpdateProduct(json_encode($info));
//派遣
$this->dispatch($productJob);
return $this->success('操作成功');
}catch (\Exception $e){
return $this->failed($e->getMessage().$e->getLine());
}
}
9、此时可以通过mq管理工具,查询到queues='product'信息
10、新建任务调度,在handle()中添加如下信息
public function handle()
{
try{
RabbitmqService::pop('product',function ($message){
// print_r('消费者消费消息'.PHP_EOL);
// print_r(PHP_EOL);
$input = json_decode($message,true);
dd($input);
});
}catch (\Exception $exception){
dd($exception->getMessage());
}
}
注:本文使用laravel 5.8;vladimir-yuldashev/laravel-queue-rabbitmq 8.3
文件参考:https://blog.csdn.net/weixin_41753567/article/details/126097031 文中根据需求代码有做调整