Server.php 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. <?php
  2. namespace Channel;
  3. use Workerman\Worker;
  /**
   * Channel server.
   *
   * A single-process publish/subscribe relay. Clients connect over the
   * "frame" protocol and send serialized arrays of the form
   * array('type' => ..., 'channels' => array(...), 'data' => ...), where type
   * is one of "subscribe", "unsubscribe" or "publish".
   *
   * Subscription state lives on the worker in $worker->channels, shaped as
   * array(channel => array(connection id => connection)); each connection
   * additionally remembers its own channels in $connection->channels so they
   * can be released when the connection closes.
   */
  class Server
  {
      /**
       * Worker instance.
       * @var Worker
       */
      protected $_worker = null;

      /**
       * Construct.
       *
       * Creates the listening worker and wires this object's handlers to it.
       *
       * @param string $ip   Address to listen on.
       * @param int    $port Port to listen on.
       */
      public function __construct($ip = '0.0.0.0', $port = 2206)
      {
          $worker = new Worker("frame://$ip:$port");
          // One process only: the subscription table is plain in-process state.
          $worker->count = 1;
          $worker->name = 'ChannelServer';
          $worker->channels = array();
          $worker->onMessage = array($this, 'onMessage');
          $worker->onClose = array($this, 'onClose');
          $this->_worker = $worker;
      }

      /**
       * onClose.
       *
       * Removes the closed connection from every channel it subscribed to and
       * drops channels that become empty.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onClose($connection)
      {
          if (empty($connection->channels)) {
              return;
          }
          foreach ($connection->channels as $channel) {
              $this->detach($channel, $connection->id);
          }
      }

      /**
       * onMessage.
       *
       * Decodes one request and dispatches it. Requests that are empty, do not
       * decode to an array, or lack a string "type" / array "channels" are
       * ignored, as are unknown types and non-scalar channel names.
       *
       * @param TcpConnection $connection
       * @param string $data
       * @return void
       */
      public function onMessage($connection, $data)
      {
          if (!$data) {
              return;
          }

          // NOTE(review): unserialize() on network input allows PHP object
          // injection. Only expose this port to trusted peers, or, on PHP >= 7.0,
          // pass array('allowed_classes' => false) as the second argument.
          $data = unserialize($data);

          // A malformed frame must never raise inside the callback: reject
          // anything that is not the expected request shape up front.
          if (!is_array($data)
              || !isset($data['type'], $data['channels'])
              || !is_string($data['type'])
              || !is_array($data['channels'])
          ) {
              return;
          }

          // Channel names are used as array keys, so arrays/objects are dropped.
          $channels = array_filter($data['channels'], 'is_scalar');

          switch ($data['type']) {
              case 'subscribe':
                  $this->subscribe($connection, $channels);
                  break;
              case 'unsubscribe':
                  $this->unsubscribe($connection, $channels);
                  break;
              case 'publish':
                  $this->publish($channels, isset($data['data']) ? $data['data'] : null);
                  break;
          }
      }

      /**
       * Registers the connection as a subscriber of each given channel.
       *
       * @param TcpConnection $connection
       * @param array $channels Channel names.
       * @return void
       */
      protected function subscribe($connection, array $channels)
      {
          foreach ($channels as $channel) {
              $connection->channels[$channel] = $channel;
              $this->_worker->channels[$channel][$connection->id] = $connection;
          }
      }

      /**
       * Removes the connection from each given channel.
       *
       * @param TcpConnection $connection
       * @param array $channels Channel names.
       * @return void
       */
      protected function unsubscribe($connection, array $channels)
      {
          foreach ($channels as $channel) {
              unset($connection->channels[$channel]);
              $this->detach($channel, $connection->id);
          }
      }

      /**
       * Sends the payload to every subscriber of each given channel.
       *
       * Each subscriber receives serialize(array('channel' => ..., 'data' => ...))
       * followed by a newline; channels without subscribers are skipped.
       *
       * @param array $channels Channel names.
       * @param mixed $payload  Value of the request's "data" entry (null if absent).
       * @return void
       */
      protected function publish(array $channels, $payload)
      {
          $worker = $this->_worker;
          foreach ($channels as $channel) {
              if (empty($worker->channels[$channel])) {
                  continue;
              }
              // Built once per channel and reused for all of its subscribers.
              $buffer = serialize(array('channel' => $channel, 'data' => $payload)) . "\n";
              foreach ($worker->channels[$channel] as $subscriber) {
                  $subscriber->send($buffer);
              }
          }
      }

      /**
       * Removes one connection id from a channel's subscriber list and deletes
       * the channel entry once no subscribers remain.
       *
       * @param mixed $channel      Channel name.
       * @param mixed $connectionId Value of the connection's id property.
       * @return void
       */
      protected function detach($channel, $connectionId)
      {
          $worker = $this->_worker;
          if (!isset($worker->channels[$channel][$connectionId])) {
              return;
          }
          unset($worker->channels[$channel][$connectionId]);
          if (empty($worker->channels[$channel])) {
              unset($worker->channels[$channel]);
          }
      }
  }