ResumeUploader.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. <?php
  2. namespace Qiniu\Storage;
  3. use Qiniu\Config;
  4. use Qiniu\Http\Client;
  5. use Qiniu\Http\Error;
  6. use Qiniu\Enum\SplitUploadVersion;
  7. use Qiniu\Http\RequestOptions;
  /**
   * Resumable (chunked) uploader.
   *
   * Splits an input stream into chunks, uploads them one by one and finally asks
   * the server to assemble the object. Two protocol versions are supported:
   *  - v1: "mkblk" for every chunk, then "mkfile" with the collected contexts;
   *  - v2: multipart upload (init task, upload numbered parts, complete task).
   * Progress can optionally be persisted to a JSON record file so that a later
   * call can continue where a previous one stopped.
   *
   * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkblk.html
   * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkfile.html
   */
  final class ResumeUploader
  {
      private $upToken;
      private $key;
      private $inputStream;
      private $size;
      private $params;
      private $mime;
      /** v1 only: the 'ctx' value returned for every uploaded block, in upload order. */
      private $contexts;
      /** v2 only: upload task state (etags, uploadId, expiredAt, uploaded). */
      private $finishedEtags;
      private $host;
      private $bucket;
      /** URL of the most recent request; used when building Error objects. */
      private $currentUrl;
      private $config;
      private $resumeRecordFile;
      private $version;
      private $partSize;

      /**
       * @var RequestOptions
       */
      private $reqOpt;

      /**
       * Error produced while parsing the upload token in the constructor, or null.
       * A constructor cannot return a value, so the error is kept here and handed
       * back by upload() in the usual array(null, $err) form.
       */
      private $initError = null;

      /**
       * Prepare a chunked upload of a binary stream.
       *
       * @param string $upToken upload token
       * @param string|null $key target object name; null/empty lets the server choose
       * @param resource $inputStream readable stream holding the data (read with fread)
       * @param int $size total number of bytes to upload
       * @param array|null $params custom variables / metadata, keyed by name
       * @param string $mime mimeType of the uploaded data
       * @param Config $config
       * @param string|null $resumeRecordFile path of the file in which progress is recorded
       * @param string $version chunked upload version, 'v1' or 'v2'; defaults to 'v1'
       * @param int $partSize chunk size in bytes; defaults to Config::BLOCK_SIZE
       *                      (documented upstream as a v2 field, default 4 MB, range 1 MB - 1 GB)
       * @param RequestOptions|null $reqOpt request options; a default instance is created when null
       * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
       *
       * @throws \Exception when $version is not supported or no upload host can be resolved
       */
      public function __construct(
          $upToken,
          $key,
          $inputStream,
          $size,
          $params,
          $mime,
          $config,
          $resumeRecordFile = null,
          $version = 'v1',
          $partSize = Config::BLOCK_SIZE,
          $reqOpt = null
      ) {
          $this->upToken = $upToken;
          $this->key = $key;
          $this->inputStream = $inputStream;
          $this->size = $size;
          $this->params = $params;
          $this->mime = $mime;
          $this->contexts = array();
          $this->finishedEtags = $this->emptyPartState();
          $this->config = $config;
          $this->resumeRecordFile = $resumeRecordFile ? $resumeRecordFile : null;
          $this->partSize = $partSize ? $partSize : Config::BLOCK_SIZE;
          if ($reqOpt === null) {
              $reqOpt = new RequestOptions();
          }
          $this->reqOpt = $reqOpt;

          try {
              $this->version = SplitUploadVersion::from($version ? $version : 'v1');
          } catch (\Exception $e) {
              throw new \Exception("only support v1/v2 now!", 0, $e);
          }

          list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($upToken);
          $this->bucket = $bucket;
          if ($err != null) {
              // Returning a value from a constructor has no effect, so remember the
              // error; upload() reports it before touching the network.
              $this->initError = $err;
              return;
          }

          list($upHost, $err) = $config->getUpHostV2($accessKey, $bucket);
          if ($err != null) {
              throw new \Exception($err->message(), 1);
          }
          $this->host = $upHost;
      }

      /**
       * Upload the whole stream, resuming from the record file when possible.
       *
       * @param string $fname original file name reported to the server
       *
       * @return array array($responseJson, null) on success, array(null, $error) on failure
       *
       * @throws \Exception when reading the input stream fails
       */
      public function upload($fname)
      {
          if ($this->initError !== null) {
              return array(null, $this->initError);
          }

          $uploaded = 0;
          $partNumber = 1;
          $encodedObjectName = null;
          if ($this->isV2()) {
              // Same "is a key given" test as fileUrl(), so that a key such as "0"
              // is not mistaken for "no key".
              $encodedObjectName = $this->key != null ? \Qiniu\base64_urlSafeEncode($this->key) : '~';
          }

          // Restore progress from the record file, if there is a usable one.
          $resumedV2 = false;
          if ($this->resumeRecordFile !== null) {
              $record = $this->readResumeRecord();
              if ($record) {
                  if ($this->isV1()) {
                      if (isset($record['contexts']) && isset($record['uploaded']) &&
                          is_array($record['contexts']) && is_int($record['uploaded'])) {
                          $this->contexts = $record['contexts'];
                          $uploaded = $record['uploaded'];
                      }
                  } elseif ($this->isV2()) {
                      if ($this->isUsableV2Record($record)) {
                          $this->finishedEtags['etags'] = $record["etags"];
                          $this->finishedEtags["uploadId"] = $record["uploadId"];
                          $this->finishedEtags["expiredAt"] = $record["expiredAt"];
                          $this->finishedEtags["uploaded"] = $record["uploaded"];
                          $uploaded = $record["uploaded"];
                          $partNumber = count($this->finishedEtags["etags"]) + 1;
                          $resumedV2 = true;
                      }
                  } else {
                      throw new \Exception("only support v1/v2 now!");
                  }
              }
          }

          // v2 needs a multipart upload task unless one was restored above.
          if ($this->isV2() && !$resumedV2) {
              $err = $this->makeInitReq($encodedObjectName);
              if ($err !== null) {
                  return array(null, $err);
              }
          }

          // The chunks recorded as uploaded must not be read again: move the stream
          // past them. If that is impossible, forget the record and start over.
          if ($uploaded > 0 && !$this->skipUploadedBytes($uploaded)) {
              error_log("seek inputStream failed, restart upload from the beginning");
              $uploaded = 0;
              $partNumber = 1;
              $this->contexts = array();
              $this->finishedEtags = $this->emptyPartState();
              if ($this->isV2()) {
                  $err = $this->makeInitReq($encodedObjectName);
                  if ($err !== null) {
                      return array(null, $err);
                  }
              }
          }

          while ($uploaded < $this->size) {
              $blockSize = $this->blockSize($uploaded);
              $data = fread($this->inputStream, $blockSize);
              if ($data === false) {
                  throw new \Exception("file read failed", 1);
              }

              // The server echoes a checksum of what it received: crc32 for v1, md5 for v2.
              if ($this->isV1()) {
                  $checksumKey = 'crc32';
                  $checksum = \Qiniu\crc32_data($data);
              } elseif ($this->isV2()) {
                  $checksumKey = 'md5';
                  $checksum = md5($data);
              } else {
                  throw new \Exception("only support v1/v2 now!");
              }

              $response = $this->sendChunk($data, $blockSize, $partNumber, $encodedObjectName, $checksum);
              $ret = null;
              if ($response->ok() && $response->json() != null) {
                  $ret = $response->json();
              }

              // A negative status code makes the uploader switch to the backup host
              // before the retry below.
              if ($response->statusCode < 0) {
                  $err = $this->switchToBackupHost();
                  if ($err != null) {
                      return array(null, $err);
                  }
              }

              // One retry when the response asks for it or the checksum does not match.
              if ($response->needRetry() || !isset($ret[$checksumKey]) || $checksum != $ret[$checksumKey]) {
                  $response = $this->sendChunk($data, $blockSize, $partNumber, $encodedObjectName, $checksum);
                  $ret = $response->json();
              }
              if (!$response->ok() || !isset($ret[$checksumKey]) || $checksum != $ret[$checksumKey]) {
                  return array(null, new Error($this->currentUrl, $response));
              }

              if ($this->isV1()) {
                  array_push($this->contexts, $ret['ctx']);
              } else {
                  $blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber);
                  array_push($this->finishedEtags['etags'], $blockStatus);
                  $partNumber += 1;
              }

              $uploaded += $blockSize;
              if ($this->isV2()) {
                  $this->finishedEtags['uploaded'] = $uploaded;
              }
              $this->writeResumeRecord($uploaded);
          }

          if ($this->isV1()) {
              return $this->makeFile($fname);
          } elseif ($this->isV2()) {
              return $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName);
          } else {
              throw new \Exception("only support v1/v2 now!");
          }
      }

      /**
       * Whether the v1 protocol was selected.
       *
       * The comparison is deliberately loose: $this->version holds whatever
       * SplitUploadVersion::from() returned and is compared against the class
       * constant exactly as the rest of this class has always done.
       */
      private function isV1()
      {
          return $this->version == SplitUploadVersion::V1;
      }

      /**
       * Whether the v2 protocol was selected (loose comparison, see isV1()).
       */
      private function isV2()
      {
          return $this->version == SplitUploadVersion::V2;
      }

      /**
       * Initial (empty) state of a v2 upload task.
       */
      private function emptyPartState()
      {
          return array("etags" => array(), "uploadId" => "", "expiredAt" => 0, "uploaded" => 0);
      }

      /**
       * Read and decode the resume record file.
       *
       * Every failure is logged with error_log and reported as null; the file
       * handle is closed on every path.
       *
       * @return array|null decoded record, or null when there is no usable record
       */
      private function readResumeRecord()
      {
          if (!file_exists($this->resumeRecordFile)) {
              error_log("resumeFile not exists");
              return null;
          }
          $stream = fopen($this->resumeRecordFile, 'r');
          if (!$stream) {
              error_log("resumeFile open failed");
              return null;
          }

          $streamLen = filesize($this->resumeRecordFile);
          $contents = null;
          if ($streamLen > 0) {
              $contents = fread($stream, $streamLen);
          }
          fclose($stream);

          if (!($streamLen > 0)) {
              error_log("resumeFile is empty");
              return null;
          }
          if (!$contents) {
              error_log("read resumeFile failed");
              return null;
          }

          $record = json_decode($contents, true);
          if ($record === null) {
              error_log("resumeFile contents decode error");
          }
          return is_array($record) ? $record : null;
      }

      /**
       * Check that a decoded record describes a v2 upload task that can be continued:
       * all fields present with the expected types, not expired, and some data uploaded.
       *
       * @param array $record decoded resume record
       *
       * @return bool
       */
      private function isUsableV2Record($record)
      {
          return isset($record["etags"]) && isset($record["uploadId"]) &&
              isset($record["expiredAt"]) && isset($record["uploaded"]) &&
              is_array($record["etags"]) && is_string($record["uploadId"]) &&
              is_int($record["expiredAt"]) && is_int($record["uploaded"]) &&
              $record["expiredAt"] > time() && $record["uploaded"] > 0;
      }

      /**
       * Persist the current progress to the resume record file, if one is configured.
       * Failures are logged and otherwise ignored (the record is best effort).
       *
       * @param int $uploaded number of bytes uploaded so far
       */
      private function writeResumeRecord($uploaded)
      {
          if ($this->resumeRecordFile === null) {
              return;
          }

          if ($this->isV1()) {
              $recordData = json_encode(array(
                  'contexts' => $this->contexts,
                  'uploaded' => $uploaded
              ));
          } elseif ($this->isV2()) {
              $recordData = json_encode($this->finishedEtags);
          } else {
              throw new \Exception("only support v1/v2 now!");
          }

          if (!$recordData) {
              error_log('resumeRecordData encode failed');
              return;
          }
          if (file_put_contents($this->resumeRecordFile, $recordData) === false) {
              error_log("write resumeRecordFile failed");
          }
      }

      /**
       * Advance the input stream past the bytes a previous run already uploaded.
       * The offset is relative to the stream's current position.
       *
       * @param int $offset number of bytes to skip
       *
       * @return bool false when the stream cannot be repositioned
       */
      private function skipUploadedBytes($offset)
      {
          $meta = stream_get_meta_data($this->inputStream);
          if (empty($meta['seekable'])) {
              return false;
          }
          return fseek($this->inputStream, $offset, SEEK_CUR) === 0;
      }

      /**
       * Replace the current upload host with the backup host from the config.
       *
       * @return mixed null on success, otherwise the error reported by the lookup
       */
      private function switchToBackupHost()
      {
          list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken);
          if ($err != null) {
              return $err;
          }
          list($upHostBackup, $err) = $this->config->getUpBackupHostV2($accessKey, $bucket);
          if ($err != null) {
              return $err;
          }
          $this->host = $upHostBackup;
          return null;
      }

      /**
       * Send one chunk with the request that matches the selected version.
       *
       * @param string $data chunk contents
       * @param int $blockSize chunk size announced to the server (v1)
       * @param int $partNumber part number (v2)
       * @param string|null $encodedObjectName encoded object name (v2)
       * @param string $md5 md5 of $data, sent as Content-MD5 (v2)
       *
       * @return mixed response object returned by the HTTP client
       */
      private function sendChunk($data, $blockSize, $partNumber, $encodedObjectName, $md5)
      {
          if ($this->isV1()) {
              return $this->makeBlock($data, $blockSize);
          }
          return $this->uploadPart(
              $data,
              $partNumber,
              $this->finishedEtags["uploadId"],
              $encodedObjectName,
              $md5
          );
      }

      /**
       * v1: create a block from one chunk of data.
       */
      private function makeBlock($block, $blockSize)
      {
          $url = $this->host . '/mkblk/' . $blockSize;
          return $this->post($url, $block);
      }

      /**
       * v1: build the "mkfile" URL from size, mime type, optional key, file name
       * and the custom parameters (values are url-safe base64 encoded).
       */
      private function fileUrl($fname)
      {
          $url = $this->host . '/mkfile/' . $this->size;
          $url .= '/mimeType/' . \Qiniu\base64_urlSafeEncode($this->mime);
          if ($this->key != null) {
              $url .= '/key/' . \Qiniu\base64_urlSafeEncode($this->key);
          }
          $url .= '/fname/' . \Qiniu\base64_urlSafeEncode($fname);
          if (!empty($this->params)) {
              foreach ($this->params as $key => $value) {
                  $val = \Qiniu\base64_urlSafeEncode($value);
                  $url .= "/$key/$val";
              }
          }
          return $url;
      }

      /**
       * v1: create the file from the uploaded blocks (one retry if the response asks for it).
       *
       * @return array array($responseJson, null) on success, array(null, Error) on failure
       */
      private function makeFile($fname)
      {
          $url = $this->fileUrl($fname);
          $body = implode(',', $this->contexts);
          $response = $this->post($url, $body);
          if ($response->needRetry()) {
              $response = $this->post($url, $body);
          }
          if (!$response->ok()) {
              return array(null, new Error($this->currentUrl, $response));
          }
          return array($response->json(), null);
      }

      /**
       * POST $data to $url, authorised with the upload token.
       */
      private function post($url, $data)
      {
          $headers = array('Authorization' => 'UpToken ' . $this->upToken);
          return $this->postWithHeaders($url, $data, $headers);
      }

      /**
       * Size of the next chunk: the part size, or whatever is left for the last chunk.
       */
      private function blockSize($uploaded)
      {
          if ($this->size < $uploaded + $this->partSize) {
              return $this->size - $uploaded;
          }
          return $this->partSize;
      }

      /**
       * v2: start a multipart upload task and remember its id and expiry time.
       *
       * @param string $encodedObjectName encoded object name, or '~' when no key is given
       *
       * @return Error|null null on success, an Error when the task could not be created
       */
      private function makeInitReq($encodedObjectName)
      {
          $response = $this->initReq($encodedObjectName);
          $res = $response->json();
          if (!$response->ok() || !isset($res['uploadId'])) {
              // Without an uploadId every following request would target a broken URL.
              return new Error($this->currentUrl, $response);
          }
          $this->finishedEtags["uploadId"] = $res['uploadId'];
          $this->finishedEtags["expiredAt"] = isset($res['expireAt']) ? $res['expireAt'] : 0;
          return null;
      }

      /**
       * v2: send the "initiate multipart upload" request.
       *
       * @return mixed response object returned by the HTTP client
       */
      private function initReq($encodedObjectName)
      {
          $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads';
          $headers = array(
              'Authorization' => 'UpToken ' . $this->upToken,
              'Content-Type' => 'application/json'
          );
          return $this->postWithHeaders($url, null, $headers);
      }

      /**
       * v2: upload one numbered part.
       */
      private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName, $md5)
      {
          $headers = array(
              'Authorization' => 'UpToken ' . $this->upToken,
              'Content-Type' => 'application/octet-stream',
              'Content-MD5' => $md5
          );
          $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName .
              '/uploads/' . $uploadId . '/' . $partNumber;
          return $this->put($url, $block, $headers);
      }

      /**
       * v2: complete the multipart upload task (one retry if the response asks for it).
       *
       * Parameters whose name starts with "x:" are sent as customVars, those
       * starting with "x-qn-meta-" as metadata; all others are ignored here.
       *
       * @return array array($responseJson, null) on success, array(null, Error) on failure
       */
      private function completeParts($fname, $uploadId, $encodedObjectName)
      {
          $headers = array(
              'Authorization' => 'UpToken ' . $this->upToken,
              'Content-Type' => 'application/json'
          );
          $etags = $this->finishedEtags['etags'];
          $sortedEtags = \Qiniu\arraySort($etags, 'partNumber');

          $metadata = array();
          $customVars = array();
          if ($this->params) {
              foreach ($this->params as $k => $v) {
                  if (strpos($k, 'x:') === 0) {
                      $customVars[$k] = $v;
                  } elseif (strpos($k, 'x-qn-meta-') === 0) {
                      $metadata[$k] = $v;
                  }
              }
          }
          // Empty collections are sent as null rather than as empty arrays.
          if (empty($metadata)) {
              $metadata = null;
          }
          if (empty($customVars)) {
              $customVars = null;
          }

          $body = array(
              'fname' => $fname,
              'mimeType' => $this->mime,
              'metadata' => $metadata,
              'customVars' => $customVars,
              'parts' => $sortedEtags
          );
          $jsonBody = json_encode($body);
          $url = $this->host . '/buckets/' . $this->bucket . '/objects/' . $encodedObjectName . '/uploads/' . $uploadId;
          $response = $this->postWithHeaders($url, $jsonBody, $headers);
          if ($response->needRetry()) {
              $response = $this->postWithHeaders($url, $jsonBody, $headers);
          }
          if (!$response->ok()) {
              return array(null, new Error($this->currentUrl, $response));
          }
          return array($response->json(), null);
      }

      /**
       * PUT $data to $url with the given headers, remembering the URL for error reports.
       */
      private function put($url, $data, $headers)
      {
          $this->currentUrl = $url;
          return Client::put($url, $data, $headers, $this->reqOpt);
      }

      /**
       * POST $data to $url with the given headers, remembering the URL for error reports.
       */
      private function postWithHeaders($url, $data, $headers)
      {
          $this->currentUrl = $url;
          return Client::post($url, $data, $headers, $this->reqOpt);
      }
  }