*
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

/**
 * Search client.
 *
 * Thin facade over a transport object: it remembers the current index/type,
 * forwards document operations (index, get, search, delete, ...) to the
 * transport, and can transparently route index/delete calls through a Bulk
 * object between begin() and commit().
 */
class Client
{
    const DEFAULT_PROTOCOL = 'http';
    const DEFAULT_SERVER = '127.0.0.1:9200';
    /** Port used when a server string carries no (or an empty) port part. */
    const DEFAULT_PORT = '9200';
    const DEFAULT_INDEX = null;
    const DEFAULT_TYPE = null;

    /**
     * Configuration this client was created with (see connection()).
     *
     * @var array
     */
    protected $_config = array();

    /**
     * Values merged underneath whatever is passed to connection().
     *
     * @var array
     */
    protected static $_defaults = array(
        'protocol' => self::DEFAULT_PROTOCOL,
        'servers' => self::DEFAULT_SERVER,
        'index' => self::DEFAULT_INDEX,
        'type' => self::DEFAULT_TYPE,
        'timeout' => null,
    );

    /**
     * Map of protocol name => transport class instantiated by connection().
     *
     * @var array
     */
    protected static $_protocols = array(
        'http' => 'Mall\\Framework\\SearchClient\\Transport\\HTTP'
    );

    /**
     * @var Transport\Base
     */
    private $transport;

    /**
     * Active bulk transaction, or null when none is running.
     *
     * @var Bulk
     */
    private $bulk;

    /**
     * @var string
     */
    private $index;

    /**
     * @var string
     */
    private $type;

    /**
     * Construct search client
     *
     * The index and type are applied through setIndex()/setType(), so they
     * are also pushed to the transport.
     *
     * @param \Mall\Framework\SearchClient\Transport\Base $transport
     * @param string|array $index
     * @param string|array $type
     */
    public function __construct($transport, $index = null, $type = null)
    {
        $this->transport = $transport;
        $this->setIndex($index);
        $this->setType($type);
    }

    /**
     * Get a client instance
     * Defaults to opening a http transport connection to 127.0.0.1:9200
     *
     * When no configuration is given, the ELASTICSEARCH_URL environment
     * variable is used as a DSN if it is set.
     *
     * @param string|array $config DSN string, or an array overriding only the
     *      configuration bits you desire
     *      - _protocol_ key into the known protocols (only `http` here)
     *      - _servers_ `host:port` string, or an array of them (only the
     *        first entry is used)
     *      - _index_
     *      - _type_
     *      - _timeout_ numeric, or null
     * @throws \Exception on unknown protocol, non-numeric timeout,
     *      unusable configuration or unparsable DSN
     * @return \Mall\Framework\SearchClient\Client
     */
    public static function connection($config = array())
    {
        if (!$config) {
            // Nothing supplied: fall back to the environment, then to pure defaults.
            // This also keeps `null` from reaching the array union below.
            $url = getenv('ELASTICSEARCH_URL');
            $config = $url ? $url : array();
        }
        if (is_string($config)) {
            $config = self::parseDsn($config);
        }
        if (!is_array($config)) {
            throw new \Exception("Search client configuration must be a DSN string or an array.");
        }
        $config += self::$_defaults;

        $protocol = $config['protocol'];
        if (!isset(self::$_protocols[$protocol])) {
            throw new \Exception("Tried to use unknown protocol: $protocol");
        }
        $class = self::$_protocols[$protocol];

        if (null !== $config['timeout'] && !is_numeric($config['timeout'])) {
            throw new \Exception("HTTP timeout should have a numeric value when specified.");
        }

        // Only the first configured server is used.
        $servers = $config['servers'];
        $server = is_array($servers) ? reset($servers) : $servers;
        if (!is_string($server) || $server === '') {
            throw new \Exception("No search server configured.");
        }

        // "host" and "host:" are both accepted; the port then falls back to the default
        // instead of being read from a missing array offset.
        $parts = explode(':', $server);
        $host = $parts[0];
        $port = (isset($parts[1]) && $parts[1] !== '') ? $parts[1] : self::DEFAULT_PORT;

        $transport = new $class($host, $port, $config['timeout']);
        $client = new self($transport, $config['index'], $config['type']);
        $client->config($config);
        return $client;
    }

    /**
     * Read or extend the stored configuration.
     *
     * Called with an empty value it returns the stored configuration.
     * Called with an array it merges it in (new keys win) and returns nothing.
     *
     * @param array|null $config
     * @return array|void
     */
    public function config($config = null)
    {
        if (!$config) {
            return $this->_config;
        }
        if (is_array($config)) {
            $this->_config = $config + $this->_config;
        }
    }

    /**
     * Change what index to go against
     *
     * An array is joined with commas after dropping empty entries.
     *
     * @return \Mall\Framework\SearchClient\Client
     * @param mixed $index
     */
    public function setIndex($index)
    {
        if (is_array($index)) {
            $index = implode(",", array_filter($index));
        }
        $this->index = $index;
        $this->transport->setIndex($index);
        return $this;
    }

