123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- <?php
- namespace Jobs\Model;
- use Mall\Framework\Factory;
- use Jobs\Model\MJobs;
- class MProcess
- {
- const CHILD_PROCESS_CAN_RESTART ='staticWorker'; //子进程可以重启,进程个数固定
- const CHILD_PROCESS_CAN_NOT_RESTART ='dynamicWorker'; //子进程不可以重启,进程个数根据队列堵塞情况动态分配
- const STATUS_RUNNING ='runnning'; //主进程running状态
- const STATUS_WAIT ='wait'; //主进程wait状态
- const STATUS_STOP ='stop'; //主进程stop状态
- const APP_NAME ='jobs'; //app name
- const STATUS_HSET_KEY_HASH ='status'; //status hash名
- public $processName = ':swooleProcessTopicQueueJob'; // 进程重命名, 方便 shell 脚本管理
- public $workers = [];
- private $version = '2.5';
- private $excuteTime =3600; //子进程最长执行时间,单位:秒
- private $queueMaxNum =10; //队列达到一定长度,启动动态子进程个数发和送消息提醒
- private $queueTickTimer =1000 * 10; //一定时间间隔(毫秒)检查队列长度;默认10秒钟
- private $messageTickTimer =1000 * 180; //一定时间间隔(毫秒)发送消息提醒;默认3分钟
- private $message =[]; //提醒消息内容
- private $workerNum =0; //固定分配的子进程个数
- private $dynamicWorkerNum =[]; //动态(不能重启)子进程计数,最大数为每个topic配置workerMaxNum,它的个数是动态变化的
- private $workersInfo =[];
- private $ppid;
- private $config = [];
- private $pidFile = 'master.pid'; //pid存放文件
- private $pidInfoFile = 'master.info'; //pid 序列化信息
- private $pidStatusFile = 'status.info'; //pid status信息
- private $status = '';
- private $logger = null;
- private $queue = null;
- private $topics = null;
- private $beginTime = '';
- private $logSaveFileWorker = 'workers.log';
- public function __construct($config)
- {
- $this->config = $config;
- $this->logger = Factory::logs($this->config['logPath'] ?:'', $this->config['logSaveFileApp'] ?:'', $this->config['app_name'] ?:'');
- $this->topics = $this->config['job']['topics'] ?: [];
- $this->processName = $this->config['processName'] ?: $this->processName;
- $this->excuteTime = $this->config['excuteTime'] ?: $this->excuteTime;
- $this->queueMaxNum = $this->config['queueMaxNum'] ?: $this->queueMaxNum;
- $this->queueTickTimer = $this->config['queueTickTimer'] ?: $this->queueTickTimer;
- $this->messageTickTimer = $this->config['messageTickTimer'] ?: $this->messageTickTimer;
- $this->logSaveFileWorker = $this->config['logSaveFileWorker'] ?: $this->logSaveFileWorker;
- $this->beginTime=time();
- //该变量需要在多进程共享
- $this->status=self::STATUS_RUNNING;
- if (isset($this->config['pidPath']) && !empty($this->config['pidPath'])) {
- if (!is_dir($this->config['pidPath'])) {
- mkdir($this->config['pidPath'], '0777', true);
- }
- $this->pidFile =$this->config['pidPath'] . '/' . $this->pidFile;
- $this->pidInfoFile =$this->config['pidPath'] . '/' . $this->pidInfoFile;
- $this->pidStatusFile=$this->config['pidPath'] . '/' . $this->pidStatusFile;
- } else {
- die('config pidPath must be set!' . PHP_EOL);
- }
- /*
- * master.pid 文件记录 master 进程 pid, 方便之后进程管理
- * 请管理好此文件位置, 使用 systemd 管理进程时会用到此文件
- * 判断文件是否存在,并判断进程是否在运行
- */
- if (file_exists($this->pidFile)) {
- $pid=$this->getMasterData('pid');
- // $signo=0,可以检测进程是否存在,不会发送默认SIGTERM信号,终止进程
- if ($pid && @\Swoole\Process::kill($pid, 0)) {
- die('已有进程运行中,请先结束或重启' . PHP_EOL);
- }
- }
- // 使当前进程蜕变为一个守护进程。
- \Swoole\Process::daemon();
- $this->ppid = getmypid();
- $data['pid'] =$this->ppid;
- $data['status']=$this->status;
- $this->saveMasterData($data);
- $this->setProcessName(self::APP_NAME . ' master ' . $this->ppid . $this->processName);
- //报错解决:must be forked outside the coroutine
- swoole_async_set(['enable_coroutine' => false]);//关闭内置协程
- }
- /**
- * 设置进程名
- * @param mixed $name
- */
- private function setProcessName($name)
- {
- $updateName = 0;
- // 低版本Linux内核和Mac OSX不支持进程重命名
- if (function_exists('cli_set_process_title') && PHP_OS != 'Darwin'){
- if(cli_set_process_title($name)){ // 按照官网文档优先使用内置函数,php >= 5.5
- $updateName = 1;
- }
- }else if (function_exists('swoole_set_process_name') && PHP_OS != 'Darwin' && !$updateName) {
- swoole_set_process_name($name); // swoole >= 1.6.3
- }
- }
- /**
- * 保存守护进程数据
- * @param array $data
- */
- private function saveMasterData($data=[])
- {
- isset($data['pid']) && file_put_contents($this->pidFile, $data['pid']);
- file_put_contents($this->pidInfoFile, serialize($data));
- }
- /**
- * 读取守护进程数据
- * @param string $key
- * @return mixed|null
- */
- private function getMasterData($key='')
- {
- $data=unserialize(file_get_contents($this->pidInfoFile));
- if ($key) {
- return $data[$key] ?: null;
- }
- return $data;
- }
- /**
- * 按照配置开启每个topic进程
- */
- public function start()
- {
- $topics = $this->topics;
- $this->logger->log('topics: ' . json_encode($topics));
- if ($topics) {
- //遍历topic任务列表
- foreach ((array) $topics as $topic) {
- if (isset($topic['workerMinNum']) && isset($topic['name'])) {
- //每个topic开启最少个进程消费队列
- for ($i = 0; $i < $topic['workerMinNum']; ++$i) {
- $this->reserveQueue($i, $topic['name'], self::CHILD_PROCESS_CAN_RESTART);
- }
- }
- }
- }
- $this->registSignal();
- $this->registTimer();
- }
- //增加定时器,检查队列积压情况;
- public function registTimer()
- {
- \Swoole\Timer::tick($this->queueTickTimer, function ($timerId) {
- $topics = $this->topics;
- $this->status =$this->getMasterData('status');
- if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
- $this->exitMaster();
- }
- $this->queue = MQueue::getQueue($this->config['job']['queue'], $this->logger);
- if (empty($this->queue)) {
- $this->logger->log('queue connection is lost', 'info', $this->logSaveFileWorker);
- return;
- }
- $this->queue->setTopics($topics);
- if ($topics && self::STATUS_RUNNING == $this->status) {
- //遍历topic任务列表
- foreach ((array) $topics as $topic) {
- if (empty($topic['name'])) {
- continue;
- }
- $this->dynamicWorkerNum[$topic['name']]=$this->dynamicWorkerNum[$topic['name']] ?? 0;
- $topic['workerMaxNum'] =$topic['workerMaxNum'] ?? 0;
- try {
- $len=$this->queue->len($topic['name']);
- $this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len, 'info', $this->logSaveFileWorker);
- } catch (\Throwable $e) {
- $this->logger->log('queueError' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- } catch (\Exception $e) {
- $this->logger->log('queueError: ' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- }
- $this->status=$this->getMasterData('status');
- //消息提醒:消息体收集
- if ($len > $this->queueMaxNum && count($this->message) <= count($topics)) {
- $this->message[]= strtr('Time:{time} Pid:{pid} ProName:{pname} Topic:{topic} Message:{message}', [
- '{time}' => date('Y-m-d H:i:s'),
- '{pid}' => $this->ppid,
- '{pname}' => $this->processName,
- '{topic}' => $topic['name'],
- '{message}'=> '积压消息个数:' . $len . PHP_EOL,
- ]);
- }
- if ($topic['workerMaxNum'] > $topic['workerMinNum'] && self::STATUS_RUNNING == $this->status && $len > $this->queueMaxNum && $this->dynamicWorkerNum[$topic['name']] < $topic['workerMaxNum']) {
- $max=$topic['workerMaxNum'] - $this->dynamicWorkerNum[$topic['name']];
- for ($i=0; $i < $max; ++$i) {
- //队列堆积达到一定数据,拉起一次性子进程,这类进程不会自动重启[没必要]
- $this->reserveQueue($this->dynamicWorkerNum[$topic['name']], $topic['name'], self::CHILD_PROCESS_CAN_NOT_RESTART);
- ++$this->dynamicWorkerNum[$topic['name']];
- $this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len . ' for: ' . $i . ' ' . $max, 'info', $this->logSaveFileWorker);
- }
- }
- }
- }
- $this->queue->close();
- });
- //积压队列提醒
- \Swoole\Timer::tick($this->messageTickTimer, function ($timerId) {
- !empty($this->message) && $this->logger->log('Warning Message: ' . implode('', $this->message), 'warning', $this->logSaveFileWorker);
- if ($this->message && isset($this->config['message'])) {
- //$message =Message::getMessage($this->config['message']);
- //$message->send(implode('', $this->message), $this->config['message']['token']);
- }
- //重置message,防止message不断变长
- $this->message=[];
- });
- }
- //退出主进程
- private function exitMaster()
- {
- @unlink($this->pidFile);
- @unlink($this->pidInfoFile);
- $this->logger->log('Time: ' . microtime(true) . '主进程' . $this->ppid . '退出', 'info', $this->logSaveFileWorker);
- sleep(1);
- exit();
- }
- /**
- * 注册信号
- */
- public function registSignal()
- {
- //终止进程信号
- \Swoole\Process::signal(SIGTERM, function ($signo) {
- $this->killWorkersAndExitMaster();
- });
- //
- \Swoole\Process::signal(SIGKILL, function ($signo) {
- $this->killWorkersAndExitMaster();
- });
- //平滑退出
- \Swoole\Process::signal(SIGUSR1, function ($signo) {
- $this->waitWorkers();
- });
- //记录进程状态
- \Swoole\Process::signal(SIGUSR2, function ($signo) {
- $this->logger->log('[master pid: ' . $this->ppid . '] has been received signal' . $signo);
- $result=$this->showStatus();
- $this->saveSwooleJobsStatus($result);
- //echo $result;
- });
- // 在一个进程终止或者停止时,将SIGCHLD信号发送给其父进程
- \Swoole\Process::signal(SIGCHLD, function ($signo) {
- while (true) {
- try {
- $ret = \Swoole\Process::wait(false);
- } catch (\Exception $e) {
- $this->logger->log('signoError: ' . $signo . $e->getMessage(), 'error', $this->logSaveFileWorker);
- }
- if ($ret) {
- $pid = $ret['pid'];
- $childProcess = $this->workers[$pid];
- $topic = $this->workersInfo[$pid]['topic'] ?: '';
- $this->status=$this->getMasterData('status');
- if(empty($this->dynamicWorkerNum) || empty($this->dynamicWorkerNum[$topic])){
- $topicCanNotRestartNum = null;
- }else{
- $topicCanNotRestartNum = $this->dynamicWorkerNum[$topic];
- }
- // $topicCanNotRestartNum = $this->dynamicWorkerNum[$topic] ?: 'null';
- $this->logger->log(self::CHILD_PROCESS_CAN_RESTART . '---' . $topic . '***' . $topicCanNotRestartNum . '***' . $this->status . '***' . $this->workersInfo[$pid]['type'] . '***' . $pid, 'info', $this->logSaveFileWorker);
- $this->logger->log($pid . ',' . $this->status . ',' . self::STATUS_RUNNING . ',' . $this->workersInfo[$pid]['type'] . ',' . self::CHILD_PROCESS_CAN_RESTART, 'info', $this->logSaveFileWorker);
- //主进程状态为running并且该子进程是可以重启的
- if (self::STATUS_RUNNING == $this->status && $this->workersInfo[$pid]['type'] == self::CHILD_PROCESS_CAN_RESTART) {
- try {
- //子进程重启可能失败,必须启动成功之后,再往下执行;最多尝试30次
- for ($i=0; $i < 30; ++$i) {
- var_dump("aaaa");
- $newPid = $childProcess->start();
- var_dump("xxxxxx".$newPid);
- if ($newPid > 0) {
- break;
- }
- sleep(1);
- }
- if (!$newPid) {
- $this->logger->log('静态子进程重启失败,问题有点严重,平滑退出子进程,主进程会跟着退出', 'error', $this->logSaveFileWorker);
- $this->waitWorkers();
- //$this->reserveQueue(0, $topic);
- continue;
- }
- $this->workers[$newPid] = $childProcess;
- $this->workersInfo[$newPid]['type'] = self::CHILD_PROCESS_CAN_RESTART;
- $this->workersInfo[$newPid]['topic'] = $topic;
- ++$this->workerNum;
- $this->logger->log("Worker Restart, kill_signal={$ret['signal']} PID=" . $newPid, 'info', $this->logSaveFileWorker);
- } catch (\Throwable $e) {
- $this->logger->log('restartErrorThrow' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- } catch (\Exception $e) {
- $this->logger->log('restartError: ' . $e->getMessage(), 'error', $this->logSaveFileWorker);
- }
- }
- //某个topic动态变化的子进程,退出之后个数减少一个
- if ($this->workersInfo[$pid]['type'] == self::CHILD_PROCESS_CAN_NOT_RESTART) {
- --$this->dynamicWorkerNum[$topic];
- }
- $this->logger->log("Worker Exit, kill_signal={$ret['signal']} PID=" . $pid, 'info', $this->logSaveFileWorker);
- unset($this->workers[$pid], $this->workersInfo[$pid]);
- --$this->workerNum;
- $this->logger->log('Worker count: ' . count($this->workers) . '==' . $this->workerNum, 'info', $this->logSaveFileWorker);
- //如果$this->workers为空,且主进程状态为wait,说明所有子进程安全退出,这个时候主进程退出
- if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
- $this->logger->log('主进程收到所有信号子进程的退出信号,子进程安全退出完成', 'info', $this->logSaveFileWorker);
- $this->exitMaster();
- }
- } else {
- break;
- }
- }
- });
- }
- private function saveSwooleJobsStatus($data)
- {
- file_put_contents($this->pidStatusFile, $data);
- }
- private function showStatus()
- {
- $statusStr ='-------------------------------------' . $this->processName . ' status--------------------------------------------' . PHP_EOL;
- $statusStr .= 'PHP version:' . PHP_VERSION . ' swoole-jobs version: ' . $this->version . PHP_EOL;
- $statusStr .= 'start time : ' . date('Y-m-d H:i:s', $this->beginTime) . ' run ' . floor((time() - $this->beginTime) / (24 * 60 * 60)) . ' days ' . floor(((time() - $this->beginTime) % (24 * 60 * 60)) / (60 * 60)) . ' hours ' . PHP_EOL;
- //$statusStr .= Utils::getSysLoadAvg() . ' memory use:' . Utils::getServerMemoryUsage() . PHP_EOL;
- $statusStr .= '|-- Master pid ' . $this->ppid . ' status: ' . $this->status . ' Worker num: ' . count($this->workers) . PHP_EOL;
- if ($this->workers) {
- foreach ($this->workers as $pid => $value) {
- $type =$this->workersInfo[$pid]['type'];
- $topic=$this->workersInfo[$pid]['topic'];
- $statusStr .= ' |---- Worker pid: ' . $pid . ' ' . $type . ' ' . $topic . PHP_EOL;
- }
- }
- return $statusStr;
- }
- //平滑等待子进程退出之后,再退出主进程
- private function waitWorkers()
- {
- $data['pid'] =$this->ppid;
- $data['status']=self::STATUS_WAIT;
- $this->saveMasterData($data);
- $this->status = self::STATUS_WAIT;
- $this->logger->log('master status: ' . $this->status, 'info', $this->logSaveFileWorker);
- }
- //强制杀死子进程并退出主进程
- private function killWorkersAndExitMaster()
- {
- //修改主进程状态为stop
- $this->status =self::STATUS_STOP;
- if ($this->workers) {
- foreach ($this->workers as $pid => $worker) {
- //强制杀workers子进程
- \Swoole\Process::kill($pid);
- unset($this->workers[$pid]);
- $this->logger->log('主进程收到退出信号,[' . $pid . ']子进程跟着退出', 'info', $this->logSaveFileWorker);
- $this->logger->log('Worker count: ' . count($this->workers), 'info', $this->logSaveFileWorker);
- }
- }
- $this->exitMaster();
- }
- /**
- * fork子进程消费队列.
- *
- * @param [type] $num 子进程编号
- * @param [type] $topic topic名称
- * @param string $type 是否会重启 canRestart|unRestart
- */
- public function reserveQueue($num, $topic, $type=self::CHILD_PROCESS_CAN_RESTART)
- {
- $reserveProcess = new \Swoole\Process(function ($worker) use ($num, $topic, $type) {
- $this->checkMpid($worker);
- $beginTime=microtime(true);
- try {
- //设置进程名字
- $this->setProcessName($type . ' ' . $topic . ' job ' . $num . ' ' . $this->processName);
- $jobs = new MJobs($this->pidInfoFile, $this->config);
- do {
- $jobs->run($topic);
- $this->status=$this->getMasterData('status');
- $where = (self::STATUS_RUNNING == $this->status) && (self::CHILD_PROCESS_CAN_RESTART == $type ? time() < ($beginTime + $this->excuteTime) : false);
- } while ($where);
- } catch (\Throwable $e) {
- catchError($this->logger, $e);
- } catch (\Exception $e) {
- catchError($this->logger, $e);
- }
- $endTime=microtime(true);
- $this->logger->log($topic . ' worker id: ' . $num . ' is done!!! Timing: ' . ($endTime - $beginTime), 'info', $this->logSaveFileWorker);
- unset($num, $topic, $type);
- });
- $pid = $reserveProcess->start();
- $this->workers[$pid] = $reserveProcess;
- $this->workersInfo[$pid]['type'] = $type;
- $this->workersInfo[$pid]['topic'] = $topic;
- $this->logger->log('topic: ' . $topic . ' ' . $type . ' worker id: ' . $num . ' pid: ' . $pid . ' is start...', 'info', $this->logSaveFileWorker);
- ++$this->workerNum;
- }
- /**
- * 主进程如果不存在了,子进程退出
- **/
- private function checkMpid(&$worker)
- {
- if (!@\Swoole\Process::kill($this->ppid, 0)) {
- $worker->exit();
- $this->logger->log("Master process exited, I [{$worker['pid']}] also quit", 'info', $this->logSaveFileWorker);
- }
- }
- }
|