Bulk.Class.php 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php // vim:set ts=4 sw=4 et:
  2. namespace Mall\Framework\SearchClient;
  3. class Bulk {
  4. private $client;
  5. private $operations = array();
  6. /**
  7. * Construct a bulk operation
  8. *
  9. * @param \ElasticSearch\Client
  10. */
  11. public function __construct(Client $client) {
  12. $this->client = $client;
  13. }
  14. /**
  15. * commit this operation
  16. */
  17. public function commit() {
  18. return $this->client->request('/_bulk', 'POST', $this->createPayload());
  19. }
  20. /**
  21. * reset this operation
  22. */
  23. public function reset() {
  24. $this->operations = array();
  25. }
  26. /**
  27. * Index a new document or update it if existing
  28. *
  29. * @param array $document
  30. * @param mixed $id Optional
  31. * @param string $index Index
  32. * @param string $type Type
  33. * @param array $options Allow sending query parameters to control indexing further
  34. * _refresh_ *bool* If set to true, immediately refresh the shard after indexing
  35. * @return \Mall\Framework\SearchClient
  36. */
  37. public function index($document, $id=false, $index, $type, array $options = array()) {
  38. $params = array( '_id' => $id,
  39. '_index' => $index,
  40. '_type' => $type);
  41. foreach ($options as $key => $value) {
  42. $params['_' . $key] = $value;
  43. }
  44. $operation = array(
  45. array('index' => $params),
  46. $document
  47. );
  48. $this->operations[] = $operation;
  49. return $this;
  50. }
  51. /**
  52. * delete a document
  53. *
  54. * @param mixed $id
  55. * @param string $index Index
  56. * @param string $type Type
  57. * @param array $options Parameters to pass to delete action
  58. * @return \Mall\Framework\SearchClient
  59. */
  60. public function delete($id=false, $index, $type, array $options = array()) {
  61. $params = array( '_id' => $id,
  62. '_index' => $index,
  63. '_type' => $type);
  64. foreach ($options as $key => $value) {
  65. $params['_' . $key] = $value;
  66. }
  67. $operation = array(
  68. array('delete' => $params)
  69. );
  70. $this->operations[] = $operation;
  71. return $this;
  72. }
  73. /**
  74. * get all pending operations
  75. * @return array
  76. */
  77. public function getOperations() {
  78. return $this->operations;
  79. }
  80. /**
  81. * count all pending operations
  82. * @return int
  83. */
  84. public function count() {
  85. return count($this->operations);
  86. }
  87. /**
  88. * create a request payload with all pending operations
  89. * @return string
  90. */
  91. public function createPayload()
  92. {
  93. $payloads = array();
  94. foreach ($this->operations as $operation) {
  95. foreach ($operation as $partial) {
  96. $payloads[] = json_encode($partial);
  97. }
  98. }
  99. return join("\n", $payloads)."\n";
  100. }
  101. }