KCP improve

This commit is contained in:
EchoNoch 2026-05-30 17:50:45 +08:00
parent 661ed401da
commit 56c74337e4
12 changed files with 272 additions and 31 deletions

View File

@ -15,10 +15,34 @@ LAYLINK_MAX_SEND_BUFFER_BYTES=67108864
# 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864内存紧张时调小。 # 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864内存紧张时调小。
LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432 LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432
# 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES达到后会暂停上游读取直到缓冲排空。 # 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES达到后会暂停上游读取直到缓冲排空。
[kcp]
LAYLINK_KCP_BACKEND=ffi LAYLINK_KCP_BACKEND=ffi
# KCP 实现后端;可选 ffi、php。ffi 使用 native ikcp.c 动态库生产建议使用php 是调试回退实现。 # KCP 实现后端;可选 ffi、php。ffi 使用 native ikcp.c 动态库生产建议使用php 是调试回退实现。
LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so
# native KCP 动态库路径LAYLINK_KCP_BACKEND=ffi 时使用。相对路径按项目根目录解析,先运行 scripts/build-kcp-ffi.sh 构建。 # native KCP 动态库路径LAYLINK_KCP_BACKEND=ffi 时使用。相对路径按项目根目录解析,先运行 scripts/build-kcp-ffi.sh 构建。
LAYLINK_KCP_NODELAY=1
# KCP nodelay 开关1 更低延迟0 更保守。拥堵明显时可保持 1 并调大 interval 或关闭 nc。
LAYLINK_KCP_INTERVAL_MS=10
# KCP 内部 update 间隔;单位毫秒。常用 10、20、30越小越低延迟但 CPU/发包更高。
LAYLINK_KCP_FAST_RESEND=2
# KCP 快速重传阈值0 关闭2 是常见低延迟设置,丢包网络可尝试 2-4。
LAYLINK_KCP_NO_CONGESTION_CONTROL=0
# 是否关闭 KCP 拥塞控制0 开启拥塞控制更稳1 更激进但容易挤爆 UDP 发送缓冲。
LAYLINK_KCP_SEND_WINDOW=256
# KCP 发送窗口;越大吞吐潜力越高但更容易拥堵,常用 128、256、512、1024。
LAYLINK_KCP_RECV_WINDOW=512
# KCP 接收窗口;应不小于发送窗口,常用 256、512、1024。
LAYLINK_KCP_MTU_BYTES=1200
# KCP MTU建议 1200 避免公网路径分片,内网可尝试 1350。
LAYLINK_KCP_TICK_MS=10
# PHP transport tick 间隔;单位毫秒。通常与 LAYLINK_KCP_INTERVAL_MS 一致。
LAYLINK_KCP_UDP_SEND_QUEUE_BYTES=16777216
# UDP EAGAIN 发送队列上限;单位字节。发送缓冲暂满时会排队重试,超过后关闭该 KCP 会话。
LAYLINK_KCP_UDP_FLUSH_PACKETS=256
# 每次 tick 最多刷出的 UDP packet 数;拥堵时可调低到 64 或 128追求吞吐可调高。
LAYLINK_KCP_OUTPUT_DRAIN_PACKETS=256
# 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数;调低可减少单个大下载占用事件循环时间。
[client-agent] [client-agent]
NODE_ID=client-01 NODE_ID=client-01
@ -75,3 +99,7 @@ POP_AGENT_LISTEN=0.0.0.0:9001
# POP Server 监听 Agent 长连接的地址;格式为 host:port例如 0.0.0.0:9001 或 127.0.0.1:9001。 # POP Server 监听 Agent 长连接的地址;格式为 host:port例如 0.0.0.0:9001 或 127.0.0.1:9001。
POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp
# POP Server 允许 Client Agent 使用的传输协议;可写逗号数组 tcp,kcp也可写 JSON 数组 ["tcp","kcp"];当前已实现 tcp、kcpudp 为预留实现。 # POP Server 允许 Client Agent 使用的传输协议;可写逗号数组 tcp,kcp也可写 JSON 数组 ["tcp","kcp"];当前已实现 tcp、kcpudp 为预留实现。
POP_AGENT_TCP_WORKERS=1
# POP Server TCP Agent listener 的 worker 数量TCP 模式可按 CPU 和并发提高到 2、4、8。只使用 KCP 时该值不影响 KCP。
POP_AGENT_KCP_WORKERS=1
# POP Server KCP/UDP Agent listener 的 worker 数量;当前必须为 1因为 KCP 会话状态在单 worker 内维护,多 worker 会导致 UDP 包跨进程丢状态。

