esnext.observable.js 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. 'use strict';
  2. // https://github.com/tc39/proposal-observable
  3. var $ = require('../internals/export');
  4. var global = require('../internals/global');
  5. var call = require('../internals/function-call');
  6. var DESCRIPTORS = require('../internals/descriptors');
  7. var setSpecies = require('../internals/set-species');
  8. var aCallable = require('../internals/a-callable');
  9. var isCallable = require('../internals/is-callable');
  10. var isConstructor = require('../internals/is-constructor');
  11. var anObject = require('../internals/an-object');
  12. var isObject = require('../internals/is-object');
  13. var anInstance = require('../internals/an-instance');
  14. var defineProperty = require('../internals/object-define-property').f;
  15. var redefine = require('../internals/redefine');
  16. var redefineAll = require('../internals/redefine-all');
  17. var getIterator = require('../internals/get-iterator');
  18. var getMethod = require('../internals/get-method');
  19. var iterate = require('../internals/iterate');
  20. var hostReportErrors = require('../internals/host-report-errors');
  21. var wellKnownSymbol = require('../internals/well-known-symbol');
  22. var InternalStateModule = require('../internals/internal-state');
  23. var $$OBSERVABLE = wellKnownSymbol('observable');
  24. var OBSERVABLE = 'Observable';
  25. var SUBSCRIPTION = 'Subscription';
  26. var SUBSCRIPTION_OBSERVER = 'SubscriptionObserver';
  27. var getterFor = InternalStateModule.getterFor;
  28. var setInternalState = InternalStateModule.set;
  29. var getObservableInternalState = getterFor(OBSERVABLE);
  30. var getSubscriptionInternalState = getterFor(SUBSCRIPTION);
  31. var getSubscriptionObserverInternalState = getterFor(SUBSCRIPTION_OBSERVER);
  32. var Array = global.Array;
  33. var SubscriptionState = function (observer) {
  34. this.observer = anObject(observer);
  35. this.cleanup = undefined;
  36. this.subscriptionObserver = undefined;
  37. };
  38. SubscriptionState.prototype = {
  39. type: SUBSCRIPTION,
  40. clean: function () {
  41. var cleanup = this.cleanup;
  42. if (cleanup) {
  43. this.cleanup = undefined;
  44. try {
  45. cleanup();
  46. } catch (error) {
  47. hostReportErrors(error);
  48. }
  49. }
  50. },
  51. close: function () {
  52. if (!DESCRIPTORS) {
  53. var subscription = this.facade;
  54. var subscriptionObserver = this.subscriptionObserver;
  55. subscription.closed = true;
  56. if (subscriptionObserver) subscriptionObserver.closed = true;
  57. } this.observer = undefined;
  58. },
  59. isClosed: function () {
  60. return this.observer === undefined;
  61. }
  62. };
  63. var Subscription = function (observer, subscriber) {
  64. var subscriptionState = setInternalState(this, new SubscriptionState(observer));
  65. var start;
  66. if (!DESCRIPTORS) this.closed = false;
  67. try {
  68. if (start = getMethod(observer, 'start')) call(start, observer, this);
  69. } catch (error) {
  70. hostReportErrors(error);
  71. }
  72. if (subscriptionState.isClosed()) return;
  73. var subscriptionObserver = subscriptionState.subscriptionObserver = new SubscriptionObserver(subscriptionState);
  74. try {
  75. var cleanup = subscriber(subscriptionObserver);
  76. var subscription = cleanup;
  77. if (cleanup != null) subscriptionState.cleanup = isCallable(cleanup.unsubscribe)
  78. ? function () { subscription.unsubscribe(); }
  79. : aCallable(cleanup);
  80. } catch (error) {
  81. subscriptionObserver.error(error);
  82. return;
  83. } if (subscriptionState.isClosed()) subscriptionState.clean();
  84. };
  85. Subscription.prototype = redefineAll({}, {
  86. unsubscribe: function unsubscribe() {
  87. var subscriptionState = getSubscriptionInternalState(this);
  88. if (!subscriptionState.isClosed()) {
  89. subscriptionState.close();
  90. subscriptionState.clean();
  91. }
  92. }
  93. });
  94. if (DESCRIPTORS) defineProperty(Subscription.prototype, 'closed', {
  95. configurable: true,
  96. get: function () {
  97. return getSubscriptionInternalState(this).isClosed();
  98. }
  99. });
  100. var SubscriptionObserver = function (subscriptionState) {
  101. setInternalState(this, {
  102. type: SUBSCRIPTION_OBSERVER,
  103. subscriptionState: subscriptionState
  104. });
  105. if (!DESCRIPTORS) this.closed = false;
  106. };
  107. SubscriptionObserver.prototype = redefineAll({}, {
  108. next: function next(value) {
  109. var subscriptionState = getSubscriptionObserverInternalState(this).subscriptionState;
  110. if (!subscriptionState.isClosed()) {
  111. var observer = subscriptionState.observer;
  112. try {
  113. var nextMethod = getMethod(observer, 'next');
  114. if (nextMethod) call(nextMethod, observer, value);
  115. } catch (error) {
  116. hostReportErrors(error);
  117. }
  118. }
  119. },
  120. error: function error(value) {
  121. var subscriptionState = getSubscriptionObserverInternalState(this).subscriptionState;
  122. if (!subscriptionState.isClosed()) {
  123. var observer = subscriptionState.observer;
  124. subscriptionState.close();
  125. try {
  126. var errorMethod = getMethod(observer, 'error');
  127. if (errorMethod) call(errorMethod, observer, value);
  128. else hostReportErrors(value);
  129. } catch (err) {
  130. hostReportErrors(err);
  131. } subscriptionState.clean();
  132. }
  133. },
  134. complete: function complete() {
  135. var subscriptionState = getSubscriptionObserverInternalState(this).subscriptionState;
  136. if (!subscriptionState.isClosed()) {
  137. var observer = subscriptionState.observer;
  138. subscriptionState.close();
  139. try {
  140. var completeMethod = getMethod(observer, 'complete');
  141. if (completeMethod) call(completeMethod, observer);
  142. } catch (error) {
  143. hostReportErrors(error);
  144. } subscriptionState.clean();
  145. }
  146. }
  147. });
  148. if (DESCRIPTORS) defineProperty(SubscriptionObserver.prototype, 'closed', {
  149. configurable: true,
  150. get: function () {
  151. return getSubscriptionObserverInternalState(this).subscriptionState.isClosed();
  152. }
  153. });
  154. var $Observable = function Observable(subscriber) {
  155. anInstance(this, ObservablePrototype);
  156. setInternalState(this, {
  157. type: OBSERVABLE,
  158. subscriber: aCallable(subscriber)
  159. });
  160. };
  161. var ObservablePrototype = $Observable.prototype;
  162. redefineAll(ObservablePrototype, {
  163. subscribe: function subscribe(observer) {
  164. var length = arguments.length;
  165. return new Subscription(isCallable(observer) ? {
  166. next: observer,
  167. error: length > 1 ? arguments[1] : undefined,
  168. complete: length > 2 ? arguments[2] : undefined
  169. } : isObject(observer) ? observer : {}, getObservableInternalState(this).subscriber);
  170. }
  171. });
  172. redefineAll($Observable, {
  173. from: function from(x) {
  174. var C = isConstructor(this) ? this : $Observable;
  175. var observableMethod = getMethod(anObject(x), $$OBSERVABLE);
  176. if (observableMethod) {
  177. var observable = anObject(call(observableMethod, x));
  178. return observable.constructor === C ? observable : new C(function (observer) {
  179. return observable.subscribe(observer);
  180. });
  181. }
  182. var iterator = getIterator(x);
  183. return new C(function (observer) {
  184. iterate(iterator, function (it, stop) {
  185. observer.next(it);
  186. if (observer.closed) return stop();
  187. }, { IS_ITERATOR: true, INTERRUPTED: true });
  188. observer.complete();
  189. });
  190. },
  191. of: function of() {
  192. var C = isConstructor(this) ? this : $Observable;
  193. var length = arguments.length;
  194. var items = Array(length);
  195. var index = 0;
  196. while (index < length) items[index] = arguments[index++];
  197. return new C(function (observer) {
  198. for (var i = 0; i < length; i++) {
  199. observer.next(items[i]);
  200. if (observer.closed) return;
  201. } observer.complete();
  202. });
  203. }
  204. });
  205. redefine(ObservablePrototype, $$OBSERVABLE, function () { return this; });
  206. $({ global: true }, {
  207. Observable: $Observable
  208. });
  209. setSpecies(OBSERVABLE);