123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- <?php // vim:set ts=4 sw=4 et:
- namespace Mall\Framework\SearchClient\Transport;
- /**
- * This file is part of the ElasticSearch PHP client
- *
- * (c) Raymond Julin <raymond.julin@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- if (!defined('CURLE_OPERATION_TIMEDOUT'))
- define('CURLE_OPERATION_TIMEDOUT', 28);
- class HTTP extends Base
- {
- /**
- * How long before timing out CURL call
- */
- private $timeout = 10;
- /**
- * curl handler which is needed for reusing existing http connection to the server
- * @var resource
- */
- protected $ch;
- public function __construct($host = 'localhost', $port = 9200, $timeout = null)
- {
- parent::__construct($host, $port);
- if (null !== $timeout) {
- $this->setTimeout($timeout);
- }
- $this->ch = curl_init();
- }
- /**
- * Index a new document or update it if existing
- *
- * @return array
- * @param array $document
- * @param mixed $id Optional
- * @param array $options
- */
- public function index($document, $id = false, array $options = array())
- {
- $url = $this->buildUrl(array($this->type, $id), $options);
- $method = ($id == false) ? "POST" : "PUT";
- return $this->call($url, $method, $document);
- }
- /**
- * Search
- *
- * @return array
- * @param array|string $query
- * @param array $options
- */
- public function search($query, array $options = array())
- {
- $result = false;
- if (is_array($query)) {
- /**
- * Array implies using the JSON query DSL
- */
- $arg = "_search";
- if (isset($options['routing'])) {
- $arg = "_search?routing=" . $options['routing'];
- }
- $url = $this->buildUrl(array(
- $this->type, $arg
- ));
- $result = $this->call($url, "GET", $query);
- } elseif (is_string($query)) {
- /**
- * String based search means http query string search
- */
- $url = $this->buildUrl(array(
- $this->type, "_search?q=" . $query
- ));
- $result = $this->call($url, "POST", $options);
- }
- return $result;
- }
- /**
- * ScrollSearch
- * @param $query
- * @param string $scroll
- * @param $size
- * @param array $options
- * @return array|bool
- * @throws HTTPException
- */
- public function scrollSearch($query, $scroll = '10s', $size = 2000, array $options = array())
- {
- $result = false;
- $arg = "_search?scroll=".$scroll.'&size='.$size;
- if (is_array($query)) {
- if (isset($options['routing'])) {
- $arg .= "&routing=" . $options['routing'];
- }
- $url = $this->buildUrl(array(
- $this->type, $arg
- ));
- $result = $this->call($url, "GET", $query);
- } elseif (is_string($query)) {
- /**
- * String based search means http query string search
- */
- $url = $this->buildUrl(array(
- $this->type, $arg."&q=" . $query
- ));
- $result = $this->call($url, "POST", $options);
- }
- return $result;
- }
- public function batchUpdateFieldVaule($data, $query)
- {
- $url = $this->buildUrl(array(
- $this->type, "_update_by_query"
- ));
- $field_str = "";
- $condition = "";
- foreach ($data as $field => $v) {
- if(!empty($field_str)) {
- $condition = ";";
- }
- $field_str .= $condition . "ctx._source.$field = params.$field";
- }
- $update = [
- 'script' => [
- 'lang' => "painless",
- 'inline' => $field_str,
- "params" => $data
- ]
- ];
- $result = $this->call($url, "POST", array_merge($query, $update));
- return $result;
- }
- /**
- * 增加站点索引Index
- *
- * @param $index
- * @param $mappings
- * @return array|bool
- * @throws HTTPException
- * @author feng
- */
- public function createBase($index, $mappings)
- {
- $result = false;
- if (is_string($index)) {
- $url = '/' . $index;
- $result = $this->call($url, 'PUT', $mappings);
- }
- return $result;
- }
- /**
- * 删除站点索引Index
- *
- * @param $index
- * @return array|bool
- * @throws HTTPException
- */
- public function deleteBase($index)
- {
- $result = false;
- if (is_string($index)) {
- $url = '/' . $index;
- $result = $this->call($url, 'DELETE');
- }
- return $result;
- }
- /**
- * Search
- *
- * @return array
- * @param mixed $query
- * @param array $options Parameters to pass to delete action
- */
- public function deleteByQuery($query, array $options = array())
- {
- $options += array(
- 'refresh' => true
- );
- if (is_array($query)) {
- /**
- * Array implies using the JSON query DSL
- */
- $url = $this->buildUrl(array($this->type, "_query"));
- $result = $this->call($url, "DELETE", $query);
- } elseif (is_string($query)) {
- /**
- * String based search means http query string search
- */
- $url = $this->buildUrl(array($this->type, "_query"), array('q' => $query));
- $result = $this->call($url, "DELETE");
- }
- if ($options['refresh']) {
- $this->request('_refresh', "POST");
- }
- return !isset($result['error']);
- }
- /**
- * Perform a request against the given path/method/payload combination
- * Example:
- * $es->request('/_status');
- *
- * @param string|array $path
- * @param string $method
- * @param array|bool $payload
- * @param bool $buildUrl
- * @return array
- */
- public function request($path, $method = "GET", $payload = false, $buildUrl = true)
- {
- $path = $buildUrl ? $this->buildUrl($path) : $path;
- return $this->call($path, $method, $payload);
- }
- /**
- * Flush this index/type combination
- *
- * @return array
- * @param mixed $id Id of document to delete
- * @param array $options Parameters to pass to delete action
- */
- public function delete($id = false, array $options = array())
- {
- if ($id)
- return $this->call($this->buildUrl(array($this->type, $id), $options), "DELETE");
- else
- return $this->request(false, "DELETE");
- }
- /**
- * Perform a http call against an url with an optional payload
- *
- * @return array
- * @param string $url
- * @param string $method (GET/POST/PUT/DELETE)
- * @param array|bool $payload The document/instructions to pass along
- * @throws HTTPException
- */
- protected function call($url, $method = "GET", $payload = null)
- {
- $conn = $this->ch;
- $protocol = "http";
- $requestURL = $protocol . "://" . $this->host . ':' . $this->port . $url;
- // 设置header需要发送的参数
- $header = ['Content-Type:application/json'];
- curl_setopt($conn, CURLOPT_HTTPHEADER , $header);
- curl_setopt($conn, CURLOPT_URL, $requestURL);
- curl_setopt($conn, CURLOPT_TIMEOUT, $this->timeout);
- curl_setopt($conn, CURLOPT_PORT, $this->port);
- curl_setopt($conn, CURLOPT_RETURNTRANSFER, 1);
- curl_setopt($conn, CURLOPT_CUSTOMREQUEST, strtoupper($method));
- curl_setopt($conn, CURLOPT_FORBID_REUSE, 0);
- if (is_array($payload) && count($payload) > 0)
- curl_setopt($conn, CURLOPT_POSTFIELDS, json_encode($payload));
- else
- curl_setopt($conn, CURLOPT_POSTFIELDS, $payload);
- $response = curl_exec($conn);
- if ($response !== false) {
- $data = json_decode($response, true);
- if (!$data) {
- $data = array('error' => $response, "code" => curl_getinfo($conn, CURLINFO_HTTP_CODE));
- }
- } else {
- /**
- * cUrl error code reference can be found here:
- * http://curl.haxx.se/libcurl/c/libcurl-errors.html
- */
- $errno = curl_errno($conn);
- switch ($errno) {
- case CURLE_UNSUPPORTED_PROTOCOL:
- $error = "Unsupported protocol [$protocol]";
- break;
- case CURLE_FAILED_INIT:
- $error = "Internal cUrl error?";
- break;
- case CURLE_URL_MALFORMAT:
- $error = "Malformed URL [$requestURL] -d " . json_encode($payload);
- break;
- case CURLE_COULDNT_RESOLVE_PROXY:
- $error = "Couldnt resolve proxy";
- break;
- case CURLE_COULDNT_RESOLVE_HOST:
- $error = "Couldnt resolve host";
- break;
- case CURLE_COULDNT_CONNECT:
- $error = "Couldnt connect to host [{$this->host}], ElasticSearch down?";
- break;
- case CURLE_OPERATION_TIMEDOUT:
- $error = "Operation timed out on [$requestURL]";
- break;
- default:
- $error = "Unknown error";
- if ($errno == 0)
- $error .= ". Non-cUrl error";
- break;
- }
- $exception = new HTTPException($error);
- $exception->payload = $payload;
- $exception->port = $this->port;
- $exception->protocol = $protocol;
- $exception->host = $this->host;
- $exception->method = $method;
- throw $exception;
- }
- return $data;
- }
- public function setTimeout($timeout)
- {
- $this->timeout = $timeout;
- }
- public function getTimeout()
- {
- return $this->timeout;
- }
- }
|