View File

@ -27,6 +27,8 @@ $server = new PopServer(
Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024), Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024),
Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024), Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024),
Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024), Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024),
Env::int('POP_AGENT_TCP_WORKERS', 1, 1, 64),
Env::int('POP_AGENT_KCP_WORKERS', 1, 1, 1),
); );
$server->boot(); $server->boot();

View File

@ -129,6 +129,7 @@ Completed in this checkpoint:
* `https://ip.sb/` for egress IP * `https://ip.sb/` for egress IP
* Reorganized `.env.example` into readable sections: * Reorganized `.env.example` into readable sections:
* `[config]` * `[config]`
* `[kcp]`
* `[client-agent]` * `[client-agent]`
* `[pop-server]` * `[pop-server]`
* Section headers are comments-for-humans in practice; the current Env loader ignores lines without `=`. * Section headers are comments-for-humans in practice; the current Env loader ignores lines without `=`.
@ -218,6 +219,27 @@ Completed in this checkpoint:
* Added array-style env parsing: * Added array-style env parsing:
* `Env::csv()` accepts traditional comma-separated values such as `tcp,kcp`. * `Env::csv()` accepts traditional comma-separated values such as `tcp,kcp`.
* `Env::csv()` also accepts JSON arrays such as `["tcp","kcp"]`. * `Env::csv()` also accepts JSON arrays such as `["tcp","kcp"]`.
* Added KCP congestion and UDP EAGAIN controls:
* `KcpUdpPacketSender` bypasses Workerman `UdpConnection::send()` for KCP packets and uses suppressed `stream_socket_sendto()` directly.
* UDP `EAGAIN` / "Resource temporarily unavailable" no longer emits PHP warnings from the KCP transport path.
* KCP packets that cannot be sent immediately are queued and retried on subsequent transport ticks.
* Added KCP tuning envs:
* `LAYLINK_KCP_NODELAY`
* `LAYLINK_KCP_INTERVAL_MS`
* `LAYLINK_KCP_FAST_RESEND`
* `LAYLINK_KCP_NO_CONGESTION_CONTROL`
* `LAYLINK_KCP_SEND_WINDOW`
* `LAYLINK_KCP_RECV_WINDOW`
* `LAYLINK_KCP_MTU_BYTES`
* `LAYLINK_KCP_TICK_MS`
* `LAYLINK_KCP_UDP_SEND_QUEUE_BYTES`
* `LAYLINK_KCP_UDP_FLUSH_PACKETS`
* `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS`
* Added POP worker count configuration:
* `POP_AGENT_TCP_WORKERS` controls TCP Agent listener worker count.
* `POP_AGENT_KCP_WORKERS` is exposed but currently clamped to `1` in `bin/pop-server.php`.
* KCP/UDP must remain single-worker in the current architecture because KCP session state is process-local and UDP packets for one conv can otherwise be handled by different workers.
* Native KCP output draining is capped per tick by `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` to reduce single-flow event-loop monopolization during large downloads.
Known MVP limitations: Known MVP limitations:
@ -249,13 +271,14 @@ Next recommended tasks:
* benchmark buffer pairs such as `64MiB/32MiB` and `128MiB/64MiB` * benchmark buffer pairs such as `64MiB/32MiB` and `128MiB/64MiB`
* record direct-vs-LayLink throughput, CPU, memory, and disconnect behavior * record direct-vs-LayLink throughput, CPU, memory, and disconnect behavior
8. Benchmark and tune `CLIENT_AGENT_POP_CONNECTIONS` for 1, 2, 4, and 8 long connections under mixed single-download and multi-session workloads. 8. Benchmark and tune `CLIENT_AGENT_POP_CONNECTIONS` for 1, 2, 4, and 8 long connections under mixed single-download and multi-session workloads.
9. Benchmark native FFI `kcp` against `tcp` under latency, loss, and high-throughput workloads; tune KCP nodelay, window, MTU, resend, and interval settings. 9. Benchmark native FFI `kcp` against `tcp` under latency, loss, and high-throughput workloads; tune KCP nodelay, window, MTU, resend, interval, UDP queue, and flush settings.
10. Add raw UDP Agent-to-POP transport only for explicitly datagram-oriented frame classes, or after a reliability/window design exists. 10. Design KCP horizontal scaling before allowing `POP_AGENT_KCP_WORKERS>1`; options include multiple POP ports/instances, reuseport five-tuple affinity, external session state, or a UDP dispatcher keyed by conv.
11. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection. 11. Add raw UDP Agent-to-POP transport only for explicitly datagram-oriented frame classes, or after a reliability/window design exists.
12. Optimize UDP relay with POP-side UDP socket pooling. 12. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection.
13. Add UDP association idle timeouts and cleanup. 13. Optimize UDP relay with POP-side UDP socket pooling.
14. Aggregate UDP audit records per association instead of per datagram. 14. Add UDP association idle timeouts and cleanup.
15. Add UDP and per-user rate limiting. 15. Aggregate UDP audit records per association instead of per datagram.
16. Add UDP and per-user rate limiting.
## 0. Project Name ## 0. Project Name

