Mongo.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkPHP [ WE CAN DO IT JUST THINK ]
  4. // +----------------------------------------------------------------------
  5. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  6. // +----------------------------------------------------------------------
  7. // | Author: liu21st <liu21st@gmail.com>
  8. // +----------------------------------------------------------------------
  9. declare (strict_types = 1);
  10. namespace think\db;
  11. use MongoDB\Driver\BulkWrite;
  12. use MongoDB\Driver\Command;
  13. use MongoDB\Driver\Cursor;
  14. use MongoDB\Driver\Exception\AuthenticationException;
  15. use MongoDB\Driver\Exception\BulkWriteException;
  16. use MongoDB\Driver\Exception\ConnectionException;
  17. use MongoDB\Driver\Exception\InvalidArgumentException;
  18. use MongoDB\Driver\Exception\RuntimeException;
  19. use MongoDB\Driver\Query as MongoQuery;
  20. use MongoDB\Driver\ReadPreference;
  21. use MongoDB\Driver\WriteConcern;
  22. use think\Collection;
  23. use think\db\connector\Mongo as Connection;
  24. use think\db\exception\DbException as Exception;
  25. use think\Paginator;
  26. class Mongo extends BaseQuery
  27. {
  28. /**
  29. * 执行查询 返回数据集
  30. * @access public
  31. * @param MongoQuery $query 查询对象
  32. * @return mixed
  33. * @throws AuthenticationException
  34. * @throws InvalidArgumentException
  35. * @throws ConnectionException
  36. * @throws RuntimeException
  37. */
  38. public function query(MongoQuery $query)
  39. {
  40. return $this->connection->query($this, $query);
  41. }
  42. /**
  43. * 执行指令 返回数据集
  44. * @access public
  45. * @param Command $command 指令
  46. * @param string $dbName
  47. * @param ReadPreference $readPreference readPreference
  48. * @param string|array $typeMap 指定返回的typeMap
  49. * @return mixed
  50. * @throws AuthenticationException
  51. * @throws InvalidArgumentException
  52. * @throws ConnectionException
  53. * @throws RuntimeException
  54. */
  55. public function command(Command $command, string $dbName = '', ReadPreference $readPreference = null, $typeMap = null)
  56. {
  57. return $this->connection->command($command, $dbName, $readPreference, $typeMap);
  58. }
  59. /**
  60. * 执行语句
  61. * @access public
  62. * @param BulkWrite $bulk
  63. * @return int
  64. * @throws AuthenticationException
  65. * @throws InvalidArgumentException
  66. * @throws ConnectionException
  67. * @throws RuntimeException
  68. * @throws BulkWriteException
  69. */
  70. public function execute(BulkWrite $bulk)
  71. {
  72. return $this->connection->execute($this, $bulk);
  73. }
  74. /**
  75. * 执行command
  76. * @access public
  77. * @param string|array|object $command 指令
  78. * @param mixed $extra 额外参数
  79. * @param string $db 数据库名
  80. * @return array
  81. */
  82. public function cmd($command, $extra = null, string $db = ''): array
  83. {
  84. $this->parseOptions();
  85. return $this->connection->cmd($this, $command, $extra, $db);
  86. }
  87. /**
  88. * 指定distinct查询
  89. * @access public
  90. * @param string $field 字段名
  91. * @return array
  92. */
  93. public function getDistinct(string $field)
  94. {
  95. $result = $this->cmd('distinct', $field);
  96. return $result[0]['values'];
  97. }
  98. /**
  99. * 获取数据库的所有collection
  100. * @access public
  101. * @param string $db 数据库名称 留空为当前数据库
  102. * @throws Exception
  103. */
  104. public function listCollections(string $db = '')
  105. {
  106. $cursor = $this->cmd('listCollections', null, $db);
  107. $result = [];
  108. foreach ($cursor as $collection) {
  109. $result[] = $collection['name'];
  110. }
  111. return $result;
  112. }
  113. /**
  114. * COUNT查询
  115. * @access public
  116. * @param string $field 字段名
  117. * @return integer
  118. */
  119. public function count(string $field = null): int
  120. {
  121. $result = $this->cmd('count');
  122. return $result[0]['n'];
  123. }
  124. /**
  125. * 聚合查询
  126. * @access public
  127. * @param string $aggregate 聚合指令
  128. * @param string $field 字段名
  129. * @param bool $force 强制转为数字类型
  130. * @return mixed
  131. */
  132. public function aggregate(string $aggregate, $field, bool $force = false)
  133. {
  134. $result = $this->cmd('aggregate', [strtolower($aggregate), $field]);
  135. $value = $result[0]['aggregate'] ?? 0;
  136. if ($force) {
  137. $value += 0;
  138. }
  139. return $value;
  140. }
  141. /**
  142. * 多聚合操作
  143. *
  144. * @param array $aggregate 聚合指令, 可以聚合多个参数, 如 ['sum' => 'field1', 'avg' => 'field2']
  145. * @param array $groupBy 类似mysql里面的group字段, 可以传入多个字段, 如 ['field_a', 'field_b', 'field_c']
  146. * @return array 查询结果
  147. */
  148. public function multiAggregate(array $aggregate, array $groupBy): array
  149. {
  150. $result = $this->cmd('multiAggregate', [$aggregate, $groupBy]);
  151. foreach ($result as &$row) {
  152. if (isset($row['_id']) && !empty($row['_id'])) {
  153. foreach ($row['_id'] as $k => $v) {
  154. $row[$k] = $v;
  155. }
  156. unset($row['_id']);
  157. }
  158. }
  159. return $result;
  160. }
  161. /**
  162. * 字段值增长
  163. * @access public
  164. * @param string $field 字段名
  165. * @param float $step 增长值
  166. * @return $this
  167. */
  168. public function inc(string $field, float $step = 1)
  169. {
  170. $this->options['data'][$field] = ['$inc', $step];
  171. return $this;
  172. }
  173. /**
  174. * 字段值减少
  175. * @access public
  176. * @param string $field 字段名
  177. * @param float $step 减少值
  178. * @return $this
  179. */
  180. public function dec(string $field, float $step = 1)
  181. {
  182. return $this->inc($field, -1 * $step);
  183. }
  184. /**
  185. * 指定当前操作的Collection
  186. * @access public
  187. * @param string $table 表名
  188. * @return $this
  189. */
  190. public function table($table)
  191. {
  192. $this->options['table'] = $table;
  193. return $this;
  194. }
  195. /**
  196. * table方法的别名
  197. * @access public
  198. * @param string $collection
  199. * @return $this
  200. */
  201. public function collection(string $collection)
  202. {
  203. return $this->table($collection);
  204. }
  205. /**
  206. * 设置typeMap
  207. * @access public
  208. * @param string|array $typeMap
  209. * @return $this
  210. */
  211. public function typeMap($typeMap)
  212. {
  213. $this->options['typeMap'] = $typeMap;
  214. return $this;
  215. }
  216. /**
  217. * awaitData
  218. * @access public
  219. * @param bool $awaitData
  220. * @return $this
  221. */
  222. public function awaitData(bool $awaitData)
  223. {
  224. $this->options['awaitData'] = $awaitData;
  225. return $this;
  226. }
  227. /**
  228. * batchSize
  229. * @access public
  230. * @param integer $batchSize
  231. * @return $this
  232. */
  233. public function batchSize(int $batchSize)
  234. {
  235. $this->options['batchSize'] = $batchSize;
  236. return $this;
  237. }
  238. /**
  239. * exhaust
  240. * @access public
  241. * @param bool $exhaust
  242. * @return $this
  243. */
  244. public function exhaust(bool $exhaust)
  245. {
  246. $this->options['exhaust'] = $exhaust;
  247. return $this;
  248. }
  249. /**
  250. * 设置modifiers
  251. * @access public
  252. * @param array $modifiers
  253. * @return $this
  254. */
  255. public function modifiers(array $modifiers)
  256. {
  257. $this->options['modifiers'] = $modifiers;
  258. return $this;
  259. }
  260. /**
  261. * 设置noCursorTimeout
  262. * @access public
  263. * @param bool $noCursorTimeout
  264. * @return $this
  265. */
  266. public function noCursorTimeout(bool $noCursorTimeout)
  267. {
  268. $this->options['noCursorTimeout'] = $noCursorTimeout;
  269. return $this;
  270. }
  271. /**
  272. * 设置oplogReplay
  273. * @access public
  274. * @param bool $oplogReplay
  275. * @return $this
  276. */
  277. public function oplogReplay(bool $oplogReplay)
  278. {
  279. $this->options['oplogReplay'] = $oplogReplay;
  280. return $this;
  281. }
  282. /**
  283. * 设置partial
  284. * @access public
  285. * @param bool $partial
  286. * @return $this
  287. */
  288. public function partial(bool $partial)
  289. {
  290. $this->options['partial'] = $partial;
  291. return $this;
  292. }
  293. /**
  294. * maxTimeMS
  295. * @access public
  296. * @param string $maxTimeMS
  297. * @return $this
  298. */
  299. public function maxTimeMS(string $maxTimeMS)
  300. {
  301. $this->options['maxTimeMS'] = $maxTimeMS;
  302. return $this;
  303. }
  304. /**
  305. * collation
  306. * @access public
  307. * @param array $collation
  308. * @return $this
  309. */
  310. public function collation(array $collation)
  311. {
  312. $this->options['collation'] = $collation;
  313. return $this;
  314. }
  315. /**
  316. * 设置是否REPLACE
  317. * @access public
  318. * @param bool $replace 是否使用REPLACE写入数据
  319. * @return $this
  320. */
  321. public function replace(bool $replace = true)
  322. {
  323. return $this;
  324. }
  325. /**
  326. * 设置返回字段
  327. * @access public
  328. * @param mixed $field 字段信息
  329. * @return $this
  330. */
  331. public function field($field)
  332. {
  333. if (empty($field) || '*' == $field) {
  334. return $this;
  335. }
  336. if (is_string($field)) {
  337. $field = array_map('trim', explode(',', $field));
  338. }
  339. $projection = [];
  340. foreach ($field as $key => $val) {
  341. if (is_numeric($key)) {
  342. $projection[$val] = 1;
  343. } else {
  344. $projection[$key] = $val;
  345. }
  346. }
  347. $this->options['projection'] = $projection;
  348. return $this;
  349. }
  350. /**
  351. * 指定要排除的查询字段
  352. * @access public
  353. * @param array|string $field 要排除的字段
  354. * @return $this
  355. */
  356. public function withoutField($field)
  357. {
  358. if (empty($field) || '*' == $field) {
  359. return $this;
  360. }
  361. if (is_string($field)) {
  362. $field = array_map('trim', explode(',', $field));
  363. }
  364. $projection = [];
  365. foreach ($field as $key => $val) {
  366. if (is_numeric($key)) {
  367. $projection[$val] = 0;
  368. } else {
  369. $projection[$key] = $val;
  370. }
  371. }
  372. $this->options['projection'] = $projection;
  373. return $this;
  374. }
  375. /**
  376. * 设置skip
  377. * @access public
  378. * @param integer $skip
  379. * @return $this
  380. */
  381. public function skip(int $skip)
  382. {
  383. $this->options['skip'] = $skip;
  384. return $this;
  385. }
  386. /**
  387. * 设置slaveOk
  388. * @access public
  389. * @param bool $slaveOk
  390. * @return $this
  391. */
  392. public function slaveOk(bool $slaveOk)
  393. {
  394. $this->options['slaveOk'] = $slaveOk;
  395. return $this;
  396. }
  397. /**
  398. * 指定查询数量
  399. * @access public
  400. * @param int $offset 起始位置
  401. * @param int $length 查询数量
  402. * @return $this
  403. */
  404. public function limit(int $offset, int $length = null)
  405. {
  406. if (is_null($length)) {
  407. $length = $offset;
  408. $offset = 0;
  409. }
  410. $this->options['skip'] = $offset;
  411. $this->options['limit'] = $length;
  412. return $this;
  413. }
  414. /**
  415. * 设置sort
  416. * @access public
  417. * @param array|string $field
  418. * @param string $order
  419. * @return $this
  420. */
  421. public function order($field, string $order = '')
  422. {
  423. if (is_array($field)) {
  424. $this->options['sort'] = $field;
  425. } else {
  426. $this->options['sort'][$field] = 'asc' == strtolower($order) ? 1 : -1;
  427. }
  428. return $this;
  429. }
  430. /**
  431. * 设置tailable
  432. * @access public
  433. * @param bool $tailable
  434. * @return $this
  435. */
  436. public function tailable(bool $tailable)
  437. {
  438. $this->options['tailable'] = $tailable;
  439. return $this;
  440. }
  441. /**
  442. * 设置writeConcern对象
  443. * @access public
  444. * @param WriteConcern $writeConcern
  445. * @return $this
  446. */
  447. public function writeConcern(WriteConcern $writeConcern)
  448. {
  449. $this->options['writeConcern'] = $writeConcern;
  450. return $this;
  451. }
  452. /**
  453. * 获取当前数据表的主键
  454. * @access public
  455. * @return string|array
  456. */
  457. public function getPk()
  458. {
  459. return $this->pk ?: $this->connection->getConfig('pk');
  460. }
  461. /**
  462. * 执行查询但只返回Cursor对象
  463. * @access public
  464. * @return Cursor
  465. */
  466. public function getCursor(): Cursor
  467. {
  468. $this->parseOptions();
  469. return $this->connection->getCursor($this);
  470. }
  471. /**
  472. * 获取当前的查询标识
  473. * @access public
  474. * @param mixed $data 要序列化的数据
  475. * @return string
  476. */
  477. public function getQueryGuid($data = null): string
  478. {
  479. return md5($this->getConfig('database') . serialize(var_export($data ?: $this->options, true)));
  480. }
  481. /**
  482. * 分页查询
  483. * @access public
  484. * @param int|array $listRows 每页数量 数组表示配置参数
  485. * @param int|bool $simple 是否简洁模式或者总记录数
  486. * @return Paginator
  487. * @throws Exception
  488. */
  489. public function paginate($listRows = null, $simple = false): Paginator
  490. {
  491. if (is_int($simple)) {
  492. $total = $simple;
  493. $simple = false;
  494. }
  495. $defaultConfig = [
  496. 'query' => [], //url额外参数
  497. 'fragment' => '', //url锚点
  498. 'var_page' => 'page', //分页变量
  499. 'list_rows' => 15, //每页数量
  500. ];
  501. if (is_array($listRows)) {
  502. $config = array_merge($defaultConfig, $listRows);
  503. $listRows = intval($config['list_rows']);
  504. } else {
  505. $config = $defaultConfig;
  506. $listRows = intval($listRows ?: $config['list_rows']);
  507. }
  508. $page = isset($config['page']) ? (int) $config['page'] : Paginator::getCurrentPage($config['var_page']);
  509. $page = $page < 1 ? 1 : $page;
  510. $config['path'] = $config['path'] ?? Paginator::getCurrentPath();
  511. if (!isset($total) && !$simple) {
  512. $options = $this->getOptions();
  513. unset($this->options['order'], $this->options['limit'], $this->options['page'], $this->options['field']);
  514. $total = $this->count();
  515. $results = $this->options($options)->page($page, $listRows)->select();
  516. } elseif ($simple) {
  517. $results = $this->limit(($page - 1) * $listRows, $listRows + 1)->select();
  518. $total = null;
  519. } else {
  520. $results = $this->page($page, $listRows)->select();
  521. }
  522. $this->removeOption('limit');
  523. $this->removeOption('page');
  524. return Paginator::make($results, $listRows, $page, $total, $simple, $config);
  525. }
  526. /**
  527. * 分批数据返回处理
  528. * @access public
  529. * @param integer $count 每次处理的数据数量
  530. * @param callable $callback 处理回调方法
  531. * @param string|array $column 分批处理的字段名
  532. * @param string $order 字段排序
  533. * @return bool
  534. * @throws Exception
  535. */
  536. public function chunk(int $count, callable $callback, $column = null, string $order = 'asc'): bool
  537. {
  538. $options = $this->getOptions();
  539. $column = $column ?: $this->getPk();
  540. if (isset($options['order'])) {
  541. unset($options['order']);
  542. }
  543. if (is_array($column)) {
  544. $times = 1;
  545. $query = $this->options($options)->page($times, $count);
  546. } else {
  547. $query = $this->options($options)->limit($count);
  548. if (strpos($column, '.')) {
  549. list($alias, $key) = explode('.', $column);
  550. } else {
  551. $key = $column;
  552. }
  553. }
  554. $resultSet = $query->order($column, $order)->select();
  555. while (count($resultSet) > 0) {
  556. if (false === call_user_func($callback, $resultSet)) {
  557. return false;
  558. }
  559. if (isset($times)) {
  560. $times++;
  561. $query = $this->options($options)->page($times, $count);
  562. } else {
  563. $end = $resultSet->pop();
  564. $lastId = is_array($end) ? $end[$key] : $end->getData($key);
  565. $query = $this->options($options)
  566. ->limit($count)
  567. ->where($column, 'asc' == strtolower($order) ? '>' : '<', $lastId);
  568. }
  569. $resultSet = $query->order($column, $order)->select();
  570. }
  571. return true;
  572. }
  573. /**
  574. * 分析表达式(可用于查询或者写入操作)
  575. * @access public
  576. * @return array
  577. */
  578. public function parseOptions(): array
  579. {
  580. $options = $this->options;
  581. // 获取数据表
  582. if (empty($options['table'])) {
  583. $options['table'] = $this->getTable();
  584. }
  585. foreach (['where', 'data'] as $name) {
  586. if (!isset($options[$name])) {
  587. $options[$name] = [];
  588. }
  589. }
  590. $modifiers = empty($options['modifiers']) ? [] : $options['modifiers'];
  591. if (isset($options['comment'])) {
  592. $modifiers['$comment'] = $options['comment'];
  593. }
  594. if (isset($options['maxTimeMS'])) {
  595. $modifiers['$maxTimeMS'] = $options['maxTimeMS'];
  596. }
  597. if (!empty($modifiers)) {
  598. $options['modifiers'] = $modifiers;
  599. }
  600. if (!isset($options['projection'])) {
  601. $options['projection'] = [];
  602. }
  603. if (!isset($options['typeMap'])) {
  604. $options['typeMap'] = $this->getConfig('type_map');
  605. }
  606. if (!isset($options['limit'])) {
  607. $options['limit'] = 0;
  608. }
  609. foreach (['master', 'fetch_sql', 'fetch_cursor'] as $name) {
  610. if (!isset($options[$name])) {
  611. $options[$name] = false;
  612. }
  613. }
  614. if (isset($options['page'])) {
  615. // 根据页数计算limit
  616. list($page, $listRows) = $options['page'];
  617. $page = $page > 0 ? $page : 1;
  618. $listRows = $listRows > 0 ? $listRows : (is_numeric($options['limit']) ? $options['limit'] : 20);
  619. $offset = $listRows * ($page - 1);
  620. $options['skip'] = intval($offset);
  621. $options['limit'] = intval($listRows);
  622. }
  623. $this->options = $options;
  624. return $options;
  625. }
  626. }