- // var debug = require('debug')('ali-oss:multipart');
- const util = require('util');
- const path = require('path');
- const mime = require('mime');
- const copy = require('copy-to');
- const { isBlob } = require('../common/utils/isBlob');
- const { isFile } = require('../common/utils/isFile');
- const { isArray } = require('../common/utils/isArray');
- const { isBuffer } = require('../common/utils/isBuffer');
- const proto = exports;
/**
 * Multipart operations
 */

/**
 * Upload a file to OSS using multipart uploads
 * @param {String} name object key to store the file under
 * @param {String|File|Blob|Buffer} file the content to upload
 * @param {Object} options
 *        {Object} options.callback The callback parameter is composed of a JSON string encoded in Base64
 *        {String} options.callback.url the OSS sends a callback request to this URL
 *        {String} options.callback.host The host header value for initiating callback requests
 *        {String} options.callback.body The value of the request body when a callback is initiated
 *        {String} options.callback.contentType The Content-Type of the callback requests initiated
 *        {Object} options.callback.customValue Custom parameters are a map of key-values, e.g:
 *          customValue = {
 *            key1: 'value1',
 *            key2: 'value2'
 *          }
 */
proto.multipartUpload = async function multipartUpload(name, file, options = {}) {
  this.resetCancelFlag();

  // A checkpoint that carries an uploadId means we are resuming an
  // interrupted upload rather than starting a fresh one.
  if (options.checkpoint && options.checkpoint.uploadId) {
    if (file && isFile(file)) options.checkpoint.file = file;
    return await this._resumeMultipart(options.checkpoint, options);
  }

  const minPartSize = 100 * 1024;

  // Derive a Content-Type when the caller did not supply one.
  if (!options.mime) {
    if (isFile(file)) {
      options.mime = mime.getType(path.extname(file.name));
    } else if (isBlob(file)) {
      options.mime = file.type;
    } else if (isBuffer(file)) {
      options.mime = '';
    } else {
      options.mime = mime.getType(path.extname(file));
    }
  }

  options.headers = options.headers || {};
  this._convertMetaToHeaders(options.meta, options.headers);

  const fileSize = await this._getFileSize(file);

  // Payloads below the minimum part size skip multipart entirely and go
  // through a single put(); progress jumps straight to 100%.
  if (fileSize < minPartSize) {
    options.contentLength = fileSize;
    const putResult = await this.put(name, file, options);
    if (options.progress) {
      await options.progress(1);
    }

    const response = {
      res: putResult.res,
      bucket: this.options.bucket,
      name,
      etag: putResult.res.headers.etag
    };

    // Only surface the response body when an upload callback was requested.
    if ((options.headers && options.headers['x-oss-callback']) || options.callback) {
      response.data = putResult.data;
    }
    return response;
  }

  // Validate a caller-supplied part size before touching the network.
  if (options.partSize && !(parseInt(options.partSize, 10) === options.partSize)) {
    throw new Error('partSize must be int number');
  }
  if (options.partSize && options.partSize < minPartSize) {
    throw new Error(`partSize must not be smaller than ${minPartSize}`);
  }

  const initResult = await this.initMultipartUpload(name, options);
  const { uploadId } = initResult;
  const partSize = this._getPartSize(fileSize, options.partSize);

  const checkpoint = {
    file,
    name,
    fileSize,
    partSize,
    uploadId,
    doneParts: []
  };

  if (options.progress) {
    await options.progress(0, checkpoint, initResult.res);
  }

  return await this._resumeMultipart(checkpoint, options);
};
/*
 * Resume multipart upload from checkpoint. The checkpoint will be
 * updated after each successful part upload.
 * @param {Object} checkpoint the checkpoint (file, fileSize, partSize, uploadId, doneParts, name)
 * @param {Object} options same options object as multipartUpload (progress, parallel, ...)
 */
proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
  const that = this;
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }
  const {
    file, fileSize, partSize, uploadId, doneParts, name
  } = checkpoint;
  // Local copy of already-uploaded parts; completed parts from this run are
  // appended to it and it is what completeMultipartUpload finally receives.
  const internalDoneParts = [];
  if (doneParts.length > 0) {
    copy(doneParts).to(internalDoneParts);
  }
  const partOffs = this._divideParts(fileSize, partSize);
  const numParts = partOffs.length;
  // Flipped to true once _parallel returns, so late-resolving jobs stop
  // mutating the checkpoint and reporting progress.
  let multipartFinish = false;
  let uploadPartJob = function uploadPartJob(self, partNo) {
    // eslint-disable-next-line no-async-promise-executor
    return new Promise(async (resolve, reject) => {
      try {
        if (!self.isCancel()) {
          // partNo is 1-based; partOffs is 0-based.
          const pi = partOffs[partNo - 1];
          const stream = self._createStream(file, pi.start, pi.end);
          const data = {
            stream,
            size: pi.end - pi.start
          };
          // Track live streams on the client so cancel() can destroy them.
          if (isArray(self.multipartUploadStreams)) {
            self.multipartUploadStreams.push(stream);
          } else {
            self.multipartUploadStreams = [stream];
          }
          // Cleanup runs on close/end/error and on upload failure; it both
          // destroys the stream and drops it from the tracking list.
          const removeStreamFromMultipartUploadStreams = function () {
            if (!stream.destroyed) {
              stream.destroy();
            }
            const index = self.multipartUploadStreams.indexOf(stream);
            if (index !== -1) {
              self.multipartUploadStreams.splice(index, 1);
            }
          };
          stream.on('close', removeStreamFromMultipartUploadStreams);
          stream.on('end', removeStreamFromMultipartUploadStreams);
          stream.on('error', removeStreamFromMultipartUploadStreams);
          let result;
          try {
            result = await self._uploadPart(name, uploadId, partNo, data);
          } catch (error) {
            removeStreamFromMultipartUploadStreams();
            // 404 means the uploadId no longer exists on the server, so the
            // whole upload must be aborted rather than retried.
            if (error.status === 404) {
              throw self._makeAbortEvent();
            }
            throw error;
          }
          if (!self.isCancel() && !multipartFinish) {
            // Record the finished part on the shared checkpoint so callers
            // can persist it and resume later.
            checkpoint.doneParts.push({
              number: partNo,
              etag: result.res.headers.etag
            });
            if (options.progress) {
              await options.progress(doneParts.length / numParts, checkpoint, result.res);
            }
            resolve({
              number: partNo,
              etag: result.res.headers.etag
            });
          } else {
            // Cancelled or already finished: resolve with no value so the
            // part is not pushed into internalDoneParts.
            resolve();
          }
        } else {
          resolve();
        }
      } catch (err) {
        // Re-wrap so the failing part number travels with the error; copy()
        // preserves any extra fields (e.g. status) from the original.
        const tempErr = new Error();
        tempErr.name = err.name;
        tempErr.message = err.message;
        tempErr.stack = err.stack;
        tempErr.partNum = partNo;
        copy(err).to(tempErr);
        reject(tempErr);
      }
    });
  };
  // Part numbers 1..numParts, minus the parts already done in the checkpoint.
  const all = Array.from(new Array(numParts), (x, i) => i + 1);
  const done = internalDoneParts.map(p => p.number);
  const todo = all.filter(p => done.indexOf(p) < 0);
  const defaultParallel = 5;
  const parallel = options.parallel || defaultParallel;
  // upload in parallel; _parallel collects job errors instead of rejecting.
  const jobErr = await this._parallel(todo, parallel, value => new Promise((resolve, reject) => {
    uploadPartJob(that, value).then((result) => {
      if (result) {
        internalDoneParts.push(result);
      }
      resolve();
    }).catch((err) => {
      reject(err);
    });
  }));
  multipartFinish = true;
  // Abort outranks cancel outranks ordinary part failures.
  const abortEvent = jobErr.find(err => err.name === 'abort');
  if (abortEvent) throw abortEvent;
  if (this.isCancel()) {
    uploadPartJob = null;
    throw this._makeCancelEvent();
  }
  if (jobErr && jobErr.length > 0) {
    jobErr[0].message = `Failed to upload some parts with error: ${jobErr[0].toString()} part_num: ${jobErr[0].partNum}`;
    throw jobErr[0];
  }
  return await this.completeMultipartUpload(name, uploadId, internalDoneParts, options);
};
/**
 * Determine the byte length of an upload payload.
 * @param {Buffer|File|Blob} file
 * @return {Number} size in bytes
 * @throws {Error} when the input is not a Buffer, File or Blob
 */
proto._getFileSize = async function _getFileSize(file) {
  if (isBuffer(file)) return file.length;
  if (isBlob(file) || isFile(file)) return file.size;
  throw new Error('_getFileSize requires Buffer/File/Blob.');
};
- /*
- * Readable stream for Web File
- */
- const { Readable } = require('stream');
/**
 * Readable stream over a Web File/Blob, backed by a FileReader.
 * @param {File|Blob} file source to read
 * @param {Object} [options] standard Readable options
 */
