123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478 |
- <?php
- /*
- * This file is part of the Symfony package.
- *
- * (c) Fabien Potencier <fabien@symfony.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- namespace Symfony\Component\HttpClient\Response;
- use Symfony\Component\HttpClient\Chunk\ErrorChunk;
- use Symfony\Component\HttpClient\Chunk\FirstChunk;
- use Symfony\Component\HttpClient\Chunk\LastChunk;
- use Symfony\Component\HttpClient\Exception\TransportException;
- use Symfony\Contracts\HttpClient\ChunkInterface;
- use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
- use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
- use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
- use Symfony\Contracts\HttpClient\HttpClientInterface;
- use Symfony\Contracts\HttpClient\ResponseInterface;
- /**
- * Provides a single extension point to process a response's content stream.
- *
- * @author Nicolas Grekas <p@tchwork.com>
- */
- final class AsyncResponse implements ResponseInterface, StreamableInterface
- {
- use CommonResponseTrait;
- private const FIRST_CHUNK_YIELDED = 1;
- private const LAST_CHUNK_YIELDED = 2;
- private $client;
- private $response;
- private $info = ['canceled' => false];
- private $passthru;
- private $stream;
- private $yieldedState;
/**
 * Issues the request through the given client and prepares the lazy initializer.
 *
 * The wrapped request is always sent with "buffer" => false; the caller's own
 * "buffer" option (true when absent) is stored in $this->shouldBuffer instead.
 * A user "on_progress" callback is wrapped so that the info array it receives
 * is this response's info merged over the info reported by the wrapped client.
 *
 * @param HttpClientInterface $client  Client used to send the wrapped request
 * @param string              $method  HTTP method, forwarded to $client->request()
 * @param string              $url     Request URL, forwarded to $client->request()
 * @param array               $options Request options; "user_data" and "max_duration" are
 *                                     also copied into this response's info when present
 * @param ?callable(ChunkInterface, AsyncContext): ?\Iterator $passthru
 */
public function __construct(HttpClientInterface $client, string $method, string $url, array $options, ?callable $passthru = null)
{
    $this->client = $client;
    $this->shouldBuffer = $options['buffer'] ?? true;

    if (null !== $onProgress = $options['on_progress'] ?? null) {
        // Captured by reference so the callback observes later changes to this response's info.
        $thisInfo = &$this->info;
        $options['on_progress'] = static function (int $dlNow, int $dlSize, array $info) use (&$thisInfo, $onProgress) {
            $onProgress($dlNow, $dlSize, $thisInfo + $info);
        };
    }

    // The array union keeps the left-hand "buffer" => false whatever the caller passed.
    $this->response = $client->request($method, $url, ['buffer' => false] + $options);
    $this->passthru = $passthru;

    // Streams this response until a first chunk shows up. A null $shouldBuffer means
    // openBuffer() already ran, so there is nothing left to wait for.
    $this->initializer = static function (self $response, ?float $timeout = null) {
        if (null === $response->shouldBuffer) {
            return false;
        }

        while (true) {
            foreach (self::stream([$response], $timeout) as $chunk) {
                if ($chunk->isTimeout() && $response->passthru) {
                    // Hand the timeout to the passthru as an error chunk, then restart streaming.
                    foreach (self::passthru($response->client, $response, new ErrorChunk($response->offset, new TransportException($chunk->getError()))) as $chunk) {
                        if ($chunk->isFirst()) {
                            return false;
                        }
                    }

                    continue 2;
                }

                if ($chunk->isFirst()) {
                    return false;
                }
            }

            return false;
        }
    };

    // array_key_exists() rather than isset(): an explicit null value is still copied.
    if (\array_key_exists('user_data', $options)) {
        $this->info['user_data'] = $options['user_data'];
    }
    if (\array_key_exists('max_duration', $options)) {
        $this->info['max_duration'] = $options['max_duration'];
    }
}
/**
 * Returns the status code reported by the wrapped response.
 *
 * The pending initializer, when one is still set, is run first.
 */
public function getStatusCode(): int
{
    $this->initializer && self::initialize($this);

    return $this->response->getStatusCode();
}
/**
 * Returns the headers of the wrapped response.
 *
 * The headers are always fetched from the wrapped response without throwing;
 * the status code check is then performed by this object when $throw is true.
 *
 * @param bool $throw Whether checkStatusCode() should be invoked before returning
 */
public function getHeaders(bool $throw = true): array
{
    $this->initializer && self::initialize($this);

    $headers = $this->response->getHeaders(false);

    if (!$throw) {
        return $headers;
    }

    $this->checkStatusCode();

    return $headers;
}
/**
 * Returns info about this response, falling back to the wrapped response.
 *
 * Entries stored on this object take precedence over the wrapped response's ones.
 *
 * @param string|null $type The single info entry to return, or null to get the merged array
 *
 * @return mixed The requested entry when $type is given, the whole info array otherwise
 */
public function getInfo(?string $type = null)
{
    if (null === $type) {
        // Array union: keys already present in $this->info are not overwritten.
        return $this->info + $this->response->getInfo();
    }

    return $this->info[$type] ?? $this->response->getInfo($type);
}
/**
 * {@inheritdoc}
 *
 * The returned resource wraps this object; its select()-able handle is obtained
 * lazily from the stream of the wrapped response.
 */
public function toStream(bool $throw = true)
{
    if ($throw) {
        // Ensure headers arrived
        $this->getHeaders(true);
    }

    $handle = function () {
        $inner = $this->response;

        if ($inner instanceof StreamableInterface) {
            $innerStream = $inner->toStream(false);
        } else {
            $innerStream = StreamWrapper::createResource($inner);
        }

        $innerWrapper = stream_get_meta_data($innerStream)['wrapper_data'];

        return $innerWrapper->stream_cast(\STREAM_CAST_FOR_SELECT);
    };

    $stream = StreamWrapper::createResource($this);
    $wrapper = stream_get_meta_data($stream)['wrapper_data'];
    $wrapper->bindHandles($handle, $this->content);

    return $stream;
}
/**
 * {@inheritdoc}
 *
 * Marks the response as canceled, closes it, detaches the client and, when a
 * passthru is set, feeds it a final LastChunk. Calling this method again once
 * canceled does nothing.
 */
public function cancel(): void
{
    if ($this->info['canceled']) {
        return;
    }

    $this->info['canceled'] = true;
    $this->info['error'] = 'Response has been canceled.';
    $this->close();

    // Keep the client for the passthru call below, but detach it from the response.
    $client = $this->client;
    $this->client = null;

    if ($this->passthru) {
        try {
            foreach (self::passthru($client, $this, new LastChunk()) as $ignoredChunk) {
                // no-op
            }

            // Only reached when the passthru completed without throwing.
            $this->passthru = null;
        } catch (ExceptionInterface $ignoredError) {
            // ignore any errors when canceling
        }
    }
}
/**
 * Finishes initialization and notifies the passthru before the object goes away.
 *
 * An HTTP exception raised while initializing is held back until the passthru
 * has been given its LastChunk, then rethrown.
 */
public function __destruct()
{
    $deferred = null;

    if ($this->initializer && null === $this->getInfo('error')) {
        try {
            // NOTE(review): the meaning of the -0.0 timeout is defined by initialize(), which is not visible here.
            self::initialize($this, -0.0);
            $this->getHeaders(true);
        } catch (HttpExceptionInterface $deferred) {
            // no-op
        }
    }

    if ($this->passthru && null === $this->getInfo('error')) {
        $this->info['canceled'] = true;

        try {
            foreach (self::passthru($this->client, $this, new LastChunk()) as $ignoredChunk) {
                // no-op
            }
        } catch (ExceptionInterface $ignoredError) {
            // ignore any errors when destructing
        }
    }

    if (null === $deferred) {
        return;
    }

    throw $deferred;
}
/**
 * Streams the chunks of the given responses, routing them through each response's passthru when it has one.
 *
 * Responses whose info already holds an "error" yield a single ErrorChunk and are
 * skipped. All other responses must have been created with the same client, whose
 * stream() method is used to read the wrapped responses.
 *
 * @param iterable    $responses The AsyncResponse objects to stream
 * @param float|null  $timeout   Forwarded unchanged to the client's stream() method
 * @param string|null $class     Class name to mention in error messages instead of static::class
 *
 * @return \Generator Yields AsyncResponse => ChunkInterface pairs
 *
 * @throws \TypeError         When an item of $responses is not an AsyncResponse
 * @throws TransportException When the responses do not all share the same client
 * @throws \LogicException    When a wrapped response was consumed before being managed here,
 *                            or when a passthru ends a stream without yielding a last chunk
 *
 * @internal
 */
public static function stream(iterable $responses, ?float $timeout = null, ?string $class = null): \Generator
{
    while ($responses) {
        $wrappedResponses = [];
        // Maps each wrapped response to the AsyncResponse that owns it.
        $asyncMap = new \SplObjectStorage();
        $client = null;

        foreach ($responses as $r) {
            if (!$r instanceof self) {
                throw new \TypeError(sprintf('"%s::stream()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', $class ?? static::class, get_debug_type($r)));
            }

            if (null !== $e = $r->info['error'] ?? null) {
                yield $r => $chunk = new ErrorChunk($r->offset, new TransportException($e));
                // If the consumer did not already make the chunk throw, do it now.
                $chunk->didThrow() ?: $chunk->getContent();
                continue;
            }

            if (null === $client) {
                $client = $r->client;
            } elseif ($r->client !== $client) {
                throw new TransportException('Cannot stream AsyncResponse objects with many clients.');
            }

            $asyncMap[$r->response] = $r;
            $wrappedResponses[] = $r->response;

            if ($r->stream) {
                // A passthru iterator is still pending for this response: resume it first.
                yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);

                if (!isset($asyncMap[$response])) {
                    // passthruStream() unregistered the response, so it must not be streamed below.
                    array_pop($wrappedResponses);
                }

                if ($r->response !== $response && !isset($asyncMap[$r->response])) {
                    // The wrapped response was swapped meanwhile: track the new one.
                    $asyncMap[$r->response] = $r;
                    $wrappedResponses[] = $r->response;
                }
            }
        }

        if (!$client || !$wrappedResponses) {
            return;
        }

        foreach ($client->stream($wrappedResponses, $timeout) as $response => $chunk) {
            $r = $asyncMap[$response];

            if (null === $chunk->getError()) {
                if ($chunk->isFirst()) {
                    // Ensure no exception is thrown on destruct for the wrapped response
                    $r->response->getStatusCode();
                } elseif (0 === $r->offset && null === $r->content && $chunk->isLast()) {
                    $r->content = fopen('php://memory', 'w+');
                }
            }

            if (!$r->passthru) {
                // No passthru: forward the chunk as-is, filling the buffer when there is one.
                if (null !== $chunk->getError() || $chunk->isLast()) {
                    unset($asyncMap[$response]);
                } elseif (null !== $r->content && '' !== ($content = $chunk->getContent()) && \strlen($content) !== fwrite($r->content, $content)) {
                    $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
                    $r->info['error'] = $chunk->getError();
                    $r->response->cancel();
                }

                yield $r => $chunk;
                continue;
            }

            if (null !== $chunk->getError()) {
                // no-op
            } elseif ($chunk->isFirst()) {
                $r->yieldedState = self::FIRST_CHUNK_YIELDED;
            } elseif (self::FIRST_CHUNK_YIELDED !== $r->yieldedState && null === $chunk->getInformationalStatus()) {
                throw new \LogicException(sprintf('Instance of "%s" is already consumed and cannot be managed by "%s". A decorated client should not call any of the response\'s methods in its "request()" method.', get_debug_type($response), $class ?? static::class));
            }

            foreach (self::passthru($r->client, $r, $chunk, $asyncMap) as $chunk) {
                yield $r => $chunk;
            }

            if ($r->response !== $response && isset($asyncMap[$response])) {
                // The wrapped response changed while the old one is still registered:
                // leave this loop so the outer loop rebuilds the set of responses.
                break;
            }
        }

        if (null === $chunk->getError() && $chunk->isLast()) {
            $r->yieldedState = self::LAST_CHUNK_YIELDED;
        }
        if (null === $chunk->getError() && self::LAST_CHUNK_YIELDED !== $r->yieldedState && $r->response === $response && null !== $r->client) {
            throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
        }

        // Start over with the responses that are still registered and still attached to a client.
        $responses = [];
        foreach ($asyncMap as $response) {
            $r = $asyncMap[$response];

            if (null !== $r->client) {
                $responses[] = $asyncMap[$response];
            }
        }
    }
}
/**
 * Calls the response's passthru with one chunk and streams whatever it returns.
 *
 * A passthru returning null yields nothing; that is rejected when the chunk is an
 * error or a last chunk and the wrapped response was not replaced in the meantime.
 *
 * @param HttpClientInterface $client The client handed to the AsyncContext
 * @param self                $r      The response whose passthru is invoked
 * @param ChunkInterface      $chunk  The chunk given to the passthru
 * @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
 *
 * @return \Generator Yields the pairs produced by passthruStream()
 *
 * @throws \LogicException When the passthru swallows the last chunk or does not return an Iterator
 */
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, ?\SplObjectStorage $asyncMap = null): \Generator
{
    $r->stream = null;
    // Remember the wrapped response so a replacement made by the passthru can be detected.
    $response = $r->response;
    $context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);

    if (null === $stream = ($r->passthru)($chunk, $context)) {
        if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
            throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
        }

        return;
    }

    if (!$stream instanceof \Iterator) {
        throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
    }

    $r->stream = $stream;

    // A null chunk tells passthruStream() to read the iterator's current item without advancing first.
    yield from self::passthruStream($response, $r, null, $asyncMap);
}
/**
 * Drains the passthru iterator stored on the response, validating and buffering what it yields.
 *
 * When $chunk is not null, the iterator is advanced before its current item is read.
 * The loop ends when the iterator is exhausted, fails, or yields an error or last chunk.
 *
 * @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
 */
