This commit is contained in:
EchoNoch 2026-05-28 23:59:12 +08:00
parent 068d3f4be9
commit e80fe14624
4 changed files with 88 additions and 18 deletions

View File

@ -169,6 +169,10 @@ Completed in this checkpoint:
* Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark.
* Client Agent pauses POP reads while a local client output buffer is full.
* Send buffer limits are raised to 32 MiB with a 16 MiB backpressure high watermark.
* Fixed large-download truncation risk:
* Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket.
* TCP `DATA` is split into 256 KiB chunks to avoid oversized frames and improve fairness between sessions.
* POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic.
Known MVP limitations:
@ -182,7 +186,7 @@ Known MVP limitations:
* No TLS yet.
* No production-grade client identity yet; `dev-token` is hardcoded for MVP development.
* No automated integration test harness yet.
* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work.
* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames, chunking, graceful EOF, and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work.
* No explicit idle timeout or connect timeout enforcement yet.
* UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work.
* HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented.

View File

@ -382,6 +382,8 @@ TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`
大文件下载时LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。
当 POP 收到目标站关闭连接时Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 会按 256 KiB 分片发送,以避免巨帧并改善多会话公平性。
当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。
验证 SOCKS5 HTTPS 联通性和出口 IP

View File

@ -19,6 +19,7 @@ final class AgentClient
{
private const MAX_SEND_BUFFER = 32 * 1024 * 1024;
private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024;
private const DATA_CHUNK_SIZE = 256 * 1024;
private ?AsyncTcpConnection $pop = null;
private ?FrameParser $parser = null;
@ -45,6 +46,10 @@ final class AgentClient
private array $pausedClientsForPop = [];
/** @var array<int, true> */
private array $clientsPausingPop = [];
/** @var array<string, true> */
private array $pendingClientCloses = [];
/** @var array<int, true> */
private array $suppressClientCloseFrames = [];
public function __construct(
private readonly string $clientListen,
@ -132,6 +137,8 @@ final class AgentClient
$this->sessionIngressProtocols = [];
$this->pausedClientsForPop = [];
$this->clientsPausingPop = [];
$this->pendingClientCloses = [];
$this->suppressClientCloseFrames = [];
Timer::add(3, fn () => $this->connect(), [], false);
};
$connection->connect();
@ -147,7 +154,7 @@ final class AgentClient
FrameType::OPEN_FAIL => $this->failClientSession($frame),
FrameType::DATA => $this->forwardToClient($frame),
FrameType::UDP_DATA => $this->forwardUdpToClient($frame),
FrameType::CLOSE => $this->closeClient($frame->sessionId),
FrameType::CLOSE => $this->closeClient($frame->sessionId, true),
default => null,
};
}
@ -156,7 +163,7 @@ final class AgentClient
{
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
$connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection);
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumePopForClient($connection);
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->onClientBufferDrain($connection);
$this->initialBuffers[$connection->id] = '';
$this->connectionStages[$connection->id] = 'init';
}
@ -174,8 +181,7 @@ final class AgentClient
return;
}
$sent = $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data]));
if ($sent === false) {
if (!$this->sendData($sessionId, $data)) {
$this->closeClient($sessionId);
return;
}
@ -564,7 +570,7 @@ final class AgentClient
$pending = $this->pendingData[$frame->sessionId] ?? '';
unset($this->pendingData[$frame->sessionId]);
if ($pending !== '') {
$this->send(new Frame(FrameType::DATA, $frame->sessionId, ['data_raw' => $pending]));
$this->sendData($frame->sessionId, $pending);
}
}
@ -624,26 +630,49 @@ final class AgentClient
unset($this->initialBuffers[$connection->id]);
unset($this->connectionStages[$connection->id]);
unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]);
$suppressCloseFrame = isset($this->suppressClientCloseFrames[$connection->id]);
unset($this->suppressClientCloseFrames[$connection->id]);
if (!isset($this->connectionSessionIds[$connection->id])) {
return;
}
$sessionId = $this->connectionSessionIds[$connection->id];
unset($this->pendingClientCloses[$sessionId]);
unset($this->connectionSessionIds[$connection->id]);
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]);
if ($suppressCloseFrame) {
return;
}
$this->send(new Frame(FrameType::CLOSE, $sessionId, ['reason' => 'client_closed']));
}
private function closeClient(?string $sessionId): void
private function closeClient(?string $sessionId, bool $graceful = false): void
{
if ($sessionId === null || !isset($this->clients[$sessionId])) {
return;
}
$connection = $this->clients[$sessionId];
if ($graceful && $connection->getSendBufferQueueSize() > 0) {
$this->pendingClientCloses[$sessionId] = true;
$this->suppressClientCloseFrames[$connection->id] = true;
return;
}
$this->finalizeClientClose($sessionId);
}
private function finalizeClientClose(string $sessionId): void
{
if (!isset($this->clients[$sessionId])) {
return;
}
$connection = $this->clients[$sessionId];
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]);
unset($this->connectionSessionIds[$connection->id]);
unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]);
unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id], $this->pendingClientCloses[$sessionId]);
$this->suppressClientCloseFrames[$connection->id] = true;
$connection->close();
}
@ -801,6 +830,21 @@ final class AgentClient
return $this->pop?->send(FrameCodec::encode($frame));
}
private function sendData(string $sessionId, string $data): bool
{
$length = strlen($data);
for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) {
$sent = $this->send(new Frame(FrameType::DATA, $sessionId, [
'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE),
]));
if ($sent === false) {
return false;
}
}
return true;
}
private function resumeClientsForPop(): void
{
foreach ($this->pausedClientsForPop as $connectionId => $client) {
@ -822,4 +866,13 @@ final class AgentClient
$this->pop?->resumeRecv();
}
}
private function onClientBufferDrain(TcpConnection $connection): void
{
$this->resumePopForClient($connection);
$sessionId = $this->connectionSessionIds[$connection->id] ?? null;
if (is_string($sessionId) && isset($this->pendingClientCloses[$sessionId])) {
$this->finalizeClientClose($sessionId);
}
}
}

View File

@ -26,6 +26,7 @@ final class AgentListener
{
private const MAX_SEND_BUFFER = 32 * 1024 * 1024;
private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024;
private const DATA_CHUNK_SIZE = 256 * 1024;
/** @var array<int, FrameParser> */
private array $parsers = [];
@ -96,18 +97,16 @@ final class AgentListener
}
$nodeId = $this->connectionNodeIds[$connection->id] ?? null;
if (!is_string($nodeId) || $this->nodes->get($nodeId) === null) {
$node = is_string($nodeId) ? $this->nodes->get($nodeId) : null;
if (!is_string($nodeId) || $node === null) {
$this->send($connection, new Frame(FrameType::ERROR, $frame->sessionId, ['reason' => 'invalid_auth']));
$connection->close();
return;
}
$node->lastHeartbeat = time();
if ($frame->type === FrameType::PING) {
$node = $this->nodes->get($nodeId);
if ($node !== null) {
$node->lastHeartbeat = time();
$node->activeSessions = (int)($frame->payload['active_sessions'] ?? $node->activeSessions);
}
$this->send($connection, new Frame(FrameType::PONG, null, ['timestamp' => time()]));
return;
}
@ -244,10 +243,7 @@ final class AgentListener
};
$target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void {
$session->bytesTargetToClient += strlen($data);
$sent = $this->send($agentConnection, new Frame(FrameType::DATA, $session->sessionId, [
'data_raw' => $data,
]));
if ($sent === false) {
if (!$this->sendData($agentConnection, $session->sessionId, $data)) {
$this->closeSession($session, 'failed', 'agent_buffer_overflow');
return;
}
@ -396,6 +392,21 @@ final class AgentListener
return $connection->send(FrameCodec::encode($frame));
}
private function sendData(TcpConnection $connection, string $sessionId, string $data): bool
{
$length = strlen($data);
for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) {
$sent = $this->send($connection, new Frame(FrameType::DATA, $sessionId, [
'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE),
]));
if ($sent === false) {
return false;
}
}
return true;
}
private function resumeTargetsForAgent(TcpConnection $connection): void
{
foreach ($this->pausedTargetsByAgent[$connection->id] ?? [] as $sessionId => $target) {