* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */

// Some PHP/cURL builds do not expose this constant; 28 is the libcurl
// error code for an operation timeout.
if (!defined('CURLE_OPERATION_TIMEDOUT')) {
    define('CURLE_OPERATION_TIMEDOUT', 28);
}

/**
 * HTTP transport: sends JSON requests to the configured host/port through a
 * single, reused cURL handle and returns the decoded JSON responses.
 */
class HTTP extends Base
{
    /**
     * Number of seconds before a cURL call times out (passed to CURLOPT_TIMEOUT).
     */
    private $timeout = 10;

    /**
     * cURL handle, kept on the instance so the existing HTTP connection to
     * the server can be reused between calls.
     *
     * @var resource|\CurlHandle resource on PHP < 8, CurlHandle object on PHP >= 8
     */
    protected $ch;

    /**
     * @param string   $host    Host name handed to the parent constructor
     * @param int      $port    Port handed to the parent constructor
     * @param int|null $timeout Timeout in seconds; null keeps the default of 10
     */
    public function __construct($host = 'localhost', $port = 9200, $timeout = null)
    {
        parent::__construct($host, $port);
        if (null !== $timeout) {
            $this->setTimeout($timeout);
        }
        $this->ch = curl_init();
    }

    /**
     * Index a new document or update it if existing.
     *
     * A falsy $id results in a POST, any other id results in a PUT.
     * NOTE(review): ids such as 0 or "0" are falsy and therefore take the
     * POST branch; how buildUrl() treats them is not visible here.
     *
     * @return array
     * @param array $document
     * @param mixed $id Optional
     * @param array $options Passed to buildUrl() together with the path
     */
    public function index($document, $id = false, array $options = array())
    {
        $url = $this->buildUrl(array($this->type, $id), $options);
        $method = $id ? "PUT" : "POST";

        return $this->call($url, $method, $document);
    }

    /**
     * Search.
     *
     * An array query is sent as the request body of a GET to `_search`
     * (optionally with `?routing=` taken from $options['routing']).
     * A string query is appended as `_search?q=<query>` and sent as a POST
     * with $options as the request body.
     *
     * NOTE(review): the string query and the routing value are concatenated
     * into the URL without encoding, so callers must pass URL-safe values.
     *
     * @return array|bool Decoded response, or false when $query is neither array nor string
     * @param array|string $query
     * @param array $options
     * @throws HTTPException
     */
    public function search($query, array $options = array())
    {
        if (is_array($query)) {
            // Array implies using the JSON query DSL
            $arg = "_search";
            if (isset($options['routing'])) {
                $arg = "_search?routing=" . $options['routing'];
            }
            $url = $this->buildUrl(array($this->type, $arg));

            return $this->call($url, "GET", $query);
        }

        if (is_string($query)) {
            // String based search means http query string search
            $url = $this->buildUrl(array($this->type, "_search?q=" . $query));

            return $this->call($url, "POST", $options);
        }

        return false;
    }

    /**
     * Scroll search: same dispatch as search(), with `scroll` and `size`
     * added to the `_search` query string.
     *
     * NOTE(review): $scroll, $size, the routing value and a string $query are
     * concatenated into the URL without encoding.
     *
     * @param array|string $query
     * @param string $scroll Value for the `scroll` URL parameter
     * @param int $size Value for the `size` URL parameter
     * @param array $options
     * @return array|bool Decoded response, or false when $query is neither array nor string
     * @throws HTTPException
     */
    public function scrollSearch($query, $scroll = '10s', $size = 2000, array $options = array())
    {
        $arg = "_search?scroll=" . $scroll . '&size=' . $size;

        if (is_array($query)) {
            if (isset($options['routing'])) {
                $arg .= "&routing=" . $options['routing'];
            }
            $url = $this->buildUrl(array($this->type, $arg));

            return $this->call($url, "GET", $query);
        }

        if (is_string($query)) {
            // String based search means http query string search
            $url = $this->buildUrl(array($this->type, $arg . "&q=" . $query));

            return $this->call($url, "POST", $options);
        }

        return false;
    }

