From 6d9299eeb0838c9a36759b3546adbeebc261931d Mon Sep 17 00:00:00 2001 From: EchoNoch Date: Fri, 29 May 2026 00:08:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=963?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 6 ++++++ bin/client-agent.php | 3 +++ bin/pop-server.php | 3 +++ contract.md | 8 ++++++-- readme.md | 12 +++++++++++- src/Agent/AgentClient.php | 19 +++++++++---------- src/Server/AgentListener.php | 19 +++++++++---------- src/Server/PopServer.php | 6 ++++++ src/Util/Env.php | 15 +++++++++++++++ 9 files changed, 68 insertions(+), 23 deletions(-) diff --git a/.env.example b/.env.example index b1ff81d..e6dbe21 100644 --- a/.env.example +++ b/.env.example @@ -9,6 +9,12 @@ LAYLINK_FRAME_ENCRYPTION=none # Agent 与 POP Server 之间 Frame 加密方式;可选 none、chacha20,两端必须一致。 LAYLINK_FRAME_ENCRYPTION_KEY= # Frame 加密密钥;LAYLINK_FRAME_ENCRYPTION=chacha20 时必填。可填普通口令,或 hex:32字节十六进制,或 base64:32字节base64。 +LAYLINK_DATA_CHUNK_BYTES=1048576 +# TCP DATA 帧分片大小;单位字节。较大值可减少帧数量并提升吞吐,建议 262144、524288、1048576、2097152。 +LAYLINK_MAX_SEND_BUFFER_BYTES=67108864 +# 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864,内存紧张时调小。 +LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432 +# 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES,达到后会暂停上游读取直到缓冲排空。 [client-agent] NODE_ID=client-01 diff --git a/bin/client-agent.php b/bin/client-agent.php index 04c2a24..52d7ebe 100755 --- a/bin/client-agent.php +++ b/bin/client-agent.php @@ -39,6 +39,9 @@ $bootAgent = function (string $protocol, string $listen, string $name) use ($nod ? Env::get('CLIENT_AGENT_SOCKS5_UDP_LISTEN_IP', '127.0.0.1') . ':' . Env::get('CLIENT_AGENT_SOCKS5_UDP_LISTEN_PORT', '1081') : null, Env::get('CLIENT_AGENT_SOCKS5_UDP_ADVERTISE_IP', Env::get('CLIENT_AGENT_SOCKS5_LISTEN_IP', '127.0.0.1')), + Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024), + Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024), + Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024), ); $agent->boot($name); }; diff --git a/bin/pop-server.php b/bin/pop-server.php index c1531d6..aea74b3 100755 --- a/bin/pop-server.php +++ b/bin/pop-server.php @@ -24,6 +24,9 @@ $server = new PopServer( require dirname(__DIR__) . '/config/policies.php', Env::csv('POP_ALLOWED_AGENT_TRANSPORTS', ['tcp']), Env::get('AUDIT_LOG', dirname(__DIR__) . '/runtime/audit.log'), + Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024), + Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024), + Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024), ); $server->boot(); diff --git a/contract.md b/contract.md index 6e00542..fdf7133 100644 --- a/contract.md +++ b/contract.md @@ -168,10 +168,14 @@ Completed in this checkpoint: * POP resumes target reads when the Agent connection drains. * Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark. * Client Agent pauses POP reads while a local client output buffer is full. - * Send buffer limits are raised to 32 MiB with a 16 MiB backpressure high watermark. + * Send buffer limits default to 64 MiB with a 32 MiB backpressure high watermark. + * Tuning envs: + * `LAYLINK_DATA_CHUNK_BYTES` + * `LAYLINK_MAX_SEND_BUFFER_BYTES` + * `LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES` * Fixed large-download truncation risk: * Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket. - * TCP `DATA` is split into 256 KiB chunks to avoid oversized frames and improve fairness between sessions. + * TCP `DATA` is split into configurable chunks, defaulting to 1 MiB, to reduce frame overhead while avoiding oversized frames. * POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic. Known MVP limitations: diff --git a/readme.md b/readme.md index 98cf1d3..7eb5568 100644 --- a/readme.md +++ b/readme.md @@ -380,9 +380,19 @@ php bin/client-agent.php start TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`ERROR` 等控制帧仍使用 JSON 编码。启用 `chacha20` 时,二进制和 JSON Frame body 都会被加密。 +吞吐相关参数由两端共用,POP Server 和 Client Agent 建议保持一致: + +```env +LAYLINK_DATA_CHUNK_BYTES=1048576 +LAYLINK_MAX_SEND_BUFFER_BYTES=67108864 +LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432 +``` + +`LAYLINK_DATA_CHUNK_BYTES` 越大,每 MB 需要处理的 Frame 越少,单连接下载通常越快;如果多会话公平性变差,可以降到 `262144` 或 `524288`。`LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES` 必须小于 `LAYLINK_MAX_SEND_BUFFER_BYTES`。 + 大文件下载时,LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。 -当 POP 收到目标站关闭连接时,Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket,避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 会按 256 KiB 分片发送,以避免巨帧并改善多会话公平性。 +当 POP 收到目标站关闭连接时,Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket,避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 默认按 1 MiB 分片发送,以减少帧开销;可通过 `LAYLINK_DATA_CHUNK_BYTES` 调整。 当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。 diff --git a/src/Agent/AgentClient.php b/src/Agent/AgentClient.php index 64a8fb5..d7cbacd 100644 --- a/src/Agent/AgentClient.php +++ b/src/Agent/AgentClient.php @@ -17,10 +17,6 @@ use Workerman\Worker; final class AgentClient { - private const MAX_SEND_BUFFER = 32 * 1024 * 1024; - private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024; - private const DATA_CHUNK_SIZE = 256 * 1024; - private ?AsyncTcpConnection $pop = null; private ?FrameParser $parser = null; private bool $authenticated = false; @@ -67,6 +63,9 @@ final class AgentClient private readonly string $socks5Password = '', private readonly ?string $socks5UdpListen = null, private readonly string $socks5UdpAdvertiseIp = '127.0.0.1', + private readonly int $maxSendBuffer = 64 * 1024 * 1024, + private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, + private readonly int $dataChunkSize = 1024 * 1024, ) { } @@ -99,7 +98,7 @@ final class AgentClient $this->parser = new FrameParser(); $this->authenticated = false; $connection = new AsyncTcpConnection($this->popAddress); - $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; + $connection->maxSendBufferSize = $this->maxSendBuffer; $this->pop = $connection; $connection->onConnect = function (AsyncTcpConnection $connection): void { @@ -161,7 +160,7 @@ final class AgentClient private function onClientConnect(TcpConnection $connection): void { - $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; + $connection->maxSendBufferSize = $this->maxSendBuffer; $connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection); $connection->onBufferDrain = fn (TcpConnection $connection) => $this->onClientBufferDrain($connection); $this->initialBuffers[$connection->id] = ''; @@ -185,7 +184,7 @@ final class AgentClient $this->closeClient($sessionId); return; } - if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) { + if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= $this->backpressureHighWatermark) { $connection->pauseRecv(); $this->pausedClientsForPop[$connection->id] = $connection; } @@ -611,7 +610,7 @@ final class AgentClient $this->closeClient($frame->sessionId); return; } - if ($client->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) { + if ($client->getSendBufferQueueSize() >= $this->backpressureHighWatermark) { $this->pausePopForClient($client); } } @@ -833,9 +832,9 @@ final class AgentClient private function sendData(string $sessionId, string $data): bool { $length = strlen($data); - for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) { + for ($offset = 0; $offset < $length; $offset += $this->dataChunkSize) { $sent = $this->send(new Frame(FrameType::DATA, $sessionId, [ - 'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE), + 'data_raw' => substr($data, $offset, $this->dataChunkSize), ])); if ($sent === false) { return false; diff --git a/src/Server/AgentListener.php b/src/Server/AgentListener.php index 3816190..18e446c 100644 --- a/src/Server/AgentListener.php +++ b/src/Server/AgentListener.php @@ -24,10 +24,6 @@ use Workerman\Worker; final class AgentListener { - private const MAX_SEND_BUFFER = 32 * 1024 * 1024; - private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024; - private const DATA_CHUNK_SIZE = 256 * 1024; - /** @var array */ private array $parsers = []; /** @var array */ @@ -43,6 +39,9 @@ final class AgentListener private readonly NodeRegistry $nodes, private readonly SessionManager $sessions, private readonly AuditLogger $audit, + private readonly int $maxSendBuffer = 64 * 1024 * 1024, + private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, + private readonly int $dataChunkSize = 1024 * 1024, ) { $worker->onConnect = fn (TcpConnection $connection) => $this->onConnect($connection); $worker->onMessage = fn (TcpConnection $connection, string $data) => $this->onMessage($connection, $data); @@ -52,7 +51,7 @@ final class AgentListener private function onConnect(TcpConnection $connection): void { - $connection->maxSendBufferSize = self::MAX_SEND_BUFFER; + $connection->maxSendBufferSize = $this->maxSendBuffer; $connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection); $this->parsers[$connection->id] = new FrameParser(); } @@ -231,7 +230,7 @@ final class AgentListener $this->sessions->add($session); $target = new AsyncTcpConnection($targetAddress); - $target->maxSendBufferSize = self::MAX_SEND_BUFFER; + $target->maxSendBufferSize = $this->maxSendBuffer; $session->target = $target; $target->onConnect = function () use ($session, $agentConnection): void { @@ -247,7 +246,7 @@ final class AgentListener $this->closeSession($session, 'failed', 'agent_buffer_overflow'); return; } - if ($agentConnection->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) { + if ($agentConnection->getSendBufferQueueSize() >= $this->backpressureHighWatermark) { $target->pauseRecv(); $this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target; } @@ -275,7 +274,7 @@ final class AgentListener $this->closeSession($session, 'failed', 'target_buffer_overflow'); return; } - if ($session->target->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK && $session->agent !== null) { + if ($session->target->getSendBufferQueueSize() >= $this->backpressureHighWatermark && $session->agent !== null) { $session->agent->pauseRecv(); $session->target->onBufferDrain = function (AsyncTcpConnection $target) use ($session): void { $session->agent?->resumeRecv(); @@ -395,9 +394,9 @@ final class AgentListener private function sendData(TcpConnection $connection, string $sessionId, string $data): bool { $length = strlen($data); - for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) { + for ($offset = 0; $offset < $length; $offset += $this->dataChunkSize) { $sent = $this->send($connection, new Frame(FrameType::DATA, $sessionId, [ - 'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE), + 'data_raw' => substr($data, $offset, $this->dataChunkSize), ])); if ($sent === false) { return false; diff --git a/src/Server/PopServer.php b/src/Server/PopServer.php index 161e596..565728b 100644 --- a/src/Server/PopServer.php +++ b/src/Server/PopServer.php @@ -25,6 +25,9 @@ final class PopServer private readonly array $policies, private readonly array $allowedAgentTransports, string $auditLog, + private readonly int $maxSendBuffer = 64 * 1024 * 1024, + private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, + private readonly int $dataChunkSize = 1024 * 1024, ) { $this->nodes = new NodeRegistry(); $this->sessions = new SessionManager(); @@ -44,6 +47,9 @@ final class PopServer $this->nodes, $this->sessions, $this->audit, + $this->maxSendBuffer, + $this->backpressureHighWatermark, + $this->dataChunkSize, ); } } diff --git a/src/Util/Env.php b/src/Util/Env.php index 813224c..fc73bb2 100644 --- a/src/Util/Env.php +++ b/src/Util/Env.php @@ -49,6 +49,21 @@ final class Env return in_array(strtolower(trim($value)), ['1', 'true', 'yes', 'on'], true); } + public static function int(string $key, int $default, int $min = PHP_INT_MIN, int $max = PHP_INT_MAX): int + { + $value = getenv($key); + if ($value === false || trim($value) === '') { + return $default; + } + + $value = trim($value); + if (!preg_match('/^-?\d+$/', $value)) { + return $default; + } + + return max($min, min($max, (int)$value)); + } + /** * @return string[] */