View File

@ -1,9 +1,4 @@
当前仍无法实现chacha20加密在开启chacha20之后 kcp有可供调整的参数吗
Using SOCKS5 proxy: socks5h://127.0.0.1:21080 存在堵塞现象。
[1/2] HTTPS connectivity: https://bing.com/ PHP Warning: stream_socket_sendto(): Resource temporarily unavailable
* Trying 127.0.0.1:21080... in /www/laylink/vendor/workerman/workerman/src/Connection/UdpConnection.php on line 85
* SOCKS5 connect to bing.com:443 (remotely resolved)
* Can't complete SOCKS5 connection to bing.com. (1)
* Closing connection 0
curl: (97) Can't complete SOCKS5 connection to bing.com. (1)
ERR bing_request_failed status=97

View File

@ -25,7 +25,7 @@ LayLink 是一个基于 PHP Workerman 的策略控制型四层反向访问网关
cp .env.example .env cp .env.example .env
``` ```
`.env.example` 中的 `[config]`、`[client-agent]`、`[pop-server]` 是阅读分组标题,当前加载器会忽略这些标题,只读取 `KEY=value` 配置行。 `.env.example` 中的 `[config]`、`[kcp]`、`[client-agent]`、`[pop-server]` 是阅读分组标题,当前加载器会忽略这些标题,只读取 `KEY=value` 配置行。
Agent 与 POP Server 之间的 LayLink Frame 支持加密: Agent 与 POP Server 之间的 LayLink Frame 支持加密:
@ -75,6 +75,8 @@ POP Server 需要配置这些 `.env`
APP_ENV=dev APP_ENV=dev
POP_AGENT_LISTEN=0.0.0.0:9001 POP_AGENT_LISTEN=0.0.0.0:9001
POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp
POP_AGENT_TCP_WORKERS=1
POP_AGENT_KCP_WORKERS=1
AUDIT_LOG=runtime/audit.log AUDIT_LOG=runtime/audit.log
LOG_LEVEL=debug LOG_LEVEL=debug
``` ```
@ -88,6 +90,8 @@ LOG_LEVEL=debug
| `LAYLINK_FRAME_ENCRYPTION_KEY` | Frame 加密密钥,启用 `chacha20` 时必填。 | 普通口令、`hex:...`、`base64:...` | | `LAYLINK_FRAME_ENCRYPTION_KEY` | Frame 加密密钥,启用 `chacha20` 时必填。 | 普通口令、`hex:...`、`base64:...` |
| `POP_AGENT_LISTEN` | POP Server 给 Client Agent 连接的监听地址。Agent 的 `POP_SERVER_ADDRESS` 应指向这里。 | `0.0.0.0:9001`、`127.0.0.1:9001` | | `POP_AGENT_LISTEN` | POP Server 给 Client Agent 连接的监听地址。Agent 的 `POP_SERVER_ADDRESS` 应指向这里。 | `0.0.0.0:9001`、`127.0.0.1:9001` |
| `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议。支持逗号数组,也支持 JSON 数组。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`["tcp","kcp"]` | | `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议。支持逗号数组,也支持 JSON 数组。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`["tcp","kcp"]` |
| `POP_AGENT_TCP_WORKERS` | POP TCP Agent listener 的 worker 数。TCP 模式可按 CPU 和并发提高。 | `1`、`2`、`4`、`8` |
| `POP_AGENT_KCP_WORKERS` | POP KCP/UDP Agent listener 的 worker 数。当前必须保持 `1`。 | `1` |
| `AUDIT_LOG` | 审计日志路径。MVP 使用 JSON Lines 追加写入。 | `runtime/audit.log` | | `AUDIT_LOG` | 审计日志路径。MVP 使用 JSON Lines 追加写入。 | `runtime/audit.log` |
| `LOG_LEVEL` | 日志级别预留配置。当前 MVP 主要为后续日志工厂使用。 | `debug`、`info`、`warning`、`error` | | `LOG_LEVEL` | 日志级别预留配置。当前 MVP 主要为后续日志工厂使用。 | `debug`、`info`、`warning`、`error` |
@ -383,6 +387,28 @@ LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so
如果运行环境暂时不能启用 FFI可以配置 `LAYLINK_KCP_BACKEND=php` 使用调试回退实现;该实现不适合作为生产高吞吐路径。 如果运行环境暂时不能启用 FFI可以配置 `LAYLINK_KCP_BACKEND=php` 使用调试回退实现;该实现不适合作为生产高吞吐路径。
KCP 可调参数:
| 变量 | 作用 | 建议 |
| --- | --- | --- |
| `LAYLINK_KCP_NODELAY` | KCP nodelay 开关。 | 默认 `1` |
| `LAYLINK_KCP_INTERVAL_MS` | KCP update 间隔,越小越低延迟但发包/CPU 更高。 | `10`、`20`、`30` |
| `LAYLINK_KCP_FAST_RESEND` | 快速重传阈值。 | `2`,拥堵时试 `3`、`4` |
| `LAYLINK_KCP_NO_CONGESTION_CONTROL` | 是否关闭 KCP 拥塞控制。 | 公网建议 `0`,内网压测可试 `1` |
| `LAYLINK_KCP_SEND_WINDOW` | KCP 发送窗口。 | `128`、`256`、`512` |
| `LAYLINK_KCP_RECV_WINDOW` | KCP 接收窗口。 | `256`、`512`、`1024` |
| `LAYLINK_KCP_MTU_BYTES` | KCP MTU。 | 公网建议 `1200`,内网可试 `1350` |
| `LAYLINK_KCP_TICK_MS` | PHP transport tick 间隔。 | 通常等于 `LAYLINK_KCP_INTERVAL_MS` |
| `LAYLINK_KCP_UDP_SEND_QUEUE_BYTES` | UDP 发送遇到 EAGAIN 时的本地排队上限。 | `16777216`、`33554432` |
| `LAYLINK_KCP_UDP_FLUSH_PACKETS` | 每次 tick 最多刷出的 UDP packet 数。 | 拥堵时 `64`/`128`,吞吐压测 `256`/`512` |
| `LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` | 每次从 native KCP 输出队列搬到 UDP 发送队列的最大 packet 数。 | 延迟敏感用 `64`/`128`,吞吐压测用 `256`/`512` |
如果出现 `stream_socket_sendto(): Resource temporarily unavailable`,说明 UDP socket 发送缓冲暂时满了。LayLink 会把 KCP packet 放入本地发送队列并在后续 tick 重试;同时建议把 `LAYLINK_KCP_NO_CONGESTION_CONTROL` 保持为 `0`,必要时降低 `LAYLINK_KCP_SEND_WINDOW`、`LAYLINK_KCP_UDP_FLUSH_PACKETS` 或调大 `LAYLINK_KCP_INTERVAL_MS`
KCP/UDP 目前不要把 `POP_AGENT_KCP_WORKERS` 调大。KCP 会话状态存在单个 worker 进程里UDP 多 worker 会让同一个会话的数据包分散到不同进程,导致找不到会话、重传增加甚至断流。要横向扩展 KCP当前推荐启动多个 POP 端口或多个 POP 实例,由 Client Agent 配置多 POP/多进程策略;后续可以做 `SO_REUSEPORT` 五元组哈希、外部 session 表或每 conv 固定 worker 分发。
单 worker 并不等于一个大下载会同步阻塞其他请求目标连接、UDP socket 和本地客户端都是非阻塞 I/O。但大文件会带来大量 KCP 分片和加密/解密/FFI 调用,可能短时间占用事件循环 CPU。延迟敏感场景可降低 `LAYLINK_DATA_CHUNK_BYTES`、`LAYLINK_KCP_OUTPUT_DRAIN_PACKETS` 和 `LAYLINK_KCP_UDP_FLUSH_PACKETS`,让一个大流量会话每次 tick 少占一点时间,换取更好的多会话公平性。
如果 Agent 配置为 `udp`,进程会启动失败并明确提示该传输尚未实现。 如果 Agent 配置为 `udp`,进程会启动失败并明确提示该传输尚未实现。
启动 POP Server 启动 POP Server