    /**
     * Update fields on every document matched by $query via `_update_by_query`.
     *
     * Builds a painless script with one `ctx._source.<field> = params.<field>`
     * assignment per key of $data (joined with ";") and passes $data as the
     * script params. The script definition is merged over $query, so a
     * 'script' key in $query is overwritten.
     *
     * The method name is misspelled ("Vaule"); it is kept unchanged because
     * it is part of the public interface.
     *
     * @param array $data  Map of field name => new value
     * @param array $query Request body selecting the documents to update
     * @return array
     * @throws HTTPException
     */
    public function batchUpdateFieldVaule($data, $query)
    {
        $url = $this->buildUrl(array($this->type, "_update_by_query"));

        $assignments = array();
        foreach ($data as $field => $unused) {
            $assignments[] = "ctx._source.$field = params.$field";
        }

        $update = [
            'script' => [
                'lang' => "painless",
                'inline' => implode(";", $assignments),
                "params" => $data
            ]
        ];

        return $this->call($url, "POST", array_merge($query, $update));
    }

    /**
     * Create an index: sends a PUT to `/<index>` with $mappings as the body.
     *
     * @param string $index
     * @param array $mappings
     * @return array|bool Decoded response, or false when $index is not a string
     * @throws HTTPException
     * @author feng
     */
    public function createBase($index, $mappings)
    {
        if (!is_string($index)) {
            return false;
        }

        return $this->call('/' . $index, 'PUT', $mappings);
    }

    /**
     * Delete an index: sends a DELETE to `/<index>`.
     *
     * @param string $index
     * @return array|bool Decoded response, or false when $index is not a string
     * @throws HTTPException
     */
    public function deleteBase($index)
    {
        if (!is_string($index)) {
            return false;
        }

        return $this->call('/' . $index, 'DELETE');
    }

    /**
     * Delete documents matching a query.
     *
     * An array query is sent as the body of a DELETE to `_query`; a string
     * query is passed to buildUrl() as the `q` option. Unless
     * $options['refresh'] is falsy, a POST to `_refresh` follows.
     *
     * @return bool True when a request was sent and the response holds no
     *              'error' key; false otherwise, including when $query is
     *              neither array nor string (in which case nothing is sent)
     * @param mixed $query
     * @param array $options Parameters to pass to delete action ('refresh' defaults to true)
     * @throws HTTPException
     */
    public function deleteByQuery($query, array $options = array())
    {
        $options += array('refresh' => true);

        if (is_array($query)) {
            // Array implies using the JSON query DSL
            $url = $this->buildUrl(array($this->type, "_query"));
            $result = $this->call($url, "DELETE", $query);
        } elseif (is_string($query)) {
            // String based search means http query string search
            $url = $this->buildUrl(array($this->type, "_query"), array('q' => $query));
            $result = $this->call($url, "DELETE");
        } else {
            // Unsupported query type: no request was made, so this must not
            // be reported as a successful delete (and needs no refresh).
            return false;
        }

        if ($options['refresh']) {
            $this->request('_refresh', "POST");
        }

        return !isset($result['error']);
    }

    /**
     * Perform a request against the given path/method/payload combination
     * Example:
     * $es->request('/_status');
     *
     * @param string|array $path
     * @param string $method
     * @param array|bool $payload
     * @param bool $buildUrl When true, $path is first run through buildUrl();
     *                       when false it is used as the URL path verbatim
     * @return array
     * @throws HTTPException
     */
    public function request($path, $method = "GET", $payload = false, $buildUrl = true)
    {
        if ($buildUrl) {
            $path = $this->buildUrl($path);
        }

        return $this->call($path, $method, $payload);
    }

    /**
     * Delete a single document by id, or, when no id is given, send a DELETE
     * to the URL that buildUrl(false) produces.
     *
     * NOTE(review): every falsy $id (false, null, 0, "0", "") takes the
     * no-id branch, which presumably targets the whole index/type. Confirm
     * against buildUrl() before passing ids that may be 0.
     *
     * @return array
     * @param mixed $id Id of document to delete
     * @param array $options Parameters to pass to delete action
     * @throws HTTPException
     */
    public function delete($id = false, array $options = array())
    {
        if ($id) {
            return $this->call($this->buildUrl(array($this->type, $id), $options), "DELETE");
        }

        return $this->request(false, "DELETE");
    }

