config = $config; $this->pidInfoFile = $pidInfoFile; $this->sleep = $this->config['sleep'] ?: $this->sleep; $this->logger = Factory::logs($this->config['logPath'] ?:'', $this->config['logSaveFileApp'] ?:'', $this->config['app_name'] ?:''); } public function run($topic='') { if ($topic) { $this->queue = MQueue::getQueue($this->config['job']['queue'], $this->logger); if (empty($this->queue)) { sleep($this->sleep); return; } $this->queue->setTopics($this->config['job']['topics'] ?: []); $len = $this->queue->len($topic); $this->logger->log($topic . ' pop len: ' . $len, 'info'); if ($len > 0) { //循环拿出队列消息 while ($data = $this->queue->pop($topic)) { //主进程状态不是running状态,退出循环 if (MProcess::STATUS_RUNNING != $this->getMasterData('status')) { break; } $this->logger->log('pop data: ' . print_r($data, true), 'info'); if (!empty($data) && is_object($data)) { $beginTime=microtime(true); // 根据自己的业务需求改写此方法 $jobObject = $this->loadObject($data); $baseAction = $this->loadFrameworkAction(); $baseAction->start($jobObject); $endTime=microtime(true); $this->logger->log('job id ' . $jobObject->uuid . ' done, spend time: ' . ($endTime - $beginTime), 'info'); unset($jobObject, $baseAction); } else { $this->logger->log('pop error data: ' . print_r($data, true), 'error'); } // if ($this->queue->len($topic) <= 0) { // break; // } } } else { $this->logger->log($topic . ' no work to do!', 'info'); sleep($this->sleep); //$this->logger->log('sleep ' . $this->sleep . ' second!', 'info'); } $this->queue->close(); } else { $this->logger->log('All topic no work to do!', 'info'); } } /** * 获取主进程状态 * @param string $key * @return mixed|null */ private function getMasterData($key='') { $data=unserialize(file_get_contents($this->pidInfoFile)); if ($key) { return $data[$key] ?: null; } return $data; } //实例化job对象 private function loadObject($data) { if (is_object($data)) { return $data; } return fasle; } //根据配置装入不同的框架 private function loadFrameworkAction() { $classFramework=$this->config['framework']['class'] ?: 'Jobs\Controller\CJobs'; try { $action = new $classFramework($this->config); } catch (\Throwable $e) { Utils::catchError($this->logger, $e); } catch (\Exception $e) { Utils::catchError($this->logger, $e); } return $action; } }