View File

@ -28,6 +28,8 @@ final class PopServer
private readonly int $maxSendBuffer = 64 * 1024 * 1024, private readonly int $maxSendBuffer = 64 * 1024 * 1024,
private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, private readonly int $backpressureHighWatermark = 32 * 1024 * 1024,
private readonly int $dataChunkSize = 1024 * 1024, private readonly int $dataChunkSize = 1024 * 1024,
private readonly int $tcpWorkerCount = 1,
private readonly int $kcpWorkerCount = 1,
) { ) {
$this->nodes = new NodeRegistry(); $this->nodes = new NodeRegistry();
$this->sessions = new SessionManager(); $this->sessions = new SessionManager();
@ -45,7 +47,7 @@ final class PopServer
$scheme = $transport === 'kcp' ? 'udp' : 'tcp'; $scheme = $transport === 'kcp' ? 'udp' : 'tcp';
$agentWorker = new Worker($scheme . '://' . $this->agentListen); $agentWorker = new Worker($scheme . '://' . $this->agentListen);
$agentWorker->name = 'laylink-pop-agent-listener-' . $transport; $agentWorker->name = 'laylink-pop-agent-listener-' . $transport;
$agentWorker->count = 1; $agentWorker->count = $transport === 'kcp' ? $this->kcpWorkerCount : $this->tcpWorkerCount;
new AgentListener( new AgentListener(
$agentWorker, $agentWorker,
new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports), new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports),

View File

@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
final class KcpConfig
{
public static function int(string $key, int $default, int $min, int $max): int
{
$value = getenv($key);
if ($value === false || trim($value) === '' || !preg_match('/^-?\d+$/', trim($value))) {
return $default;
}
return max($min, min($max, (int)trim($value)));
}
public static function tickSeconds(): float
{
return self::int('LAYLINK_KCP_TICK_MS', 10, 1, 1000) / 1000;
}
}

View File

@ -12,6 +12,7 @@ final class KcpFrameClientTransport implements FrameClientTransport
{ {
private ?AsyncUdpConnection $connection = null; private ?AsyncUdpConnection $connection = null;
private KcpReliableSession|NativeKcpSession|null $session = null; private KcpReliableSession|NativeKcpSession|null $session = null;
private ?KcpUdpPacketSender $sender = null;
private ?int $timerId = null; private ?int $timerId = null;
private int $conv; private int $conv;
private bool $connected = false; private bool $connected = false;
@ -34,9 +35,13 @@ final class KcpFrameClientTransport implements FrameClientTransport
$this->session = null; $this->session = null;
$connection = new AsyncUdpConnection($this->normalizeAddress($this->address)); $connection = new AsyncUdpConnection($this->normalizeAddress($this->address));
$this->connection = $connection; $this->connection = $connection;
$this->sender = new KcpUdpPacketSender(
$connection,
KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024),
);
$connection->onConnect = function () use ($connection): void { $connection->onConnect = function () use ($connection): void {
$connection->send(KcpPacketCodec::encode([ $this->sender?->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN, 'type' => KcpPacketCodec::SYN,
'conv' => $this->conv, 'conv' => $this->conv,
])); ]));
@ -47,13 +52,14 @@ final class KcpFrameClientTransport implements FrameClientTransport
$connection->onClose = function () use ($connection): void { $connection->onClose = function () use ($connection): void {
if ($this->connection === $connection) { if ($this->connection === $connection) {
$this->connection = null; $this->connection = null;
$this->sender = null;
} }
$this->stopTimer(); $this->stopTimer();
($this->onClose)($this); ($this->onClose)($this);
}; };
$connection->connect(); $connection->connect();
$this->timerId = Timer::add(0.02, fn () => $this->tick()); $this->timerId = Timer::add(KcpConfig::tickSeconds(), fn () => $this->tick());
} }
public function send(Frame $frame): bool|null public function send(Frame $frame): bool|null
@ -79,7 +85,7 @@ final class KcpFrameClientTransport implements FrameClientTransport
public function getSendBufferQueueSize(): int public function getSendBufferQueueSize(): int
{ {
return $this->session?->getSendBufferQueueSize() ?? 0; return ($this->session?->getSendBufferQueueSize() ?? 0) + ($this->sender?->queuedBytes() ?? 0);
} }
private function handlePacket(string $data): void private function handlePacket(string $data): void
@ -120,15 +126,18 @@ final class KcpFrameClientTransport implements FrameClientTransport
} }
if (!$this->connected) { if (!$this->connected) {
$this->connection->send(KcpPacketCodec::encode([ $this->sender?->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN, 'type' => KcpPacketCodec::SYN,
'conv' => $this->conv, 'conv' => $this->conv,
])); ]));
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
return; return;
} }
$before = $this->getSendBufferQueueSize(); $before = $this->getSendBufferQueueSize();
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
$this->session?->tick(); $this->session?->tick();
$this->sender?->flush(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) { if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) {
($this->onBufferDrain)($this); ($this->onBufferDrain)($this);
} }
@ -140,7 +149,7 @@ final class KcpFrameClientTransport implements FrameClientTransport
$libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so'); $libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so');
$args = [ $args = [
$this->conv, $this->conv,
fn (string $packet): bool|null => $this->connection?->send($packet), fn (string $packet): bool|null => $this->sender?->send($packet) ?? false,
fn (Frame $frame) => ($this->onFrame)($this, $frame), fn (Frame $frame) => ($this->onFrame)($this, $frame),
fn (\Throwable $e) => ($this->onInvalidFrame)($e), fn (\Throwable $e) => ($this->onInvalidFrame)($e),
]; ];

