队列
介绍
Laravel 队列提供了一个统一的 API,支持多种不同的队列后端,如 Beanstalk、Amazon SQS、Redis,甚至是关系数据库。队列允许您将耗时的任务(如发送电子邮件)的处理推迟到稍后进行。推迟这些耗时任务可以显著加快应用程序的 Web 请求速度。
队列配置文件存储在 config/queue.php
中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、Beanstalkd、Amazon SQS、Redis 和一个同步驱动程序(用于本地使用),该驱动程序将立即执行作业。还包括一个 null
队列驱动程序,它只是简单地丢弃排队的作业。
连接与队列
在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在您的 config/queue.php
配置文件中,有一个 connections
配置选项。此选项定义了与后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的作业堆栈或堆。
请注意,queue
配置文件中的每个连接配置示例都包含一个 queue
属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue
属性中定义的队列上:
// 这个作业被发送到默认队列...
dispatch(new Job);
// 这个作业被发送到 "emails" 队列...
dispatch((new Job)->onQueue('emails'));
某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您按优先级指定应处理哪些队列。例如,如果您将作业推送到 high
队列,您可以运行一个工作者,给予它们更高的处理优先级:
php artisan queue:work --queue=high,default
驱动程序的先决条件
数据库
要使用 database
队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table
Artisan 命令。创建迁移后,您可以使用 migrate
命令迁移数据库:
php artisan queue:table
php artisan migrate
Redis
要使用 redis
队列驱动程序,您应在 config/database.php
配置文件中配置 Redis 数据库连接。
如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
其他驱动程序的先决条件
以下依赖项是列出的队列驱动程序所需的:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~3.0
- Redis:
predis/predis ~1.0
创建作业
生成作业类
默认情况下,应用程序的所有可排队作业都存储在 app/Jobs
目录中。如果 app/Jobs
目录不存在,当您运行 make:job
Artisan 命令时,它将被创建。您可以使用 Artisan CLI 生成一个新的排队作业:
php artisan make:job SendReminderEmail
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,指示 Laravel 该作业应被推送到队列以异步运行。
类结构
作业类非常简单,通常只包含一个 handle
方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在此示例中,我们假装管理一个播客发布服务,并需要在发布前处理上传的播客文件:
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的作业实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行作业。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客...
}
}
在此示例中,请注意我们能够将 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 SerializesModels
trait,Eloquent 模型将在作业处理时被优雅地序列化和反序列化。如果您的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列上。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例。这对您的应用程序来说是完全透明的,并防止了序列化完整 Eloquent 模型实例时可能出现的问题。
handle
方法在作业被队列处理时调用。请注意,我们能够在作业的 handle
方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。
二进制数据(如原始图像内容)应在传递给排队作业之前通过 base64_encode
函数传递。否则,作业在放置到队列时可能无法正确序列化为 JSON。
调度作业
编写作业类后,您可以使用 dispatch
辅助函数调度它。您只需将作业实例作为参数传递给 dispatch
辅助函数:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
dispatch(new ProcessPodcast($podcast));
}
}
dispatch
辅助函数提供了一个简短、全局可用的函数的便利性,同时也非常易于测试。查看 Laravel 测试文档 以了解更多信息。
延迟调度
如果您希望延迟执行排队作业,可以在作业实例上使用 delay
方法。delay
方法由 Illuminate\Bus\Queueable
trait 提供,该 trait 默认包含在所有生成的作业类中。例如,让我们指定一个作业在调度后 10 分钟内不可用进行处理:
<?php
namespace App\Http\Controllers;
use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
$job = (new ProcessPodcast($podcast))
->delay(Carbon::now()->addMinutes(10));
dispatch($job);
}
}
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
自定义队列和连接
调度到特定队列
通过将作业推送到不同的队列,您可以“分类”排队的作业,甚至可以优先考虑分配给各种队列的工作者数量。请记住,这不会将作业推送到您的队列配置文件中定义的不同队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在作业实例上使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
$job = (new ProcessPodcast($podcast))->onQueue('processing');
dispatch($job);
}
}
调度到特定连接
如果您正在处理多个队列连接,可以指定将作业推送到哪个连接。要指定连接,请在作业实例上使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
$job = (new ProcessPodcast($podcast))->onConnection('sqs');
dispatch($job);
}
}
当然,您可以链式调用 onConnection
和 onQueue
方法,以指定作业的连接和队列:
$job = (new ProcessPodcast($podcast))
->onConnection('sqs')
->onQueue('processing');
错误处理
如果在作业处理时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work
Artisan 命令中使用的 --tries
开关定义。有关运行队列工作者的更多信息,请参见下文。
运行队列工作者
Laravel 包含一个队列工作者,它将在作业推送到队列时处理新作业。您可以使用 queue:work
Artisan 命令运行工作者。请注意,一旦 queue:work
命令启动,它将继续运行,直到手动停止或关闭终端:
php artisan queue:work
要使 queue:work
进程永久在后台运行,您应使用进程监视器(如 Supervisor)以确保队列工作者不会停止运行。
请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请务必重新启动队列工作者。
指定连接和队列
您还可以指定工作者应使用哪个队列连接。传递给 work
命令的连接名称应对应于 config/queue.php
配置文件中定义的连接之一:
php artisan queue:work redis
您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis
队列连接上的 emails
队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:
php artisan queue:work redis --queue=emails
资源考虑
守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成后应使用 imagedestroy
释放内存。
队列优先级
有时您可能希望优先处理队列。例如,在 config/queue.php
中,您可以将 redis
连接的默认 queue
设置为 low
。然而,偶尔您可能希望将作业推送到 high
优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
要启动一个工作者,以确保所有 high
队列作业在继续处理 low
队列上的任何作业之前都已处理完毕,请将队列名称的逗号分隔列表传递给 work
命令:
php artisan queue:work --queue=high,low
队列工作者与部署
由于队列工作者是长时间运行的进程,它们在不重新启动的情况下不会拾取代码更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart
命令优雅地重新启动所有工作者:
php artisan queue:restart
此命令将指示所有队列工作者在完成当前作业后优雅地“死亡”,以确保没有现有作业丢失。由于在执行 queue:restart
命令时队列工作者将死亡,因此您应运行一个进程管理器(如 Supervisor)以自动重新启动队列工作者。
作业过期与超时
作业过期
在 config/queue.php
配置文件中,每个队列连接定义了一个 retry_after
选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after
的值设置为 90
,则如果作业已处理 90 秒而未被删除,它将被释放回队列。通常,您应将 retry_after
值设置为作业合理完成处理所需的最大秒数。
唯一不包含 retry_after
值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的默认可见性超时重试作业。
工作者超时
queue:work
Artisan 命令公开了一个 --timeout
选项。--timeout
选项指定 Laravel 队列主进程在终止处理作业的子队列工作者之前将等待的时间。有时,由于各种原因(如未响应的外部 HTTP 调用),子队列进程可能会“冻结”。--timeout
选项会删除超过指定时间限制的冻结进程:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。
--timeout
值应始终比 retry_after
配置值短几秒钟。这将确保在重试作业之前,处理给定作业的工作者始终被终止。如果 --timeout
选项长于 retry_after
配置值,您的作业可能会被处理两次。
工作者休眠时间
当队列上有作业可用时,工作者将继续处理作业而没有延迟。然而,sleep
选项决定了如果没有新作业可用,工作者将“休眠”的时间:
php artisan queue:work --sleep=3
Supervisor 配置
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work
进程失败,它将自动重新启动。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:
sudo apt-get install supervisor
如果自己配置 Supervisor 听起来很复杂,可以考虑使用 Laravel Forge,它将自动为您的 Laravel 项目安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf
文件,启动并监视一个 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
在此示例中,numprocs
指令将指示 Supervisor 运行 8 个 queue:work
进程并监视所有进程,如果它们失败,将自动重新启动。当然,您应更改 command
指令中的 queue:work sqs
部分以反映您所需的队列连接。
启动 Supervisor
创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败的作业
有时您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在作业超过此尝试次数后,它将被插入到 failed_jobs
数据库表中。要为 failed_jobs
表创建迁移,您可以使用 queue:failed-table
命令:
php artisan queue:failed-table
php artisan migrate
然后,在运行您的队列工作者时,您应使用 queue:work
命令上的 --tries
开关指定作业应尝试的最大次数。如果您未为 --tries
选项指定值,作业将无限期尝试:
php artisan queue:work redis --tries=3
清理失败的作业
您可以直接在作业类上定义一个 failed
方法,允许您在发生故障时执行作业特定的清理。这是向用户发送警报或撤销作业执行的任何操作的理想位置。导致作业失败的 Exception
将传递给 failed
方法:
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的作业实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行作业。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客...
}
/**
* 作业处理失败。
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 发送用户失败通知等...
}
}
失败作业事件
如果您希望注册一个在作业失败时调用的事件,可以使用 Queue::failing
方法。此事件是通过电子邮件或 HipChat 通知您的团队的绝佳机会。例如,我们可以从 Laravel 附带的 AppServiceProvider
中附加一个回调到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register()
{
//
}
}
重试失败的作业
要查看已插入 failed_jobs
数据库表中的所有失败作业,您可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令将列出作业 ID、连接、队列和失败时间。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 5
的失败作业,请发出以下命令:
php artisan queue:retry 5
要重试所有失败的作业,请执行 queue:retry
命令并传递 all
作为 ID:
php artisan queue:retry all
如果您希望删除失败的作业,可以使用 queue:forget
命令:
php artisan queue:forget 5
要删除所有失败的作业,可以使用 queue:flush
命令:
php artisan queue:flush
作业事件
使用 Queue
facade 上的 before
和 after
方法,您可以指定在排队作业处理之前或之后执行的回调。这些回调是执行额外日志记录或增加仪表板统计信息的绝佳机会。通常,您应从服务提供者中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register()
{
//
}
}
使用 Queue
facade 上的 looping
方法,您可以指定在工作者尝试从队列获取作业之前执行的回调。例如,您可以注册一个闭包以回滚任何由先前失败的作业留下的未完成事务:
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});