ConnectionPool.php 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. <?php
  2. namespace Smf\ConnectionPool;
  3. use Smf\ConnectionPool\Connectors\ConnectorInterface;
  4. use Swoole\Coroutine\Channel;
  5. use Swoole\Coroutine;
  6. class ConnectionPool implements ConnectionPoolInterface
  7. {
  8. /**@var float The timeout of the operation channel */
  9. const CHANNEL_TIMEOUT = 0.001;
  10. /**@var float The minimum interval to check the idle connections */
  11. const MIN_CHECK_IDLE_INTERVAL = 10;
  12. /**@var string The key about the last active time of connection */
  13. const KEY_LAST_ACTIVE_TIME = '__lat';
  14. /**@var bool Whether the connection pool is initialized */
  15. protected $initialized;
  16. /**@var bool Whether the connection pool is closed */
  17. protected $closed;
  18. /**@var Channel The connection pool */
  19. protected $pool;
  20. /**@var ConnectorInterface The connector */
  21. protected $connector;
  22. /**@var array The config of connection */
  23. protected $connectionConfig;
  24. /**@var int Current all connection count */
  25. protected $connectionCount = 0;
  26. /**@var int The minimum number of active connections */
  27. protected $minActive = 1;
  28. /**@var int The maximum number of active connections */
  29. protected $maxActive = 1;
  30. /**@var float The maximum waiting time for connection, when reached, an exception will be thrown */
  31. protected $maxWaitTime = 5;
  32. /**@var float The maximum idle time for the connection, when reached, the connection will be removed from pool, and keep the least $minActive connections in the pool */
  33. protected $maxIdleTime = 5;
  34. /**@var float The interval to check idle connection */
  35. protected $idleCheckInterval = 5;
  36. /**@var int The timer id of balancer */
  37. protected $balancerTimerId;
  38. /**
  39. * ConnectionPool constructor.
  40. * @param array $poolConfig The minimum number of active connections, the detail keys:
  41. * int minActive The minimum number of active connections
  42. * int maxActive The maximum number of active connections
  43. * float maxWaitTime The maximum waiting time for connection, when reached, an exception will be thrown
  44. * float maxIdleTime The maximum idle time for the connection, when reached, the connection will be removed from pool, and keep the least $minActive connections in the pool
  45. * float idleCheckInterval The interval to check idle connection
  46. * @param ConnectorInterface $connector The connector instance of ConnectorInterface
  47. * @param array $connectionConfig The config of connection
  48. */
  49. public function __construct(array $poolConfig, ConnectorInterface $connector, array $connectionConfig)
  50. {
  51. $this->initialized = false;
  52. $this->closed = false;
  53. $this->minActive = $poolConfig['minActive'] ?? 20;
  54. $this->maxActive = $poolConfig['maxActive'] ?? 100;
  55. $this->maxWaitTime = $poolConfig['maxWaitTime'] ?? 5;
  56. $this->maxIdleTime = $poolConfig['maxIdleTime'] ?? 30;
  57. $poolConfig['idleCheckInterval'] = $poolConfig['idleCheckInterval'] ?? 15;
  58. $this->idleCheckInterval = $poolConfig['idleCheckInterval'] >= static::MIN_CHECK_IDLE_INTERVAL ? $poolConfig['idleCheckInterval'] : static::MIN_CHECK_IDLE_INTERVAL;
  59. $this->connectionConfig = $connectionConfig;
  60. $this->connector = $connector;
  61. }
  62. /**
  63. * Initialize the connection pool
  64. * @return bool
  65. */
  66. public function init(): bool
  67. {
  68. if ($this->initialized) {
  69. return false;
  70. }
  71. $this->initialized = true;
  72. $this->pool = new Channel($this->maxActive);
  73. $this->balancerTimerId = $this->startBalanceTimer($this->idleCheckInterval);
  74. Coroutine::create(function () {
  75. for ($i = 0; $i < $this->minActive; $i++) {
  76. $connection = $this->createConnection();
  77. $ret = $this->pool->push($connection, static::CHANNEL_TIMEOUT);
  78. if ($ret === false) {
  79. $this->removeConnection($connection);
  80. }
  81. }
  82. });
  83. return true;
  84. }
  85. /**
  86. * Borrow a connection from the connection pool, throw an exception if timeout
  87. * @return mixed The connection resource
  88. * @throws BorrowConnectionTimeoutException
  89. * @throws \RuntimeException
  90. */
  91. public function borrow()
  92. {
  93. if (!$this->initialized) {
  94. throw new \RuntimeException('Please initialize the connection pool first, call $pool->init().');
  95. }
  96. if ($this->pool->isEmpty()) {
  97. // Create more connections
  98. if ($this->connectionCount < $this->maxActive) {
  99. return $this->createConnection();
  100. }
  101. }
  102. $connection = $this->pool->pop($this->maxWaitTime);
  103. if ($connection === false) {
  104. $exception = new BorrowConnectionTimeoutException(sprintf(
  105. 'Borrow the connection timeout in %.2f(s), connections in pool: %d, all connections: %d',
  106. $this->maxWaitTime,
  107. $this->pool->length(),
  108. $this->connectionCount
  109. ));
  110. $exception->setTimeout($this->maxWaitTime);
  111. throw $exception;
  112. }
  113. if ($this->connector->isConnected($connection)) {
  114. // Reset the connection for the connected connection
  115. $this->connector->reset($connection, $this->connectionConfig);
  116. } else {
  117. // Remove the disconnected connection, then create a new connection
  118. $this->removeConnection($connection);
  119. $connection = $this->createConnection();
  120. }
  121. return $connection;
  122. }
  123. /**
  124. * Return a connection to the connection pool
  125. * @param mixed $connection The connection resource
  126. * @return bool
  127. */
  128. public function return($connection): bool
  129. {
  130. if (!$this->connector->validate($connection)) {
  131. throw new \RuntimeException('Connection of unexpected type returned.');
  132. }
  133. if (!$this->initialized) {
  134. throw new \RuntimeException('Please initialize the connection pool first, call $pool->init().');
  135. }
  136. if ($this->pool->isFull()) {
  137. // Discard the connection
  138. $this->removeConnection($connection);
  139. return false;
  140. }
  141. $connection->{static::KEY_LAST_ACTIVE_TIME} = time();
  142. $ret = $this->pool->push($connection, static::CHANNEL_TIMEOUT);
  143. if ($ret === false) {
  144. $this->removeConnection($connection);
  145. }
  146. return true;
  147. }
  148. /**
  149. * Get the number of created connections
  150. * @return int
  151. */
  152. public function getConnectionCount(): int
  153. {
  154. return $this->connectionCount;
  155. }
  156. /**
  157. * Get the number of idle connections
  158. * @return int
  159. */
  160. public function getIdleCount(): int
  161. {
  162. return $this->pool->length();
  163. }
  164. /**
  165. * Close the connection pool and disconnect all connections
  166. * @return bool
  167. */
  168. public function close(): bool
  169. {
  170. if (!$this->initialized) {
  171. return false;
  172. }
  173. if ($this->closed) {
  174. return false;
  175. }
  176. $this->closed = true;
  177. swoole_timer_clear($this->balancerTimerId);
  178. Coroutine::create(function () {
  179. while (true) {
  180. if ($this->pool->isEmpty()) {
  181. break;
  182. }
  183. $connection = $this->pool->pop(static::CHANNEL_TIMEOUT);
  184. if ($connection !== false) {
  185. $this->connector->disconnect($connection);
  186. }
  187. }
  188. $this->pool->close();
  189. });
  190. return true;
  191. }
  192. public function __destruct()
  193. {
  194. $this->close();
  195. }
  196. protected function startBalanceTimer(float $interval)
  197. {
  198. return swoole_timer_tick(round($interval) * 1000, function () {
  199. $now = time();
  200. $validConnections = [];
  201. while (true) {
  202. if ($this->closed) {
  203. break;
  204. }
  205. if ($this->connectionCount <= $this->minActive) {
  206. break;
  207. }
  208. if ($this->pool->isEmpty()) {
  209. break;
  210. }
  211. $connection = $this->pool->pop(static::CHANNEL_TIMEOUT);
  212. if ($connection === false) {
  213. continue;
  214. }
  215. $lastActiveTime = $connection->{static::KEY_LAST_ACTIVE_TIME} ?? 0;
  216. if ($now - $lastActiveTime < $this->maxIdleTime) {
  217. $validConnections[] = $connection;
  218. } else {
  219. $this->removeConnection($connection);
  220. }
  221. }
  222. foreach ($validConnections as $validConnection) {
  223. $ret = $this->pool->push($validConnection, static::CHANNEL_TIMEOUT);
  224. if ($ret === false) {
  225. $this->removeConnection($validConnection);
  226. }
  227. }
  228. });
  229. }
  230. protected function createConnection()
  231. {
  232. $this->connectionCount++;
  233. $connection = $this->connector->connect($this->connectionConfig);
  234. $connection->{static::KEY_LAST_ACTIVE_TIME} = time();
  235. return $connection;
  236. }
  237. protected function removeConnection($connection)
  238. {
  239. $this->connectionCount--;
  240. Coroutine::create(function () use ($connection) {
  241. try {
  242. $this->connector->disconnect($connection);
  243. } catch (\Throwable $e) {
  244. // Ignore this exception.
  245. }
  246. });
  247. }
  248. }