AsyncResponse.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Response;
  11. use Symfony\Component\HttpClient\Chunk\ErrorChunk;
  12. use Symfony\Component\HttpClient\Chunk\FirstChunk;
  13. use Symfony\Component\HttpClient\Chunk\LastChunk;
  14. use Symfony\Component\HttpClient\Exception\TransportException;
  15. use Symfony\Contracts\HttpClient\ChunkInterface;
  16. use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
  17. use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
  18. use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
  19. use Symfony\Contracts\HttpClient\HttpClientInterface;
  20. use Symfony\Contracts\HttpClient\ResponseInterface;
  21. /**
  22. * Provides a single extension point to process a response's content stream.
  23. *
  24. * @author Nicolas Grekas <p@tchwork.com>
  25. */
  26. final class AsyncResponse implements ResponseInterface, StreamableInterface
  27. {
  28. use CommonResponseTrait;
  29. private const FIRST_CHUNK_YIELDED = 1;
  30. private const LAST_CHUNK_YIELDED = 2;
  31. private $client;
  32. private $response;
  33. private $info = ['canceled' => false];
  34. private $passthru;
  35. private $stream;
  36. private $yieldedState;
  37. /**
  38. * @param ?callable(ChunkInterface, AsyncContext): ?\Iterator $passthru
  39. */
  40. public function __construct(HttpClientInterface $client, string $method, string $url, array $options, callable $passthru = null)
  41. {
  42. $this->client = $client;
  43. $this->shouldBuffer = $options['buffer'] ?? true;
  44. if (null !== $onProgress = $options['on_progress'] ?? null) {
  45. $thisInfo = &$this->info;
  46. $options['on_progress'] = static function (int $dlNow, int $dlSize, array $info) use (&$thisInfo, $onProgress) {
  47. $onProgress($dlNow, $dlSize, $thisInfo + $info);
  48. };
  49. }
  50. $this->response = $client->request($method, $url, ['buffer' => false] + $options);
  51. $this->passthru = $passthru;
  52. $this->initializer = static function (self $response, float $timeout = null) {
  53. if (null === $response->shouldBuffer) {
  54. return false;
  55. }
  56. while (true) {
  57. foreach (self::stream([$response], $timeout) as $chunk) {
  58. if ($chunk->isTimeout() && $response->passthru) {
  59. foreach (self::passthru($response->client, $response, new ErrorChunk($response->offset, new TransportException($chunk->getError()))) as $chunk) {
  60. if ($chunk->isFirst()) {
  61. return false;
  62. }
  63. }
  64. continue 2;
  65. }
  66. if ($chunk->isFirst()) {
  67. return false;
  68. }
  69. }
  70. return false;
  71. }
  72. };
  73. if (\array_key_exists('user_data', $options)) {
  74. $this->info['user_data'] = $options['user_data'];
  75. }
  76. if (\array_key_exists('max_duration', $options)) {
  77. $this->info['max_duration'] = $options['max_duration'];
  78. }
  79. }
  80. public function getStatusCode(): int
  81. {
  82. if ($this->initializer) {
  83. self::initialize($this);
  84. }
  85. return $this->response->getStatusCode();
  86. }
  87. public function getHeaders(bool $throw = true): array
  88. {
  89. if ($this->initializer) {
  90. self::initialize($this);
  91. }
  92. $headers = $this->response->getHeaders(false);
  93. if ($throw) {
  94. $this->checkStatusCode();
  95. }
  96. return $headers;
  97. }
  98. public function getInfo(string $type = null)
  99. {
  100. if (null !== $type) {
  101. return $this->info[$type] ?? $this->response->getInfo($type);
  102. }
  103. return $this->info + $this->response->getInfo();
  104. }
  105. /**
  106. * {@inheritdoc}
  107. */
  108. public function toStream(bool $throw = true)
  109. {
  110. if ($throw) {
  111. // Ensure headers arrived
  112. $this->getHeaders(true);
  113. }
  114. $handle = function () {
  115. $stream = $this->response instanceof StreamableInterface ? $this->response->toStream(false) : StreamWrapper::createResource($this->response);
  116. return stream_get_meta_data($stream)['wrapper_data']->stream_cast(\STREAM_CAST_FOR_SELECT);
  117. };
  118. $stream = StreamWrapper::createResource($this);
  119. stream_get_meta_data($stream)['wrapper_data']
  120. ->bindHandles($handle, $this->content);
  121. return $stream;
  122. }
  123. /**
  124. * {@inheritdoc}
  125. */
  126. public function cancel(): void
  127. {
  128. if ($this->info['canceled']) {
  129. return;
  130. }
  131. $this->info['canceled'] = true;
  132. $this->info['error'] = 'Response has been canceled.';
  133. $this->close();
  134. $client = $this->client;
  135. $this->client = null;
  136. if (!$this->passthru) {
  137. return;
  138. }
  139. try {
  140. foreach (self::passthru($client, $this, new LastChunk()) as $chunk) {
  141. // no-op
  142. }
  143. $this->passthru = null;
  144. } catch (ExceptionInterface $e) {
  145. // ignore any errors when canceling
  146. }
  147. }
  148. public function __destruct()
  149. {
  150. $httpException = null;
  151. if ($this->initializer && null === $this->getInfo('error')) {
  152. try {
  153. self::initialize($this, -0.0);
  154. $this->getHeaders(true);
  155. } catch (HttpExceptionInterface $httpException) {
  156. // no-op
  157. }
  158. }
  159. if ($this->passthru && null === $this->getInfo('error')) {
  160. $this->info['canceled'] = true;
  161. try {
  162. foreach (self::passthru($this->client, $this, new LastChunk()) as $chunk) {
  163. // no-op
  164. }
  165. } catch (ExceptionInterface $e) {
  166. // ignore any errors when destructing
  167. }
  168. }
  169. if (null !== $httpException) {
  170. throw $httpException;
  171. }
  172. }
  173. /**
  174. * @internal
  175. */
  176. public static function stream(iterable $responses, float $timeout = null, string $class = null): \Generator
  177. {
  178. while ($responses) {
  179. $wrappedResponses = [];
  180. $asyncMap = new \SplObjectStorage();
  181. $client = null;
  182. foreach ($responses as $r) {
  183. if (!$r instanceof self) {
  184. throw new \TypeError(sprintf('"%s::stream()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', $class ?? static::class, get_debug_type($r)));
  185. }
  186. if (null !== $e = $r->info['error'] ?? null) {
  187. yield $r => $chunk = new ErrorChunk($r->offset, new TransportException($e));
  188. $chunk->didThrow() ?: $chunk->getContent();
  189. continue;
  190. }
  191. if (null === $client) {
  192. $client = $r->client;
  193. } elseif ($r->client !== $client) {
  194. throw new TransportException('Cannot stream AsyncResponse objects with many clients.');
  195. }
  196. $asyncMap[$r->response] = $r;
  197. $wrappedResponses[] = $r->response;
  198. if ($r->stream) {
  199. yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);
  200. if (!isset($asyncMap[$response])) {
  201. array_pop($wrappedResponses);
  202. }
  203. if ($r->response !== $response && !isset($asyncMap[$r->response])) {
  204. $asyncMap[$r->response] = $r;
  205. $wrappedResponses[] = $r->response;
  206. }
  207. }
  208. }
  209. if (!$client || !$wrappedResponses) {
  210. return;
  211. }
  212. foreach ($client->stream($wrappedResponses, $timeout) as $response => $chunk) {
  213. $r = $asyncMap[$response];
  214. if (null === $chunk->getError()) {
  215. if ($chunk->isFirst()) {
  216. // Ensure no exception is thrown on destruct for the wrapped response
  217. $r->response->getStatusCode();
  218. } elseif (0 === $r->offset && null === $r->content && $chunk->isLast()) {
  219. $r->content = fopen('php://memory', 'w+');
  220. }
  221. }
  222. if (!$r->passthru) {
  223. if (null !== $chunk->getError() || $chunk->isLast()) {
  224. unset($asyncMap[$response]);
  225. } elseif (null !== $r->content && '' !== ($content = $chunk->getContent()) && \strlen($content) !== fwrite($r->content, $content)) {
  226. $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
  227. $r->info['error'] = $chunk->getError();
  228. $r->response->cancel();
  229. }
  230. yield $r => $chunk;
  231. continue;
  232. }
  233. if (null !== $chunk->getError()) {
  234. // no-op
  235. } elseif ($chunk->isFirst()) {
  236. $r->yieldedState = self::FIRST_CHUNK_YIELDED;
  237. } elseif (self::FIRST_CHUNK_YIELDED !== $r->yieldedState && null === $chunk->getInformationalStatus()) {
  238. throw new \LogicException(sprintf('Instance of "%s" is already consumed and cannot be managed by "%s". A decorated client should not call any of the response\'s methods in its "request()" method.', get_debug_type($response), $class ?? static::class));
  239. }
  240. foreach (self::passthru($r->client, $r, $chunk, $asyncMap) as $chunk) {
  241. yield $r => $chunk;
  242. }
  243. if ($r->response !== $response && isset($asyncMap[$response])) {
  244. break;
  245. }
  246. }
  247. if (null === $chunk->getError() && $chunk->isLast()) {
  248. $r->yieldedState = self::LAST_CHUNK_YIELDED;
  249. }
  250. if (null === $chunk->getError() && self::LAST_CHUNK_YIELDED !== $r->yieldedState && $r->response === $response && null !== $r->client) {
  251. throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
  252. }
  253. $responses = [];
  254. foreach ($asyncMap as $response) {
  255. $r = $asyncMap[$response];
  256. if (null !== $r->client) {
  257. $responses[] = $asyncMap[$response];
  258. }
  259. }
  260. }
  261. }
  262. /**
  263. * @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
  264. */
  265. private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
  266. {
  267. $r->stream = null;
  268. $response = $r->response;
  269. $context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
  270. if (null === $stream = ($r->passthru)($chunk, $context)) {
  271. if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
  272. throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
  273. }
  274. return;
  275. }
  276. if (!$stream instanceof \Iterator) {
  277. throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
  278. }
  279. $r->stream = $stream;
  280. yield from self::passthruStream($response, $r, null, $asyncMap);
  281. }
  282. /**
  283. * @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
  284. */
  285. private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
  286. {
  287. while (true) {
  288. try {
  289. if (null !== $chunk && $r->stream) {
  290. $r->stream->next();
  291. }
  292. if (!$r->stream || !$r->stream->valid() || !$r->stream) {
  293. $r->stream = null;
  294. break;
  295. }
  296. } catch (\Throwable $e) {
  297. unset($asyncMap[$response]);
  298. $r->stream = null;
  299. $r->info['error'] = $e->getMessage();
  300. $r->response->cancel();
  301. yield $r => $chunk = new ErrorChunk($r->offset, $e);
  302. $chunk->didThrow() ?: $chunk->getContent();
  303. break;
  304. }
  305. $chunk = $r->stream->current();
  306. if (!$chunk instanceof ChunkInterface) {
  307. throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
  308. }
  309. if (null !== $chunk->getError()) {
  310. // no-op
  311. } elseif ($chunk->isFirst()) {
  312. $e = $r->openBuffer();
  313. yield $r => $chunk;
  314. if ($r->initializer && null === $r->getInfo('error')) {
  315. // Ensure the HTTP status code is always checked
  316. $r->getHeaders(true);
  317. }
  318. if (null === $e) {
  319. continue;
  320. }
  321. $r->response->cancel();
  322. $chunk = new ErrorChunk($r->offset, $e);
  323. } elseif ('' !== $content = $chunk->getContent()) {
  324. if (null !== $r->shouldBuffer) {
  325. throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
  326. }
  327. if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
  328. $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
  329. $r->info['error'] = $chunk->getError();
  330. $r->response->cancel();
  331. }
  332. }
  333. if (null !== $chunk->getError() || $chunk->isLast()) {
  334. $stream = $r->stream;
  335. $r->stream = null;
  336. unset($asyncMap[$response]);
  337. }
  338. if (null === $chunk->getError()) {
  339. $r->offset += \strlen($content);
  340. yield $r => $chunk;
  341. if (!$chunk->isLast()) {
  342. continue;
  343. }
  344. $stream->next();
  345. if ($stream->valid()) {
  346. throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
  347. }
  348. $r->passthru = null;
  349. } else {
  350. if ($chunk instanceof ErrorChunk) {
  351. $chunk->didThrow(false);
  352. } else {
  353. try {
  354. $chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
  355. } catch (TransportExceptionInterface $e) {
  356. $chunk = new ErrorChunk($chunk->getOffset(), $e);
  357. }
  358. }
  359. yield $r => $chunk;
  360. $chunk->didThrow() ?: $chunk->getContent();
  361. }
  362. break;
  363. }
  364. }
  365. private function openBuffer(): ?\Throwable
  366. {
  367. if (null === $shouldBuffer = $this->shouldBuffer) {
  368. throw new \LogicException('A chunk passthru cannot yield more than one "isFirst()" chunk.');
  369. }
  370. $e = $this->shouldBuffer = null;
  371. if ($shouldBuffer instanceof \Closure) {
  372. try {
  373. $shouldBuffer = $shouldBuffer($this->getHeaders(false));
  374. if (null !== $e = $this->response->getInfo('error')) {
  375. throw new TransportException($e);
  376. }
  377. } catch (\Throwable $e) {
  378. $this->info['error'] = $e->getMessage();
  379. $this->response->cancel();
  380. }
  381. }
  382. if (true === $shouldBuffer) {
  383. $this->content = fopen('php://temp', 'w+');
  384. } elseif (\is_resource($shouldBuffer)) {
  385. $this->content = $shouldBuffer;
  386. }
  387. return $e;
  388. }
  389. private function close(): void
  390. {
  391. $this->response->cancel();
  392. }
  393. }