diff --git a/.env.example b/.env.example index 8d40b28..5d7961e 100644 --- a/.env.example +++ b/.env.example @@ -27,22 +27,22 @@ LAYLINK_KCP_INTERVAL_MS=10 # KCP 内部 update 间隔;单位毫秒。常用 10、20、30,越小越低延迟但 CPU/发包更高。 LAYLINK_KCP_FAST_RESEND=2 # KCP 快速重传阈值;0 关闭,2 是常见低延迟设置,丢包网络可尝试 2-4。 -LAYLINK_KCP_NO_CONGESTION_CONTROL=0 -# 是否关闭 KCP 拥塞控制;0 开启拥塞控制更稳,1 更激进但容易挤爆 UDP 发送缓冲。 -LAYLINK_KCP_SEND_WINDOW=256 -# KCP 发送窗口;越大吞吐潜力越高但更容易拥堵,常用 128、256、512、1024。 -LAYLINK_KCP_RECV_WINDOW=512 -# KCP 接收窗口;应不小于发送窗口,常用 256、512、1024。 -LAYLINK_KCP_MTU_BYTES=1200 -# KCP MTU;建议 1200 避免公网路径分片,内网可尝试 1350。 +LAYLINK_KCP_NO_CONGESTION_CONTROL=1 +# 是否关闭 KCP 拥塞控制;1 为高吞吐模式,接近早期 100Mbps 默认值;丢包/拥堵明显时改 0。 +LAYLINK_KCP_SEND_WINDOW=1024 +# KCP 发送窗口;高吞吐建议 1024,保守可用 256 或 512。 +LAYLINK_KCP_RECV_WINDOW=1024 +# KCP 接收窗口;应不小于发送窗口,高吞吐建议 1024。 +LAYLINK_KCP_MTU_BYTES=1350 +# KCP MTU;高吞吐建议 1350,若公网链路分片/丢包明显则改 1200。 LAYLINK_KCP_TICK_MS=10 # PHP transport tick 间隔;单位毫秒。通常与 LAYLINK_KCP_INTERVAL_MS 一致。 -LAYLINK_KCP_UDP_SEND_QUEUE_BYTES=16777216 -# UDP EAGAIN 发送队列上限;单位字节。发送缓冲暂满时会排队重试,超过后关闭该 KCP 会话。 -LAYLINK_KCP_UDP_FLUSH_PACKETS=256 -# 每次 tick 最多刷出的 UDP packet 数;拥堵时可调低到 64 或 128,追求吞吐可调高。 -LAYLINK_KCP_OUTPUT_DRAIN_PACKETS=256 -# 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数;调低可减少单个大下载占用事件循环时间。 +LAYLINK_KCP_UDP_SEND_QUEUE_BYTES=67108864 +# UDP EAGAIN 发送队列上限;单位字节。高吞吐建议 67108864,内存紧张或拥堵明显时调低。 +LAYLINK_KCP_UDP_FLUSH_PACKETS=1024 +# 每次 tick 最多刷出的 UDP packet 数;高吞吐建议 1024,拥堵时可调低到 256 或 128。 +LAYLINK_KCP_OUTPUT_DRAIN_PACKETS=1024 +# 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数;高吞吐建议 1024,单连接影响事件循环时调低。 [client-agent] NODE_ID=client-01 diff --git a/contract.md b/contract.md index 6caddf0..9281dd3 100644 --- a/contract.md +++ b/contract.md @@ -2,7 +2,7 @@ ## Implementation Status -Last updated: 2026-05-29 Asia/Shanghai. +Last updated: 2026-05-30 Asia/Shanghai. Current phase: MVP bootstrap in progress. @@ -240,6 +240,21 @@ Completed in this checkpoint: * `POP_AGENT_KCP_WORKERS` is exposed but currently clamped to `1` in `bin/pop-server.php`. * KCP/UDP must remain single-worker in the current architecture because KCP session state is process-local and UDP packets for one conv can otherwise be handled by different workers. * Native KCP output draining is capped per tick by `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` to reduce single-flow event-loop monopolization during large downloads. +* Fixed KCP POP-side session lookup for real-world UDP/NAT behavior: + * POP no longer depends only on `remote ip:port + conv` for KCP session lookup. + * POP keeps a `conv -> session` index and migrates the current UDP remote address when a known `conv` arrives from a changed source port. + * KCP callbacks now resolve the active connection by `conv`, so migrated sessions continue delivering frames instead of silently dropping `DATA`. +* Fixed native KCP send-buffer accounting after heavy speedtest-style traffic: + * The previous PHP-side native KCP `queuedBytes` counter could grow during large transfers and never fall back to zero unless user payload was received in the opposite direction. + * This could make a long-lived KCP Agent connection permanently appear full after upload/download tests, causing later `OPEN_OK` / `DATA` sends to fail and audit rows to show zero transferred bytes. + * Added native wrapper `laylink_kcp_waitsnd()` around upstream `ikcp_waitsnd()`. + * `NativeKcpSession::getSendBufferQueueSize()` now derives watermarks from real KCP pending segment count plus pending UDP output bytes. + * POP now treats failure to send `OPEN_OK` as `agent_buffer_overflow` instead of leaving a target connection open with no client-visible success. +* Restored KCP high-throughput defaults: + * Early native KCP testing used hardcoded `nodelay=1, interval=10, resend=2, nc=1, sndwnd=1024, rcvwnd=1024, mtu=1350`. + * The first exposed `.env` defaults were more conservative (`nc=0`, smaller windows, `mtu=1200`) and could reduce throughput dramatically on speedtest-style high-BDP paths. + * `NativeKcpSession`, KCP UDP queue/flush fallback defaults, current `.env`, and `.env.example` now use the high-throughput profile by default. + * For lossy or congested paths, tune down to `LAYLINK_KCP_NO_CONGESTION_CONTROL=0`, `LAYLINK_KCP_MTU_BYTES=1200`, smaller windows, or lower flush/drain packet counts. Known MVP limitations: diff --git a/native/kcp/laylink_kcp.c b/native/kcp/laylink_kcp.c index 1c1e133..2c98e9b 100644 --- a/native/kcp/laylink_kcp.c +++ b/native/kcp/laylink_kcp.c @@ -164,6 +164,13 @@ void laylink_kcp_flush(laylink_kcp* session) { ikcp_flush(session->kcp); } +int laylink_kcp_waitsnd(laylink_kcp* session) { + if (session == NULL || session->kcp == NULL) { + return 0; + } + return ikcp_waitsnd(session->kcp); +} + int laylink_kcp_pending_output_size(laylink_kcp* session) { if (session == NULL || session->output_head == NULL) { return 0; diff --git a/native/kcp/laylink_kcp.h b/native/kcp/laylink_kcp.h index 6e0a7ba..f9180c9 100644 --- a/native/kcp/laylink_kcp.h +++ b/native/kcp/laylink_kcp.h @@ -19,6 +19,7 @@ unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current); int laylink_kcp_peeksize(laylink_kcp* session); int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len); void laylink_kcp_flush(laylink_kcp* session); +int laylink_kcp_waitsnd(laylink_kcp* session); int laylink_kcp_pending_output_size(laylink_kcp* session); int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len); diff --git a/src/Server/AgentListener.php b/src/Server/AgentListener.php index 56481ae..68f15c3 100644 --- a/src/Server/AgentListener.php +++ b/src/Server/AgentListener.php @@ -237,10 +237,13 @@ final class AgentListener $target->onConnect = function () use ($session, $agentConnection): void { $session->state = TunnelSession::OPEN; - $this->send($agentConnection, new Frame(FrameType::OPEN_OK, $session->sessionId, [ + $sent = $this->send($agentConnection, new Frame(FrameType::OPEN_OK, $session->sessionId, [ 'target_host' => $session->targetHost, 'target_port' => $session->targetPort, ])); + if ($sent === false) { + $this->closeSession($session, 'failed', 'agent_buffer_overflow'); + } }; $target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void { $session->bytesTargetToClient += strlen($data); diff --git a/src/Transport/KcpFrameClientTransport.php b/src/Transport/KcpFrameClientTransport.php index 920ce58..aeb2aa2 100644 --- a/src/Transport/KcpFrameClientTransport.php +++ b/src/Transport/KcpFrameClientTransport.php @@ -37,7 +37,7 @@ final class KcpFrameClientTransport implements FrameClientTransport $this->connection = $connection; $this->sender = new KcpUdpPacketSender( $connection, - KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024), + KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 64 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024), ); $connection->onConnect = function () use ($connection): void { @@ -130,14 +130,14 @@ final class KcpFrameClientTransport implements FrameClientTransport 'type' => KcpPacketCodec::SYN, 'conv' => $this->conv, ])); - $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); + $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192)); return; } $before = $this->getSendBufferQueueSize(); - $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); + $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192)); $this->session?->tick(); - $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); + $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192)); if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) { ($this->onBufferDrain)($this); } diff --git a/src/Transport/KcpFrameServerListener.php b/src/Transport/KcpFrameServerListener.php index 5cadefb..9bb3212 100644 --- a/src/Transport/KcpFrameServerListener.php +++ b/src/Transport/KcpFrameServerListener.php @@ -15,6 +15,8 @@ final class KcpFrameServerListener private array $connections = []; /** @var array */ private array $sessions = []; + /** @var array */ + private array $keysByConv = []; private ?int $timerId = null; private int $nextConnectionId = 1_000_000; @@ -48,6 +50,7 @@ final class KcpFrameServerListener return; } $key = $this->key($connection, $conv); + $key = $this->knownKey($connection, $conv, $key); $wrapped = $this->connections[$key] ?? null; $session = $this->sessions[$key] ?? null; if ($wrapped === null || $session === null) { @@ -72,6 +75,7 @@ final class KcpFrameServerListener $this->handleSyn($connection, $key, $packet['conv']); return; } + $key = $this->knownKey($connection, $packet['conv'], $key); $wrapped = $this->connections[$key] ?? null; $session = $this->sessions[$key] ?? null; @@ -98,31 +102,35 @@ final class KcpFrameServerListener private function handleSyn(UdpConnection $connection, string $key, int $conv): void { + if (isset($this->keysByConv[$conv]) && $this->keysByConv[$conv] !== $key) { + $key = $this->migrateConnection($this->keysByConv[$conv], $key, $connection, $conv); + } + if (isset($this->connections[$key])) { $this->connections[$key]->updateConnection($connection); $this->connections[$key]->sendUdpPacket(KcpPacketCodec::encode([ 'type' => KcpPacketCodec::SYN_ACK, 'conv' => $conv, ])); - $this->connections[$key]->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); + $this->connections[$key]->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192)); return; } $sender = new KcpUdpPacketSender( $connection, - KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024), + KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 64 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024), ); $session = $this->createSession( $conv, fn (string $packet): bool|null => $sender->send($packet), - function (Frame $frame) use ($key): void { - $wrapped = $this->connections[$key] ?? null; + function (Frame $frame) use ($conv): void { + $wrapped = $this->connectionForConv($conv); if ($wrapped !== null) { ($this->onFrame)($wrapped, $frame); } }, - function (\Throwable $e) use ($key): void { - $wrapped = $this->connections[$key] ?? null; + function (\Throwable $e) use ($conv): void { + $wrapped = $this->connectionForConv($conv); if ($wrapped !== null) { ($this->onInvalidFrame)($wrapped, $e); } @@ -131,6 +139,7 @@ final class KcpFrameServerListener $wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $sender, $this->maxSendBuffer); $this->sessions[$key] = $session; $this->connections[$key] = $wrapped; + $this->keysByConv[$conv] = $key; $sender->send(KcpPacketCodec::encode([ 'type' => KcpPacketCodec::SYN_ACK, @@ -149,9 +158,9 @@ final class KcpFrameServerListener } $before = $session->getSendBufferQueueSize(); - $wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); + $wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192)); $session->tick(); - $wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); + $wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 1024, 1, 8192)); if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) { ($this->onBufferDrain)($wrapped); } @@ -161,7 +170,11 @@ final class KcpFrameServerListener private function closeConnection(string $key): void { $wrapped = $this->connections[$key] ?? null; + $session = $this->sessions[$key] ?? null; unset($this->connections[$key], $this->sessions[$key]); + if ($session !== null) { + unset($this->keysByConv[$session->conv()]); + } if ($wrapped !== null) { ($this->onClose)($wrapped); } @@ -182,4 +195,45 @@ final class KcpFrameServerListener { return $connection->getRemoteAddress() . '#' . $conv; } + + private function knownKey(UdpConnection $connection, int $conv, string $candidate): string + { + if (isset($this->connections[$candidate])) { + return $candidate; + } + + $known = $this->keysByConv[$conv] ?? null; + if ($known === null || !isset($this->connections[$known], $this->sessions[$known])) { + return $candidate; + } + + return $this->migrateConnection($known, $candidate, $connection, $conv); + } + + private function migrateConnection(string $oldKey, string $newKey, UdpConnection $connection, int $conv): string + { + if ($oldKey === $newKey) { + return $newKey; + } + + $wrapped = $this->connections[$oldKey] ?? null; + $session = $this->sessions[$oldKey] ?? null; + if ($wrapped === null || $session === null) { + return $newKey; + } + + unset($this->connections[$oldKey], $this->sessions[$oldKey]); + $wrapped->updateConnection($connection); + $this->connections[$newKey] = $wrapped; + $this->sessions[$newKey] = $session; + $this->keysByConv[$conv] = $newKey; + + return $newKey; + } + + private function connectionForConv(int $conv): ?KcpFrameServerConnection + { + $key = $this->keysByConv[$conv] ?? null; + return $key === null ? null : ($this->connections[$key] ?? null); + } } diff --git a/src/Transport/NativeKcpLibrary.php b/src/Transport/NativeKcpLibrary.php index 9b9d048..ede92a3 100644 --- a/src/Transport/NativeKcpLibrary.php +++ b/src/Transport/NativeKcpLibrary.php @@ -22,6 +22,7 @@ final class NativeKcpLibrary int laylink_kcp_peeksize(laylink_kcp* session); int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len); void laylink_kcp_flush(laylink_kcp* session); + int laylink_kcp_waitsnd(laylink_kcp* session); int laylink_kcp_pending_output_size(laylink_kcp* session); int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len); CDEF; diff --git a/src/Transport/NativeKcpSession.php b/src/Transport/NativeKcpSession.php index 4a4a680..361987d 100644 --- a/src/Transport/NativeKcpSession.php +++ b/src/Transport/NativeKcpSession.php @@ -18,9 +18,9 @@ final class NativeKcpSession private FrameParser $parser; private bool $paused = false; private bool $closed = false; + private int $mtuBytes; /** @var Frame[] */ private array $pausedFrames = []; - private int $queuedBytes = 0; public function __construct( private readonly int $conv, @@ -40,17 +40,15 @@ final class NativeKcpSession KcpConfig::int('LAYLINK_KCP_NODELAY', 1, 0, 1), KcpConfig::int('LAYLINK_KCP_INTERVAL_MS', 10, 1, 1000), KcpConfig::int('LAYLINK_KCP_FAST_RESEND', 2, 0, 10), - KcpConfig::int('LAYLINK_KCP_NO_CONGESTION_CONTROL', 0, 0, 1), + KcpConfig::int('LAYLINK_KCP_NO_CONGESTION_CONTROL', 1, 0, 1), ); $this->ffi->laylink_kcp_wndsize( $this->session, - KcpConfig::int('LAYLINK_KCP_SEND_WINDOW', 256, 16, 8192), - KcpConfig::int('LAYLINK_KCP_RECV_WINDOW', 512, 16, 8192), - ); - $this->ffi->laylink_kcp_setmtu( - $this->session, - KcpConfig::int('LAYLINK_KCP_MTU_BYTES', 1200, 576, 1400), + KcpConfig::int('LAYLINK_KCP_SEND_WINDOW', 1024, 16, 8192), + KcpConfig::int('LAYLINK_KCP_RECV_WINDOW', 1024, 16, 8192), ); + $this->mtuBytes = KcpConfig::int('LAYLINK_KCP_MTU_BYTES', 1350, 576, 1400); + $this->ffi->laylink_kcp_setmtu($this->session, $this->mtuBytes); $this->parser = new FrameParser(); } @@ -70,7 +68,7 @@ final class NativeKcpSession public function sendFrame(Frame $frame, int $maxSendBuffer): bool|null { - if ($this->closed || $this->queuedBytes >= $maxSendBuffer) { + if ($this->closed || $this->getSendBufferQueueSize() >= $maxSendBuffer) { return false; } @@ -80,7 +78,6 @@ final class NativeKcpSession return false; } - $this->queuedBytes += strlen($bytes); $this->ffi->laylink_kcp_flush($this->session); $this->drainOutput(); return true; @@ -146,7 +143,10 @@ final class NativeKcpSession public function getSendBufferQueueSize(): int { - return max($this->queuedBytes, $this->pendingOutputBytes()); + $waitSegments = $this->ffi->laylink_kcp_waitsnd($this->session); + $waitBytes = max(0, $waitSegments) * $this->mtuBytes; + + return $waitBytes + $this->pendingOutputBytes(); } private function drainFrames(): void @@ -164,7 +164,6 @@ final class NativeKcpSession return; } - $this->queuedBytes = max(0, $this->queuedBytes - $read); $bytes = FFI::string($buffer, $read); try { foreach ($this->parser->push($bytes) as $frame) { @@ -183,7 +182,7 @@ final class NativeKcpSession private function drainOutput(): void { $drained = 0; - $maxPackets = KcpConfig::int('LAYLINK_KCP_OUTPUT_DRAIN_PACKETS', 256, 1, 8192); + $maxPackets = KcpConfig::int('LAYLINK_KCP_OUTPUT_DRAIN_PACKETS', 1024, 1, 8192); while ($drained < $maxPackets && ($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) { $buffer = $this->ffi->new("char[$size]"); $read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size);