function WebFileReadStream(file, options) {
  // Support invocation without `new`.
  if (!(this instanceof WebFileReadStream)) {
    return new WebFileReadStream(file, options);
  }

  Readable.call(this, options);

  this.file = file;             // source File/Blob; cleared once decoded
  this.reader = new FileReader();
  this.start = 0;               // next offset to push from fileBuffer
  this.finish = false;          // true after the final null push
  this.fileBuffer = null;       // Buffer holding the decoded file contents
}
util.inherits(WebFileReadStream, Readable);
// Push up to `size`-byte chunks from fileBuffer into the stream, stopping
// when downstream applies backpressure (push() returns false) or the
// buffer is drained. No-op until the FileReader has populated fileBuffer.
WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
  if (!this.fileBuffer) return;

  let keepPushing = true;
  while (keepPushing && this.fileBuffer && this.start < this.fileBuffer.length) {
    const from = this.start;
    const to = Math.min(from + size, this.fileBuffer.length);
    this.start = to;
    keepPushing = this.push(this.fileBuffer.slice(from, to));
  }
};
// Readable._read implementation. First call kicks off an async FileReader
// read of the whole file; subsequent calls drain the decoded buffer via
// readFileAndPush. Ends the stream when the source/buffer is exhausted.
WebFileReadStream.prototype._read = function _read(size) {
  // Done when: the (unread) file is already past its size, the decoded
  // buffer is drained, we already finished, or there was never a file.
  if ((this.file && this.start >= this.file.size) ||
    (this.fileBuffer && this.start >= this.fileBuffer.length) ||
    (this.finish) || (this.start === 0 && !this.file)) {
    if (!this.finish) {
      this.fileBuffer = null;
      this.finish = true;
    }
    // Signal end-of-stream.
    this.push(null);
    return;
  }

  const defaultReadSize = 16 * 1024;
  size = size || defaultReadSize;

  const that = this;
  this.reader.onload = function onload(e) {
    // Whole file decoded in one shot; drop the File reference so the
    // termination check above switches to the buffer length.
    that.fileBuffer = Buffer.from(new Uint8Array(e.target.result));
    that.file = null;
    that.readFileAndPush(size);
  };
  if (this.start === 0) {
    // First read: start the async FileReader; pushes happen in onload.
    this.reader.readAsArrayBuffer(this.file);
  } else {
    this.readFileAndPush(size);
  }
};
/**
 * Build a readable stream over the [start, end) byte range of the input.
 * @param {File|Blob|Buffer} file
 * @param {Number} start inclusive byte offset
 * @param {Number} end exclusive byte offset
 * @return {Readable}
 * @throws {Error} when the input is not a Buffer, File or Blob
 */
proto._createStream = function _createStream(file, start, end) {
  if (isBlob(file) || isFile(file)) {
    return new WebFileReadStream(file.slice(start, end));
  }
  if (isBuffer(file)) {
    // We can't use Readable.from() since it is only supported from Node v10.
    const chunk = file.subarray(start, end);
    return new Readable({
      read() {
        this.push(chunk);
        this.push(null);
      }
    });
  }
  throw new Error('_createStream requires Buffer/File/Blob.');
};
/**
 * Pick the effective part size: the caller's value (default 1 MiB), raised
 * just enough to keep the part count within the 10,000-part limit.
 * @param {Number} fileSize total upload size in bytes
 * @param {Number} [partSize] requested part size in bytes
 * @return {Number} part size to use
 */
proto._getPartSize = function _getPartSize(fileSize, partSize) {
  const maxNumParts = 10 * 1000;
  const defaultPartSize = 1 * 1024 * 1024;
  const requested = partSize || defaultPartSize;
  // Smallest part size that still fits the whole file in maxNumParts parts.
  const safeSize = Math.ceil(fileSize / maxNumParts);
  if (requested >= safeSize) {
    return requested;
  }
  console.warn(`partSize has been set to ${safeSize}, because the partSize you provided causes partNumber to be greater than 10,000`);
  return safeSize;
};
/**
 * Split the file into contiguous [start, end) byte ranges of at most
 * partSize bytes each; the final range is clamped to fileSize.
 * @param {Number} fileSize total size in bytes
 * @param {Number} partSize size of each part in bytes
 * @return {Array<{start: Number, end: Number}>} part offsets, in order
 */
proto._divideParts = function _divideParts(fileSize, partSize) {
  const numParts = Math.ceil(fileSize / partSize);
  return Array.from({ length: numParts }, (_, i) => {
    const start = partSize * i;
    return { start, end: Math.min(start + partSize, fileSize) };
  });
};
|