AmpHttpClient.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient;
  11. use Amp\CancelledException;
  12. use Amp\Http\Client\DelegateHttpClient;
  13. use Amp\Http\Client\InterceptedHttpClient;
  14. use Amp\Http\Client\PooledHttpClient;
  15. use Amp\Http\Client\Request;
  16. use Amp\Http\Tunnel\Http1TunnelConnector;
  17. use Amp\Promise;
  18. use Psr\Log\LoggerAwareInterface;
  19. use Psr\Log\LoggerAwareTrait;
  20. use Symfony\Component\HttpClient\Exception\TransportException;
  21. use Symfony\Component\HttpClient\Internal\AmpClientState;
  22. use Symfony\Component\HttpClient\Response\AmpResponse;
  23. use Symfony\Component\HttpClient\Response\ResponseStream;
  24. use Symfony\Contracts\HttpClient\HttpClientInterface;
  25. use Symfony\Contracts\HttpClient\ResponseInterface;
  26. use Symfony\Contracts\HttpClient\ResponseStreamInterface;
  27. use Symfony\Contracts\Service\ResetInterface;
  28. if (!interface_exists(DelegateHttpClient::class)) {
  29. throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the "amphp/http-client" package is not installed. Try running "composer require amphp/http-client:^4.2.1".');
  30. }
  31. if (!interface_exists(Promise::class)) {
  32. throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the installed "amphp/http-client" is not compatible with this version of "symfony/http-client". Try downgrading "amphp/http-client" to "^4.2.1".');
  33. }
  34. /**
  35. * A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client.
  36. *
  37. * @author Nicolas Grekas <p@tchwork.com>
  38. */
  39. final class AmpHttpClient implements HttpClientInterface, LoggerAwareInterface, ResetInterface
  40. {
  41. use HttpClientTrait;
  42. use LoggerAwareTrait;
  43. private $defaultOptions = self::OPTIONS_DEFAULTS;
  44. private static $emptyDefaults = self::OPTIONS_DEFAULTS;
  45. /** @var AmpClientState */
  46. private $multi;
  47. /**
  48. * @param array $defaultOptions Default requests' options
  49. * @param callable|null $clientConfigurator A callable that builds a {@see DelegateHttpClient} from a {@see PooledHttpClient};
  50. * passing null builds an {@see InterceptedHttpClient} with 2 retries on failures
  51. * @param int $maxHostConnections The maximum number of connections to a single host
  52. * @param int $maxPendingPushes The maximum number of pushed responses to accept in the queue
  53. *
  54. * @see HttpClientInterface::OPTIONS_DEFAULTS for available options
  55. */
  56. public function __construct(array $defaultOptions = [], callable $clientConfigurator = null, int $maxHostConnections = 6, int $maxPendingPushes = 50)
  57. {
  58. $this->defaultOptions['buffer'] = $this->defaultOptions['buffer'] ?? \Closure::fromCallable([__CLASS__, 'shouldBuffer']);
  59. if ($defaultOptions) {
  60. [, $this->defaultOptions] = self::prepareRequest(null, null, $defaultOptions, $this->defaultOptions);
  61. }
  62. $this->multi = new AmpClientState($clientConfigurator, $maxHostConnections, $maxPendingPushes, $this->logger);
  63. }
  64. /**
  65. * @see HttpClientInterface::OPTIONS_DEFAULTS for available options
  66. *
  67. * {@inheritdoc}
  68. */
  69. public function request(string $method, string $url, array $options = []): ResponseInterface
  70. {
  71. [$url, $options] = self::prepareRequest($method, $url, $options, $this->defaultOptions);
  72. $options['proxy'] = self::getProxy($options['proxy'], $url, $options['no_proxy']);
  73. if (null !== $options['proxy'] && !class_exists(Http1TunnelConnector::class)) {
  74. throw new \LogicException('You cannot use the "proxy" option as the "amphp/http-tunnel" package is not installed. Try running "composer require amphp/http-tunnel".');
  75. }
  76. if ($options['bindto']) {
  77. if (0 === strpos($options['bindto'], 'if!')) {
  78. throw new TransportException(__CLASS__.' cannot bind to network interfaces, use e.g. CurlHttpClient instead.');
  79. }
  80. if (0 === strpos($options['bindto'], 'host!')) {
  81. $options['bindto'] = substr($options['bindto'], 5);
  82. }
  83. }
  84. if (('' !== $options['body'] || 'POST' === $method || isset($options['normalized_headers']['content-length'])) && !isset($options['normalized_headers']['content-type'])) {
  85. $options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
  86. }
  87. if (!isset($options['normalized_headers']['user-agent'])) {
  88. $options['headers'][] = 'User-Agent: Symfony HttpClient/Amp';
  89. }
  90. if (0 < $options['max_duration']) {
  91. $options['timeout'] = min($options['max_duration'], $options['timeout']);
  92. }
  93. if ($options['resolve']) {
  94. $this->multi->dnsCache = $options['resolve'] + $this->multi->dnsCache;
  95. }
  96. if ($options['peer_fingerprint'] && !isset($options['peer_fingerprint']['pin-sha256'])) {
  97. throw new TransportException(__CLASS__.' supports only "pin-sha256" fingerprints.');
  98. }
  99. $request = new Request(implode('', $url), $method);
  100. if ($options['http_version']) {
  101. switch ((float) $options['http_version']) {
  102. case 1.0: $request->setProtocolVersions(['1.0']); break;
  103. case 1.1: $request->setProtocolVersions(['1.1', '1.0']); break;
  104. default: $request->setProtocolVersions(['2', '1.1', '1.0']); break;
  105. }
  106. }
  107. foreach ($options['headers'] as $v) {
  108. $h = explode(': ', $v, 2);
  109. $request->addHeader($h[0], $h[1]);
  110. }
  111. $request->setTcpConnectTimeout(1000 * $options['timeout']);
  112. $request->setTlsHandshakeTimeout(1000 * $options['timeout']);
  113. $request->setTransferTimeout(1000 * $options['max_duration']);
  114. if (method_exists($request, 'setInactivityTimeout')) {
  115. $request->setInactivityTimeout(0);
  116. }
  117. if ('' !== $request->getUri()->getUserInfo() && !$request->hasHeader('authorization')) {
  118. $auth = explode(':', $request->getUri()->getUserInfo(), 2);
  119. $auth = array_map('rawurldecode', $auth) + [1 => ''];
  120. $request->setHeader('Authorization', 'Basic '.base64_encode(implode(':', $auth)));
  121. }
  122. return new AmpResponse($this->multi, $request, $options, $this->logger);
  123. }
  124. /**
  125. * {@inheritdoc}
  126. */
  127. public function stream($responses, float $timeout = null): ResponseStreamInterface
  128. {
  129. if ($responses instanceof AmpResponse) {
  130. $responses = [$responses];
  131. } elseif (!is_iterable($responses)) {
  132. throw new \TypeError(sprintf('"%s()" expects parameter 1 to be an iterable of AmpResponse objects, "%s" given.', __METHOD__, get_debug_type($responses)));
  133. }
  134. return new ResponseStream(AmpResponse::stream($responses, $timeout));
  135. }
  136. public function reset()
  137. {
  138. $this->multi->dnsCache = [];
  139. foreach ($this->multi->pushedResponses as $authority => $pushedResponses) {
  140. foreach ($pushedResponses as [$pushedUrl, $pushDeferred]) {
  141. $pushDeferred->fail(new CancelledException());
  142. if ($this->logger) {
  143. $this->logger->debug(sprintf('Unused pushed response: "%s"', $pushedUrl));
  144. }
  145. }
  146. }
  147. $this->multi->pushedResponses = [];
  148. }
  149. }