    /**
     * Get current index
     *
     * @return string
     */
    public function getIndex()
    {
        return $this->index;
    }

    /**
     * Change what types to act against
     *
     * An array is joined with commas after dropping empty entries.
     *
     * @return \Mall\Framework\SearchClient\Client
     * @param mixed $type
     */
    public function setType($type)
    {
        if (is_array($type)) {
            $type = implode(",", array_filter($type));
        }
        $this->type = $type;
        $this->transport->setType($type);
        return $this;
    }

    /**
     * Get current type
     *
     * @return string
     */
    public function getType()
    {
        return $this->type;
    }

    /**
     * Fetch a document by its id
     *
     * @return array
     * @param mixed $id
     * @param bool $verbose When false only `_source` of the response is returned
     */
    public function get($id, $verbose = false)
    {
        return $this->request($id, "GET", false, $verbose);
    }

    /**
     * Puts a mapping on index
     *
     * NOTE(review): the unqualified `Exception` below resolves relative to
     * this file's namespace; presumably a namespace-local Exception class
     * exists - confirm.
     *
     * @param array|object $mapping Array of mapping data, or a Mapping object
     * @param array $config
     * @throws Exception when the mapping's type is not among the current types
     * @return array
     */
    public function map($mapping, array $config = array())
    {
        if (is_array($mapping)) {
            $mapping = new Mapping($mapping);
        }
        $mapping->config($config);

        try {
            $type = $mapping->config('type');
        } catch (\Exception $e) {
        } // No type is cool

        if (isset($type) && !$this->passesTypeConstraint($type)) {
            throw new Exception("Cant create mapping due to type constraint mismatch");
        }

        return $this->request('_mapping', 'PUT', $mapping->export(), true);
    }

    /**
     * Check that every type named in the constraint is one of the current types.
     *
     * @param string|array $constraint
     * @return bool
     */
    protected function passesTypeConstraint($constraint)
    {
        if (is_string($constraint)) {
            $constraint = array($constraint);
        }
        // Cast: the type may be null, which explode() no longer accepts silently.
        $currentType = explode(',', (string)$this->type);
        $includeTypes = array_intersect($constraint, $currentType);
        return ($constraint && count($includeTypes) === count($constraint));
    }

    /**
     * Update field values of a document
     *
     * Wraps the data under a `doc` key, JSON-encodes it and POSTs it to
     * `<id>/_update`.
     *
     * @param array $data Field content to update
     * @param int $id Id of the document to update
     * @return array
     */
    public function updateFieldVaule($data, $id)
    {
        $searchUpdateData = json_encode(array('doc' => $data));
        return $this->request($id . '/_update', 'POST', $searchUpdateData);
    }

    /**
     * Update field values of several documents at once
     *
     * Delegates to the transport and adds the elapsed wall-clock seconds
     * under the `time` key of the result.
     *
     * @param array $data Field content to update
     * @param mixed $query Passed through to the transport; presumably selects
     *      the documents to update - confirm against the transport
     * @return array
     */
    public function batchUpdateFieldVaule($data, $query)
    {
        $start = microtime(true);
        $result = $this->transport->batchUpdateFieldVaule($data, $query);
        $result['time'] = microtime(true) - $start;
        return $result;
    }

    /**
     * Perform a raw request
     *
     * Usage example
     *
     *     $response = $client->request('_status', 'GET');
     *
     * @return array
     * @param mixed $path Request path to use.
     *     `type` is prepended to this path inside request
     * @param string $method HTTP verb to use
     * @param mixed $payload Request body, passed to the transport as given
     * @param bool $verbose Controls response data, if `false`
     *     only `_source` of response is returned
     * @param bool $buildPath When false the path is handed to the transport
     *     untouched instead of being expanded
     */
    public function request($path, $method = 'GET', $payload = false, $verbose = false, $buildPath = true)
    {
        $path = $buildPath ? $this->expandPath($path) : $path;

        $response = $this->transport->request($path, $method, $payload, $buildPath);
        return ($verbose || !isset($response['_source']))
            ? $response
            : $response['_source'];
    }

    /**
     * Index a new document or update it if existing
     *
     * While a bulk transaction is running the operation goes to it instead
     * of the transport.
     *
     * @return array
     * @param array $document
     * @param mixed $id Optional
     * @param array $options Allow sending query parameters to control indexing further
     *      _refresh_ *bool* If set to true, immediately refresh the shard after indexing
     */
    public function index($document, $id = false, array $options = array())
    {
        if ($this->bulk) {
            return $this->bulk->index($document, $id, $this->index, $this->type, $options);
        }
        return $this->transport->index($document, $id, $options);
    }

