AmpResponse.php 17 KB

  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Response;
  11. use Amp\ByteStream\StreamException;
  12. use Amp\CancellationTokenSource;
  13. use Amp\Coroutine;
  14. use Amp\Deferred;
  15. use Amp\Http\Client\HttpException;
  16. use Amp\Http\Client\Request;
  17. use Amp\Http\Client\Response;
  18. use Amp\Loop;
  19. use Amp\Promise;
  20. use Amp\Success;
  21. use Psr\Log\LoggerInterface;
  22. use Symfony\Component\HttpClient\Chunk\FirstChunk;
  23. use Symfony\Component\HttpClient\Chunk\InformationalChunk;
  24. use Symfony\Component\HttpClient\Exception\InvalidArgumentException;
  25. use Symfony\Component\HttpClient\Exception\TransportException;
  26. use Symfony\Component\HttpClient\HttpClientTrait;
  27. use Symfony\Component\HttpClient\Internal\AmpBody;
  28. use Symfony\Component\HttpClient\Internal\AmpClientState;
  29. use Symfony\Component\HttpClient\Internal\Canary;
  30. use Symfony\Component\HttpClient\Internal\ClientState;
  31. use Symfony\Contracts\HttpClient\ResponseInterface;
  32. /**
  33. * @author Nicolas Grekas <>
  34. *
  35. * @internal
  36. */
  37. final class AmpResponse implements ResponseInterface, StreamableInterface
  38. {
  39. use CommonResponseTrait;
  40. use TransportResponseTrait;
  41. private static $nextId = 'a';
  42. private $multi;
  43. private $options;
  44. private $onProgress;
  45. private static $delay;
  46. /**
  47. * @internal
  48. */
  49. public function __construct(AmpClientState $multi, Request $request, array $options, ?LoggerInterface $logger)
  50. {
  51. $this->multi = $multi;
  52. $this->options = &$options;
  53. $this->logger = $logger;
  54. $this->timeout = $options['timeout'];
  55. $this->shouldBuffer = $options['buffer'];
  56. if ($this->inflate = \extension_loaded('zlib') && !$request->hasHeader('accept-encoding')) {
  57. $request->setHeader('Accept-Encoding', 'gzip');
  58. }
  59. $this->initializer = static function (self $response) {
  60. return null !== $response->options;
  61. };
  62. $info = &$this->info;
  63. $headers = &$this->headers;
  64. $canceller = new CancellationTokenSource();
  65. $handle = &$this->handle;
  66. $info['url'] = (string) $request->getUri();
  67. $info['http_method'] = $request->getMethod();
  68. $info['start_time'] = null;
  69. $info['redirect_url'] = null;
  70. $info['redirect_time'] = 0.0;
  71. $info['redirect_count'] = 0;
  72. $info['size_upload'] = 0.0;
  73. $info['size_download'] = 0.0;
  74. $info['upload_content_length'] = -1.0;
  75. $info['download_content_length'] = -1.0;
  76. $info['user_data'] = $options['user_data'];
  77. $info['max_duration'] = $options['max_duration'];
  78. $info['debug'] = '';
  79. $onProgress = $options['on_progress'] ?? static function () {};
  80. $onProgress = $this->onProgress = static function () use (&$info, $onProgress) {
  81. $info['total_time'] = microtime(true) - $info['start_time'];
  82. $onProgress((int) $info['size_download'], ((int) (1 + $info['download_content_length']) ?: 1) - 1, (array) $info);
  83. };
  84. $pauseDeferred = new Deferred();
  85. $pause = new Success();
  86. $throttleWatcher = null;
  87. $this->id = $id = self::$nextId++;
  88. Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger, &$pause) {
  89. return new Coroutine(self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause));
  90. });
  91. $info['pause_handler'] = static function (float $duration) use (&$throttleWatcher, &$pauseDeferred, &$pause) {
  92. if (null !== $throttleWatcher) {
  93. Loop::cancel($throttleWatcher);
  94. }
  95. $pause = $pauseDeferred->promise();
  96. if ($duration <= 0) {
  97. $deferred = $pauseDeferred;
  98. $pauseDeferred = new Deferred();
  99. $deferred->resolve();
  100. } else {
  101. $throttleWatcher = Loop::delay(ceil(1000 * $duration), static function () use (&$pauseDeferred) {
  102. $deferred = $pauseDeferred;
  103. $pauseDeferred = new Deferred();
  104. $deferred->resolve();
  105. });
  106. }
  107. };
  108. $multi->lastTimeout = null;
  109. $multi->openHandles[$id] = $id;
  110. ++$multi->responseCount;
  111. $this->canary = new Canary(static function () use ($canceller, $multi, $id) {
  112. $canceller->cancel();
  113. unset($multi->openHandles[$id], $multi->handlesActivity[$id]);
  114. });
  115. }
  116. /**
  117. * {@inheritdoc}
  118. */
  119. public function getInfo(string $type = null)
  120. {
  121. return null !== $type ? $this->info[$type] ?? null : $this->info;
  122. }
  123. public function __sleep(): array
  124. {
  125. throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
  126. }
  127. public function __wakeup()
  128. {
  129. throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
  130. }
  131. public function __destruct()
  132. {
  133. try {
  134. $this->doDestruct();
  135. } finally {
  136. // Clear the DNS cache when all requests completed
  137. if (0 >= --$this->multi->responseCount) {
  138. $this->multi->responseCount = 0;
  139. $this->multi->dnsCache = [];
  140. }
  141. }
  142. }
  143. /**
  144. * {@inheritdoc}
  145. */
  146. private static function schedule(self $response, array &$runningResponses): void
  147. {
  148. if (isset($runningResponses[0])) {
  149. $runningResponses[0][1][$response->id] = $response;
  150. } else {
  151. $runningResponses[0] = [$response->multi, [$response->id => $response]];
  152. }
  153. if (!isset($response->multi->openHandles[$response->id])) {
  154. $response->multi->handlesActivity[$response->id][] = null;
  155. $response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
  156. }
  157. }
  158. /**
  159. * {@inheritdoc}
  160. *
  161. * @param AmpClientState $multi
  162. */
  163. private static function perform(ClientState $multi, array &$responses = null): void
  164. {
  165. if ($responses) {
  166. foreach ($responses as $response) {
  167. try {
  168. if ($response->info['start_time']) {
  169. $response->info['total_time'] = microtime(true) - $response->info['start_time'];
  170. ($response->onProgress)();
  171. }
  172. } catch (\Throwable $e) {
  173. $multi->handlesActivity[$response->id][] = null;
  174. $multi->handlesActivity[$response->id][] = $e;
  175. }
  176. }
  177. }
  178. }
  179. /**
  180. * {@inheritdoc}
  181. *
  182. * @param AmpClientState $multi
  183. */
  184. private static function select(ClientState $multi, float $timeout): int
  185. {
  186. $timeout += microtime(true);
  187. self::$delay = Loop::defer(static function () use ($timeout) {
  188. if (0 < $timeout -= microtime(true)) {
  189. self::$delay = Loop::delay(ceil(1000 * $timeout), [Loop::class, 'stop']);
  190. } else {
  191. Loop::stop();
  192. }
  193. });
  194. Loop::run();
  195. return null === self::$delay ? 1 : 0;
  196. }
  197. private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
  198. {
  199. $request->setInformationalResponseHandler(static function (Response $response) use ($multi, $id, &$info, &$headers) {
  200. self::addResponseHeaders($response, $info, $headers);
  201. $multi->handlesActivity[$id][] = new InformationalChunk($response->getStatus(), $response->getHeaders());
  202. self::stopLoop();
  203. });
  204. try {
  205. /* @var Response $response */
  206. if (null === $response = yield from self::getPushedResponse($request, $multi, $info, $headers, $options, $logger)) {
  207. $logger && $logger->info(sprintf('Request: "%s %s"', $info['http_method'], $info['url']));
  208. $response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause);
  209. }
  210. $options = null;
  211. $multi->handlesActivity[$id][] = new FirstChunk();
  212. if ('HEAD' === $response->getRequest()->getMethod() || \in_array($info['http_code'], [204, 304], true)) {
  213. $multi->handlesActivity[$id][] = null;
  214. $multi->handlesActivity[$id][] = null;
  215. self::stopLoop();
  216. return;
  217. }
  218. if ($response->hasHeader('content-length')) {
  219. $info['download_content_length'] = (float) $response->getHeader('content-length');
  220. }
  221. $body = $response->getBody();
  222. while (true) {
  223. self::stopLoop();
  224. yield $pause;
  225. if (null === $data = yield $body->read()) {
  226. break;
  227. }
  228. $info['size_download'] += \strlen($data);
  229. $multi->handlesActivity[$id][] = $data;
  230. }
  231. $multi->handlesActivity[$id][] = null;
  232. $multi->handlesActivity[$id][] = null;
  233. } catch (\Throwable $e) {
  234. $multi->handlesActivity[$id][] = null;
  235. $multi->handlesActivity[$id][] = $e;
  236. } finally {
  237. $info['download_content_length'] = $info['size_download'];
  238. }
  239. self::stopLoop();
  240. }
  241. private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
  242. {
  243. yield $pause;
  244. $originRequest->setBody(new AmpBody($options['body'], $info, $onProgress));
  245. $response = yield $multi->request($options, $originRequest, $canceller->getToken(), $info, $onProgress, $handle);
  246. $previousUrl = null;
  247. while (true) {
  248. self::addResponseHeaders($response, $info, $headers);
  249. $status = $response->getStatus();
  250. if (!\in_array($status, [301, 302, 303, 307, 308], true) || null === $location = $response->getHeader('location')) {
  251. return $response;
  252. }
  253. $urlResolver = new class() {
  254. use HttpClientTrait {
  255. parseUrl as public;
  256. resolveUrl as public;
  257. }
  258. };
  259. try {
  260. $previousUrl = $previousUrl ?? $urlResolver::parseUrl($info['url']);
  261. $location = $urlResolver::parseUrl($location);
  262. $location = $urlResolver::resolveUrl($location, $previousUrl);
  263. $info['redirect_url'] = implode('', $location);
  264. } catch (InvalidArgumentException $e) {
  265. return $response;
  266. }
  267. if (0 >= $options['max_redirects'] || $info['redirect_count'] >= $options['max_redirects']) {
  268. return $response;
  269. }
  270. $logger && $logger->info(sprintf('Redirecting: "%s %s"', $status, $info['url']));
  271. try {
  272. // Discard body of redirects
  273. while (null !== yield $response->getBody()->read()) {
  274. }
  275. } catch (HttpException|StreamException $e) {
  276. // Ignore streaming errors on previous responses
  277. }
  278. ++$info['redirect_count'];
  279. $info['url'] = $info['redirect_url'];
  280. $info['redirect_url'] = null;
  281. $previousUrl = $location;
  282. $request = new Request($info['url'], $info['http_method']);
  283. $request->setProtocolVersions($originRequest->getProtocolVersions());
  284. $request->setTcpConnectTimeout($originRequest->getTcpConnectTimeout());
  285. $request->setTlsHandshakeTimeout($originRequest->getTlsHandshakeTimeout());
  286. $request->setTransferTimeout($originRequest->getTransferTimeout());
  287. if (\in_array($status, [301, 302, 303], true)) {
  288. $originRequest->removeHeader('transfer-encoding');
  289. $originRequest->removeHeader('content-length');
  290. $originRequest->removeHeader('content-type');
  291. // Do like curl and browsers: turn POST to GET on 301, 302 and 303
  292. if ('POST' === $response->getRequest()->getMethod() || 303 === $status) {
  293. $info['http_method'] = 'HEAD' === $response->getRequest()->getMethod() ? 'HEAD' : 'GET';
  294. $request->setMethod($info['http_method']);
  295. }
  296. } else {
  297. $request->setBody(AmpBody::rewind($response->getRequest()->getBody()));
  298. }
  299. foreach ($originRequest->getRawHeaders() as [$name, $value]) {
  300. $request->addHeader($name, $value);
  301. }
  302. if ($request->getUri()->getAuthority() !== $originRequest->getUri()->getAuthority()) {
  303. $request->removeHeader('authorization');
  304. $request->removeHeader('cookie');
  305. $request->removeHeader('host');
  306. }
  307. yield $pause;
  308. $response = yield $multi->request($options, $request, $canceller->getToken(), $info, $onProgress, $handle);
  309. $info['redirect_time'] = microtime(true) - $info['start_time'];
  310. }
  311. }
  312. private static function addResponseHeaders(Response $response, array &$info, array &$headers): void
  313. {
  314. $info['http_code'] = $response->getStatus();
  315. if ($headers) {
  316. $info['debug'] .= "< \r\n";
  317. $headers = [];
  318. }
  319. $h = sprintf('HTTP/%s %s %s', $response->getProtocolVersion(), $response->getStatus(), $response->getReason());
  320. $info['debug'] .= "< {$h}\r\n";
  321. $info['response_headers'][] = $h;
  322. foreach ($response->getRawHeaders() as [$name, $value]) {
  323. $headers[strtolower($name)][] = $value;
  324. $h = $name.': '.$value;
  325. $info['debug'] .= "< {$h}\r\n";
  326. $info['response_headers'][] = $h;
  327. }
  328. $info['debug'] .= "< \r\n";
  329. }
  330. /**
  331. * Accepts pushed responses only if their headers related to authentication match the request.
  332. */
  333. private static function getPushedResponse(Request $request, AmpClientState $multi, array &$info, array &$headers, array $options, ?LoggerInterface $logger)
  334. {
  335. if ('' !== $options['body']) {
  336. return null;
  337. }
  338. $authority = $request->getUri()->getAuthority();
  339. foreach ($multi->pushedResponses[$authority] ?? [] as $i => [$pushedUrl, $pushDeferred, $pushedRequest, $pushedResponse, $parentOptions]) {
  340. if ($info['url'] !== $pushedUrl || $info['http_method'] !== $pushedRequest->getMethod()) {
  341. continue;
  342. }
  343. foreach ($parentOptions as $k => $v) {
  344. if ($options[$k] !== $v) {
  345. continue 2;
  346. }
  347. }
  348. foreach (['authorization', 'cookie', 'range', 'proxy-authorization'] as $k) {
  349. if ($pushedRequest->getHeaderArray($k) !== $request->getHeaderArray($k)) {
  350. continue 2;
  351. }
  352. }
  353. $response = yield $pushedResponse;
  354. foreach ($response->getHeaderArray('vary') as $vary) {
  355. foreach (preg_split('/\s*+,\s*+/', $vary) as $v) {
  356. if ('*' === $v || ($pushedRequest->getHeaderArray($v) !== $request->getHeaderArray($v) && 'accept-encoding' !== strtolower($v))) {
  357. $logger && $logger->debug(sprintf('Skipping pushed response: "%s"', $info['url']));
  358. continue 3;
  359. }
  360. }
  361. }
  362. $pushDeferred->resolve();
  363. $logger && $logger->debug(sprintf('Accepting pushed response: "%s %s"', $info['http_method'], $info['url']));
  364. self::addResponseHeaders($response, $info, $headers);
  365. unset($multi->pushedResponses[$authority][$i]);
  366. if (!$multi->pushedResponses[$authority]) {
  367. unset($multi->pushedResponses[$authority]);
  368. }
  369. return $response;
  370. }
  371. }
  372. private static function stopLoop(): void
  373. {
  374. if (null !== self::$delay) {
  375. Loop::cancel(self::$delay);
  376. self::$delay = null;
  377. }
  378. Loop::defer([Loop::class, 'stop']);
  379. }
  380. }