ThroughStream.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. <?php
  2. namespace React\Stream;
  3. use Evenement\EventEmitter;
  4. use InvalidArgumentException;
  5. /**
  6. * The `ThroughStream` implements the
  7. * [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
  8. * you write to it through to its readable end.
  9. *
  10. * ```php
  11. * $through = new ThroughStream();
  12. * $through->on('data', $this->expectCallableOnceWith('hello'));
  13. *
  14. * $through->write('hello');
  15. * ```
  16. *
  17. * Similarly, the [`end()` method](#end) will end the stream and emit an
  18. * [`end` event](#end-event) and then [`close()`](#close-1) the stream.
  19. * The [`close()` method](#close-1) will close the stream and emit a
  20. * [`close` event](#close-event).
  21. * Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:
  22. *
  23. * ```php
  24. * $through = new ThroughStream();
  25. * $source->pipe($through)->pipe($dest);
  26. * ```
  27. *
  28. * Optionally, its constructor accepts any callable function which will then be
  29. * used to *filter* any data written to it. This function receives a single data
  30. * argument as passed to the writable side and must return the data as it will be
  31. * passed to its readable end:
  32. *
  33. * ```php
  34. * $through = new ThroughStream('strtoupper');
  35. * $source->pipe($through)->pipe($dest);
  36. * ```
  37. *
  38. * Note that this class makes no assumptions about any data types. This can be
  39. * used to convert data, for example for transforming any structured data into
  40. * a newline-delimited JSON (NDJSON) stream like this:
  41. *
  42. * ```php
  43. * $through = new ThroughStream(function ($data) {
  44. * return json_encode($data) . PHP_EOL;
  45. * });
  46. * $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
  47. *
  48. * $through->write(array(2, true));
  49. * ```
  50. *
  51. * The callback function is allowed to throw an `Exception`. In this case,
  52. * the stream will emit an `error` event and then [`close()`](#close-1) the stream.
  53. *
  54. * ```php
  55. * $through = new ThroughStream(function ($data) {
  56. * if (!is_string($data)) {
  57. * throw new \UnexpectedValueException('Only strings allowed');
  58. * }
  59. * return $data;
  60. * });
  61. * $through->on('error', $this->expectCallableOnce()));
  62. * $through->on('close', $this->expectCallableOnce()));
  63. * $through->on('data', $this->expectCallableNever()));
  64. *
  65. * $through->write(2);
  66. * ```
  67. *
  68. * @see WritableStreamInterface::write()
  69. * @see WritableStreamInterface::end()
  70. * @see DuplexStreamInterface::close()
  71. * @see WritableStreamInterface::pipe()
  72. */
  73. final class ThroughStream extends EventEmitter implements DuplexStreamInterface
  74. {
  75. private $readable = true;
  76. private $writable = true;
  77. private $closed = false;
  78. private $paused = false;
  79. private $drain = false;
  80. private $callback;
  81. public function __construct($callback = null)
  82. {
  83. if ($callback !== null && !\is_callable($callback)) {
  84. throw new InvalidArgumentException('Invalid transformation callback given');
  85. }
  86. $this->callback = $callback;
  87. }
  88. public function pause()
  89. {
  90. // only allow pause if still readable, false otherwise
  91. $this->paused = $this->readable;
  92. }
  93. public function resume()
  94. {
  95. $this->paused = false;
  96. // emit drain event if previous write was paused (throttled)
  97. if ($this->drain) {
  98. $this->drain = false;
  99. $this->emit('drain');
  100. }
  101. }
  102. public function pipe(WritableStreamInterface $dest, array $options = array())
  103. {
  104. return Util::pipe($this, $dest, $options);
  105. }
  106. public function isReadable()
  107. {
  108. return $this->readable;
  109. }
  110. public function isWritable()
  111. {
  112. return $this->writable;
  113. }
  114. public function write($data)
  115. {
  116. if (!$this->writable) {
  117. return false;
  118. }
  119. if ($this->callback !== null) {
  120. try {
  121. $data = \call_user_func($this->callback, $data);
  122. } catch (\Exception $e) {
  123. $this->emit('error', array($e));
  124. $this->close();
  125. return false;
  126. }
  127. }
  128. $this->emit('data', array($data));
  129. // emit drain event on next resume if currently paused (throttled)
  130. if ($this->paused) {
  131. $this->drain = true;
  132. }
  133. // continue writing if still writable and not paused (throttled), false otherwise
  134. return $this->writable && !$this->paused;
  135. }
  136. public function end($data = null)
  137. {
  138. if (!$this->writable) {
  139. return;
  140. }
  141. if (null !== $data) {
  142. $this->write($data);
  143. // return if write() already caused the stream to close
  144. if (!$this->writable) {
  145. return;
  146. }
  147. }
  148. $this->readable = false;
  149. $this->writable = false;
  150. $this->paused = false;
  151. $this->drain = false;
  152. $this->emit('end');
  153. $this->close();
  154. }
  155. public function close()
  156. {
  157. if ($this->closed) {
  158. return;
  159. }
  160. $this->readable = false;
  161. $this->writable = false;
  162. $this->paused = false;
  163. $this->drain = false;
  164. $this->closed = true;
  165. $this->callback = null;
  166. $this->emit('close');
  167. $this->removeAllListeners();
  168. }
  169. }