Client.php 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. <?php
  2. namespace Channel;
  3. use Workerman\Lib\Timer;
  4. use Workerman\Connection\AsyncTcpConnection;
  5. /**
  6. * Channel/Client
  7. * @version 1.0.5
  8. */
  9. class Client
  10. {
  11. /**
  12. * onMessage.
  13. * @var callback
  14. */
  15. public static $onMessage = null;
  16. /**
  17. * onConnect
  18. * @var callback
  19. */
  20. public static $onConnect = null;
  21. /**
  22. * onClose
  23. * @var callback
  24. */
  25. public static $onClose = null;
  26. /**
  27. * Connction to channel server.
  28. * @var TcpConnection
  29. */
  30. protected static $_remoteConnection = null;
  31. /**
  32. * Channel server ip.
  33. * @var string
  34. */
  35. protected static $_remoteIp = null;
  36. /**
  37. * Channel server port.
  38. * @var int
  39. */
  40. protected static $_remotePort = null;
  41. /**
  42. * Reconnect timer.
  43. * @var Timer
  44. */
  45. protected static $_reconnectTimer = null;
  46. /**
  47. * Ping timer.
  48. * @var Timer
  49. */
  50. protected static $_pingTimer = null;
  51. /**
  52. * All event callback.
  53. * @var array
  54. */
  55. protected static $_events = array();
  56. /**
  57. * @var bool
  58. */
  59. protected static $_isWorkermanEnv = true;
  60. /**
  61. * Ping interval.
  62. * @var int
  63. */
  64. public static $pingInterval = 25;
  65. /**
  66. * Connect to channel server
  67. * @param string $ip
  68. * @param int $port
  69. * @return void
  70. */
  71. public static function connect($ip = '127.0.0.1', $port = 2206)
  72. {
  73. if(!self::$_remoteConnection)
  74. {
  75. self::$_remoteIp = $ip;
  76. self::$_remotePort = $port;
  77. if (PHP_SAPI !== 'cli' || !class_exists('Workerman\Worker', false)) {
  78. self::$_isWorkermanEnv = false;
  79. }
  80. // For workerman environment.
  81. if (self::$_isWorkermanEnv) {
  82. self::$_remoteConnection = new AsyncTcpConnection('frame://' . self::$_remoteIp . ':' . self::$_remotePort);
  83. self::$_remoteConnection->onClose = 'Channel\Client::onRemoteClose';
  84. self::$_remoteConnection->onConnect = 'Channel\Client::onRemoteConnect';
  85. self::$_remoteConnection->onMessage = 'Channel\Client::onRemoteMessage';
  86. self::$_remoteConnection->connect();
  87. if (empty(self::$_pingTimer)) {
  88. self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
  89. }
  90. // Not workerman environment.
  91. } else {
  92. self::$_remoteConnection = stream_socket_client('tcp://'.self::$_remoteIp.':'.self::$_remotePort, $code, $message, 5);
  93. if (!self::$_remoteConnection) {
  94. throw new \Exception($message);
  95. }
  96. }
  97. }
  98. }
  99. /**
  100. * onRemoteMessage.
  101. * @param TcpConnection $connection
  102. * @param string $data
  103. * @throws \Exception
  104. */
  105. public static function onRemoteMessage($connection, $data)
  106. {
  107. $data = unserialize($data);
  108. $event = $data['channel'];
  109. $event_data = $data['data'];
  110. if(!empty(self::$_events[$event]))
  111. {
  112. call_user_func(self::$_events[$event], $event_data);
  113. }
  114. elseif(!empty(Client::$onMessage))
  115. {
  116. call_user_func(Client::$onMessage, $event, $event_data);
  117. }
  118. else
  119. {
  120. throw new \Exception("event:$event have not callback");
  121. }
  122. }
  123. /**
  124. * Ping.
  125. * @return void
  126. */
  127. public static function ping()
  128. {
  129. if(self::$_remoteConnection)
  130. {
  131. self::$_remoteConnection->send('');
  132. }
  133. }
  134. /**
  135. * onRemoteClose.
  136. * @return void
  137. */
  138. public static function onRemoteClose()
  139. {
  140. echo "Waring channel connection closed and try to reconnect\n";
  141. self::$_remoteConnection = null;
  142. self::clearTimer();
  143. self::$_reconnectTimer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
  144. if (self::$onClose) {
  145. call_user_func(Client::$onClose);
  146. }
  147. }
  148. /**
  149. * onRemoteConnect.
  150. * @return void
  151. */
  152. public static function onRemoteConnect()
  153. {
  154. $all_event_names = array_keys(self::$_events);
  155. if($all_event_names)
  156. {
  157. self::subscribe($all_event_names);
  158. }
  159. self::clearTimer();
  160. if (self::$onConnect) {
  161. call_user_func(Client::$onConnect);
  162. }
  163. }
  164. /**
  165. * clearTimer.
  166. * @return void
  167. */
  168. public static function clearTimer()
  169. {
  170. if (!self::$_isWorkermanEnv) {
  171. throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
  172. }
  173. if(self::$_reconnectTimer)
  174. {
  175. Timer::del(self::$_reconnectTimer);
  176. self::$_reconnectTimer = null;
  177. }
  178. }
  179. /**
  180. * On.
  181. * @param string $event
  182. * @param callback $callback
  183. * @throws \Exception
  184. */
  185. public static function on($event, $callback)
  186. {
  187. if (!self::$_isWorkermanEnv) {
  188. throw new \Exception('Channel\\Client not support on method when it is not in the workerman environment.');
  189. }
  190. if(!is_callable($callback))
  191. {
  192. throw new \Exception('callback is not callable');
  193. }
  194. self::$_events[$event] = $callback;
  195. self::subscribe(array($event));
  196. }
  197. /**
  198. * Subscribe.
  199. * @param string $events
  200. * @return void
  201. */
  202. public static function subscribe($events)
  203. {
  204. if (!self::$_isWorkermanEnv) {
  205. throw new \Exception('Channel\\Client not support subscribe method when it is not in the workerman environment.');
  206. }
  207. self::connect(self::$_remoteIp, self::$_remotePort);
  208. $events = (array)$events;
  209. foreach($events as $event)
  210. {
  211. if(!isset(self::$_events[$event]))
  212. {
  213. self::$_events[$event] = null;
  214. }
  215. }
  216. self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$events)));
  217. }
  218. /**
  219. * Unsubscribe.
  220. * @param string $events
  221. * @return void
  222. */
  223. public static function unsubscribe($events)
  224. {
  225. if (!self::$_isWorkermanEnv) {
  226. throw new \Exception('Channel\\Client not support unsubscribe method when it is not in the workerman environment.');
  227. }
  228. self::connect(self::$_remoteIp, self::$_remotePort);
  229. $events = (array)$events;
  230. foreach($events as $event)
  231. {
  232. unset(self::$_events[$event]);
  233. }
  234. self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>$events)));
  235. }
  236. /**
  237. * Publish.
  238. * @param string $events
  239. * @param mixed $data
  240. */
  241. public static function publish($events, $data)
  242. {
  243. self::connect(self::$_remoteIp, self::$_remotePort);
  244. if (self::$_isWorkermanEnv) {
  245. self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data)));
  246. } else {
  247. $body = serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data));
  248. $buffer = pack('N', 4+strlen($body)) . $body;
  249. fwrite(self::$_remoteConnection, $buffer);
  250. }
  251. }
  252. }