diff --git a/.env.example b/.env.example index 45cc119..8d40b28 100644 --- a/.env.example +++ b/.env.example @@ -15,10 +15,34 @@ LAYLINK_MAX_SEND_BUFFER_BYTES=67108864 # 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864,内存紧张时调小。 LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432 # 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES,达到后会暂停上游读取直到缓冲排空。 + +[kcp] LAYLINK_KCP_BACKEND=ffi # KCP 实现后端;可选 ffi、php。ffi 使用 native ikcp.c 动态库,生产建议使用;php 是调试回退实现。 LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so # native KCP 动态库路径;LAYLINK_KCP_BACKEND=ffi 时使用。相对路径按项目根目录解析,先运行 scripts/build-kcp-ffi.sh 构建。 +LAYLINK_KCP_NODELAY=1 +# KCP nodelay 开关;1 更低延迟,0 更保守。拥堵明显时可保持 1 并调大 interval 或关闭 nc。 +LAYLINK_KCP_INTERVAL_MS=10 +# KCP 内部 update 间隔;单位毫秒。常用 10、20、30,越小越低延迟但 CPU/发包更高。 +LAYLINK_KCP_FAST_RESEND=2 +# KCP 快速重传阈值;0 关闭,2 是常见低延迟设置,丢包网络可尝试 2-4。 +LAYLINK_KCP_NO_CONGESTION_CONTROL=0 +# 是否关闭 KCP 拥塞控制;0 开启拥塞控制更稳,1 更激进但容易挤爆 UDP 发送缓冲。 +LAYLINK_KCP_SEND_WINDOW=256 +# KCP 发送窗口;越大吞吐潜力越高但更容易拥堵,常用 128、256、512、1024。 +LAYLINK_KCP_RECV_WINDOW=512 +# KCP 接收窗口;应不小于发送窗口,常用 256、512、1024。 +LAYLINK_KCP_MTU_BYTES=1200 +# KCP MTU;建议 1200 避免公网路径分片,内网可尝试 1350。 +LAYLINK_KCP_TICK_MS=10 +# PHP transport tick 间隔;单位毫秒。通常与 LAYLINK_KCP_INTERVAL_MS 一致。 +LAYLINK_KCP_UDP_SEND_QUEUE_BYTES=16777216 +# UDP EAGAIN 发送队列上限;单位字节。发送缓冲暂满时会排队重试,超过后关闭该 KCP 会话。 +LAYLINK_KCP_UDP_FLUSH_PACKETS=256 +# 每次 tick 最多刷出的 UDP packet 数;拥堵时可调低到 64 或 128,追求吞吐可调高。 +LAYLINK_KCP_OUTPUT_DRAIN_PACKETS=256 +# 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数;调低可减少单个大下载占用事件循环时间。 [client-agent] NODE_ID=client-01 @@ -75,3 +99,7 @@ POP_AGENT_LISTEN=0.0.0.0:9001 # POP Server 监听 Agent 长连接的地址;格式为 host:port,例如 0.0.0.0:9001 或 127.0.0.1:9001。 POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp # POP Server 允许 Client Agent 使用的传输协议;可写逗号数组 tcp,kcp,也可写 JSON 数组 ["tcp","kcp"];当前已实现 tcp、kcp,udp 为预留实现。 +POP_AGENT_TCP_WORKERS=1 +# POP Server TCP Agent listener 的 worker 数量;TCP 模式可按 CPU 和并发提高到 2、4、8。只使用 KCP 时该值不影响 KCP。 +POP_AGENT_KCP_WORKERS=1 +# POP Server KCP/UDP Agent listener 的 worker 数量;当前必须为 1,因为 KCP 会话状态在单 worker 内维护,多 worker 会导致 UDP 包跨进程丢状态。 diff --git a/bin/pop-server.php b/bin/pop-server.php index aea74b3..619c409 100755 --- a/bin/pop-server.php +++ b/bin/pop-server.php @@ -27,6 +27,8 @@ $server = new PopServer( Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024), Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024), Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024), + Env::int('POP_AGENT_TCP_WORKERS', 1, 1, 64), + Env::int('POP_AGENT_KCP_WORKERS', 1, 1, 1), ); $server->boot(); diff --git a/contract.md b/contract.md index e163566..6caddf0 100644 --- a/contract.md +++ b/contract.md @@ -129,6 +129,7 @@ Completed in this checkpoint: * `https://ip.sb/` for egress IP * Reorganized `.env.example` into readable sections: * `[config]` + * `[kcp]` * `[client-agent]` * `[pop-server]` * Section headers are comments-for-humans in practice; the current Env loader ignores lines without `=`. @@ -218,6 +219,27 @@ Completed in this checkpoint: * Added array-style env parsing: * `Env::csv()` accepts traditional comma-separated values such as `tcp,kcp`. * `Env::csv()` also accepts JSON arrays such as `["tcp","kcp"]`. +* Added KCP congestion and UDP EAGAIN controls: + * `KcpUdpPacketSender` bypasses Workerman `UdpConnection::send()` for KCP packets and uses suppressed `stream_socket_sendto()` directly. + * UDP `EAGAIN` / "Resource temporarily unavailable" no longer emits PHP warnings from the KCP transport path. + * KCP packets that cannot be sent immediately are queued and retried on subsequent transport ticks. + * Added KCP tuning envs: + * `LAYLINK_KCP_NODELAY` + * `LAYLINK_KCP_INTERVAL_MS` + * `LAYLINK_KCP_FAST_RESEND` + * `LAYLINK_KCP_NO_CONGESTION_CONTROL` + * `LAYLINK_KCP_SEND_WINDOW` + * `LAYLINK_KCP_RECV_WINDOW` + * `LAYLINK_KCP_MTU_BYTES` + * `LAYLINK_KCP_TICK_MS` + * `LAYLINK_KCP_UDP_SEND_QUEUE_BYTES` + * `LAYLINK_KCP_UDP_FLUSH_PACKETS` + * `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` +* Added POP worker count configuration: + * `POP_AGENT_TCP_WORKERS` controls TCP Agent listener worker count. + * `POP_AGENT_KCP_WORKERS` is exposed but currently clamped to `1` in `bin/pop-server.php`. + * KCP/UDP must remain single-worker in the current architecture because KCP session state is process-local and UDP packets for one conv can otherwise be handled by different workers. + * Native KCP output draining is capped per tick by `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` to reduce single-flow event-loop monopolization during large downloads. Known MVP limitations: @@ -249,13 +271,14 @@ Next recommended tasks: * benchmark buffer pairs such as `64MiB/32MiB` and `128MiB/64MiB` * record direct-vs-LayLink throughput, CPU, memory, and disconnect behavior 8. Benchmark and tune `CLIENT_AGENT_POP_CONNECTIONS` for 1, 2, 4, and 8 long connections under mixed single-download and multi-session workloads. -9. Benchmark native FFI `kcp` against `tcp` under latency, loss, and high-throughput workloads; tune KCP nodelay, window, MTU, resend, and interval settings. -10. Add raw UDP Agent-to-POP transport only for explicitly datagram-oriented frame classes, or after a reliability/window design exists. -11. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection. -12. Optimize UDP relay with POP-side UDP socket pooling. -13. Add UDP association idle timeouts and cleanup. -14. Aggregate UDP audit records per association instead of per datagram. -15. Add UDP and per-user rate limiting. +9. Benchmark native FFI `kcp` against `tcp` under latency, loss, and high-throughput workloads; tune KCP nodelay, window, MTU, resend, interval, UDP queue, and flush settings. +10. Design KCP horizontal scaling before allowing `POP_AGENT_KCP_WORKERS>1`; options include multiple POP ports/instances, reuseport five-tuple affinity, external session state, or a UDP dispatcher keyed by conv. +11. Add raw UDP Agent-to-POP transport only for explicitly datagram-oriented frame classes, or after a reliability/window design exists. +12. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection. +13. Optimize UDP relay with POP-side UDP socket pooling. +14. Add UDP association idle timeouts and cleanup. +15. Aggregate UDP audit records per association instead of per datagram. +16. Add UDP and per-user rate limiting. ## 0. Project Name diff --git a/problems.md b/problems.md index 9c0be7d..1feed12 100644 --- a/problems.md +++ b/problems.md @@ -1,9 +1,4 @@ -当前仍无法实现chacha20加密,在开启chacha20之后, -Using SOCKS5 proxy: socks5h://127.0.0.1:21080 -[1/2] HTTPS connectivity: https://bing.com/ -* Trying 127.0.0.1:21080... -* SOCKS5 connect to bing.com:443 (remotely resolved) -* Can't complete SOCKS5 connection to bing.com. (1) -* Closing connection 0 -curl: (97) Can't complete SOCKS5 connection to bing.com. (1) -ERR bing_request_failed status=97 \ No newline at end of file +kcp有可供调整的参数吗? +存在堵塞现象。 +PHP Warning: stream_socket_sendto(): Resource temporarily unavailable + in /www/laylink/vendor/workerman/workerman/src/Connection/UdpConnection.php on line 85 \ No newline at end of file diff --git a/readme.md b/readme.md index 48e4452..19f9862 100644 --- a/readme.md +++ b/readme.md @@ -25,7 +25,7 @@ LayLink 是一个基于 PHP Workerman 的策略控制型四层反向访问网关 cp .env.example .env ``` -`.env.example` 中的 `[config]`、`[client-agent]`、`[pop-server]` 是阅读分组标题,当前加载器会忽略这些标题,只读取 `KEY=value` 配置行。 +`.env.example` 中的 `[config]`、`[kcp]`、`[client-agent]`、`[pop-server]` 是阅读分组标题,当前加载器会忽略这些标题,只读取 `KEY=value` 配置行。 Agent 与 POP Server 之间的 LayLink Frame 支持加密: @@ -75,6 +75,8 @@ POP Server 需要配置这些 `.env`: APP_ENV=dev POP_AGENT_LISTEN=0.0.0.0:9001 POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp +POP_AGENT_TCP_WORKERS=1 +POP_AGENT_KCP_WORKERS=1 AUDIT_LOG=runtime/audit.log LOG_LEVEL=debug ``` @@ -88,6 +90,8 @@ LOG_LEVEL=debug | `LAYLINK_FRAME_ENCRYPTION_KEY` | Frame 加密密钥,启用 `chacha20` 时必填。 | 普通口令、`hex:...`、`base64:...` | | `POP_AGENT_LISTEN` | POP Server 给 Client Agent 连接的监听地址。Agent 的 `POP_SERVER_ADDRESS` 应指向这里。 | `0.0.0.0:9001`、`127.0.0.1:9001` | | `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议。支持逗号数组,也支持 JSON 数组。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`["tcp","kcp"]` | +| `POP_AGENT_TCP_WORKERS` | POP TCP Agent listener 的 worker 数。TCP 模式可按 CPU 和并发提高。 | `1`、`2`、`4`、`8` | +| `POP_AGENT_KCP_WORKERS` | POP KCP/UDP Agent listener 的 worker 数。当前必须保持 `1`。 | `1` | | `AUDIT_LOG` | 审计日志路径。MVP 使用 JSON Lines 追加写入。 | `runtime/audit.log` | | `LOG_LEVEL` | 日志级别预留配置。当前 MVP 主要为后续日志工厂使用。 | `debug`、`info`、`warning`、`error` | @@ -383,6 +387,28 @@ LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so 如果运行环境暂时不能启用 FFI,可以配置 `LAYLINK_KCP_BACKEND=php` 使用调试回退实现;该实现不适合作为生产高吞吐路径。 +KCP 可调参数: + +| 变量 | 作用 | 建议 | +| --- | --- | --- | +| `LAYLINK_KCP_NODELAY` | KCP nodelay 开关。 | 默认 `1` | +| `LAYLINK_KCP_INTERVAL_MS` | KCP update 间隔,越小越低延迟但发包/CPU 更高。 | `10`、`20`、`30` | +| `LAYLINK_KCP_FAST_RESEND` | 快速重传阈值。 | `2`,拥堵时试 `3`、`4` | +| `LAYLINK_KCP_NO_CONGESTION_CONTROL` | 是否关闭 KCP 拥塞控制。 | 公网建议 `0`,内网压测可试 `1` | +| `LAYLINK_KCP_SEND_WINDOW` | KCP 发送窗口。 | `128`、`256`、`512` | +| `LAYLINK_KCP_RECV_WINDOW` | KCP 接收窗口。 | `256`、`512`、`1024` | +| `LAYLINK_KCP_MTU_BYTES` | KCP MTU。 | 公网建议 `1200`,内网可试 `1350` | +| `LAYLINK_KCP_TICK_MS` | PHP transport tick 间隔。 | 通常等于 `LAYLINK_KCP_INTERVAL_MS` | +| `LAYLINK_KCP_UDP_SEND_QUEUE_BYTES` | UDP 发送遇到 EAGAIN 时的本地排队上限。 | `16777216`、`33554432` | +| `LAYLINK_KCP_UDP_FLUSH_PACKETS` | 每次 tick 最多刷出的 UDP packet 数。 | 拥堵时 `64`/`128`,吞吐压测 `256`/`512` | +| `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` | 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数。 | 延迟敏感用 `64`/`128`,吞吐压测用 `256`/`512` | + +如果出现 `stream_socket_sendto(): Resource temporarily unavailable`,说明 UDP socket 发送缓冲暂时满了。LayLink 会把 KCP packet 放入本地发送队列并在后续 tick 重试;同时建议把 `LAYLINK_KCP_NO_CONGESTION_CONTROL` 保持为 `0`,必要时降低 `LAYLINK_KCP_SEND_WINDOW`、`LAYLINK_KCP_UDP_FLUSH_PACKETS` 或调大 `LAYLINK_KCP_INTERVAL_MS`。 + +KCP/UDP 目前不要把 `POP_AGENT_KCP_WORKERS` 调大。KCP 会话状态存在单个 worker 进程里,UDP 多 worker 会让同一个会话的数据包分散到不同进程,导致找不到会话、重传增加甚至断流。要横向扩展 KCP,当前推荐启动多个 POP 端口或多个 POP 实例,由 Client Agent 配置多 POP/多进程策略;后续可以做 `SO_REUSEPORT` 五元组哈希、外部 session 表或每 conv 固定 worker 分发。 + +单 worker 并不等于一个大下载会同步阻塞其他请求:目标连接、UDP socket 和本地客户端都是非阻塞 I/O。但大文件会带来大量 KCP 分片和加密/解密/FFI 调用,可能短时间占用事件循环 CPU。延迟敏感场景可降低 `LAYLINK_DATA_CHUNK_BYTES`、`LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` 和 `LAYLINK_KCP_UDP_FLUSH_PACKETS`,让一个大流量会话每次 tick 少占一点时间,换取更好的多会话公平性。 + 如果 Agent 配置为 `udp`,进程会启动失败并明确提示该传输尚未实现。 启动 POP Server: diff --git a/src/Server/PopServer.php b/src/Server/PopServer.php index 1250ea4..4f91d2f 100644 --- a/src/Server/PopServer.php +++ b/src/Server/PopServer.php @@ -28,6 +28,8 @@ final class PopServer private readonly int $maxSendBuffer = 64 * 1024 * 1024, private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, private readonly int $dataChunkSize = 1024 * 1024, + private readonly int $tcpWorkerCount = 1, + private readonly int $kcpWorkerCount = 1, ) { $this->nodes = new NodeRegistry(); $this->sessions = new SessionManager(); @@ -45,7 +47,7 @@ final class PopServer $scheme = $transport === 'kcp' ? 'udp' : 'tcp'; $agentWorker = new Worker($scheme . '://' . $this->agentListen); $agentWorker->name = 'laylink-pop-agent-listener-' . $transport; - $agentWorker->count = 1; + $agentWorker->count = $transport === 'kcp' ? $this->kcpWorkerCount : $this->tcpWorkerCount; new AgentListener( $agentWorker, new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports), diff --git a/src/Transport/KcpConfig.php b/src/Transport/KcpConfig.php new file mode 100644 index 0000000..bf40f17 --- /dev/null +++ b/src/Transport/KcpConfig.php @@ -0,0 +1,23 @@ +session = null; $connection = new AsyncUdpConnection($this->normalizeAddress($this->address)); $this->connection = $connection; + $this->sender = new KcpUdpPacketSender( + $connection, + KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024), + ); $connection->onConnect = function () use ($connection): void { - $connection->send(KcpPacketCodec::encode([ + $this->sender?->send(KcpPacketCodec::encode([ 'type' => KcpPacketCodec::SYN, 'conv' => $this->conv, ])); @@ -47,13 +52,14 @@ final class KcpFrameClientTransport implements FrameClientTransport $connection->onClose = function () use ($connection): void { if ($this->connection === $connection) { $this->connection = null; + $this->sender = null; } $this->stopTimer(); ($this->onClose)($this); }; $connection->connect(); - $this->timerId = Timer::add(0.02, fn () => $this->tick()); + $this->timerId = Timer::add(KcpConfig::tickSeconds(), fn () => $this->tick()); } public function send(Frame $frame): bool|null @@ -79,7 +85,7 @@ final class KcpFrameClientTransport implements FrameClientTransport public function getSendBufferQueueSize(): int { - return $this->session?->getSendBufferQueueSize() ?? 0; + return ($this->session?->getSendBufferQueueSize() ?? 0) + ($this->sender?->queuedBytes() ?? 0); } private function handlePacket(string $data): void @@ -120,15 +126,18 @@ final class KcpFrameClientTransport implements FrameClientTransport } if (!$this->connected) { - $this->connection->send(KcpPacketCodec::encode([ + $this->sender?->send(KcpPacketCodec::encode([ 'type' => KcpPacketCodec::SYN, 'conv' => $this->conv, ])); + $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); return; } $before = $this->getSendBufferQueueSize(); + $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); $this->session?->tick(); + $this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) { ($this->onBufferDrain)($this); } @@ -140,7 +149,7 @@ final class KcpFrameClientTransport implements FrameClientTransport $libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so'); $args = [ $this->conv, - fn (string $packet): bool|null => $this->connection?->send($packet), + fn (string $packet): bool|null => $this->sender?->send($packet) ?? false, fn (Frame $frame) => ($this->onFrame)($this, $frame), fn (\Throwable $e) => ($this->onInvalidFrame)($e), ]; diff --git a/src/Transport/KcpFrameServerConnection.php b/src/Transport/KcpFrameServerConnection.php index 6026b55..da257eb 100644 --- a/src/Transport/KcpFrameServerConnection.php +++ b/src/Transport/KcpFrameServerConnection.php @@ -15,6 +15,7 @@ final class KcpFrameServerConnection implements FrameServerConnection private readonly int $id, private UdpConnection $connection, private readonly KcpReliableSession|NativeKcpSession $session, + private readonly KcpUdpPacketSender $sender, private readonly int $maxSendBuffer, ) { } @@ -55,7 +56,7 @@ final class KcpFrameServerConnection implements FrameServerConnection public function getSendBufferQueueSize(): int { - return $this->session->getSendBufferQueueSize(); + return $this->session->getSendBufferQueueSize() + $this->sender->queuedBytes(); } public function getRemoteIp(): string @@ -66,10 +67,21 @@ final class KcpFrameServerConnection implements FrameServerConnection public function updateConnection(UdpConnection $connection): void { $this->connection = $connection; + $this->sender->updateConnection($connection); } public function isClosed(): bool { return $this->closed || $this->session->isClosed(); } + + public function flushUdp(int $maxPackets): void + { + $this->sender->flush($maxPackets); + } + + public function sendUdpPacket(string $packet): bool + { + return $this->sender->send($packet); + } } diff --git a/src/Transport/KcpFrameServerListener.php b/src/Transport/KcpFrameServerListener.php index dc70e73..5cadefb 100644 --- a/src/Transport/KcpFrameServerListener.php +++ b/src/Transport/KcpFrameServerListener.php @@ -29,7 +29,7 @@ final class KcpFrameServerListener ) { $worker->onMessage = fn (UdpConnection $connection, string $data) => $this->handleMessage($connection, $data); $worker->onWorkerStart = function (): void { - $this->timerId = Timer::add(0.02, fn () => $this->tick()); + $this->timerId = Timer::add(KcpConfig::tickSeconds(), fn () => $this->tick()); }; $worker->onWorkerStop = function (): void { if ($this->timerId !== null) { @@ -99,16 +99,22 @@ final class KcpFrameServerListener private function handleSyn(UdpConnection $connection, string $key, int $conv): void { if (isset($this->connections[$key])) { - $connection->send(KcpPacketCodec::encode([ + $this->connections[$key]->updateConnection($connection); + $this->connections[$key]->sendUdpPacket(KcpPacketCodec::encode([ 'type' => KcpPacketCodec::SYN_ACK, 'conv' => $conv, ])); + $this->connections[$key]->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); return; } + $sender = new KcpUdpPacketSender( + $connection, + KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024), + ); $session = $this->createSession( $conv, - fn (string $packet): bool|null => $connection->send($packet), + fn (string $packet): bool|null => $sender->send($packet), function (Frame $frame) use ($key): void { $wrapped = $this->connections[$key] ?? null; if ($wrapped !== null) { @@ -122,11 +128,11 @@ final class KcpFrameServerListener } }, ); - $wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $this->maxSendBuffer); + $wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $sender, $this->maxSendBuffer); $this->sessions[$key] = $session; $this->connections[$key] = $wrapped; - $connection->send(KcpPacketCodec::encode([ + $sender->send(KcpPacketCodec::encode([ 'type' => KcpPacketCodec::SYN_ACK, 'conv' => $conv, ])); @@ -143,7 +149,9 @@ final class KcpFrameServerListener } $before = $session->getSendBufferQueueSize(); + $wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); $session->tick(); + $wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192)); if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) { ($this->onBufferDrain)($wrapped); } diff --git a/src/Transport/KcpUdpPacketSender.php b/src/Transport/KcpUdpPacketSender.php new file mode 100644 index 0000000..196feee --- /dev/null +++ b/src/Transport/KcpUdpPacketSender.php @@ -0,0 +1,89 @@ +connection = $connection; + } + + public function send(string $packet): bool + { + if ($this->queue !== []) { + return $this->enqueue($packet); + } + + if ($this->trySend($packet)) { + return true; + } + + return $this->enqueue($packet); + } + + public function flush(int $maxPackets): void + { + $sent = 0; + while ($this->queue !== [] && $sent < $maxPackets) { + $packet = $this->queue[0]; + if (!$this->trySend($packet)) { + return; + } + + array_shift($this->queue); + $this->queuedBytes -= strlen($packet); + $sent++; + } + } + + public function queuedBytes(): int + { + return $this->queuedBytes; + } + + private function enqueue(string $packet): bool + { + $length = strlen($packet); + if ($this->queuedBytes + $length > $this->maxQueueBytes) { + return false; + } + + $this->queue[] = $packet; + $this->queuedBytes += $length; + return true; + } + + private function trySend(string $packet): bool + { + $socket = $this->connection->getSocket(); + if (!is_resource($socket)) { + return false; + } + + if ($this->connection instanceof AsyncUdpConnection) { + return @stream_socket_sendto($socket, $packet) === strlen($packet); + } + + $remote = $this->connection->isIpV6() + ? '[' . $this->connection->getRemoteIp() . ']:' . $this->connection->getRemotePort() + : $this->connection->getRemoteAddress(); + + return @stream_socket_sendto($socket, $packet, 0, $remote) === strlen($packet); + } +} diff --git a/src/Transport/NativeKcpSession.php b/src/Transport/NativeKcpSession.php index ec3da9d..4a4a680 100644 --- a/src/Transport/NativeKcpSession.php +++ b/src/Transport/NativeKcpSession.php @@ -35,6 +35,23 @@ final class NativeKcpSession throw new \RuntimeException('kcp_create_failed'); } + $this->ffi->laylink_kcp_nodelay( + $this->session, + KcpConfig::int('LAYLINK_KCP_NODELAY', 1, 0, 1), + KcpConfig::int('LAYLINK_KCP_INTERVAL_MS', 10, 1, 1000), + KcpConfig::int('LAYLINK_KCP_FAST_RESEND', 2, 0, 10), + KcpConfig::int('LAYLINK_KCP_NO_CONGESTION_CONTROL', 0, 0, 1), + ); + $this->ffi->laylink_kcp_wndsize( + $this->session, + KcpConfig::int('LAYLINK_KCP_SEND_WINDOW', 256, 16, 8192), + KcpConfig::int('LAYLINK_KCP_RECV_WINDOW', 512, 16, 8192), + ); + $this->ffi->laylink_kcp_setmtu( + $this->session, + KcpConfig::int('LAYLINK_KCP_MTU_BYTES', 1200, 576, 1400), + ); + $this->parser = new FrameParser(); } @@ -165,13 +182,20 @@ final class NativeKcpSession private function drainOutput(): void { - while (($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) { + $drained = 0; + $maxPackets = KcpConfig::int('LAYLINK_KCP_OUTPUT_DRAIN_PACKETS', 256, 1, 8192); + while ($drained < $maxPackets && ($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) { $buffer = $this->ffi->new("char[$size]"); $read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size); if ($read <= 0) { return; } - ($this->sendPacket)(FFI::string($buffer, $read)); + $drained++; + if (($this->sendPacket)(FFI::string($buffer, $read)) === false) { + ($this->onInvalidFrame)(new \RuntimeException('kcp_udp_send_queue_full')); + $this->close(); + return; + } } }