Manager.php 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2024 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace app\webscoket;
  12. use app\common\repositories\system\DataScreenRepository;
  13. use app\webscoket\handler\AdminHandler;
  14. use app\webscoket\handler\MerchantHandler;
  15. use app\webscoket\handler\ServiceHandler;
  16. use app\webscoket\handler\UserHandler;
  17. use Swoole\Server;
  18. use Swoole\Websocket\Frame;
  19. use think\Config;
  20. use think\Event;
  21. use think\facade\Cache;
  22. use think\Request;
  23. use think\response\Json;
  24. use think\swoole\Websocket;
  25. use think\swoole\websocket\Room;
  26. /**
  27. * Class Manager
  28. * @package app\webscoket
  29. * @author xaboy
  30. * @day 2020-04-29
  31. */
  32. class Manager extends Websocket
  33. {
  34. /**
  35. * @var \Swoole\WebSocket\Server
  36. */
  37. protected $server;
  38. /**
  39. * @var Ping
  40. */
  41. protected $pingService;
  42. /**
  43. * @var int
  44. */
  45. protected $cache_timeout;
  46. const USER_TYPE = ['admin', 'user', 'mer', 'ser'];
  47. /**
  48. * Manager constructor.
  49. * @param Server $server
  50. * @param Room $room
  51. * @param Event $event
  52. * @param Ping $ping
  53. * @param Config $config
  54. */
  55. public function __construct(Server $server, Room $room, Event $event, Ping $ping, Config $config)
  56. {
  57. parent::__construct($server, $room, $event);
  58. $this->pingService = $ping;
  59. $this->cache_timeout = (int)($config->get('swoole.websocket.ping_timeout', 60000) / 1000) + 2;
  60. app()->bind('websocket_handler_admin', AdminHandler::class);
  61. app()->bind('websocket_handler_user', UserHandler::class);
  62. app()->bind('websocket_handler_mer', MerchantHandler::class);
  63. app()->bind('websocket_handler_ser', ServiceHandler::class);
  64. }
  65. /**
  66. * @param int $fd
  67. * @param Request $request
  68. * @return mixed
  69. * @author xaboy
  70. * @day 2020-05-06
  71. */
  72. public function onOpen($fd, Request $request)
  73. {
  74. $type = $request->get('type');
  75. $token = $request->get('token');
  76. if (!$token || !in_array($type, self::USER_TYPE)) {
  77. return $this->server->close($fd);
  78. }
  79. try {
  80. $data = $this->exec($type, 'login', compact('fd', 'request', 'token'))->getData();
  81. } catch (\Exception $e) {
  82. return $this->server->close($fd);
  83. }
  84. if (!isset($data['status']) || $data['status'] != 200 || !($data['data']['uid'] ?? null)) {
  85. return $this->server->close($fd);
  86. }
  87. $type = array_search($type, self::USER_TYPE);
  88. $this->login($type, $fd, $data['data']);
  89. $this->pingService->createPing($fd, time(), $this->cache_timeout);
  90. return $this->send($fd, app('json')->message('ping', ['now' => time()]));
  91. }
  92. public function login($type, $fd, $data)
  93. {
  94. $key = '_ws_' . $type;
  95. Cache::set('_ws_f_' . $fd, [
  96. 'type' => $type,
  97. 'uid' => $data['uid'],
  98. 'fd' => $fd,
  99. 'payload' => $data['payload'] ?? null,
  100. 'mer_id' => $data['mer_id'] ?? null
  101. ], 3600);
  102. if (isset($data['mer_id'])) {
  103. $groupKey = $key . '_group' . $data['mer_id'];
  104. Cache::sadd($groupKey, $fd);
  105. Cache::expire($groupKey, 3600);
  106. } else {
  107. Cache::sadd($key, $fd);
  108. }
  109. $this->refresh($type, $fd, $data['uid']);
  110. }
  111. public function refresh($type, $fd, $uid)
  112. {
  113. $key = '_ws_' . $type;
  114. Cache::expire($key, 3600);
  115. Cache::expire($key . $uid, 3600);
  116. Cache::expire('_ws_f_' . $fd, 3600);
  117. }
  118. public function logout($type, $fd)
  119. {
  120. $data = Cache::get('_ws_f_' . $fd);
  121. $key = '_ws_' . $type;
  122. Cache::srem($key, $fd);
  123. if ($data) {
  124. Cache::delete('_ws_f_' . $fd);
  125. Cache::srem($key . $data['uid'], $fd);
  126. if (($data['mer_id'] ?? null) !== null) {
  127. $groupKey = $key . '_group' . $data['mer_id'];
  128. Cache::srem($groupKey, $fd);
  129. }
  130. }
  131. }
  132. public static function merFd($merId)
  133. {
  134. return Cache::smembers('_ws_2_group' . $merId) ?: [];
  135. }
  136. public static function userFd($type, $uid = '')
  137. {
  138. $key = '_ws_' . $type . $uid;
  139. return Cache::smembers($key) ?: [];
  140. }
  141. /**
  142. * @param $type
  143. * @param $method
  144. * @param $result
  145. * @return null|Json
  146. * @author xaboy
  147. * @day 2020-05-06
  148. */
  149. protected function exec($type, $method, $result)
  150. {
  151. $handler = app()->make('websocket_handler_' . $type);
  152. if (!method_exists($handler, $method)) return null;
  153. /** @var Json $response */
  154. return $handler->{$method}($result);
  155. }
  156. /**
  157. * @param Frame $frame
  158. * @return bool
  159. * @author xaboy
  160. * @day 2020-04-29
  161. */
  162. public function onMessage(Frame $frame)
  163. {
  164. $info = Cache::get('_ws_f_' . $frame->fd);
  165. $result = json_decode($frame->data, true) ?: [];
  166. if (!isset($result['type']) || !$result['type']) return true;
  167. if (!$info) return true;
  168. $this->refresh($info['type'], $frame->fd, $info['uid']);
  169. if (($info['mer_id'] ?? null) !== null) {
  170. $groupKey = '_ws_' . $info['type'] . '_group' . $info['mer_id'];
  171. Cache::expire($groupKey, 3600);
  172. }
  173. if ($result['type'] == 'ping') {
  174. return $this->send($frame->fd, app('json')->message('ping', ['now' => time() ]));
  175. }
  176. if ($result['type'] == 'data_status') {
  177. $k = env('APP_KEY','merchant').'_data_status';
  178. Cache::set($k, time(),60);
  179. $data = app()->make(DataScreenRepository::class)->data_screen();
  180. return $this->send($frame->fd, app('json')->message('data_screen', $data));
  181. }
  182. $data = $result['data'] ?? [];
  183. $frame->uid = $info['uid'];
  184. $frame->payload = $info['payload'];
  185. /** @var Json $response */
  186. $response = $this->exec(self::USER_TYPE[$info['type']], $result['type'], compact('data', 'frame', 'info'));
  187. if ($response) return $this->send($frame->fd, $response);
  188. return true;
  189. }
  190. protected function send($fd, Json $json)
  191. {
  192. $this->pingService->createPing($fd, time(), $this->cache_timeout);
  193. if ($this->server->isEstablished($fd) && $this->server->exist($fd)) {
  194. $this->server->push($fd, json_encode($json->getData()));
  195. }
  196. return true;
  197. }
  198. /**
  199. * @param int $fd
  200. * @param int $reactorId
  201. * @author xaboy
  202. * @day 2020-04-29
  203. */
  204. public function onClose($fd, $reactorId)
  205. {
  206. $data = Cache::get('_ws_f_' . $fd);
  207. if ($data) {
  208. $this->logout($data['type'], $fd);
  209. $this->exec(self::USER_TYPE[$data['type']], 'close', $data);
  210. }
  211. $this->pingService->removePing($fd);
  212. }
  213. }