Mongo.php 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkPHP [ WE CAN DO IT JUST THINK ]
  4. // +----------------------------------------------------------------------
  5. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  6. // +----------------------------------------------------------------------
  7. // | Author: liu21st <liu21st@gmail.com>
  8. // +----------------------------------------------------------------------
  9. declare (strict_types = 1);
  10. namespace think\db\connector;
  11. use Closure;
  12. use MongoDB\BSON\ObjectID;
  13. use MongoDB\Driver\BulkWrite;
  14. use MongoDB\Driver\Command;
  15. use MongoDB\Driver\Cursor;
  16. use MongoDB\Driver\Exception\AuthenticationException;
  17. use MongoDB\Driver\Exception\BulkWriteException;
  18. use MongoDB\Driver\Exception\ConnectionException;
  19. use MongoDB\Driver\Exception\InvalidArgumentException;
  20. use MongoDB\Driver\Exception\RuntimeException;
  21. use MongoDB\Driver\Manager;
  22. use MongoDB\Driver\Query as MongoQuery;
  23. use MongoDB\Driver\ReadPreference;
  24. use think\db\BaseQuery;
  25. use think\db\builder\Mongo as Builder;
  26. use think\db\Connection;
  27. use think\db\ConnectionInterface;
  28. use think\db\exception\DbException as Exception;
  29. use think\db\Mongo as Query;
  30. /**
  31. * Mongo数据库驱动
  32. */
  33. class Mongo extends Connection implements ConnectionInterface
  34. {
  35. // 查询数据类型
  36. protected $dbName = '';
  37. protected $typeMap = 'array';
  38. protected $mongo; // MongoDb Object
  39. protected $cursor; // MongoCursor Object
  40. // 数据库连接参数配置
  41. protected $config = [
  42. // 数据库类型
  43. 'type' => '',
  44. // 服务器地址
  45. 'hostname' => '',
  46. // 数据库名
  47. 'database' => '',
  48. // 是否是复制集
  49. 'is_replica_set' => false,
  50. // 用户名
  51. 'username' => '',
  52. // 密码
  53. 'password' => '',
  54. // 端口
  55. 'hostport' => '',
  56. // 连接dsn
  57. 'dsn' => '',
  58. // 数据库连接参数
  59. 'params' => [],
  60. // 数据库编码默认采用utf8
  61. 'charset' => 'utf8',
  62. // 主键名
  63. 'pk' => '_id',
  64. // 主键类型
  65. 'pk_type' => 'ObjectID',
  66. // 数据库表前缀
  67. 'prefix' => '',
  68. // 数据库部署方式:0 集中式(单一服务器),1 分布式(主从服务器)
  69. 'deploy' => 0,
  70. // 数据库读写是否分离 主从式有效
  71. 'rw_separate' => false,
  72. // 读写分离后 主服务器数量
  73. 'master_num' => 1,
  74. // 指定从服务器序号
  75. 'slave_no' => '',
  76. // 是否严格检查字段是否存在
  77. 'fields_strict' => true,
  78. // 开启字段缓存
  79. 'fields_cache' => false,
  80. // 监听SQL
  81. 'trigger_sql' => true,
  82. // 自动写入时间戳字段
  83. 'auto_timestamp' => false,
  84. // 时间字段取出后的默认时间格式
  85. 'datetime_format' => 'Y-m-d H:i:s',
  86. // 是否_id转换为id
  87. 'pk_convert_id' => false,
  88. // typeMap
  89. 'type_map' => ['root' => 'array', 'document' => 'array'],
  90. ];
  91. /**
  92. * 架构函数 读取数据库配置信息
  93. * @access public
  94. * @param array $config 数据库配置数组
  95. */
  96. public function __construct(array $config = [])
  97. {
  98. if (!empty($config)) {
  99. $this->config = array_merge($this->config, $config);
  100. }
  101. // 创建Builder对象
  102. $class = $this->getBuilderClass();
  103. $this->builder = new $class($this);
  104. }
  105. /**
  106. * 获取当前连接器类对应的Query类
  107. * @access public
  108. * @return string
  109. */
  110. public function getQueryClass(): string
  111. {
  112. return Query::class;
  113. }
  114. /**
  115. * 获取当前的builder实例对象
  116. * @access public
  117. * @return Builder
  118. */
  119. public function getBuilder(): Builder
  120. {
  121. return $this->builder;
  122. }
  123. /**
  124. * 获取当前连接器类对应的Builder类
  125. * @access public
  126. * @return string
  127. */
  128. public function getBuilderClass(): string
  129. {
  130. return Builder::class;
  131. }
  132. /**
  133. * 连接数据库方法
  134. * @access public
  135. * @param array $config 连接参数
  136. * @param integer $linkNum 连接序号
  137. * @return Manager
  138. * @throws InvalidArgumentException
  139. * @throws RuntimeException
  140. */
  141. public function connect(array $config = [], $linkNum = 0)
  142. {
  143. if (!isset($this->links[$linkNum])) {
  144. if (empty($config)) {
  145. $config = $this->config;
  146. } else {
  147. $config = array_merge($this->config, $config);
  148. }
  149. $this->dbName = $config['database'];
  150. $this->typeMap = $config['type_map'];
  151. if ($config['pk_convert_id'] && '_id' == $config['pk']) {
  152. $this->config['pk'] = 'id';
  153. }
  154. if (empty($config['dsn'])) {
  155. $config['dsn'] = 'mongodb://' . ($config['username'] ? "{$config['username']}" : '') . ($config['password'] ? ":{$config['password']}@" : '') . $config['hostname'] . ($config['hostport'] ? ":{$config['hostport']}" : '');
  156. }
  157. $startTime = microtime(true);
  158. $this->links[$linkNum] = new Manager($config['dsn'], $config['params']);
  159. if (!empty($config['trigger_sql'])) {
  160. // 记录数据库连接信息
  161. $this->trigger('CONNECT:[ UseTime:' . number_format(microtime(true) - $startTime, 6) . 's ] ' . $config['dsn']);
  162. }
  163. }
  164. return $this->links[$linkNum];
  165. }
  166. /**
  167. * 获取Mongo Manager对象
  168. * @access public
  169. * @return Manager|null
  170. */
  171. public function getMongo()
  172. {
  173. return $this->mongo ?: null;
  174. }
  175. /**
  176. * 设置/获取当前操作的database
  177. * @access public
  178. * @param string $db db
  179. * @throws Exception
  180. */
  181. public function db(string $db = null)
  182. {
  183. if (is_null($db)) {
  184. return $this->dbName;
  185. } else {
  186. $this->dbName = $db;
  187. }
  188. }
  189. /**
  190. * 执行查询但只返回Cursor对象
  191. * @access public
  192. * @param BaseQuery $query 查询对象
  193. * @return Cursor
  194. */
  195. public function cursor(BaseQuery $query)
  196. {
  197. // 分析查询表达式
  198. $options = $query->parseOptions();
  199. // 生成MongoQuery对象
  200. $mongoQuery = $this->builder->select($query);
  201. $master = $query->getOptions('master') ? true : false;
  202. // 执行查询操作
  203. return $this->getCursor($query, $mongoQuery, $master);
  204. }
  205. /**
  206. * 执行查询并返回Cursor对象
  207. * @access public
  208. * @param BaseQuery $query 查询对象
  209. * @param MongoQuery|Closure $mongoQuery Mongo查询对象
  210. * @param bool $master 是否主库操作
  211. * @return Cursor
  212. * @throws AuthenticationException
  213. * @throws InvalidArgumentException
  214. * @throws ConnectionException
  215. * @throws RuntimeException
  216. */
  217. public function getCursor(BaseQuery $query, $mongoQuery, bool $master = false): Cursor
  218. {
  219. $this->initConnect($master);
  220. $this->db->updateQueryTimes();
  221. $options = $query->getOptions();
  222. $namespace = $options['table'];
  223. if (false === strpos($namespace, '.')) {
  224. $namespace = $this->dbName . '.' . $namespace;
  225. }
  226. if (!empty($this->queryStr)) {
  227. // 记录执行指令
  228. $this->queryStr = 'db' . strstr($namespace, '.') . '.' . $this->queryStr;
  229. }
  230. if ($mongoQuery instanceof Closure) {
  231. $mongoQuery = $mongoQuery($query);
  232. }
  233. $readPreference = $options['readPreference'] ?? null;
  234. $this->queryStartTime = microtime(true);
  235. $this->cursor = $this->mongo->executeQuery($namespace, $mongoQuery, $readPreference);
  236. // SQL监控
  237. if (!empty($this->config['trigger_sql'])) {
  238. $this->trigger('', $master);
  239. }
  240. return $this->cursor;
  241. }
  242. /**
  243. * 执行查询
  244. * @access public
  245. * @param BaseQuery $query 查询对象
  246. * @param MongoQuery|Closure $mongoQuery Mongo查询对象
  247. * @return array
  248. * @throws AuthenticationException
  249. * @throws InvalidArgumentException
  250. * @throws ConnectionException
  251. * @throws RuntimeException
  252. */
  253. public function query(BaseQuery $query, $mongoQuery): array
  254. {
  255. $options = $query->parseOptions();
  256. if ($query->getOptions('cache')) {
  257. // 检查查询缓存
  258. $cacheItem = $this->parseCache($query, $query->getOptions('cache'));
  259. $key = $cacheItem->getKey();
  260. if ($this->cache->has($key)) {
  261. return $this->cache->get($key);
  262. }
  263. }
  264. if ($mongoQuery instanceof Closure) {
  265. $mongoQuery = $mongoQuery($query);
  266. }
  267. $master = $query->getOptions('master') ? true : false;
  268. $this->getCursor($query, $mongoQuery, $master);
  269. $resultSet = $this->getResult($options['typeMap']);
  270. if (isset($cacheItem) && $resultSet) {
  271. // 缓存数据集
  272. $cacheItem->set($resultSet);
  273. $this->cacheData($cacheItem);
  274. }
  275. return $resultSet;
  276. }
  277. /**
  278. * 执行写操作
  279. * @access public
  280. * @param BaseQuery $query
  281. * @param BulkWrite $bulk
  282. *
  283. * @return WriteResult
  284. * @throws AuthenticationException
  285. * @throws InvalidArgumentException
  286. * @throws ConnectionException
  287. * @throws RuntimeException
  288. * @throws BulkWriteException
  289. */
  290. public function execute(BaseQuery $query, BulkWrite $bulk)
  291. {
  292. $this->initConnect(true);
  293. $this->db->updateQueryTimes();
  294. $options = $query->getOptions();
  295. $namespace = $options['table'];
  296. if (false === strpos($namespace, '.')) {
  297. $namespace = $this->dbName . '.' . $namespace;
  298. }
  299. if (!empty($this->queryStr)) {
  300. // 记录执行指令
  301. $this->queryStr = 'db' . strstr($namespace, '.') . '.' . $this->queryStr;
  302. }
  303. $writeConcern = $options['writeConcern'] ?? null;
  304. $this->queryStartTime = microtime(true);
  305. $writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, $writeConcern);
  306. // SQL监控
  307. if (!empty($this->config['trigger_sql'])) {
  308. $this->trigger();
  309. }
  310. $this->numRows = $writeResult->getMatchedCount();
  311. if ($query->getOptions('cache')) {
  312. // 清理缓存数据
  313. $cacheItem = $this->parseCache($query, $query->getOptions('cache'));
  314. $key = $cacheItem->getKey();
  315. $tag = $cacheItem->getTag();
  316. if (isset($key) && $this->cache->has($key)) {
  317. $this->cache->delete($key);
  318. } elseif (!empty($tag) && method_exists($this->cache, 'tag')) {
  319. $this->cache->tag($tag)->clear();
  320. }
  321. }
  322. return $writeResult;
  323. }
  324. /**
  325. * 执行指令
  326. * @access public
  327. * @param Command $command 指令
  328. * @param string $dbName 当前数据库名
  329. * @param ReadPreference $readPreference readPreference
  330. * @param string|array $typeMap 指定返回的typeMap
  331. * @param bool $master 是否主库操作
  332. * @return array
  333. * @throws AuthenticationException
  334. * @throws InvalidArgumentException
  335. * @throws ConnectionException
  336. * @throws RuntimeException
  337. */
  338. public function command(Command $command, string $dbName = '', ReadPreference $readPreference = null, $typeMap = null, bool $master = false): array
  339. {
  340. $this->initConnect($master);
  341. $this->db->updateQueryTimes();
  342. $this->queryStartTime = microtime(true);
  343. $dbName = $dbName ?: $this->dbName;
  344. if (!empty($this->queryStr)) {
  345. $this->queryStr = 'db.' . $this->queryStr;
  346. }
  347. $this->cursor = $this->mongo->executeCommand($dbName, $command, $readPreference);
  348. // SQL监控
  349. if (!empty($this->config['trigger_sql'])) {
  350. $this->trigger('', $master);
  351. }
  352. return $this->getResult($typeMap);
  353. }
  354. /**
  355. * 获得数据集
  356. * @access protected
  357. * @param string|array $typeMap 指定返回的typeMap
  358. * @return mixed
  359. */
  360. protected function getResult($typeMap = null): array
  361. {
  362. // 设置结果数据类型
  363. if (is_null($typeMap)) {
  364. $typeMap = $this->typeMap;
  365. }
  366. $typeMap = is_string($typeMap) ? ['root' => $typeMap] : $typeMap;
  367. $this->cursor->setTypeMap($typeMap);
  368. // 获取数据集
  369. $result = $this->cursor->toArray();
  370. if ($this->getConfig('pk_convert_id')) {
  371. // 转换ObjectID 字段
  372. foreach ($result as &$data) {
  373. $this->convertObjectID($data);
  374. }
  375. }
  376. $this->numRows = count($result);
  377. return $result;
  378. }
  379. /**
  380. * ObjectID处理
  381. * @access protected
  382. * @param array $data 数据
  383. * @return void
  384. */
  385. protected function convertObjectID(array &$data): void
  386. {
  387. if (isset($data['_id']) && is_object($data['_id'])) {
  388. $data['id'] = $data['_id']->__toString();
  389. unset($data['_id']);
  390. }
  391. }
  392. /**
  393. * 数据库日志记录(仅供参考)
  394. * @access public
  395. * @param string $type 类型
  396. * @param mixed $data 数据
  397. * @param array $options 参数
  398. * @return void
  399. */
  400. public function mongoLog(string $type, $data, array $options = [])
  401. {
  402. if (!$this->config['trigger_sql']) {
  403. return;
  404. }
  405. if (is_array($data)) {
  406. array_walk_recursive($data, function (&$value) {
  407. if ($value instanceof ObjectID) {
  408. $value = $value->__toString();
  409. }
  410. });
  411. }
  412. switch (strtolower($type)) {
  413. case 'aggregate':
  414. $this->queryStr = 'runCommand(' . ($data ? json_encode($data) : '') . ');';
  415. break;
  416. case 'find':
  417. $this->queryStr = $type . '(' . ($data ? json_encode($data) : '') . ')';
  418. if (isset($options['sort'])) {
  419. $this->queryStr .= '.sort(' . json_encode($options['sort']) . ')';
  420. }
  421. if (isset($options['skip'])) {
  422. $this->queryStr .= '.skip(' . $options['skip'] . ')';
  423. }
  424. if (isset($options['limit'])) {
  425. $this->queryStr .= '.limit(' . $options['limit'] . ')';
  426. }
  427. $this->queryStr .= ';';
  428. break;
  429. case 'insert':
  430. case 'remove':
  431. $this->queryStr = $type . '(' . ($data ? json_encode($data) : '') . ');';
  432. break;
  433. case 'update':
  434. $this->queryStr = $type . '(' . json_encode($options) . ',' . json_encode($data) . ');';
  435. break;
  436. case 'cmd':
  437. $this->queryStr = $data . '(' . json_encode($options) . ');';
  438. break;
  439. }
  440. $this->options = $options;
  441. }
  442. /**
  443. * 获取最近执行的指令
  444. * @access public
  445. * @return string
  446. */
  447. public function getLastSql(): string
  448. {
  449. return $this->queryStr;
  450. }
  451. /**
  452. * 关闭数据库
  453. * @access public
  454. */
  455. public function close()
  456. {
  457. $this->mongo = null;
  458. $this->cursor = null;
  459. $this->linkRead = null;
  460. $this->linkWrite = null;
  461. $this->links = [];
  462. }
  463. /**
  464. * 初始化数据库连接
  465. * @access protected
  466. * @param boolean $master 是否主服务器
  467. * @return void
  468. */
  469. protected function initConnect(bool $master = true): void
  470. {
  471. if (!empty($this->config['deploy'])) {
  472. // 采用分布式数据库
  473. if ($master) {
  474. if (!$this->linkWrite) {
  475. $this->linkWrite = $this->multiConnect(true);
  476. }
  477. $this->mongo = $this->linkWrite;
  478. } else {
  479. if (!$this->linkRead) {
  480. $this->linkRead = $this->multiConnect(false);
  481. }
  482. $this->mongo = $this->linkRead;
  483. }
  484. } elseif (!$this->mongo) {
  485. // 默认单数据库
  486. $this->mongo = $this->connect();
  487. }
  488. }
  489. /**
  490. * 连接分布式服务器
  491. * @access protected
  492. * @param boolean $master 主服务器
  493. * @return Manager
  494. */
  495. protected function multiConnect(bool $master = false): Manager
  496. {
  497. $config = [];
  498. // 分布式数据库配置解析
  499. foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn'] as $name) {
  500. $config[$name] = is_string($this->config[$name]) ? explode(',', $this->config[$name]) : $this->config[$name];
  501. }
  502. // 主服务器序号
  503. $m = floor(mt_rand(0, $this->config['master_num'] - 1));
  504. if ($this->config['rw_separate']) {
  505. // 主从式采用读写分离
  506. if ($master) // 主服务器写入
  507. {
  508. if ($this->config['is_replica_set']) {
  509. return $this->replicaSetConnect();
  510. } else {
  511. $r = $m;
  512. }
  513. } elseif (is_numeric($this->config['slave_no'])) {
  514. // 指定服务器读
  515. $r = $this->config['slave_no'];
  516. } else {
  517. // 读操作连接从服务器 每次随机连接的数据库
  518. $r = floor(mt_rand($this->config['master_num'], count($config['hostname']) - 1));
  519. }
  520. } else {
  521. // 读写操作不区分服务器 每次随机连接的数据库
  522. $r = floor(mt_rand(0, count($config['hostname']) - 1));
  523. }
  524. $dbConfig = [];
  525. foreach (['username', 'password', 'hostname', 'hostport', 'database', 'dsn'] as $name) {
  526. $dbConfig[$name] = $config[$name][$r] ?? $config[$name][0];
  527. }
  528. return $this->connect($dbConfig, $r);
  529. }
  530. /**
  531. * 创建基于复制集的连接
  532. * @return Manager
  533. */
  534. public function replicaSetConnect(): Manager
  535. {
  536. $this->dbName = $this->config['database'];
  537. $this->typeMap = $this->config['type_map'];
  538. $startTime = microtime(true);
  539. $this->config['params']['replicaSet'] = $this->config['database'];
  540. $manager = new Manager($this->buildUrl(), $this->config['params']);
  541. // 记录数据库连接信息
  542. if (!empty($config['trigger_sql'])) {
  543. $this->trigger('CONNECT:ReplicaSet[ UseTime:' . number_format(microtime(true) - $startTime, 6) . 's ] ' . $this->config['dsn']);
  544. }
  545. return $manager;
  546. }
  547. /**
  548. * 根据配置信息 生成适用于连接复制集的 URL
  549. * @return string
  550. */
  551. private function buildUrl(): string
  552. {
  553. $url = 'mongodb://' . ($this->config['username'] ? "{$this->config['username']}" : '') . ($this->config['password'] ? ":{$this->config['password']}@" : '');
  554. $hostList = is_string($this->config['hostname']) ? explode(',', $this->config['hostname']) : $this->config['hostname'];
  555. $portList = is_string($this->config['hostport']) ? explode(',', $this->config['hostport']) : $this->config['hostport'];
  556. for ($i = 0; $i < count($hostList); $i++) {
  557. $url = $url . $hostList[$i] . ':' . $portList[0] . ',';
  558. }
  559. return rtrim($url, ",") . '/';
  560. }
  561. /**
  562. * 插入记录
  563. * @access public
  564. * @param BaseQuery $query 查询对象
  565. * @param boolean $getLastInsID 返回自增主键
  566. * @return mixed
  567. * @throws AuthenticationException
  568. * @throws InvalidArgumentException
  569. * @throws ConnectionException
  570. * @throws RuntimeException
  571. * @throws BulkWriteException
  572. */
  573. public function insert(BaseQuery $query, bool $getLastInsID = false)
  574. {
  575. // 分析查询表达式
  576. $options = $query->parseOptions();
  577. if (empty($options['data'])) {
  578. throw new Exception('miss data to insert');
  579. }
  580. // 生成bulk对象
  581. $bulk = $this->builder->insert($query);
  582. $writeResult = $this->execute($query, $bulk);
  583. $result = $writeResult->getInsertedCount();
  584. if ($result) {
  585. $data = $options['data'];
  586. $lastInsId = $this->getLastInsID($query);
  587. if ($lastInsId) {
  588. $pk = $query->getPk();
  589. $data[$pk] = $lastInsId;
  590. }
  591. $query->setOption('data', $data);
  592. $this->db->trigger('after_insert', $query);
  593. if ($getLastInsID) {
  594. return $lastInsId;
  595. }
  596. }
  597. return $result;
  598. }
  599. /**
  600. * 获取最近插入的ID
  601. * @access public
  602. * @param BaseQuery $query 查询对象
  603. * @return mixed
  604. */
  605. public function getLastInsID(BaseQuery $query)
  606. {
  607. $id = $this->builder->getLastInsID();
  608. if (is_array($id)) {
  609. array_walk($id, function (&$item, $key) {
  610. if ($item instanceof ObjectID) {
  611. $item = $item->__toString();
  612. }
  613. });
  614. } elseif ($id instanceof ObjectID) {
  615. $id = $id->__toString();
  616. }
  617. return $id;
  618. }
  619. /**
  620. * 批量插入记录
  621. * @access public
  622. * @param BaseQuery $query 查询对象
  623. * @param array $dataSet 数据集
  624. * @return integer
  625. * @throws AuthenticationException
  626. * @throws InvalidArgumentException
  627. * @throws ConnectionException
  628. * @throws RuntimeException
  629. * @throws BulkWriteException
  630. */
  631. public function insertAll(BaseQuery $query, array $dataSet = []): int
  632. {
  633. // 分析查询表达式
  634. $query->parseOptions();
  635. if (!is_array(reset($dataSet))) {
  636. return 0;
  637. }
  638. // 生成bulkWrite对象
  639. $bulk = $this->builder->insertAll($query, $dataSet);
  640. $writeResult = $this->execute($query, $bulk);
  641. return $writeResult->getInsertedCount();
  642. }
  643. /**
  644. * 更新记录
  645. * @access public
  646. * @param BaseQuery $query 查询对象
  647. * @return int
  648. * @throws Exception
  649. * @throws AuthenticationException
  650. * @throws InvalidArgumentException
  651. * @throws ConnectionException
  652. * @throws RuntimeException
  653. * @throws BulkWriteException
  654. */
  655. public function update(BaseQuery $query): int
  656. {
  657. $query->parseOptions();
  658. // 生成bulkWrite对象
  659. $bulk = $this->builder->update($query);
  660. $writeResult = $this->execute($query, $bulk);
  661. $result = $writeResult->getModifiedCount();
  662. if ($result) {
  663. $this->db->trigger('after_update', $query);
  664. }
  665. return $result;
  666. }
  667. /**
  668. * 删除记录
  669. * @access public
  670. * @param BaseQuery $query 查询对象
  671. * @return int
  672. * @throws Exception
  673. * @throws AuthenticationException
  674. * @throws InvalidArgumentException
  675. * @throws ConnectionException
  676. * @throws RuntimeException
  677. * @throws BulkWriteException
  678. */
  679. public function delete(BaseQuery $query): int
  680. {
  681. // 分析查询表达式
  682. $query->parseOptions();
  683. // 生成bulkWrite对象
  684. $bulk = $this->builder->delete($query);
  685. // 执行操作
  686. $writeResult = $this->execute($query, $bulk);
  687. $result = $writeResult->getDeletedCount();
  688. if ($result) {
  689. $this->db->trigger('after_delete', $query);
  690. }
  691. return $result;
  692. }
  693. /**
  694. * 查找记录
  695. * @access public
  696. * @param BaseQuery $query 查询对象
  697. * @return array
  698. * @throws ModelNotFoundException
  699. * @throws DataNotFoundException
  700. * @throws AuthenticationException
  701. * @throws InvalidArgumentException
  702. * @throws ConnectionException
  703. * @throws RuntimeException
  704. */
  705. public function select(BaseQuery $query): array
  706. {
  707. $resultSet = $this->db->trigger('before_select', $query);
  708. if (!$resultSet) {
  709. $resultSet = $this->query($query, function ($query) {
  710. return $this->builder->select($query);
  711. });
  712. }
  713. return $resultSet;
  714. }
  715. /**
  716. * 查找单条记录
  717. * @access public
  718. * @param BaseQuery $query 查询对象
  719. * @return array
  720. * @throws ModelNotFoundException
  721. * @throws DataNotFoundException
  722. * @throws AuthenticationException
  723. * @throws InvalidArgumentException
  724. * @throws ConnectionException
  725. * @throws RuntimeException
  726. */
  727. public function find(BaseQuery $query): array
  728. {
  729. // 事件回调
  730. $result = $this->db->trigger('before_find', $query);
  731. if (!$result) {
  732. // 执行查询
  733. $resultSet = $this->query($query, function ($query) {
  734. return $this->builder->select($query, true);
  735. });
  736. $result = $resultSet[0] ?? [];
  737. }
  738. return $result;
  739. }
  740. /**
  741. * 得到某个字段的值
  742. * @access public
  743. * @param string $field 字段名
  744. * @param mixed $default 默认值
  745. * @return mixed
  746. */
  747. public function value(BaseQuery $query, string $field, $default = null)
  748. {
  749. $options = $query->parseOptions();
  750. if (isset($options['projection'])) {
  751. $query->removeOption('projection');
  752. }
  753. $query->setOption('projection', (array) $field);
  754. if (!empty($options['cache'])) {
  755. $cacheItem = $this->parseCache($query, $options['cache']);
  756. $key = $cacheItem->getKey();
  757. if ($this->cache->has($key)) {
  758. return $this->cache->get($key);
  759. }
  760. }
  761. $mongoQuery = $this->builder->select($query, true);
  762. if (isset($options['projection'])) {
  763. $query->setOption('projection', $options['projection']);
  764. } else {
  765. $query->removeOption('projection');
  766. }
  767. // 执行查询操作
  768. $resultSet = $this->query($query, $mongoQuery);
  769. if (!empty($resultSet)) {
  770. $data = array_shift($resultSet);
  771. $result = $data[$field];
  772. } else {
  773. $result = false;
  774. }
  775. if (isset($cacheItem) && false !== $result) {
  776. // 缓存数据
  777. $cacheItem->set($result);
  778. $this->cacheData($cacheItem);
  779. }
  780. return false !== $result ? $result : $default;
  781. }
  782. /**
  783. * 得到某个列的数组
  784. * @access public
  785. * @param string $field 字段名 多个字段用逗号分隔
  786. * @param string $key 索引
  787. * @return array
  788. */
  789. public function column(BaseQuery $query, string $field, string $key = ''): array
  790. {
  791. $options = $query->parseOptions();
  792. if (isset($options['projection'])) {
  793. $query->removeOption('projection');
  794. }
  795. if ($key && '*' != $field) {
  796. $projection = $key . ',' . $field;
  797. } else {
  798. $projection = $field;
  799. }
  800. $query->field($projection);
  801. if (!empty($options['cache'])) {
  802. // 判断查询缓存
  803. $cacheItem = $this->parseCache($query, $options['cache']);
  804. $key = $cacheItem->getKey();
  805. if ($this->cache->has($key)) {
  806. return $this->cache->get($key);
  807. }
  808. }
  809. $mongoQuery = $this->builder->select($query);
  810. if (isset($options['projection'])) {
  811. $query->setOption('projection', $options['projection']);
  812. } else {
  813. $query->removeOption('projection');
  814. }
  815. // 执行查询操作
  816. $resultSet = $this->query($query, $mongoQuery);
  817. if (('*' == $field || strpos($field, ',')) && $key) {
  818. $result = array_column($resultSet, null, $key);
  819. } elseif (!empty($resultSet)) {
  820. $result = array_column($resultSet, $field, $key);
  821. } else {
  822. $result = [];
  823. }
  824. if (isset($cacheItem)) {
  825. // 缓存数据
  826. $cacheItem->set($result);
  827. $this->cacheData($cacheItem);
  828. }
  829. return $result;
  830. }
  831. /**
  832. * 执行command
  833. * @access public
  834. * @param BaseQuery $query 查询对象
  835. * @param string|array|object $command 指令
  836. * @param mixed $extra 额外参数
  837. * @param string $db 数据库名
  838. * @return array
  839. */
  840. public function cmd(BaseQuery $query, $command, $extra = null, string $db = ''): array
  841. {
  842. if (is_array($command) || is_object($command)) {
  843. $this->mongoLog('cmd', 'cmd', $command);
  844. // 直接创建Command对象
  845. $command = new Command($command);
  846. } else {
  847. // 调用Builder封装的Command对象
  848. $command = $this->builder->$command($query, $extra);
  849. }
  850. return $this->command($command, $db);
  851. }
  852. /**
  853. * 执行数据库事务
  854. * @access public
  855. * @param callable $callback 数据操作方法回调
  856. * @return mixed
  857. * @throws PDOException
  858. * @throws \Exception
  859. * @throws \Throwable
  860. */
  861. public function transaction(callable $callback)
  862. {}
  863. /**
  864. * 启动事务
  865. * @access public
  866. * @return void
  867. * @throws \PDOException
  868. * @throws \Exception
  869. */
  870. public function startTrans()
  871. {}
  872. /**
  873. * 用于非自动提交状态下面的查询提交
  874. * @access public
  875. * @return void
  876. * @throws PDOException
  877. */
  878. public function commit()
  879. {}
  880. /**
  881. * 事务回滚
  882. * @access public
  883. * @return void
  884. * @throws PDOException
  885. */
  886. public function rollback()
  887. {}
  888. }