AdaSubscribe.php 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. <?php
  2. namespace AdaPay;
  3. use Workerman\Worker;
  4. use Workerman\Mqtt\Client;
  5. class AdaSubscribe extends AdaPay
  6. {
  7. public $worker;
  8. public $username;
  9. public $password;
  10. public $accessKey = "";
  11. public $instanceId = "";
  12. public $groupId = "";
  13. public $topic = "";
  14. public $clientId = "";
  15. public $client_address = "";
  16. public $token = "";
  17. public $callbackFunc = "";
  18. public $mq_token = NULL;
  19. public function __construct()
  20. {
  21. parent::__construct();
  22. $this->_init();
  23. $this->mq_token = new AdaMqttToken();
  24. }
  25. public function workerStart($workerMsg, $callback, $apiKey="", $client_id=""){
  26. $this->worker = new Worker();
  27. $this->_setting($apiKey, $client_id);
  28. $topic = $this->topic;
  29. $this->worker->onWorkerStart = function () use ($topic, $workerMsg, $callback){
  30. $options = array(
  31. 'keepalive'=> 5,
  32. 'username'=>$this->username,
  33. 'password'=>$this->_get_password(),
  34. 'client_id'=> $this->clientId,
  35. 'clean_session'=> false,
  36. 'debug'=>self::$isDebug
  37. );
  38. $client = new Client('mqtt://'.$this->client_address, $options);
  39. $client->onConnect = function($client) use ($topic) {
  40. $client->subscribe($topic, ['qos'=>1]);
  41. };
  42. $client->onError = function ($exception) use($options, $client){
  43. $this->worker->stopAll();
  44. print("execute before password:---------------------------");
  45. var_dump($options['password']);
  46. $options['password'] = $this->_get_password(); //重新获取token
  47. print("execute after password:---------------------------");
  48. var_dump($options['password']);
  49. $client->onConnectionClose(); //断开重新连接
  50. };
  51. $client->onMessage = function($topic, $content) use ($options, $workerMsg, $callback, $client) {
  52. if( $topic == '$SYS/tokenExpireNotice' ){
  53. print("execute before password:---------------------------");
  54. var_dump($options['password']);
  55. $options['password'] = $this->_get_password(); //重新获取token
  56. print("execute OnMessage password:---------------------------");
  57. var_dump($options['password']);
  58. $client->onConnectionClose(); //断开重新连接
  59. }else{
  60. call_user_func(array($workerMsg, 'mqttCallBack'), $content, $callback, $topic);
  61. }
  62. };
  63. $client->connect();
  64. };
  65. $this->worker->runAll();
  66. }
  67. public function mqttCallBack($content, $callback, $topic){
  68. $callback($content, $topic);
  69. }
  70. private function _init(){
  71. $this->accessKey = self::$mqttAccessKey;
  72. $this->instanceId = self::$mqttInstanceId;
  73. $this->groupId = self::$mqttGroupId;
  74. $this->client_address = self::$mqttAddress;
  75. }
  76. private function _get_password(){
  77. $token = $this->mq_token->getToken();
  78. return "R|".$token;
  79. }
  80. private function _setting($apiKey, $client_id){
  81. $_apiKey = empty($apiKey)? parent::$api_key: $apiKey;
  82. $client_id = empty($client_id)? $_apiKey: $_apiKey.$client_id;
  83. $this->username = 'Token|' . $this->accessKey . '|' . $this->instanceId;
  84. $this->clientId = $this->groupId . '@@@' . md5($client_id);
  85. $this->topic = "topic_crhs_sender/".$_apiKey;
  86. }
  87. }