123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- <?php
- namespace Jobs\Model\Queue;
- use Jobs\Model\Queue\MBaseTopicQueue;
class MRedisTopicQueue extends MBaseTopicQueue
{
    /**
     * Logger passed to catchError() whenever a Redis call throws.
     */
    private $logger = null;

    /**
     * MRedisTopicQueue constructor.
     * Uses dependency injection: the Redis client and the logger are supplied
     * by the caller instead of being created here.
     *
     * @param \Redis $redis  client used for every queue operation (stored in $this->queue)
     * @param mixed  $logger logger forwarded to catchError() on failures
     */
    public function __construct(\Redis $redis, $logger)
    {
        $this->queue = $redis;
        $this->logger = $logger;
    }

    /**
     * Build a queue instance from a connection config array.
     *
     * Reads $config['host'] and $config['port'], and optionally
     * $config['password'] and $config['database'] (each skipped when empty).
     *
     * @param array $config connection settings, see above
     * @param mixed $logger logger forwarded to catchError() and to the new instance
     *
     * @return self|false the connected queue, or false when connecting,
     *                    authenticating or selecting the database fails
     */
    public static function getConnection(array $config, $logger)
    {
        try {
            $redis = new \Redis();

            // connect()/auth()/select() report some failures by returning false
            // rather than throwing; turn those into exceptions so they take the
            // same logging + "return false" path as thrown errors.
            if ($redis->connect($config['host'], $config['port']) === false) {
                throw new \RuntimeException('Redis connect failed');
            }
            if (!empty($config['password']) && $redis->auth($config['password']) === false) {
                throw new \RuntimeException('Redis auth failed');
            }
            if (!empty($config['database']) && $redis->select($config['database']) === false) {
                throw new \RuntimeException('Redis select failed');
            }
        } catch (\Throwable $e) {
            catchError($logger, $e);

            return false;
        } catch (\Exception $e) {
            // NOTE(review): unreachable on PHP 7+ (\Throwable already matches);
            // presumably kept so exceptions are still caught on PHP 5 - confirm.
            catchError($logger, $e);

            return false;
        }

        return new self($redis, $logger);
    }

    /**
     * Push a job onto the head of the Redis list named by $topic.
     *
     * The job is stored in PHP serialize() format.
     *
     * @param string $topic Redis list key
     * @param object $job   job to enqueue; its public `uuid` property is returned
     *
     * @return string the job's uuid, or '' when the connection is down, the
     *                push fails, or the uuid is empty
     */
    public function push($topic, $job)
    {
        if (!$this->isConnected()) {
            return '';
        }

        // lPush() returns false when the command fails; do not report a uuid
        // for a job that was never enqueued.
        if ($this->queue->lPush($topic, serialize($job)) === false) {
            return '';
        }

        return $job->uuid ?: '';
    }

    /**
     * Pop one job from the head of the Redis list named by $topic.
     *
     * NOTE(review): push() uses lPush and this uses lPop, so jobs come back in
     * last-in-first-out order - confirm that is intended for a queue.
     *
     * @param string $topic Redis list key
     *
     * @return mixed the unserialized job, or null when the connection is down
     *               or the list yields nothing
     */
    public function pop($topic)
    {
        if (!$this->isConnected()) {
            return null;
        }

        $payload = $this->queue->lPop($topic);
        if (empty($payload)) {
            return null;
        }

        // NOTE(review): unserialize() can instantiate arbitrary classes; this is
        // only safe while nothing untrusted can write to the Redis list.
        return unserialize($payload);
    }

    /**
     * Number of entries currently in the Redis list named by $topic.
     *
     * @param string $topic Redis list key
     *
     * @return int list length; 0 when the connection is down or lLen() fails
     */
    public function len($topic)
    {
        if (!$this->isConnected()) {
            return 0;
        }

        // A false result from lLen() casts to 0.
        return (int) $this->queue->lLen($topic);
    }

    /**
     * Close the Redis connection if it is still alive.
     *
     * @return void
     */
    public function close()
    {
        if (!$this->isConnected()) {
            return;
        }

        $this->queue->close();
    }

    /**
     * Check the connection by sending PING.
     *
     * @return bool true when PING succeeds; false when it throws (the error is
     *              passed to catchError()) or returns false
     */
    public function isConnected()
    {
        try {
            // ping() can signal failure by returning false instead of throwing.
            if ($this->queue->ping() === false) {
                return false;
            }
        } catch (\Throwable $e) {
            catchError($this->logger, $e);

            return false;
        } catch (\Exception $e) {
            // NOTE(review): unreachable on PHP 7+; presumably a PHP 5 fallback - confirm.
            catchError($this->logger, $e);

            return false;
        }

        return true;
    }
}
|