AmpBody.php 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Internal;
  11. use Amp\ByteStream\InputStream;
  12. use Amp\ByteStream\ResourceInputStream;
  13. use Amp\Http\Client\RequestBody;
  14. use Amp\Promise;
  15. use Amp\Success;
  16. use Symfony\Component\HttpClient\Exception\TransportException;
  /**
   * Exposes a request body given as a string, a stream resource or a callback
   * as both an Amp RequestBody and the InputStream that body produces.
   *
   * Every read() adds the size of the previously returned chunk to
   * $info['size_upload'] and invokes the progress callback; once the body is
   * exhausted, $info['upload_content_length'] is set to the total uploaded.
   *
   * @author Nicolas Grekas <p@tchwork.com>
   *
   * @internal
   */
  class AmpBody implements RequestBody, InputStream
  {
      /** @var ResourceInputStream|callable|string The payload; resources are wrapped by the constructor */
      private $body;

      /** @var array Reference to the info array handed to the constructor */
      private $info;

      /** @var \Closure Invoked without arguments at the start of every read() */
      private $onProgress;

      /** @var int|null Position to restart a resource from; null once a string/callback body is consumed */
      private $offset = 0;

      /** @var int Total size in bytes, or -1 when it is not known */
      private $length = -1;

      /** @var int|null Size of the last chunk not yet added to "size_upload"; null before the first read */
      private $uploaded;

      /**
       * @param resource|string|callable $body       The payload to send
       * @param array                    $info       Taken by reference; "size_upload" and "upload_content_length" are updated while reading
       * @param \Closure                 $onProgress Called on every read()
       */
      public function __construct($body, &$info, \Closure $onProgress)
      {
          $this->body = $body;
          $this->info = &$info;
          $this->onProgress = $onProgress;

          if (\is_resource($body)) {
              // ftell() and fstat() return false on failure. Using those values
              // directly would make getBodyLength() announce a length of 0 for a
              // stream that cannot be stat'ed, so fall back to "start at 0" and
              // "unknown length" instead.
              $position = ftell($body);
              $stat = fstat($body);

              $this->offset = false === $position ? 0 : $position;
              $this->length = $stat['size'] ?? -1;
              $this->body = new ResourceInputStream($body);
          } elseif (\is_string($body)) {
              $this->length = \strlen($body);
          }
      }

      /**
       * Returns this object as the stream to read the body from.
       *
       * When a read already happened, the body is restarted first: a string is
       * marked as unread again and a resource is seeked back to its initial offset.
       */
      public function createBodyStream(): InputStream
      {
          if (null !== $this->uploaded) {
              $this->uploaded = null;

              if (\is_string($this->body)) {
                  $this->offset = 0;
              } elseif ($this->body instanceof ResourceInputStream) {
                  fseek($this->body->getResource(), $this->offset);
              }
          }

          return $this;
      }

      /**
       * Resolves with an empty list: the body contributes no headers of its own.
       */
      public function getHeaders(): Promise
      {
          return new Success([]);
      }

      /**
       * Resolves with the number of bytes left to send from the initial offset,
       * or with -1 when the total length is unknown.
       */
      public function getBodyLength(): Promise
      {
          if ($this->length < 0) {
              // Unknown length: never let a non-zero offset leak into the result.
              return new Success(-1);
          }

          return new Success($this->length - $this->offset);
      }

      /**
       * Reads the next chunk and keeps the upload counters in $info current.
       *
       * The size of the chunk returned by the previous call is added to
       * "size_upload" before the progress callback runs; the size of the new
       * chunk is only recorded here and accounted for by the next call.
       *
       * @return Promise Resolves with the next chunk, or null at the end of the body
       */
      public function read(): Promise
      {
          // $uploaded is null before the first read, which adds nothing.
          $this->info['size_upload'] += $this->uploaded;
          $this->uploaded = 0;
          ($this->onProgress)();

          $chunk = $this->doRead();
          $chunk->onResolve(function ($e, $data) {
              if (null !== $data) {
                  $this->uploaded = \strlen($data);
              } else {
                  // No more data: the final size is whatever has been uploaded so far.
                  $this->info['upload_content_length'] = $this->info['size_upload'];
              }
          });

          return $chunk;
      }

      /**
       * Prepares a body for being sent again.
       *
       * Bodies that are not instances of this class are returned untouched.
       * A resource-backed body is seeked back to its initial offset and wrapped
       * in a new instance; since the constructor then receives a
       * ResourceInputStream rather than a raw resource, that new instance keeps
       * the default offset (0) and length (-1). A string body is marked as unread
       * and returned as is.
       */
      public static function rewind(RequestBody $body): RequestBody
      {
          if (!$body instanceof self) {
              return $body;
          }

          $body->uploaded = null;

          if ($body->body instanceof ResourceInputStream) {
              fseek($body->body->getResource(), $body->offset);

              return new $body($body->body, $body->info, $body->onProgress);
          }

          if (\is_string($body->body)) {
              $body->offset = 0;
          }

          return $body;
      }

      /**
       * Fetches the next chunk from the underlying payload.
       *
       * @return Promise Resolves with a string chunk, or with null when there is nothing (left) to read
       *
       * @throws TransportException When the body callback returns something that is not a string
       */
      private function doRead(): Promise
      {
          if ($this->body instanceof ResourceInputStream) {
              return $this->body->read();
          }

          // A null offset marks an already consumed body; a zero length means an empty one.
          if (null === $this->offset || !$this->length) {
              return new Success();
          }

          if (\is_string($this->body)) {
              // A string is delivered in one chunk, then flagged as consumed.
              $this->offset = null;

              return new Success($this->body);
          }

          // The callback is invoked with 16372 (presumably the maximum chunk size
          // it should return - TODO confirm); an empty string signals the end.
          if ('' === $data = ($this->body)(16372)) {
              $this->offset = null;

              return new Success();
          }

          if (!\is_string($data)) {
              throw new TransportException(sprintf('Return value of the "body" option callback must be string, "%s" returned.', get_debug_type($data)));
          }

          return new Success($data);
      }
  }