From 068d3f4be9e4e5fa00209143ba0a6d56d0cb72fb Mon Sep 17 00:00:00 2001 From: EchoNoch Date: Thu, 28 May 2026 23:55:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=8B=E5=8A=9B=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- contract.md | 25 +++++++--- readme.md | 4 ++ src/Agent/AgentClient.php | 64 +++++++++++++++++++++--- src/Server/AgentListener.php | 95 +++++++++++++++++++++++++++++++++--- 4 files changed, 167 insertions(+), 21 deletions(-) diff --git a/contract.md b/contract.md index 8281124..9942191 100644 --- a/contract.md +++ b/contract.md @@ -160,6 +160,15 @@ Completed in this checkpoint: * TCP `DATA` payloads now use binary frame encoding when both ends run the updated code. * This removes base64 expansion and JSON string encoding from the hot TCP data path. * Verified binary TCP `DATA` frame encode/decode under both `none` and `chacha20`. +* Added POP-side target DNS pre-resolution: + * Domain resolution failures return `OPEN_FAIL` with `dns_resolution_failed`. + * POP no longer lets common target DNS failures bubble up as raw `stream_socket_client()` warnings. +* Added TCP backpressure for large transfers: + * POP pauses target reads when the Agent connection send buffer crosses the high watermark. + * POP resumes target reads when the Agent connection drains. + * Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark. + * Client Agent pauses POP reads while a local client output buffer is full. + * Send buffer limits are raised to 32 MiB with a 16 MiB backpressure high watermark. Known MVP limitations: @@ -173,7 +182,7 @@ Known MVP limitations: * No TLS yet. * No production-grade client identity yet; `dev-token` is hardcoded for MVP development. * No automated integration test harness yet. -* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames reduce per-byte overhead, but KCP/multipath/parallel transport and flow-control tuning are still future performance work. +* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work. * No explicit idle timeout or connect timeout enforcement yet. * UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work. * HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented. @@ -183,14 +192,16 @@ Next recommended tasks: 1. Add a local integration harness that starts POP, Client Agent, and a mock TCP echo target, then verifies authorized tunnel, policy denial, and agent local denial. 2. Add configurable client auth token or JWT-ready auth interface. 3. Add target connect timeout and session idle timeout. -4. Add buffer full/drain handling with audit result `buffer_overflow`. +4. Add more detailed buffer overflow audit reasons and metrics. 5. Add README quickstart with exact local commands. 6. Add a reproducible throughput benchmark script for direct-vs-LayLink comparisons. -7. Add KCP or another UDP-based reliable transport behind the transport abstraction. -8. Optimize UDP relay with POP-side UDP socket pooling. -9. Add UDP association idle timeouts and cleanup. -10. Aggregate UDP audit records per association instead of per datagram. -11. Add UDP and per-user rate limiting. +7. Add multi-connection Agent-to-POP support so multiple Client Agent workers can spread concurrent sessions safely. +8. Add KCP or another UDP-based reliable transport behind the transport abstraction. +9. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection. +10. Optimize UDP relay with POP-side UDP socket pooling. +11. Add UDP association idle timeouts and cleanup. +12. Aggregate UDP audit records per association instead of per datagram. +13. Add UDP and per-user rate limiting. ## 0. Project Name diff --git a/readme.md b/readme.md index d262e8e..e357400 100644 --- a/readme.md +++ b/readme.md @@ -380,6 +380,10 @@ php bin/client-agent.php start TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`ERROR` 等控制帧仍使用 JSON 编码。启用 `chacha20` 时,二进制和 JSON Frame body 都会被加密。 +大文件下载时,LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。 + +当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。 + 验证 SOCKS5 HTTPS 联通性和出口 IP: ```bash diff --git a/src/Agent/AgentClient.php b/src/Agent/AgentClient.php index e901ac3..54c01e6 100644 --- a/src/Agent/AgentClient.php +++ b/src/Agent/AgentClient.php @@ -17,6 +17,9 @@ use Workerman\Worker; final class AgentClient { + private const MAX_SEND_BUFFER = 32 * 1024 * 1024; + private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024; + private ?AsyncTcpConnection $pop = null; private ?FrameParser $parser = null; private bool $authenticated = false; @@ -38,6 +41,10 @@ final class AgentClient private array $udpClients = []; /** @var array */ private array $udpSessions = []; + /** @var array */ + private array $pausedClientsForPop = []; + /** @var array */ + private array $clientsPausingPop = []; public function __construct( private readonly string $clientListen, @@ -87,7 +94,7 @@ final class AgentClient $this->parser = new FrameParser(); $this->authenticated = false; $connection = new AsyncTcpConnection($this->popAddress); - $connection->maxSendBufferSize = 8 * 1024 * 1024; + $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; $this->pop = $connection; $connection->onConnect = function (AsyncTcpConnection $connection): void { @@ -110,6 +117,7 @@ final class AgentClient $connection->close(); } }; + $connection->onBufferDrain = fn (AsyncTcpConnection $connection) => $this->resumeClientsForPop(); $connection->onClose = function (): void { $this->authenticated = false; foreach ($this->clients as $client) { @@ -122,6 +130,8 @@ final class AgentClient $this->pendingData = []; $this->connectionStages = []; $this->sessionIngressProtocols = []; + $this->pausedClientsForPop = []; + $this->clientsPausingPop = []; Timer::add(3, fn () => $this->connect(), [], false); }; $connection->connect(); @@ -144,7 +154,9 @@ final class AgentClient private function onClientConnect(TcpConnection $connection): void { - $connection->maxSendBufferSize = 8 * 1024 * 1024; + $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; + $connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection); + $connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumePopForClient($connection); $this->initialBuffers[$connection->id] = ''; $this->connectionStages[$connection->id] = 'init'; } @@ -162,7 +174,15 @@ final class AgentClient return; } - $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data])); + $sent = $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data])); + if ($sent === false) { + $this->closeClient($sessionId); + return; + } + if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) { + $connection->pauseRecv(); + $this->pausedClientsForPop[$connection->id] = $connection; + } } private function handleInitialRequest(TcpConnection $connection, string $data): void @@ -579,7 +599,15 @@ final class AgentClient return; } - $this->clients[$frame->sessionId]->send($data); + $client = $this->clients[$frame->sessionId]; + $sent = $client->send($data); + if ($sent === false) { + $this->closeClient($frame->sessionId); + return; + } + if ($client->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) { + $this->pausePopForClient($client); + } } private function frameData(Frame $frame): string|false @@ -595,6 +623,7 @@ final class AgentClient { unset($this->initialBuffers[$connection->id]); unset($this->connectionStages[$connection->id]); + unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]); if (!isset($this->connectionSessionIds[$connection->id])) { return; } @@ -614,6 +643,7 @@ final class AgentClient $connection = $this->clients[$sessionId]; unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]); unset($this->connectionSessionIds[$connection->id]); + unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]); $connection->close(); } @@ -766,8 +796,30 @@ final class AgentClient ])); } - private function send(Frame $frame): void + private function send(Frame $frame): bool|null { - $this->pop?->send(FrameCodec::encode($frame)); + return $this->pop?->send(FrameCodec::encode($frame)); + } + + private function resumeClientsForPop(): void + { + foreach ($this->pausedClientsForPop as $connectionId => $client) { + $client->resumeRecv(); + unset($this->pausedClientsForPop[$connectionId]); + } + } + + private function pausePopForClient(TcpConnection $connection): void + { + $this->clientsPausingPop[$connection->id] = true; + $this->pop?->pauseRecv(); + } + + private function resumePopForClient(TcpConnection $connection): void + { + unset($this->clientsPausingPop[$connection->id]); + if ($this->clientsPausingPop === []) { + $this->pop?->resumeRecv(); + } } } diff --git a/src/Server/AgentListener.php b/src/Server/AgentListener.php index 1a6fb8a..1a7697a 100644 --- a/src/Server/AgentListener.php +++ b/src/Server/AgentListener.php @@ -24,10 +24,15 @@ use Workerman\Worker; final class AgentListener { + private const MAX_SEND_BUFFER = 32 * 1024 * 1024; + private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024; + /** @var array */ private array $parsers = []; /** @var array */ private array $connectionNodeIds = []; + /** @var array> */ + private array $pausedTargetsByAgent = []; public function __construct( Worker $worker, @@ -46,7 +51,8 @@ final class AgentListener private function onConnect(TcpConnection $connection): void { - $connection->maxSendBufferSize = 8 * 1024 * 1024; + $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; + $connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection); $this->parsers[$connection->id] = new FrameParser(); } @@ -204,6 +210,12 @@ final class AgentListener return; } + $targetAddress = $this->resolveTargetAddress($host, $port); + if ($targetAddress === null) { + $this->rejectOpen($agentConnection, $frame, 'dns_resolution_failed', (string)$auth['user_id'], $nodeId, $decision->policyId); + return; + } + $session = new TunnelSession( $frame->sessionId, (string)$auth['user_id'], @@ -219,8 +231,8 @@ final class AgentListener $session->state = TunnelSession::OPENING; $this->sessions->add($session); - $target = new AsyncTcpConnection("tcp://{$host}:{$port}"); - $target->maxSendBufferSize = 8 * 1024 * 1024; + $target = new AsyncTcpConnection($targetAddress); + $target->maxSendBufferSize = self::MAX_SEND_BUFFER; $session->target = $target; $target->onConnect = function () use ($session, $agentConnection): void { @@ -232,9 +244,17 @@ final class AgentListener }; $target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void { $session->bytesTargetToClient += strlen($data); - $this->send($agentConnection, new Frame(FrameType::DATA, $session->sessionId, [ + $sent = $this->send($agentConnection, new Frame(FrameType::DATA, $session->sessionId, [ 'data_raw' => $data, ])); + if ($sent === false) { + $this->closeSession($session, 'failed', 'agent_buffer_overflow'); + return; + } + if ($agentConnection->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) { + $target->pauseRecv(); + $this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target; + } }; $target->onClose = fn () => $this->closeSession($session, 'closed', null); $target->onError = fn () => $this->failOpenSession($agentConnection, $session, 'target_connection_refused'); @@ -250,12 +270,28 @@ final class AgentListener } $session->bytesClientToTarget += strlen($data); - $session->target?->send($data); + if ($session->target === null) { + return; + } + + $sent = $session->target->send($data); + if ($sent === false) { + $this->closeSession($session, 'failed', 'target_buffer_overflow'); + return; + } + if ($session->target->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK && $session->agent !== null) { + $session->agent->pauseRecv(); + $session->target->onBufferDrain = function (AsyncTcpConnection $target) use ($session): void { + $session->agent?->resumeRecv(); + }; + } } private function failOpenSession(TcpConnection $agentConnection, TunnelSession $session, string $reason): void { - $this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $session->sessionId, ['reason' => $reason])); + if ($session->state === TunnelSession::OPENING) { + $this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $session->sessionId, ['reason' => $reason])); + } $this->closeSession($session, 'failed', $reason); } @@ -298,6 +334,9 @@ final class AgentListener $session->state = TunnelSession::CLOSED; $this->sessions->remove($session->sessionId); + if ($session->agent !== null) { + unset($this->pausedTargetsByAgent[$session->agent->id][$session->sessionId]); + } if ($session->agent !== null) { $this->send($session->agent, new Frame(FrameType::CLOSE, $session->sessionId, ['reason' => $reason ?? $result])); @@ -329,6 +368,7 @@ final class AgentListener { unset($this->parsers[$connection->id]); unset($this->connectionNodeIds[$connection->id]); + unset($this->pausedTargetsByAgent[$connection->id]); $node = $this->nodes->unregisterByConnection($connection); foreach ($this->sessions->all() as $session) { if ($session->agent === $connection) { @@ -351,8 +391,47 @@ final class AgentListener } } - private function send(TcpConnection $connection, Frame $frame): void + private function send(TcpConnection $connection, Frame $frame): bool|null { - $connection->send(FrameCodec::encode($frame)); + return $connection->send(FrameCodec::encode($frame)); + } + + private function resumeTargetsForAgent(TcpConnection $connection): void + { + foreach ($this->pausedTargetsByAgent[$connection->id] ?? [] as $sessionId => $target) { + $target->resumeRecv(); + unset($this->pausedTargetsByAgent[$connection->id][$sessionId]); + } + if (($this->pausedTargetsByAgent[$connection->id] ?? []) === []) { + unset($this->pausedTargetsByAgent[$connection->id]); + } + } + + private function resolveTargetAddress(string $host, int $port): ?string + { + if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) !== false) { + return "tcp://{$host}:{$port}"; + } + if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) !== false) { + return "tcp://[{$host}]:{$port}"; + } + + $records = @dns_get_record($host, DNS_A | DNS_AAAA); + if ($records === false || $records === []) { + return null; + } + + foreach ($records as $record) { + if (isset($record['ip']) && is_string($record['ip'])) { + return "tcp://{$record['ip']}:{$port}"; + } + } + foreach ($records as $record) { + if (isset($record['ipv6']) && is_string($record['ipv6'])) { + return "tcp://[{$record['ipv6']}]:{$port}"; + } + } + + return null; } }