    /**
     * Perform search, this is the sweet spot
     *
     * The elapsed wall-clock seconds are added under the `time` key.
     *
     * @return array
     * @param $query
     * @param array $options
     */
    public function search($query, array $options = array())
    {
        $start = microtime(true);
        $result = $this->transport->search($query, $options);
        $result['time'] = microtime(true) - $start;
        return $result;
    }

    /**
     * Perform scrollSearch
     *
     * The elapsed wall-clock seconds are added under the `time` key.
     *
     * @param $query
     * @param string $scroll
     * @param int $size
     * @param array $options
     * @return mixed
     */
    public function scrollSearch($query, $scroll = '10s', $size = 2000, array $options = array())
    {
        $start = microtime(true);
        $result = $this->transport->scrollSearch($query, $scroll, $size, $options);
        $result['time'] = microtime(true) - $start;
        return $result;
    }

    /**
     * Flush this index/type combination
     *
     * While a bulk transaction is running the operation goes to it instead
     * of the transport.
     *
     * @return array
     * @param mixed $id If id is supplied, delete that id for this index
     *                  if not wipe the entire index
     * @param array $options Parameters to pass to delete action
     */
    public function delete($id = false, array $options = array())
    {
        if ($this->bulk) {
            return $this->bulk->delete($id, $this->index, $this->type, $options);
        }
        return $this->transport->delete($id, $options);
    }

    /**
     * Delete everything that matches a query
     *
     * @return array
     * @param mixed $query Text or array based query to delete everything that matches
     * @param array $options Parameters to pass to delete action
     */
    public function deleteByQuery($query, array $options = array())
    {
        return $this->transport->deleteByQuery($query, $options);
    }

    /**
     * Perform refresh of current indexes
     *
     * @return array
     */
    public function refresh()
    {
        return $this->transport->request(array('_refresh'), 'GET');
    }

    /**
     * Expand a given path (array or string)
     * If this is not an absolute path the current type will be prepended
     * If it is an absolute path it will be used as is
     *
     * @param mixed $path
     * @return array
     */
    protected function expandPath($path)
    {
        $path = (array)$path;
        // Only a non-empty string can be absolute; ids may be integers and the
        // path may be empty, so never index into the first segment blindly.
        $first = reset($path);
        $isAbsolute = is_string($first) && $first !== '' && $first[0] === '/';

        return $isAbsolute
            ? $path
            : array_merge((array)$this->type, $path);
    }

    /**
     * Parse a DSN string into an associative array
     *
     * `protocol` and `servers` are always present; `index` and `type` are
     * present only when the DSN path names them (first and second non-empty
     * path segment).
     *
     * @param string $dsn e.g. `http://127.0.0.1:9200/index/type`
     * @throws \Exception when the DSN cannot be parsed or has no host
     * @return array
     */
    protected static function parseDsn($dsn)
    {
        $parts = parse_url($dsn);
        if (!is_array($parts) || empty($parts['host'])) {
            // The DSN is deliberately not echoed: it may carry credentials.
            throw new \Exception("Could not parse search server DSN.");
        }

        $config = array(
            'protocol' => isset($parts['scheme']) ? $parts['scheme'] : self::DEFAULT_PROTOCOL,
            'servers' => isset($parts['port'])
                ? $parts['host'] . ':' . $parts['port']
                : $parts['host'],
        );

        if (isset($parts['path'])) {
            $segments = array_values(array_filter(explode('/', $parts['path'])));
            if (isset($segments[0])) {
                $config['index'] = $segments[0];
            }
            if (isset($segments[1])) {
                $config['type'] = $segments[1];
            }
        }

        return $config;
    }

    /**
     * Create a bulk-transaction
     *
     * @return Bulk
     */
    public function createBulk()
    {
        return new Bulk($this);
    }

    /**
     * Create an index
     *
     * @param $index
     * @param $mappings
     * @return mixed
     */
    public function createBase($index, $mappings)
    {
        return $this->transport->createBase($index, $mappings);
    }

    /**
     * Delete an index
     *
     * @param $index
     * @return mixed
     */
    public function deleteBase($index)
    {
        return $this->transport->deleteBase($index);
    }

    /**
     * Begin a transparent bulk-transaction
     * if one is already running, return its handle
     *
     * @return Bulk
     */
    public function beginBulk()
    {
        if (!$this->bulk) {
            $this->bulk = $this->createBulk();
        }
        return $this->bulk;
    }

    /**
     * @see beginBulk
     */
    public function begin()
    {
        return $this->beginBulk();
    }

    /**
     * commit a bulk-transaction
     *
     * @return array|null Result of the commit, or null when no
     *      bulk-transaction is running
     */
    public function commitBulk()
    {
        if (!$this->bulk) {
            return null;
        }
        $result = $this->bulk->commit();
        $this->bulk = null;
        return $result;
    }

    /**
     * @see commitBulk
     */
    public function commit()
    {
        return $this->commitBulk();
    }
}