// cluster.js (6.3 KB)
  1. const Base = require('sdk-base');
  2. const util = require('util');
  3. const ready = require('get-ready');
  4. const copy = require('copy-to');
  5. const currentIP = require('address').ip();
  // Names of the read-scheduling strategies compared against `options.schedule`.
  // Round robin: reads rotate over the available cluster nodes (the default).
  const RR = 'roundRobin';
  // Master/slave: reads go to the first available node in cluster order.
  const MS = 'masterSlave';
  8. module.exports = function (OssClient) {
  /**
   * Cluster client: wraps one `OssClient` per entry of `options.cluster` and
   * exposes the same read/write methods on top of them.
   *
   * Can be called with or without `new`.
   *
   * @param {Object} options
   * @param {Object[]} options.cluster - one options object per node, each handed to `new OssClient()`
   * @param {string} [options.schedule='roundRobin'] - read scheduling strategy
   * @param {boolean} [options.masterOnly=false] - only read from the first node (honoured by the master/slave schedule)
   * @param {number} [options.heartbeatInterval=10000] - milliseconds between availability checks
   * @param {boolean} [options.ignoreStatusFile=false] - skip writing the status file during the initial check
   * @throws {Error} when `options.cluster` is not an array
   */
  function Client(options) {
    // Support factory-style invocation without `new`.
    if (!(this instanceof Client)) {
      return new Client(options);
    }

    const nodes = options && options.cluster;
    if (!Array.isArray(nodes)) {
      throw new Error('require options.cluster to be an array');
    }

    Base.call(this);

    this.clients = [];
    this.availables = {};
    for (let position = 0; position < nodes.length; position += 1) {
      const nodeOptions = nodes[position];
      // Share the cluster-level `timeout`, `agent` and `urllib` settings with
      // this node; note that this writes into the caller's node options object.
      copy(options).pick('timeout', 'agent', 'urllib').to(nodeOptions);
      this.clients.push(new OssClient(nodeOptions));
      // Every node is assumed to be up until the first availability check.
      this.availables[position] = true;
    }

    this.schedule = options.schedule || RR;
    // only read from master, default is false
    this.masterOnly = Boolean(options.masterOnly);
    // Cursor used by the round-robin scheduler.
    this.index = 0;

    this._checkAvailableLock = false;
    // Periodic checks always pass `true`, so they never rewrite the status file.
    const heartbeat = this._checkAvailable.bind(this, true);
    this._timerId = this._deferInterval(heartbeat, options.heartbeatInterval || 10000);
    this._ignoreStatusFile = options.ignoreStatusFile || false;

    this._init();
  }

  util.inherits(Client, Base);
  const proto = Client.prototype;
  ready.mixin(proto);
  // Read operations: served by a single node, with failover to the others.
  const GET_METHODS = ['head', 'get', 'getStream', 'list', 'getACL'];

  // Write operations: executed on every node of the cluster.
  const PUT_METHODS = ['put', 'putStream', 'delete', 'deleteMulti', 'copy', 'putMeta', 'putACL'];
  // Install one failover-aware wrapper per read method.
  GET_METHODS.forEach((method) => {
    /**
     * Run `method` on the scheduled node; when that node fails with a
     * transport-level or 5xx error, retry on every other node in cluster order.
     *
     * @param {...*} args - forwarded unchanged to the underlying client method
     * @returns {Promise<*>} the result of the first node that answers
     * @throws the original error when a node answers with a 200-499 status,
     *   otherwise the last error seen, with ' (all clients are down)' appended
     *   to its message
     */
    proto[method] = async function (...args) {
      const preferred = this.chooseAvailable();

      // Preferred node first, then all remaining nodes as fallbacks.
      const candidates = [preferred];
      for (const node of this.clients) {
        if (node !== preferred) {
          candidates.push(node);
        }
      }

      let lastError;
      for (const node of candidates) {
        try {
          // Invoke the method on the candidate itself. The previous code used
          // `c[method].apply(client, args)`, which ran the fallback's method
          // with `this` bound to the node that had just failed.
          return await node[method](...args);
        } catch (err) {
          if (err.status && err.status >= 200 && err.status < 500) {
            // 200 ~ 499 belong to normal response, don't try again
            throw err;
          }
          // < 200 || >= 500 need to retry from other cluster node
          lastError = err;
        }
      }

      lastError.message += ' (all clients are down)';
      throw lastError;
    };
  });
  // A write only succeeds when it succeeds on every cluster node.
  PUT_METHODS.forEach((method) => {
    /**
     * Run `method` on all nodes in parallel.
     *
     * @param {...*} args - forwarded unchanged to each client
     * @returns {Promise<*>} the result reported by the first node
     */
    proto[method] = async function (...args) {
      const pendingWrites = this.clients.map(node => node[method](...args));
      const [firstResult] = await Promise.all(pendingWrites);
      return firstResult;
    };
  });
  /**
   * Delegate `signatureUrl` to the node picked by the scheduler.
   *
   * @param {...*} args - forwarded unchanged (the object name comes first)
   * @returns {*} whatever the chosen client's `signatureUrl` returns
   */
  proto.signatureUrl = function signatureUrl(...args) {
    return this.chooseAvailable().signatureUrl(...args);
  };
  /**
   * Delegate `getObjectUrl` to the node picked by the scheduler.
   *
   * @param {...*} args - forwarded unchanged (name, then baseUrl)
   * @returns {*} whatever the chosen client's `getObjectUrl` returns
   */
  proto.getObjectUrl = function getObjectUrl(...args) {
    return this.chooseAvailable().getObjectUrl(...args);
  };
  /**
   * Kick off the first availability check in the background. Calls
   * `this.ready(true)` once it completes; any failure is emitted as an
   * 'error' event instead of being thrown.
   */
  proto._init = function _init() {
    const bootstrap = async () => {
      await this._checkAvailable(this._ignoreStatusFile);
      this.ready(true);
    };

    bootstrap().catch((err) => {
      this.emit('error', err);
    });
  };
  /**
   * Probe every cluster node and record the outcome in `this.availables`.
   *
   * Each node is probed up to three times before it is considered down. When
   * at least one node is down, a `CheckAvailableError` listing the status file
   * URLs of the down nodes is emitted as an 'error' event.
   *
   * @param {boolean} ignoreStatusFile - when falsy, the status file is first
   *   written to all nodes through `this.put`
   * @returns {Promise<void>} resolves without probing when another check is
   *   already running
   */
  proto._checkAvailable = async function _checkAvailable(ignoreStatusFile) {
    const name = `._ali-oss/check.status.${currentIP}.txt`;
    if (!ignoreStatusFile) {
      // only start will try to write the file
      await this.put(name, Buffer.from(`check available started at ${Date()}`));
    }

    // Only one probe round may run at a time.
    if (this._checkAvailableLock) {
      return;
    }
    this._checkAvailableLock = true;

    const maxAttempts = 3;
    const downStatusFiles = [];
    try {
      for (let i = 0; i < this.clients.length; i++) {
        const client = this.clients[i];
        let available = false;
        /* eslint no-await-in-loop: [0] */
        for (let attempt = 0; attempt < maxAttempts && !available; attempt++) {
          available = await this._checkStatus(client, name);
        }
        this.availables[i] = available;
        if (!available) {
          downStatusFiles.push(client._objectUrl(name));
        }
      }
    } finally {
      // Always release the lock: if it stayed set after an exception, every
      // later heartbeat would return early and availability would go stale.
      this._checkAvailableLock = false;
    }

    if (downStatusFiles.length > 0) {
      const err = new Error(`${downStatusFiles.length} data node down, please check status file: ${downStatusFiles.join(', ')}`);
      err.name = 'CheckAvailableError';
      this.emit('error', err);
    }
  };
  /**
   * Probe a single node by issuing `head` on the status file.
   *
   * @param {Object} client - the node to probe
   * @param {string} name - object name of the status file
   * @returns {Promise<boolean>} `true` when the request succeeds or fails with
   *   a 200-499 status (so a 404 still counts as available); `false` when the
   *   error has no status or a status outside that range
   */
  proto._checkStatus = async function _checkStatus(client, name) {
    try {
      await client.head(name);
      return true;
    } catch (err) {
      const { status } = err;
      // The node answered, even if with an error such as 404.
      return Boolean(status) && status >= 200 && status < 500;
    }
  };
  /**
   * Pick the node that should serve the next read.
   *
   * - master/slave schedule: the first node when `masterOnly` is set,
   *   otherwise the first node currently marked available;
   * - any other schedule: round robin over the nodes marked available.
   *
   * @returns {Object} the chosen client; falls back to the first node when
   *   every node is marked down
   */
  proto.chooseAvailable = function chooseAvailable() {
    const total = this.clients.length;

    if (this.schedule !== MS) {
      // Round robin: advance the cursor at most once per node.
      for (let remaining = total; remaining > 0; remaining -= 1) {
        const slot = this._nextRRIndex();
        if (this.availables[slot]) {
          return this.clients[slot];
        }
      }
      // all down, try to use this first one
      return this.clients[0];
    }

    // only read from master
    if (this.masterOnly) {
      return this.clients[0];
    }

    let slot = 0;
    while (slot < total && !this.availables[slot]) {
      slot += 1;
    }
    // all down, try to use this first one
    return slot < total ? this.clients[slot] : this.clients[0];
  };
  /**
   * Return the current round-robin cursor and advance it, wrapping back to 0
   * once it reaches the number of clients.
   *
   * @returns {number} the cursor value before advancing
   */
  proto._nextRRIndex = function _nextRRIndex() {
    const current = this.index;
    const following = current + 1;
    this.index = following >= this.clients.length ? 0 : following;
    return current;
  };
  /**
   * Default completion callback: rethrows the error it is given, if any.
   *
   * @param {*} [err] - error to rethrow; falsy values are ignored
   * @throws {*} `err` when it is truthy
   */
  proto._error = function error(err) {
    if (!err) {
      return;
    }
    throw err;
  };
  /**
   * Wrap a promise-returning function into a zero-argument callback.
   *
   * @param {Object} ctx - `this` value used when invoking `gen`
   * @param {Function} gen - function returning a promise
   * @param {Function} [cb] - called with no arguments on success and with the
   *   rejection reason on failure; defaults to `this._error`
   * @returns {Function} the callback; its return value is always `undefined`
   */
  proto._createCallback = function _createCallback(ctx, gen, cb) {
    let handler = cb;
    return () => {
      // Resolve the default lazily, on the first invocation.
      if (!handler) {
        handler = this._error;
      }
      const pending = gen.call(ctx);
      // With the default handler a rejection is rethrown inside the promise
      // chain; nothing here handles the promise returned by `.then()`.
      pending.then(() => {
        handler();
      }, handler);
    };
  };
  /**
   * Schedule `gen` to run repeatedly through `setInterval`.
   *
   * @param {Function} gen - promise-returning function, invoked with this client as `this`
   * @param {number} timeout - interval in milliseconds
   * @param {Function} [cb] - completion callback passed on to `_createCallback`
   * @returns {*} the timer handle returned by `setInterval`
   */
  proto._deferInterval = function _deferInterval(gen, timeout, cb) {
    const tick = this._createCallback(this, gen, cb);
    return setInterval(tick, timeout);
  };
  /**
   * Cancel the interval stored in `_timerId` and clear the stored handle.
   */
  proto.close = function close() {
    const timer = this._timerId;
    this._timerId = null;
    clearInterval(timer);
  };
  210. return Client;
  211. };