遥不可及|Swoft源码之Swoole和Swoft的分析( 四 )

此处为何要使用Swoole的内存Table?Swoft的的定时任务管理是分别由 任务计划进程 和 任务执行进程 进程负责的 。 两个进程的运行共同管理定时任务 , 如果使用进程间独立的array()等结构 , 两个进程必然需要频繁的进程间通信 。 而使用跨进程的Table(本文的Table,除非特别说明 , 都指Swoole的Swoole\Table结构)直接进行进程间数据共享 , 不仅性能高 , 操作简单 还解耦了两个进程 。
为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存 。 具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中 , 比较简单 , 有兴趣的可以自行阅读 。
背景介绍完了 , 我们来看看这两个定时任务进程的行为
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php/** * Crontab timer process * * @Process(name="cronTimer", boot=true) */class CronTimerProcess implements ProcessInterface{/*** @param \Swoft\Process\Process $process*/public function run(SwoftProcess $process){//code..../* @var \Swoft\Task\Crontab\Crontab $cron*/$cron = App::getBean('crontab');// Swoole/HttpServer$server = App::$server->getServer();$time = (60 - date('s')) * 1000;$server->after($time, function () use ($server, $cron) {// Every minute check all tasks, and prepare the tasks that next execution point needs$cron->checkTask();$server->tick(60 * 1000, function () use ($cron) {$cron->checkTask();});});}}
//Swoft\Task\Crontab\Crontab.php/** * 初始化runTimeTable数据 * * @param array $task任务 * @param array $parseResult 解析crontab命令规则结果 , 即Task需要在当前分钟内的哪些秒执行 * @return bool */private function initRunTimeTableData(array $task, array $parseResult): bool{$runTimeTableTasks = $this->getRunTimeTable()->table;$min = date('YmdHi');$sec = strtotime(date('Y-m-d H:i'));foreach ($parseResult as $time) {$this->checkTaskQueue(false);$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);$runTimeTableTasks->set($key, ['taskClass'=> $task['taskClass'],'taskMethod' => $task['taskMethod'],'minute'=> $min,'sec'=> $time + $sec,'runStatus'=> self::NORMAL]);}return true;}CronTimerProcess是Swoft的定时任务调度进程 , 其核心方法是Crontab->initRunTimeTableData() 。
该进程使用了Swoole的定时器功能 , 通过Swoole\Timer在每分钟首秒时执行的回调 , CronTimerProcess每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单 , 写入执行表并标记为 未执行 。
//Swoft\Task\Bootstrap\Process/** * Crontab process * * @Process(name="cronExec", boot=true) */class CronExecProcess implements ProcessInterface{/*** @param \Swoft\Process\Process $process*/public function run(SwoftProcess $process){$pname = App::$server->getPname();$process->name(sprintf('%s cronexec process', $pname));/** @var \Swoft\Task\Crontab\Crontab $cron */$cron = App::getBean('crontab');// Swoole/HttpServer$server = App::$server->getServer();$server->tick(0.5 * 1000, function () use ($cron) {$tasks = $cron->getExecTasks();if (!empty($tasks)) {foreach ($tasks as $task) {// Diliver taskTask::deliverByProcess($task['taskClass'], $task['taskMethod']);$cron->finishTask($task['key']);}}});}}


推荐阅读