View File

@ -15,6 +15,7 @@ final class KcpFrameServerConnection implements FrameServerConnection
private readonly int $id, private readonly int $id,
private UdpConnection $connection, private UdpConnection $connection,
private readonly KcpReliableSession|NativeKcpSession $session, private readonly KcpReliableSession|NativeKcpSession $session,
private readonly KcpUdpPacketSender $sender,
private readonly int $maxSendBuffer, private readonly int $maxSendBuffer,
) { ) {
} }
@ -55,7 +56,7 @@ final class KcpFrameServerConnection implements FrameServerConnection
public function getSendBufferQueueSize(): int public function getSendBufferQueueSize(): int
{ {
return $this->session->getSendBufferQueueSize(); return $this->session->getSendBufferQueueSize() + $this->sender->queuedBytes();
} }
public function getRemoteIp(): string public function getRemoteIp(): string
@ -66,10 +67,21 @@ final class KcpFrameServerConnection implements FrameServerConnection
public function updateConnection(UdpConnection $connection): void public function updateConnection(UdpConnection $connection): void
{ {
$this->connection = $connection; $this->connection = $connection;
$this->sender->updateConnection($connection);
} }
public function isClosed(): bool public function isClosed(): bool
{ {
return $this->closed || $this->session->isClosed(); return $this->closed || $this->session->isClosed();
} }
public function flushUdp(int $maxPackets): void
{
$this->sender->flush($maxPackets);
}
public function sendUdpPacket(string $packet): bool
{
return $this->sender->send($packet);
}
} }

