说明:这里使用的是:vladimir-yuldashev/laravel-queue-rabbitmq
1、下载laravel-queue-rabbitmq(需要根据当前laravel版本,选择适用的版本,否则下载失败)
composer require vladimir-yuldashev/laravel-queue-rabbitmq
2、打开config/queue.php connections添加配置信息
'rabbitmq' => [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], ], 'options' => [ 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], 'queue' => [ 'job' => \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, ], 'exchange' => [ 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT), ] ], /* * Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), ]
3、配置env,添加如下信息
# mq的ip地址 RABBITMQ_HOST='localhost' # mq的端口 RABBITMQ_PORT=5672 # mq的账号 RABBITMQ_USER='guest' # mq的密码 RABBITMQ_PASSWORD='guest' # 默认的虚拟主机 RABBITMQ_VHOST=/ # 默认队列名称 RABBITMQ_QUEUE=product # 交换机名称 RABBITMQ_EXCHANGE_TYPE=exc_product
4、修改config/app.php 增加
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
5、适用artisan命令创建jobs
php artisan make:Jobs UpdateProduct
6、修改app/Jobs/UpdateProduct.php
/** * php artisan make:job UpdateProduct */ namespace App\Jobs; use App\Services\RabbitmqService; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class UpdateProduct implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $productKey; /** * UpdateProduct constructor. * @param $data * @throws \Exception */ public function __construct($data) { $this->productKey = "product::info::{$data->id}"; //服务生产者 RabbitmqService::push(env('RABBITMQ_QUEUE'),env('RABBITMQ_EXCHANGE_TYPE'),'pus_product',$data); } /** * 服务消费者会走到这里,把消息消费掉 * @throws \Exception */ public function handle() { //我这里是通过任务调度来取数据的,没有直接输出 // RabbitmqService::pop('product',function ($message){ // print_r('消费者消费消息'.PHP_EOL); // print_r(PHP_EOL); // $key = $this->productKey . ':' . date('Y-m-d H:i:s'); // $input = serialize(json_decode($message,true)); // $product = app('redis')->set($key,$input); // if($product){ // print_r('消息消费成功'); // return true; // }else{ // print_r('消息消费失败'); // return false; // } // }); } /** * 异常扑获 * @param \Exception $exception */ public function failed(\Exception $exception){ print_r($exception->getMessage()); } }
7、创建操作rabbitmq的service
namespace App\Services; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitmqService { private static function getConnect(){ $config = [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), 'exchange' => [ 'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT), ] ]; return new AMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']); } /** * 数据插入到mq队列中(生产者) * @param $queue .队列名称 * @param $messageBody .消息体 * @param string $exchange .交换机名称 * @param string $routing_key .设置路由 * @throws \Exception */ public static function push($queue,$exchange,$routing_key,$messageBody){ //获取连接 $connection = self::getConnect(); //构建通道(mq的数据存储与获取是通过通道进行数据传输的) $channel = $connection->channel(); //监听数据,成功 $channel->set_ack_handler(function (AMQPMessage $message){ dump("数据写入成功"); }); //监听数据,失败 $channel->set_nack_handler(function (AMQPMessage $message){ dump("数据写入失败"); }); //声明一个队列 $channel->queue_declare($queue,false,true,false,false); //指定交换机,若是路由的名称不匹配不会把数据放入队列中 $channel->exchange_declare($exchange,'direct',false,true,false); //队列和交换器绑定/绑定队列和类型 $channel->queue_bind($queue,$exchange,$routing_key); $config = [ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]; //实例化消息推送类 $message = new AMQPMessage($messageBody,$config); //消息推送到路由名称为$exchange的队列当中 $channel->basic_publish($message,$exchange,$routing_key); //监听写入 $channel->wait_for_pending_acks(); dump('生产者已操作'); //关闭消息推送资源 $channel->close(); //关闭mq资源 $connection->close(); } /** * 消费者:取出消息进行消费,并返回 * @param $queue * @param $callback * @return bool * @throws \Exception */ public static function pop($queue,$callback){ print_r('消费者中心'.PHP_EOL); $connection = self::getConnect(); //构建消息通道 $channel = $connection->channel(); //从队列中取出消息,并且消费 $message = $channel->basic_get($queue); if(!$message) return false; //消息主题返回给回调函数 $res = $callback($message->body); if($res){ print_r('ack验证'.PHP_EOL); //ack验证,如果消费失败了,从新获取一次数据再次消费 $channel->basic_ack($message->getDeliveryTag()); } print_r('ack消费完成'.PHP_EOL); $channel->close(); $connection->close(); return true; } }
8、创建controller,推送数据到mq
public function pushRabbitmq(Request $request){ try{ $info = $request->input(); $productJob = new UpdateProduct(json_encode($info)); //派遣 $this->dispatch($productJob); return $this->success('操作成功'); }catch (\Exception $e){ return $this->failed($e->getMessage().$e->getLine()); } }
9、此时可以通过mq管理工具,查询到queues='product'信息
10、新建任务调度,在handle()中添加如下信息
public function handle() { try{ RabbitmqService::pop('product',function ($message){ // print_r('消费者消费消息'.PHP_EOL); // print_r(PHP_EOL); $input = json_decode($message,true); dd($input); }); }catch (\Exception $exception){ dd($exception->getMessage()); } }
注:本文使用laravel 5.8;vladimir-yuldashev/laravel-queue-rabbitmq 8.3
文件参考:https://blog.csdn.net/weixin_41753567/article/details/126097031 文中根据需求代码有做调整