Hi,为确保您的账号安全,请大家修改登陆密码为包含大小写字母、标点、数字的复合密码,不要使用过于简单的登陆密码。
您当前的位置:首页 :: PHP

Laravel queue 源理及流程

时间:2018-06-12 18:37:20  来源:  作者:

Laravel queue 源理及流程

在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用 kafka、RabbitMQ 等重量级的消息队列,但是又的确需要异步、重试、并发控制等功能。通常来说,我们经常会使用 Redis、Beanstalk、Amazon SQS 来实现相关功能,laravel 为此对不同的后台队列服务提供统一的 API,本文将会介绍应用最为广泛的 redis 队列。
queue在laravel的源码如下
laravel\framework\src\Illuminate\Queue

ls -hl
BeanstalkdQueue.php
composer.json  
ConsoleServiceProvider.php  
Failed/                     
Jobs/          
Queue.php                 
README.md             
SqsQueue.php
CallQueuedHandler.php  
Connectors/    
DatabaseQueue.php           
IlluminateQueueClosure.php  
Listener.php   
QueueManager.php          
RedisQueue.php        
SyncQueue.php
Capsule/               
Console/       
Events/                     
InteractsWithQueue.php      
NullQueue.php  
QueueServiceProvider.php  
SerializesModels.php  
Worker.php

这个目录里可以看到有好以XXXXQueue.php结尾的, 他们是  BeanstalkdQueue.php RedisQueue.php 等等
Connectors/ 这个目录下就是每一个服务的连接方式 比如:Beanstalkd redis sqs等

而我们主要讲使用redis作为队列服务

下面科普知识
在讲解 laravel 的队列服务之前,我们要先说说基于 redis 的队列服务。首先,redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列,
redis 队列的数据结构

    List 链表
redis 做消息队列的特性例如FIFO(先入先出)很容易实现,只需要一个 list 对象从头取数据,从尾部塞数据即可。
相关的命令:(1)左侧入右侧出:lpush/rpop;(2)右侧入左侧出:rpush/lpop。
这个简单的消息队列很容易实现。

    Zset 有序集合
有些任务场景,并不需要任务立刻执行,而是需要延迟执行;有些任务很重要,需要在任务失败的时候重新尝试。这些功能仅仅依靠 list 是无法完成的。这个时候,就需要 redis 的有序集合。
Redis 有序集合和 Redis 集合类似,是不包含相同字符串的合集。它们的差别是,每个有序集合的成员都关联着一个评分 score,这个评分用于把有序集合中的成员按最低分到最高分排列。
单看有序集合和延迟任务并无关系,但是可以将有序集合的评分 score 设置为延时任务开启的时间,之后轮询这个有序集合,将到期的任务拿出来进行处理,这样就实现了延迟任务的功能。
对于重要的需要重试的任务,在任务执行之前,会将该任务放入有序集合中,设置任务最长的执行时间。若任务顺利执行完毕,该任务会在有序集合中删除。如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。

相关命令:
(1) ZADD 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数。
(2) ZRANGEBYSCORE 按分数返回一个成员范围的有序集合。
(3) ZREMRANGEBYRANK 在给定的索引之内删除所有成员的有序集合。

laravel 的队列服务由两个进程控制,一个是生产者,一个是消费者。这两个进程操纵了 redis 三个队列,其中一个 List,负责即时任务,两个 Zset,负责延时任务与待处理任务。
生产者负责向 redis 推送任务,如果是即时任务,默认就会向 queue:default 推送;如果是延时任务,就会向 queue:default:delayed 推送。
消费者轮询两个队列,不断的从队列中取出任务,先把任务放入 queue:default:reserved 中,再执行相关任务。如果任务执行成功,就会删除 queue:default:reserved 中的任务,否则会被重新放入 queue:default:delayed 队列中。


而laravel的redis是使用rpush/lpop方式来实现队列服务的,具体可以看redisQueue.php

大家在使用的队列的使用是否记得是使用一个函数dispatch();把一个jobs里的类放到这里 例子:

dispatch((new PushEmail2Queue($XXXX, $XXX))->onQueue('high')->onConnection('email'));

