Skip to content

系统设计:阶梯降频轮询 - 状态同步的智能策略

1667字约6分钟

LaravelPHP系统设计

2024-10-07

例如有这样的场景,某电商系统的订单支付状态需要和上游的支付系统保持实时同步,需要每秒检查一次订单状态,如果订单状态为未支付,则继续检查,直到订单已完成。

然而针对每一笔订单,每秒检查一次会对服务器和上游API造成很大的负载。

如果我们能动态调整检查频率,比如前五次尝试中每1秒检查一次订单状态, 然后在接下来的五次尝试中每5秒检查一次,依此类推。这样既能减少服务器和外部API的负载,又能确保及时更新

我们给他起个名字叫做阶梯降频轮询

真实的例子比如对接支付宝时,支付成功的异步通知以逐渐减低请求频次的的方式通知业务方:

支付宝文档示例 在进行异步通知交互时,如果支付宝收到的应答不是 success ,支付宝会认为通知失败,会通过一定的策略定期重新发起通知。通知的间隔频率为:4m、10m、10m、1h、2h、6h、15h。

本教程将带您一步步实现秒级别的阶梯降频轮询出队列方案。

实际效果:

logo

logo

准备工作

创建一个空的 Laravel 11 项目,接下来我们来实现每间隔几秒钟请求上游API同步订单状态,同时确保控制好频率,不要发起太多请求,也不要在短时间内发起重复请求。

设置队列任务

创建基础任务类

首先,我们需要一个基础任务类来实现轮询逻辑。下面是一个 BasePollingJob 类的示例,它实现了基本的轮询功能:

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\MaxAttemptsExceededException;
use Illuminate\Support\Facades\Log;

abstract class BasePollingJob implements ShouldQueue, ShouldBeUnique
{
    use Queueable;

    protected $jobDesc = '任务轮询队列';
    protected $jobPayload;
    const TIMEOUT_SECOND = 60 * 3;

    public function retryUntil()
    {
        return now()->addSeconds(self::TIMEOUT_SECOND);
    }

在这个类中,我们定义了任务的描述、有效负载和重试的超时时间。retryUntil 方法用于指定任务的最大重试时间,如果超过该时间队列任务尚未完成,laravel-queue会抛出一个异常MaxAttemptsExceededException表示已达最大重试次数,业务上可以在fail()方法里标记处理超时。

调度下次运行

接下来,我们实现一个 scheduleNextRunning 方法,根据尝试次数调节下次执行的时间间隔:

    protected function scheduleNextRunning()
    {
        $attempts = $this->job->attempts();
        if ($attempts <= 5) {
            $this->release(1); // 前5次,每隔1秒执行一次
        } elseif ($attempts <= 10) {
            $this->release(5); // 接下来5次,每隔5秒执行一次
        } elseif ($attempts <= 20) {
            $this->release(10); // 接下来10次,每隔10秒执行一次
        } else {
            $this->release(30); // 超过20次后,每隔30秒执行一次
        }
    }

这个方法根据当前的尝试次数来决定下次任务的执行间隔,确保在初始阶段短时间间隔检查,然后逐渐减少频率。

处理任务失败

在任务失败时或者处理超时,我们需要记录错误并执行相应的逻辑:

    public function failed(\Throwable $exception)
    {
        if ($exception instanceof MaxAttemptsExceededException) {
            Log::info($this->jobDesc . '超时退出执行', [
                'payload' => $this->jobPayload,
                'error' => $exception->getMessage(),
            ]);
            $this->afterMaxAttemptsExceeded();
        } else {
            Log::error($this->jobDesc . '异常失败', [
                'payload' => $this->jobPayload,
                'error' => $exception->getMessage(),
                'file' => $exception->getFile(),
                'line' => $exception->getLine(),
                'trace' => $exception->getTrace()
            ]);
        }
    }

在这里,我们可以记录错误信息,并根据需要进行后续处理。

实现轮询任务

接下来,我们创建一个具体的轮询任务类 PollingOrderStatusJob,用于检查订单状态:

namespace App\Jobs;

use App\Enums\OrderStatus;
use App\Models\Order;
use Illuminate\Support\Facades\Log;

class PollingOrderStatusJob extends BasePollingJob
{
    protected $jobDesc = '订单同步队列';
    private Order $order;

    public function __construct(Order $order)
    {
        $this->order = $order;
        $this->jobPayload = $order->toArray();
    }

在这个类中,我们定义了任务的描述和订单对象。在构造函数中,我们将订单信息存储为有效负载,以便后续使用。

为了确保同一个订单号不会重复入队列,我们需设置订单号为唯一锁获取的依据。

	// 使用订单号来获取唯一锁
    public function uniqueId()
    {
        return $this->order->trade_no;
    }

处理任务逻辑

handle 方法中,我们实现具体的任务逻辑:

    public function handle()
    {
        try {
            Log::info($this->jobDesc . '开始执行', [$this->order->id, $this->order->trade_no]);

            // 前置检测:订单状态如果已是终态,无需操作,退出队列
            if (!$this->checkOrderBeforePolling($this->order)) {
                $this->delete();
                return;
            }

            // 查询订单状态:比如发起一个HTTP请求,获取最新状态
            if (!$this->orderQuery($this->order)) {
                $this->delete();
                return;
            }

            // 调度下次运行的时机
            $this->scheduleNextRunning();
        } catch (\Throwable $e) {
            $this->fail($e);
        }
    }

在这个方法中,我们首先记录任务开始的日志。接着,调用 checkOrderBeforePolling 方法检查订单是否需要继续轮询。如果不需要,则删除任务并返回。然后,我们调用 orderQuery 方法查询订单状态,最后调度下次运行。

前置检测

checkOrderBeforePolling 方法中,如果订单状态已是终态,比如已超时或已完成,就不需要同步了。防止因上层业务异常,这里可以做拦截。

    private function checkOrderBeforePolling(Order $order): bool
    {
        $orderStatus = OrderStatus::from($order->status);
        if (!in_array($orderStatus, [OrderStatus::DEFAULT, OrderStatus::PAYING])) {
            return false;
        }

        return true;
    }

查询订单状态

orderQuery 方法模拟查询订单状态的过程:

    private function orderQuery(Order $order)
    {
        // 模拟订单查询请求
        sleep(1);

        // 支付成功返回
//        $result = [
//            'amount' => 100,
//            'payment_no' => 'p123456',
//            'status' => 'SUCCESS'
//        ];

        // 正在支付中返回
        $result = [
            'amount' => null,
            'payment_no' => null,
            'status' => 'PENDING'
        ];

       return $result['status'] == 'PENDING';
    }

这里我们简单模拟了一个查询请求,实际应用中可能会调用外部 API。

处理超时

    public function afterMaxAttemptsExceeded()
    {
        try {
            $this->order->status = OrderStatus::TIMEOUT->value;
            $this->order->save();
            Log::info($this->jobDesc . '-更新订单状态为超时', [$this->order->trade_no]);
        } catch (\Throwable $e) {
            Log::error($this->jobDesc . '-超时后置逻辑执行异常', [$this->order, $e->getMessage()]);
        }
    }

最后

完整代码包括日志记录、数据库初始化、手把手教程,开箱即用。已开源在这里:Github Gitee

来自亿级别流水项目实战经验总结,开源不易,还请各位看官点个爱心/Star,您的支持是我前进的动力~