QueueServices.php 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2020 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace app\services\other\queue;
  12. use app\dao\other\queue\QueueDao;
  13. use app\jobs\BatchHandleJob;
  14. use app\services\BaseServices;
  15. use app\services\activity\coupon\StoreCouponIssueServices;
  16. use app\services\order\StoreCartServices;
  17. use app\services\order\StoreOrderDeliveryServices;
  18. use app\services\order\StoreOrderServices;
  19. use app\services\product\category\StoreProductCategoryServices;
  20. use app\services\product\product\StoreProductRelationServices;
  21. use app\services\product\product\StoreProductServices;
  22. use app\services\product\sku\StoreProductRuleServices;
  23. use app\services\user\group\UserGroupServices;
  24. use app\services\user\label\UserLabelRelationServices;
  25. use app\services\user\label\UserLabelServices;
  26. use app\services\user\UserServices;
  27. use crmeb\exceptions\AdminException;
  28. use crmeb\services\CacheService;
  29. use think\exception\ValidateException;
  30. use think\facade\Log;
  31. /**
  32. * 队列
  33. * Class QueueServices
  34. * @package app\services\other\queue
  35. * @mixin QueueDao
  36. */
  37. class QueueServices extends BaseServices
  38. {
  39. /**
  40. * 任务类型名称
  41. * @var string[]
  42. */
  43. public $queue_type_name = [
  44. 1 => "批量发放用户优惠券",
  45. 2 => "批量设置用户分组",
  46. 3 => "批量设置用户标签",
  47. 4 => "批量下架商品",
  48. 5 => "批量删除商品规格",
  49. 6 => "批量删除订单",
  50. 7 => "批量手动发货",
  51. 8 => "批量打印电子面单",
  52. 9 => "批量配送",
  53. 10 => "批量虚拟发货",
  54. ];
  55. /**
  56. * 任务redis缓存key
  57. * @var string[]
  58. */
  59. public $queue_redis_key = [
  60. 1 => "DrivingSendCoupon-ADMIN",
  61. 2 => "DrivingUserGroup-ADMIN",
  62. 3 => "DrivingUserLabel-ADMIN",
  63. 4 => "DrivingProductUnshow-ADMIN",
  64. 5 => "DrivingProductRule-ADMIN",
  65. 6 => "DrivingOrderDel-ADMIN",
  66. 7 => 3,
  67. 8 => 4,
  68. 9 => 5,
  69. 10 => 6,
  70. ];
  71. /**
  72. * 状态
  73. * @var string[]
  74. */
  75. protected $status_name = [
  76. 0 => '未处理',
  77. 1 => '正在处理',
  78. 2 => '完成',
  79. 3 => '失败'
  80. ];
  81. public function __construct(QueueDao $dao)
  82. {
  83. $this->dao = $dao;
  84. }
  85. /**
  86. * 获取任务列表
  87. * @param array $where
  88. */
  89. public function getList(array $where = [])
  90. {
  91. [$page, $limit] = $this->getPageValue();
  92. $list = $this->dao->getList($where, $page, $limit);
  93. if ($list) {
  94. foreach ($list as &$v) {
  95. $v['finish_time'] = $v['finish_time'] ? date('Y-m-d H:i:s', $v['finish_time']) : "";
  96. $v['first_time'] = $v['first_time'] ? date('Y-m-d H:i:s', $v['first_time']) : "";
  97. $v['again_time'] = $v['again_time'] ? date('Y-m-d H:i:s', $v['again_time']) : "";
  98. $v['status_cn'] = $this->status_name[$v['status']] ?? '';
  99. $v['is_show_log'] = false;
  100. if (in_array($v['type'], [7, 8, 9, 10])) {
  101. $v['is_show_log'] = true;
  102. $v['is_error_button'] = $v['status'] == 2;
  103. }
  104. $v['type_cn'] = $this->queue_type_name[$v['type']] ?? '';
  105. $v['cache_type'] = $this->queue_redis_key[$v['type']] ?? 0;
  106. $v['success_num'] = bcsub($v['total_num'], $v['surplus_num'], 0);
  107. //是否显示停止按钮
  108. $v['is_stop_button'] = $v['status'] == 1;
  109. $v['add_time'] = date('Y-m-d H:i:s', $v['add_time']);
  110. }
  111. }
  112. $count = $this->dao->count($where);
  113. return compact('list', 'count');
  114. }
  115. /**
  116. * 将要执行的任务数据存入表中
  117. * @param array $where
  118. * @param string $field
  119. * @param array $data
  120. * @param int $type
  121. * @return mixed
  122. */
  123. public function setQueueData(array $where = [], $field = "*", array $data = [], int $type = 1, $other = false)
  124. {
  125. if (!$type) throw new ValidateException('缺少执行任务类型');
  126. $queue_redis_keys = $this->queue_redis_key;
  127. $redisKey = $queue_redis_keys[$type] ?? '';
  128. $queue_type_name = $this->queue_type_name;
  129. $queueName = $queue_type_name[$type] ?? '';
  130. if (!$redisKey || !$queueName) {
  131. throw new ValidateException('缺少队列缓存KEY,或者不存在此类型队列');
  132. }
  133. //检查同类型其他任务
  134. $this->checkTypeQueue($redisKey);
  135. $source = 'admin';
  136. if (in_array($type, [1, 2, 3, 4, 5, 6])) {
  137. $queueDataNum = $this->setRedisData($redisKey, $type, $data, $where, $field);
  138. if (!$queueDataNum) {
  139. throw new ValidateException('需要执行的批量数据为空');
  140. }
  141. if (!$id = $this->dao->addQueueList($queueName, $queueDataNum, $type, $redisKey, $source)) {
  142. throw new ValidateException('添加队列失败');
  143. }
  144. } else {
  145. if ($type == 7) {
  146. $ids = array_column($data, 0);
  147. } else {
  148. $ids = $data;
  149. }
  150. /** @var StoreOrderServices $orderService */
  151. $orderService = app()->make(StoreOrderServices::class);
  152. $oids = $orderService->getOrderDumpData($where, $ids, $field);
  153. $order_ids = [];
  154. if ($oids) {
  155. //过滤拼团未完成订单
  156. foreach ($oids as $order) {
  157. if (isset($order['pinkStatus']) && $order['pinkStatus'] != 2) {
  158. continue;
  159. }
  160. $order_ids[] = $order['id'];
  161. }
  162. }
  163. if (!$order_ids) {
  164. throw new ValidateException('暂无需要发货订单');
  165. }
  166. if (!$id = $this->dao->addQueueList($queueName, count($ids), $type, $redisKey, $source)) {
  167. throw new ValidateException('添加队列失败');
  168. }
  169. /** @var QueueAuxiliaryServices $auxiliaryService */
  170. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  171. $auxiliaryService->saveQueueOrderData($id, $order_ids, $data, $type, $redisKey);
  172. }
  173. return $id;
  174. }
  175. /**
  176. * 队列数据放入redis集合
  177. * @param $redisKey
  178. * @param $type
  179. * @param array $dataIds
  180. * @param array $where
  181. * @param string $filed
  182. * @return int
  183. * @throws \think\db\exception\DataNotFoundException
  184. * @throws \think\db\exception\DbException
  185. * @throws \think\db\exception\ModelNotFoundException
  186. */
  187. public function setRedisData($redisKey, $type, array $dataIds, array $where, $filed = "*")
  188. {
  189. if (!$redisKey || !$type) return 0;
  190. /** @var CacheService $redis */
  191. $redis = app()->make(CacheService::class);
  192. if ($dataIds) {
  193. foreach ($dataIds as $v) {
  194. $redis->sAdd($redisKey, $v);
  195. }
  196. } else {
  197. if ($where) {
  198. foreach ($where as $k => $v) {
  199. if (!$v) unset($where[$k]);
  200. }
  201. }
  202. switch ($type) {
  203. case 1://批量发放优惠券
  204. case 2://批量设置用户分组
  205. case 3://批量设置用户标签
  206. /** @var UserServices $userService */
  207. $userService = app()->make(UserServices::class);
  208. $dataInfo = $userService->getUserInfoList($where, $filed);
  209. if ($dataInfo) {
  210. foreach ($dataInfo as $k => $v) {
  211. $redis->sAdd($redisKey, $v['uid']);
  212. }
  213. }
  214. break;
  215. case 4://批量上下架商品
  216. $cateIds = [];
  217. if (isset($where['cate_id']) && $where['cate_id']) {
  218. /** @var StoreProductCategoryServices $storeCategory */
  219. $storeCategory = app()->make(StoreProductCategoryServices::class);
  220. $cateIds = $storeCategory->getColumn(['pid' => $where['cate_id']], 'id');
  221. }
  222. if ($cateIds) {
  223. $cateIds[] = $where['cate_id'];
  224. $where['cate_id'] = $cateIds;
  225. }
  226. /** @var StoreProductServices $productService */
  227. $productService = app()->make(StoreProductServices::class);
  228. $dataInfo = $productService->getProductListByWhere($where, $filed);
  229. if ($dataInfo) {
  230. foreach ($dataInfo as $k => $v) {
  231. $redis->sAdd($redisKey, $v['id']);
  232. }
  233. }
  234. break;
  235. case 5://批量删除商品规格
  236. /** @var StoreProductRuleServices $productRuleService */
  237. $productRuleService = app()->make(StoreProductRuleServices::class);
  238. $dataInfo = $productRuleService->getProductRuleList($where, $filed);
  239. if ($dataInfo) {
  240. foreach ($dataInfo as $k => $v) {
  241. $redis->sAdd($redisKey, $v['id']);
  242. }
  243. }
  244. break;
  245. case 6://批量删除用户已删除订单
  246. /** @var StoreOrderServices $orderService */
  247. $orderService = app()->make(StoreOrderServices::class);
  248. $dataInfo = $orderService->getOrderListByWhere($where, $filed);
  249. if ($dataInfo) {
  250. foreach ($dataInfo as $k => $v) {
  251. $redis->sAdd($redisKey, $v['id']);
  252. }
  253. }
  254. break;
  255. default:
  256. return 0;
  257. break;
  258. }
  259. }
  260. return $redis->sCard($redisKey);
  261. }
  262. /**
  263. * 获取队列redis中存的数据集合
  264. * @param string $redisKey
  265. * @param array $queueInfo
  266. * @return array
  267. */
  268. public function getQueueRedisdata($queueInfo, string $redisKey = '')
  269. {
  270. if (!$queueInfo) return [$redisKey, []];
  271. if (!$redisKey) {
  272. $redisKey = $queueInfo['execute_key'] ?? '';
  273. }
  274. if (!$redisKey) {
  275. return [$redisKey, []];
  276. }
  277. /** @var CacheService $redis */
  278. $redis = app()->make(CacheService::class);
  279. return [$redisKey, $redis->sMembers($redisKey)];
  280. }
  281. /**
  282. * 批量发送优惠券
  283. * @param $coupon
  284. * @param $type
  285. * @return bool
  286. * @throws \think\db\exception\DataNotFoundException
  287. * @throws \think\db\exception\DbException
  288. * @throws \think\db\exception\ModelNotFoundException
  289. */
  290. public function sendCoupon($coupon, $type)
  291. {
  292. if (!$type || !$coupon) return false;
  293. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
  294. if (!$queueInfo) {
  295. return false;
  296. }
  297. //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
  298. $this->dao->setQueueDoing($coupon, $queueInfo['id']);
  299. [$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
  300. if ($uids) {
  301. $chunkUids = array_chunk($uids, 100, true);
  302. /** @var StoreCouponIssueServices $issueService */
  303. $issueService = app()->make(StoreCouponIssueServices::class);
  304. foreach ($chunkUids as $v) {
  305. $issueService->setCoupon($coupon, $v, $redisKey, $queueInfo);
  306. }
  307. }
  308. //发完后将队列置为完成
  309. $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
  310. return true;
  311. }
  312. /**
  313. * 批量设置用户分组
  314. * @param $groupId
  315. * @param $type
  316. * @return bool
  317. * @throws \think\db\exception\DataNotFoundException
  318. * @throws \think\db\exception\DbException
  319. * @throws \think\db\exception\ModelNotFoundException
  320. */
  321. public function setUserGroup($groupId, $type)
  322. {
  323. if (!$groupId || !$type) return false;
  324. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
  325. if (!$queueInfo) {
  326. return false;
  327. }
  328. /** @var UserGroupServices $userGroup */
  329. $userGroup = app()->make(UserGroupServices::class);
  330. if (!$userGroup->getGroup($groupId)) {
  331. return false;
  332. }
  333. //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
  334. $this->dao->setQueueDoing($groupId, $queueInfo['id']);
  335. [$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
  336. if ($uids) {
  337. $chunkUids = array_chunk($uids, 1000, true);
  338. /** @var UserServices $userServices */
  339. $userServices = app()->make(UserServices::class);
  340. foreach ($chunkUids as $v) {
  341. //执行分组
  342. if (!$userServices->setUserGroup($v, $groupId)) {
  343. $this->setQueueFail($queueInfo['id'], $redisKey);
  344. } else {
  345. $this->doSuccessSremRedis($v, $redisKey, $type);
  346. }
  347. }
  348. }
  349. //发完后将队列置为完成
  350. $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
  351. return true;
  352. }
  353. /**
  354. * 批量设置用户标签
  355. * @param $labelId
  356. * @param $type
  357. * @return bool
  358. * @throws \think\db\exception\DataNotFoundException
  359. * @throws \think\db\exception\DbException
  360. * @throws \think\db\exception\ModelNotFoundException
  361. */
  362. public function setUserLabel($labelId, $type, $other = [])
  363. {
  364. if (!$labelId || !$type) return false;
  365. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
  366. if (!$queueInfo) {
  367. return false;
  368. }
  369. /** @var UserLabelServices $userLabelServices */
  370. $userLabelServices = app()->make(UserLabelServices::class);
  371. $count = $userLabelServices->getCount([['id', 'IN', $labelId]]);
  372. if ($count != count($labelId)) {
  373. return false;
  374. }
  375. //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
  376. $this->dao->setQueueDoing($labelId, $queueInfo['id']);
  377. [$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
  378. if ($uids) {
  379. $chunkUids = array_chunk($uids, 1000, true);
  380. /** @var UserLabelRelationServices $services */
  381. $services = app()->make(UserLabelRelationServices::class);
  382. $store_id = (int)$other['store_id'] ?? 0;
  383. foreach ($chunkUids as $v) {
  384. if (!$services->setUserLable($v, $labelId, $store_id ? 1 : 0, $store_id)) {
  385. $this->setQueueFail($queueInfo['id'], $redisKey);
  386. } else {
  387. $this->doSuccessSremRedis($v, $redisKey, $type);
  388. }
  389. }
  390. }
  391. //发完后将队列置为完成
  392. $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
  393. return true;
  394. }
  395. /**
  396. * 商品批量上下架
  397. * @param string $upORdown
  398. * @param $type
  399. * @return bool
  400. * @throws \think\db\exception\DataNotFoundException
  401. * @throws \think\db\exception\DbException
  402. * @throws \think\db\exception\ModelNotFoundException
  403. */
  404. public function setProductShow($upORdown = "up", $type = 1)
  405. {
  406. if (!$type) return false;
  407. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
  408. if (!$queueInfo) {
  409. return false;
  410. }
  411. //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
  412. $this->dao->setQueueDoing($upORdown, $queueInfo['id']);
  413. [$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
  414. if ($pids) {
  415. $chunkPids = array_chunk($pids, 1000, true);
  416. $isShow = 0;
  417. if ($upORdown == 'up') $isShow = 1;
  418. /** @var StoreProductServices $storeproductServices */
  419. $storeproductServices = app()->make(StoreProductServices::class);
  420. /** @var StoreProductRelationServices $storeProductRelationServices */
  421. $storeProductRelationServices = app()->make(StoreProductRelationServices::class);
  422. /** @var StoreCartServices $cartService */
  423. $cartService = app()->make(StoreCartServices::class);
  424. $update = ['is_show' => $isShow];
  425. if ($isShow) {//手动上架 清空定时下架状态
  426. $update['auto_off_time'] = 0;
  427. }
  428. foreach ($chunkPids as $v) {
  429. //商品
  430. $res = $storeproductServices->batchUpdate($v, $update);
  431. //门店商品
  432. $storeproductServices->batchUpdateAppendWhere($v, $update, ['type' => 1], 'pid');
  433. if ($isShow == 0) {
  434. $storeProductRelationServices->setShow($v, (int)$isShow);
  435. //购物车
  436. $cartService->batchUpdate($v, ['status' => 1], 'product_id');
  437. }
  438. //下架检测是否有参与活动商品
  439. try {
  440. $is_activity = $storeproductServices->checkActivity($v);
  441. } catch (\Throwable $e) {
  442. $is_activity = false;
  443. }
  444. if ($isShow == 0 || $is_activity) {
  445. //改变购物车中状态
  446. $storeProductRelationServices->setShow($v, (int)$isShow);
  447. }
  448. if (!$res) {
  449. $this->setQueueFail($queueInfo['id'], $redisKey);
  450. } else {
  451. $this->doSuccessSremRedis($v, $redisKey, $type);
  452. }
  453. }
  454. }
  455. //发完后将队列置为完成
  456. $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
  457. return true;
  458. }
  459. /**
  460. * 批量队列删除商品规格
  461. * @param $type
  462. * @return bool
  463. * @throws \think\db\exception\DataNotFoundException
  464. * @throws \think\db\exception\DbException
  465. * @throws \think\db\exception\ModelNotFoundException
  466. */
  467. public function delProductRule($type)
  468. {
  469. if (!$type) return false;
  470. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
  471. if (!$queueInfo) {
  472. return false;
  473. }
  474. //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
  475. $this->dao->setQueueDoing('', $queueInfo['id']);
  476. [$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
  477. if ($pids) {
  478. $chunkPids = array_chunk($pids, 1000, true);
  479. /** @var StoreProductRuleServices $storeProductRuleservices */
  480. $storeProductRuleservices = app()->make(StoreProductRuleServices::class);
  481. foreach ($chunkPids as $v) {
  482. $res = $storeProductRuleservices->del(implode(',', $v));
  483. if ($res) {
  484. $this->doSuccessSremRedis($v, $redisKey, $queueInfo['type']);
  485. } else {
  486. $this->addQueueFail($queueInfo['id'], $redisKey);
  487. }
  488. }
  489. }
  490. //发完后将队列置为完成
  491. $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
  492. return true;
  493. }
  494. /**
  495. * 批量队列删除订单
  496. * @param $type
  497. * @return bool
  498. * @throws \think\db\exception\DataNotFoundException
  499. * @throws \think\db\exception\DbException
  500. * @throws \think\db\exception\ModelNotFoundException
  501. */
  502. public function delOrder($type)
  503. {
  504. if (!$type) return false;
  505. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
  506. if (!$queueInfo) {
  507. return false;
  508. }
  509. //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
  510. $this->dao->setQueueDoing('', $queueInfo['id']);
  511. [$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
  512. if ($pids) {
  513. $chunkPids = array_chunk($pids, 1000, true);
  514. /** @var StoreOrderServices $storeOrderServices */
  515. $storeOrderServices = app()->make(StoreOrderServices::class);
  516. foreach ($chunkPids as $v) {
  517. $res = $storeOrderServices->batchUpdateOrder($v, ['is_system_del' => 1]);
  518. if ($res) {
  519. $this->doSuccessSremRedis($v, $redisKey, $type);
  520. } else {
  521. $this->setQueueFail($queueInfo['id'], $redisKey);
  522. }
  523. }
  524. }
  525. //发完后将队列置为完成
  526. $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
  527. return true;
  528. }
  529. /**
  530. * 队列批量发货
  531. * @param $oid
  532. * @param array $deliveryData
  533. * @return bool
  534. * @throws \think\db\exception\DataNotFoundException
  535. * @throws \think\db\exception\DbException
  536. * @throws \think\db\exception\ModelNotFoundException
  537. */
  538. public function orderDelivery($oid, array $deliveryData)
  539. {
  540. if (!$oid) return false;
  541. if (!isset($deliveryData['queueType'])) {
  542. return false;
  543. }
  544. /** @var QueueAuxiliaryServices $auxiliaryService */
  545. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  546. //看是否能查到任务数据
  547. $auxiliaryInfo = $auxiliaryService->getOrderCacheOne(['binding_id' => $deliveryData['queueId'], 'relation_id' => $oid, 'type' => $deliveryData['cacheType']]);
  548. if (!$auxiliaryInfo || !$auxiliaryInfo['other']) {
  549. return false;
  550. }
  551. $deliveryInfo = json_decode($auxiliaryInfo['other'], true);
  552. if ($deliveryData['queueType'] == 7) {
  553. if (!$deliveryInfo['delivery_name'] || !$deliveryInfo['delivery_code'] || !$deliveryInfo['delivery_id']) {
  554. return false;
  555. }
  556. $deliveryData['express_record_type'] = 1;
  557. $deliveryData['delivery_name'] = $deliveryInfo['delivery_name'];
  558. $deliveryData['delivery_id'] = $deliveryInfo['delivery_id'];
  559. $deliveryData['delivery_code'] = $deliveryInfo['delivery_code'];
  560. }
  561. try {
  562. /** @var StoreOrderDeliveryServices $storeOrderDelivery */
  563. $storeOrderDelivery = app()->make(StoreOrderDeliveryServices::class);
  564. //发货
  565. $storeOrderDelivery->delivery($oid, $deliveryData);
  566. } catch (\Throwable $e) {
  567. Log::error('队列发货失败发货,order_id:' . $oid . ',原因:' . $e->getMessage());
  568. }
  569. //更改队列子集数据
  570. $this->doSuccessSremRedis(['order_id' => $oid], $deliveryData['queueId'], $deliveryData['queueType'], ['phone_message' => 1, 'status' => 1]);
  571. //队列置为完成
  572. return $this->setQueueSuccess($deliveryData['queueId'], $deliveryData['queueType']);
  573. }
  574. /**
  575. * 添加任务前校验同类型任务状态
  576. * @param $type
  577. * @param array $queueInfo
  578. * @param false $is_again
  579. * @return bool
  580. * @throws \think\db\exception\DataNotFoundException
  581. * @throws \think\db\exception\DbException
  582. * @throws \think\db\exception\ModelNotFoundException
  583. */
  584. public function checkTypeQueue($type, array $queueInfo = [], bool $is_again = false)
  585. {
  586. if (!$type) return false;
  587. if (!$queueInfo) {
  588. $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => [0, 1]]);
  589. }
  590. if (!$queueInfo) {
  591. return false;
  592. }
  593. $num = 0;
  594. if (in_array($type, [7, 8, 9, 10])) {
  595. /** @var QueueAuxiliaryServices $auxiliaryService */
  596. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  597. $num = $auxiliaryService->count(['binding_id' => $queueInfo['id'], 'status' => 0]);
  598. } else {
  599. if ($queueInfo['execute_key']) {
  600. /** @var CacheService $redis */
  601. $redis = app()->make(CacheService::class);
  602. $num = $redis->sCard($queueInfo['execute_key']);
  603. }
  604. }
  605. if ($num) {
  606. if (!$is_again) {
  607. if ($queueInfo['status'] == 0) {
  608. throw new AdminException('上次批量任务尚未执行,请前往任务列表手动执行');
  609. }
  610. if ($queueInfo['status'] == 1) {
  611. throw new AdminException('有正在执行中的任务,请耐心等待,若长时间无反应,前往任务列表修复异常数据,再手动执行');
  612. }
  613. }
  614. } else {
  615. $this->delWrongQueue(0, $type);
  616. return false;
  617. }
  618. return true;
  619. }
  620. /**
  621. * 修复异常任务
  622. * @param $queueInfo
  623. * @return bool|mixed
  624. */
  625. public function repairWrongQueue($queueInfo)
  626. {
  627. if (!$queueInfo) throw new AdminException('任务不存在');
  628. try {
  629. switch ($queueInfo['type']) {
  630. case 1://批量发放优惠券
  631. case 2://批量设置用户分组
  632. case 3://批量设置用户标签
  633. case 4://批量上下架商品
  634. case 5://批量删除商品规格
  635. case 6://批量删除用户已删除订单
  636. if (!$queueInfo['execute_key']) {
  637. throw new AdminException('缓存key缺失,请清除数据');
  638. }
  639. /** @var CacheService $redis */
  640. $redis = app()->make(CacheService::class);
  641. $cacheNum = $redis->sCard($queueInfo['execute_key']);
  642. if ($cacheNum != $queueInfo['surplus_num']) {
  643. return $this->dao->update(['id' => $queueInfo['id']], ['surplus_num' => $cacheNum]);
  644. }
  645. break;
  646. case 7://手动发货
  647. case 8://电子面单发货
  648. case 9://批量配送
  649. case 10://批量虚拟发货
  650. /** @var QueueAuxiliaryServices $auxiliaryService */
  651. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  652. $cacheType = $this->queue_redis_key[$queueInfo['type']] ?? '';
  653. $cacheFailAndNoNum = $auxiliaryService->getCountOrder(['binding_id' => $queueInfo['id'], 'type' => $cacheType, 'status' => [0, 2]]);
  654. $cacheTotalNum = $auxiliaryService->getCountOrder(['binding_id' => $queueInfo['id'], 'type' => $cacheType, 'status' => [0, 1, 2]]);
  655. //如果任务已经执行完毕,但是记录却存在未执行数据,要进行修复,让其重新执行
  656. if ($cacheFailAndNoNum && $queueInfo['status'] == 2) return $this->dao->update(['id' => $queueInfo['id']], ['status' => 3, 'surplus_num' => $cacheFailAndNoNum, 'total_num' => $cacheTotalNum]);
  657. //如果执行失败,记录全部执行成功,那么进行修复
  658. if (!$cacheFailAndNoNum && $queueInfo['status'] != 2) return $this->dao->update(['id' => $queueInfo['id']], ['status' => 2, 'surplus_num' => 0, 'total_num' => $cacheTotalNum]);
  659. }
  660. return true;
  661. } catch (\Exception $e) {
  662. throw new AdminException($e->getMessage());
  663. }
  664. }
  665. /**
  666. * 队列再次执行
  667. * @param $queueId
  668. * @param $type
  669. * @return bool
  670. * @throws \think\db\exception\DataNotFoundException
  671. * @throws \think\db\exception\DbException
  672. * @throws \think\db\exception\ModelNotFoundException
  673. */
  674. public function againDoQueue($queueId, $type)
  675. {
  676. $queueInfo = $this->getQueueOne(['id' => $queueId, 'type' => $type]);
  677. if (!$queueInfo) {
  678. throw new AdminException('队列任务不存在');
  679. }
  680. if (!$queueInfo['queue_in_value']) {
  681. throw new AdminException('队列关键数据缺失,请清除此任务及异常数据');
  682. }
  683. if ($queueInfo['status'] == 2) {
  684. throw new AdminException('队列已完成');
  685. }
  686. if ($queueInfo['status'] == 3) {
  687. throw new AdminException('队列异常,请清除队列重新加入');
  688. }
  689. if ($queueInfo['status'] == 4) {
  690. throw new AdminException('队列已删除');
  691. }
  692. //检测当前队列
  693. if (!$this->checkTypeQueue($type, $queueInfo, true)) {
  694. throw new AdminException('任务已清除,无需再次执行');
  695. }
  696. //先进行数据修复
  697. $this->repairWrongQueue($queueInfo);
  698. if (in_array($type, [7, 8, 9, 10])) {
  699. $queueInValue = json_decode($queueInfo['queue_in_value'], true);
  700. /** @var StoreOrderServices $storeOrderService */
  701. $storeOrderService = app()->make(StoreOrderServices::class);
  702. $storeOrderService->adminQueueOrderDo($queueInValue, true);
  703. } else {
  704. $queueInValue = $queueInfo['queue_in_value'];
  705. if ($type == 1) {
  706. $queueInValue = json_decode($queueInfo['queue_in_value'], true);
  707. }
  708. //加入队列
  709. BatchHandleJob::dispatch([$queueInValue, $type]);
  710. }
  711. return true;
  712. }
  713. /**
  714. * 任务执行失败,修改队列状态
  715. * @param $queueId
  716. * @param string $redisKey
  717. * @return mixed
  718. */
  719. public function setQueueFail($queueId, $redisKey = '')
  720. {
  721. if ($redisKey) {
  722. /** @var CacheService $cacheService */
  723. $cacheService = app()->make(CacheService::class);
  724. $surplusNum = $cacheService->sCard($redisKey);
  725. } else {
  726. /** @var QueueAuxiliaryServices $auxiliaryService */
  727. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  728. $surplusNum = $auxiliaryService->count(['binding_id' => $queueId, 'status' => 0]);
  729. }
  730. return $this->dao->update(['id' => $queueId], ['status' => 3, 'surplus_num' => $surplusNum]);
  731. }
  732. /**
  733. * 将执行成功数据移除redis集合
  734. * @param array $data
  735. * @param $redisKey
  736. * @return bool
  737. */
  738. public function doSuccessSremRedis(array $data, $redisKey, $type, array $otherData = [])
  739. {
  740. if (!$data || !$redisKey || !$type) return true;
  741. if (in_array($type, [7, 8, 9, 10])) {
  742. $where['relation_id'] = $data['order_id'];
  743. $where['binding_id'] = $redisKey;
  744. /** @var QueueAuxiliaryServices $auxiliaryService */
  745. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  746. $getOne = $auxiliaryService->getOrderCacheOne($where);
  747. if (!$getOne) return false;
  748. $other = json_decode($getOne['other'], true);
  749. if (isset($otherData['delivery_status'])) $other['delivery_status'] = $otherData['delivery_status'];
  750. if (isset($otherData['wx_message'])) $other['wx_message'] = $otherData['wx_message'];
  751. if (isset($otherData['phone_message'])) $other['phone_message'] = $otherData['phone_message'];
  752. if (isset($otherData['error_info'])) $other['error_info'] = $otherData['error_info'];
  753. $updateData['status'] = isset($otherData['status']) ? $otherData['status'] : 0;
  754. $updateData['other'] = json_encode($other);
  755. $updateData['update_time'] = time();
  756. $auxiliaryService->updateOrderStatus($where, $updateData);
  757. } else {//在redis缓存集合的从集合删除
  758. /** @var CacheService $redis */
  759. $redis = app()->make(CacheService::class);
  760. foreach ($data as $k => $v) {
  761. $redis->sRem($redisKey, $v);
  762. }
  763. }
  764. return true;
  765. }
  766. /**
  767. * 任务执行成功
  768. * @param $queueId
  769. * @param $type
  770. * @return bool|mixed
  771. * @throws \think\db\exception\DataNotFoundException
  772. * @throws \think\db\exception\DbException
  773. * @throws \think\db\exception\ModelNotFoundException
  774. */
  775. public function setQueueSuccess($queueId, $type)
  776. {
  777. if (!$queueId || !$type) return false;
  778. $queueInfo = $this->dao->get($queueId);
  779. if (!$queueInfo) return false;
  780. $res = true;
  781. if (in_array($type, [7, 8, 9, 10])) {
  782. $res = false;
  783. if ($queueInfo['surplus_num'] > 0) {
  784. $this->dao->bcDec($queueId, 'surplus_num', 1);
  785. }
  786. //看是否全部执行成功
  787. $queueInfo = $this->dao->get($queueId);
  788. if ($queueInfo['surplus_num'] == 0) {
  789. $res = true;
  790. }
  791. }
  792. if ($res) {
  793. $update = [
  794. 'status' => 2,
  795. 'finish_time' => time(),
  796. 'surplus_num' => 0
  797. ];
  798. return $this->dao->update(['id' => $queueId], $update);
  799. }
  800. return true;
  801. }
  802. /**
  803. * 清除异常队列
  804. * @param $queueId
  805. * @param $type
  806. * @return bool
  807. * @throws \think\db\exception\DataNotFoundException
  808. * @throws \think\db\exception\DbException
  809. * @throws \think\db\exception\ModelNotFoundException
  810. */
  811. public function delWrongQueue($queueId, $type, $is_del = true)
  812. {
  813. if (!$type) return false;
  814. if ($queueId) {
  815. $queueInfo = $this->dao->getQueueOne(['id' => $queueId, 'type' => $type]);
  816. } else {
  817. $queueInfo = $this->dao->getQueueOne(['type' => $type]);
  818. }
  819. if (!$queueInfo) {
  820. return true;
  821. }
  822. try {
  823. $data = ['status' => 3];
  824. if ($is_del) {
  825. if (in_array($type, [7, 8, 9, 10])) {
  826. /** @var QueueAuxiliaryServices $auxiliaryService */
  827. $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
  828. $auxiliaryService->batchUpdate(['binding_id' => $queueInfo['id']], ['status' => 3]);
  829. } else {
  830. if ($queueInfo['execute_key']) {
  831. /** @var CacheService $redis */
  832. $redis = app()->make(CacheService::class);
  833. $redis->del($queueInfo['execute_key']);
  834. }
  835. }
  836. $data = ['is_del' => 1, 'status' => 4];
  837. }
  838. $this->dao->update(['id' => $queueInfo['id']], $data);
  839. } catch (\Throwable $e) {
  840. Log::error('清除异常队列失败,原因' . $e->getMessage());
  841. }
  842. return true;
  843. }
  844. }