WorkermanService.php 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <?php
  2. namespace crmeb\services\workerman;
  3. use Channel\Client;
  4. use Channel\Server;
  5. use Workerman\Connection\TcpConnection;
  6. use Workerman\Lib\Timer;
  7. use Workerman\Worker;
  /**
   * Workerman connection service.
   *
   * Keeps track of open connections, maps identified users (keyed by
   * `adminInfo['id']`) to their connection, dispatches decoded JSON messages to
   * a WorkermanHandle method named after the message `type`, relays events
   * received on the 'crmeb' channel, and closes connections that stay silent
   * for too long.
   */
  class WorkermanService
  {
      /**
       * Seconds between two idle-connection sweeps.
       */
      const HEARTBEAT_CHECK_INTERVAL = 15;

      /**
       * A connection silent for more than this many seconds is closed by the sweep.
       */
      const HEARTBEAT_TIMEOUT = 12;

      /**
       * Worker this service is attached to.
       *
       * @var Worker
       */
      protected $worker;

      /**
       * Open connections, keyed by connection id.
       *
       * @var TcpConnection[]
       */
      protected $connections = [];

      /**
       * Connections of identified users, keyed by `adminInfo['id']`.
       *
       * @var TcpConnection[]
       */
      protected $user = [];

      /**
       * Object whose public methods handle incoming messages (one method per message type).
       *
       * @var WorkermanHandle
       */
      protected $handle;

      /**
       * Helper used to answer on / close a given connection.
       *
       * @var Response
       */
      protected $response;

      /**
       * Value returned by Timer::add() for the idle-connection sweep.
       *
       * @var int
       */
      protected $timer;

      /**
       * @param Worker $worker worker this service is attached to
       */
      public function __construct(Worker $worker)
      {
          $this->worker = $worker;
          $this->handle = new WorkermanHandle($this);
          $this->response = new Response();
      }

      /**
       * Registers a connection under the id found in its `adminInfo`.
       *
       * A previous connection registered under the same id is replaced.
       *
       * @param TcpConnection $connection connection carrying an `adminInfo` entry with an `id`
       */
      public function setUser(TcpConnection $connection)
      {
          $this->user[$connection->adminInfo['id']] = $connection;
      }

      /**
       * Records a newly opened connection and starts its idle clock.
       *
       * @param TcpConnection $connection
       */
      public function onConnect(TcpConnection $connection)
      {
          $this->connections[$connection->id] = $connection;
          $connection->lastMessageTime = time();
      }

      /**
       * Handles one raw message received from a connection.
       *
       * The payload must be a JSON object with a non-empty string `type`.
       * `ping` messages only refresh the idle clock. Any other type is
       * forwarded to the public handler method of the same name, if there is one;
       * everything else is silently ignored.
       *
       * @param TcpConnection $connection connection the message arrived on
       * @param string        $res        raw JSON payload
       */
      public function onMessage(TcpConnection $connection, $res)
      {
          // Every message, valid or not, counts as a sign of life.
          $connection->lastMessageTime = time();

          $res = json_decode($res, true);
          if (!is_array($res) || !isset($res['type']) || !is_string($res['type']) || $res['type'] === '') {
              return;
          }

          $type = $res['type'];
          if ($type === 'ping') {
              return;
          }

          // The type comes from the peer, so it must never reach the constructor,
          // another magic method, or a method that is not callable from here.
          if (strncmp($type, '__', 2) === 0) {
              return;
          }
          if (!method_exists($this->handle, $type) || !is_callable([$this->handle, $type])) {
              return;
          }

          // Handlers always receive a `data` key, defaulting to an empty array.
          $this->handle->{$type}($connection, $res + ['data' => []], $this->response->connection($connection));
      }

      /**
       * Wires the worker up once it has started.
       *
       * Connects to the channel service, subscribes to the 'crmeb' event and
       * starts the periodic idle-connection sweep.
       *
       * @param Worker $worker worker whose connections are swept
       */
      public function onWorkerStart(Worker $worker)
      {
          ChannelService::connet();

          // Relay channel events: to the listed ids when `ids` is non-empty,
          // otherwise to every registered user.
          Client::on('crmeb', function ($eventData) {
              if (!isset($eventData['type']) || !$eventData['type']) {
                  return;
              }

              $ids = isset($eventData['ids']) && count($eventData['ids'])
                  ? $eventData['ids']
                  : array_keys($this->user);

              foreach ($ids as $id) {
                  if (isset($this->user[$id])) {
                      $this->response
                          ->connection($this->user[$id])
                          ->success($eventData['type'], $eventData['data'] ?? null);
                  }
              }
          });

          // Periodically close connections that have been silent for too long.
          $this->timer = Timer::add(self::HEARTBEAT_CHECK_INTERVAL, function () use ($worker) {
              $now = time();
              foreach ($worker->connections as $connection) {
                  if ($now - $connection->lastMessageTime > self::HEARTBEAT_TIMEOUT) {
                      $this->response->connection($connection)->close('timeout');
                  }
              }
          });
      }

      /**
       * Forgets a closed connection.
       *
       * @param TcpConnection $connection connection that was closed
       */
      public function onClose(TcpConnection $connection)
      {
          unset($this->connections[$connection->id]);

          // Also drop the user mapping, otherwise the closed connection stays
          // referenced and keeps being targeted by channel events. Only do so when
          // the mapping still points at this very connection: the same id may
          // already have been re-registered by a newer connection.
          if (isset($connection->adminInfo['id'])) {
              $userId = $connection->adminInfo['id'];
              if (isset($this->user[$userId]) && $this->user[$userId] === $connection) {
                  unset($this->user[$userId]);
              }
          }
      }
  }