Manager.php 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2020 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace app\webscoket;
  12. use think\Config;
  13. use crmeb\services\CacheService;
  14. use Swoole\Websocket\Frame;
  15. use think\Event;
  16. use think\response\Json;
  17. use think\swoole\Websocket;
  18. use think\swoole\websocket\Room;
  19. use app\webscoket\Room as NowRoom;
  20. use think\swoole\websocket\socketio\Handler;
  21. /**
  22. * Class Manager
  23. * @package app\webscoket
  24. */
  25. class Manager extends Handler
  26. {
  27. /**
  28. * @var
  29. */
  30. protected $manager;
  31. /**
  32. * @var int
  33. */
  34. protected $cache_timeout;
  35. /**
  36. * @var Response
  37. */
  38. protected $response;
  39. /**
  40. * @var \Redis
  41. */
  42. protected $cache;
  43. /**
  44. * @var NowRoom
  45. */
  46. protected $nowRoom;
  47. const USER_TYPE = ['admin', 'user', 'kefu', 'store', 'supplier', 'cashier'];
  48. const KEFU_TYPE_NUM = 2;
  49. const USER_TYPE_NUM = 1;
  50. /**
  51. * Manager constructor.
  52. * @param Websocket $websocket
  53. * @param Config $config
  54. * @param Room $room
  55. * @param Event $event
  56. * @param Response $response
  57. * @param \app\webscoket\Room $nowRoom
  58. */
  59. public function __construct(Event $event, Config $config, Websocket $websocket, Response $response, NowRoom $nowRoom)
  60. {
  61. parent::__construct($event, $config, $websocket);
  62. $this->response = $response;
  63. $this->nowRoom = $nowRoom;
  64. $this->cache = CacheService::redisHandler();
  65. $this->nowRoom->setCache($this->cache);
  66. $this->cache_timeout = intval(app()->config->get('swoole.websocket.ping_timeout', 60000) / 1000) + 2;
  67. }
  68. /**
  69. * @param \think\Request $request
  70. * @return bool|void
  71. */
  72. public function onOpen(\think\Request $request)
  73. {
  74. $fd = $this->websocket->getSender();
  75. $type = $request->get('type');
  76. $token = $request->get('token');
  77. $touristUid = $request->get('tourist_uid', '');
  78. $tourist = !!$touristUid;
  79. if (!$token || !in_array($type, self::USER_TYPE)) {
  80. return $this->websocket->close();
  81. }
  82. // 只有用户模式下才能使用游客模式
  83. if ($type !== self::USER_TYPE[1] && $tourist) {
  84. return $this->websocket->close();
  85. }
  86. $types = self::USER_TYPE;
  87. $this->nowRoom->type(array_flip($types)[$type]);
  88. try {
  89. $data = $this->exec($type, 'login', [$fd, $request->get('form_type', null), ['token' => $token, 'tourist' => $tourist], $this->response])->getData();
  90. } catch (\Throwable $e) {
  91. return $this->websocket->close();
  92. }
  93. if ($tourist) {
  94. $data['status'] = 200;
  95. $data['data']['uid'] = $touristUid;
  96. }
  97. if ($data['status'] != 200 || !($data['data']['uid'] ?? null)) {
  98. return $this->websocket->close();
  99. }
  100. $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
  101. $uid = $data['data']['uid'];
  102. $type = array_search($type, self::USER_TYPE);
  103. $this->login($type, $uid, $fd);
  104. $this->nowRoom->add((string)$fd, $uid, 0, $tourist ? 1 : 0);
  105. $this->send($fd, $this->response->message('ping', ['now' => time()]));
  106. return $this->send($fd, $this->response->success());
  107. }
  108. public function login($type, $uid, $fd)
  109. {
  110. $key = '_ws_' . $type;
  111. $this->cache->sadd($key, $fd);
  112. $this->cache->sadd($key . $uid, $fd);
  113. $this->refresh($type, $uid);
  114. }
  115. public function refresh($type, $uid)
  116. {
  117. $key = '_ws_' . $type;
  118. $this->cache->expire($key, 1800);
  119. $this->cache->expire($key . $uid, 1800);
  120. }
  121. public function logout($type, $uid, $fd)
  122. {
  123. $key = '_ws_' . $type;
  124. $this->cache->srem($key, $fd);
  125. $this->cache->srem($key . $uid, $fd);
  126. }
  127. /**
  128. * 获取当前用户所有的fd
  129. * @param $type
  130. * @param string $uid
  131. * @return array
  132. */
  133. public static function userFd($type, $uid = '')
  134. {
  135. $key = '_ws_' . $type . $uid;
  136. return CacheService::redisHandler()->smembers($key) ?: [];
  137. }
  138. /**
  139. * 执行事件调度
  140. * @param $type
  141. * @param $method
  142. * @param $result
  143. * @return null|Json
  144. */
  145. protected function exec($type, $method, $result)
  146. {
  147. if (!in_array($type, self::USER_TYPE)) {
  148. return null;
  149. }
  150. if (!is_array($result)) {
  151. return null;
  152. }
  153. /** @var Json $response */
  154. return $this->event->until('swoole.websocket.' . $type, [$method, $result, $this, $this->nowRoom]);
  155. }
  156. /**
  157. * @param Frame $frame
  158. * @return bool
  159. */
  160. public function onMessage(Frame $frame)
  161. {
  162. $fd = $this->websocket->getSender();
  163. $info = $this->nowRoom->get($fd);
  164. $result = json_decode($frame->data, true) ?: [];
  165. if (!isset($result['type']) || !$result['type']) return true;
  166. $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
  167. $this->refresh($info['type'], $info['uid']);
  168. if ($result['type'] == 'ping') {
  169. return $this->send($fd, $this->response->message('ping', ['now' => time()]));
  170. }
  171. $data = $result['data'] ?? [];
  172. $frame->uid = $info['uid'];
  173. /** @var Response $res */
  174. $res = $this->exec(self::USER_TYPE[$info['type']], $result['type'], [$fd, $result['form_type'] ?? null, $data, $this->response]);
  175. if ($res) return $this->send($fd, $res);
  176. return true;
  177. }
  178. /**
  179. * @param int $type
  180. * @param int $userId
  181. * @param int $toUserId
  182. * @param string $field
  183. */
  184. public function updateTabelField(int $type, int $userId, int $toUserId, string $field = 'to_uid')
  185. {
  186. $fds = self::userFd($type, $userId);
  187. foreach ($fds as $fd) {
  188. $this->nowRoom->update($fd, $field, $toUserId);
  189. }
  190. }
  191. /**
  192. * 发送文本响应
  193. * @param $fd
  194. * @param Json $json
  195. * @return bool
  196. */
  197. public function send($fd, \think\response\Json $json)
  198. {
  199. return $this->pushing($fd, $json->getData());
  200. }
  201. /**
  202. * 发送
  203. * @param $data
  204. * @return bool
  205. */
  206. public function pushing($fds, $data, $exclude = null)
  207. {
  208. if ($data instanceof \think\response\Json) {
  209. $data = $data->getData();
  210. }
  211. $data = is_array($data) ? json_encode($data) : $data;
  212. $fds = is_array($fds) ? $fds : [$fds];
  213. foreach ($fds as $fd) {
  214. if (!$fd) {
  215. continue;
  216. }
  217. if ($exclude && is_array($exclude) && !in_array($fd, $exclude)) {
  218. continue;
  219. } elseif ($exclude && $exclude == $fd) {
  220. continue;
  221. }
  222. $this->websocket->to($fd)->push($data);
  223. }
  224. return true;
  225. }
  226. /**
  227. * 关闭连接
  228. */
  229. public function onClose()
  230. {
  231. $fd = $this->websocket->getSender();
  232. $tabfd = (string)$fd;
  233. if ($this->nowRoom->exist($fd)) {
  234. $data = $this->nowRoom->get($tabfd);
  235. $this->logout($data['type'], $data['uid'], $fd);
  236. $this->nowRoom->type($data['type'])->del($tabfd);
  237. $this->exec(self::USER_TYPE[$data['type']], 'close', [$fd, null, ['data' => $data], $this->response]);
  238. }
  239. parent::onClose();
  240. }
  241. }