主页

说明:这里使用的是:vladimir-yuldashev/laravel-queue-rabbitmq
1、下载laravel-queue-rabbitmq(需要根据当前laravel版本,选择适用的版本,否则下载失败)

composer require vladimir-yuldashev/laravel-queue-rabbitmq

2、打开config/queue.php connections添加配置信息

'rabbitmq' => [
            'driver' => 'rabbitmq',
            'queue' => env('RABBITMQ_QUEUE', 'default'),
            'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,

            'hosts' => [
                [
                    'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                    'port' => env('RABBITMQ_PORT', 5672),
                    'user' => env('RABBITMQ_USER', 'guest'),
                    'password' => env('RABBITMQ_PASSWORD', 'guest'),
                    'vhost' => env('RABBITMQ_VHOST', '/'),
                ],
            ],

            'options' => [
                'ssl_options' => [
                    'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                    'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                    'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                    'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                    'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
                ],
                'queue' => [
                    'job' => \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
                ],
                'exchange' => [
                    'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
                ]
            ],

            /*
             * Set to "horizon" if you wish to use Laravel Horizon.
             */
            'worker' => env('RABBITMQ_WORKER', 'default'),
        ]

3、配置env,添加如下信息

# mq的ip地址
RABBITMQ_HOST='localhost'
# mq的端口
RABBITMQ_PORT=5672
# mq的账号
RABBITMQ_USER='guest'
# mq的密码
RABBITMQ_PASSWORD='guest'
# 默认的虚拟主机
RABBITMQ_VHOST=/
# 默认队列名称
RABBITMQ_QUEUE=product
# 交换机名称
RABBITMQ_EXCHANGE_TYPE=exc_product

4、修改config/app.php 增加

     VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class, 

5、适用artisan命令创建jobs

php artisan make:Jobs UpdateProduct

6、修改app/Jobs/UpdateProduct.php

/**
 * php artisan make:job UpdateProduct
 */

namespace App\Jobs;

use App\Services\RabbitmqService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class UpdateProduct implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected  $productKey;

    /**
     * UpdateProduct constructor.
     * @param $data
     * @throws \Exception
     */
    public function __construct($data)
    {

        $this->productKey = "product::info::{$data->id}";

        //服务生产者
        RabbitmqService::push(env('RABBITMQ_QUEUE'),env('RABBITMQ_EXCHANGE_TYPE'),'pus_product',$data);
    }

    /**
     * 服务消费者会走到这里,把消息消费掉
     * @throws \Exception
     */
    public function handle()
    {
//我这里是通过任务调度来取数据的,没有直接输出
//        RabbitmqService::pop('product',function ($message){
//            print_r('消费者消费消息'.PHP_EOL);

//            print_r(PHP_EOL);

//            $key = $this->productKey . ':' . date('Y-m-d H:i:s');

//            $input = serialize(json_decode($message,true));

//            $product = app('redis')->set($key,$input);

//            if($product){
//                print_r('消息消费成功');
//                return true;
//            }else{
//                print_r('消息消费失败');
//                return false;
//            }
//        });
    }

    /**
     * 异常扑获
     * @param \Exception $exception
     */
    public function failed(\Exception $exception){
        print_r($exception->getMessage());
    }
}

7、创建操作rabbitmq的service

namespace App\Services;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitmqService
{
    private static function getConnect(){
        $config = [
            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),
            'user' => env('RABBITMQ_USER', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),
            'vhost' => env('RABBITMQ_VHOST', '/'),
            'exchange' => [
                'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
            ]
        ];
        return new AMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']);
    }

    /**
     * 数据插入到mq队列中(生产者)
     * @param $queue   .队列名称
     * @param $messageBody .消息体
     * @param string $exchange .交换机名称
     * @param string $routing_key .设置路由
     * @throws \Exception
     */
    public static function push($queue,$exchange,$routing_key,$messageBody){
        //获取连接
        $connection = self::getConnect();

        //构建通道(mq的数据存储与获取是通过通道进行数据传输的)
        $channel = $connection->channel();

        //监听数据,成功
        $channel->set_ack_handler(function (AMQPMessage $message){
            dump("数据写入成功");
        });

        //监听数据,失败
        $channel->set_nack_handler(function (AMQPMessage $message){
            dump("数据写入失败");
        });

        //声明一个队列
        $channel->queue_declare($queue,false,true,false,false);

        //指定交换机,若是路由的名称不匹配不会把数据放入队列中
        $channel->exchange_declare($exchange,'direct',false,true,false);

        //队列和交换器绑定/绑定队列和类型
        $channel->queue_bind($queue,$exchange,$routing_key);

        $config = [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];

        //实例化消息推送类
        $message = new AMQPMessage($messageBody,$config);

        //消息推送到路由名称为$exchange的队列当中
        $channel->basic_publish($message,$exchange,$routing_key);

        //监听写入
        $channel->wait_for_pending_acks();

        dump('生产者已操作');

        //关闭消息推送资源
        $channel->close();

        //关闭mq资源
        $connection->close();
    }

    /**
     * 消费者:取出消息进行消费,并返回
     * @param $queue
     * @param $callback
     * @return bool
     * @throws \Exception
     */
    public static function pop($queue,$callback){

        print_r('消费者中心'.PHP_EOL);

        $connection = self::getConnect();

        //构建消息通道
        $channel = $connection->channel();

        //从队列中取出消息,并且消费
        $message = $channel->basic_get($queue);

        if(!$message) return false;

        //消息主题返回给回调函数
        $res = $callback($message->body);

        if($res){
            print_r('ack验证'.PHP_EOL);
            //ack验证,如果消费失败了,从新获取一次数据再次消费
            $channel->basic_ack($message->getDeliveryTag());
        }

        print_r('ack消费完成'.PHP_EOL);

        $channel->close();
        $connection->close();

        return true;
    }
}

8、创建controller,推送数据到mq

    public function pushRabbitmq(Request $request){
        try{
            $info = $request->input();
            $productJob = new UpdateProduct(json_encode($info));

            //派遣
            $this->dispatch($productJob);

            return $this->success('操作成功');
        }catch (\Exception $e){
            return $this->failed($e->getMessage().$e->getLine());
        }
    }

9、此时可以通过mq管理工具,查询到queues='product'信息
10、新建任务调度,在handle()中添加如下信息

public function handle()
    {
        try{
            RabbitmqService::pop('product',function ($message){
                //            print_r('消费者消费消息'.PHP_EOL);

                //            print_r(PHP_EOL);


                $input = json_decode($message,true);
                dd($input);

            });
        }catch (\Exception $exception){
            dd($exception->getMessage());
        }


    }

注:本文使用laravel 5.8;vladimir-yuldashev/laravel-queue-rabbitmq 8.3
文件参考:https://blog.csdn.net/weixin_41753567/article/details/126097031 文中根据需求代码有做调整

版权属于:三分快乐,七纷幸福
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
0
查看目录

目录

来自 《laravel 配置rabbitmq 说明》
评论

三分快乐,七纷幸福
104 文章数
7 评论量
11 分类数
107 页面数
已在风雨中度过 2年246天18小时51分