123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- <?php
- namespace Jobs\Model;
- use Mall\Framework\Factory;
- use Jobs\Model\MQueue;
- use Jobs\Model\MProcess;
/**
 * Worker-side job consumer.
 *
 * For a given topic it opens the configured queue, pops messages while the
 * master process (as recorded in the pid info file) is still in the running
 * state, and hands every popped job object to the configured framework action.
 */
class MJobs
{
    public $logger = null;
    public $queue = null;
    // Seconds this process pauses when a single topic has no jobs. Must not be
    // lower than 1 second: with a smaller value idle processes are respawned
    // too frequently.
    public $sleep = 2;
    public $config = [];
    // Path of the master process pid info file.
    private $pidInfoFile = '';

    /**
     * @param string $pidInfoFile Path of the serialized master process info file.
     * @param array  $config      Configuration; the keys read by this class are
     *                            'sleep', 'logPath', 'logSaveFileApp', 'app_name',
     *                            'job' ('queue', 'topics') and 'framework' ('class').
     */
    public function __construct($pidInfoFile, $config)
    {
        $this->config = $config;
        $this->pidInfoFile = $pidInfoFile;

        // Empty/zero values keep the default; anything else is clamped to the
        // documented minimum of one second so an idle worker never busy-loops.
        if (!empty($this->config['sleep'])) {
            $this->sleep = max(1, (int) $this->config['sleep']);
        }

        $this->logger = Factory::logs(
            !empty($this->config['logPath']) ? $this->config['logPath'] : '',
            !empty($this->config['logSaveFileApp']) ? $this->config['logSaveFileApp'] : '',
            !empty($this->config['app_name']) ? $this->config['app_name'] : ''
        );
    }

    /**
     * Consume the jobs currently queued for one topic.
     *
     * Sleeps for $this->sleep seconds when the queue cannot be obtained or the
     * topic is empty. The queue is always closed before returning, even when a
     * job throws; the exception itself still propagates to the caller.
     *
     * @param string $topic Topic name; an empty value only logs a message.
     * @return void
     */
    public function run($topic = '')
    {
        if (!$topic) {
            $this->logger->log('All topic no work to do!', 'info');
            return;
        }

        $jobConfig = (isset($this->config['job']) && is_array($this->config['job']))
            ? $this->config['job']
            : [];

        $this->queue = MQueue::getQueue(
            isset($jobConfig['queue']) ? $jobConfig['queue'] : null,
            $this->logger
        );
        if (empty($this->queue)) {
            sleep($this->sleep);
            return;
        }

        try {
            $this->queue->setTopics(!empty($jobConfig['topics']) ? $jobConfig['topics'] : []);

            $len = $this->queue->len($topic);
            $this->logger->log($topic . ' pop len: ' . $len, 'info');

            if ($len > 0) {
                $this->consume($topic);
            } else {
                $this->logger->log($topic . ' no work to do!', 'info');
                sleep($this->sleep);
            }
        } finally {
            $this->queue->close();
        }
    }

    /**
     * Pop and process messages of one topic until the queue yields nothing,
     * the master process leaves the running state, or a job cannot be handled.
     *
     * @param string $topic
     * @return void
     */
    private function consume($topic)
    {
        // The master status is checked BEFORE popping: checking it afterwards
        // would take a message off the queue and then drop it unprocessed.
        while (MProcess::STATUS_RUNNING == $this->getMasterData('status')) {
            $data = $this->queue->pop($topic);
            if (!$data) {
                break;
            }

            $this->logger->log('pop data: ' . print_r($data, true), 'info');

            if (!is_object($data)) {
                $this->logger->log('pop error data: ' . print_r($data, true), 'error');
                continue;
            }

            if (!$this->process($data)) {
                // Without a framework action every further message would be
                // popped and lost as well, so stop draining the queue.
                $this->logger->log($topic . ' stop consuming: framework action could not be loaded', 'error');
                break;
            }
        }
    }

    /**
     * Run a single job object through the framework action and log its duration.
     *
     * @param object $data Popped job object.
     * @return bool false when the framework action could not be instantiated.
     */
    private function process($data)
    {
        $beginTime = microtime(true);

        // Adapt loadObject() to your own business needs.
        $jobObject = $this->loadObject($data);
        $baseAction = $this->loadFrameworkAction();
        if (null === $baseAction) {
            return false;
        }

        $baseAction->start($jobObject);

        $endTime = microtime(true);
        $this->logger->log('job id ' . $jobObject->uuid . ' done, spend time: ' . ($endTime - $beginTime), 'info');

        return true;
    }

    /**
     * Read the master process info stored in the pid info file.
     *
     * @param string $key Entry to return; empty to return the whole data set.
     * @return mixed|null With a key: the entry, or null when it is missing or
     *                    empty. Without a key: the unserialized data, or false
     *                    when the file cannot be read or decoded.
     */
    private function getMasterData($key = '')
    {
        $raw = is_readable($this->pidInfoFile) ? file_get_contents($this->pidInfoFile) : false;

        // NOTE(review): unserialize() can instantiate arbitrary classes. The pid
        // info file is presumably written only by the master process - confirm
        // it cannot be modified by untrusted users.
        $data = (false === $raw) ? false : unserialize($raw);

        if ($key) {
            return (is_array($data) && !empty($data[$key])) ? $data[$key] : null;
        }

        return $data;
    }

    /**
     * Turn popped queue data into the job object.
     *
     * @param mixed $data
     * @return object|false The job object, or false when $data is not an object.
     */
    private function loadObject($data)
    {
        if (is_object($data)) {
            return $data;
        }

        return false;
    }

    /**
     * Instantiate the framework action named in the configuration.
     *
     * Falls back to Jobs\Controller\CJobs when config['framework']['class'] is
     * not set or empty.
     *
     * @return object|null The action instance, or null when instantiation failed
     *                     (the error is reported through Utils::catchError()).
     */
    private function loadFrameworkAction()
    {
        $classFramework = !empty($this->config['framework']['class'])
            ? $this->config['framework']['class']
            : 'Jobs\Controller\CJobs';

        $action = null;
        // NOTE(review): Utils is not imported in the visible file header, so it
        // resolves relative to the current namespace - confirm that class exists.
        try {
            $action = new $classFramework($this->config);
        } catch (\Throwable $e) {
            Utils::catchError($this->logger, $e);
        } catch (\Exception $e) {
            // Only reachable on PHP 5, where \Throwable does not exist.
            Utils::catchError($this->logger, $e);
        }

        return $action;
    }
}
|