WorkermanService.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | Admin 通知服务处理类
  4. // +----------------------------------------------------------------------
  5. namespace app\services\workerman;
  6. use Channel\Client;
  7. use Workerman\Connection\TcpConnection;
  8. use Workerman\Lib\Timer;
  9. use Workerman\Worker;
  /**
   * Admin notification service: the set of Workerman callbacks that keep track
   * of connected admin clients, dispatch their messages to WorkermanHandle and
   * forward events published on the "muyinjie" channel to them.
   *
   * The on* methods are meant to be wired to the matching Worker callbacks by
   * the bootstrap code (not visible in this file).
   */
  class WorkermanService
  {
      /** Seconds between two heartbeat sweeps. */
      const HEARTBEAT_INTERVAL = 15;

      /** A connection silent for more than this many seconds is closed by the sweep. */
      const HEARTBEAT_TIMEOUT = 12;

      /**
       * Worker this service was created for.
       *
       * @var Worker
       */
      protected $worker;

      /**
       * Open connections, keyed by connection id.
       *
       * @var TcpConnection[]
       */
      protected $connections = [];

      /**
       * Registered admin connections, keyed by admin id (see setUser()).
       *
       * @var TcpConnection[]
       */
      protected $user = [];

      /**
       * Object whose methods implement the individual message types.
       *
       * @var WorkermanHandle
       */
      protected $handle;

      /**
       * Shared response helper; bound to a connection via connection() before each use.
       *
       * @var Response
       */
      protected $response;

      /**
       * Value returned by Timer::add() for the heartbeat sweep.
       *
       * @var int|bool|null
       */
      protected $timer;

      /**
       * @param Worker $worker worker that owns the connections served here
       */
      public function __construct(Worker $worker)
      {
          $this->worker = $worker;
          $this->handle = new WorkermanHandle($this);
          $this->response = new Response();
      }

      /**
       * Register a connection as the push target for its admin.
       *
       * The key is $connection->adminInfo['id']; a later registration for the
       * same admin id replaces the earlier one. A connection without that field
       * is ignored instead of being stored under an empty key.
       *
       * @param TcpConnection $connection connection carrying an adminInfo array
       *                                  (presumably set by WorkermanHandle on login - confirm)
       */
      public function setUser(TcpConnection $connection)
      {
          if (!isset($connection->adminInfo['id'])) {
              return;
          }

          $this->user[$connection->adminInfo['id']] = $connection;
      }

      /**
       * Track a newly opened connection and start its heartbeat clock.
       *
       * @param TcpConnection $connection
       */
      public function onConnect(TcpConnection $connection)
      {
          $this->connections[$connection->id] = $connection;
          $connection->lastMessageTime = time();
      }

      /**
       * Handle one raw message from a client.
       *
       * The payload is decoded as JSON. Anything that is not an array with a
       * truthy "type", or whose type is "ping", is answered with a "ping"
       * success response carrying the server time. Otherwise the type selects a
       * method on the handle, called as
       * method($connection, $message, $response); unknown types are ignored.
       *
       * @param TcpConnection $connection sender
       * @param string        $res        raw JSON payload
       *
       * @return mixed result of the ping response, or null after a dispatch / ignored message
       */
      public function onMessage(TcpConnection $connection, $res)
      {
          // Any traffic counts as a sign of life for the heartbeat sweep.
          $connection->lastMessageTime = time();

          $res = json_decode($res, true);
          $type = (is_array($res) && isset($res['type'])) ? $res['type'] : null;

          if (!$type || $type === 'ping') {
              return $this->response->connection($connection)->success('ping', ['now' => time()]);
          }

          // "type" is chosen by the client. Only dispatch to a real, externally
          // callable method: a non-string type would make method_exists() throw
          // on PHP 8, and magic/non-public methods must never be reachable.
          if (
              !is_string($type)
              || strpos($type, '__') === 0
              || !method_exists($this->handle, $type)
              || !is_callable([$this->handle, $type])
          ) {
              return;
          }

          // Guarantee a "data" key without overwriting one the client supplied.
          $this->handle->{$type}($connection, $res + ['data' => []], $this->response->connection($connection));
      }

      /**
       * Worker start-up: connect to the channel server, subscribe to pushed
       * events and start the heartbeat sweep.
       *
       * Channel events are arrays with "type" (required), optional "ids" (admin
       * ids to address; empty or missing means every registered admin) and
       * optional "data".
       *
       * @param Worker $worker worker whose connections are swept for timeouts
       */
      public function onWorkerStart(Worker $worker)
      {
          ChannelService::connet();

          Client::on('muyinjie', function ($eventData) {
              if (empty($eventData['type'])) {
                  return;
              }

              // A scalar "ids" is treated as a single id; count() on it would throw on PHP 8.
              $ids = empty($eventData['ids'])
                  ? array_keys($this->user)
                  : (array) $eventData['ids'];
              $data = isset($eventData['data']) ? $eventData['data'] : null;

              foreach ($ids as $id) {
                  // Only ints and strings are valid array keys; skip anything else.
                  if (!is_int($id) && !is_string($id)) {
                      continue;
                  }
                  if (isset($this->user[$id])) {
                      $this->response->connection($this->user[$id])->success($eventData['type'], $data);
                  }
              }
          });

          $this->timer = Timer::add(static::HEARTBEAT_INTERVAL, function () use ($worker) {
              $now = time();
              foreach ($worker->connections as $connection) {
                  // A connection that never got a timestamp starts its clock now
                  // instead of triggering an undefined-property warning.
                  if (!isset($connection->lastMessageTime)) {
                      $connection->lastMessageTime = $now;
                      continue;
                  }
                  if ($now - $connection->lastMessageTime > static::HEARTBEAT_TIMEOUT) {
                      $this->response->connection($connection)->close('timeout');
                  }
              }
          });
      }

      /**
       * Forget a closed connection.
       *
       * The admin entry is removed only if it still points at this very
       * connection, so closing a stale connection cannot evict a newer one
       * registered for the same admin. Both adminInfo['id'] (the key used by
       * setUser()) and adminId are checked.
       *
       * @param TcpConnection $connection connection that was closed
       */
      public function onClose(TcpConnection $connection)
      {
          unset($this->connections[$connection->id]);

          $adminIds = [];
          if (isset($connection->adminInfo['id'])) {
              $adminIds[] = $connection->adminInfo['id'];
          }
          if (isset($connection->adminId)) {
              $adminIds[] = $connection->adminId;
          }

          foreach ($adminIds as $adminId) {
              if (!is_int($adminId) && !is_string($adminId)) {
                  continue;
              }
              if (isset($this->user[$adminId]) && $this->user[$adminId] === $connection) {
                  unset($this->user[$adminId]);
              }
          }
      }
  }