MJobs.Class.php 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. <?php
  2. namespace Jobs\Model;
  3. use Mall\Framework\Factory;
  4. use Jobs\Model\MQueue;
  5. use Jobs\Model\MProcess;
  6. class MJobs
  7. {
  8. public $logger = null;
  9. public $queue = null;
  10. public $sleep = 2; //单个topic如果没有任务,该进程暂停秒数,不能低于1秒,数值太小无用进程会频繁拉起
  11. public $config = [];
  12. private $pidInfoFile = ''; // 主进程pid信息文件路径
  13. public function __construct($pidInfoFile, $config)
  14. {
  15. $this->config = $config;
  16. $this->pidInfoFile = $pidInfoFile;
  17. $this->sleep = $this->config['sleep'] ?: $this->sleep;
  18. $this->logger = Factory::logs($this->config['logPath'] ?:'', $this->config['logSaveFileApp'] ?:'', $this->config['app_name'] ?:'');
  19. }
  20. public function run($topic='')
  21. {
  22. if ($topic) {
  23. $this->queue = MQueue::getQueue($this->config['job']['queue'], $this->logger);
  24. if (empty($this->queue)) {
  25. sleep($this->sleep);
  26. return;
  27. }
  28. $this->queue->setTopics($this->config['job']['topics'] ?: []);
  29. $len = $this->queue->len($topic);
  30. $this->logger->log($topic . ' pop len: ' . $len, 'info');
  31. if ($len > 0) {
  32. //循环拿出队列消息
  33. while ($data = $this->queue->pop($topic)) {
  34. //主进程状态不是running状态,退出循环
  35. if (MProcess::STATUS_RUNNING != $this->getMasterData('status')) {
  36. break;
  37. }
  38. $this->logger->log('pop data: ' . print_r($data, true), 'info');
  39. if (!empty($data) && is_object($data)) {
  40. $beginTime=microtime(true);
  41. // 根据自己的业务需求改写此方法
  42. $jobObject = $this->loadObject($data);
  43. $baseAction = $this->loadFrameworkAction();
  44. $baseAction->start($jobObject);
  45. $endTime=microtime(true);
  46. $this->logger->log('job id ' . $jobObject->uuid . ' done, spend time: ' . ($endTime - $beginTime), 'info');
  47. unset($jobObject, $baseAction);
  48. } else {
  49. $this->logger->log('pop error data: ' . print_r($data, true), 'error');
  50. }
  51. // if ($this->queue->len($topic) <= 0) {
  52. // break;
  53. // }
  54. }
  55. } else {
  56. $this->logger->log($topic . ' no work to do!', 'info');
  57. sleep($this->sleep);
  58. //$this->logger->log('sleep ' . $this->sleep . ' second!', 'info');
  59. }
  60. $this->queue->close();
  61. } else {
  62. $this->logger->log('All topic no work to do!', 'info');
  63. }
  64. }
  65. /**
  66. * 获取主进程状态
  67. * @param string $key
  68. * @return mixed|null
  69. */
  70. private function getMasterData($key='')
  71. {
  72. $data=unserialize(file_get_contents($this->pidInfoFile));
  73. if ($key) {
  74. return $data[$key] ?: null;
  75. }
  76. return $data;
  77. }
  78. //实例化job对象
  79. private function loadObject($data)
  80. {
  81. if (is_object($data)) {
  82. return $data;
  83. }
  84. return fasle;
  85. }
  86. //根据配置装入不同的框架
  87. private function loadFrameworkAction()
  88. {
  89. $classFramework=$this->config['framework']['class'] ?: 'Jobs\Controller\CJobs';
  90. try {
  91. $action = new $classFramework($this->config);
  92. } catch (\Throwable $e) {
  93. Utils::catchError($this->logger, $e);
  94. } catch (\Exception $e) {
  95. Utils::catchError($this->logger, $e);
  96. }
  97. return $action;
  98. }
  99. }