123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888 |
- <?php
- // +----------------------------------------------------------------------
- // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
- // +----------------------------------------------------------------------
- // | Copyright (c) 2016~2020 https://www.crmeb.com All rights reserved.
- // +----------------------------------------------------------------------
- // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
- // +----------------------------------------------------------------------
- // | Author: CRMEB Team <admin@crmeb.com>
- // +----------------------------------------------------------------------
- namespace app\services\other\queue;
- use app\dao\other\queue\QueueDao;
- use app\jobs\BatchHandleJob;
- use app\services\BaseServices;
- use app\services\activity\coupon\StoreCouponIssueServices;
- use app\services\order\StoreCartServices;
- use app\services\order\StoreOrderDeliveryServices;
- use app\services\order\StoreOrderServices;
- use app\services\product\category\StoreProductCategoryServices;
- use app\services\product\product\StoreProductRelationServices;
- use app\services\product\product\StoreProductServices;
- use app\services\product\sku\StoreProductRuleServices;
- use app\services\user\group\UserGroupServices;
- use app\services\user\label\UserLabelRelationServices;
- use app\services\user\label\UserLabelServices;
- use app\services\user\UserServices;
- use crmeb\exceptions\AdminException;
- use crmeb\services\CacheService;
- use think\exception\ValidateException;
- use think\facade\Log;
- /**
- * 队列
- * Class QueueServices
- * @package app\services\other\queue
- * @mixin QueueDao
- */
- class QueueServices extends BaseServices
- {
- /**
- * 任务类型名称
- * @var string[]
- */
- public $queue_type_name = [
- 1 => "批量发放用户优惠券",
- 2 => "批量设置用户分组",
- 3 => "批量设置用户标签",
- 4 => "批量下架商品",
- 5 => "批量删除商品规格",
- 6 => "批量删除订单",
- 7 => "批量手动发货",
- 8 => "批量打印电子面单",
- 9 => "批量配送",
- 10 => "批量虚拟发货",
- ];
- /**
- * 任务redis缓存key
- * @var string[]
- */
- public $queue_redis_key = [
- 1 => "DrivingSendCoupon-ADMIN",
- 2 => "DrivingUserGroup-ADMIN",
- 3 => "DrivingUserLabel-ADMIN",
- 4 => "DrivingProductUnshow-ADMIN",
- 5 => "DrivingProductRule-ADMIN",
- 6 => "DrivingOrderDel-ADMIN",
- 7 => 3,
- 8 => 4,
- 9 => 5,
- 10 => 6,
- ];
- /**
- * 状态
- * @var string[]
- */
- protected $status_name = [
- 0 => '未处理',
- 1 => '正在处理',
- 2 => '完成',
- 3 => '失败'
- ];
- public function __construct(QueueDao $dao)
- {
- $this->dao = $dao;
- }
- /**
- * 获取任务列表
- * @param array $where
- */
- public function getList(array $where = [])
- {
- [$page, $limit] = $this->getPageValue();
- $list = $this->dao->getList($where, $page, $limit);
- if ($list) {
- foreach ($list as &$v) {
- $v['finish_time'] = $v['finish_time'] ? date('Y-m-d H:i:s', $v['finish_time']) : "";
- $v['first_time'] = $v['first_time'] ? date('Y-m-d H:i:s', $v['first_time']) : "";
- $v['again_time'] = $v['again_time'] ? date('Y-m-d H:i:s', $v['again_time']) : "";
- $v['status_cn'] = $this->status_name[$v['status']] ?? '';
- $v['is_show_log'] = false;
- if (in_array($v['type'], [7, 8, 9, 10])) {
- $v['is_show_log'] = true;
- $v['is_error_button'] = $v['status'] == 2;
- }
- $v['type_cn'] = $this->queue_type_name[$v['type']] ?? '';
- $v['cache_type'] = $this->queue_redis_key[$v['type']] ?? 0;
- $v['success_num'] = bcsub($v['total_num'], $v['surplus_num'], 0);
- //是否显示停止按钮
- $v['is_stop_button'] = $v['status'] == 1;
- $v['add_time'] = date('Y-m-d H:i:s', $v['add_time']);
- }
- }
- $count = $this->dao->count($where);
- return compact('list', 'count');
- }
- /**
- * 将要执行的任务数据存入表中
- * @param array $where
- * @param string $field
- * @param array $data
- * @param int $type
- * @return mixed
- */
- public function setQueueData(array $where = [], $field = "*", array $data = [], int $type = 1, $other = false)
- {
- if (!$type) throw new ValidateException('缺少执行任务类型');
- $queue_redis_keys = $this->queue_redis_key;
- $redisKey = $queue_redis_keys[$type] ?? '';
- $queue_type_name = $this->queue_type_name;
- $queueName = $queue_type_name[$type] ?? '';
- if (!$redisKey || !$queueName) {
- throw new ValidateException('缺少队列缓存KEY,或者不存在此类型队列');
- }
- //检查同类型其他任务
- $this->checkTypeQueue($redisKey);
- $source = 'admin';
- if (in_array($type, [1, 2, 3, 4, 5, 6])) {
- $queueDataNum = $this->setRedisData($redisKey, $type, $data, $where, $field);
- if (!$queueDataNum) {
- throw new ValidateException('需要执行的批量数据为空');
- }
- if (!$id = $this->dao->addQueueList($queueName, $queueDataNum, $type, $redisKey, $source)) {
- throw new ValidateException('添加队列失败');
- }
- } else {
- if ($type == 7) {
- $ids = array_column($data, 0);
- } else {
- $ids = $data;
- }
- /** @var StoreOrderServices $orderService */
- $orderService = app()->make(StoreOrderServices::class);
- $oids = $orderService->getOrderDumpData($where, $ids, $field);
- $order_ids = [];
- if ($oids) {
- //过滤拼团未完成订单
- foreach ($oids as $order) {
- if (isset($order['pinkStatus']) && $order['pinkStatus'] != 2) {
- continue;
- }
- $order_ids[] = $order['id'];
- }
- }
- if (!$order_ids) {
- throw new ValidateException('暂无需要发货订单');
- }
- if (!$id = $this->dao->addQueueList($queueName, count($ids), $type, $redisKey, $source)) {
- throw new ValidateException('添加队列失败');
- }
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- $auxiliaryService->saveQueueOrderData($id, $order_ids, $data, $type, $redisKey);
- }
- return $id;
- }
- /**
- * 队列数据放入redis集合
- * @param $redisKey
- * @param $type
- * @param array $dataIds
- * @param array $where
- * @param string $filed
- * @return int
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function setRedisData($redisKey, $type, array $dataIds, array $where, $filed = "*")
- {
- if (!$redisKey || !$type) return 0;
- /** @var CacheService $redis */
- $redis = app()->make(CacheService::class);
- if ($dataIds) {
- foreach ($dataIds as $v) {
- $redis->sAdd($redisKey, $v);
- }
- } else {
- if ($where) {
- foreach ($where as $k => $v) {
- if (!$v) unset($where[$k]);
- }
- }
- switch ($type) {
- case 1://批量发放优惠券
- case 2://批量设置用户分组
- case 3://批量设置用户标签
- /** @var UserServices $userService */
- $userService = app()->make(UserServices::class);
- $dataInfo = $userService->getUserInfoList($where, $filed);
- if ($dataInfo) {
- foreach ($dataInfo as $k => $v) {
- $redis->sAdd($redisKey, $v['uid']);
- }
- }
- break;
- case 4://批量上下架商品
- $cateIds = [];
- if (isset($where['cate_id']) && $where['cate_id']) {
- /** @var StoreProductCategoryServices $storeCategory */
- $storeCategory = app()->make(StoreProductCategoryServices::class);
- $cateIds = $storeCategory->getColumn(['pid' => $where['cate_id']], 'id');
- }
- if ($cateIds) {
- $cateIds[] = $where['cate_id'];
- $where['cate_id'] = $cateIds;
- }
- /** @var StoreProductServices $productService */
- $productService = app()->make(StoreProductServices::class);
- $dataInfo = $productService->getProductListByWhere($where, $filed);
- if ($dataInfo) {
- foreach ($dataInfo as $k => $v) {
- $redis->sAdd($redisKey, $v['id']);
- }
- }
- break;
- case 5://批量删除商品规格
- /** @var StoreProductRuleServices $productRuleService */
- $productRuleService = app()->make(StoreProductRuleServices::class);
- $dataInfo = $productRuleService->getProductRuleList($where, $filed);
- if ($dataInfo) {
- foreach ($dataInfo as $k => $v) {
- $redis->sAdd($redisKey, $v['id']);
- }
- }
- break;
- case 6://批量删除用户已删除订单
- /** @var StoreOrderServices $orderService */
- $orderService = app()->make(StoreOrderServices::class);
- $dataInfo = $orderService->getOrderListByWhere($where, $filed);
- if ($dataInfo) {
- foreach ($dataInfo as $k => $v) {
- $redis->sAdd($redisKey, $v['id']);
- }
- }
- break;
- default:
- return 0;
- break;
- }
- }
- return $redis->sCard($redisKey);
- }
- /**
- * 获取队列redis中存的数据集合
- * @param string $redisKey
- * @param array $queueInfo
- * @return array
- */
- public function getQueueRedisdata($queueInfo, string $redisKey = '')
- {
- if (!$queueInfo) return [$redisKey, []];
- if (!$redisKey) {
- $redisKey = $queueInfo['execute_key'] ?? '';
- }
- if (!$redisKey) {
- return [$redisKey, []];
- }
- /** @var CacheService $redis */
- $redis = app()->make(CacheService::class);
- return [$redisKey, $redis->sMembers($redisKey)];
- }
- /**
- * 批量发送优惠券
- * @param $coupon
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function sendCoupon($coupon, $type)
- {
- if (!$type || !$coupon) return false;
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
- if (!$queueInfo) {
- return false;
- }
- //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
- $this->dao->setQueueDoing($coupon, $queueInfo['id']);
- [$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
- if ($uids) {
- $chunkUids = array_chunk($uids, 100, true);
- /** @var StoreCouponIssueServices $issueService */
- $issueService = app()->make(StoreCouponIssueServices::class);
- foreach ($chunkUids as $v) {
- $issueService->setCoupon($coupon, $v, $redisKey, $queueInfo);
- }
- }
- //发完后将队列置为完成
- $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
- return true;
- }
- /**
- * 批量设置用户分组
- * @param $groupId
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function setUserGroup($groupId, $type)
- {
- if (!$groupId || !$type) return false;
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
- if (!$queueInfo) {
- return false;
- }
- /** @var UserGroupServices $userGroup */
- $userGroup = app()->make(UserGroupServices::class);
- if (!$userGroup->getGroup($groupId)) {
- return false;
- }
- //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
- $this->dao->setQueueDoing($groupId, $queueInfo['id']);
- [$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
- if ($uids) {
- $chunkUids = array_chunk($uids, 1000, true);
- /** @var UserServices $userServices */
- $userServices = app()->make(UserServices::class);
- foreach ($chunkUids as $v) {
- //执行分组
- if (!$userServices->setUserGroup($v, $groupId)) {
- $this->setQueueFail($queueInfo['id'], $redisKey);
- } else {
- $this->doSuccessSremRedis($v, $redisKey, $type);
- }
- }
- }
- //发完后将队列置为完成
- $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
- return true;
- }
- /**
- * 批量设置用户标签
- * @param $labelId
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function setUserLabel($labelId, $type, $other = [])
- {
- if (!$labelId || !$type) return false;
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
- if (!$queueInfo) {
- return false;
- }
- /** @var UserLabelServices $userLabelServices */
- $userLabelServices = app()->make(UserLabelServices::class);
- $count = $userLabelServices->getCount([['id', 'IN', $labelId]]);
- if ($count != count($labelId)) {
- return false;
- }
- //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
- $this->dao->setQueueDoing($labelId, $queueInfo['id']);
- [$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
- if ($uids) {
- $chunkUids = array_chunk($uids, 1000, true);
- /** @var UserLabelRelationServices $services */
- $services = app()->make(UserLabelRelationServices::class);
- $store_id = (int)$other['store_id'] ?? 0;
- foreach ($chunkUids as $v) {
- if (!$services->setUserLable($v, $labelId, $store_id ? 1 : 0, $store_id)) {
- $this->setQueueFail($queueInfo['id'], $redisKey);
- } else {
- $this->doSuccessSremRedis($v, $redisKey, $type);
- }
- }
- }
- //发完后将队列置为完成
- $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
- return true;
- }
- /**
- * 商品批量上下架
- * @param string $upORdown
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function setProductShow($upORdown = "up", $type = 1)
- {
- if (!$type) return false;
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
- if (!$queueInfo) {
- return false;
- }
- //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
- $this->dao->setQueueDoing($upORdown, $queueInfo['id']);
- [$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
- if ($pids) {
- $chunkPids = array_chunk($pids, 1000, true);
- $isShow = 0;
- if ($upORdown == 'up') $isShow = 1;
- /** @var StoreProductServices $storeproductServices */
- $storeproductServices = app()->make(StoreProductServices::class);
- /** @var StoreProductRelationServices $storeProductRelationServices */
- $storeProductRelationServices = app()->make(StoreProductRelationServices::class);
- /** @var StoreCartServices $cartService */
- $cartService = app()->make(StoreCartServices::class);
- $update = ['is_show' => $isShow];
- if ($isShow) {//手动上架 清空定时下架状态
- $update['auto_off_time'] = 0;
- }
- foreach ($chunkPids as $v) {
- //商品
- $res = $storeproductServices->batchUpdate($v, $update);
- //门店商品
- $storeproductServices->batchUpdateAppendWhere($v, $update, ['type' => 1], 'pid');
- if ($isShow == 0) {
- $storeProductRelationServices->setShow($v, (int)$isShow);
- //购物车
- $cartService->batchUpdate($v, ['status' => 1], 'product_id');
- }
- //下架检测是否有参与活动商品
- try {
- $is_activity = $storeproductServices->checkActivity($v);
- } catch (\Throwable $e) {
- $is_activity = false;
- }
- if ($isShow == 0 || $is_activity) {
- //改变购物车中状态
- $storeProductRelationServices->setShow($v, (int)$isShow);
- }
- if (!$res) {
- $this->setQueueFail($queueInfo['id'], $redisKey);
- } else {
- $this->doSuccessSremRedis($v, $redisKey, $type);
- }
- }
- }
- //发完后将队列置为完成
- $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
- return true;
- }
- /**
- * 批量队列删除商品规格
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function delProductRule($type)
- {
- if (!$type) return false;
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
- if (!$queueInfo) {
- return false;
- }
- //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
- $this->dao->setQueueDoing('', $queueInfo['id']);
- [$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
- if ($pids) {
- $chunkPids = array_chunk($pids, 1000, true);
- /** @var StoreProductRuleServices $storeProductRuleservices */
- $storeProductRuleservices = app()->make(StoreProductRuleServices::class);
- foreach ($chunkPids as $v) {
- $res = $storeProductRuleservices->del(implode(',', $v));
- if ($res) {
- $this->doSuccessSremRedis($v, $redisKey, $queueInfo['type']);
- } else {
- $this->addQueueFail($queueInfo['id'], $redisKey);
- }
- }
- }
- //发完后将队列置为完成
- $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
- return true;
- }
- /**
- * 批量队列删除订单
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function delOrder($type)
- {
- if (!$type) return false;
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
- if (!$queueInfo) {
- return false;
- }
- //把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
- $this->dao->setQueueDoing('', $queueInfo['id']);
- [$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
- if ($pids) {
- $chunkPids = array_chunk($pids, 1000, true);
- /** @var StoreOrderServices $storeOrderServices */
- $storeOrderServices = app()->make(StoreOrderServices::class);
- foreach ($chunkPids as $v) {
- $res = $storeOrderServices->batchUpdateOrder($v, ['is_system_del' => 1]);
- if ($res) {
- $this->doSuccessSremRedis($v, $redisKey, $type);
- } else {
- $this->setQueueFail($queueInfo['id'], $redisKey);
- }
- }
- }
- //发完后将队列置为完成
- $this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
- return true;
- }
- /**
- * 队列批量发货
- * @param $oid
- * @param array $deliveryData
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function orderDelivery($oid, array $deliveryData)
- {
- if (!$oid) return false;
- if (!isset($deliveryData['queueType'])) {
- return false;
- }
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- //看是否能查到任务数据
- $auxiliaryInfo = $auxiliaryService->getOrderCacheOne(['binding_id' => $deliveryData['queueId'], 'relation_id' => $oid, 'type' => $deliveryData['cacheType']]);
- if (!$auxiliaryInfo || !$auxiliaryInfo['other']) {
- return false;
- }
- $deliveryInfo = json_decode($auxiliaryInfo['other'], true);
- if ($deliveryData['queueType'] == 7) {
- if (!$deliveryInfo['delivery_name'] || !$deliveryInfo['delivery_code'] || !$deliveryInfo['delivery_id']) {
- return false;
- }
- $deliveryData['express_record_type'] = 1;
- $deliveryData['delivery_name'] = $deliveryInfo['delivery_name'];
- $deliveryData['delivery_id'] = $deliveryInfo['delivery_id'];
- $deliveryData['delivery_code'] = $deliveryInfo['delivery_code'];
- }
- try {
- /** @var StoreOrderDeliveryServices $storeOrderDelivery */
- $storeOrderDelivery = app()->make(StoreOrderDeliveryServices::class);
- //发货
- $storeOrderDelivery->delivery($oid, $deliveryData);
- } catch (\Throwable $e) {
- Log::error('队列发货失败发货,order_id:' . $oid . ',原因:' . $e->getMessage());
- }
- //更改队列子集数据
- $this->doSuccessSremRedis(['order_id' => $oid], $deliveryData['queueId'], $deliveryData['queueType'], ['phone_message' => 1, 'status' => 1]);
- //队列置为完成
- return $this->setQueueSuccess($deliveryData['queueId'], $deliveryData['queueType']);
- }
- /**
- * 添加任务前校验同类型任务状态
- * @param $type
- * @param array $queueInfo
- * @param false $is_again
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function checkTypeQueue($type, array $queueInfo = [], bool $is_again = false)
- {
- if (!$type) return false;
- if (!$queueInfo) {
- $queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => [0, 1]]);
- }
- if (!$queueInfo) {
- return false;
- }
- $num = 0;
- if (in_array($type, [7, 8, 9, 10])) {
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- $num = $auxiliaryService->count(['binding_id' => $queueInfo['id'], 'status' => 0]);
- } else {
- if ($queueInfo['execute_key']) {
- /** @var CacheService $redis */
- $redis = app()->make(CacheService::class);
- $num = $redis->sCard($queueInfo['execute_key']);
- }
- }
- if ($num) {
- if (!$is_again) {
- if ($queueInfo['status'] == 0) {
- throw new AdminException('上次批量任务尚未执行,请前往任务列表手动执行');
- }
- if ($queueInfo['status'] == 1) {
- throw new AdminException('有正在执行中的任务,请耐心等待,若长时间无反应,前往任务列表修复异常数据,再手动执行');
- }
- }
- } else {
- $this->delWrongQueue(0, $type);
- return false;
- }
- return true;
- }
- /**
- * 修复异常任务
- * @param $queueInfo
- * @return bool|mixed
- */
- public function repairWrongQueue($queueInfo)
- {
- if (!$queueInfo) throw new AdminException('任务不存在');
- try {
- switch ($queueInfo['type']) {
- case 1://批量发放优惠券
- case 2://批量设置用户分组
- case 3://批量设置用户标签
- case 4://批量上下架商品
- case 5://批量删除商品规格
- case 6://批量删除用户已删除订单
- if (!$queueInfo['execute_key']) {
- throw new AdminException('缓存key缺失,请清除数据');
- }
- /** @var CacheService $redis */
- $redis = app()->make(CacheService::class);
- $cacheNum = $redis->sCard($queueInfo['execute_key']);
- if ($cacheNum != $queueInfo['surplus_num']) {
- return $this->dao->update(['id' => $queueInfo['id']], ['surplus_num' => $cacheNum]);
- }
- break;
- case 7://手动发货
- case 8://电子面单发货
- case 9://批量配送
- case 10://批量虚拟发货
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- $cacheType = $this->queue_redis_key[$queueInfo['type']] ?? '';
- $cacheFailAndNoNum = $auxiliaryService->getCountOrder(['binding_id' => $queueInfo['id'], 'type' => $cacheType, 'status' => [0, 2]]);
- $cacheTotalNum = $auxiliaryService->getCountOrder(['binding_id' => $queueInfo['id'], 'type' => $cacheType, 'status' => [0, 1, 2]]);
- //如果任务已经执行完毕,但是记录却存在未执行数据,要进行修复,让其重新执行
- if ($cacheFailAndNoNum && $queueInfo['status'] == 2) return $this->dao->update(['id' => $queueInfo['id']], ['status' => 3, 'surplus_num' => $cacheFailAndNoNum, 'total_num' => $cacheTotalNum]);
- //如果执行失败,记录全部执行成功,那么进行修复
- if (!$cacheFailAndNoNum && $queueInfo['status'] != 2) return $this->dao->update(['id' => $queueInfo['id']], ['status' => 2, 'surplus_num' => 0, 'total_num' => $cacheTotalNum]);
- }
- return true;
- } catch (\Exception $e) {
- throw new AdminException($e->getMessage());
- }
- }
- /**
- * 队列再次执行
- * @param $queueId
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function againDoQueue($queueId, $type)
- {
- $queueInfo = $this->getQueueOne(['id' => $queueId, 'type' => $type]);
- if (!$queueInfo) {
- throw new AdminException('队列任务不存在');
- }
- if (!$queueInfo['queue_in_value']) {
- throw new AdminException('队列关键数据缺失,请清除此任务及异常数据');
- }
- if ($queueInfo['status'] == 2) {
- throw new AdminException('队列已完成');
- }
- if ($queueInfo['status'] == 3) {
- throw new AdminException('队列异常,请清除队列重新加入');
- }
- if ($queueInfo['status'] == 4) {
- throw new AdminException('队列已删除');
- }
- //检测当前队列
- if (!$this->checkTypeQueue($type, $queueInfo, true)) {
- throw new AdminException('任务已清除,无需再次执行');
- }
- //先进行数据修复
- $this->repairWrongQueue($queueInfo);
- if (in_array($type, [7, 8, 9, 10])) {
- $queueInValue = json_decode($queueInfo['queue_in_value'], true);
- /** @var StoreOrderServices $storeOrderService */
- $storeOrderService = app()->make(StoreOrderServices::class);
- $storeOrderService->adminQueueOrderDo($queueInValue, true);
- } else {
- $queueInValue = $queueInfo['queue_in_value'];
- if ($type == 1) {
- $queueInValue = json_decode($queueInfo['queue_in_value'], true);
- }
- //加入队列
- BatchHandleJob::dispatch([$queueInValue, $type]);
- }
- return true;
- }
- /**
- * 任务执行失败,修改队列状态
- * @param $queueId
- * @param string $redisKey
- * @return mixed
- */
- public function setQueueFail($queueId, $redisKey = '')
- {
- if ($redisKey) {
- /** @var CacheService $cacheService */
- $cacheService = app()->make(CacheService::class);
- $surplusNum = $cacheService->sCard($redisKey);
- } else {
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- $surplusNum = $auxiliaryService->count(['binding_id' => $queueId, 'status' => 0]);
- }
- return $this->dao->update(['id' => $queueId], ['status' => 3, 'surplus_num' => $surplusNum]);
- }
- /**
- * 将执行成功数据移除redis集合
- * @param array $data
- * @param $redisKey
- * @return bool
- */
- public function doSuccessSremRedis(array $data, $redisKey, $type, array $otherData = [])
- {
- if (!$data || !$redisKey || !$type) return true;
- if (in_array($type, [7, 8, 9, 10])) {
- $where['relation_id'] = $data['order_id'];
- $where['binding_id'] = $redisKey;
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- $getOne = $auxiliaryService->getOrderCacheOne($where);
- if (!$getOne) return false;
- $other = json_decode($getOne['other'], true);
- if (isset($otherData['delivery_status'])) $other['delivery_status'] = $otherData['delivery_status'];
- if (isset($otherData['wx_message'])) $other['wx_message'] = $otherData['wx_message'];
- if (isset($otherData['phone_message'])) $other['phone_message'] = $otherData['phone_message'];
- if (isset($otherData['error_info'])) $other['error_info'] = $otherData['error_info'];
- $updateData['status'] = isset($otherData['status']) ? $otherData['status'] : 0;
- $updateData['other'] = json_encode($other);
- $updateData['update_time'] = time();
- $auxiliaryService->updateOrderStatus($where, $updateData);
- } else {//在redis缓存集合的从集合删除
- /** @var CacheService $redis */
- $redis = app()->make(CacheService::class);
- foreach ($data as $k => $v) {
- $redis->sRem($redisKey, $v);
- }
- }
- return true;
- }
- /**
- * 任务执行成功
- * @param $queueId
- * @param $type
- * @return bool|mixed
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function setQueueSuccess($queueId, $type)
- {
- if (!$queueId || !$type) return false;
- $queueInfo = $this->dao->get($queueId);
- if (!$queueInfo) return false;
- $res = true;
- if (in_array($type, [7, 8, 9, 10])) {
- $res = false;
- if ($queueInfo['surplus_num'] > 0) {
- $this->dao->bcDec($queueId, 'surplus_num', 1);
- }
- //看是否全部执行成功
- $queueInfo = $this->dao->get($queueId);
- if ($queueInfo['surplus_num'] == 0) {
- $res = true;
- }
- }
- if ($res) {
- $update = [
- 'status' => 2,
- 'finish_time' => time(),
- 'surplus_num' => 0
- ];
- return $this->dao->update(['id' => $queueId], $update);
- }
- return true;
- }
- /**
- * 清除异常队列
- * @param $queueId
- * @param $type
- * @return bool
- * @throws \think\db\exception\DataNotFoundException
- * @throws \think\db\exception\DbException
- * @throws \think\db\exception\ModelNotFoundException
- */
- public function delWrongQueue($queueId, $type, $is_del = true)
- {
- if (!$type) return false;
- if ($queueId) {
- $queueInfo = $this->dao->getQueueOne(['id' => $queueId, 'type' => $type]);
- } else {
- $queueInfo = $this->dao->getQueueOne(['type' => $type]);
- }
- if (!$queueInfo) {
- return true;
- }
- try {
- $data = ['status' => 3];
- if ($is_del) {
- if (in_array($type, [7, 8, 9, 10])) {
- /** @var QueueAuxiliaryServices $auxiliaryService */
- $auxiliaryService = app()->make(QueueAuxiliaryServices::class);
- $auxiliaryService->batchUpdate(['binding_id' => $queueInfo['id']], ['status' => 3]);
- } else {
- if ($queueInfo['execute_key']) {
- /** @var CacheService $redis */
- $redis = app()->make(CacheService::class);
- $redis->del($queueInfo['execute_key']);
- }
- }
- $data = ['is_del' => 1, 'status' => 4];
- }
- $this->dao->update(['id' => $queueInfo['id']], $data);
- } catch (\Throwable $e) {
- Log::error('清除异常队列失败,原因' . $e->getMessage());
- }
- return true;
- }
- }
|