<?php

namespace Jobs\Model\Queue;

use Jobs\Model\Queue\MBaseTopicQueue;

class MRedisTopicQueue extends MBaseTopicQueue
{
    private $logger =null;

    /**
     * RedisTopicQueue constructor.
     * 使用依赖注入的方式.
     *
     * @param \Redis $redis
     */
    public function __construct(\Redis $redis, $logger)
    {
        $this->queue   = $redis;
        $this->logger  = $logger;
    }

    public static function getConnection(array $config, $logger)
    {
        try {
            $redis = new \Redis();
            $redis->connect($config['host'], $config['port']);

            if (isset($config['password']) && !empty($config['password'])) {
                $redis->auth($config['password']);
            }
            if (isset($config['database']) && !empty($config['database'])) {
                $redis->select($config['database']);
            }

        } catch (\Throwable $e) {
            catchError($logger, $e);

            return false;
        } catch (\Exception $e) {
            catchError($logger, $e);

            return false;
        }
        $connection = new self($redis, $logger);

        return $connection;
    }

    /**
     * Push message to queue.
     *
     * @param [type]    $topic
     * @param object $job
     *
     * @return string
     */
    public function push($topic, $job)
    {
        if (!$this->isConnected()) {
            return '';
        }

        $this->queue->lPush($topic, serialize($job));

        return $job->uuid ?: '';
    }

    public function pop($topic)
    {
        if (!$this->isConnected()) {
            return;
        }

        $result = $this->queue->lPop($topic);

        return !empty($result) ? unserialize($result) : null;
    }

    public function len($topic)
    {
        if (!$this->isConnected()) {
            return 0;
        }

        return (int) $this->queue->lLen($topic) ?: 0;
    }

    public function close()
    {
        if (!$this->isConnected()) {
            return;
        }

        $this->queue->close();
    }

    public function isConnected()
    {
        try {
            $this->queue->ping();
        } catch (\Throwable $e) {
            catchError($this->logger, $e);

            return false;
        } catch (\Exception $e) {
            catchError($this->logger, $e);

            return false;
        }

        return true;
    }
}