上面的意思就是把一个消息放到 queue使用叫email的redis的服务器的high队列里
这里插入一下话题就是laravel queue配置文件了 具体是怎么配置的这里不讲啊,有兴趣的百度咯
这个queue.php里有default和connections, default这定义默认使用的队列服务器是啥是redis还是beanstalkd等
那么那个connections表示有很多个不同的队列服务器,比如下:

'default' => env('QUEUE_DRIVER', 'redis'),
'connections' => [
	'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => 'default',
        'expire' => 60,
    ],
    'email' => [
        'driver' => 'redis',
        'connection' => 'email', //onConnection()这里会用到
        'queue' => 'default', //默认的队列,如果不用onQueue指定默认就放到default里
        'expire' => 60,
    ],
    'message' => [
        'driver' => 'redis',
        'connection' => 'message',
        'queue' => 'hightest',
        'expire' => 60,
    ],

]

比如dispatch((new PushEmail2Queue($XXXX, $XXX))->onQueue('high')->onConnection('email'));
那么就与使用queue.php 配置信息里的connections里的email里的配置信息 然后把jobs里的内容放到redis服务器里的queue:high的队列里

比如dispatch((new PushEmail2Queue($XXXX, $XXX))->onConnection('email'));
那个redis里就会有一个queue:default的队列

dispatch((new PushEmail2Queue($XXXX, $XXX))->onQueue('email')->onConnection('email'));
那个redis里就会有一个queue:email的队列
c03dbcab6c7da6fbe4090395e07c2e25.png
明白了吧

这里讲一下dispatch的过程
//使用redis
dispatch()->Dispatcher ->dispatch2Queue->queueManager->connction(调用配置信息redis并连接)->redisQueue->push/later->createPayloadArray->rpush/zadd->redis主机里
//beanstalkd
dispatch()->Dispatcher ->dispatch2Queue->queueManager->connction(调用配置信息beanstalkd并连接)->beanstalkdQueue->push/later->createPayloadArray->put->beanstalkd主机里

上面讲到了 laravel 的队列服务由两个进程控制,一个是生产者,一个是消费者
那个生产者就是dispatch

下面继续说一下,数据已经到队列服务器里了,那么是消费的呢? 别急,倒杯水,听我慢慢说来

那么执行队列任务的命令:php artisan queue:work 那工程流程
workCommand->worker->getnextJob->redisQueue->pop->redis->delayed/reserved->lpop->redisJob->fire->callQueuedHander->callHander->delete/dispatch
d44a49d543a6e7e727c37ac46ab02abf.png
任务类的创建
    任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle 方法。
    如果想要使得任务被推送到队列中,而不是同步执行,那么需要实现 Illuminate\Contracts\Queue\ShouldQueue 接口。
    如果想要让任务推送到特定的连接中,例如 redis 或者 sqs,那么需要设置 conneciton 变量。
    如果想要让任务推送到特定的队列中去,可以设置 queue 变量。
    如果想要让任务延迟推送,那么需要设置 delay 变量。
    如果想要设置任务至多重试的次数,可以使用 tries 变量;
    如果想要设置任务可以运行的最大秒数,那么可以使用 timeout 参数。
    如果想要手动访问队列,可以使用 trait : Illuminate\Queue\InteractsWithQueue。
    如果队列监听器任务执行次数超过在工作队列中定义的最大尝试次数,监听器的 failed 方法将会被自动调用。 failed 方法接受事件实例和失败的异常作为参数:

下面是执行任务的命令

/usr/bin/php /data/www/artisan queue:work message --tries=3 --daemon --sleep=1 --queue=highest,higher

 

可以使用--queue来指定要跑的队列,也就是上面onQueue()函数里的队列

上面的命令可以使用supervisor 来做守护 给出下我的守护的例子

[program:laravel-message-worker]
process_name=%(program_name)s_%(process_num)02d
command=/usr/bin/php /var/www/artisan queue:work message  --tries=3 --daemon --sleep=3  --queue=highest,higher,high,default,low
autostart=true
autorestart=true
;user=vagrant
numprocs=3
redirect_stderr=true
stdout_logfile=/var/log/supervisor/worker-message.log

numprocs:表示使用多少个进程0会不执行

好结束


下班闪人

举报
收藏0次 / 评论0
评论(0)
还可以输入 2000 个字符
还可以输入 2000 个字符
取消回复
举报×

还可以输入 264 字符

收藏(0)×