// GenerateObservable.ts (9.7 KB)
  1. import { IScheduler } from '../Scheduler';
  2. import { Action } from '../scheduler/Action';
  3. import { Observable } from '../Observable' ;
  4. import { Subscriber } from '../Subscriber';
  5. import { Subscription } from '../Subscription';
  6. import { isScheduler } from '../util/isScheduler';
  /**
   * Identity selector: returns its argument unchanged.
   * Used as the result selector when the caller does not supply one, so the
   * generator state itself becomes the emitted value.
   */
  const selfSelector = <T>(value: T): T => value;
  /** Predicate over the generator state; generation stops once it returns false. */
  export type ConditionFunc<S> = (state: S) => boolean;

  /** Step function: maps the current generator state to the next state. */
  export type IterateFunc<S> = (state: S) => S;

  /** Projection from the generator state to the value that gets emitted. */
  export type ResultFunc<S, T> = (state: S) => T;
  /**
   * State object handed to the scheduled `dispatch` work function. It carries
   * everything one step of the generator loop needs, so the same object can be
   * rescheduled for the next step.
   */
  interface SchedulerState<T, S> {
    /**
     * Unset on the first scheduled run, then set to true by `dispatch`.
     * While unset, `iterate` is skipped so the initial state is emitted as-is.
     */
    needIterate?: boolean;
    /** Current generator state; replaced by the result of `iterate` on each step. */
    state: S;
    /** Subscriber that receives the generated values, errors and completion. */
    subscriber: Subscriber<T>;
    /** Optional stop predicate; when absent the condition check is skipped. */
    condition?: ConditionFunc<S>;
    /** Produces the next state from the current one. */
    iterate: IterateFunc<S>;
    /** Maps the current state to the value passed to `subscriber.next`. */
    resultSelector: ResultFunc<S, T>;
  }
  /**
   * Options accepted by the options-object overload that emits the state itself
   * (no result selector).
   */
  export interface GenerateBaseOptions<S> {
    /**
     * Initial state.
     */
    initialState: S;
    /**
     * Condition function that accepts the state and returns a boolean.
     * When it returns false, the generator stops.
     * If not specified, the generator never stops on its own.
     */
    condition?: ConditionFunc<S>;
    /**
     * Iterate function that accepts the state and returns the new state.
     */
    iterate: IterateFunc<S>;
    /**
     * IScheduler to use for the generation process.
     * When omitted, the generator loop runs synchronously on subscription.
     */
    scheduler?: IScheduler;
  }
  /**
   * Options accepted by the options-object overload that also projects each
   * state into an emitted value.
   */
  export interface GenerateOptions<T, S> extends GenerateBaseOptions<S> {
    /**
     * Result selection function that accepts the state and returns the value to emit.
     */
    resultSelector: ResultFunc<S, T>;
  }
  /**
   * Observable that emits values produced by a state-driven loop: starting from
   * an initial state, it checks an optional condition, emits the selected result,
   * and advances the state with an iterate function. The loop runs synchronously
   * on subscription, or step by step on a scheduler when one is supplied.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @extends {Ignored}
   * @hide true
   */
  export class GenerateObservable<T, S> extends Observable<T> {
    /**
     * @param initialState State the loop starts from.
     * @param condition Stop predicate; may be undefined when created through the
     * options overloads, in which case the condition check is skipped.
     * @param iterate Function that advances the state.
     * @param resultSelector Function that maps the state to the emitted value.
     * @param scheduler Optional scheduler on which to run the loop.
     */
    constructor(private initialState: S,
                private condition: ConditionFunc<S>,
                private iterate: IterateFunc<S>,
                private resultSelector: ResultFunc<S, T>,
                private scheduler?: IScheduler) {
      super();
    }

    /**
     * Generates an observable sequence by running a state-driven loop
     * producing the sequence's elements, using the specified scheduler
     * to send out observer messages.
     *
     * <img src="./img/generate.png" width="100%">
     *
     * @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
     * var res = Rx.Observable.generate(0, x => x < 10, x => x + 1, x => x);
     *
     * @example <caption>Using asap scheduler, produces sequence of 2, 3, 5, then completes.</caption>
     * var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, x => x + 1, Rx.Scheduler.asap);
     *
     * @see {@link from}
     * @see {@link create}
     *
     * @param {S} initialState Initial state.
     * @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false).
     * @param {function (state: S): S} iterate Iteration step function.
     * @param {function (state: S): T} resultSelector Selector function for results produced in the sequence.
     * @param {Scheduler} [scheduler] A {@link IScheduler} on which to run the generator loop. If not provided, defaults to emit immediately.
     * @returns {Observable<T>} The generated sequence.
     */
    static create<T, S>(initialState: S,
                        condition: ConditionFunc<S>,
                        iterate: IterateFunc<S>,
                        resultSelector: ResultFunc<S, T>,
                        scheduler?: IScheduler): Observable<T>;

    /**
     * Generates an observable sequence by running a state-driven loop
     * producing the sequence's elements, using the specified scheduler
     * to send out observer messages.
     * The overload uses state as an emitted value.
     *
     * <img src="./img/generate.png" width="100%">
     *
     * @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
     * var res = Rx.Observable.generate(0, x => x < 10, x => x + 1);
     *
     * @example <caption>Using asap scheduler, produces sequence of 1, 2, 4, then completes.</caption>
     * var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, Rx.Scheduler.asap);
     *
     * @see {@link from}
     * @see {@link create}
     *
     * @param {S} initialState Initial state.
     * @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false).
     * @param {function (state: S): S} iterate Iteration step function.
     * @param {Scheduler} [scheduler] A {@link IScheduler} on which to run the generator loop. If not provided, defaults to emit immediately.
     * @returns {Observable<S>} The generated sequence.
     */
    static create<S>(initialState: S,
                     condition: ConditionFunc<S>,
                     iterate: IterateFunc<S>,
                     scheduler?: IScheduler): Observable<S>;

    /**
     * Generates an observable sequence by running a state-driven loop
     * producing the sequence's elements, using the specified scheduler
     * to send out observer messages.
     * The overload accepts options object that might contain initial state, iterate,
     * condition and scheduler.
     *
     * <img src="./img/generate.png" width="100%">
     *
     * @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
     * var res = Rx.Observable.generate({
     *   initialState: 0,
     *   condition: x => x < 10,
     *   iterate: x => x + 1
     * });
     *
     * @see {@link from}
     * @see {@link create}
     *
     * @param {GenerateBaseOptions<S>} options Object that must contain initialState, iterate and might contain condition and scheduler.
     * @returns {Observable<S>} The generated sequence.
     */
    static create<S>(options: GenerateBaseOptions<S>): Observable<S>;

    /**
     * Generates an observable sequence by running a state-driven loop
     * producing the sequence's elements, using the specified scheduler
     * to send out observer messages.
     * The overload accepts options object that might contain initial state, iterate,
     * condition, result selector and scheduler.
     *
     * <img src="./img/generate.png" width="100%">
     *
     * @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
     * var res = Rx.Observable.generate({
     *   initialState: 0,
     *   condition: x => x < 10,
     *   iterate: x => x + 1,
     *   resultSelector: x => x
     * });
     *
     * @see {@link from}
     * @see {@link create}
     *
     * @param {GenerateOptions<T, S>} options Object that must contain initialState, iterate, resultSelector and might contain condition and scheduler.
     * @returns {Observable<T>} The generated sequence.
     */
    static create<T, S>(options: GenerateOptions<T, S>): Observable<T>;

    // Implementation signature shared by all overloads above. The fourth
    // positional argument is either the result selector or, for the overload
    // without a selector, the scheduler.
    static create<T, S>(initialStateOrOptions: S | GenerateOptions<T, S>,
                        condition?: ConditionFunc<S>,
                        iterate?: IterateFunc<S>,
                        resultSelectorOrScheduler?: (ResultFunc<S, T>) | IScheduler,
                        scheduler?: IScheduler): Observable<T> {
      // Exactly one argument selects the options-object overloads.
      if (arguments.length === 1) {
        const options = initialStateOrOptions as GenerateOptions<T, S>;
        return new GenerateObservable<T, S>(
          options.initialState,
          options.condition,
          options.iterate,
          // Without a selector the state itself is emitted.
          options.resultSelector || selfSelector as ResultFunc<S, T>,
          options.scheduler);
      }

      // Positional form without a result selector: the fourth argument is
      // either missing or the scheduler, and the state itself is emitted.
      if (resultSelectorOrScheduler === undefined || isScheduler(resultSelectorOrScheduler)) {
        return new GenerateObservable<T, S>(
          initialStateOrOptions as S,
          condition,
          iterate,
          selfSelector as ResultFunc<S, T>,
          resultSelectorOrScheduler as IScheduler);
      }

      // Full positional form: selector in fourth place, optional scheduler in fifth.
      return new GenerateObservable<T, S>(
        initialStateOrOptions as S,
        condition,
        iterate,
        resultSelectorOrScheduler as ResultFunc<S, T>,
        scheduler);
    }

    /**
     * Runs the generator for one subscriber.
     *
     * With a scheduler, hands a fresh state object to `dispatch` with a delay
     * of 0 and returns whatever `scheduler.schedule` returns. Without one, runs
     * the loop synchronously until the condition fails, the subscriber closes,
     * or one of the user callbacks throws (the error is forwarded to
     * `subscriber.error`).
     *
     * @deprecated internal use only
     */
    _subscribe(subscriber: Subscriber<any>): Subscription | Function | void {
      let state = this.initialState;

      if (this.scheduler) {
        return this.scheduler.schedule<SchedulerState<T, S>>(GenerateObservable.dispatch, 0, {
          subscriber,
          iterate: this.iterate,
          condition: this.condition,
          resultSelector: this.resultSelector,
          state
        });
      }

      const { condition, resultSelector, iterate } = this;
      do {
        // 1. Stop (and complete) as soon as the condition rejects the state.
        if (condition) {
          let conditionResult: boolean;
          try {
            conditionResult = condition(state);
          } catch (err) {
            subscriber.error(err);
            return;
          }
          if (!conditionResult) {
            subscriber.complete();
            break;
          }
        }

        // 2. Project the state and emit it.
        let value: T;
        try {
          value = resultSelector(state);
        } catch (err) {
          subscriber.error(err);
          return;
        }
        subscriber.next(value);
        // The consumer may have unsubscribed while handling the value.
        if (subscriber.closed) {
          break;
        }

        // 3. Advance to the next state.
        try {
          state = iterate(state);
        } catch (err) {
          subscriber.error(err);
          return;
        }
      } while (true);
    }

    /**
     * Scheduled work function: performs one step of the generator loop and
     * reschedules itself with the same state object.
     *
     * On every run except the first, the state is advanced with `iterate`
     * before the condition is checked; the first run only sets `needIterate`.
     * The step ends without rescheduling when the subscriber is closed, when
     * the condition returns false (the subscriber is completed), or when a
     * user callback throws (the error is forwarded to `subscriber.error`).
     *
     * @param state Mutable loop state shared across scheduled runs.
     * @returns The result of rescheduling, or undefined when the loop ends.
     */
    private static dispatch<T, S>(state: SchedulerState<T, S>): Subscription | void {
      const { subscriber, condition } = state;
      if (subscriber.closed) {
        return;
      }

      if (state.needIterate) {
        try {
          state.state = state.iterate(state.state);
        } catch (err) {
          subscriber.error(err);
          return;
        }
      } else {
        // First run: emit the initial state untouched, iterate from now on.
        state.needIterate = true;
      }

      if (condition) {
        let conditionResult: boolean;
        try {
          conditionResult = condition(state.state);
        } catch (err) {
          subscriber.error(err);
          return;
        }
        if (!conditionResult) {
          subscriber.complete();
          return;
        }
        // The condition callback itself may have caused an unsubscribe.
        if (subscriber.closed) {
          return;
        }
      }

      let value: T;
      try {
        value = state.resultSelector(state.state);
      } catch (err) {
        subscriber.error(err);
        return;
      }
      if (subscriber.closed) {
        return;
      }

      subscriber.next(value);
      if (subscriber.closed) {
        return;
      }

      // `this` is treated as the Action running this work function; the cast
      // goes through `any` because a static method's `this` is not typed as one.
      return (<Action<SchedulerState<T, S>>><any>this).schedule(state);
    }
  }