View File

@ -29,7 +29,7 @@ final class KcpFrameServerListener
) { ) {
$worker->onMessage = fn (UdpConnection $connection, string $data) => $this->handleMessage($connection, $data); $worker->onMessage = fn (UdpConnection $connection, string $data) => $this->handleMessage($connection, $data);
$worker->onWorkerStart = function (): void { $worker->onWorkerStart = function (): void {
$this->timerId = Timer::add(0.02, fn () => $this->tick()); $this->timerId = Timer::add(KcpConfig::tickSeconds(), fn () => $this->tick());
}; };
$worker->onWorkerStop = function (): void { $worker->onWorkerStop = function (): void {
if ($this->timerId !== null) { if ($this->timerId !== null) {
@ -99,16 +99,22 @@ final class KcpFrameServerListener
private function handleSyn(UdpConnection $connection, string $key, int $conv): void private function handleSyn(UdpConnection $connection, string $key, int $conv): void
{ {
if (isset($this->connections[$key])) { if (isset($this->connections[$key])) {
$connection->send(KcpPacketCodec::encode([ $this->connections[$key]->updateConnection($connection);
$this->connections[$key]->sendUdpPacket(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN_ACK, 'type' => KcpPacketCodec::SYN_ACK,
'conv' => $conv, 'conv' => $conv,
])); ]));
$this->connections[$key]->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
return; return;
} }
$sender = new KcpUdpPacketSender(
$connection,
KcpConfig::int('LAYLINK_KCP_UDP_SEND_QUEUE_BYTES', 16 * 1024 * 1024, 1024 * 1024, 512 * 1024 * 1024),
);
$session = $this->createSession( $session = $this->createSession(
$conv, $conv,
fn (string $packet): bool|null => $connection->send($packet), fn (string $packet): bool|null => $sender->send($packet),
function (Frame $frame) use ($key): void { function (Frame $frame) use ($key): void {
$wrapped = $this->connections[$key] ?? null; $wrapped = $this->connections[$key] ?? null;
if ($wrapped !== null) { if ($wrapped !== null) {
@ -122,11 +128,11 @@ final class KcpFrameServerListener
} }
}, },
); );
$wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $this->maxSendBuffer); $wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $sender, $this->maxSendBuffer);
$this->sessions[$key] = $session; $this->sessions[$key] = $session;
$this->connections[$key] = $wrapped; $this->connections[$key] = $wrapped;
$connection->send(KcpPacketCodec::encode([ $sender->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN_ACK, 'type' => KcpPacketCodec::SYN_ACK,
'conv' => $conv, 'conv' => $conv,
])); ]));
@ -143,7 +149,9 @@ final class KcpFrameServerListener
} }
$before = $session->getSendBufferQueueSize(); $before = $session->getSendBufferQueueSize();
$wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
$session->tick(); $session->tick();
$wrapped->flushUdp(KcpConfig::int('LAYLINK_KCP_UDP_FLUSH_PACKETS', 256, 1, 8192));
if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) { if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) {
($this->onBufferDrain)($wrapped); ($this->onBufferDrain)($wrapped);
} }

