MProcess.Class.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. <?php
  2. namespace Jobs\Model;
  3. use Mall\Framework\Factory;
  4. use Jobs\Model\MJobs;
  5. class MProcess
  6. {
  7. const CHILD_PROCESS_CAN_RESTART ='staticWorker'; //子进程可以重启,进程个数固定
  8. const CHILD_PROCESS_CAN_NOT_RESTART ='dynamicWorker'; //子进程不可以重启,进程个数根据队列堵塞情况动态分配
  9. const STATUS_RUNNING ='runnning'; //主进程running状态
  10. const STATUS_WAIT ='wait'; //主进程wait状态
  11. const STATUS_STOP ='stop'; //主进程stop状态
  12. const APP_NAME ='jobs'; //app name
  13. const STATUS_HSET_KEY_HASH ='status'; //status hash名
  14. public $processName = ':swooleProcessTopicQueueJob'; // 进程重命名, 方便 shell 脚本管理
  15. public $workers = [];
  16. private $version = '2.5';
  17. private $excuteTime =3600; //子进程最长执行时间,单位:秒
  18. private $queueMaxNum =10; //队列达到一定长度,启动动态子进程个数发和送消息提醒
  19. private $queueTickTimer =1000 * 10; //一定时间间隔(毫秒)检查队列长度;默认10秒钟
  20. private $messageTickTimer =1000 * 180; //一定时间间隔(毫秒)发送消息提醒;默认3分钟
  21. private $message =[]; //提醒消息内容
  22. private $workerNum =0; //固定分配的子进程个数
  23. private $dynamicWorkerNum =[]; //动态(不能重启)子进程计数,最大数为每个topic配置workerMaxNum,它的个数是动态变化的
  24. private $workersInfo =[];
  25. private $ppid;
  26. private $config = [];
  27. private $pidFile = 'master.pid'; //pid存放文件
  28. private $pidInfoFile = 'master.info'; //pid 序列化信息
  29. private $pidStatusFile = 'status.info'; //pid status信息
  30. private $status = '';
  31. private $logger = null;
  32. private $queue = null;
  33. private $topics = null;
  34. private $beginTime = '';
  35. private $logSaveFileWorker = 'workers.log';
  36. public function __construct($config)
  37. {
  38. $this->config = $config;
  39. $this->logger = Factory::logs($this->config['logPath'] ?:'', $this->config['logSaveFileApp'] ?:'', $this->config['app_name'] ?:'');
  40. $this->topics = $this->config['job']['topics'] ?: [];
  41. $this->processName = $this->config['processName'] ?: $this->processName;
  42. $this->excuteTime = $this->config['excuteTime'] ?: $this->excuteTime;
  43. $this->queueMaxNum = $this->config['queueMaxNum'] ?: $this->queueMaxNum;
  44. $this->queueTickTimer = $this->config['queueTickTimer'] ?: $this->queueTickTimer;
  45. $this->messageTickTimer = $this->config['messageTickTimer'] ?: $this->messageTickTimer;
  46. $this->logSaveFileWorker = $this->config['logSaveFileWorker'] ?: $this->logSaveFileWorker;
  47. $this->beginTime=time();
  48. //该变量需要在多进程共享
  49. $this->status=self::STATUS_RUNNING;
  50. if (isset($this->config['pidPath']) && !empty($this->config['pidPath'])) {
  51. if (!is_dir($this->config['pidPath'])) {
  52. mkdir($this->config['pidPath'], '0777', true);
  53. }
  54. $this->pidFile =$this->config['pidPath'] . '/' . $this->pidFile;
  55. $this->pidInfoFile =$this->config['pidPath'] . '/' . $this->pidInfoFile;
  56. $this->pidStatusFile=$this->config['pidPath'] . '/' . $this->pidStatusFile;
  57. } else {
  58. die('config pidPath must be set!' . PHP_EOL);
  59. }
  60. /*
  61. * master.pid 文件记录 master 进程 pid, 方便之后进程管理
  62. * 请管理好此文件位置, 使用 systemd 管理进程时会用到此文件
  63. * 判断文件是否存在,并判断进程是否在运行
  64. */
  65. if (file_exists($this->pidFile)) {
  66. $pid=$this->getMasterData('pid');
  67. // $signo=0,可以检测进程是否存在,不会发送默认SIGTERM信号,终止进程
  68. if ($pid && @\Swoole\Process::kill($pid, 0)) {
  69. die('已有进程运行中,请先结束或重启' . PHP_EOL);
  70. }
  71. }
  72. // 使当前进程蜕变为一个守护进程。
  73. \Swoole\Process::daemon();
  74. $this->ppid = getmypid();
  75. $data['pid'] =$this->ppid;
  76. $data['status']=$this->status;
  77. $this->saveMasterData($data);
  78. $this->setProcessName(self::APP_NAME . ' master ' . $this->ppid . $this->processName);
  79. //报错解决:must be forked outside the coroutine
  80. swoole_async_set(['enable_coroutine' => false]);//关闭内置协程
  81. }
  82. /**
  83. * 设置进程名
  84. * @param mixed $name
  85. */
  86. private function setProcessName($name)
  87. {
  88. $updateName = 0;
  89. // 低版本Linux内核和Mac OSX不支持进程重命名
  90. if (function_exists('cli_set_process_title') && PHP_OS != 'Darwin'){
  91. if(cli_set_process_title($name)){ // 按照官网文档优先使用内置函数,php >= 5.5
  92. $updateName = 1;
  93. }
  94. }else if (function_exists('swoole_set_process_name') && PHP_OS != 'Darwin' && !$updateName) {
  95. swoole_set_process_name($name); // swoole >= 1.6.3
  96. }
  97. }
  98. /**
  99. * 保存守护进程数据
  100. * @param array $data
  101. */
  102. private function saveMasterData($data=[])
  103. {
  104. isset($data['pid']) && file_put_contents($this->pidFile, $data['pid']);
  105. file_put_contents($this->pidInfoFile, serialize($data));
  106. }
  107. /**
  108. * 读取守护进程数据
  109. * @param string $key
  110. * @return mixed|null
  111. */
  112. private function getMasterData($key='')
  113. {
  114. $data=unserialize(file_get_contents($this->pidInfoFile));
  115. if ($key) {
  116. return $data[$key] ?: null;
  117. }
  118. return $data;
  119. }
  120. /**
  121. * 按照配置开启每个topic进程
  122. */
  123. public function start()
  124. {
  125. $topics = $this->topics;
  126. $this->logger->log('topics: ' . json_encode($topics));
  127. if ($topics) {
  128. //遍历topic任务列表
  129. foreach ((array) $topics as $topic) {
  130. if (isset($topic['workerMinNum']) && isset($topic['name'])) {
  131. //每个topic开启最少个进程消费队列
  132. for ($i = 0; $i < $topic['workerMinNum']; ++$i) {
  133. $this->reserveQueue($i, $topic['name'], self::CHILD_PROCESS_CAN_RESTART);
  134. }
  135. }
  136. }
  137. }
  138. $this->registSignal();
  139. $this->registTimer();
  140. }
  141. //增加定时器,检查队列积压情况;
  142. public function registTimer()
  143. {
  144. \Swoole\Timer::tick($this->queueTickTimer, function ($timerId) {
  145. $topics = $this->topics;
  146. $this->status =$this->getMasterData('status');
  147. if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
  148. $this->exitMaster();
  149. }
  150. $this->queue = MQueue::getQueue($this->config['job']['queue'], $this->logger);
  151. if (empty($this->queue)) {
  152. $this->logger->log('queue connection is lost', 'info', $this->logSaveFileWorker);
  153. return;
  154. }
  155. $this->queue->setTopics($topics);
  156. if ($topics && self::STATUS_RUNNING == $this->status) {
  157. //遍历topic任务列表
  158. foreach ((array) $topics as $topic) {
  159. if (empty($topic['name'])) {
  160. continue;
  161. }
  162. $this->dynamicWorkerNum[$topic['name']]=$this->dynamicWorkerNum[$topic['name']] ?? 0;
  163. $topic['workerMaxNum'] =$topic['workerMaxNum'] ?? 0;
  164. try {
  165. $len=$this->queue->len($topic['name']);
  166. $this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len, 'info', $this->logSaveFileWorker);
  167. } catch (\Throwable $e) {
  168. $this->logger->log('queueError' . $e->getMessage(), 'error', $this->logSaveFileWorker);
  169. } catch (\Exception $e) {
  170. $this->logger->log('queueError: ' . $e->getMessage(), 'error', $this->logSaveFileWorker);
  171. }
  172. $this->status=$this->getMasterData('status');
  173. //消息提醒:消息体收集
  174. if ($len > $this->queueMaxNum && count($this->message) <= count($topics)) {
  175. $this->message[]= strtr('Time:{time} Pid:{pid} ProName:{pname} Topic:{topic} Message:{message}', [
  176. '{time}' => date('Y-m-d H:i:s'),
  177. '{pid}' => $this->ppid,
  178. '{pname}' => $this->processName,
  179. '{topic}' => $topic['name'],
  180. '{message}'=> '积压消息个数:' . $len . PHP_EOL,
  181. ]);
  182. }
  183. if ($topic['workerMaxNum'] > $topic['workerMinNum'] && self::STATUS_RUNNING == $this->status && $len > $this->queueMaxNum && $this->dynamicWorkerNum[$topic['name']] < $topic['workerMaxNum']) {
  184. $max=$topic['workerMaxNum'] - $this->dynamicWorkerNum[$topic['name']];
  185. for ($i=0; $i < $max; ++$i) {
  186. //队列堆积达到一定数据,拉起一次性子进程,这类进程不会自动重启[没必要]
  187. $this->reserveQueue($this->dynamicWorkerNum[$topic['name']], $topic['name'], self::CHILD_PROCESS_CAN_NOT_RESTART);
  188. ++$this->dynamicWorkerNum[$topic['name']];
  189. $this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len . ' for: ' . $i . ' ' . $max, 'info', $this->logSaveFileWorker);
  190. }
  191. }
  192. }
  193. }
  194. $this->queue->close();
  195. });
  196. //积压队列提醒
  197. \Swoole\Timer::tick($this->messageTickTimer, function ($timerId) {
  198. !empty($this->message) && $this->logger->log('Warning Message: ' . implode('', $this->message), 'warning', $this->logSaveFileWorker);
  199. if ($this->message && isset($this->config['message'])) {
  200. //$message =Message::getMessage($this->config['message']);
  201. //$message->send(implode('', $this->message), $this->config['message']['token']);
  202. }
  203. //重置message,防止message不断变长
  204. $this->message=[];
  205. });
  206. }
  207. //退出主进程
  208. private function exitMaster()
  209. {
  210. @unlink($this->pidFile);
  211. @unlink($this->pidInfoFile);
  212. $this->logger->log('Time: ' . microtime(true) . '主进程' . $this->ppid . '退出', 'info', $this->logSaveFileWorker);
  213. sleep(1);
  214. exit();
  215. }
  216. /**
  217. * 注册信号
  218. */
  219. public function registSignal()
  220. {
  221. //终止进程信号
  222. \Swoole\Process::signal(SIGTERM, function ($signo) {
  223. $this->killWorkersAndExitMaster();
  224. });
  225. //
  226. \Swoole\Process::signal(SIGKILL, function ($signo) {
  227. $this->killWorkersAndExitMaster();
  228. });
  229. //平滑退出
  230. \Swoole\Process::signal(SIGUSR1, function ($signo) {
  231. $this->waitWorkers();
  232. });
  233. //记录进程状态
  234. \Swoole\Process::signal(SIGUSR2, function ($signo) {
  235. $this->logger->log('[master pid: ' . $this->ppid . '] has been received signal' . $signo);
  236. $result=$this->showStatus();
  237. $this->saveSwooleJobsStatus($result);
  238. //echo $result;
  239. });
  240. // 在一个进程终止或者停止时,将SIGCHLD信号发送给其父进程
  241. \Swoole\Process::signal(SIGCHLD, function ($signo) {
  242. while (true) {
  243. try {
  244. $ret = \Swoole\Process::wait(false);
  245. } catch (\Exception $e) {
  246. $this->logger->log('signoError: ' . $signo . $e->getMessage(), 'error', $this->logSaveFileWorker);
  247. }
  248. if ($ret) {
  249. $pid = $ret['pid'];
  250. $childProcess = $this->workers[$pid];
  251. $topic = $this->workersInfo[$pid]['topic'] ?: '';
  252. $this->status=$this->getMasterData('status');
  253. if(empty($this->dynamicWorkerNum) || empty($this->dynamicWorkerNum[$topic])){
  254. $topicCanNotRestartNum = null;
  255. }else{
  256. $topicCanNotRestartNum = $this->dynamicWorkerNum[$topic];
  257. }
  258. // $topicCanNotRestartNum = $this->dynamicWorkerNum[$topic] ?: 'null';
  259. $this->logger->log(self::CHILD_PROCESS_CAN_RESTART . '---' . $topic . '***' . $topicCanNotRestartNum . '***' . $this->status . '***' . $this->workersInfo[$pid]['type'] . '***' . $pid, 'info', $this->logSaveFileWorker);
  260. $this->logger->log($pid . ',' . $this->status . ',' . self::STATUS_RUNNING . ',' . $this->workersInfo[$pid]['type'] . ',' . self::CHILD_PROCESS_CAN_RESTART, 'info', $this->logSaveFileWorker);
  261. //主进程状态为running并且该子进程是可以重启的
  262. if (self::STATUS_RUNNING == $this->status && $this->workersInfo[$pid]['type'] == self::CHILD_PROCESS_CAN_RESTART) {
  263. try {
  264. //子进程重启可能失败,必须启动成功之后,再往下执行;最多尝试30次
  265. for ($i=0; $i < 30; ++$i) {
  266. var_dump("aaaa");
  267. $newPid = $childProcess->start();
  268. var_dump("xxxxxx".$newPid);
  269. if ($newPid > 0) {
  270. break;
  271. }
  272. sleep(1);
  273. }
  274. if (!$newPid) {
  275. $this->logger->log('静态子进程重启失败,问题有点严重,平滑退出子进程,主进程会跟着退出', 'error', $this->logSaveFileWorker);
  276. $this->waitWorkers();
  277. //$this->reserveQueue(0, $topic);
  278. continue;
  279. }
  280. $this->workers[$newPid] = $childProcess;
  281. $this->workersInfo[$newPid]['type'] = self::CHILD_PROCESS_CAN_RESTART;
  282. $this->workersInfo[$newPid]['topic'] = $topic;
  283. ++$this->workerNum;
  284. $this->logger->log("Worker Restart, kill_signal={$ret['signal']} PID=" . $newPid, 'info', $this->logSaveFileWorker);
  285. } catch (\Throwable $e) {
  286. $this->logger->log('restartErrorThrow' . $e->getMessage(), 'error', $this->logSaveFileWorker);
  287. } catch (\Exception $e) {
  288. $this->logger->log('restartError: ' . $e->getMessage(), 'error', $this->logSaveFileWorker);
  289. }
  290. }
  291. //某个topic动态变化的子进程,退出之后个数减少一个
  292. if ($this->workersInfo[$pid]['type'] == self::CHILD_PROCESS_CAN_NOT_RESTART) {
  293. --$this->dynamicWorkerNum[$topic];
  294. }
  295. $this->logger->log("Worker Exit, kill_signal={$ret['signal']} PID=" . $pid, 'info', $this->logSaveFileWorker);
  296. unset($this->workers[$pid], $this->workersInfo[$pid]);
  297. --$this->workerNum;
  298. $this->logger->log('Worker count: ' . count($this->workers) . '==' . $this->workerNum, 'info', $this->logSaveFileWorker);
  299. //如果$this->workers为空,且主进程状态为wait,说明所有子进程安全退出,这个时候主进程退出
  300. if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
  301. $this->logger->log('主进程收到所有信号子进程的退出信号,子进程安全退出完成', 'info', $this->logSaveFileWorker);
  302. $this->exitMaster();
  303. }
  304. } else {
  305. break;
  306. }
  307. }
  308. });
  309. }
  310. private function saveSwooleJobsStatus($data)
  311. {
  312. file_put_contents($this->pidStatusFile, $data);
  313. }
  314. private function showStatus()
  315. {
  316. $statusStr ='-------------------------------------' . $this->processName . ' status--------------------------------------------' . PHP_EOL;
  317. $statusStr .= 'PHP version:' . PHP_VERSION . ' swoole-jobs version: ' . $this->version . PHP_EOL;
  318. $statusStr .= 'start time : ' . date('Y-m-d H:i:s', $this->beginTime) . ' run ' . floor((time() - $this->beginTime) / (24 * 60 * 60)) . ' days ' . floor(((time() - $this->beginTime) % (24 * 60 * 60)) / (60 * 60)) . ' hours ' . PHP_EOL;
  319. //$statusStr .= Utils::getSysLoadAvg() . ' memory use:' . Utils::getServerMemoryUsage() . PHP_EOL;
  320. $statusStr .= '|-- Master pid ' . $this->ppid . ' status: ' . $this->status . ' Worker num: ' . count($this->workers) . PHP_EOL;
  321. if ($this->workers) {
  322. foreach ($this->workers as $pid => $value) {
  323. $type =$this->workersInfo[$pid]['type'];
  324. $topic=$this->workersInfo[$pid]['topic'];
  325. $statusStr .= ' |---- Worker pid: ' . $pid . ' ' . $type . ' ' . $topic . PHP_EOL;
  326. }
  327. }
  328. return $statusStr;
  329. }
  330. //平滑等待子进程退出之后,再退出主进程
  331. private function waitWorkers()
  332. {
  333. $data['pid'] =$this->ppid;
  334. $data['status']=self::STATUS_WAIT;
  335. $this->saveMasterData($data);
  336. $this->status = self::STATUS_WAIT;
  337. $this->logger->log('master status: ' . $this->status, 'info', $this->logSaveFileWorker);
  338. }
  339. //强制杀死子进程并退出主进程
  340. private function killWorkersAndExitMaster()
  341. {
  342. //修改主进程状态为stop
  343. $this->status =self::STATUS_STOP;
  344. if ($this->workers) {
  345. foreach ($this->workers as $pid => $worker) {
  346. //强制杀workers子进程
  347. \Swoole\Process::kill($pid);
  348. unset($this->workers[$pid]);
  349. $this->logger->log('主进程收到退出信号,[' . $pid . ']子进程跟着退出', 'info', $this->logSaveFileWorker);
  350. $this->logger->log('Worker count: ' . count($this->workers), 'info', $this->logSaveFileWorker);
  351. }
  352. }
  353. $this->exitMaster();
  354. }
  355. /**
  356. * fork子进程消费队列.
  357. *
  358. * @param [type] $num 子进程编号
  359. * @param [type] $topic topic名称
  360. * @param string $type 是否会重启 canRestart|unRestart
  361. */
  362. public function reserveQueue($num, $topic, $type=self::CHILD_PROCESS_CAN_RESTART)
  363. {
  364. $reserveProcess = new \Swoole\Process(function ($worker) use ($num, $topic, $type) {
  365. $this->checkMpid($worker);
  366. $beginTime=microtime(true);
  367. try {
  368. //设置进程名字
  369. $this->setProcessName($type . ' ' . $topic . ' job ' . $num . ' ' . $this->processName);
  370. $jobs = new MJobs($this->pidInfoFile, $this->config);
  371. do {
  372. $jobs->run($topic);
  373. $this->status=$this->getMasterData('status');
  374. $where = (self::STATUS_RUNNING == $this->status) && (self::CHILD_PROCESS_CAN_RESTART == $type ? time() < ($beginTime + $this->excuteTime) : false);
  375. } while ($where);
  376. } catch (\Throwable $e) {
  377. catchError($this->logger, $e);
  378. } catch (\Exception $e) {
  379. catchError($this->logger, $e);
  380. }
  381. $endTime=microtime(true);
  382. $this->logger->log($topic . ' worker id: ' . $num . ' is done!!! Timing: ' . ($endTime - $beginTime), 'info', $this->logSaveFileWorker);
  383. unset($num, $topic, $type);
  384. });
  385. $pid = $reserveProcess->start();
  386. $this->workers[$pid] = $reserveProcess;
  387. $this->workersInfo[$pid]['type'] = $type;
  388. $this->workersInfo[$pid]['topic'] = $topic;
  389. $this->logger->log('topic: ' . $topic . ' ' . $type . ' worker id: ' . $num . ' pid: ' . $pid . ' is start...', 'info', $this->logSaveFileWorker);
  390. ++$this->workerNum;
  391. }
  392. /**
  393. * 主进程如果不存在了,子进程退出
  394. **/
  395. private function checkMpid(&$worker)
  396. {
  397. if (!@\Swoole\Process::kill($this->ppid, 0)) {
  398. $worker->exit();
  399. $this->logger->log("Master process exited, I [{$worker['pid']}] also quit", 'info', $this->logSaveFileWorker);
  400. }
  401. }
  402. }