Manager.php 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2020 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace app\webscoket;
  12. use think\Config;
  13. use qiniu\services\CacheService;
  14. use Swoole\Websocket\Frame;
  15. use think\Event;
  16. use think\Request;
  17. use think\response\Json;
  18. use think\swoole\Websocket;
  19. use think\swoole\websocket\Room;
  20. use app\webscoket\Room as NowRoom;
  21. use think\swoole\websocket\socketio\Handler;
  22. /**
  23. * Class Manager
  24. * @package app\webscoket
  25. */
  26. class Manager extends Handler
  27. {
  28. /**
  29. * @var
  30. */
  31. protected $manager;
  32. /**
  33. * @var int
  34. */
  35. protected $cache_timeout;
  36. /**
  37. * @var Response
  38. */
  39. protected $response;
  40. /**
  41. * @var \Redis
  42. */
  43. protected $cache;
  44. /**
  45. * @var NowRoom
  46. */
  47. protected $nowRoom;
  48. const USER_TYPE = ['user', 'system'];
  49. /**
  50. * Manager constructor.
  51. * @param Event $event
  52. * @param Config $config
  53. * @param Websocket $websocket
  54. * @param Response $response
  55. * @param \app\webscoket\Room $nowRoom
  56. */
  57. public function __construct(Event $event, Config $config, Websocket $websocket, Response $response, NowRoom $nowRoom)
  58. {
  59. parent::__construct($event, $config, $websocket);
  60. $this->response = $response;
  61. $this->nowRoom = $nowRoom;
  62. $this->cache = CacheService::redisHandler();
  63. $this->nowRoom->setCache($this->cache);
  64. $this->cache_timeout = intval(app()->config->get('swoole.websocket.ping_timeout', 60000) / 1000) + 2;
  65. }
  66. /**
  67. * @param Request $request
  68. * @return bool|void
  69. */
  70. public function onOpen(Request $request)
  71. {
  72. $fd = $this->websocket->getSender();
  73. $type = $request->get('type');
  74. $token = $request->get('token');
  75. if (!$token || !in_array($type, self::USER_TYPE)) {
  76. $this->websocket->close();
  77. return;
  78. }
  79. $types = self::USER_TYPE;
  80. $this->nowRoom->type(array_flip($types)[$type]);
  81. try {
  82. $data = $this->exec($type, 'login', [$fd, $request->get('form_type', null), ['token' => $token], $this->response])->getData();
  83. } catch (\Throwable $e) {
  84. $this->websocket->close();
  85. return;
  86. }
  87. if ($data['status'] != 200 || !($data['data']['uid'] ?? null)) {
  88. $this->websocket->close();
  89. return;
  90. }
  91. $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
  92. $uid = $data['data']['uid'];
  93. $type = array_search($type, self::USER_TYPE);
  94. $this->login($type, $uid, $fd);
  95. $this->nowRoom->add((string)$fd, $uid, 0);
  96. $this->send($fd, $this->response->message('ping', ['now' => time()]));
  97. return $this->send($fd, $this->response->success());
  98. }
  99. public function login($type, $uid, $fd)
  100. {
  101. $key = '_ws_' . $type;
  102. $this->cache->sadd($key, $fd);
  103. $this->cache->sadd($key . $uid, $fd);
  104. $this->refresh($type, $uid);
  105. }
  106. public function refresh($type, $uid)
  107. {
  108. $key = '_ws_' . $type;
  109. $this->cache->expire($key, 1800);
  110. $this->cache->expire($key . $uid, 1800);
  111. }
  112. public function logout($type, $uid, $fd)
  113. {
  114. $key = '_ws_' . $type;
  115. $this->cache->srem($key, $fd);
  116. $this->cache->srem($key . $uid, $fd);
  117. }
  118. /**
  119. * 获取当前用户所有的fd
  120. * @param $type
  121. * @param string $uid
  122. * @return array
  123. */
  124. public static function userFd($type, $uid = '')
  125. {
  126. $key = '_ws_' . $type . $uid;
  127. return CacheService::redisHandler()->smembers($key) ?: [];
  128. }
  129. /**
  130. * 执行事件调度
  131. * @param $type
  132. * @param $method
  133. * @param $result
  134. * @return null|Json
  135. */
  136. protected function exec($type, $method, $result)
  137. {
  138. if (!in_array($type, self::USER_TYPE)) {
  139. return null;
  140. }
  141. if (!is_array($result)) {
  142. return null;
  143. }
  144. /** @var Json $response */
  145. return $this->event->until('swoole.websocket.' . $type, [$method, $result, $this, $this->nowRoom]);
  146. }
  147. /**
  148. * @param Frame $frame
  149. * @return bool
  150. */
  151. public function onMessage(Frame $frame)
  152. {
  153. $fd = $this->websocket->getSender();
  154. $info = $this->nowRoom->get($fd);
  155. $result = json_decode($frame->data, true) ?: [];
  156. if (!isset($result['type']) || !$result['type']) return true;
  157. $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
  158. $this->refresh($info['type'], $info['uid']);
  159. if ($result['type'] == 'ping') {
  160. return $this->send($fd, $this->response->message('ping', ['now' => time()]));
  161. }
  162. $data = $result['data'] ?? [];
  163. $frame->uid = $info['uid'];
  164. /** @var Response $res */
  165. $res = $this->exec(self::USER_TYPE[$info['type']], $result['type'], [$fd, $result['form_type'] ?? null, $data, $this->response]);
  166. if ($res) return $this->send($fd, $res);
  167. return true;
  168. }
  169. /**
  170. * @param int $type
  171. * @param int $userId
  172. * @param int $toUserId
  173. * @param string $field
  174. */
  175. public function updateTableField(int $type, int $userId, int $toUserId, string $field = 'to_uid')
  176. {
  177. $fds = self::userFd($type, $userId);
  178. foreach ($fds as $fd) {
  179. $this->nowRoom->update($fd, $field, $toUserId);
  180. }
  181. }
  182. /**
  183. * 发送文本响应
  184. * @param $fd
  185. * @param Json $json
  186. * @return bool
  187. */
  188. public function send($fd, \think\response\Json $json)
  189. {
  190. return $this->pushing($fd, $json->getData());
  191. }
  192. /**
  193. * 发送
  194. * @param $fds
  195. * @param $data
  196. * @param null $exclude
  197. * @return bool
  198. */
  199. public function pushing($fds, $data, $exclude = null)
  200. {
  201. if ($data instanceof \think\response\Json) {
  202. $data = $data->getData();
  203. }
  204. $data = is_array($data) ? json_encode($data) : $data;
  205. $fds = is_array($fds) ? $fds : [$fds];
  206. foreach ($fds as $fd) {
  207. if (!$fd) {
  208. continue;
  209. }
  210. if ($exclude && is_array($exclude) && !in_array($fd, $exclude)) {
  211. continue;
  212. } elseif ($exclude && $exclude == $fd) {
  213. continue;
  214. }
  215. $this->websocket->to($fd)->push($data);
  216. }
  217. return true;
  218. }
  219. /**
  220. * 关闭连接
  221. */
  222. public function onClose()
  223. {
  224. $fd = $this->websocket->getSender();
  225. $tabfd = (string)$fd;
  226. if ($this->nowRoom->exist($fd)) {
  227. $data = $this->nowRoom->get($tabfd);
  228. $this->logout($data['type'], $data['uid'], $fd);
  229. $this->nowRoom->type($data['type'])->del($tabfd);
  230. $this->exec(self::USER_TYPE[$data['type']], 'close', [$fd, null, ['data' => $data], $this->response]);
  231. }
  232. parent::onClose();
  233. }
  234. public function checkGroupUser($uid, $group_id)
  235. {
  236. $on = [];
  237. $toUserFd = $this->userFd(0, $uid);
  238. foreach ($toUserFd as $value) {
  239. if ($frem = $this->nowRoom->get($value)) {
  240. //如果当收消息人在和当前发消息人对话中
  241. if (($frem['to_group'] ?? 0) == $group_id) {
  242. $on[] = $value;
  243. }
  244. }
  245. }
  246. return $on;
  247. }
  248. }