private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
{
    while (true) {
        try {
            if (null !== $chunk && $r->stream) {
                $r->stream->next();
            }

            // $r->stream is tested again after valid() on purpose, as in the original condition.
            if (!($r->stream && $r->stream->valid() && $r->stream)) {
                $r->stream = null;
                break;
            }
        } catch (\Throwable $failure) {
            unset($asyncMap[$response]);
            $r->stream = null;
            $r->info['error'] = $failure->getMessage();
            $r->response->cancel();

            $chunk = new ErrorChunk($r->offset, $failure);
            yield $r => $chunk;

            if (!$chunk->didThrow()) {
                $chunk->getContent();
            }
            break;
        }

        $chunk = $r->stream->current();

        if (!$chunk instanceof ChunkInterface) {
            throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
        }

        if (null === $chunk->getError()) {
            if ($chunk->isFirst()) {
                $bufferError = $r->openBuffer();

                yield $r => $chunk;

                if ($r->initializer && null === $r->getInfo('error')) {
                    // Ensure the HTTP status code is always checked
                    $r->getHeaders(true);
                }

                if (null === $bufferError) {
                    continue;
                }

                // Opening the buffer failed: turn that failure into an error chunk.
                $r->response->cancel();
                $chunk = new ErrorChunk($r->offset, $bufferError);
            } elseif ('' !== $payload = $chunk->getContent()) {
                if (null !== $r->shouldBuffer) {
                    throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
                }

                if (null !== $r->content && \strlen($payload) !== fwrite($r->content, $payload)) {
                    $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($payload))));
                    $r->info['error'] = $chunk->getError();
                    $r->response->cancel();
                }
            }
        }

        if (null !== $chunk->getError() || $chunk->isLast()) {
            // Detach the iterator but keep a handle on it for the check made after the last chunk.
            $finishedStream = $r->stream;
            $r->stream = null;
            unset($asyncMap[$response]);
        }

        if (null !== $chunk->getError()) {
            if ($chunk instanceof ErrorChunk) {
                $chunk->didThrow(false);
            } else {
                try {
                    $chunk = new ErrorChunk($chunk->getOffset(), $chunk->isTimeout() ? $chunk->getError() : true);
                } catch (TransportExceptionInterface $transportError) {
                    $chunk = new ErrorChunk($chunk->getOffset(), $transportError);
                }
            }

            yield $r => $chunk;

            if (!$chunk->didThrow()) {
                $chunk->getContent();
            }
            break;
        }

        $r->offset += \strlen($payload);

        yield $r => $chunk;

        if (!$chunk->isLast()) {
            continue;
        }

        $finishedStream->next();

        if ($finishedStream->valid()) {
            throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
        }

        $r->passthru = null;
        break;
    }
}
/**
 * Resolves the buffering decision once and sets up the content buffer accordingly.
 *
 * A closure decision is called with the response headers. Any failure while doing
 * so is recorded in the "error" info, cancels the wrapped response and is returned.
 *
 * @return \Throwable|null The failure met while resolving a closure decision, if any
 *
 * @throws \LogicException When the decision was already consumed by a previous call
 */
private function openBuffer(): ?\Throwable
{
    $decision = $this->shouldBuffer;

    if (null === $decision) {
        throw new \LogicException('A chunk passthru cannot yield more than one "isFirst()" chunk.');
    }

    $this->shouldBuffer = null;
    $failure = null;

    if ($decision instanceof \Closure) {
        try {
            $decision = $decision($this->getHeaders(false));
            $failure = $this->response->getInfo('error');

            if (null !== $failure) {
                throw new TransportException($failure);
            }
        } catch (\Throwable $failure) {
            $this->info['error'] = $failure->getMessage();
            $this->response->cancel();
        }
    }

    if (true === $decision) {
        $this->content = fopen('php://temp', 'w+');
    } elseif (\is_resource($decision)) {
        $this->content = $decision;
    }

    return $failure;
}
/**
 * Closes this response by canceling the wrapped one.
 */
private function close(): void
{
    $wrapped = $this->response;
    $wrapped->cancel();
}
- }
|