queue = $redis; $this->logger = $logger; } public static function getConnection(array $config, $logger) { try { $redis = new \Redis(); $redis->connect($config['host'], $config['port']); if (isset($config['password']) && !empty($config['password'])) { $redis->auth($config['password']); } if (isset($config['database']) && !empty($config['database'])) { $redis->select($config['database']); } } catch (\Throwable $e) { catchError($logger, $e); return false; } catch (\Exception $e) { catchError($logger, $e); return false; } $connection = new self($redis, $logger); return $connection; } /** * Push message to queue. * * @param [type] $topic * @param object $job * * @return string */ public function push($topic, $job) { if (!$this->isConnected()) { return ''; } $this->queue->lPush($topic, serialize($job)); return $job->uuid ?: ''; } public function pop($topic) { if (!$this->isConnected()) { return; } $result = $this->queue->lPop($topic); return !empty($result) ? unserialize($result) : null; } public function len($topic) { if (!$this->isConnected()) { return 0; } return (int) $this->queue->lLen($topic) ?: 0; } public function close() { if (!$this->isConnected()) { return; } $this->queue->close(); } public function isConnected() { try { $this->queue->ping(); } catch (\Throwable $e) { catchError($this->logger, $e); return false; } catch (\Exception $e) { catchError($this->logger, $e); return false; } return true; } }