View File

@ -0,0 +1,89 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\UdpConnection;
final class KcpUdpPacketSender
{
/** @var string[] */
private array $queue = [];
private int $queuedBytes = 0;
public function __construct(
private UdpConnection $connection,
private readonly int $maxQueueBytes,
) {
}
public function updateConnection(UdpConnection $connection): void
{
$this->connection = $connection;
}
public function send(string $packet): bool
{
if ($this->queue !== []) {
return $this->enqueue($packet);
}
if ($this->trySend($packet)) {
return true;
}
return $this->enqueue($packet);
}
public function flush(int $maxPackets): void
{
$sent = 0;
while ($this->queue !== [] && $sent < $maxPackets) {
$packet = $this->queue[0];
if (!$this->trySend($packet)) {
return;
}
array_shift($this->queue);
$this->queuedBytes -= strlen($packet);
$sent++;
}
}
public function queuedBytes(): int
{
return $this->queuedBytes;
}
private function enqueue(string $packet): bool
{
$length = strlen($packet);
if ($this->queuedBytes + $length > $this->maxQueueBytes) {
return false;
}
$this->queue[] = $packet;
$this->queuedBytes += $length;
return true;
}
private function trySend(string $packet): bool
{
$socket = $this->connection->getSocket();
if (!is_resource($socket)) {
return false;
}
if ($this->connection instanceof AsyncUdpConnection) {
return @stream_socket_sendto($socket, $packet) === strlen($packet);
}
$remote = $this->connection->isIpV6()
? '[' . $this->connection->getRemoteIp() . ']:' . $this->connection->getRemotePort()
: $this->connection->getRemoteAddress();
return @stream_socket_sendto($socket, $packet, 0, $remote) === strlen($packet);
}
}

