diff --git a/src/__init__.php b/src/__init__.php index 2d6ccc38..fa9fbd6b 100644 --- a/src/__init__.php +++ b/src/__init__.php @@ -32,6 +32,9 @@ 'core/Coroutine/Barrier.php', 'core/Coroutine/Http/ClientProxy.php', 'core/Coroutine/Http/functions.php', + # # + 'core/Coroutine/Http2/Client2.php', + 'core/Coroutine/Http2/ChannelManager.php', # # 'core/ConnectionPool.php', 'core/Database/ObjectProxy.php', diff --git a/src/core/Coroutine/Http2/ChannelManager.php b/src/core/Coroutine/Http2/ChannelManager.php new file mode 100644 index 00000000..1f644a37 --- /dev/null +++ b/src/core/Coroutine/Http2/ChannelManager.php @@ -0,0 +1,61 @@ +channels[$streamId])) { + return $this->channels[$streamId]; + } + + if ($initialize) { + return $this->channels[$streamId] = $this->make(1); + } + + return null; + } + + public function make(int $limit): Channel + { + return new Channel($limit); + } + + public function close(int $streamId): void + { + if ($channel = $this->channels[$streamId] ?? null) { + $channel->close(); + } + + unset($this->channels[$streamId]); + } + + public function getChannels(): array + { + return $this->channels; + } + + public function flush(): void + { + $channels = $this->getChannels(); + $streamIds = array_keys($channels); + foreach ($streamIds as $streamId) { + $this->close($streamId); + } + } + + public function isEmpty(): bool + { + return count($this->channels) === 0; + } +} diff --git a/src/core/Coroutine/Http2/Client2.php b/src/core/Coroutine/Http2/Client2.php new file mode 100644 index 00000000..79646b19 --- /dev/null +++ b/src/core/Coroutine/Http2/Client2.php @@ -0,0 +1,147 @@ +channelManager = new ChannelManager(); + } + + public function request(Request $request, float $timeout = -1): false|Response + { + $this->loop(); + $streamId = $this->send($request); + $this->lastSendTime = time(); + + if ($streamId === false) { + return false; + } + $manager = $this->getChannelManager(); + $chan = $manager->get($streamId, true); + try { + $data = $chan->pop($timeout); + } finally { + $manager->close($streamId); + } + + return $data; + } + + public function close(): bool + { + $this->getChannelManager()->flush(); + $this->chan?->close(); + $this->chan = null; + $this->sleepChan?->close(); + $this->sleepChan = null; + return parent::close(); + } + + protected function getChannelManager(): ChannelManager + { + return $this->channelManager; + } + + protected function reconnect(): bool + { + parent::close(); + return parent::connect(); + } + + protected function loop(): void + { + $this->idleClose(); + + if ($this->chan !== null) { + return; + } + $this->chan = new Channel(65535); + + if (! $this->ping()) { + $this->reconnect(); + } + go( + function () { + $reason = ''; + try { + $chan = $this->chan; + while (true) { + $response = $this->recv(); + + if ($chan?->errCode !== SWOOLE_CHANNEL_OK) { + $reason = 'channel closed.'; + break; + } + + if ($response === false) { + $reason = 'client broken.'; + break; + } + + if ($channel = $this->getChannelManager()->get($response->streamId)) { + $channel->push($response); + } + } + } catch (Throwable $exception) { + swoole_error_log(SWOOLE_LOG_ERROR, (string) $exception); + } finally { + swoole_error_log(SWOOLE_LOG_DEBUG, 'Recv loop broken, wait to restart in next time. The reason is ' . $reason); + $this->close(); + } + } + ); + } + + protected function idleClose(): void + { + if (! $this->idleClose) { + $this->idleClose = true; + go( + function () { + try { + while (true) { + $this->sleep(3); + if ($this->chan === null) { + break; + } + if ($this->channelManager->isEmpty() && time() - $this->lastSendTime > 10) { + $this->close(); + break; + } + } + } finally { + $this->idleClose = false; + } + } + ); + } + } + + protected function sleep(float $timeout = -1): void + { + $this->sleepChan ??= new Channel(1); + $this->sleepChan->pop($timeout); + } +} diff --git a/tests/unit/Coroutine/Http2/ChannelManagerTest.php b/tests/unit/Coroutine/Http2/ChannelManagerTest.php new file mode 100644 index 00000000..27a0f057 --- /dev/null +++ b/tests/unit/Coroutine/Http2/ChannelManagerTest.php @@ -0,0 +1,59 @@ +get(1, true); + $this->assertInstanceOf(Channel::class, $chan); + $chan = $manager->get(1); + $this->assertInstanceOf(Channel::class, $chan); + go( + function () use ($chan) { + usleep(10 * 1000); + $chan->push('Hello World.'); + } + ); + + $this->assertSame('Hello World.', $chan->pop()); + $manager->close(1); + $this->assertNull($manager->get(1)); + } + ); + } + + public function testChannelFlush() + { + run( + function () { + $manager = new ChannelManager(); + $manager->get(1, true); + $manager->get(2, true); + $manager->get(4, true); + $manager->get(5, true); + + $this->assertSame(4, count($manager->getChannels())); + $manager->flush(); + $this->assertSame(0, count($manager->getChannels())); + } + ); + } +} diff --git a/tests/unit/Coroutine/Http2/Client2Test.php b/tests/unit/Coroutine/Http2/Client2Test.php new file mode 100644 index 00000000..1b8aca0f --- /dev/null +++ b/tests/unit/Coroutine/Http2/Client2Test.php @@ -0,0 +1,57 @@ +set([ + 'timeout' => -1, + 'ssl_host_name' => $domain, + ]); + $client->connect(); + for ($i = 1; $i < 30; ++$i) { + go(function () use ($client, $i) { + $req = new Request(); + $req->method = 'POST'; + $req->path = '/post'; + $req->headers = [ + 'host' => '127.0.0.1', + 'user-agent' => 'Chrome/49.0.2587.3', + 'accept' => 'text/html,application/xhtml+xml,application/xml', + 'accept-encoding' => 'gzip', + ]; + $req->data = (string) $i; + $data = $client->request($req); + $result = json_decode($data->data, true); + $this->assertEquals($i, (int) $result['data']); + }); + } + }); + } +}