123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- <?php
- namespace Jobs\Model;
- use Mall\Framework\Factory;
- use Jobs\Model\MJobs;
- class MProcess
- {
- const CHILD_PROCESS_CAN_RESTART ='staticWorker';
- const CHILD_PROCESS_CAN_NOT_RESTART ='dynamicWorker';
- const STATUS_RUNNING ='runnning';
- const STATUS_WAIT ='wait';
- const STATUS_STOP ='stop';
- const APP_NAME ='jobs';
- const STATUS_HSET_KEY_HASH ='status';
- public $processName = ':swooleProcessTopicQueueJob';
- public $workers = [];
- private $version = '2.5';
- private $excuteTime =3600;
- private $queueMaxNum =10;
- private $queueTickTimer =1000 * 10;
- private $messageTickTimer =1000 * 180;
- private $message =[];
- private $workerNum =0;
- private $dynamicWorkerNum =[];
- private $workersInfo =[];
- private $ppid;
- private $config = [];
- private $pidFile = 'master.pid';
- private $pidInfoFile = 'master.info';
- private $pidStatusFile = 'status.info';
- private $status = '';
- private $logger = null;
- private $queue = null;
- private $topics = null;
- private $beginTime = '';
- private $logSaveFileWorker = 'workers.log';
- public function __construct($config)
- {
- $this->config = $config;
- $this->logger = Factory::logs($this->config['logPath'] ?:'', $this->config['logSaveFileApp'] ?:'', $this->config['app_name'] ?:'');
- $this->topics = $this->config['job']['topics'] ?: [];
- $this->processName = $this->config['processName'] ?: $this->processName;
- $this->excuteTime = $this->config['excuteTime'] ?: $this->excuteTime;
- $this->queueMaxNum = $this->config['queueMaxNum'] ?: $this->queueMaxNum;
- $this->queueTickTimer = $this->config['queueTickTimer'] ?: $this->queueTickTimer;
- $this->messageTickTimer = $this->config['messageTickTimer'] ?: $this->messageTickTimer;
- $this->logSaveFileWorker = $this->config['logSaveFileWorker'] ?: $this->logSaveFileWorker;
- $this->beginTime=time();
-
- $this->status=self::STATUS_RUNNING;
- if (isset($this->config['pidPath']) && !empty($this->config['pidPath'])) {
- if (!is_dir($this->config['pidPath'])) {
- mkdir($this->config['pidPath'], '0777', true);
- }
- $this->pidFile =$this->config['pidPath'] . '/' . $this->pidFile;
- $this->pidInfoFile =$this->config['pidPath'] . '/' . $this->pidInfoFile;
- $this->pidStatusFile=$this->config['pidPath'] . '/' . $this->pidStatusFile;
- } else {
- die('config pidPath must be set!' . PHP_EOL);
- }
-
- if (file_exists($this->pidFile)) {
- $pid=$this->getMasterData('pid');
-
- if ($pid && @\Swoole\Process::kill($pid, 0)) {
- die('已有进程运行中,请先结束或重启' . PHP_EOL);
- }
- }
-
- \Swoole\Process::daemon();
- $this->ppid = getmypid();
- $data['pid'] =$this->ppid;
- $data['status']=$this->status;
- $this->saveMasterData($data);
- $this->setProcessName(self::APP_NAME . ' master ' . $this->ppid . $this->processName);
-
- swoole_async_set(['enable_coroutine' => false]);
- }
-
- private function setProcessName($name)
- {
- $updateName = 0;
-
- if (function_exists('cli_set_process_title') && PHP_OS != 'Darwin'){
- if(cli_set_process_title($name)){
- $updateName = 1;
- }
- }else if (function_exists('swoole_set_process_name') && PHP_OS != 'Darwin' && !$updateName) {
- swoole_set_process_name($name);
- }
- }
-
- private function saveMasterData($data=[])
- {
- isset($data['pid']) && file_put_contents($this->pidFile, $data['pid']);
- file_put_contents($this->pidInfoFile, serialize($data));
- }
-
- private function getMasterData($key='')
- {
- $data=unserialize(file_get_contents($this->pidInfoFile));
- if ($key) {
- return $data[$key] ?: null;
- }
- return $data;
- }
-
- public function start()
- {
- $topics = $this->topics;
- $this->logger->log('topics: ' . json_encode($topics));
- if ($topics) {
-
- foreach ((array) $topics as $topic) {
- if (isset($topic['workerMinNum']) && isset($topic['name'])) {
-
- for ($i = 0; $i < $topic['workerMinNum']; ++$i) {
- $this->reserveQueue($i, $topic['name'], self::CHILD_PROCESS_CAN_RESTART);
- }
- }
- }
- }
- $this->registSignal();
- $this->registTimer();
- }
-
- public function registTimer()
- {
- \Swoole\Timer::tick($this->queueTickTimer, function ($timerId) {
- $topics = $this->topics;
- $this->status =$this->getMasterData('status');
- if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
- $this->exitMaster();
- }
- $this->queue = MQueue::getQueue($this->config['job']['queue'], $this->logger);
- if (empty($this->queue)) {
- $this->logger->log('queue connection is lost', 'info', $this->logSaveFileWorker);
- return;
- }
- $this->queue->setTopics($topics);
- if ($topics && self::STATUS_RUNNING == $this->status) {
-
- foreach ((array) $topics as $topic) {
- if (empty($topic['name'])) {
- continue;
- }
- $this->dynamicWorkerNum[$topic['name']]=$this->dynamicWorkerNum[$topic['name']] ?? 0;
- $topic['workerMaxNum'] =$topic['workerMaxNum'] ?? 0;
- try {
- $len=$this->queue->len($topic['name']);
- $this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len, 'info', $this->logSaveFileWorker);
- } catch (\Throwable $e) {
- $this->logger->log('queueError' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- } catch (\Exception $e) {
- $this->logger->log('queueError: ' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- }
- $this->status=$this->getMasterData('status');
-
- if ($len > $this->queueMaxNum && count($this->message) <= count($topics)) {
- $this->message[]= strtr('Time:{time} Pid:{pid} ProName:{pname} Topic:{topic} Message:{message}', [
- '{time}' => date('Y-m-d H:i:s'),
- '{pid}' => $this->ppid,
- '{pname}' => $this->processName,
- '{topic}' => $topic['name'],
- '{message}'=> '积压消息个数:' . $len . PHP_EOL,
- ]);
- }
- if ($topic['workerMaxNum'] > $topic['workerMinNum'] && self::STATUS_RUNNING == $this->status && $len > $this->queueMaxNum && $this->dynamicWorkerNum[$topic['name']] < $topic['workerMaxNum']) {
- $max=$topic['workerMaxNum'] - $this->dynamicWorkerNum[$topic['name']];
- for ($i=0; $i < $max; ++$i) {
-
- $this->reserveQueue($this->dynamicWorkerNum[$topic['name']], $topic['name'], self::CHILD_PROCESS_CAN_NOT_RESTART);
- ++$this->dynamicWorkerNum[$topic['name']];
- $this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len . ' for: ' . $i . ' ' . $max, 'info', $this->logSaveFileWorker);
- }
- }
- }
- }
- $this->queue->close();
- });
-
- \Swoole\Timer::tick($this->messageTickTimer, function ($timerId) {
- !empty($this->message) && $this->logger->log('Warning Message: ' . implode('', $this->message), 'warning', $this->logSaveFileWorker);
- if ($this->message && isset($this->config['message'])) {
-
-
- }
-
- $this->message=[];
- });
- }
-
- private function exitMaster()
- {
- @unlink($this->pidFile);
- @unlink($this->pidInfoFile);
- $this->logger->log('Time: ' . microtime(true) . '主进程' . $this->ppid . '退出', 'info', $this->logSaveFileWorker);
- sleep(1);
- exit();
- }
-
- public function registSignal()
- {
-
- \Swoole\Process::signal(SIGTERM, function ($signo) {
- $this->killWorkersAndExitMaster();
- });
-
- \Swoole\Process::signal(SIGKILL, function ($signo) {
- $this->killWorkersAndExitMaster();
- });
-
- \Swoole\Process::signal(SIGUSR1, function ($signo) {
- $this->waitWorkers();
- });
-
- \Swoole\Process::signal(SIGUSR2, function ($signo) {
- $this->logger->log('[master pid: ' . $this->ppid . '] has been received signal' . $signo);
- $result=$this->showStatus();
- $this->saveSwooleJobsStatus($result);
-
- });
-
- \Swoole\Process::signal(SIGCHLD, function ($signo) {
- while (true) {
- try {
- $ret = \Swoole\Process::wait(false);
- } catch (\Exception $e) {
- $this->logger->log('signoError: ' . $signo . $e->getMessage(), 'error', $this->logSaveFileWorker);
- }
- if ($ret) {
- $pid = $ret['pid'];
- $childProcess = $this->workers[$pid];
- $topic = $this->workersInfo[$pid]['topic'] ?: '';
- $this->status=$this->getMasterData('status');
- if(empty($this->dynamicWorkerNum) || empty($this->dynamicWorkerNum[$topic])){
- $topicCanNotRestartNum = null;
- }else{
- $topicCanNotRestartNum = $this->dynamicWorkerNum[$topic];
- }
-
- $this->logger->log(self::CHILD_PROCESS_CAN_RESTART . '---' . $topic . '***' . $topicCanNotRestartNum . '***' . $this->status . '***' . $this->workersInfo[$pid]['type'] . '***' . $pid, 'info', $this->logSaveFileWorker);
- $this->logger->log($pid . ',' . $this->status . ',' . self::STATUS_RUNNING . ',' . $this->workersInfo[$pid]['type'] . ',' . self::CHILD_PROCESS_CAN_RESTART, 'info', $this->logSaveFileWorker);
-
- if (self::STATUS_RUNNING == $this->status && $this->workersInfo[$pid]['type'] == self::CHILD_PROCESS_CAN_RESTART) {
- try {
-
- for ($i=0; $i < 30; ++$i) {
- var_dump("aaaa");
- $newPid = $childProcess->start();
- var_dump("xxxxxx".$newPid);
- if ($newPid > 0) {
- break;
- }
- sleep(1);
- }
- if (!$newPid) {
- $this->logger->log('静态子进程重启失败,问题有点严重,平滑退出子进程,主进程会跟着退出', 'error', $this->logSaveFileWorker);
- $this->waitWorkers();
-
- continue;
- }
- $this->workers[$newPid] = $childProcess;
- $this->workersInfo[$newPid]['type'] = self::CHILD_PROCESS_CAN_RESTART;
- $this->workersInfo[$newPid]['topic'] = $topic;
- ++$this->workerNum;
- $this->logger->log("Worker Restart, kill_signal={$ret['signal']} PID=" . $newPid, 'info', $this->logSaveFileWorker);
- } catch (\Throwable $e) {
- $this->logger->log('restartErrorThrow' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- } catch (\Exception $e) {
- $this->logger->log('restartError: ' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- }
- }
-
- if ($this->workersInfo[$pid]['type'] == self::CHILD_PROCESS_CAN_NOT_RESTART) {
- --$this->dynamicWorkerNum[$topic];
- }
- $this->logger->log("Worker Exit, kill_signal={$ret['signal']} PID=" . $pid, 'info', $this->logSaveFileWorker);
- unset($this->workers[$pid], $this->workersInfo[$pid]);
- --$this->workerNum;
- $this->logger->log('Worker count: ' . count($this->workers) . '==' . $this->workerNum, 'info', $this->logSaveFileWorker);
-
- if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
- $this->logger->log('主进程收到所有信号子进程的退出信号,子进程安全退出完成', 'info', $this->logSaveFileWorker);
- $this->exitMaster();
- }
- } else {
- break;
- }
- }
- });
- }
- private function saveSwooleJobsStatus($data)
- {
- file_put_contents($this->pidStatusFile, $data);
- }
- private function showStatus()
- {
- $statusStr ='-------------------------------------' . $this->processName . ' status--------------------------------------------' . PHP_EOL;
- $statusStr .= 'PHP version:' . PHP_VERSION . ' swoole-jobs version: ' . $this->version . PHP_EOL;
- $statusStr .= 'start time : ' . date('Y-m-d H:i:s', $this->beginTime) . ' run ' . floor((time() - $this->beginTime) / (24 * 60 * 60)) . ' days ' . floor(((time() - $this->beginTime) % (24 * 60 * 60)) / (60 * 60)) . ' hours ' . PHP_EOL;
-
- $statusStr .= '|-- Master pid ' . $this->ppid . ' status: ' . $this->status . ' Worker num: ' . count($this->workers) . PHP_EOL;
- if ($this->workers) {
- foreach ($this->workers as $pid => $value) {
- $type =$this->workersInfo[$pid]['type'];
- $topic=$this->workersInfo[$pid]['topic'];
- $statusStr .= ' |---- Worker pid: ' . $pid . ' ' . $type . ' ' . $topic . PHP_EOL;
- }
- }
- return $statusStr;
- }
-
- private function waitWorkers()
- {
- $data['pid'] =$this->ppid;
- $data['status']=self::STATUS_WAIT;
- $this->saveMasterData($data);
- $this->status = self::STATUS_WAIT;
- $this->logger->log('master status: ' . $this->status, 'info', $this->logSaveFileWorker);
- }
-
- private function killWorkersAndExitMaster()
- {
-
- $this->status =self::STATUS_STOP;
- if ($this->workers) {
- foreach ($this->workers as $pid => $worker) {
-
- \Swoole\Process::kill($pid);
- unset($this->workers[$pid]);
- $this->logger->log('主进程收到退出信号,[' . $pid . ']子进程跟着退出', 'info', $this->logSaveFileWorker);
- $this->logger->log('Worker count: ' . count($this->workers), 'info', $this->logSaveFileWorker);
- }
- }
- $this->exitMaster();
- }
-
- public function reserveQueue($num, $topic, $type=self::CHILD_PROCESS_CAN_RESTART)
- {
- $reserveProcess = new \Swoole\Process(function ($worker) use ($num, $topic, $type) {
- $this->checkMpid($worker);
- $beginTime=microtime(true);
- try {
-
- $this->setProcessName($type . ' ' . $topic . ' job ' . $num . ' ' . $this->processName);
- $jobs = new MJobs($this->pidInfoFile, $this->config);
- do {
- $jobs->run($topic);
- $this->status=$this->getMasterData('status');
- $where = (self::STATUS_RUNNING == $this->status) && (self::CHILD_PROCESS_CAN_RESTART == $type ? time() < ($beginTime + $this->excuteTime) : false);
- } while ($where);
- } catch (\Throwable $e) {
- catchError($this->logger, $e);
- } catch (\Exception $e) {
- catchError($this->logger, $e);
- }
- $endTime=microtime(true);
- $this->logger->log($topic . ' worker id: ' . $num . ' is done!!! Timing: ' . ($endTime - $beginTime), 'info', $this->logSaveFileWorker);
- unset($num, $topic, $type);
- });
- $pid = $reserveProcess->start();
- $this->workers[$pid] = $reserveProcess;
- $this->workersInfo[$pid]['type'] = $type;
- $this->workersInfo[$pid]['topic'] = $topic;
- $this->logger->log('topic: ' . $topic . ' ' . $type . ' worker id: ' . $num . ' pid: ' . $pid . ' is start...', 'info', $this->logSaveFileWorker);
- ++$this->workerNum;
- }
-
- private function checkMpid(&$worker)
- {
- if (!@\Swoole\Process::kill($this->ppid, 0)) {
- $worker->exit();
- $this->logger->log("Master process exited, I [{$worker['pid']}] also quit", 'info', $this->logSaveFileWorker);
- }
- }
- }
|