From e80fe146245c01c9162f5ef43fa7f38a61bc04df Mon Sep 17 00:00:00 2001 From: EchoNoch Date: Thu, 28 May 2026 23:59:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=962?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- contract.md | 6 +++- readme.md | 2 ++ src/Agent/AgentClient.php | 67 ++++++++++++++++++++++++++++++++---- src/Server/AgentListener.php | 31 +++++++++++------ 4 files changed, 88 insertions(+), 18 deletions(-) diff --git a/contract.md b/contract.md index 9942191..6e00542 100644 --- a/contract.md +++ b/contract.md @@ -169,6 +169,10 @@ Completed in this checkpoint: * Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark. * Client Agent pauses POP reads while a local client output buffer is full. * Send buffer limits are raised to 32 MiB with a 16 MiB backpressure high watermark. +* Fixed large-download truncation risk: + * Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket. + * TCP `DATA` is split into 256 KiB chunks to avoid oversized frames and improve fairness between sessions. + * POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic. Known MVP limitations: @@ -182,7 +186,7 @@ Known MVP limitations: * No TLS yet. * No production-grade client identity yet; `dev-token` is hardcoded for MVP development. * No automated integration test harness yet. -* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work. +* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames, chunking, graceful EOF, and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work. * No explicit idle timeout or connect timeout enforcement yet. * UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work. * HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented. diff --git a/readme.md b/readme.md index e357400..98cf1d3 100644 --- a/readme.md +++ b/readme.md @@ -382,6 +382,8 @@ TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、` 大文件下载时,LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。 +当 POP 收到目标站关闭连接时,Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket,避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 会按 256 KiB 分片发送,以避免巨帧并改善多会话公平性。 + 当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。 验证 SOCKS5 HTTPS 联通性和出口 IP: diff --git a/src/Agent/AgentClient.php b/src/Agent/AgentClient.php index 54c01e6..64a8fb5 100644 --- a/src/Agent/AgentClient.php +++ b/src/Agent/AgentClient.php @@ -19,6 +19,7 @@ final class AgentClient { private const MAX_SEND_BUFFER = 32 * 1024 * 1024; private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024; + private const DATA_CHUNK_SIZE = 256 * 1024; private ?AsyncTcpConnection $pop = null; private ?FrameParser $parser = null; @@ -45,6 +46,10 @@ final class AgentClient private array $pausedClientsForPop = []; /** @var array */ private array $clientsPausingPop = []; + /** @var array */ + private array $pendingClientCloses = []; + /** @var array */ + private array $suppressClientCloseFrames = []; public function __construct( private readonly string $clientListen, @@ -132,6 +137,8 @@ final class AgentClient $this->sessionIngressProtocols = []; $this->pausedClientsForPop = []; $this->clientsPausingPop = []; + $this->pendingClientCloses = []; + $this->suppressClientCloseFrames = []; Timer::add(3, fn () => $this->connect(), [], false); }; $connection->connect(); @@ -147,7 +154,7 @@ final class AgentClient FrameType::OPEN_FAIL => $this->failClientSession($frame), FrameType::DATA => $this->forwardToClient($frame), FrameType::UDP_DATA => $this->forwardUdpToClient($frame), - FrameType::CLOSE => $this->closeClient($frame->sessionId), + FrameType::CLOSE => $this->closeClient($frame->sessionId, true), default => null, }; } @@ -156,7 +163,7 @@ final class AgentClient { $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; $connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection); - $connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumePopForClient($connection); + $connection->onBufferDrain = fn (TcpConnection $connection) => $this->onClientBufferDrain($connection); $this->initialBuffers[$connection->id] = ''; $this->connectionStages[$connection->id] = 'init'; } @@ -174,8 +181,7 @@ final class AgentClient return; } - $sent = $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data])); - if ($sent === false) { + if (!$this->sendData($sessionId, $data)) { $this->closeClient($sessionId); return; } @@ -564,7 +570,7 @@ final class AgentClient $pending = $this->pendingData[$frame->sessionId] ?? ''; unset($this->pendingData[$frame->sessionId]); if ($pending !== '') { - $this->send(new Frame(FrameType::DATA, $frame->sessionId, ['data_raw' => $pending])); + $this->sendData($frame->sessionId, $pending); } } @@ -624,26 +630,49 @@ final class AgentClient unset($this->initialBuffers[$connection->id]); unset($this->connectionStages[$connection->id]); unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]); + $suppressCloseFrame = isset($this->suppressClientCloseFrames[$connection->id]); + unset($this->suppressClientCloseFrames[$connection->id]); if (!isset($this->connectionSessionIds[$connection->id])) { return; } $sessionId = $this->connectionSessionIds[$connection->id]; + unset($this->pendingClientCloses[$sessionId]); unset($this->connectionSessionIds[$connection->id]); unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]); + if ($suppressCloseFrame) { + return; + } $this->send(new Frame(FrameType::CLOSE, $sessionId, ['reason' => 'client_closed'])); } - private function closeClient(?string $sessionId): void + private function closeClient(?string $sessionId, bool $graceful = false): void { if ($sessionId === null || !isset($this->clients[$sessionId])) { return; } + $connection = $this->clients[$sessionId]; + if ($graceful && $connection->getSendBufferQueueSize() > 0) { + $this->pendingClientCloses[$sessionId] = true; + $this->suppressClientCloseFrames[$connection->id] = true; + return; + } + + $this->finalizeClientClose($sessionId); + } + + private function finalizeClientClose(string $sessionId): void + { + if (!isset($this->clients[$sessionId])) { + return; + } + $connection = $this->clients[$sessionId]; unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]); unset($this->connectionSessionIds[$connection->id]); - unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]); + unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id], $this->pendingClientCloses[$sessionId]); + $this->suppressClientCloseFrames[$connection->id] = true; $connection->close(); } @@ -801,6 +830,21 @@ final class AgentClient return $this->pop?->send(FrameCodec::encode($frame)); } + private function sendData(string $sessionId, string $data): bool + { + $length = strlen($data); + for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) { + $sent = $this->send(new Frame(FrameType::DATA, $sessionId, [ + 'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE), + ])); + if ($sent === false) { + return false; + } + } + + return true; + } + private function resumeClientsForPop(): void { foreach ($this->pausedClientsForPop as $connectionId => $client) { @@ -822,4 +866,13 @@ final class AgentClient $this->pop?->resumeRecv(); } } + + private function onClientBufferDrain(TcpConnection $connection): void + { + $this->resumePopForClient($connection); + $sessionId = $this->connectionSessionIds[$connection->id] ?? null; + if (is_string($sessionId) && isset($this->pendingClientCloses[$sessionId])) { + $this->finalizeClientClose($sessionId); + } + } } diff --git a/src/Server/AgentListener.php b/src/Server/AgentListener.php index 1a7697a..3816190 100644 --- a/src/Server/AgentListener.php +++ b/src/Server/AgentListener.php @@ -26,6 +26,7 @@ final class AgentListener { private const MAX_SEND_BUFFER = 32 * 1024 * 1024; private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024; + private const DATA_CHUNK_SIZE = 256 * 1024; /** @var array */ private array $parsers = []; @@ -96,18 +97,16 @@ final class AgentListener } $nodeId = $this->connectionNodeIds[$connection->id] ?? null; - if (!is_string($nodeId) || $this->nodes->get($nodeId) === null) { + $node = is_string($nodeId) ? $this->nodes->get($nodeId) : null; + if (!is_string($nodeId) || $node === null) { $this->send($connection, new Frame(FrameType::ERROR, $frame->sessionId, ['reason' => 'invalid_auth'])); $connection->close(); return; } + $node->lastHeartbeat = time(); if ($frame->type === FrameType::PING) { - $node = $this->nodes->get($nodeId); - if ($node !== null) { - $node->lastHeartbeat = time(); - $node->activeSessions = (int)($frame->payload['active_sessions'] ?? $node->activeSessions); - } + $node->activeSessions = (int)($frame->payload['active_sessions'] ?? $node->activeSessions); $this->send($connection, new Frame(FrameType::PONG, null, ['timestamp' => time()])); return; } @@ -244,10 +243,7 @@ final class AgentListener }; $target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void { $session->bytesTargetToClient += strlen($data); - $sent = $this->send($agentConnection, new Frame(FrameType::DATA, $session->sessionId, [ - 'data_raw' => $data, - ])); - if ($sent === false) { + if (!$this->sendData($agentConnection, $session->sessionId, $data)) { $this->closeSession($session, 'failed', 'agent_buffer_overflow'); return; } @@ -396,6 +392,21 @@ final class AgentListener return $connection->send(FrameCodec::encode($frame)); } + private function sendData(TcpConnection $connection, string $sessionId, string $data): bool + { + $length = strlen($data); + for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) { + $sent = $this->send($connection, new Frame(FrameType::DATA, $sessionId, [ + 'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE), + ])); + if ($sent === false) { + return false; + } + } + + return true; + } + private function resumeTargetsForAgent(TcpConnection $connection): void { foreach ($this->pausedTargetsByAgent[$connection->id] ?? [] as $sessionId => $target) {