    /**
     * Perform a http call against an url with an optional payload
     *
     * A response that does not decode to an array (or to a truthy scalar) is
     * returned as array('error' => <raw response>, 'code' => <HTTP status>).
     *
     * @return array
     * @param string $url Path (including query string) appended to http://host:port
     * @param string $method (GET/POST/PUT/DELETE)
     * @param array|bool $payload The document/instructions to pass along
     * @throws HTTPException When cURL itself fails (connect error, timeout, ...)
     */
    protected function call($url, $method = "GET", $payload = null)
    {
        $conn = $this->ch;
        $protocol = "http";
        $requestURL = $protocol . "://" . $this->host . ':' . $this->port . $url;

        // Set the headers that need to be sent
        $header = ['Content-Type:application/json'];

        // Arrays are sent as JSON. An empty array must not reach cURL as an
        // array: PHP would turn it into a multipart/form-data post, which
        // contradicts the JSON Content-Type header set here.
        if (is_array($payload)) {
            $body = count($payload) > 0 ? json_encode($payload) : '';
        } else {
            $body = $payload;
        }

        curl_setopt($conn, CURLOPT_HTTPHEADER, $header);
        curl_setopt($conn, CURLOPT_URL, $requestURL);
        curl_setopt($conn, CURLOPT_TIMEOUT, $this->timeout);
        curl_setopt($conn, CURLOPT_PORT, $this->port);
        curl_setopt($conn, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($conn, CURLOPT_CUSTOMREQUEST, strtoupper($method));
        // Keep the connection open so the next call on this handle can reuse it
        curl_setopt($conn, CURLOPT_FORBID_REUSE, 0);
        curl_setopt($conn, CURLOPT_POSTFIELDS, $body);

        $response = curl_exec($conn);

        if ($response !== false) {
            $data = json_decode($response, true);
            // An empty array is a valid decoded body ("{}" / "[]") and is
            // returned as-is; null and other falsy values mean the body was
            // empty or not usable JSON, so hand back the raw text instead.
            if (!$data && !is_array($data)) {
                $data = array('error' => $response, "code" => curl_getinfo($conn, CURLINFO_HTTP_CODE));
            }

            return $data;
        }

        /**
         * cUrl error code reference can be found here:
         * http://curl.haxx.se/libcurl/c/libcurl-errors.html
         */
        $errno = curl_errno($conn);
        switch ($errno) {
            case CURLE_UNSUPPORTED_PROTOCOL:
                $error = "Unsupported protocol [$protocol]";
                break;
            case CURLE_FAILED_INIT:
                $error = "Internal cUrl error?";
                break;
            case CURLE_URL_MALFORMAT:
                $error = "Malformed URL [$requestURL] -d " . json_encode($payload);
                break;
            case CURLE_COULDNT_RESOLVE_PROXY:
                $error = "Couldnt resolve proxy";
                break;
            case CURLE_COULDNT_RESOLVE_HOST:
                $error = "Couldnt resolve host";
                break;
            case CURLE_COULDNT_CONNECT:
                $error = "Couldnt connect to host [{$this->host}], ElasticSearch down?";
                break;
            case CURLE_OPERATION_TIMEDOUT:
                $error = "Operation timed out on [$requestURL]";
                break;
            default:
                $error = "Unknown error";
                if ($errno == 0) {
                    $error .= ". Non-cUrl error";
                }
                break;
        }

        // Attach the request details so callers can inspect what failed
        $exception = new HTTPException($error);
        $exception->payload = $payload;
        $exception->port = $this->port;
        $exception->protocol = $protocol;
        $exception->host = $this->host;
        $exception->method = $method;
        throw $exception;
    }

    /**
     * Set the cURL timeout used by subsequent calls.
     *
     * @param int $timeout Timeout in seconds
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
    }

    /**
     * @return int Timeout in seconds currently used for cURL calls
     */
    public function getTimeout()
    {
        return $this->timeout;
    }
}