MRedisTopicQueue.Class.php 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. <?php
  2. namespace Jobs\Model\Queue;
  3. use Jobs\Model\Queue\MBaseTopicQueue;
  /**
   * Topic queue backed by Redis lists.
   *
   * Each topic is a Redis list key: jobs are serialized and pushed on the head
   * with LPUSH, and read back from the head with LPOP.
   *
   * NOTE(review): $this->queue is not declared in this class; presumably it is
   * declared on MBaseTopicQueue - confirm.
   */
  class MRedisTopicQueue extends MBaseTopicQueue
  {
      /**
       * Logger handed to catchError() whenever a Redis call throws.
       *
       * @var mixed
       */
      private $logger = null;

      /**
       * RedisTopicQueue constructor.
       * Uses dependency injection: the caller supplies the Redis client.
       *
       * @param \Redis $redis  Redis client used for every queue operation.
       * @param mixed  $logger Logger forwarded to catchError() on failures.
       */
      public function __construct(\Redis $redis, $logger)
      {
          $this->queue = $redis;
          $this->logger = $logger;
      }

      /**
       * Build a queue from a connection config.
       *
       * Recognised keys: 'host' (required), 'port' (defaults to 6379),
       * 'password' (optional) and 'database' (optional). Empty password or
       * database values are skipped, so database 0 is never selected explicitly.
       *
       * @param array $config Connection settings as described above.
       * @param mixed $logger Logger forwarded to catchError() and to the new instance.
       *
       * @return self|false The connected queue, or false when connecting,
       *                    authenticating or selecting the database fails.
       */
      public static function getConnection(array $config, $logger)
      {
          try {
              if (empty($config['host'])) {
                  throw new \InvalidArgumentException('Redis config is missing "host"');
              }
              $port = isset($config['port']) ? $config['port'] : 6379;

              $redis = new \Redis();
              // The client may signal failure by returning false instead of
              // throwing, so every step is checked explicitly and turned into
              // an exception that the handlers below log.
              if ($redis->connect($config['host'], $port) === false) {
                  throw new \RuntimeException('Redis connect failed');
              }
              if (!empty($config['password']) && $redis->auth($config['password']) === false) {
                  throw new \RuntimeException('Redis auth failed');
              }
              if (!empty($config['database']) && $redis->select($config['database']) === false) {
                  throw new \RuntimeException('Redis select failed');
              }
          } catch (\Throwable $e) {
              catchError($logger, $e);
              return false;
          } catch (\Exception $e) {
              // Presumably kept for runtimes without \Throwable (PHP 5).
              catchError($logger, $e);
              return false;
          }

          return new self($redis, $logger);
      }

      /**
       * Push message to queue.
       *
       * @param string $topic Redis list key to push onto.
       * @param object $job   Job to enqueue; it is stored via serialize().
       *
       * @return string The job's uuid, or '' when the queue is not connected,
       *                the push fails, or the job has no (non-empty) uuid.
       */
      public function push($topic, $job)
      {
          if (!$this->isConnected()) {
              return '';
          }

          // Do not report a uuid for a job that was never enqueued.
          if ($this->queue->lPush($topic, serialize($job)) === false) {
              return '';
          }

          // !empty() keeps the original truthiness rule without raising a
          // notice when the job has no uuid property.
          return !empty($job->uuid) ? $job->uuid : '';
      }

      /**
       * Pop the next message from a topic.
       *
       * @param string $topic Redis list key to pop from.
       *
       * @return mixed The unserialized job, or null when the queue is not
       *               connected or the list yields nothing.
       */
      public function pop($topic)
      {
          if (!$this->isConnected()) {
              return null;
          }

          $payload = $this->queue->lPop($topic);
          if (empty($payload)) {
              return null;
          }

          // NOTE(review): unserialize() on data read from Redis can instantiate
          // arbitrary classes if anything untrusted can write to this list -
          // confirm only trusted producers push here.
          return unserialize($payload);
      }

      /**
       * Number of messages currently held for a topic.
       *
       * @param string $topic Redis list key to measure.
       *
       * @return int List length; 0 when the queue is not connected.
       */
      public function len($topic)
      {
          if (!$this->isConnected()) {
              return 0;
          }

          // The cast already maps a false result to 0.
          return (int) $this->queue->lLen($topic);
      }

      /**
       * Close the Redis connection if it is still alive.
       *
       * @return void
       */
      public function close()
      {
          if ($this->isConnected()) {
              $this->queue->close();
          }
      }

      /**
       * Check the connection by sending PING.
       *
       * Anything thrown by ping() is logged through catchError() and reported
       * as "not connected".
       *
       * @return bool True when ping() completes without throwing.
       */
      public function isConnected()
      {
          try {
              $this->queue->ping();
          } catch (\Throwable $e) {
              catchError($this->logger, $e);
              return false;
          } catch (\Exception $e) {
              // Presumably kept for runtimes without \Throwable (PHP 5).
              catchError($this->logger, $e);
              return false;
          }

          return true;
      }
  }