KCP Improving
This commit is contained in:
parent
56c74337e4
commit
38797fe078
28
.env.example
28
.env.example
@ -27,22 +27,22 @@ LAYLINK_KCP_INTERVAL_MS=10
|
||||
# KCP 内部 update 间隔;单位毫秒。常用 10、20、30,越小越低延迟但 CPU/发包更高。
|
||||
LAYLINK_KCP_FAST_RESEND=2
|
||||
# KCP 快速重传阈值;0 关闭,2 是常见低延迟设置,丢包网络可尝试 2-4。
|
||||
LAYLINK_KCP_NO_CONGESTION_CONTROL=0
|
||||
# 是否关闭 KCP 拥塞控制;0 开启拥塞控制更稳,1 更激进但容易挤爆 UDP 发送缓冲。
|
||||
LAYLINK_KCP_SEND_WINDOW=256
|
||||
# KCP 发送窗口;越大吞吐潜力越高但更容易拥堵,常用 128、256、512、1024。
|
||||
LAYLINK_KCP_RECV_WINDOW=512
|
||||
# KCP 接收窗口;应不小于发送窗口,常用 256、512、1024。
|
||||
LAYLINK_KCP_MTU_BYTES=1200
|
||||
# KCP MTU;建议 1200 避免公网路径分片,内网可尝试 1350。
|
||||
LAYLINK_KCP_NO_CONGESTION_CONTROL=1
|
||||
# 是否关闭 KCP 拥塞控制;1 为高吞吐模式,接近早期 100Mbps 默认值;丢包/拥堵明显时改 0。
|
||||
LAYLINK_KCP_SEND_WINDOW=1024
|
||||
# KCP 发送窗口;高吞吐建议 1024,保守可用 256 或 512。
|
||||
LAYLINK_KCP_RECV_WINDOW=1024
|
||||
# KCP 接收窗口;应不小于发送窗口,高吞吐建议 1024。
|
||||
LAYLINK_KCP_MTU_BYTES=1350
|
||||
# KCP MTU;高吞吐建议 1350,若公网链路分片/丢包明显则改 1200。
|
||||
LAYLINK_KCP_TICK_MS=10
|
||||
# PHP transport tick 间隔;单位毫秒。通常与 LAYLINK_KCP_INTERVAL_MS 一致。
|
||||
LAYLINK_KCP_UDP_SEND_QUEUE_BYTES=16777216
|
||||
# UDP EAGAIN 发送队列上限;单位字节。发送缓冲暂满时会排队重试,超过后关闭该 KCP 会话。
|
||||
LAYLINK_KCP_UDP_FLUSH_PACKETS=256
|
||||
# 每次 tick 最多刷出的 UDP packet 数;拥堵时可调低到 64 或 128,追求吞吐可调高。
|
||||
LAYLINK_KCP_OUTPUT_DRAIN_PACKETS=256
|
||||
# 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数;调低可减少单个大下载占用事件循环时间。
|
||||
LAYLINK_KCP_UDP_SEND_QUEUE_BYTES=67108864
|
||||
# UDP EAGAIN 发送队列上限;单位字节。高吞吐建议 67108864,内存紧张或拥堵明显时调低。
|
||||
LAYLINK_KCP_UDP_FLUSH_PACKETS=1024
|
||||
# 每次 tick 最多刷出的 UDP packet 数;高吞吐建议 1024,拥堵时可调低到 256 或 128。
|
||||
LAYLINK_KCP_OUTPUT_DRAIN_PACKETS=1024
|
||||
# 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数;高吞吐建议 1024,单连接影响事件循环时调低。
|
||||
|
||||
[client-agent]
|
||||
NODE_ID=client-01
|
||||
|
||||
17
contract.md
17
contract.md
@ -2,7 +2,7 @@
|
||||
|
||||
## Implementation Status
|
||||
|
||||
Last updated: 2026-05-29 Asia/Shanghai.
|
||||
Last updated: 2026-05-30 Asia/Shanghai.
|
||||
|
||||
Current phase: MVP bootstrap in progress.
|
||||
|
||||
@ -240,6 +240,21 @@ Completed in this checkpoint:
|
||||
* `POP_AGENT_KCP_WORKERS` is exposed but currently clamped to `1` in `bin/pop-server.php`.
|
||||
* KCP/UDP must remain single-worker in the current architecture because KCP session state is process-local and UDP packets for one conv can otherwise be handled by different workers.
|
||||
* Native KCP output draining is capped per tick by `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` to reduce single-flow event-loop monopolization during large downloads.
|
||||
* Fixed KCP POP-side session lookup for real-world UDP/NAT behavior:
|
||||
* POP no longer depends only on `remote ip:port + conv` for KCP session lookup.
|
||||
* POP keeps a `conv -> session` index and migrates the current UDP remote address when a known `conv` arrives from a changed source port.
|
||||
* KCP callbacks now resolve the active connection by `conv`, so migrated sessions continue delivering frames instead of silently dropping `DATA`.
|
||||
* Fixed native KCP send-buffer accounting after heavy speedtest-style traffic:
|
||||
* The previous PHP-side native KCP `queuedBytes` counter could grow during large transfers and never fall back to zero unless user payload was received in the opposite direction.
|
||||
* This could make a long-lived KCP Agent connection permanently appear full after upload/download tests, causing later `OPEN_OK` / `DATA` sends to fail and audit rows to show zero transferred bytes.
|
||||
* Added native wrapper `laylink_kcp_waitsnd()` around upstream `ikcp_waitsnd()`.
|
||||
* `NativeKcpSession::getSendBufferQueueSize()` now derives watermarks from real KCP pending segment count plus pending UDP output bytes.
|
||||
* POP now treats failure to send `OPEN_OK` as `agent_buffer_overflow` instead of leaving a target connection open with no client-visible success.
|
||||
* Restored KCP high-throughput defaults:
|
||||
* Early native KCP testing used hardcoded `nodelay=1, interval=10, resend=2, nc=1, sndwnd=1024, rcvwnd=1024, mtu=1350`.
|
||||
* The first exposed `.env` defaults were more conservative (`nc=0`, smaller windows, `mtu=1200`) and could reduce throughput dramatically on speedtest-style high-BDP paths.
|
||||
* `NativeKcpSession`, KCP UDP queue/flush fallback defaults, current `.env`, and `.env.example` now use the high-throughput profile by default.
|
||||
* For lossy or congested paths, tune down to `LAYLINK_KCP_NO_CONGESTION_CONTROL=0`, `LAYLINK_KCP_MTU_BYTES=1200`, smaller windows, or lower flush/drain packet counts.
|
||||
|
||||
Known MVP limitations:
|
||||
|
||||
|
||||
@ -164,6 +164,13 @@ void laylink_kcp_flush(laylink_kcp* session) {
|
||||
ikcp_flush(session->kcp);
|
||||
}
|
||||
|
||||
int laylink_kcp_waitsnd(laylink_kcp* session) {
|
||||
if (session == NULL || session->kcp == NULL) {
|
||||
return 0;
|
||||
}
|
||||
return ikcp_waitsnd(session->kcp);
|
||||
}
|
||||
|
||||
int laylink_kcp_pending_output_size(laylink_kcp* session) {
|
||||
if (session == NULL || session->output_head == NULL) {
|
||||
return 0;
|
||||
|
||||
@ -19,6 +19,7 @@ unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current);
|
||||
int laylink_kcp_peeksize(laylink_kcp* session);
|
||||
int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len);
|
||||
void laylink_kcp_flush(laylink_kcp* session);
|
||||
int laylink_kcp_waitsnd(laylink_kcp* session);
|
||||
int laylink_kcp_pending_output_size(laylink_kcp* session);
|
||||
int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len);
|
||||
|
||||
|
||||
@ -237,10 +237,13 @@ final class AgentListener
|
||||
|
||||
$target->onConnect = function () use ($session, $agentConnection): void {
|
||||
$session->state = TunnelSession::OPEN;
|
||||
$this->send($agentConnection, new Frame(FrameType::OPEN_OK, $session->sessionId, [
|
||||
$sent = $this->send($agentConnection, new Frame(FrameType::OPEN_OK, $session->sessionId, [
|
||||
'target_host' => $session->targetHost,
|
||||
'target_port' => $session->targetPort,
|
||||
]));
|
||||
if ($sent === false) {
|
||||
$this->closeSession($session, 'failed', 'agent_buffer_overflow');
|
||||
}
|
||||
};
|
||||
$target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void {
|
||||
$session->bytesTargetToClient += strlen($data);
|
||||
|
||||
@ -37,7 +37,7 @@ final class KcpFrameClientTransport implements FrameClientTransport
|
||||
$this->connection = $connection;
|
||||
$this->sender = new KcpUdpPacketSender(
|
||||
$connection,
|
||||
KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024),
|
||||
KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 64 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024),
|
||||
);
|
||||
|
||||
$connection->onConnect = function () use ($connection): void {
|
||||
@ -130,14 +130,14 @@ final class KcpFrameClientTransport implements FrameClientTransport
|
||||
'type' => KcpPacketCodec::SYN,
|
||||
'conv' => $this->conv,
|
||||
]));
|
||||
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
|
||||
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192));
|
||||
return;
|
||||
}
|
||||
|
||||
$before = $this->getSendBufferQueueSize();
|
||||
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
|
||||
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192));
|
||||
$this->session?->tick();
|
||||
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
|
||||
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192));
|
||||
if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) {
|
||||
($this->onBufferDrain)($this);
|
||||
}
|
||||
|
||||
@ -15,6 +15,8 @@ final class KcpFrameServerListener
|
||||
private array $connections = [];
|
||||
/** @var array<string, KcpReliableSession|NativeKcpSession> */
|
||||
private array $sessions = [];
|
||||
/** @var array<int, string> */
|
||||
private array $keysByConv = [];
|
||||
private ?int $timerId = null;
|
||||
private int $nextConnectionId = 1_000_000;
|
||||
|
||||
@ -48,6 +50,7 @@ final class KcpFrameServerListener
|
||||
return;
|
||||
}
|
||||
$key = $this->key($connection, $conv);
|
||||
$key = $this->knownKey($connection, $conv, $key);
|
||||
$wrapped = $this->connections[$key] ?? null;
|
||||
$session = $this->sessions[$key] ?? null;
|
||||
if ($wrapped === null || $session === null) {
|
||||
@ -72,6 +75,7 @@ final class KcpFrameServerListener
|
||||
$this->handleSyn($connection, $key, $packet['conv']);
|
||||
return;
|
||||
}
|
||||
$key = $this->knownKey($connection, $packet['conv'], $key);
|
||||
|
||||
$wrapped = $this->connections[$key] ?? null;
|
||||
$session = $this->sessions[$key] ?? null;
|
||||
@ -98,31 +102,35 @@ final class KcpFrameServerListener
|
||||
|
||||
private function handleSyn(UdpConnection $connection, string $key, int $conv): void
|
||||
{
|
||||
if (isset($this->keysByConv[$conv]) && $this->keysByConv[$conv] !== $key) {
|
||||
$key = $this->migrateConnection($this->keysByConv[$conv], $key, $connection, $conv);
|
||||
}
|
||||
|
||||
if (isset($this->connections[$key])) {
|
||||
$this->connections[$key]->updateConnection($connection);
|
||||
$this->connections[$key]->sendUdpPacket(KcpPacketCodec::encode([
|
||||
'type' => KcpPacketCodec::SYN_ACK,
|
||||
'conv' => $conv,
|
||||
]));
|
||||
$this->connections[$key]->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
|
||||
$this->connections[$key]->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192));
|
||||
return;
|
||||
}
|
||||
|
||||
$sender = new KcpUdpPacketSender(
|
||||
$connection,
|
||||
KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024),
|
||||
KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 64 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024),
|
||||
);
|
||||
$session = $this->createSession(
|
||||
$conv,
|
||||
fn (string $packet): bool|null => $sender->send($packet),
|
||||
function (Frame $frame) use ($key): void {
|
||||
$wrapped = $this->connections[$key] ?? null;
|
||||
function (Frame $frame) use ($conv): void {
|
||||
$wrapped = $this->connectionForConv($conv);
|
||||
if ($wrapped !== null) {
|
||||
($this->onFrame)($wrapped, $frame);
|
||||
}
|
||||
},
|
||||
function (\Throwable $e) use ($key): void {
|
||||
$wrapped = $this->connections[$key] ?? null;
|
||||
function (\Throwable $e) use ($conv): void {
|
||||
$wrapped = $this->connectionForConv($conv);
|
||||
if ($wrapped !== null) {
|
||||
($this->onInvalidFrame)($wrapped, $e);
|
||||
}
|
||||
@ -131,6 +139,7 @@ final class KcpFrameServerListener
|
||||
$wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $sender, $this->maxSendBuffer);
|
||||
$this->sessions[$key] = $session;
|
||||
$this->connections[$key] = $wrapped;
|
||||
$this->keysByConv[$conv] = $key;
|
||||
|
||||
$sender->send(KcpPacketCodec::encode([
|
||||
'type' => KcpPacketCodec::SYN_ACK,
|
||||
@ -149,9 +158,9 @@ final class KcpFrameServerListener
|
||||
}
|
||||
|
||||
$before = $session->getSendBufferQueueSize();
|
||||
$wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
|
||||
$wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192));
|
||||
$session->tick();
|
||||
$wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
|
||||
$wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192));
|
||||
if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) {
|
||||
($this->onBufferDrain)($wrapped);
|
||||
}
|
||||
@ -161,7 +170,11 @@ final class KcpFrameServerListener
|
||||
private function closeConnection(string $key): void
|
||||
{
|
||||
$wrapped = $this->connections[$key] ?? null;
|
||||
$session = $this->sessions[$key] ?? null;
|
||||
unset($this->connections[$key], $this->sessions[$key]);
|
||||
if ($session !== null) {
|
||||
unset($this->keysByConv[$session->conv()]);
|
||||
}
|
||||
if ($wrapped !== null) {
|
||||
($this->onClose)($wrapped);
|
||||
}
|
||||
@ -182,4 +195,45 @@ final class KcpFrameServerListener
|
||||
{
|
||||
return $connection->getRemoteAddress() . '#' . $conv;
|
||||
}
|
||||
|
||||
private function knownKey(UdpConnection $connection, int $conv, string $candidate): string
|
||||
{
|
||||
if (isset($this->connections[$candidate])) {
|
||||
return $candidate;
|
||||
}
|
||||
|
||||
$known = $this->keysByConv[$conv] ?? null;
|
||||
if ($known === null || !isset($this->connections[$known], $this->sessions[$known])) {
|
||||
return $candidate;
|
||||
}
|
||||
|
||||
return $this->migrateConnection($known, $candidate, $connection, $conv);
|
||||
}
|
||||
|
||||
private function migrateConnection(string $oldKey, string $newKey, UdpConnection $connection, int $conv): string
|
||||
{
|
||||
if ($oldKey === $newKey) {
|
||||
return $newKey;
|
||||
}
|
||||
|
||||
$wrapped = $this->connections[$oldKey] ?? null;
|
||||
$session = $this->sessions[$oldKey] ?? null;
|
||||
if ($wrapped === null || $session === null) {
|
||||
return $newKey;
|
||||
}
|
||||
|
||||
unset($this->connections[$oldKey], $this->sessions[$oldKey]);
|
||||
$wrapped->updateConnection($connection);
|
||||
$this->connections[$newKey] = $wrapped;
|
||||
$this->sessions[$newKey] = $session;
|
||||
$this->keysByConv[$conv] = $newKey;
|
||||
|
||||
return $newKey;
|
||||
}
|
||||
|
||||
private function connectionForConv(int $conv): ?KcpFrameServerConnection
|
||||
{
|
||||
$key = $this->keysByConv[$conv] ?? null;
|
||||
return $key === null ? null : ($this->connections[$key] ?? null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ final class NativeKcpLibrary
|
||||
int laylink_kcp_peeksize(laylink_kcp* session);
|
||||
int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len);
|
||||
void laylink_kcp_flush(laylink_kcp* session);
|
||||
int laylink_kcp_waitsnd(laylink_kcp* session);
|
||||
int laylink_kcp_pending_output_size(laylink_kcp* session);
|
||||
int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len);
|
||||
CDEF;
|
||||
|
||||
@ -18,9 +18,9 @@ final class NativeKcpSession
|
||||
private FrameParser $parser;
|
||||
private bool $paused = false;
|
||||
private bool $closed = false;
|
||||
private int $mtuBytes;
|
||||
/** @var Frame[] */
|
||||
private array $pausedFrames = [];
|
||||
private int $queuedBytes = 0;
|
||||
|
||||
public function __construct(
|
||||
private readonly int $conv,
|
||||
@ -40,17 +40,15 @@ final class NativeKcpSession
|
||||
KcpConfig::int('LAYLINK_KCP_NODELAY', 1, 0, 1),
|
||||
KcpConfig::int('LAYLINK_KCP_INTERVAL_MS', 10, 1, 1000),
|
||||
KcpConfig::int('LAYLINK_KCP_FAST_RESEND', 2, 0, 10),
|
||||
KcpConfig::int('LAYLINK_KCP_NO_CONGESTION_CONTROL', 0, 0, 1),
|
||||
KcpConfig::int('LAYLINK_KCP_NO_CONGESTION_CONTROL', 1, 0, 1),
|
||||
);
|
||||
$this->ffi->laylink_kcp_wndsize(
|
||||
$this->session,
|
||||
KcpConfig::int('LAYLINK_KCP_SEND_WINDOW', 256, 16, 8192),
|
||||
KcpConfig::int('LAYLINK_KCP_RECV_WINDOW', 512, 16, 8192),
|
||||
);
|
||||
$this->ffi->laylink_kcp_setmtu(
|
||||
$this->session,
|
||||
KcpConfig::int('LAYLINK_KCP_MTU_BYTES', 1200, 576, 1400),
|
||||
KcpConfig::int('LAYLINK_KCP_SEND_WINDOW', 1024, 16, 8192),
|
||||
KcpConfig::int('LAYLINK_KCP_RECV_WINDOW', 1024, 16, 8192),
|
||||
);
|
||||
$this->mtuBytes = KcpConfig::int('LAYLINK_KCP_MTU_BYTES', 1350, 576, 1400);
|
||||
$this->ffi->laylink_kcp_setmtu($this->session, $this->mtuBytes);
|
||||
|
||||
$this->parser = new FrameParser();
|
||||
}
|
||||
@ -70,7 +68,7 @@ final class NativeKcpSession
|
||||
|
||||
public function sendFrame(Frame $frame, int $maxSendBuffer): bool|null
|
||||
{
|
||||
if ($this->closed || $this->queuedBytes >= $maxSendBuffer) {
|
||||
if ($this->closed || $this->getSendBufferQueueSize() >= $maxSendBuffer) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -80,7 +78,6 @@ final class NativeKcpSession
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->queuedBytes += strlen($bytes);
|
||||
$this->ffi->laylink_kcp_flush($this->session);
|
||||
$this->drainOutput();
|
||||
return true;
|
||||
@ -146,7 +143,10 @@ final class NativeKcpSession
|
||||
|
||||
public function getSendBufferQueueSize(): int
|
||||
{
|
||||
return max($this->queuedBytes, $this->pendingOutputBytes());
|
||||
$waitSegments = $this->ffi->laylink_kcp_waitsnd($this->session);
|
||||
$waitBytes = max(0, $waitSegments) * $this->mtuBytes;
|
||||
|
||||
return $waitBytes + $this->pendingOutputBytes();
|
||||
}
|
||||
|
||||
private function drainFrames(): void
|
||||
@ -164,7 +164,6 @@ final class NativeKcpSession
|
||||
return;
|
||||
}
|
||||
|
||||
$this->queuedBytes = max(0, $this->queuedBytes - $read);
|
||||
$bytes = FFI::string($buffer, $read);
|
||||
try {
|
||||
foreach ($this->parser->push($bytes) as $frame) {
|
||||
@ -183,7 +182,7 @@ final class NativeKcpSession
|
||||
private function drainOutput(): void
|
||||
{
|
||||
$drained = 0;
|
||||
$maxPackets = KcpConfig::int('LAYLINK_KCP_OUTPUT_DRAIN_PACKETS', 256, 1, 8192);
|
||||
$maxPackets = KcpConfig::int('LAYLINK_KCP_OUTPUT_DRAIN_PACKETS', 1024, 1, 8192);
|
||||
while ($drained < $maxPackets && ($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) {
|
||||
$buffer = $this->ffi->new("char[$size]");
|
||||
$read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user