InteractsWithRpcClient.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. <?php
  2. namespace think\swoole\concerns;
  3. use Smf\ConnectionPool\ConnectionPool;
  4. use think\App;
  5. use think\swoole\Pool;
  6. use think\swoole\pool\Client;
  7. use think\swoole\rpc\client\Connector;
  8. use think\swoole\rpc\client\Gateway;
  9. use think\swoole\rpc\client\Proxy;
  10. use think\swoole\rpc\JsonParser;
  11. use Throwable;
  12. /**
  13. * Trait InteractsWithRpcClient
  14. * @package think\swoole\concerns
  15. * @property App $app
  16. * @property App $container
  17. * @method Pool getPools()
  18. */
  19. trait InteractsWithRpcClient
  20. {
  21. protected function prepareRpcClient()
  22. {
  23. $this->onEvent('workerStart', function () {
  24. $this->bindRpcClientPool();
  25. $this->bindRpcInterface();
  26. });
  27. }
  28. protected function bindRpcInterface()
  29. {
  30. //引入rpc接口文件
  31. if (file_exists($rpc = $this->container->getBasePath() . 'rpc.php')) {
  32. /** @noinspection PhpIncludeInspection */
  33. $rpcServices = (array) include $rpc;
  34. //绑定rpc接口
  35. try {
  36. foreach ($rpcServices as $name => $abstracts) {
  37. $parserClass = $this->getConfig("rpc.client.{$name}.parser", JsonParser::class);
  38. $parser = $this->getApplication()->make($parserClass);
  39. $gateway = new Gateway($this->createRpcConnector($name), $parser);
  40. $middleware = $this->getConfig("rpc.client.{$name}.middleware", []);
  41. foreach ($abstracts as $abstract) {
  42. $this->getApplication()
  43. ->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) {
  44. return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
  45. });
  46. }
  47. }
  48. } catch (Throwable $e) {
  49. }
  50. }
  51. }
  52. protected function bindRpcClientPool()
  53. {
  54. if (!empty($clients = $this->getConfig('rpc.client'))) {
  55. //创建client连接池
  56. foreach ($clients as $name => $config) {
  57. $pool = new ConnectionPool(
  58. Pool::pullPoolConfig($config),
  59. new Client(),
  60. $config
  61. );
  62. $this->getPools()->add("rpc.client.{$name}", $pool);
  63. }
  64. }
  65. }
  66. protected function createRpcConnector($name)
  67. {
  68. $pool = $this->getPools()->get("rpc.client.{$name}");
  69. return new class($pool) implements Connector {
  70. use InteractsWithRpcConnector;
  71. protected $pool;
  72. public function __construct(ConnectionPool $pool)
  73. {
  74. $this->pool = $pool;
  75. }
  76. protected function runWithClient($callback)
  77. {
  78. /** @var \Swoole\Coroutine\Client $client */
  79. $client = $this->pool->borrow();
  80. try {
  81. return $callback($client);
  82. } finally {
  83. $this->pool->return($client);
  84. }
  85. }
  86. };
  87. }
  88. }