View File

@ -35,6 +35,23 @@ final class NativeKcpSession
throw new \RuntimeException('kcp_create_failed'); throw new \RuntimeException('kcp_create_failed');
} }
$this->ffi->laylink_kcp_nodelay(
$this->session,
KcpConfig::int('LAYLINK_KCP_NODELAY', 1, 0, 1),
KcpConfig::int('LAYLINK_KCP_INTERVAL_MS', 10, 1, 1000),
KcpConfig::int('LAYLINK_KCP_FAST_RESEND', 2, 0, 10),
KcpConfig::int('LAYLINK_KCP_NO_CONGESTION_CONTROL', 0, 0, 1),
);
$this->ffi->laylink_kcp_wndsize(
$this->session,
KcpConfig::int('LAYLINK_KCP_SEND_WINDOW', 256, 16, 8192),
KcpConfig::int('LAYLINK_KCP_RECV_WINDOW', 512, 16, 8192),
);
$this->ffi->laylink_kcp_setmtu(
$this->session,
KcpConfig::int('LAYLINK_KCP_MTU_BYTES', 1200, 576, 1400),
);
$this->parser = new FrameParser(); $this->parser = new FrameParser();
} }
@ -165,13 +182,20 @@ final class NativeKcpSession
private function drainOutput(): void private function drainOutput(): void
{ {
while (($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) { $drained = 0;
$maxPackets = KcpConfig::int('LAYLINK_KCP_OUTPUT_DRAIN_PACKETS', 256, 1, 8192);
while ($drained < $maxPackets && ($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) {
$buffer = $this->ffi->new("char[$size]"); $buffer = $this->ffi->new("char[$size]");
$read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size); $read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size);
if ($read <= 0) { if ($read <= 0) {
return; return;
} }
($this->sendPacket)(FFI::string($buffer, $read)); $drained++;
if (($this->sendPacket)(FFI::string($buffer, $read)) === false) {
($this->onInvalidFrame)(new \RuntimeException('kcp_udp_send_queue_full'));
$this->close();
return;
}
} }
} }