系统设计:阶梯降频轮询 - 状态同步的智能策略
例如有这样的场景,某电商系统的订单支付状态需要和上游的支付系统保持实时同步,需要每秒检查一次订单状态,如果订单状态为未支付,则继续检查,直到订单已完成。
然而针对每一笔订单,每秒检查一次会对服务器和上游API造成很大的负载。
如果我们能动态调整检查频率,比如前五次尝试中每1秒检查一次订单状态, 然后在接下来的五次尝试中每5秒检查一次,依此类推。这样既能减少服务器和外部API的负载,又能确保及时更新。
我们给他起个名字叫做阶梯降频轮询。
真实的例子比如对接支付宝时,支付成功的异步通知以逐渐减低请求频次的的方式通知业务方:
支付宝文档示例 在进行异步通知交互时,如果支付宝收到的应答不是 success ,支付宝会认为通知失败,会通过一定的策略定期重新发起通知。通知的间隔频率为:4m、10m、10m、1h、2h、6h、15h。
本教程将带您一步步实现秒级别的阶梯降频轮询出队列方案。
实际效果:
准备工作
创建一个空的 Laravel 11 项目,接下来我们来实现每间隔几秒钟请求上游API同步订单状态,同时确保控制好频率,不要发起太多请求,也不要在短时间内发起重复请求。
设置队列任务
创建基础任务类
首先,我们需要一个基础任务类来实现轮询逻辑。下面是一个 BasePollingJob
类的示例,它实现了基本的轮询功能:
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\MaxAttemptsExceededException;
use Illuminate\Support\Facades\Log;
abstract class BasePollingJob implements ShouldQueue, ShouldBeUnique
{
use Queueable;
protected $jobDesc = '任务轮询队列';
protected $jobPayload;
const TIMEOUT_SECOND = 60 * 3;
public function retryUntil()
{
return now()->addSeconds(self::TIMEOUT_SECOND);
}
在这个类中,我们定义了任务的描述、有效负载和重试的超时时间。retryUntil
方法用于指定任务的最大重试时间,如果超过该时间队列任务尚未完成,laravel-queue会抛出一个异常MaxAttemptsExceededException
表示已达最大重试次数,业务上可以在fail()
方法里标记处理超时。
调度下次运行
接下来,我们实现一个 scheduleNextRunning
方法,根据尝试次数调节下次执行的时间间隔:
protected function scheduleNextRunning()
{
$attempts = $this->job->attempts();
if ($attempts <= 5) {
$this->release(1); // 前5次,每隔1秒执行一次
} elseif ($attempts <= 10) {
$this->release(5); // 接下来5次,每隔5秒执行一次
} elseif ($attempts <= 20) {
$this->release(10); // 接下来10次,每隔10秒执行一次
} else {
$this->release(30); // 超过20次后,每隔30秒执行一次
}
}
这个方法根据当前的尝试次数来决定下次任务的执行间隔,确保在初始阶段短时间间隔检查,然后逐渐减少频率。
处理任务失败
在任务失败时或者处理超时,我们需要记录错误并执行相应的逻辑:
public function failed(\Throwable $exception)
{
if ($exception instanceof MaxAttemptsExceededException) {
Log::info($this->jobDesc . '超时退出执行', [
'payload' => $this->jobPayload,
'error' => $exception->getMessage(),
]);
$this->afterMaxAttemptsExceeded();
} else {
Log::error($this->jobDesc . '异常失败', [
'payload' => $this->jobPayload,
'error' => $exception->getMessage(),
'file' => $exception->getFile(),
'line' => $exception->getLine(),
'trace' => $exception->getTrace()
]);
}
}
在这里,我们可以记录错误信息,并根据需要进行后续处理。
实现轮询任务
接下来,我们创建一个具体的轮询任务类 PollingOrderStatusJob
,用于检查订单状态:
namespace App\Jobs;
use App\Enums\OrderStatus;
use App\Models\Order;
use Illuminate\Support\Facades\Log;
class PollingOrderStatusJob extends BasePollingJob
{
protected $jobDesc = '订单同步队列';
private Order $order;
public function __construct(Order $order)
{
$this->order = $order;
$this->jobPayload = $order->toArray();
}
在这个类中,我们定义了任务的描述和订单对象。在构造函数中,我们将订单信息存储为有效负载,以便后续使用。
为了确保同一个订单号不会重复入队列,我们需设置订单号为唯一锁获取的依据。
// 使用订单号来获取唯一锁
public function uniqueId()
{
return $this->order->trade_no;
}
处理任务逻辑
在 handle
方法中,我们实现具体的任务逻辑:
public function handle()
{
try {
Log::info($this->jobDesc . '开始执行', [$this->order->id, $this->order->trade_no]);
// 前置检测:订单状态如果已是终态,无需操作,退出队列
if (!$this->checkOrderBeforePolling($this->order)) {
$this->delete();
return;
}
// 查询订单状态:比如发起一个HTTP请求,获取最新状态
if (!$this->orderQuery($this->order)) {
$this->delete();
return;
}
// 调度下次运行的时机
$this->scheduleNextRunning();
} catch (\Throwable $e) {
$this->fail($e);
}
}
在这个方法中,我们首先记录任务开始的日志。接着,调用 checkOrderBeforePolling
方法检查订单是否需要继续轮询。如果不需要,则删除任务并返回。然后,我们调用 orderQuery
方法查询订单状态,最后调度下次运行。
前置检测
在 checkOrderBeforePolling
方法中,如果订单状态已是终态,比如已超时或已完成,就不需要同步了。防止因上层业务异常,这里可以做拦截。
private function checkOrderBeforePolling(Order $order): bool
{
$orderStatus = OrderStatus::from($order->status);
if (!in_array($orderStatus, [OrderStatus::DEFAULT, OrderStatus::PAYING])) {
return false;
}
return true;
}
查询订单状态
orderQuery
方法模拟查询订单状态的过程:
private function orderQuery(Order $order)
{
// 模拟订单查询请求
sleep(1);
// 支付成功返回
// $result = [
// 'amount' => 100,
// 'payment_no' => 'p123456',
// 'status' => 'SUCCESS'
// ];
// 正在支付中返回
$result = [
'amount' => null,
'payment_no' => null,
'status' => 'PENDING'
];
return $result['status'] == 'PENDING';
}
这里我们简单模拟了一个查询请求,实际应用中可能会调用外部 API。
处理超时
public function afterMaxAttemptsExceeded()
{
try {
$this->order->status = OrderStatus::TIMEOUT->value;
$this->order->save();
Log::info($this->jobDesc . '-更新订单状态为超时', [$this->order->trade_no]);
} catch (\Throwable $e) {
Log::error($this->jobDesc . '-超时后置逻辑执行异常', [$this->order, $e->getMessage()]);
}
}
最后
完整代码包括日志记录、数据库初始化、手把手教程,开箱即用。已开源在这里:Github Gitee
来自亿级别流水项目实战经验总结,开源不易,还请各位看官点个爱心/Star,您的支持是我前进的动力~