优化3
This commit is contained in:
parent
e80fe14624
commit
6d9299eeb0
@ -9,6 +9,12 @@ LAYLINK_FRAME_ENCRYPTION=none
|
|||||||
# Agent 与 POP Server 之间 Frame 加密方式;可选 none、chacha20,两端必须一致。
|
# Agent 与 POP Server 之间 Frame 加密方式;可选 none、chacha20,两端必须一致。
|
||||||
LAYLINK_FRAME_ENCRYPTION_KEY=
|
LAYLINK_FRAME_ENCRYPTION_KEY=
|
||||||
# Frame 加密密钥;LAYLINK_FRAME_ENCRYPTION=chacha20 时必填。可填普通口令,或 hex:32字节十六进制,或 base64:32字节base64。
|
# Frame 加密密钥;LAYLINK_FRAME_ENCRYPTION=chacha20 时必填。可填普通口令,或 hex:32字节十六进制,或 base64:32字节base64。
|
||||||
|
LAYLINK_DATA_CHUNK_BYTES=1048576
|
||||||
|
# TCP DATA 帧分片大小;单位字节。较大值可减少帧数量并提升吞吐,建议 262144、524288、1048576、2097152。
|
||||||
|
LAYLINK_MAX_SEND_BUFFER_BYTES=67108864
|
||||||
|
# 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864,内存紧张时调小。
|
||||||
|
LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432
|
||||||
|
# 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES,达到后会暂停上游读取直到缓冲排空。
|
||||||
|
|
||||||
[client-agent]
|
[client-agent]
|
||||||
NODE_ID=client-01
|
NODE_ID=client-01
|
||||||
|
|||||||
@ -39,6 +39,9 @@ $bootAgent = function (string $protocol, string $listen, string $name) use ($nod
|
|||||||
? Env::get('CLIENT_AGENT_SOCKS5_UDP_LISTEN_IP', '127.0.0.1') . ':' . Env::get('CLIENT_AGENT_SOCKS5_UDP_LISTEN_PORT', '1081')
|
? Env::get('CLIENT_AGENT_SOCKS5_UDP_LISTEN_IP', '127.0.0.1') . ':' . Env::get('CLIENT_AGENT_SOCKS5_UDP_LISTEN_PORT', '1081')
|
||||||
: null,
|
: null,
|
||||||
Env::get('CLIENT_AGENT_SOCKS5_UDP_ADVERTISE_IP', Env::get('CLIENT_AGENT_SOCKS5_LISTEN_IP', '127.0.0.1')),
|
Env::get('CLIENT_AGENT_SOCKS5_UDP_ADVERTISE_IP', Env::get('CLIENT_AGENT_SOCKS5_LISTEN_IP', '127.0.0.1')),
|
||||||
|
Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024),
|
||||||
|
Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024),
|
||||||
|
Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024),
|
||||||
);
|
);
|
||||||
$agent->boot($name);
|
$agent->boot($name);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -24,6 +24,9 @@ $server = new PopServer(
|
|||||||
require dirname(__DIR__) . '/config/policies.php',
|
require dirname(__DIR__) . '/config/policies.php',
|
||||||
Env::csv('POP_ALLOWED_AGENT_TRANSPORTS', ['tcp']),
|
Env::csv('POP_ALLOWED_AGENT_TRANSPORTS', ['tcp']),
|
||||||
Env::get('AUDIT_LOG', dirname(__DIR__) . '/runtime/audit.log'),
|
Env::get('AUDIT_LOG', dirname(__DIR__) . '/runtime/audit.log'),
|
||||||
|
Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024),
|
||||||
|
Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024),
|
||||||
|
Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024),
|
||||||
);
|
);
|
||||||
$server->boot();
|
$server->boot();
|
||||||
|
|
||||||
|
|||||||
@ -168,10 +168,14 @@ Completed in this checkpoint:
|
|||||||
* POP resumes target reads when the Agent connection drains.
|
* POP resumes target reads when the Agent connection drains.
|
||||||
* Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark.
|
* Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark.
|
||||||
* Client Agent pauses POP reads while a local client output buffer is full.
|
* Client Agent pauses POP reads while a local client output buffer is full.
|
||||||
* Send buffer limits are raised to 32 MiB with a 16 MiB backpressure high watermark.
|
* Send buffer limits default to 64 MiB with a 32 MiB backpressure high watermark.
|
||||||
|
* Tuning envs:
|
||||||
|
* `LAYLINK_DATA_CHUNK_BYTES`
|
||||||
|
* `LAYLINK_MAX_SEND_BUFFER_BYTES`
|
||||||
|
* `LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES`
|
||||||
* Fixed large-download truncation risk:
|
* Fixed large-download truncation risk:
|
||||||
* Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket.
|
* Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket.
|
||||||
* TCP `DATA` is split into 256 KiB chunks to avoid oversized frames and improve fairness between sessions.
|
* TCP `DATA` is split into configurable chunks, defaulting to 1 MiB, to reduce frame overhead while avoiding oversized frames.
|
||||||
* POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic.
|
* POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic.
|
||||||
|
|
||||||
Known MVP limitations:
|
Known MVP limitations:
|
||||||
|
|||||||
12
readme.md
12
readme.md
@ -380,9 +380,19 @@ php bin/client-agent.php start
|
|||||||
|
|
||||||
TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`ERROR` 等控制帧仍使用 JSON 编码。启用 `chacha20` 时,二进制和 JSON Frame body 都会被加密。
|
TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`ERROR` 等控制帧仍使用 JSON 编码。启用 `chacha20` 时,二进制和 JSON Frame body 都会被加密。
|
||||||
|
|
||||||
|
吞吐相关参数由两端共用,POP Server 和 Client Agent 建议保持一致:
|
||||||
|
|
||||||
|
```env
|
||||||
|
LAYLINK_DATA_CHUNK_BYTES=1048576
|
||||||
|
LAYLINK_MAX_SEND_BUFFER_BYTES=67108864
|
||||||
|
LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432
|
||||||
|
```
|
||||||
|
|
||||||
|
`LAYLINK_DATA_CHUNK_BYTES` 越大,每 MB 需要处理的 Frame 越少,单连接下载通常越快;如果多会话公平性变差,可以降到 `262144` 或 `524288`。`LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES` 必须小于 `LAYLINK_MAX_SEND_BUFFER_BYTES`。
|
||||||
|
|
||||||
大文件下载时,LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。
|
大文件下载时,LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。
|
||||||
|
|
||||||
当 POP 收到目标站关闭连接时,Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket,避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 会按 256 KiB 分片发送,以避免巨帧并改善多会话公平性。
|
当 POP 收到目标站关闭连接时,Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket,避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 默认按 1 MiB 分片发送,以减少帧开销;可通过 `LAYLINK_DATA_CHUNK_BYTES` 调整。
|
||||||
|
|
||||||
当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。
|
当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。
|
||||||
|
|
||||||
|
|||||||
@ -17,10 +17,6 @@ use Workerman\Worker;
|
|||||||
|
|
||||||
final class AgentClient
|
final class AgentClient
|
||||||
{
|
{
|
||||||
private const MAX_SEND_BUFFER = 32 * 1024 * 1024;
|
|
||||||
private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024;
|
|
||||||
private const DATA_CHUNK_SIZE = 256 * 1024;
|
|
||||||
|
|
||||||
private ?AsyncTcpConnection $pop = null;
|
private ?AsyncTcpConnection $pop = null;
|
||||||
private ?FrameParser $parser = null;
|
private ?FrameParser $parser = null;
|
||||||
private bool $authenticated = false;
|
private bool $authenticated = false;
|
||||||
@ -67,6 +63,9 @@ final class AgentClient
|
|||||||
private readonly string $socks5Password = '',
|
private readonly string $socks5Password = '',
|
||||||
private readonly ?string $socks5UdpListen = null,
|
private readonly ?string $socks5UdpListen = null,
|
||||||
private readonly string $socks5UdpAdvertiseIp = '127.0.0.1',
|
private readonly string $socks5UdpAdvertiseIp = '127.0.0.1',
|
||||||
|
private readonly int $maxSendBuffer = 64 * 1024 * 1024,
|
||||||
|
private readonly int $backpressureHighWatermark = 32 * 1024 * 1024,
|
||||||
|
private readonly int $dataChunkSize = 1024 * 1024,
|
||||||
) {
|
) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +98,7 @@ final class AgentClient
|
|||||||
$this->parser = new FrameParser();
|
$this->parser = new FrameParser();
|
||||||
$this->authenticated = false;
|
$this->authenticated = false;
|
||||||
$connection = new AsyncTcpConnection($this->popAddress);
|
$connection = new AsyncTcpConnection($this->popAddress);
|
||||||
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
$connection->maxSendBufferSize = $this->maxSendBuffer;
|
||||||
$this->pop = $connection;
|
$this->pop = $connection;
|
||||||
|
|
||||||
$connection->onConnect = function (AsyncTcpConnection $connection): void {
|
$connection->onConnect = function (AsyncTcpConnection $connection): void {
|
||||||
@ -161,7 +160,7 @@ final class AgentClient
|
|||||||
|
|
||||||
private function onClientConnect(TcpConnection $connection): void
|
private function onClientConnect(TcpConnection $connection): void
|
||||||
{
|
{
|
||||||
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
$connection->maxSendBufferSize = $this->maxSendBuffer;
|
||||||
$connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection);
|
$connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection);
|
||||||
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->onClientBufferDrain($connection);
|
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->onClientBufferDrain($connection);
|
||||||
$this->initialBuffers[$connection->id] = '';
|
$this->initialBuffers[$connection->id] = '';
|
||||||
@ -185,7 +184,7 @@ final class AgentClient
|
|||||||
$this->closeClient($sessionId);
|
$this->closeClient($sessionId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) {
|
if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= $this->backpressureHighWatermark) {
|
||||||
$connection->pauseRecv();
|
$connection->pauseRecv();
|
||||||
$this->pausedClientsForPop[$connection->id] = $connection;
|
$this->pausedClientsForPop[$connection->id] = $connection;
|
||||||
}
|
}
|
||||||
@ -611,7 +610,7 @@ final class AgentClient
|
|||||||
$this->closeClient($frame->sessionId);
|
$this->closeClient($frame->sessionId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ($client->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) {
|
if ($client->getSendBufferQueueSize() >= $this->backpressureHighWatermark) {
|
||||||
$this->pausePopForClient($client);
|
$this->pausePopForClient($client);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -833,9 +832,9 @@ final class AgentClient
|
|||||||
private function sendData(string $sessionId, string $data): bool
|
private function sendData(string $sessionId, string $data): bool
|
||||||
{
|
{
|
||||||
$length = strlen($data);
|
$length = strlen($data);
|
||||||
for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) {
|
for ($offset = 0; $offset < $length; $offset += $this->dataChunkSize) {
|
||||||
$sent = $this->send(new Frame(FrameType::DATA, $sessionId, [
|
$sent = $this->send(new Frame(FrameType::DATA, $sessionId, [
|
||||||
'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE),
|
'data_raw' => substr($data, $offset, $this->dataChunkSize),
|
||||||
]));
|
]));
|
||||||
if ($sent === false) {
|
if ($sent === false) {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -24,10 +24,6 @@ use Workerman\Worker;
|
|||||||
|
|
||||||
final class AgentListener
|
final class AgentListener
|
||||||
{
|
{
|
||||||
private const MAX_SEND_BUFFER = 32 * 1024 * 1024;
|
|
||||||
private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024;
|
|
||||||
private const DATA_CHUNK_SIZE = 256 * 1024;
|
|
||||||
|
|
||||||
/** @var array<int, FrameParser> */
|
/** @var array<int, FrameParser> */
|
||||||
private array $parsers = [];
|
private array $parsers = [];
|
||||||
/** @var array<int, string> */
|
/** @var array<int, string> */
|
||||||
@ -43,6 +39,9 @@ final class AgentListener
|
|||||||
private readonly NodeRegistry $nodes,
|
private readonly NodeRegistry $nodes,
|
||||||
private readonly SessionManager $sessions,
|
private readonly SessionManager $sessions,
|
||||||
private readonly AuditLogger $audit,
|
private readonly AuditLogger $audit,
|
||||||
|
private readonly int $maxSendBuffer = 64 * 1024 * 1024,
|
||||||
|
private readonly int $backpressureHighWatermark = 32 * 1024 * 1024,
|
||||||
|
private readonly int $dataChunkSize = 1024 * 1024,
|
||||||
) {
|
) {
|
||||||
$worker->onConnect = fn (TcpConnection $connection) => $this->onConnect($connection);
|
$worker->onConnect = fn (TcpConnection $connection) => $this->onConnect($connection);
|
||||||
$worker->onMessage = fn (TcpConnection $connection, string $data) => $this->onMessage($connection, $data);
|
$worker->onMessage = fn (TcpConnection $connection, string $data) => $this->onMessage($connection, $data);
|
||||||
@ -52,7 +51,7 @@ final class AgentListener
|
|||||||
|
|
||||||
private function onConnect(TcpConnection $connection): void
|
private function onConnect(TcpConnection $connection): void
|
||||||
{
|
{
|
||||||
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
$connection->maxSendBufferSize = $this->maxSendBuffer;
|
||||||
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection);
|
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection);
|
||||||
$this->parsers[$connection->id] = new FrameParser();
|
$this->parsers[$connection->id] = new FrameParser();
|
||||||
}
|
}
|
||||||
@ -231,7 +230,7 @@ final class AgentListener
|
|||||||
$this->sessions->add($session);
|
$this->sessions->add($session);
|
||||||
|
|
||||||
$target = new AsyncTcpConnection($targetAddress);
|
$target = new AsyncTcpConnection($targetAddress);
|
||||||
$target->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
$target->maxSendBufferSize = $this->maxSendBuffer;
|
||||||
$session->target = $target;
|
$session->target = $target;
|
||||||
|
|
||||||
$target->onConnect = function () use ($session, $agentConnection): void {
|
$target->onConnect = function () use ($session, $agentConnection): void {
|
||||||
@ -247,7 +246,7 @@ final class AgentListener
|
|||||||
$this->closeSession($session, 'failed', 'agent_buffer_overflow');
|
$this->closeSession($session, 'failed', 'agent_buffer_overflow');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ($agentConnection->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) {
|
if ($agentConnection->getSendBufferQueueSize() >= $this->backpressureHighWatermark) {
|
||||||
$target->pauseRecv();
|
$target->pauseRecv();
|
||||||
$this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target;
|
$this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target;
|
||||||
}
|
}
|
||||||
@ -275,7 +274,7 @@ final class AgentListener
|
|||||||
$this->closeSession($session, 'failed', 'target_buffer_overflow');
|
$this->closeSession($session, 'failed', 'target_buffer_overflow');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ($session->target->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK && $session->agent !== null) {
|
if ($session->target->getSendBufferQueueSize() >= $this->backpressureHighWatermark && $session->agent !== null) {
|
||||||
$session->agent->pauseRecv();
|
$session->agent->pauseRecv();
|
||||||
$session->target->onBufferDrain = function (AsyncTcpConnection $target) use ($session): void {
|
$session->target->onBufferDrain = function (AsyncTcpConnection $target) use ($session): void {
|
||||||
$session->agent?->resumeRecv();
|
$session->agent?->resumeRecv();
|
||||||
@ -395,9 +394,9 @@ final class AgentListener
|
|||||||
private function sendData(TcpConnection $connection, string $sessionId, string $data): bool
|
private function sendData(TcpConnection $connection, string $sessionId, string $data): bool
|
||||||
{
|
{
|
||||||
$length = strlen($data);
|
$length = strlen($data);
|
||||||
for ($offset = 0; $offset < $length; $offset += self::DATA_CHUNK_SIZE) {
|
for ($offset = 0; $offset < $length; $offset += $this->dataChunkSize) {
|
||||||
$sent = $this->send($connection, new Frame(FrameType::DATA, $sessionId, [
|
$sent = $this->send($connection, new Frame(FrameType::DATA, $sessionId, [
|
||||||
'data_raw' => substr($data, $offset, self::DATA_CHUNK_SIZE),
|
'data_raw' => substr($data, $offset, $this->dataChunkSize),
|
||||||
]));
|
]));
|
||||||
if ($sent === false) {
|
if ($sent === false) {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -25,6 +25,9 @@ final class PopServer
|
|||||||
private readonly array $policies,
|
private readonly array $policies,
|
||||||
private readonly array $allowedAgentTransports,
|
private readonly array $allowedAgentTransports,
|
||||||
string $auditLog,
|
string $auditLog,
|
||||||
|
private readonly int $maxSendBuffer = 64 * 1024 * 1024,
|
||||||
|
private readonly int $backpressureHighWatermark = 32 * 1024 * 1024,
|
||||||
|
private readonly int $dataChunkSize = 1024 * 1024,
|
||||||
) {
|
) {
|
||||||
$this->nodes = new NodeRegistry();
|
$this->nodes = new NodeRegistry();
|
||||||
$this->sessions = new SessionManager();
|
$this->sessions = new SessionManager();
|
||||||
@ -44,6 +47,9 @@ final class PopServer
|
|||||||
$this->nodes,
|
$this->nodes,
|
||||||
$this->sessions,
|
$this->sessions,
|
||||||
$this->audit,
|
$this->audit,
|
||||||
|
$this->maxSendBuffer,
|
||||||
|
$this->backpressureHighWatermark,
|
||||||
|
$this->dataChunkSize,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,6 +49,21 @@ final class Env
|
|||||||
return in_array(strtolower(trim($value)), ['1', 'true', 'yes', 'on'], true);
|
return in_array(strtolower(trim($value)), ['1', 'true', 'yes', 'on'], true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static function int(string $key, int $default, int $min = PHP_INT_MIN, int $max = PHP_INT_MAX): int
|
||||||
|
{
|
||||||
|
$value = getenv($key);
|
||||||
|
if ($value === false || trim($value) === '') {
|
||||||
|
return $default;
|
||||||
|
}
|
||||||
|
|
||||||
|
$value = trim($value);
|
||||||
|
if (!preg_match('/^-?\d+$/', $value)) {
|
||||||
|
return $default;
|
||||||
|
}
|
||||||
|
|
||||||
|
return max($min, min($max, (int)$value));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return string[]
|
* @return string[]
|
||||||
*/
|
*/
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user