123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- const Base = require('sdk-base');
- const util = require('util');
- const ready = require('get-ready');
- const copy = require('copy-to');
- const currentIP = require('address').ip();
- const RR = 'roundRobin';
- const MS = 'masterSlave';
- module.exports = function (OssClient) {
/**
 * Cluster client: wraps one OssClient per configured node.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} options
 * @param {Object[]} options.cluster - one OssClient option object per node.
 *   These objects are NOT modified; each node gets its own shallow copy.
 * @param {*} [options.timeout] - shared setting copied onto each node's options.
 * @param {*} [options.agent] - shared setting copied onto each node's options.
 * @param {*} [options.urllib] - shared setting copied onto each node's options.
 * @param {string} [options.schedule] - scheduling strategy, defaults to RR.
 * @param {boolean} [options.masterOnly=false] - only read from master.
 * @param {number} [options.heartbeatInterval=10000] - delay passed to
 *   `_deferInterval` for the periodic `_checkAvailable(true)` call.
 * @param {boolean} [options.ignoreStatusFile=false] - forwarded to the first
 *   `_checkAvailable` call made by `_init`.
 * @throws {Error} when `options.cluster` is not an array.
 */
function Client(options) {
  if (!(this instanceof Client)) {
    return new Client(options);
  }

  if (!options || !Array.isArray(options.cluster)) {
    throw new Error('require options.cluster to be an array');
  }

  Base.call(this);

  this.clients = [];
  this.availables = {};

  for (let i = 0; i < options.cluster.length; i++) {
    // Shallow-copy the node config first: the shared settings must not leak
    // into objects owned by the caller (who may reuse them for another Client).
    const opt = Object.assign({}, options.cluster[i]);
    copy(options).pick('timeout', 'agent', 'urllib').to(opt);
    this.clients.push(new OssClient(opt));
    // every node is assumed to be up until a check says otherwise
    this.availables[i] = true;
  }

  this.schedule = options.schedule || RR;
  // only read from master, default is false
  this.masterOnly = !!options.masterOnly;
  this.index = 0;

  const heartbeatInterval = options.heartbeatInterval || 10000;
  this._checkAvailableLock = false;
  // periodic checks always pass `true` (ignoreStatusFile)
  this._timerId = this._deferInterval(this._checkAvailable.bind(this, true), heartbeatInterval);
  this._ignoreStatusFile = options.ignoreStatusFile || false;
  this._init();
}
// Client extends Base; `proto` is the prototype object the cluster methods
// are attached to, and `ready.mixin` is applied to it.
util.inherits(Client, Base);
const { prototype: proto } = Client;
ready.mixin(proto);
// Read operations: answered by a single node, with fail-over to the others.
const GET_METHODS = [
  'head',
  'get',
  'getStream',
  'list',
  'getACL',
];

// Write operations: sent to every node of the cluster.
const PUT_METHODS = [
  'put',
  'putStream',
  'delete',
  'deleteMulti',
  'copy',
  'putMeta',
  'putACL',
];

/**
 * Whether a request error carries a status in the 200 ~ 499 range.
 * Such an error is a normal response from the node, so asking another
 * node would not help and the request must not be retried.
 *
 * @param {Error} err - error thrown by a node client
 * @returns {boolean} true when the error must be rethrown as-is
 */
function isNormalResponseError(err) {
  return Boolean(err.status) && err.status >= 200 && err.status < 500;
}

GET_METHODS.forEach((method) => {
  /**
   * Run the read on the node picked by `chooseAvailable()`; when that fails
   * with a status < 200 or >= 500 (or no status at all), retry on every
   * other node in `this.clients` order.
   *
   * @param {...*} args - forwarded unchanged to the node's method
   * @returns {Promise<*>} the first successful node result
   * @throws the node error for a 200 ~ 499 status, or the last error with
   *   ' (all clients are down)' appended when every node failed
   */
  proto[method] = async function (...args) {
    const preferred = this.chooseAvailable();
    const candidates = [preferred, ...this.clients.filter(node => node !== preferred)];
    let lastError;

    for (const node of candidates) {
      try {
        // Call the method ON the node being tried, so a fallback really
        // runs in the fallback's own context rather than the failed node's.
        return await node[method](...args);
      } catch (err) {
        if (isNormalResponseError(err)) {
          // 200 ~ 499 belong to normal response, don't try again
          throw err;
        }
        // < 200 || >= 500 need to retry from other cluster node
        lastError = err;
      }
    }

    lastError.message += ' (all clients are down)';
    throw lastError;
  };
});

// every cluster node must write successfully
PUT_METHODS.forEach((method) => {
  /**
   * Run the write on all nodes in parallel.
   *
   * @param {...*} args - forwarded unchanged to each node's method
   * @returns {Promise<*>} the result of the first node in `this.clients`
   * @throws the first rejection reported by `Promise.all`
   */
  proto[method] = async function (...args) {
    const results = await Promise.all(this.clients.map(client => client[method](...args)));
    return results[0];
  };
});
/**
 * Delegate `signatureUrl` to the node returned by `chooseAvailable()`.
 *
 * @param {...*} args - forwarded unchanged (name first)
 * @returns {*} whatever the node's `signatureUrl` returns
 */
proto.signatureUrl = function signatureUrl(/* name */...args) {
  return this.chooseAvailable().signatureUrl(...args);
};
/**
 * Delegate `getObjectUrl` to the node returned by `chooseAvailable()`.
 *
 * @param {...*} args - forwarded unchanged (name, baseUrl)
 * @returns {*} whatever the node's `getObjectUrl` returns
 */
proto.getObjectUrl = function getObjectUrl(/* name, baseUrl */...args) {
  return this.chooseAvailable().getObjectUrl(...args);
};
/**
 * Kick off the first availability check in the background.
 * Calls `this.ready(true)` once the check has finished; any failure along
 * the way is reported through an 'error' event instead of being thrown.
 */
proto._init = function _init() {
  const bootstrap = async () => {
    await this._checkAvailable(this._ignoreStatusFile);
    this.ready(true);
  };

  bootstrap().catch((err) => {
    this.emit('error', err);
  });
};
/**
 * Probe every node and refresh `this.availables`.
 *
 * Unless `ignoreStatusFile` is truthy, the status file is first written
 * through `this.put`. Each node is then probed with `_checkStatus` up to
 * three times; a node that fails all three is marked unavailable. When at
 * least one node is down, a 'CheckAvailableError' is emitted as an 'error'
 * event listing the status file URL of each down node.
 *
 * A run that starts while `_checkAvailableLock` is set returns without
 * probing. The lock is always released, even if probing throws.
 *
 * @param {boolean} ignoreStatusFile - skip writing the status file
 * @returns {Promise<void>}
 */
proto._checkAvailable = async function _checkAvailable(ignoreStatusFile) {
  const MAX_ATTEMPTS = 3;
  const name = `._ali-oss/check.status.${currentIP}.txt`;

  if (!ignoreStatusFile) {
    // only start will try to write the file
    await this.put(name, Buffer.from(`check available started at ${Date()}`));
  }

  if (this._checkAvailableLock) {
    return;
  }
  this._checkAvailableLock = true;

  const downStatusFiles = [];
  try {
    for (let i = 0; i < this.clients.length; i++) {
      const client = this.clients[i];

      let available = false;
      for (let attempt = 0; attempt < MAX_ATTEMPTS && !available; attempt++) {
        /* eslint no-await-in-loop: [0] */
        available = await this._checkStatus(client, name);
      }

      if (!available) {
        downStatusFiles.push(client._objectUrl(name));
      }
      this.availables[i] = available;
    }
  } finally {
    // Release the lock on every exit path; otherwise one unexpected error
    // would make all later checks return early forever.
    this._checkAvailableLock = false;
  }

  if (downStatusFiles.length > 0) {
    const err = new Error(`${downStatusFiles.length} data node down, please check status file: ${downStatusFiles.join(', ')}`);
    err.name = 'CheckAvailableError';
    this.emit('error', err);
  }
};
/**
 * Probe one node with a `head` request for the status file.
 *
 * @param {Object} client - node client exposing `head(name)`
 * @param {string} name - object name to probe
 * @returns {Promise<boolean>} true when the request succeeds or fails with a
 *   status in 200 ~ 499 (so a 404 still counts as available); false when the
 *   error has no status or a status < 200 or >= 500
 */
proto._checkStatus = async function _checkStatus(client, name) {
  try {
    await client.head(name);
    return true;
  } catch (err) {
    const isDown = !err.status || err.status >= 500 || err.status < 200;
    return !isDown;
  }
};
/**
 * Pick the node that should serve the next read.
 *
 * masterSlave schedule: the first node when `masterOnly` is set, otherwise
 * the first node flagged available. Any other schedule is treated as round
 * robin: advance the rotating index until an available node is found, trying
 * at most once per node. If no node is available the first node is returned.
 *
 * @returns {Object} the selected node client
 */
proto.chooseAvailable = function chooseAvailable() {
  const { clients, availables } = this;

  if (this.schedule === MS) {
    if (this.masterOnly) {
      // only read from master
      return clients[0];
    }
    const firstUp = clients.findIndex((_, idx) => availables[idx]);
    // all down, try to use this first one
    return firstUp === -1 ? clients[0] : clients[firstUp];
  }

  // RR
  for (let remaining = clients.length; remaining > 0; remaining -= 1) {
    const candidate = this._nextRRIndex();
    if (availables[candidate]) {
      return clients[candidate];
    }
  }

  // all down, try to use this first one
  return clients[0];
};
/**
 * Return the current round-robin index and advance it by one, wrapping back
 * to 0 once it reaches the number of clients.
 *
 * @returns {number} the index to use for this pick
 */
proto._nextRRIndex = function _nextRRIndex() {
  const current = this.index;
  const following = current + 1;
  this.index = following >= this.clients.length ? 0 : following;
  return current;
};
/**
 * Default completion callback: rethrow the error when one is given,
 * do nothing otherwise.
 *
 * @param {*} [err] - error to rethrow
 */
proto._error = function error(err) {
  if (!err) {
    return;
  }
  throw err;
};
/**
 * Wrap a promise-returning function into a zero-argument callback.
 *
 * When invoked, the callback runs `gen` with `ctx` as `this`, then calls the
 * completion handler with no arguments on success or with the rejection
 * reason on failure. The handler is `cb` when given; otherwise it falls back
 * to `this._error`, looked up on the first invocation that needs it.
 *
 * @param {Object} ctx - `this` value for `gen`
 * @param {Function} gen - function returning a promise
 * @param {Function} [cb] - completion handler
 * @returns {Function} the wrapped callback
 */
proto._createCallback = function _createCallback(ctx, gen, cb) {
  let handler = cb;
  return () => {
    if (!handler) {
      handler = this._error;
    }
    const onSuccess = () => {
      handler();
    };
    gen.call(ctx).then(onSuccess, handler);
  };
};
/**
 * Run `gen` repeatedly via `setInterval`, wrapped by `_createCallback`.
 *
 * @param {Function} gen - function returning a promise
 * @param {number} timeout - interval delay passed to `setInterval`
 * @param {Function} [cb] - completion handler passed to `_createCallback`
 * @returns {*} the timer handle returned by `setInterval`
 */
proto._deferInterval = function _deferInterval(gen, timeout, cb) {
  const tick = this._createCallback(this, gen, cb);
  return setInterval(tick, timeout);
};
/**
 * Stop the heartbeat timer stored in `_timerId` and clear the reference.
 */
proto.close = function close() {
  const timer = this._timerId;
  this._timerId = null;
  clearInterval(timer);
};
- return Client;
- };
|