TransportResponseTrait.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Response;
  11. use Symfony\Component\HttpClient\Chunk\DataChunk;
  12. use Symfony\Component\HttpClient\Chunk\ErrorChunk;
  13. use Symfony\Component\HttpClient\Chunk\FirstChunk;
  14. use Symfony\Component\HttpClient\Chunk\LastChunk;
  15. use Symfony\Component\HttpClient\Exception\TransportException;
  16. use Symfony\Component\HttpClient\Internal\ClientState;
  17. /**
  18. * Implements common logic for transport-level response classes.
  19. *
  20. * @author Nicolas Grekas <p@tchwork.com>
  21. *
  22. * @internal
  23. */
  24. trait TransportResponseTrait
  25. {
  26. private $canary;
  27. private $headers = [];
  28. private $info = [
  29. 'response_headers' => [],
  30. 'http_code' => 0,
  31. 'error' => null,
  32. 'canceled' => false,
  33. ];
  34. /** @var object|resource */
  35. private $handle;
  36. private $id;
  37. private $timeout = 0;
  38. private $inflate;
  39. private $finalInfo;
  40. private $logger;
  41. /**
  42. * {@inheritdoc}
  43. */
  44. public function getStatusCode(): int
  45. {
  46. if ($this->initializer) {
  47. self::initialize($this);
  48. }
  49. return $this->info['http_code'];
  50. }
  51. /**
  52. * {@inheritdoc}
  53. */
  54. public function getHeaders(bool $throw = true): array
  55. {
  56. if ($this->initializer) {
  57. self::initialize($this);
  58. }
  59. if ($throw) {
  60. $this->checkStatusCode();
  61. }
  62. return $this->headers;
  63. }
  64. /**
  65. * {@inheritdoc}
  66. */
  67. public function cancel(): void
  68. {
  69. $this->info['canceled'] = true;
  70. $this->info['error'] = 'Response has been canceled.';
  71. $this->close();
  72. }
  73. /**
  74. * Closes the response and all its network handles.
  75. */
  76. protected function close(): void
  77. {
  78. $this->canary->cancel();
  79. $this->inflate = null;
  80. }
  81. /**
  82. * Adds pending responses to the activity list.
  83. */
  84. abstract protected static function schedule(self $response, array &$runningResponses): void;
  85. /**
  86. * Performs all pending non-blocking operations.
  87. */
  88. abstract protected static function perform(ClientState $multi, array &$responses): void;
  89. /**
  90. * Waits for network activity.
  91. */
  92. abstract protected static function select(ClientState $multi, float $timeout): int;
  93. private static function addResponseHeaders(array $responseHeaders, array &$info, array &$headers, string &$debug = ''): void
  94. {
  95. foreach ($responseHeaders as $h) {
  96. if (11 <= \strlen($h) && '/' === $h[4] && preg_match('#^HTTP/\d+(?:\.\d+)? (\d\d\d)(?: |$)#', $h, $m)) {
  97. if ($headers) {
  98. $debug .= "< \r\n";
  99. $headers = [];
  100. }
  101. $info['http_code'] = (int) $m[1];
  102. } elseif (2 === \count($m = explode(':', $h, 2))) {
  103. $headers[strtolower($m[0])][] = ltrim($m[1]);
  104. }
  105. $debug .= "< {$h}\r\n";
  106. $info['response_headers'][] = $h;
  107. }
  108. $debug .= "< \r\n";
  109. }
  110. /**
  111. * Ensures the request is always sent and that the response code was checked.
  112. */
  113. private function doDestruct()
  114. {
  115. $this->shouldBuffer = true;
  116. if ($this->initializer && null === $this->info['error']) {
  117. self::initialize($this);
  118. $this->checkStatusCode();
  119. }
  120. }
  121. /**
  122. * Implements an event loop based on a buffer activity queue.
  123. *
  124. * @param iterable<array-key, self> $responses
  125. *
  126. * @internal
  127. */
  128. public static function stream(iterable $responses, float $timeout = null): \Generator
  129. {
  130. $runningResponses = [];
  131. foreach ($responses as $response) {
  132. self::schedule($response, $runningResponses);
  133. }
  134. $lastActivity = microtime(true);
  135. $elapsedTimeout = 0;
  136. if ($fromLastTimeout = 0.0 === $timeout && '-0' === (string) $timeout) {
  137. $timeout = null;
  138. } elseif ($fromLastTimeout = 0 > $timeout) {
  139. $timeout = -$timeout;
  140. }
  141. while (true) {
  142. $hasActivity = false;
  143. $timeoutMax = 0;
  144. $timeoutMin = $timeout ?? \INF;
  145. /** @var ClientState $multi */
  146. foreach ($runningResponses as $i => [$multi]) {
  147. $responses = &$runningResponses[$i][1];
  148. self::perform($multi, $responses);
  149. foreach ($responses as $j => $response) {
  150. $timeoutMax = $timeout ?? max($timeoutMax, $response->timeout);
  151. $timeoutMin = min($timeoutMin, $response->timeout, 1);
  152. $chunk = false;
  153. if ($fromLastTimeout && null !== $multi->lastTimeout) {
  154. $elapsedTimeout = microtime(true) - $multi->lastTimeout;
  155. }
  156. if (isset($multi->handlesActivity[$j])) {
  157. $multi->lastTimeout = null;
  158. } elseif (!isset($multi->openHandles[$j])) {
  159. unset($responses[$j]);
  160. continue;
  161. } elseif ($elapsedTimeout >= $timeoutMax) {
  162. $multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))];
  163. $multi->lastTimeout ?? $multi->lastTimeout = $lastActivity;
  164. } else {
  165. continue;
  166. }
  167. while ($multi->handlesActivity[$j] ?? false) {
  168. $hasActivity = true;
  169. $elapsedTimeout = 0;
  170. if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
  171. if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
  172. $multi->handlesActivity[$j] = [null, new TransportException(sprintf('Error while processing content unencoding for "%s".', $response->getInfo('url')))];
  173. continue;
  174. }
  175. if ('' !== $chunk && null !== $response->content && \strlen($chunk) !== fwrite($response->content, $chunk)) {
  176. $multi->handlesActivity[$j] = [null, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($chunk)))];
  177. continue;
  178. }
  179. $chunkLen = \strlen($chunk);
  180. $chunk = new DataChunk($response->offset, $chunk);
  181. $response->offset += $chunkLen;
  182. } elseif (null === $chunk) {
  183. $e = $multi->handlesActivity[$j][0];
  184. unset($responses[$j], $multi->handlesActivity[$j]);
  185. $response->close();
  186. if (null !== $e) {
  187. $response->info['error'] = $e->getMessage();
  188. if ($e instanceof \Error) {
  189. throw $e;
  190. }
  191. $chunk = new ErrorChunk($response->offset, $e);
  192. } else {
  193. if (0 === $response->offset && null === $response->content) {
  194. $response->content = fopen('php://memory', 'w+');
  195. }
  196. $chunk = new LastChunk($response->offset);
  197. }
  198. } elseif ($chunk instanceof ErrorChunk) {
  199. unset($responses[$j]);
  200. $elapsedTimeout = $timeoutMax;
  201. } elseif ($chunk instanceof FirstChunk) {
  202. if ($response->logger) {
  203. $info = $response->getInfo();
  204. $response->logger->info(sprintf('Response: "%s %s"', $info['http_code'], $info['url']));
  205. }
  206. $response->inflate = \extension_loaded('zlib') && $response->inflate && 'gzip' === ($response->headers['content-encoding'][0] ?? null) ? inflate_init(\ZLIB_ENCODING_GZIP) : null;
  207. if ($response->shouldBuffer instanceof \Closure) {
  208. try {
  209. $response->shouldBuffer = ($response->shouldBuffer)($response->headers);
  210. if (null !== $response->info['error']) {
  211. throw new TransportException($response->info['error']);
  212. }
  213. } catch (\Throwable $e) {
  214. $response->close();
  215. $multi->handlesActivity[$j] = [null, $e];
  216. }
  217. }
  218. if (true === $response->shouldBuffer) {
  219. $response->content = fopen('php://temp', 'w+');
  220. } elseif (\is_resource($response->shouldBuffer)) {
  221. $response->content = $response->shouldBuffer;
  222. }
  223. $response->shouldBuffer = null;
  224. yield $response => $chunk;
  225. if ($response->initializer && null === $response->info['error']) {
  226. // Ensure the HTTP status code is always checked
  227. $response->getHeaders(true);
  228. }
  229. continue;
  230. }
  231. yield $response => $chunk;
  232. }
  233. unset($multi->handlesActivity[$j]);
  234. if ($chunk instanceof ErrorChunk && !$chunk->didThrow()) {
  235. // Ensure transport exceptions are always thrown
  236. $chunk->getContent();
  237. }
  238. }
  239. if (!$responses) {
  240. unset($runningResponses[$i]);
  241. }
  242. // Prevent memory leaks
  243. $multi->handlesActivity = $multi->handlesActivity ?: [];
  244. $multi->openHandles = $multi->openHandles ?: [];
  245. }
  246. if (!$runningResponses) {
  247. break;
  248. }
  249. if ($hasActivity) {
  250. $lastActivity = microtime(true);
  251. continue;
  252. }
  253. if (-1 === self::select($multi, min($timeoutMin, $timeoutMax - $elapsedTimeout))) {
  254. usleep(min(500, 1E6 * $timeoutMin));
  255. }
  256. $elapsedTimeout = microtime(true) - $lastActivity;
  257. }
  258. }
  259. }