遥不可及|Swoft源码之Swoole和Swoft的分析
这篇文章给大家分享的内容是关于Swoft 源码剖析之Swoole和Swoft的一些介绍(Task投递/定时任务篇) , 有一定的参考价值 , 有需要的朋友可以参考一下 。
喜欢我的文章就关注我吧 , 持续更新中!前言Swoft的任务功能基于Swoole的Task机制 , 或者说Swoft的Task机制本质就是对Swoole的Task机制的封装和加强 。
说到做到 , 150粉丝福利安排PHP进阶架构资料 , 自己在工作中准备的 , 需要火速来任务投递//Swoft\Task\Task.phpclass Task{/*** Deliver coroutine or async task** @param string $taskName* @param string $methodName* @param array$params* @param string $type* @param int$timeout** @return bool|array* @throws TaskException*/public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3){$data= http://kandian.youth.cn/index/TaskHelper::pack($taskName, $methodName, $params, $type);if(!App::isWorkerStatus() //见下文Command章节}if(!App::isWorkerStatus()}$server = App::$server->getServer();// Delier coroutine taskif ($type == self::TYPE_CO) {$tasks[0]= $data;$prifleKey = 'task' . '.' . $taskName . '.' . $methodName;App::profileStart($prifleKey);$result = $server->taskCo($tasks, $timeout);App::profileEnd($prifleKey);return $result;}// Deliver async taskreturn $server->task($data);}}
任务投递Task::deliver()将调用参数打包后根据$type参数通过Swoole的$server->taskCo()或$server->task()接口投递到Task进程 。 Task本身始终是同步执行的 , $type仅仅影响投递这一操作的行为 , Task::TYPE_ASYNC对应的$server->task()是异步投递 , Task::deliver()调用后马上返回;Task::TYPE_CO对应的$server->taskCo()是协程投递 , 投递后让出协程控制 , 任务完成或执行超时后Task::deliver()才从协程返回 。
任务执行//Swoft\Task\Bootstrap\Listeners\TaskEventListener /** * The listener of swoole task * @SwooleListener({ *SwooleEvent::ON_TASK, *SwooleEvent::ON_FINISH, * }) */class TaskEventListener implements TaskInterface, FinishInterface{/*** @param \Swoole\Server $server* @param int$taskId* @param int$workerId* @param mixed$data* @return mixed* @throws \InvalidArgumentException*/public function onTask(Server $server, int $taskId, int $workerId, $data){try {/* @var TaskExecutor $taskExecutor*/$taskExecutor = App::getBean(TaskExecutor::class);$result = $taskExecutor->run($data);} catch (\Throwable $throwable) {App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));$result = false;// Release system resourcesApp::trigger(AppEvent::RESOURCE_RELEASE);App::trigger(TaskEvent::AFTER_TASK);}return $result;}}
此处是swoole.onTask的事件回调 , 其职责仅仅是将将Worker进程投递来的打包后的数据转发给TaskExecutor 。
Swoole的Task机制的本质是Worker进程将耗时任务投递给同步的Task进程(又名TaskWorker)处理,所以swoole.onTask的事件回调是在Task进程中执行的 。 上文说过,Worker进程是你大部分HTTP服务代码执行的环境 , 但是从TaskEventListener.onTask()方法开始 , 代码的执行环境都是Task进程 , 也就是说 , TaskExecutor和具体的TaskBean都是执行在Task进程中的 。
//Swoft\Task\TaskExecutor/** * The task executor * * @Bean() */class TaskExecutor{/*** @param string $data* @return mixed*/public function run(string $data){$data = http://kandian.youth.cn/index/TaskHelper::unpack($data);$name= $data['name'];$type= $data['type'];$method = $data['method'];$params = $data['params'];$logid= $data['logid'] ?? uniqid('', true);$spanid = $data['spanid'] ?? 0;$collector = TaskCollector::getCollector();if (!isset($collector['task'][$name])) {return false;}list(, $coroutine) = $collector['task'][$name];$task = App::getBean($name);if ($coroutine) {$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);} else {$result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);}return $result;}}
推荐阅读
- 遥不可及|DataForce组件技术及功能:数据批量和实时同步
- 遥不可及|100万美元!辽宁副省长卢柯拿下科学大奖 埋头多年攻坚新型纳米结构研究
- 巅峰战队|ConcurrentHashMap的部分源码分析
- 遥不可及|联发科取消为华为定制5nm芯片计划,希望9月15号后继续供货
- 猪奇游戏菌|王者荣耀:职业真的遥不可及?老帅和末将的成功,告诉你并不绝对!
- 遥不可及|两年前的华为神机!2020年的华为P20Pro一切都还好吗?
- 遥不可及|英伟达显卡发展简史(一)
- 塞巴斯蒂安·维特尔|每站都在创造“新历史”!36年来首次!前10对跃马来说已是遥不可及的梦
- 遥不可及|与他人进行远程控制电脑能看见啥
- 遥不可及|8月最流畅安卓机型排行,小米一跃成王,诺基亚也上榜