遥不可及|Swoft源码之Swoole和Swoft的分析( 二 )

任务执行思路很简单 , 将Worker进程发过来的数据解包还原成原来的调用参数 , 根据$name参数找到对应的TaskBean并调用其对应的task()方法 。 其中TaskBean使用类级别注解@Task(name="TaskName")或者@Task("TaskName")声明 。
值得一提的一点是 , @Task注解除了name属性 , 还有一个coroutine属性 , 上述代码会根据该参数选择使用协程的runCoTask()或者同步的runSyncTask()执行Task 。 但是由于而且由于Swoole的Task进程的执行是完全同步的 , 不支持协程 , 所以目前版本请该参数不要配置为true 。 同样的在TaskBean中编写的任务代码必须的同步阻塞的或者是要能根据环境自动将异步非阻塞和协程降级为同步阻塞的
从Process中投递任务前面我们提到:
Swoole的Task机制的本质是Worker进程将耗时任务投递给同步的Task进程(又名TaskWorker)处理 。
换句话说 , Swoole的$server->taskCo()或$server->task()都只能在Worker进程中使用 。
这个限制大大的限制了使用场景 。如何能够为了能够在Process中投递任务呢?Swoft为了绕过这个限制提供了Task::deliverByProcess()方法 。 其实现原理也很简单 , 通过Swoole的$server->sendMessage()方法将调用信息从Process中投递到Worker进程中 , 然后由Worker进程替其投递到Task进程当中 , 相关代码如下:
//Swoft\Task\Task.php/** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array$params * @param string $type * @param int$timeout * @param int$workId * * @return bool */public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool{/* @var PipeMessageInterface $pipeMessage */$server= App::$server->getServer();$pipeMessage = App::getBean(PipeMessage::class);$data = http://kandian.youth.cn/index/['name'=> $taskName,'method'=> $methodName,'params'=> $params,'timeout' => $timeout,'type'=> $type,];$message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);return $server->sendMessage($message, $workId);}数据打包后使用$server->sendMessage()投递给Worker:
//Swoft\Bootstrap\Server\ServerTrait.php/** * onPipeMessage event callback * * @param \Swoole\Server $server * @param int$srcWorkerId * @param string$message * @return void * @throws \InvalidArgumentException */public function onPipeMessage(Server $server, int $srcWorkerId, string $message){/* @var PipeMessageInterface $pipeMessage */$pipeMessage = App::getBean(PipeMessage::class);list($type, $data) = $pipeMessage->unpack($message);App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);}$server->sendMessage后,Worker进程收到数据时会触发一个swoole.pipeMessage事件的回调 , Swoft会将其转换成自己的swoft.pipeMessage事件并触发.
//Swoft\Task\Event\Listeners\PipeMessageListener.php/** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */class PipeMessageListener implements EventHandlerInterface{/*** @param \Swoft\Event\EventInterface $event*/public function handle(EventInterface $event){$params = $event->getParams();if (count($params) < 3) {return;}list($type, $data, $srcWorkerId) = $params;if ($type != PipeMessage::MESSAGE_TYPE_TASK) {return;}$type= $data['type'];$taskName= $data['name'];$params= $data['params'];$timeout= $data['timeout'];$methodName = $data['method'];// delever taskTask::deliver($taskName, $methodName, $params, $type, $timeout);}}


推荐阅读