From 661ed401dab22ebc14afcb590927eb933d410a2c Mon Sep 17 00:00:00 2001 From: EchoNoch Date: Fri, 29 May 2026 01:06:18 +0800 Subject: [PATCH] Add KCP support --- .env.example | 14 +- .gitignore | 3 +- bin/client-agent.php | 1 + contract.md | 68 +- native/kcp/ikcp.c | 1423 +++++++++++++++++ native/kcp/ikcp.h | 452 ++++++ native/kcp/laylink_kcp.c | 193 +++ native/kcp/laylink_kcp.h | 29 + readme.md | 35 +- scripts/build-kcp-ffi.sh | 14 + src/Agent/AgentClient.php | 244 ++- src/Node/NodeConnection.php | 4 +- src/Node/NodeRegistry.php | 48 +- src/Server/AgentListener.php | 99 +- src/Server/PopServer.php | 39 +- src/Session/TunnelSession.php | 3 +- src/Transport/FrameClientTransport.php | 22 + src/Transport/FrameClientTransportFactory.php | 44 + src/Transport/FrameServerConnection.php | 24 + src/Transport/FrameServerListenerFactory.php | 45 + src/Transport/KcpFrameClientTransport.php | 176 ++ src/Transport/KcpFrameServerConnection.php | 75 + src/Transport/KcpFrameServerListener.php | 177 ++ src/Transport/KcpPacketCodec.php | 71 + src/Transport/KcpReliableSession.php | 235 +++ src/Transport/NativeKcpLibrary.php | 52 + src/Transport/NativeKcpSession.php | 188 +++ src/Transport/TcpFrameClientTransport.php | 85 + src/Transport/TcpFrameServerConnection.php | 63 + src/Transport/TcpFrameServerListener.php | 66 + src/Util/Env.php | 11 + 31 files changed, 3829 insertions(+), 174 deletions(-) create mode 100644 native/kcp/ikcp.c create mode 100644 native/kcp/ikcp.h create mode 100644 native/kcp/laylink_kcp.c create mode 100644 native/kcp/laylink_kcp.h create mode 100755 scripts/build-kcp-ffi.sh create mode 100644 src/Transport/FrameClientTransport.php create mode 100644 src/Transport/FrameClientTransportFactory.php create mode 100644 src/Transport/FrameServerConnection.php create mode 100644 src/Transport/FrameServerListenerFactory.php create mode 100644 src/Transport/KcpFrameClientTransport.php create mode 100644 src/Transport/KcpFrameServerConnection.php create mode 100644 src/Transport/KcpFrameServerListener.php create mode 100644 src/Transport/KcpPacketCodec.php create mode 100644 src/Transport/KcpReliableSession.php create mode 100644 src/Transport/NativeKcpLibrary.php create mode 100644 src/Transport/NativeKcpSession.php create mode 100644 src/Transport/TcpFrameClientTransport.php create mode 100644 src/Transport/TcpFrameServerConnection.php create mode 100644 src/Transport/TcpFrameServerListener.php diff --git a/.env.example b/.env.example index e6dbe21..45cc119 100644 --- a/.env.example +++ b/.env.example @@ -15,6 +15,10 @@ LAYLINK_MAX_SEND_BUFFER_BYTES=67108864 # 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864,内存紧张时调小。 LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432 # 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES,达到后会暂停上游读取直到缓冲排空。 +LAYLINK_KCP_BACKEND=ffi +# KCP 实现后端;可选 ffi、php。ffi 使用 native ikcp.c 动态库,生产建议使用;php 是调试回退实现。 +LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so +# native KCP 动态库路径;LAYLINK_KCP_BACKEND=ffi 时使用。相对路径按项目根目录解析,先运行 scripts/build-kcp-ffi.sh 构建。 [client-agent] NODE_ID=client-01 @@ -26,9 +30,11 @@ NODE_TOKEN=CHANGE_ME NODE_ZONE=default # 当前 Agent 所在逻辑区域;可按部署场景填写,例如 local、corp、restricted-a。 POP_SERVER_ADDRESS=tcp://127.0.0.1:9001 -# Agent 出站连接 POP Server 的地址;格式为 tcp://host:port,例如 tcp://10.1.0.2:9001。 +# Agent 出站连接 POP Server 的地址;格式为 tcp://host:port,例如 tcp://10.1.0.2:9001;AGENT_TRANSPORT_PROTOCOL=kcp 时会使用同一 host:port 的 UDP。 AGENT_TRANSPORT_PROTOCOL=tcp -# 当前 Agent 到 POP Server 使用的传输协议;可选值 tcp、udp、kcp;必须被 POP_ALLOWED_AGENT_TRANSPORTS 允许,当前可运行值为 tcp。 +# 当前 Agent 到 POP Server 使用的传输协议;可选值 tcp、udp、kcp;必须被 POP_ALLOWED_AGENT_TRANSPORTS 允许,当前可运行值为 tcp、kcp。 +CLIENT_AGENT_POP_CONNECTIONS=1 +# Client Agent 到 POP Server 的并行长连接数量;默认 1。提高到 2、4 可分摊多会话,当前 tcp/kcp 生效。 CLIENT_AGENT_AUTH_TOKEN=dev-token # Client Agent 为 SOCKS5/HTTP 代理入口生成 OPEN 帧时使用的客户端认证 token;当前 MVP 默认 dev-token。 CLIENT_AGENT_USER_ID=admin @@ -67,5 +73,5 @@ CLIENT_AGENT_RAW_JSON_LISTEN_PORT=9000 [pop-server] POP_AGENT_LISTEN=0.0.0.0:9001 # POP Server 监听 Agent 长连接的地址;格式为 host:port,例如 0.0.0.0:9001 或 127.0.0.1:9001。 -POP_ALLOWED_AGENT_TRANSPORTS=tcp -# POP Server 允许 Client Agent 使用的传输协议;逗号分隔,可选值 tcp、udp、kcp;当前已实现 tcp,udp/kcp 为预留实现。 +POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp +# POP Server 允许 Client Agent 使用的传输协议;可写逗号数组 tcp,kcp,也可写 JSON 数组 ["tcp","kcp"];当前已实现 tcp、kcp,udp 为预留实现。 diff --git a/.gitignore b/.gitignore index e0b9c9d..ef8718a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .env -runtime/* \ No newline at end of file +runtime/* +native/kcp/*.so diff --git a/bin/client-agent.php b/bin/client-agent.php index 52d7ebe..6d5b71f 100755 --- a/bin/client-agent.php +++ b/bin/client-agent.php @@ -42,6 +42,7 @@ $bootAgent = function (string $protocol, string $listen, string $name) use ($nod Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024), Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024), Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024), + Env::int('CLIENT_AGENT_POP_CONNECTIONS', 1, 1, 16), ); $agent->boot($name); }; diff --git a/contract.md b/contract.md index fdf7133..e163566 100644 --- a/contract.md +++ b/contract.md @@ -2,7 +2,7 @@ ## Implementation Status -Last updated: 2026-05-28 Asia/Shanghai. +Last updated: 2026-05-29 Asia/Shanghai. Current phase: MVP bootstrap in progress. @@ -24,7 +24,7 @@ Direction update under evaluation: * POP Server uses `POP_ALLOWED_AGENT_TRANSPORTS` to allow one or more Agent-to-POP transports. * Agent uses `AGENT_TRANSPORT_PROTOCOL` to choose one concrete transport. * Allowed names are `tcp`, `udp`, and `kcp`. - * Current runnable implementation is `tcp`; `udp` and `kcp` are reserved and must be implemented behind a transport abstraction. + * Current runnable implementations are `tcp` and experimental `kcp`; `udp` is reserved. * Feasibility: * Workerman supports long-running async TCP servers and custom protocols; it is suitable for the framed fallback/control channel. * KCP itself is a UDP-based reliable ARQ protocol, so adding KCP means adding a UDP transport layer and session demultiplexing below the existing LayLink frame protocol. @@ -33,7 +33,7 @@ Direction update under evaluation: 1. Complete Client Agent naming migration in code, docs, config, and entrypoints. 2. Implement TCP-framed Client Agent -> POP -> public target path. 3. Define `TransportInterface` so frame protocol can run over TCP now and KCP later. - 4. Add KCP-over-UDP transport via extension/FFI/proxy after the TCP framed path is stable. + 4. Add KCP-over-UDP transport behind the transport abstraction after the TCP framed path is stable. * Main risk: * KCP is not a socket by itself. It needs UDP I/O, timers, packet flush/update scheduling, MTU handling, retransmission tuning, and connection/session management. * PHP-only KCP may work as a prototype but is likely CPU-heavy under concurrency. @@ -177,6 +177,47 @@ Completed in this checkpoint: * Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket. * TCP `DATA` is split into configurable chunks, defaulting to 1 MiB, to reduce frame overhead while avoiding oversized frames. * POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic. +* Started Agent-to-POP transport abstraction: + * Added `FrameClientTransport`. + * Added `TcpFrameClientTransport` as the current TCP implementation. + * `AgentClient` now sends and receives LayLink frames through the transport interface instead of directly owning `AsyncTcpConnection` and `FrameParser`. + * This preserves current TCP behavior while preparing a `KcpFrameClientTransport` implementation. +* Added POP-side frame transport abstraction: + * Added `FrameServerConnection`. + * Added `TcpFrameServerConnection`. + * Added `TcpFrameServerListener`. + * `AgentListener`, `NodeConnection`, `NodeRegistry`, and `TunnelSession` now hold Agent connections through `FrameServerConnection`. + * TCP listener decode/encode details are isolated from POP session, policy, heartbeat, and relay logic. +* Added transport factory/config selection: + * `FrameClientTransportFactory` maps `AGENT_TRANSPORT_PROTOCOL=tcp` to `TcpFrameClientTransport`. + * `FrameServerListenerFactory` maps the implemented POP transport `tcp` to `TcpFrameServerListener`. + * `FrameClientTransportFactory` maps `AGENT_TRANSPORT_PROTOCOL=kcp` to `KcpFrameClientTransport`. + * `FrameServerListenerFactory` maps POP transport `kcp` to `KcpFrameServerListener`. + * `udp` still fails at factory boundaries with explicit not-implemented errors instead of leaking into business logic. +* Added experimental multi-connection Client Agent -> POP support: + * `CLIENT_AGENT_POP_CONNECTIONS` controls how many parallel Agent-to-POP long connections a Client Agent opens. + * New local TCP sessions are distributed round-robin across authenticated POP transports. + * Each session stays bound to its selected POP transport for the whole session lifetime. + * POP `NodeRegistry` now supports multiple live connections under the same `NODE_ID`. + * Heartbeat activity and offline cleanup are tracked per Agent connection. +* KCP/UDP implementation decision: + * Start with `kcp` before raw `udp` for Agent-to-POP frame transport. + * Existing TCP tunnel sessions require ordered, reliable byte-stream semantics; raw UDP would need retransmission, ordering, MTU fragmentation, congestion/window handling, and session cleanup. + * Implementing raw UDP as a general Frame transport would effectively recreate a weaker KCP. + * Keep the existing SOCKS5 `UDP ASSOCIATE`/`UDP_DATA` feature separate: it is application datagram relay over the current reliable Agent-to-POP channel, not the Agent-to-POP transport itself. + * Recommended KCP path is a transport implementation behind `FrameClientTransport` / `FrameServerConnection`, backed by a native extension, FFI binding, or sidecar process rather than pure PHP for production throughput. +* Added KCP Agent-to-POP transport: + * `KcpPacketCodec` defines UDP packet types for `SYN`, `SYN_ACK`, `DATA`, `ACK`, and `CLOSE`. + * `KcpFrameClientTransport` runs Client Agent frames over UDP while preserving the existing `FrameClientTransport` interface. + * `KcpFrameServerListener` and `KcpFrameServerConnection` expose KCP/UDP sessions to POP as `FrameServerConnection`. + * POP can now listen on both TCP and KCP when `POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp`. + * `NativeKcpSession` uses PHP FFI to call native upstream `ikcp.c` through `native/kcp/liblaylink_kcp.so`. + * `scripts/build-kcp-ffi.sh` builds the native shared library from vendored `native/kcp/ikcp.c`. + * `LAYLINK_KCP_BACKEND=ffi` selects the native KCP backend; `LAYLINK_KCP_BACKEND=php` remains as a debugging fallback through `KcpReliableSession`. + * `LAYLINK_KCP_FFI_LIB` can point to a custom native KCP library path. +* Added array-style env parsing: + * `Env::csv()` accepts traditional comma-separated values such as `tcp,kcp`. + * `Env::csv()` also accepts JSON arrays such as `["tcp","kcp"]`. Known MVP limitations: @@ -190,7 +231,7 @@ Known MVP limitations: * No TLS yet. * No production-grade client identity yet; `dev-token` is hardcoded for MVP development. * No automated integration test harness yet. -* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames, chunking, graceful EOF, and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work. +* TCP stream forwarding can now use multiple Agent-to-POP connections per Client Agent, but a single TCP session is still pinned to one POP transport. Binary `DATA` frames, chunking, graceful EOF, and backpressure reduce per-byte overhead and buffer blowups; KCP is experimental and still needs throughput/loss tuning, while multipath and per-session flow-control tuning are future performance work. * No explicit idle timeout or connect timeout enforcement yet. * UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work. * HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented. @@ -203,13 +244,18 @@ Next recommended tasks: 4. Add more detailed buffer overflow audit reasons and metrics. 5. Add README quickstart with exact local commands. 6. Add a reproducible throughput benchmark script for direct-vs-LayLink comparisons. -7. Add multi-connection Agent-to-POP support so multiple Client Agent workers can spread concurrent sessions safely. -8. Add KCP or another UDP-based reliable transport behind the transport abstraction. -9. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection. -10. Optimize UDP relay with POP-side UDP socket pooling. -11. Add UDP association idle timeouts and cleanup. -12. Aggregate UDP audit records per association instead of per datagram. -13. Add UDP and per-user rate limiting. +7. Keep TCP tuning as an ongoing task: + * benchmark `LAYLINK_DATA_CHUNK_BYTES` at `524288`, `1048576`, `2097152`, and `4194304` + * benchmark buffer pairs such as `64MiB/32MiB` and `128MiB/64MiB` + * record direct-vs-LayLink throughput, CPU, memory, and disconnect behavior +8. Benchmark and tune `CLIENT_AGENT_POP_CONNECTIONS` for 1, 2, 4, and 8 long connections under mixed single-download and multi-session workloads. +9. Benchmark native FFI `kcp` against `tcp` under latency, loss, and high-throughput workloads; tune KCP nodelay, window, MTU, resend, and interval settings. +10. Add raw UDP Agent-to-POP transport only for explicitly datagram-oriented frame classes, or after a reliability/window design exists. +11. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection. +12. Optimize UDP relay with POP-side UDP socket pooling. +13. Add UDP association idle timeouts and cleanup. +14. Aggregate UDP audit records per association instead of per datagram. +15. Add UDP and per-user rate limiting. ## 0. Project Name diff --git a/native/kcp/ikcp.c b/native/kcp/ikcp.c new file mode 100644 index 0000000..1f32039 --- /dev/null +++ b/native/kcp/ikcp.c @@ -0,0 +1,1423 @@ +//===================================================================== +// +// KCP - A Better ARQ Protocol Implementation +// skywind3000 (at) gmail.com, 2010-2011 +// +// Features: +// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp. +// + Maximum RTT reduce three times vs tcp. +// + Lightweight, distributed as a single source file. +// +//===================================================================== +#include "ikcp.h" + +#include +#include +#include +#include +#include + +#define IKCP_FASTACK_CONSERVE + +//===================================================================== +// KCP BASIC +//===================================================================== +const IUINT32 IKCP_RTO_NDL = 30; // no delay min rto +const IUINT32 IKCP_RTO_MIN = 100; // normal min rto +const IUINT32 IKCP_RTO_DEF = 200; +const IUINT32 IKCP_RTO_MAX = 60000; +const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data +const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack +const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask) +const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell) +const IUINT32 IKCP_ASK_SEND = 1; // need to send IKCP_CMD_WASK +const IUINT32 IKCP_ASK_TELL = 2; // need to send IKCP_CMD_WINS +const IUINT32 IKCP_WND_SND = 32; +const IUINT32 IKCP_WND_RCV = 128; // must >= max fragment size +const IUINT32 IKCP_MTU_DEF = 1400; +const IUINT32 IKCP_ACK_FAST = 3; +const IUINT32 IKCP_INTERVAL = 100; +const IUINT32 IKCP_OVERHEAD = 24; +const IUINT32 IKCP_DEADLINK = 20; +const IUINT32 IKCP_THRESH_INIT = 2; +const IUINT32 IKCP_THRESH_MIN = 2; +const IUINT32 IKCP_PROBE_INIT = 5000; // 7 secs to probe window size +const IUINT32 IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window +const IUINT32 IKCP_FASTACK_LIMIT = 5; // max times to trigger fastack + + +//--------------------------------------------------------------------- +// encode / decode +//--------------------------------------------------------------------- + +/* encode 8 bits unsigned int */ +static inline char *ikcp_encode8u(char *p, unsigned char c) +{ + *(unsigned char*)p++ = c; + return p; +} + +/* decode 8 bits unsigned int */ +static inline const char *ikcp_decode8u(const char *p, unsigned char *c) +{ + *c = *(unsigned char*)p++; + return p; +} + +/* encode 16 bits unsigned int (lsb) */ +static inline char *ikcp_encode16u(char *p, unsigned short w) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *(unsigned char*)(p + 0) = (w & 255); + *(unsigned char*)(p + 1) = (w >> 8); +#else + memcpy(p, &w, 2); +#endif + p += 2; + return p; +} + +/* decode 16 bits unsigned int (lsb) */ +static inline const char *ikcp_decode16u(const char *p, unsigned short *w) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *w = *(const unsigned char*)(p + 1); + *w = *(const unsigned char*)(p + 0) + (*w << 8); +#else + memcpy(w, p, 2); +#endif + p += 2; + return p; +} + +/* encode 32 bits unsigned int (lsb) */ +static inline char *ikcp_encode32u(char *p, IUINT32 l) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *(unsigned char*)(p + 0) = (unsigned char)((l >> 0) & 0xff); + *(unsigned char*)(p + 1) = (unsigned char)((l >> 8) & 0xff); + *(unsigned char*)(p + 2) = (unsigned char)((l >> 16) & 0xff); + *(unsigned char*)(p + 3) = (unsigned char)((l >> 24) & 0xff); +#else + memcpy(p, &l, 4); +#endif + p += 4; + return p; +} + +/* decode 32 bits unsigned int (lsb) */ +static inline const char *ikcp_decode32u(const char *p, IUINT32 *l) +{ +#if IWORDS_BIG_ENDIAN || IWORDS_MUST_ALIGN + *l = *(const unsigned char*)(p + 3); + *l = *(const unsigned char*)(p + 2) + (*l << 8); + *l = *(const unsigned char*)(p + 1) + (*l << 8); + *l = *(const unsigned char*)(p + 0) + (*l << 8); +#else + memcpy(l, p, 4); +#endif + p += 4; + return p; +} + +static inline IUINT32 _imin_(IUINT32 a, IUINT32 b) { + return a <= b ? a : b; +} + +static inline IUINT32 _imax_(IUINT32 a, IUINT32 b) { + return a >= b ? a : b; +} + +static inline IUINT32 _ibound_(IUINT32 lower, IUINT32 middle, IUINT32 upper) +{ + return _imin_(_imax_(lower, middle), upper); +} + +static inline long _itimediff(IUINT32 later, IUINT32 earlier) +{ + return ((IINT32)(later - earlier)); +} + +//--------------------------------------------------------------------- +// manage segment +//--------------------------------------------------------------------- +typedef struct IKCPSEG IKCPSEG; + +static void* (*ikcp_malloc_hook)(size_t) = NULL; +static void (*ikcp_free_hook)(void *) = NULL; + +// internal malloc +static void* ikcp_malloc(size_t size) { + if (ikcp_malloc_hook) + return ikcp_malloc_hook(size); + return malloc(size); +} + +// internal free +static void ikcp_free(void *ptr) { + if (ikcp_free_hook) { + ikcp_free_hook(ptr); + } else { + free(ptr); + } +} + +// redefine allocator +void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*)) +{ + ikcp_malloc_hook = new_malloc; + ikcp_free_hook = new_free; +} + +// allocate a new kcp segment +static IKCPSEG* ikcp_segment_new(ikcpcb *kcp, int size) +{ + return (IKCPSEG*)ikcp_malloc(sizeof(IKCPSEG) + size); +} + +// delete a segment +static void ikcp_segment_delete(ikcpcb *kcp, IKCPSEG *seg) +{ + ikcp_free(seg); +} + +// write log +void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...) +{ + char buffer[1024]; + va_list argptr; + if ((mask & kcp->logmask) == 0 || kcp->writelog == 0) return; + va_start(argptr, fmt); + vsprintf(buffer, fmt, argptr); + va_end(argptr); + kcp->writelog(buffer, kcp, kcp->user); +} + +// check log mask +static int ikcp_canlog(const ikcpcb *kcp, int mask) +{ + if ((mask & kcp->logmask) == 0 || kcp->writelog == NULL) return 0; + return 1; +} + +// output segment +static int ikcp_output(ikcpcb *kcp, const void *data, int size) +{ + assert(kcp); + assert(kcp->output); + if (ikcp_canlog(kcp, IKCP_LOG_OUTPUT)) { + ikcp_log(kcp, IKCP_LOG_OUTPUT, "[RO] %ld bytes", (long)size); + } + if (size == 0) return 0; + return kcp->output((const char*)data, size, kcp, kcp->user); +} + +// output queue +void ikcp_qprint(const char *name, const struct IQUEUEHEAD *head) +{ +#if 0 + const struct IQUEUEHEAD *p; + printf("<%s>: [", name); + for (p = head->next; p != head; p = p->next) { + const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node); + printf("(%lu %d)", (unsigned long)seg->sn, (int)(seg->ts % 10000)); + if (p->next != head) printf(","); + } + printf("]\n"); +#endif +} + + +//--------------------------------------------------------------------- +// create a new kcpcb +//--------------------------------------------------------------------- +ikcpcb* ikcp_create(IUINT32 conv, void *user) +{ + ikcpcb *kcp = (ikcpcb*)ikcp_malloc(sizeof(struct IKCPCB)); + if (kcp == NULL) return NULL; + kcp->conv = conv; + kcp->user = user; + kcp->snd_una = 0; + kcp->snd_nxt = 0; + kcp->rcv_nxt = 0; + kcp->ts_recent = 0; + kcp->ts_lastack = 0; + kcp->ts_probe = 0; + kcp->probe_wait = 0; + kcp->snd_wnd = IKCP_WND_SND; + kcp->rcv_wnd = IKCP_WND_RCV; + kcp->rmt_wnd = IKCP_WND_RCV; + kcp->cwnd = 0; + kcp->incr = 0; + kcp->probe = 0; + kcp->mtu = IKCP_MTU_DEF; + kcp->mss = kcp->mtu - IKCP_OVERHEAD; + kcp->stream = 0; + + kcp->buffer = (char*)ikcp_malloc((kcp->mtu + IKCP_OVERHEAD) * 3); + if (kcp->buffer == NULL) { + ikcp_free(kcp); + return NULL; + } + + iqueue_init(&kcp->snd_queue); + iqueue_init(&kcp->rcv_queue); + iqueue_init(&kcp->snd_buf); + iqueue_init(&kcp->rcv_buf); + kcp->nrcv_buf = 0; + kcp->nsnd_buf = 0; + kcp->nrcv_que = 0; + kcp->nsnd_que = 0; + kcp->state = 0; + kcp->acklist = NULL; + kcp->ackblock = 0; + kcp->ackcount = 0; + kcp->ackedlen = 0; + kcp->rx_srtt = 0; + kcp->rx_rttval = 0; + kcp->rx_rto = IKCP_RTO_DEF; + kcp->rx_minrto = IKCP_RTO_MIN; + kcp->current = 0; + kcp->interval = IKCP_INTERVAL; + kcp->ts_flush = IKCP_INTERVAL; + kcp->nodelay = 0; + kcp->updated = 0; + kcp->logmask = 0; + kcp->ssthresh = IKCP_THRESH_INIT; + kcp->fastresend = 0; + kcp->fastlimit = IKCP_FASTACK_LIMIT; + kcp->nocwnd = 0; + kcp->xmit = 0; + kcp->dead_link = IKCP_DEADLINK; + kcp->output = NULL; + kcp->ccops = NULL; + kcp->congest = NULL; + kcp->writelog = NULL; + + return kcp; +} + + +//--------------------------------------------------------------------- +// release a kcpcb +//--------------------------------------------------------------------- +void ikcp_release(ikcpcb *kcp) +{ + IKCPSEG *seg; + assert(kcp); + if (kcp) { + if (kcp->ccops && kcp->ccops->release) { + kcp->ccops->release(kcp); + } + while (!iqueue_is_empty(&kcp->snd_buf)) { + seg = iqueue_entry(kcp->snd_buf.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + while (!iqueue_is_empty(&kcp->rcv_buf)) { + seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + while (!iqueue_is_empty(&kcp->snd_queue)) { + seg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + while (!iqueue_is_empty(&kcp->rcv_queue)) { + seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node); + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + } + if (kcp->buffer) { + ikcp_free(kcp->buffer); + } + if (kcp->acklist) { + ikcp_free(kcp->acklist); + } + + kcp->nrcv_buf = 0; + kcp->nsnd_buf = 0; + kcp->nrcv_que = 0; + kcp->nsnd_que = 0; + kcp->ackcount = 0; + kcp->buffer = NULL; + kcp->acklist = NULL; + ikcp_free(kcp); + } +} + + +//--------------------------------------------------------------------- +// set output callback, which will be invoked by kcp +//--------------------------------------------------------------------- +void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len, + ikcpcb *kcp, void *user)) +{ + kcp->output = output; +} + + +//--------------------------------------------------------------------- +// upper-level recv: returns size, or a negative value for EAGAIN +//--------------------------------------------------------------------- +int ikcp_recv(ikcpcb *kcp, char *buffer, int len) +{ + struct IQUEUEHEAD *p; + int ispeek = (len < 0)? 1 : 0; + int peeksize; + int recover = 0; + IKCPSEG *seg; + assert(kcp); + + if (iqueue_is_empty(&kcp->rcv_queue)) + return -1; + + if (len < 0) len = -len; + + peeksize = ikcp_peeksize(kcp); + + if (peeksize < 0) + return -2; + + if (peeksize > len) + return -3; + + if (kcp->nrcv_que >= kcp->rcv_wnd) + recover = 1; + + // merge fragment + for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) { + int fragment; + seg = iqueue_entry(p, IKCPSEG, node); + p = p->next; + + if (buffer) { + memcpy(buffer, seg->data, seg->len); + buffer += seg->len; + } + + len += seg->len; + fragment = seg->frg; + + if (ikcp_canlog(kcp, IKCP_LOG_RECV)) { + ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn); + } + + if (ispeek == 0) { + iqueue_del(&seg->node); + ikcp_segment_delete(kcp, seg); + kcp->nrcv_que--; + } + + if (fragment == 0) + break; + } + + assert(len == peeksize); + + // move available data from rcv_buf -> rcv_queue + while (! iqueue_is_empty(&kcp->rcv_buf)) { + seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); + if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) { + iqueue_del(&seg->node); + kcp->nrcv_buf--; + iqueue_add_tail(&seg->node, &kcp->rcv_queue); + kcp->nrcv_que++; + kcp->rcv_nxt++; + } else { + break; + } + } + + // fast recover + if (kcp->nrcv_que < kcp->rcv_wnd && recover) { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + kcp->probe |= IKCP_ASK_TELL; + } + + return len; +} + + +//--------------------------------------------------------------------- +// peek data size +//--------------------------------------------------------------------- +int ikcp_peeksize(const ikcpcb *kcp) +{ + struct IQUEUEHEAD *p; + IKCPSEG *seg; + int length = 0; + + assert(kcp); + + if (iqueue_is_empty(&kcp->rcv_queue)) return -1; + + seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node); + if (seg->frg == 0) return seg->len; + + if (kcp->nrcv_que < seg->frg + 1) return -1; + + for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; p = p->next) { + seg = iqueue_entry(p, IKCPSEG, node); + length += seg->len; + if (seg->frg == 0) break; + } + + return length; +} + + +//--------------------------------------------------------------------- +// upper-level send: returns size, or a negative value on error +//--------------------------------------------------------------------- +int ikcp_send(ikcpcb *kcp, const char *buffer, int len) +{ + IKCPSEG *seg; + int count, i; + int sent = 0; + + assert(kcp->mss > 0); + if (len < 0) return -1; + + // append to previous segment in streaming mode (if possible) + if (kcp->stream != 0) { + if (!iqueue_is_empty(&kcp->snd_queue)) { + IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node); + if (old->len < kcp->mss) { + int capacity = kcp->mss - old->len; + int extend = (len < capacity)? len : capacity; + seg = ikcp_segment_new(kcp, old->len + extend); + assert(seg); + if (seg == NULL) { + return -2; + } + iqueue_add_tail(&seg->node, &kcp->snd_queue); + memcpy(seg->data, old->data, old->len); + if (buffer) { + memcpy(seg->data + old->len, buffer, extend); + buffer += extend; + } + seg->len = old->len + extend; + seg->frg = 0; + len -= extend; + iqueue_del_init(&old->node); + ikcp_segment_delete(kcp, old); + sent = extend; + } + } + if (len <= 0) { + return sent; + } + } + + if (len <= (int)kcp->mss) count = 1; + else count = (len + kcp->mss - 1) / kcp->mss; + + if (count >= (int)IKCP_WND_RCV) { + if (kcp->stream != 0 && sent > 0) + return sent; + return -2; + } + + if (count == 0) count = 1; + + // fragment + for (i = 0; i < count; i++) { + int size = len > (int)kcp->mss ? (int)kcp->mss : len; + seg = ikcp_segment_new(kcp, size); + assert(seg); + if (seg == NULL) { + return -2; + } + if (buffer && len > 0) { + memcpy(seg->data, buffer, size); + } + seg->len = size; + seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; + iqueue_init(&seg->node); + iqueue_add_tail(&seg->node, &kcp->snd_queue); + kcp->nsnd_que++; + if (buffer) { + buffer += size; + } + len -= size; + sent += size; + } + + return sent; +} + + +//--------------------------------------------------------------------- +// parse ack +//--------------------------------------------------------------------- +static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt) +{ + IINT32 rto = 0; + if (kcp->rx_srtt == 0) { + kcp->rx_srtt = rtt; + kcp->rx_rttval = rtt / 2; + } else { + long delta = rtt - kcp->rx_srtt; + if (delta < 0) delta = -delta; + kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4; + kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8; + if (kcp->rx_srtt < 1) kcp->rx_srtt = 1; + } + rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval); + kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX); + if (kcp->ccops && kcp->ccops->on_rtt) { + kcp->ccops->on_rtt(kcp, rtt); + } +} + +static void ikcp_shrink_buf(ikcpcb *kcp) +{ + struct IQUEUEHEAD *p = kcp->snd_buf.next; + if (p != &kcp->snd_buf) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + kcp->snd_una = seg->sn; + } else { + kcp->snd_una = kcp->snd_nxt; + } +} + +static void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn) +{ + struct IQUEUEHEAD *p, *next; + IINT32 pkt_rtt; + + if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0) + return; + + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + next = p->next; + if (sn == seg->sn) { + kcp->ackedlen += seg->len; + if (kcp->ccops && kcp->ccops->on_pkt_acked) { + pkt_rtt = -1; + if (_itimediff(kcp->current, seg->ts) >= 0) { + pkt_rtt = _itimediff(kcp->current, seg->ts); + } + kcp->ccops->on_pkt_acked(kcp, seg->sn, seg->ts, + seg->len, pkt_rtt, seg->xmit); + } + iqueue_del(p); + ikcp_segment_delete(kcp, seg); + kcp->nsnd_buf--; + break; + } + if (_itimediff(sn, seg->sn) < 0) { + break; + } + } +} + +static void ikcp_parse_una(ikcpcb *kcp, IUINT32 una) +{ + struct IQUEUEHEAD *p, *next; + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + next = p->next; + if (_itimediff(una, seg->sn) > 0) { + kcp->ackedlen += seg->len; + if (kcp->ccops && kcp->ccops->on_pkt_acked) { + kcp->ccops->on_pkt_acked(kcp, seg->sn, seg->ts, + seg->len, -1, seg->xmit); + } + iqueue_del(p); + ikcp_segment_delete(kcp, seg); + kcp->nsnd_buf--; + } else { + break; + } + } +} + +static void ikcp_parse_fastack(ikcpcb *kcp, IUINT32 sn, IUINT32 ts) +{ + struct IQUEUEHEAD *p, *next; + + if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0) + return; + + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + next = p->next; + if (_itimediff(sn, seg->sn) < 0) { + break; + } + else if (sn != seg->sn) { + #ifndef IKCP_FASTACK_CONSERVE + seg->fastack++; + #else + if (_itimediff(ts, seg->ts) >= 0) + seg->fastack++; + #endif + } + } +} + + +//--------------------------------------------------------------------- +// ack append +//--------------------------------------------------------------------- +static void ikcp_ack_push(ikcpcb *kcp, IUINT32 sn, IUINT32 ts) +{ + IUINT32 newsize = kcp->ackcount + 1; + IUINT32 *ptr; + + if (newsize > kcp->ackblock) { + IUINT32 *acklist; + IUINT32 newblock; + + for (newblock = 8; newblock < newsize; newblock <<= 1); + acklist = (IUINT32*)ikcp_malloc(newblock * sizeof(IUINT32) * 2); + + if (acklist == NULL) { + assert(acklist != NULL); + abort(); + } + + if (kcp->acklist != NULL) { + IUINT32 x; + for (x = 0; x < kcp->ackcount; x++) { + acklist[x * 2 + 0] = kcp->acklist[x * 2 + 0]; + acklist[x * 2 + 1] = kcp->acklist[x * 2 + 1]; + } + ikcp_free(kcp->acklist); + } + + kcp->acklist = acklist; + kcp->ackblock = newblock; + } + + ptr = &kcp->acklist[kcp->ackcount * 2]; + ptr[0] = sn; + ptr[1] = ts; + kcp->ackcount++; +} + +static void ikcp_ack_get(const ikcpcb *kcp, int p, IUINT32 *sn, IUINT32 *ts) +{ + if (sn) sn[0] = kcp->acklist[p * 2 + 0]; + if (ts) ts[0] = kcp->acklist[p * 2 + 1]; +} + + +//--------------------------------------------------------------------- +// parse data +//--------------------------------------------------------------------- +void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg) +{ + struct IQUEUEHEAD *p, *prev; + IUINT32 sn = newseg->sn; + int repeat = 0; + + if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 || + _itimediff(sn, kcp->rcv_nxt) < 0) { + ikcp_segment_delete(kcp, newseg); + return; + } + + for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) { + IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node); + prev = p->prev; + if (seg->sn == sn) { + repeat = 1; + break; + } + if (_itimediff(sn, seg->sn) > 0) { + break; + } + } + + if (repeat == 0) { + iqueue_init(&newseg->node); + iqueue_add(&newseg->node, p); + kcp->nrcv_buf++; + } else { + ikcp_segment_delete(kcp, newseg); + } + +#if 0 + ikcp_qprint("rcvbuf", &kcp->rcv_buf); + printf("rcv_nxt=%lu\n", kcp->rcv_nxt); +#endif + + // move available data from rcv_buf -> rcv_queue + while (! iqueue_is_empty(&kcp->rcv_buf)) { + IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); + if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) { + iqueue_del(&seg->node); + kcp->nrcv_buf--; + iqueue_add_tail(&seg->node, &kcp->rcv_queue); + kcp->nrcv_que++; + kcp->rcv_nxt++; + } else { + break; + } + } + +#if 0 + ikcp_qprint("queue", &kcp->rcv_queue); + printf("rcv_nxt=%lu\n", kcp->rcv_nxt); +#endif + +#if 1 +// printf("snd(buf=%d, queue=%d)\n", kcp->nsnd_buf, kcp->nsnd_que); +// printf("rcv(buf=%d, queue=%d)\n", kcp->nrcv_buf, kcp->nrcv_que); +#endif +} + + +//--------------------------------------------------------------------- +// input data +//--------------------------------------------------------------------- +int ikcp_input(ikcpcb *kcp, const char *data, long size) +{ + IUINT32 prev_una = kcp->snd_una; + IUINT32 prev_nsnd_buf = kcp->nsnd_buf; + IUINT32 acked_segs, prior_in_flight; + IUINT32 maxack = 0, latest_ts = 0; + int flag = 0; + + kcp->ackedlen = 0; + + if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) { + ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size); + } + + if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1; + + while (1) { + IUINT32 ts, sn, len, una, conv; + IUINT16 wnd; + IUINT8 cmd, frg; + IKCPSEG *seg; + + if (size < (int)IKCP_OVERHEAD) break; + + data = ikcp_decode32u(data, &conv); + if (conv != kcp->conv) return -1; + + data = ikcp_decode8u(data, &cmd); + data = ikcp_decode8u(data, &frg); + data = ikcp_decode16u(data, &wnd); + data = ikcp_decode32u(data, &ts); + data = ikcp_decode32u(data, &sn); + data = ikcp_decode32u(data, &una); + data = ikcp_decode32u(data, &len); + + size -= IKCP_OVERHEAD; + + if ((long)size < (long)len || (int)len < 0) return -2; + + if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && + cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) + return -3; + + kcp->rmt_wnd = wnd; + ikcp_parse_una(kcp, una); + ikcp_shrink_buf(kcp); + + if (cmd == IKCP_CMD_ACK) { + if (_itimediff(kcp->current, ts) >= 0) { + ikcp_update_ack(kcp, _itimediff(kcp->current, ts)); + } + ikcp_parse_ack(kcp, sn); + ikcp_shrink_buf(kcp); + if (flag == 0) { + flag = 1; + maxack = sn; + latest_ts = ts; + } else { + if (_itimediff(sn, maxack) > 0) { + #ifndef IKCP_FASTACK_CONSERVE + maxack = sn; + latest_ts = ts; + #else + if (_itimediff(ts, latest_ts) > 0) { + maxack = sn; + latest_ts = ts; + } + #endif + } + } + if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) { + ikcp_log(kcp, IKCP_LOG_IN_ACK, + "input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn, + (long)_itimediff(kcp->current, ts), + (long)kcp->rx_rto); + } + } + else if (cmd == IKCP_CMD_PUSH) { + if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) { + ikcp_log(kcp, IKCP_LOG_IN_DATA, + "input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts); + } + if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) { + ikcp_ack_push(kcp, sn, ts); + if (_itimediff(sn, kcp->rcv_nxt) >= 0) { + seg = ikcp_segment_new(kcp, len); + seg->conv = conv; + seg->cmd = cmd; + seg->frg = frg; + seg->wnd = wnd; + seg->ts = ts; + seg->sn = sn; + seg->una = una; + seg->len = len; + + if (len > 0) { + memcpy(seg->data, data, len); + } + + ikcp_parse_data(kcp, seg); + } + } + } + else if (cmd == IKCP_CMD_WASK) { + // ready to send back IKCP_CMD_WINS in ikcp_flush + // tell remote my window size + kcp->probe |= IKCP_ASK_TELL; + if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) { + ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe"); + } + } + else if (cmd == IKCP_CMD_WINS) { + // do nothing + if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) { + ikcp_log(kcp, IKCP_LOG_IN_WINS, + "input wins: %lu", (unsigned long)(wnd)); + } + } + else { + return -3; + } + + data += len; + size -= len; + } + + if (flag != 0) { + ikcp_parse_fastack(kcp, maxack, latest_ts); + } + + if (_itimediff(kcp->snd_una, prev_una) > 0) { + acked_segs = kcp->snd_una - prev_una; + prior_in_flight = prev_nsnd_buf; + if (kcp->ccops && kcp->ccops->on_ack) { + kcp->ccops->on_ack(kcp, acked_segs, kcp->ackedlen, + prior_in_flight); + } + else { + if (kcp->cwnd < kcp->rmt_wnd) { + IUINT32 mss = kcp->mss; + if (kcp->cwnd < kcp->ssthresh) { + kcp->cwnd++; + kcp->incr += mss; + } else { + if (kcp->incr < mss) kcp->incr = mss; + kcp->incr += (mss * mss) / kcp->incr + (mss / 16); + if ((kcp->cwnd + 1) * mss <= kcp->incr) { + #if 1 + kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1); + #else + kcp->cwnd++; + #endif + } + } + if (kcp->cwnd > kcp->rmt_wnd) { + kcp->cwnd = kcp->rmt_wnd; + kcp->incr = kcp->rmt_wnd * mss; + } + } + } + } + + return 0; +} + + +//--------------------------------------------------------------------- +// ikcp_encode_seg +//--------------------------------------------------------------------- +static char *ikcp_encode_seg(char *ptr, const IKCPSEG *seg) +{ + ptr = ikcp_encode32u(ptr, seg->conv); + ptr = ikcp_encode8u(ptr, (IUINT8)seg->cmd); + ptr = ikcp_encode8u(ptr, (IUINT8)seg->frg); + ptr = ikcp_encode16u(ptr, (IUINT16)seg->wnd); + ptr = ikcp_encode32u(ptr, seg->ts); + ptr = ikcp_encode32u(ptr, seg->sn); + ptr = ikcp_encode32u(ptr, seg->una); + ptr = ikcp_encode32u(ptr, seg->len); + return ptr; +} + +static int ikcp_wnd_unused(const ikcpcb *kcp) +{ + if (kcp->nrcv_que < kcp->rcv_wnd) { + return kcp->rcv_wnd - kcp->nrcv_que; + } + return 0; +} + + +//--------------------------------------------------------------------- +// ikcp_flush +//--------------------------------------------------------------------- +void ikcp_flush(ikcpcb *kcp) +{ + IUINT32 current = kcp->current; + char *buffer = kcp->buffer; + char *ptr = buffer; + int count, size, i; + IUINT32 resent, cwnd; + IUINT32 rtomin; + IUINT32 prior_cwnd; + IUINT32 eff_cwnd, cur_inflight; + IINT32 pacing_budget = -1; + struct IQUEUEHEAD *p; + int change = 0; + int lost = 0; + IKCPSEG seg; + + // 'ikcp_update' hasn't been called yet. + if (kcp->updated == 0) return; + + if (kcp->ccops && kcp->ccops->on_tick) { + kcp->ccops->on_tick(kcp); + } + + if (kcp->ccops && kcp->ccops->pacing_rate) { + pacing_budget = (IINT32)kcp->ccops->pacing_rate(kcp); + } + + prior_cwnd = kcp->cwnd; + + seg.conv = kcp->conv; + seg.cmd = IKCP_CMD_ACK; + seg.frg = 0; + seg.wnd = ikcp_wnd_unused(kcp); + seg.una = kcp->rcv_nxt; + seg.len = 0; + seg.sn = 0; + seg.ts = 0; + + // flush acknowledges + count = kcp->ackcount; + for (i = 0; i < count; i++) { + size = (int)(ptr - buffer); + if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + ikcp_ack_get(kcp, i, &seg.sn, &seg.ts); + ptr = ikcp_encode_seg(ptr, &seg); + } + + kcp->ackcount = 0; + + // probe window size (if remote window size equals zero) + if (kcp->rmt_wnd == 0) { + if (kcp->probe_wait == 0) { + kcp->probe_wait = IKCP_PROBE_INIT; + kcp->ts_probe = kcp->current + kcp->probe_wait; + } + else { + if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { + if (kcp->probe_wait < IKCP_PROBE_INIT) + kcp->probe_wait = IKCP_PROBE_INIT; + kcp->probe_wait += kcp->probe_wait / 2; + if (kcp->probe_wait > IKCP_PROBE_LIMIT) + kcp->probe_wait = IKCP_PROBE_LIMIT; + kcp->ts_probe = kcp->current + kcp->probe_wait; + kcp->probe |= IKCP_ASK_SEND; + } + } + } else { + kcp->ts_probe = 0; + kcp->probe_wait = 0; + } + + // flush window probing commands + if (kcp->probe & IKCP_ASK_SEND) { + seg.cmd = IKCP_CMD_WASK; + size = (int)(ptr - buffer); + if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + ptr = ikcp_encode_seg(ptr, &seg); + } + + // flush window probing commands + if (kcp->probe & IKCP_ASK_TELL) { + seg.cmd = IKCP_CMD_WINS; + size = (int)(ptr - buffer); + if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + ptr = ikcp_encode_seg(ptr, &seg); + } + + kcp->probe = 0; + + // calculate window size + cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); + if (kcp->ccops != NULL || kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd); + + // move data from snd_queue to snd_buf + while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { + IKCPSEG *newseg; + if (iqueue_is_empty(&kcp->snd_queue)) break; + + newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); + + iqueue_del(&newseg->node); + iqueue_add_tail(&newseg->node, &kcp->snd_buf); + kcp->nsnd_que--; + kcp->nsnd_buf++; + + newseg->conv = kcp->conv; + newseg->cmd = IKCP_CMD_PUSH; + newseg->wnd = seg.wnd; + newseg->ts = current; + newseg->sn = kcp->snd_nxt++; + newseg->una = kcp->rcv_nxt; + newseg->resendts = current; + newseg->rto = kcp->rx_rto; + newseg->fastack = 0; + newseg->xmit = 0; + } + + // check on_app_limited + if (kcp->ccops && kcp->ccops->on_app_limited) { + if (iqueue_is_empty(&kcp->snd_queue)) { + eff_cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); + eff_cwnd = _imin_(kcp->cwnd, eff_cwnd); + cur_inflight = kcp->nsnd_buf; + if (cur_inflight < eff_cwnd) { + kcp->ccops->on_app_limited(kcp, cur_inflight); + } + } + } + + // calculate resent + resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; + rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0; + + // flush data segments + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { + IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node); + int needsend = 0; + if (segment->xmit == 0) { + needsend = 1; + segment->xmit++; + segment->rto = kcp->rx_rto; + segment->resendts = current + segment->rto + rtomin; + } + else if (_itimediff(current, segment->resendts) >= 0) { + needsend = 1; + segment->xmit++; + kcp->xmit++; + if (kcp->nodelay == 0) { + segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); + } else { + IINT32 step = (kcp->nodelay < 2)? + ((IINT32)(segment->rto)) : kcp->rx_rto; + segment->rto += step / 2; + } + segment->resendts = current + segment->rto; + lost = 1; + } + else if (segment->fastack >= resent) { + if ((int)segment->xmit <= kcp->fastlimit || + kcp->fastlimit <= 0) { + needsend = 1; + segment->xmit++; + segment->fastack = 0; + segment->resendts = current + segment->rto; + change++; + } + } + + if (needsend) { + int need; + segment->ts = current; + segment->wnd = seg.wnd; + segment->una = kcp->rcv_nxt; + + if (pacing_budget >= 0 && pacing_budget < (IINT32)segment->len) { + break; + } + + if (kcp->ccops && kcp->ccops->on_pkt_sent) { + kcp->ccops->on_pkt_sent(kcp, segment->sn, current, + segment->len, kcp->nsnd_buf, segment->xmit); + } + + size = (int)(ptr - buffer); + need = IKCP_OVERHEAD + segment->len; + + if (size + need > (int)kcp->mtu) { + ikcp_output(kcp, buffer, size); + ptr = buffer; + } + + ptr = ikcp_encode_seg(ptr, segment); + + if (segment->len > 0) { + memcpy(ptr, segment->data, segment->len); + ptr += segment->len; + } + + if (pacing_budget >= 0) { + pacing_budget -= (IINT32)segment->len; + } + + if (segment->xmit >= kcp->dead_link) { + kcp->state = (IUINT32)-1; + } + } + } + + // flash remaining segments + size = (int)(ptr - buffer); + if (size > 0) { + ikcp_output(kcp, buffer, size); + } + + // update ssthresh + if (change) { + if (kcp->ccops && kcp->ccops->on_fast_retransmit) { + kcp->ccops->on_fast_retransmit(kcp, (IUINT32)change, + kcp->nsnd_buf, prior_cwnd); + } + else { + IUINT32 inflight = kcp->snd_nxt - kcp->snd_una; + kcp->ssthresh = inflight / 2; + if (kcp->ssthresh < IKCP_THRESH_MIN) + kcp->ssthresh = IKCP_THRESH_MIN; + kcp->cwnd = kcp->ssthresh + resent; + kcp->incr = kcp->cwnd * kcp->mss; + } + } + + if (lost) { + if (kcp->ccops && kcp->ccops->on_timeout) { + kcp->ccops->on_timeout(kcp, prior_cwnd); + } + else { + kcp->ssthresh = prior_cwnd / 2; + if (kcp->ssthresh < IKCP_THRESH_MIN) + kcp->ssthresh = IKCP_THRESH_MIN; + kcp->cwnd = 1; + kcp->incr = kcp->mss; + } + } + + if (kcp->cwnd < 1) { + kcp->cwnd = 1; + kcp->incr = kcp->mss; + } +} + + +//--------------------------------------------------------------------- +// update state (call it repeatedly, every 10ms-100ms), or you can ask +// ikcp_check when to call it again (if no ikcp_input/_send calls occur). +// 'current' - current timestamp in milliseconds. +//--------------------------------------------------------------------- +void ikcp_update(ikcpcb *kcp, IUINT32 current) +{ + IINT32 slap; + + kcp->current = current; + + if (kcp->updated == 0) { + kcp->updated = 1; + kcp->ts_flush = kcp->current; + } + + slap = _itimediff(kcp->current, kcp->ts_flush); + + if (slap >= 10000 || slap < -10000) { + kcp->ts_flush = kcp->current; + slap = 0; + } + + if (slap >= 0) { + kcp->ts_flush += kcp->interval; + if (_itimediff(kcp->current, kcp->ts_flush) >= 0) { + kcp->ts_flush = kcp->current + kcp->interval; + } + ikcp_flush(kcp); + } +} + + +//--------------------------------------------------------------------- +// Determines when you should invoke ikcp_update next: +// returns the timestamp (in milliseconds) at which you should call +// ikcp_update, assuming no ikcp_input/_send calls occur in between. +// You can call ikcp_update at that time instead of calling it repeatedly. +// Important for reducing unnecessary ikcp_update invocations. Use it to +// schedule ikcp_update (e.g., implementing an epoll-like mechanism, +// or optimizing ikcp_update when handling massive kcp connections). +//--------------------------------------------------------------------- +IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current) +{ + IUINT32 ts_flush = kcp->ts_flush; + IINT32 tm_flush = 0x7fffffff; + IINT32 tm_packet = 0x7fffffff; + IUINT32 minimal = 0; + struct IQUEUEHEAD *p; + + if (kcp->updated == 0) { + return current; + } + + if (_itimediff(current, ts_flush) >= 10000 || + _itimediff(current, ts_flush) < -10000) { + ts_flush = current; + } + + if (_itimediff(current, ts_flush) >= 0) { + return current; + } + + tm_flush = _itimediff(ts_flush, current); + + for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { + const IKCPSEG *seg = iqueue_entry(p, const IKCPSEG, node); + IINT32 diff = _itimediff(seg->resendts, current); + if (diff <= 0) { + return current; + } + if (diff < tm_packet) tm_packet = diff; + } + + minimal = (IUINT32)(tm_packet < tm_flush ? tm_packet : tm_flush); + if (minimal >= kcp->interval) minimal = kcp->interval; + + return current + minimal; +} + + + +int ikcp_setmtu(ikcpcb *kcp, int mtu) +{ + char *buffer; + if (mtu < 50 || mtu < (int)IKCP_OVERHEAD) + return -1; + buffer = (char*)ikcp_malloc((mtu + IKCP_OVERHEAD) * 3); + if (buffer == NULL) + return -2; + kcp->mtu = mtu; + kcp->mss = kcp->mtu - IKCP_OVERHEAD; + ikcp_free(kcp->buffer); + kcp->buffer = buffer; + return 0; +} + +int ikcp_interval(ikcpcb *kcp, int interval) +{ + if (interval > 5000) interval = 5000; + else if (interval < 10) interval = 10; + kcp->interval = interval; + return 0; +} + +int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc) +{ + if (nodelay >= 0) { + kcp->nodelay = nodelay; + if (nodelay) { + kcp->rx_minrto = IKCP_RTO_NDL; + } + else { + kcp->rx_minrto = IKCP_RTO_MIN; + } + } + if (interval >= 0) { + if (interval > 5000) interval = 5000; + else if (interval < 10) interval = 10; + kcp->interval = interval; + } + if (resend >= 0) { + kcp->fastresend = resend; + } + if (nc >= 0) { + kcp->nocwnd = nc; + } + return 0; +} + + +int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd) +{ + if (kcp) { + if (sndwnd > 0) { + kcp->snd_wnd = sndwnd; + } + if (rcvwnd > 0) { // must >= max fragment size + kcp->rcv_wnd = _imax_(rcvwnd, IKCP_WND_RCV); + } + } + return 0; +} + +int ikcp_waitsnd(const ikcpcb *kcp) +{ + return kcp->nsnd_buf + kcp->nsnd_que; +} + + +// read conv +IUINT32 ikcp_getconv(const void *ptr) +{ + IUINT32 conv; + ikcp_decode32u((const char*)ptr, &conv); + return conv; +} + + +//--------------------------------------------------------------------- +// install congestion control +//--------------------------------------------------------------------- +int ikcp_setcc(ikcpcb *kcp, const struct IKCPOPS *ops) +{ + assert(kcp); + if (kcp->ccops && kcp->ccops->release) { + kcp->ccops->release(kcp); + } + kcp->congest = NULL; + kcp->ccops = ops; + if (ops) { + if (ops->init) { + if (ops->init(kcp) < 0) { + kcp->ccops = NULL; + kcp->congest = NULL; + if (kcp->cwnd < 1) kcp->cwnd = 1; + kcp->incr = kcp->cwnd * kcp->mss; + return -1; + } + } + } + else { + if (kcp->cwnd < 1) kcp->cwnd = 1; + kcp->incr = kcp->cwnd * kcp->mss; + if (kcp->incr < kcp->mss) kcp->incr = kcp->mss; + } + return 0; +} + + + diff --git a/native/kcp/ikcp.h b/native/kcp/ikcp.h new file mode 100644 index 0000000..a7447c1 --- /dev/null +++ b/native/kcp/ikcp.h @@ -0,0 +1,452 @@ +//===================================================================== +// +// KCP - A Better ARQ Protocol Implementation +// skywind3000 (at) gmail.com, 2010-2011 +// +// Features: +// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp. +// + Maximum RTT reduce three times vs tcp. +// + Lightweight, distributed as a single source file. +// +//===================================================================== +#ifndef _IKCP_H_ +#define _IKCP_H_ + +#include +#include +#include + + +//===================================================================== +// 32BIT INTEGER DEFINITION +//===================================================================== +#ifndef __INTEGER_32_BITS__ +#define __INTEGER_32_BITS__ +#if defined(_WIN64) || defined(WIN64) || defined(__amd64__) || \ + defined(__x86_64) || defined(__x86_64__) || defined(_M_IA64) || \ + defined(_M_AMD64) + typedef unsigned int ISTDUINT32; + typedef int ISTDINT32; +#elif defined(_WIN32) || defined(WIN32) || defined(__i386__) || \ + defined(__i386) || defined(_M_X86) + typedef unsigned long ISTDUINT32; + typedef long ISTDINT32; +#elif defined(__MACOS__) + typedef UInt32 ISTDUINT32; + typedef SInt32 ISTDINT32; +#elif defined(__APPLE__) && defined(__MACH__) + #include + typedef u_int32_t ISTDUINT32; + typedef int32_t ISTDINT32; +#elif defined(__BEOS__) + #include + typedef u_int32_t ISTDUINT32; + typedef int32_t ISTDINT32; +#elif (defined(_MSC_VER) || defined(__BORLANDC__)) && (!defined(__MSDOS__)) + typedef unsigned __int32 ISTDUINT32; + typedef __int32 ISTDINT32; +#elif defined(__GNUC__) + #include + typedef uint32_t ISTDUINT32; + typedef int32_t ISTDINT32; +#else + typedef unsigned long ISTDUINT32; + typedef long ISTDINT32; +#endif +#endif + + +//===================================================================== +// Integer Definition +//===================================================================== +#ifndef __IINT8_DEFINED +#define __IINT8_DEFINED +typedef char IINT8; +#endif + +#ifndef __IUINT8_DEFINED +#define __IUINT8_DEFINED +typedef unsigned char IUINT8; +#endif + +#ifndef __IUINT16_DEFINED +#define __IUINT16_DEFINED +typedef unsigned short IUINT16; +#endif + +#ifndef __IINT16_DEFINED +#define __IINT16_DEFINED +typedef short IINT16; +#endif + +#ifndef __IINT32_DEFINED +#define __IINT32_DEFINED +typedef ISTDINT32 IINT32; +#endif + +#ifndef __IUINT32_DEFINED +#define __IUINT32_DEFINED +typedef ISTDUINT32 IUINT32; +#endif + +#ifndef __IINT64_DEFINED +#define __IINT64_DEFINED +#if defined(_MSC_VER) || defined(__BORLANDC__) +typedef __int64 IINT64; +#else +typedef long long IINT64; +#endif +#endif + +#ifndef __IUINT64_DEFINED +#define __IUINT64_DEFINED +#if defined(_MSC_VER) || defined(__BORLANDC__) +typedef unsigned __int64 IUINT64; +#else +typedef unsigned long long IUINT64; +#endif +#endif + +#ifndef INLINE +#if defined(__GNUC__) + +#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1)) +#define INLINE __inline__ __attribute__((always_inline)) +#else +#define INLINE __inline__ +#endif + +#elif (defined(_MSC_VER) || defined(__BORLANDC__) || defined(__WATCOMC__)) +#define INLINE __inline +#else +#define INLINE +#endif +#endif + +#if (!defined(__cplusplus)) && (!defined(inline)) +#define inline INLINE +#endif + + +//===================================================================== +// QUEUE DEFINITION +//===================================================================== +#ifndef __IQUEUE_DEF__ +#define __IQUEUE_DEF__ + +struct IQUEUEHEAD { + struct IQUEUEHEAD *next, *prev; +}; + +typedef struct IQUEUEHEAD iqueue_head; + + +//--------------------------------------------------------------------- +// queue init +//--------------------------------------------------------------------- +#define IQUEUE_HEAD_INIT(name) { &(name), &(name) } +#define IQUEUE_HEAD(name) \ + struct IQUEUEHEAD name = IQUEUE_HEAD_INIT(name) + +#define IQUEUE_INIT(ptr) ( \ + (ptr)->next = (ptr), (ptr)->prev = (ptr)) + +#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) + +#define ICONTAINEROF(ptr, type, member) ( \ + (type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) ) + +#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member) + + +//--------------------------------------------------------------------- +// queue operation +//--------------------------------------------------------------------- +#define IQUEUE_ADD(node, head) ( \ + (node)->prev = (head), (node)->next = (head)->next, \ + (head)->next->prev = (node), (head)->next = (node)) + +#define IQUEUE_ADD_TAIL(node, head) ( \ + (node)->prev = (head)->prev, (node)->next = (head), \ + (head)->prev->next = (node), (head)->prev = (node)) + +#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n)) + +#define IQUEUE_DEL(entry) (\ + (entry)->next->prev = (entry)->prev, \ + (entry)->prev->next = (entry)->next, \ + (entry)->next = 0, (entry)->prev = 0) + +#define IQUEUE_DEL_INIT(entry) do { \ + IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0) + +#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next) + +#define iqueue_init IQUEUE_INIT +#define iqueue_entry IQUEUE_ENTRY +#define iqueue_add IQUEUE_ADD +#define iqueue_add_tail IQUEUE_ADD_TAIL +#define iqueue_del IQUEUE_DEL +#define iqueue_del_init IQUEUE_DEL_INIT +#define iqueue_is_empty IQUEUE_IS_EMPTY + +#define IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) \ + for ((iterator) = iqueue_entry((head)->next, TYPE, MEMBER); \ + &((iterator)->MEMBER) != (head); \ + (iterator) = iqueue_entry((iterator)->MEMBER.next, TYPE, MEMBER)) + +#define iqueue_foreach(iterator, head, TYPE, MEMBER) \ + IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) + +#define iqueue_foreach_entry(pos, head) \ + for( (pos) = (head)->next; (pos) != (head) ; (pos) = (pos)->next ) + + +#define __iqueue_splice(list, head) do { \ + iqueue_head *first = (list)->next, *last = (list)->prev; \ + iqueue_head *at = (head)->next; \ + (first)->prev = (head), (head)->next = (first); \ + (last)->next = (at), (at)->prev = (last); } while (0) + +#define iqueue_splice(list, head) do { \ + if (!iqueue_is_empty(list)) __iqueue_splice(list, head); } while (0) + +#define iqueue_splice_init(list, head) do { \ + iqueue_splice(list, head); iqueue_init(list); } while (0) + + +#ifdef _MSC_VER +#pragma warning(disable:4311) +#pragma warning(disable:4312) +#pragma warning(disable:4996) +#endif + +#endif + + +//--------------------------------------------------------------------- +// BYTE ORDER & ALIGNMENT +//--------------------------------------------------------------------- +#ifndef IWORDS_BIG_ENDIAN + #ifdef _BIG_ENDIAN_ + #if _BIG_ENDIAN_ + #define IWORDS_BIG_ENDIAN 1 + #endif + #endif + #ifndef IWORDS_BIG_ENDIAN + #if defined(__hppa__) || \ + defined(__m68k__) || defined(mc68000) || defined(_M_M68K) || \ + (defined(__MIPS__) && defined(__MIPSEB__)) || \ + defined(__ppc__) || defined(__POWERPC__) || defined(_M_PPC) || \ + defined(__sparc__) || defined(__powerpc__) || \ + defined(__mc68000__) || defined(__s390x__) || defined(__s390__) + #define IWORDS_BIG_ENDIAN 1 + #endif + #endif + #ifndef IWORDS_BIG_ENDIAN + #define IWORDS_BIG_ENDIAN 0 + #endif +#endif + +#ifndef IWORDS_MUST_ALIGN + #if defined(__i386__) || defined(__i386) || defined(_i386_) + #define IWORDS_MUST_ALIGN 0 + #elif defined(_M_IX86) || defined(_X86_) || defined(__x86_64__) + #define IWORDS_MUST_ALIGN 0 + #elif defined(__amd64) || defined(__amd64__) + #define IWORDS_MUST_ALIGN 0 + #else + #define IWORDS_MUST_ALIGN 1 + #endif +#endif + + +//===================================================================== +// Predefine struct +//===================================================================== +struct IKCPCB; +typedef struct IKCPCB ikcpcb; + + +//===================================================================== +// SEGMENT +//===================================================================== +struct IKCPSEG +{ + struct IQUEUEHEAD node; + IUINT32 conv; + IUINT32 cmd; + IUINT32 frg; + IUINT32 wnd; + IUINT32 ts; + IUINT32 sn; + IUINT32 una; + IUINT32 len; + IUINT32 resendts; + IUINT32 rto; + IUINT32 fastack; + IUINT32 xmit; + char data[1]; +}; + + +//--------------------------------------------------------------------- +// IKCPOPS - pluggable congestion control operations +//--------------------------------------------------------------------- +struct IKCPOPS +{ + const char *name; + int (*init)(ikcpcb *kcp); + void (*release)(ikcpcb *kcp); + void (*on_ack)(ikcpcb *kcp, IUINT32 acked_segs, IUINT32 acked_bytes, + IUINT32 prior_in_flight); + void (*on_fast_retransmit)(ikcpcb *kcp, IUINT32 fast_retrans, + IUINT32 inflight, IUINT32 prior_cwnd); + void (*on_timeout)(ikcpcb *kcp, IUINT32 prior_cwnd); + void (*on_tick)(ikcpcb *kcp); + void (*on_app_limited)(ikcpcb *kcp, IUINT32 inflight); + void (*on_rtt)(ikcpcb *kcp, IINT32 rtt); + void (*on_pkt_sent)(ikcpcb *kcp, IUINT32 sn, IUINT32 ts, + IUINT32 len, IUINT32 inflight, IUINT32 xmit); + void (*on_pkt_acked)(ikcpcb *kcp, IUINT32 sn, IUINT32 ts, + IUINT32 len, IINT32 rtt, IUINT32 xmit); + IUINT32 (*get_info)(ikcpcb *kcp, void *buf, IUINT32 bufsize); + IUINT32 (*pacing_rate)(ikcpcb *kcp); +}; + + +//--------------------------------------------------------------------- +// IKCPCB +//--------------------------------------------------------------------- +struct IKCPCB +{ + IUINT32 conv, mtu, mss, state; + IUINT32 snd_una, snd_nxt, rcv_nxt; + IUINT32 ts_recent, ts_lastack, ssthresh; + IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto; + IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; + IUINT32 current, interval, ts_flush, xmit; + IUINT32 nrcv_buf, nsnd_buf; + IUINT32 nrcv_que, nsnd_que; + IUINT32 nodelay, updated; + IUINT32 ts_probe, probe_wait; + IUINT32 dead_link, incr; + struct IQUEUEHEAD snd_queue; + struct IQUEUEHEAD rcv_queue; + struct IQUEUEHEAD snd_buf; + struct IQUEUEHEAD rcv_buf; + IUINT32 *acklist; + IUINT32 ackcount; + IUINT32 ackblock; + IUINT32 ackedlen; + void *user; + char *buffer; + int fastresend; + int fastlimit; + int nocwnd, stream; + const struct IKCPOPS *ccops; + void *congest; + int logmask; + int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); + void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); +}; + + +#define IKCP_LOG_OUTPUT 1 +#define IKCP_LOG_INPUT 2 +#define IKCP_LOG_SEND 4 +#define IKCP_LOG_RECV 8 +#define IKCP_LOG_IN_DATA 16 +#define IKCP_LOG_IN_ACK 32 +#define IKCP_LOG_IN_PROBE 64 +#define IKCP_LOG_IN_WINS 128 +#define IKCP_LOG_OUT_DATA 256 +#define IKCP_LOG_OUT_ACK 512 +#define IKCP_LOG_OUT_PROBE 1024 +#define IKCP_LOG_OUT_WINS 2048 + +#ifdef __cplusplus +extern "C" { +#endif + +//--------------------------------------------------------------------- +// interface +//--------------------------------------------------------------------- + +// create a new kcp control object, 'conv' must be equal in both endpoints +// of the same connection. 'user' will be passed to the output callback. +// output callback can be set up like this: 'kcp->output = my_udp_output' +ikcpcb* ikcp_create(IUINT32 conv, void *user); + +// release kcp control object +void ikcp_release(ikcpcb *kcp); + +// set output callback, which will be invoked by kcp +void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len, + ikcpcb *kcp, void *user)); + +// user/upper level recv: returns size, returns below zero for EAGAIN +int ikcp_recv(ikcpcb *kcp, char *buffer, int len); + +// user/upper level send, returns below zero for error +int ikcp_send(ikcpcb *kcp, const char *buffer, int len); + +// update state (call it repeatedly, every 10ms-100ms), or you can ask +// ikcp_check when to call it again (without ikcp_input/_send calling). +// 'current' - current timestamp in millisec. +void ikcp_update(ikcpcb *kcp, IUINT32 current); + +// Determines when you should invoke ikcp_update next: +// returns the timestamp (in milliseconds) at which you should call +// ikcp_update, assuming no ikcp_input/_send calls occur in between. +// You can call ikcp_update at that time instead of calling it repeatedly. +// Important for reducing unnecessary ikcp_update invocations. Use it to +// schedule ikcp_update (e.g., implementing an epoll-like mechanism, +// or optimizing ikcp_update when handling massive kcp connections). +IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current); + +// when you receive a low-level packet (e.g., UDP packet), call this +int ikcp_input(ikcpcb *kcp, const char *data, long size); + +// flush pending data +void ikcp_flush(ikcpcb *kcp); + +// check the size of next message in the recv queue +int ikcp_peeksize(const ikcpcb *kcp); + +// change MTU size, default is 1400 +int ikcp_setmtu(ikcpcb *kcp, int mtu); + +// set maximum window size: sndwnd=32, rcvwnd=32 by default +int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd); + +// get how many packets are waiting to be sent +int ikcp_waitsnd(const ikcpcb *kcp); + +// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1) +// nodelay: 0:disable (default), 1:enable +// interval: internal update timer interval in ms, default is 100ms +// resend: 0:disable fast resend (default), 1:enable fast resend +// nc: 0:normal congestion control (default), 1:disable congestion control +int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc); + +// install congestion control algorithm, NULL restores builtin +int ikcp_setcc(ikcpcb *kcp, const struct IKCPOPS *ops); + +// write log with kcp->writelog +void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...); + +// setup allocator +void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*)); + +// read conv +IUINT32 ikcp_getconv(const void *ptr); + + +#ifdef __cplusplus +} +#endif + +#endif + + diff --git a/native/kcp/laylink_kcp.c b/native/kcp/laylink_kcp.c new file mode 100644 index 0000000..1c1e133 --- /dev/null +++ b/native/kcp/laylink_kcp.c @@ -0,0 +1,193 @@ +#include "laylink_kcp.h" +#include "ikcp.h" + +#include +#include + +typedef struct laylink_packet { + int len; + char* data; + struct laylink_packet* next; +} laylink_packet; + +struct laylink_kcp { + ikcpcb* kcp; + laylink_packet* output_head; + laylink_packet* output_tail; +}; + +static void laylink_packet_free(laylink_packet* packet) { + if (packet == NULL) { + return; + } + free(packet->data); + free(packet); +} + +static int laylink_kcp_output(const char* buffer, int len, ikcpcb* kcp, void* user) { + (void)kcp; + laylink_kcp* session = (laylink_kcp*)user; + if (session == NULL || len <= 0) { + return -1; + } + + laylink_packet* packet = (laylink_packet*)malloc(sizeof(laylink_packet)); + if (packet == NULL) { + return -2; + } + + packet->data = (char*)malloc((size_t)len); + if (packet->data == NULL) { + free(packet); + return -3; + } + + memcpy(packet->data, buffer, (size_t)len); + packet->len = len; + packet->next = NULL; + + if (session->output_tail != NULL) { + session->output_tail->next = packet; + } else { + session->output_head = packet; + } + session->output_tail = packet; + + return 0; +} + +laylink_kcp* laylink_kcp_create(unsigned int conv) { + laylink_kcp* session = (laylink_kcp*)calloc(1, sizeof(laylink_kcp)); + if (session == NULL) { + return NULL; + } + + session->kcp = ikcp_create(conv, session); + if (session->kcp == NULL) { + free(session); + return NULL; + } + + session->kcp->output = laylink_kcp_output; + ikcp_nodelay(session->kcp, 1, 10, 2, 1); + ikcp_wndsize(session->kcp, 1024, 1024); + ikcp_setmtu(session->kcp, 1350); + + return session; +} + +void laylink_kcp_release(laylink_kcp* session) { + if (session == NULL) { + return; + } + + laylink_packet* packet = session->output_head; + while (packet != NULL) { + laylink_packet* next = packet->next; + laylink_packet_free(packet); + packet = next; + } + + if (session->kcp != NULL) { + ikcp_release(session->kcp); + } + free(session); +} + +int laylink_kcp_nodelay(laylink_kcp* session, int nodelay, int interval, int resend, int nc) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_nodelay(session->kcp, nodelay, interval, resend, nc); +} + +int laylink_kcp_wndsize(laylink_kcp* session, int sndwnd, int rcvwnd) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_wndsize(session->kcp, sndwnd, rcvwnd); +} + +int laylink_kcp_setmtu(laylink_kcp* session, int mtu) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_setmtu(session->kcp, mtu); +} + +int laylink_kcp_send(laylink_kcp* session, const char* buffer, int len) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_send(session->kcp, buffer, len); +} + +int laylink_kcp_input(laylink_kcp* session, const char* buffer, long size) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_input(session->kcp, buffer, size); +} + +void laylink_kcp_update(laylink_kcp* session, unsigned int current) { + if (session == NULL || session->kcp == NULL) { + return; + } + ikcp_update(session->kcp, current); +} + +unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current) { + if (session == NULL || session->kcp == NULL) { + return current + 10; + } + return ikcp_check(session->kcp, current); +} + +int laylink_kcp_peeksize(laylink_kcp* session) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_peeksize(session->kcp); +} + +int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len) { + if (session == NULL || session->kcp == NULL) { + return -1; + } + return ikcp_recv(session->kcp, buffer, len); +} + +void laylink_kcp_flush(laylink_kcp* session) { + if (session == NULL || session->kcp == NULL) { + return; + } + ikcp_flush(session->kcp); +} + +int laylink_kcp_pending_output_size(laylink_kcp* session) { + if (session == NULL || session->output_head == NULL) { + return 0; + } + return session->output_head->len; +} + +int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len) { + if (session == NULL || session->output_head == NULL) { + return 0; + } + + laylink_packet* packet = session->output_head; + if (len < packet->len) { + return -packet->len; + } + + memcpy(buffer, packet->data, (size_t)packet->len); + session->output_head = packet->next; + if (session->output_head == NULL) { + session->output_tail = NULL; + } + + int packet_len = packet->len; + laylink_packet_free(packet); + return packet_len; +} diff --git a/native/kcp/laylink_kcp.h b/native/kcp/laylink_kcp.h new file mode 100644 index 0000000..6e0a7ba --- /dev/null +++ b/native/kcp/laylink_kcp.h @@ -0,0 +1,29 @@ +#ifndef LAYLINK_KCP_H +#define LAYLINK_KCP_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct laylink_kcp laylink_kcp; + +laylink_kcp* laylink_kcp_create(unsigned int conv); +void laylink_kcp_release(laylink_kcp* session); +int laylink_kcp_nodelay(laylink_kcp* session, int nodelay, int interval, int resend, int nc); +int laylink_kcp_wndsize(laylink_kcp* session, int sndwnd, int rcvwnd); +int laylink_kcp_setmtu(laylink_kcp* session, int mtu); +int laylink_kcp_send(laylink_kcp* session, const char* buffer, int len); +int laylink_kcp_input(laylink_kcp* session, const char* buffer, long size); +void laylink_kcp_update(laylink_kcp* session, unsigned int current); +unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current); +int laylink_kcp_peeksize(laylink_kcp* session); +int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len); +void laylink_kcp_flush(laylink_kcp* session); +int laylink_kcp_pending_output_size(laylink_kcp* session); +int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/readme.md b/readme.md index 7eb5568..48e4452 100644 --- a/readme.md +++ b/readme.md @@ -74,7 +74,7 @@ POP Server 需要配置这些 `.env`: ```env APP_ENV=dev POP_AGENT_LISTEN=0.0.0.0:9001 -POP_ALLOWED_AGENT_TRANSPORTS=tcp +POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp AUDIT_LOG=runtime/audit.log LOG_LEVEL=debug ``` @@ -87,7 +87,7 @@ LOG_LEVEL=debug | `LAYLINK_FRAME_ENCRYPTION` | Agent 与 POP Server 之间 Frame 加密方式,两端必须一致。 | `none`、`chacha20` | | `LAYLINK_FRAME_ENCRYPTION_KEY` | Frame 加密密钥,启用 `chacha20` 时必填。 | 普通口令、`hex:...`、`base64:...` | | `POP_AGENT_LISTEN` | POP Server 给 Client Agent 连接的监听地址。Agent 的 `POP_SERVER_ADDRESS` 应指向这里。 | `0.0.0.0:9001`、`127.0.0.1:9001` | -| `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议,逗号分隔。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`tcp,udp,kcp` | +| `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议。支持逗号数组,也支持 JSON 数组。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`["tcp","kcp"]` | | `AUDIT_LOG` | 审计日志路径。MVP 使用 JSON Lines 追加写入。 | `runtime/audit.log` | | `LOG_LEVEL` | 日志级别预留配置。当前 MVP 主要为后续日志工厂使用。 | `debug`、`info`、`warning`、`error` | @@ -121,6 +121,7 @@ NODE_ID=client-01 NODE_TYPE=client NODE_TOKEN=CHANGE_ME AGENT_TRANSPORT_PROTOCOL=tcp +CLIENT_AGENT_POP_CONNECTIONS=1 CLIENT_AGENT_AUTH_TOKEN=dev-token CLIENT_AGENT_USER_ID=admin CLIENT_AGENT_SOCKS5_ENABLED=true @@ -151,6 +152,7 @@ LOG_LEVEL=debug | `NODE_TYPE` | 当前节点类型。Client Agent 必须配置为 `client`。 | `client` | | `NODE_TOKEN` | 当前节点认证密钥。必须和 `config/nodes.php` 中同一 `NODE_ID` 的 `token` 一致。 | 强随机字符串,开发时可临时用 `CHANGE_ME` | | `AGENT_TRANSPORT_PROTOCOL` | 当前 Agent 到 POP Server 使用的底层传输协议。必须被 POP Server 的 `POP_ALLOWED_AGENT_TRANSPORTS` 允许。 | `tcp`、`udp`、`kcp` | +| `CLIENT_AGENT_POP_CONNECTIONS` | Client Agent 到 POP Server 的并行长连接数量。新 TCP 会话会在已认证连接间轮询分配,适合多并发请求或多线程测速。 | `1`、`2`、`4` | | `CLIENT_AGENT_AUTH_TOKEN` | SOCKS5/HTTP 代理入口生成 `OPEN` 帧时使用的客户端认证 token。 | `dev-token`,生产应替换 | | `CLIENT_AGENT_USER_ID` | SOCKS5/HTTP 代理入口生成 `OPEN` 帧时使用的默认用户 ID。 | `admin`、`normal-user` | | `CLIENT_AGENT_SOCKS5_ENABLED` | 是否启用 SOCKS5 本地入口。 | `true`、`false` | @@ -309,7 +311,7 @@ CLIENT_AGENT_SOCKS5_PASSWORD=change-this-password ```env APP_ENV=dev POP_AGENT_LISTEN=127.0.0.1:9001 -POP_ALLOWED_AGENT_TRANSPORTS=tcp +POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp NODE_ID=client-01 NODE_TYPE=client NODE_TOKEN=CHANGE_ME @@ -346,23 +348,42 @@ Agent 到 POP Server 的业务数据始终使用 LayLink 自定义 Frame 协议 | --- | --- | --- | | `tcp` | Frame over TCP,最容易部署和调试。 | 已实现 | | `udp` | Frame over UDP,需要额外处理可靠性、顺序和丢包。 | 已预留,未实现 | -| `kcp` | Frame over KCP/UDP,用 KCP 做可靠、低延迟传输。 | 已预留,未实现 | +| `kcp` | Frame over KCP/UDP,默认通过 FFI 调用 native `ikcp.c`。 | 已实现,需构建动态库 | POP Server 用 `POP_ALLOWED_AGENT_TRANSPORTS` 控制允许哪些传输协议。例如: ```env POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp +# 也可以写成: +POP_ALLOWED_AGENT_TRANSPORTS=["tcp","kcp"] ``` Client Agent 用 `AGENT_TRANSPORT_PROTOCOL` 选择自己实际使用哪种协议。例如: ```env -AGENT_TRANSPORT_PROTOCOL=tcp +AGENT_TRANSPORT_PROTOCOL=kcp ``` 如果 Agent 选择的协议不在 POP 允许列表中,POP 会在认证阶段返回 `AUTH_FAIL`,原因是 `transport_not_allowed`。 -当前代码只实现了 `tcp`。如果 Agent 配置为 `udp` 或 `kcp`,进程会启动失败并明确提示该传输尚未实现。 +`kcp` 默认使用 FFI 调用 native `ikcp.c`。首次使用前需要构建动态库: + +```bash +scripts/build-kcp-ffi.sh +``` + +然后两端配置: + +```env +LAYLINK_KCP_BACKEND=ffi +LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so +``` + +使用 `kcp` 时,POP Server 会在 `POP_AGENT_LISTEN` 的同一 host:port 上监听 UDP,Client Agent 的 `POP_SERVER_ADDRESS` 仍填写同一地址即可。服务器防火墙需要放行同端口 UDP。 + +如果运行环境暂时不能启用 FFI,可以配置 `LAYLINK_KCP_BACKEND=php` 使用调试回退实现;该实现不适合作为生产高吞吐路径。 + +如果 Agent 配置为 `udp`,进程会启动失败并明确提示该传输尚未实现。 启动 POP Server: @@ -394,7 +415,7 @@ LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432 当 POP 收到目标站关闭连接时,Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket,避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 默认按 1 MiB 分片发送,以减少帧开销;可通过 `LAYLINK_DATA_CHUNK_BYTES` 调整。 -当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。 +Client Agent 默认使用 1 条 Agent-to-POP TCP 长连接。可通过 `CLIENT_AGENT_POP_CONNECTIONS` 增加并行长连接数,新 TCP 会话会在已认证 POP 连接之间轮询分配,并在会话生命周期内固定使用同一条连接。它主要改善多并发请求、多线程下载或测速场景;单个 TCP 下载是否变快取决于客户端和目标站是否本身使用多连接。背压可以保护进程不堵死,但单条会话仍受单 TCP 流限制;KCP、per-session window 和更细粒度 TCP 调优仍是后续性能优化方向。 验证 SOCKS5 HTTPS 联通性和出口 IP: diff --git a/scripts/build-kcp-ffi.sh b/scripts/build-kcp-ffi.sh new file mode 100755 index 0000000..4214e0c --- /dev/null +++ b/scripts/build-kcp-ffi.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +SRC_DIR="${ROOT_DIR}/native/kcp" +OUT="${SRC_DIR}/liblaylink_kcp.so" + +cc="${CC:-gcc}" +"${cc}" -O3 -fPIC -shared \ + "${SRC_DIR}/ikcp.c" \ + "${SRC_DIR}/laylink_kcp.c" \ + -o "${OUT}" + +echo "${OUT}" diff --git a/src/Agent/AgentClient.php b/src/Agent/AgentClient.php index d7cbacd..e6390ea 100644 --- a/src/Agent/AgentClient.php +++ b/src/Agent/AgentClient.php @@ -5,11 +5,10 @@ declare(strict_types=1); namespace LayLink\Agent; use LayLink\Protocol\Frame; -use LayLink\Protocol\FrameCodec; -use LayLink\Protocol\FrameParser; use LayLink\Protocol\FrameType; +use LayLink\Transport\FrameClientTransport; +use LayLink\Transport\FrameClientTransportFactory; use LayLink\Util\Uuid; -use Workerman\Connection\AsyncTcpConnection; use Workerman\Connection\TcpConnection; use Workerman\Connection\UdpConnection; use Workerman\Timer; @@ -17,8 +16,13 @@ use Workerman\Worker; final class AgentClient { - private ?AsyncTcpConnection $pop = null; - private ?FrameParser $parser = null; + /** @var array */ + private array $pops = []; + /** @var array */ + private array $authenticatedPops = []; + /** @var array */ + private array $sessionTransports = []; + private int $nextTransportCursor = 0; private bool $authenticated = false; /** @var array */ private array $initialBuffers = []; @@ -66,6 +70,7 @@ final class AgentClient private readonly int $maxSendBuffer = 64 * 1024 * 1024, private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, private readonly int $dataChunkSize = 1024 * 1024, + private readonly int $popConnectionCount = 1, ) { } @@ -75,9 +80,6 @@ final class AgentClient $worker->name = $workerName; $worker->count = 1; $worker->onWorkerStart = function (): void { - if ($this->transportProtocol !== 'tcp') { - throw new \RuntimeException("Agent transport '{$this->transportProtocol}' is configured but not implemented yet."); - } $this->connect(); Timer::add(10, fn () => $this->heartbeat()); }; @@ -95,59 +97,53 @@ final class AgentClient private function connect(): void { - $this->parser = new FrameParser(); $this->authenticated = false; - $connection = new AsyncTcpConnection($this->popAddress); - $connection->maxSendBufferSize = $this->maxSendBuffer; - $this->pop = $connection; - - $connection->onConnect = function (AsyncTcpConnection $connection): void { - $this->send(new Frame(FrameType::AUTH, null, [ - 'node_id' => $this->nodeId, - 'node_type' => $this->nodeType, - 'node_zone' => $this->nodeZone, - 'node_token' => $this->nodeToken, - 'transport_protocol' => $this->transportProtocol, - 'supported_protocols' => ['tcp'], - 'supported_transports' => ['tcp', 'udp', 'kcp'], - ])); - }; - $connection->onMessage = function (AsyncTcpConnection $connection, string $data): void { - try { - foreach ($this->parser?->push($data) ?? [] as $frame) { - $this->handleFrame($frame); - } - } catch (\Throwable $e) { - $connection->close(); - } - }; - $connection->onBufferDrain = fn (AsyncTcpConnection $connection) => $this->resumeClientsForPop(); - $connection->onClose = function (): void { - $this->authenticated = false; - foreach ($this->clients as $client) { - $client->close(); - } - $this->initialBuffers = []; - $this->connectionSessionIds = []; - $this->clients = []; - $this->sessionStates = []; - $this->pendingData = []; - $this->connectionStages = []; - $this->sessionIngressProtocols = []; - $this->pausedClientsForPop = []; - $this->clientsPausingPop = []; - $this->pendingClientCloses = []; - $this->suppressClientCloseFrames = []; - Timer::add(3, fn () => $this->connect(), [], false); - }; - $connection->connect(); + $this->pops = []; + $this->authenticatedPops = []; + for ($i = 0; $i < $this->popConnectionCount; $i++) { + $this->connectOne(); + } } - private function handleFrame(Frame $frame): void + private function connectOne(): void + { + $transport = (new FrameClientTransportFactory())->create( + $this->transportProtocol, + $this->popAddress, + $this->maxSendBuffer, + function (FrameClientTransport $transport): void { + $transport->send(new Frame(FrameType::AUTH, null, [ + 'node_id' => $this->nodeId, + 'node_type' => $this->nodeType, + 'node_zone' => $this->nodeZone, + 'node_token' => $this->nodeToken, + 'transport_protocol' => $this->transportProtocol, + 'supported_protocols' => ['tcp'], + 'supported_transports' => ['tcp', 'udp', 'kcp'], + ])); + }, + function (FrameClientTransport $transport, Frame $frame): void { + $this->handleFrame($transport, $frame); + }, + function (FrameClientTransport $transport): void { + $this->handlePopClose($transport); + Timer::add(3, fn () => $this->connectOne(), [], false); + }, + function (FrameClientTransport $transport): void { + $this->resumeClientsForPop($transport); + }, + function (\Throwable $e): void { + }, + ); + $this->pops[spl_object_id($transport)] = $transport; + $transport->connect(); + } + + private function handleFrame(FrameClientTransport $transport, Frame $frame): void { match ($frame->type) { - FrameType::AUTH_OK => $this->authenticated = true, - FrameType::AUTH_FAIL => $this->pop?->close(), + FrameType::AUTH_OK => $this->markPopAuthenticated($transport), + FrameType::AUTH_FAIL => $transport->close(), FrameType::PONG => null, FrameType::OPEN_OK => $this->openClientSession($frame), FrameType::OPEN_FAIL => $this->failClientSession($frame), @@ -184,7 +180,8 @@ final class AgentClient $this->closeClient($sessionId); return; } - if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= $this->backpressureHighWatermark) { + $transport = $this->sessionTransports[$sessionId] ?? null; + if ($transport !== null && $transport->getSendBufferQueueSize() >= $this->backpressureHighWatermark) { $connection->pauseRecv(); $this->pausedClientsForPop[$connection->id] = $connection; } @@ -447,7 +444,8 @@ final class AgentClient private function startPopSession(TcpConnection $connection, array $request, string $payloadBytes, string $ingressProtocol): void { - if (!$this->authenticated || $this->pop === null) { + $transport = $this->selectTransport(); + if (!$this->authenticated || $transport === null) { $this->failOpeningLocalClient($connection, $ingressProtocol, 'pop_not_connected'); return; } @@ -455,13 +453,14 @@ final class AgentClient $sessionId = Uuid::v4(); $this->connectionSessionIds[$connection->id] = $sessionId; $this->clients[$sessionId] = $connection; + $this->sessionTransports[$sessionId] = $transport; $this->sessionStates[$sessionId] = 'opening'; $this->sessionIngressProtocols[$sessionId] = $ingressProtocol; if ($payloadBytes !== '') { $this->pendingData[$sessionId] = $payloadBytes; } - $this->send(new Frame(FrameType::OPEN, $sessionId, [ + $transport->send(new Frame(FrameType::OPEN, $sessionId, [ 'auth_token' => (string)($request['auth_token'] ?? ''), 'user_id' => (string)($request['user_id'] ?? ''), 'target_host' => (string)($request['target_host'] ?? ''), @@ -486,7 +485,7 @@ final class AgentClient private function onSocks5UdpMessage(UdpConnection $connection, string $data): void { - if (!$this->authenticated || $this->pop === null || strlen($data) < 10) { + if (!$this->authenticated || $this->selectTransport() === null || strlen($data) < 10) { return; } @@ -638,7 +637,7 @@ final class AgentClient $sessionId = $this->connectionSessionIds[$connection->id]; unset($this->pendingClientCloses[$sessionId]); unset($this->connectionSessionIds[$connection->id]); - unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]); + unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId], $this->sessionTransports[$sessionId]); if ($suppressCloseFrame) { return; } @@ -668,7 +667,7 @@ final class AgentClient } $connection = $this->clients[$sessionId]; - unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]); + unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId], $this->sessionTransports[$sessionId]); unset($this->connectionSessionIds[$connection->id]); unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id], $this->pendingClientCloses[$sessionId]); $this->suppressClientCloseFrames[$connection->id] = true; @@ -812,21 +811,23 @@ final class AgentClient private function heartbeat(): void { - if (!$this->authenticated || $this->pop === null) { + if (!$this->authenticated) { return; } - $this->send(new Frame(FrameType::PING, null, [ - 'node_id' => $this->nodeId, - 'active_sessions' => count($this->clients), - 'load' => sys_getloadavg()[0] ?? 0.0, - 'timestamp' => time(), - ])); + foreach ($this->authenticatedTransports() as $transport) { + $transport->send(new Frame(FrameType::PING, null, [ + 'node_id' => $this->nodeId, + 'active_sessions' => $this->activeSessionsForTransport($transport), + 'load' => sys_getloadavg()[0] ?? 0.0, + 'timestamp' => time(), + ])); + } } private function send(Frame $frame): bool|null { - return $this->pop?->send(FrameCodec::encode($frame)); + return $this->transportForSession($frame->sessionId)?->send($frame); } private function sendData(string $sessionId, string $data): bool @@ -844,9 +845,12 @@ final class AgentClient return true; } - private function resumeClientsForPop(): void + private function resumeClientsForPop(FrameClientTransport $transport): void { foreach ($this->pausedClientsForPop as $connectionId => $client) { + if ($this->transportForConnection($client) !== $transport) { + continue; + } $client->resumeRecv(); unset($this->pausedClientsForPop[$connectionId]); } @@ -854,15 +858,20 @@ final class AgentClient private function pausePopForClient(TcpConnection $connection): void { + $transport = $this->transportForConnection($connection); + if ($transport === null) { + return; + } $this->clientsPausingPop[$connection->id] = true; - $this->pop?->pauseRecv(); + $transport->pauseRecv(); } private function resumePopForClient(TcpConnection $connection): void { unset($this->clientsPausingPop[$connection->id]); - if ($this->clientsPausingPop === []) { - $this->pop?->resumeRecv(); + $transport = $this->transportForConnection($connection); + if ($transport !== null && !$this->hasClientsPausingTransport($transport)) { + $transport->resumeRecv(); } } @@ -874,4 +883,91 @@ final class AgentClient $this->finalizeClientClose($sessionId); } } + + private function markPopAuthenticated(FrameClientTransport $transport): void + { + $this->authenticatedPops[spl_object_id($transport)] = true; + $this->authenticated = true; + } + + private function handlePopClose(FrameClientTransport $transport): void + { + $transportId = spl_object_id($transport); + unset($this->pops[$transportId], $this->authenticatedPops[$transportId]); + $this->authenticated = $this->authenticatedPops !== []; + foreach ($this->sessionTransports as $sessionId => $sessionTransport) { + if ($sessionTransport !== $transport || !isset($this->clients[$sessionId])) { + continue; + } + $client = $this->clients[$sessionId]; + $this->suppressClientCloseFrames[$client->id] = true; + $client->close(); + } + } + + private function selectTransport(): ?FrameClientTransport + { + $transports = $this->authenticatedTransports(); + if ($transports === []) { + return null; + } + + $transport = $transports[$this->nextTransportCursor % count($transports)]; + $this->nextTransportCursor++; + return $transport; + } + + /** + * @return FrameClientTransport[] + */ + private function authenticatedTransports(): array + { + $transports = []; + foreach ($this->authenticatedPops as $transportId => $_) { + if (isset($this->pops[$transportId])) { + $transports[] = $this->pops[$transportId]; + } + } + + return $transports; + } + + private function transportForSession(?string $sessionId): ?FrameClientTransport + { + if ($sessionId !== null && isset($this->sessionTransports[$sessionId])) { + return $this->sessionTransports[$sessionId]; + } + + return $this->selectTransport(); + } + + private function transportForConnection(TcpConnection $connection): ?FrameClientTransport + { + $sessionId = $this->connectionSessionIds[$connection->id] ?? null; + return is_string($sessionId) ? ($this->sessionTransports[$sessionId] ?? null) : null; + } + + private function hasClientsPausingTransport(FrameClientTransport $transport): bool + { + foreach ($this->clientsPausingPop as $connectionId => $_) { + $sessionId = $this->connectionSessionIds[$connectionId] ?? null; + if (is_string($sessionId) && ($this->sessionTransports[$sessionId] ?? null) === $transport) { + return true; + } + } + + return false; + } + + private function activeSessionsForTransport(FrameClientTransport $transport): int + { + $count = 0; + foreach ($this->sessionTransports as $sessionId => $sessionTransport) { + if ($sessionTransport === $transport && isset($this->clients[$sessionId])) { + $count++; + } + } + + return $count; + } } diff --git a/src/Node/NodeConnection.php b/src/Node/NodeConnection.php index 1196fa5..53589aa 100644 --- a/src/Node/NodeConnection.php +++ b/src/Node/NodeConnection.php @@ -4,7 +4,7 @@ declare(strict_types=1); namespace LayLink\Node; -use Workerman\Connection\TcpConnection; +use LayLink\Transport\FrameServerConnection; final class NodeConnection { @@ -15,7 +15,7 @@ final class NodeConnection public readonly string $nodeId, public readonly string $nodeType, public readonly string $nodeZone, - public readonly TcpConnection $connection, + public readonly FrameServerConnection $connection, ) { $this->lastHeartbeat = time(); } diff --git a/src/Node/NodeRegistry.php b/src/Node/NodeRegistry.php index 00dbc59..e800dd5 100644 --- a/src/Node/NodeRegistry.php +++ b/src/Node/NodeRegistry.php @@ -4,23 +4,30 @@ declare(strict_types=1); namespace LayLink\Node; -use Workerman\Connection\TcpConnection; +use LayLink\Transport\FrameServerConnection; final class NodeRegistry { - /** @var array */ + /** @var array> */ private array $nodes = []; public function register(NodeConnection $node): void { - $this->nodes[$node->nodeId] = $node; + $this->nodes[$node->nodeId][$node->connection->id()] = $node; } - public function unregisterByConnection(TcpConnection $connection): ?NodeConnection + public function unregisterByConnection(FrameServerConnection $connection): ?NodeConnection { - foreach ($this->nodes as $nodeId => $node) { - if ($node->connection === $connection) { - unset($this->nodes[$nodeId]); + foreach ($this->nodes as $nodeId => $connections) { + foreach ($connections as $connectionId => $node) { + if ($node->connection !== $connection) { + continue; + } + + unset($this->nodes[$nodeId][$connectionId]); + if ($this->nodes[$nodeId] === []) { + unset($this->nodes[$nodeId]); + } return $node; } } @@ -30,12 +37,26 @@ final class NodeRegistry public function get(string $nodeId): ?NodeConnection { - return $this->nodes[$nodeId] ?? null; + $connections = $this->nodes[$nodeId] ?? []; + return $connections === [] ? null : array_values($connections)[0]; + } + + public function getByConnection(FrameServerConnection $connection): ?NodeConnection + { + foreach ($this->nodes as $connections) { + foreach ($connections as $node) { + if ($node->connection === $connection) { + return $node; + } + } + } + + return null; } public function isOnline(string $nodeId): bool { - return isset($this->nodes[$nodeId]); + return ($this->nodes[$nodeId] ?? []) !== []; } /** @@ -43,6 +64,13 @@ final class NodeRegistry */ public function all(): array { - return array_values($this->nodes); + $nodes = []; + foreach ($this->nodes as $connections) { + foreach ($connections as $node) { + $nodes[] = $node; + } + } + + return $nodes; } } diff --git a/src/Server/AgentListener.php b/src/Server/AgentListener.php index 18e446c..56481ae 100644 --- a/src/Server/AgentListener.php +++ b/src/Server/AgentListener.php @@ -10,22 +10,19 @@ use LayLink\Auth\NodeAuthenticator; use LayLink\Node\NodeConnection; use LayLink\Node\NodeRegistry; use LayLink\Protocol\Frame; -use LayLink\Protocol\FrameCodec; -use LayLink\Protocol\FrameParser; use LayLink\Protocol\FrameType; use LayLink\Route\RouteResolver; use LayLink\Session\SessionManager; use LayLink\Session\TunnelSession; +use LayLink\Transport\FrameServerConnection; +use LayLink\Transport\FrameServerListenerFactory; use Workerman\Connection\AsyncTcpConnection; use Workerman\Connection\AsyncUdpConnection; -use Workerman\Connection\TcpConnection; use Workerman\Timer; use Workerman\Worker; final class AgentListener { - /** @var array */ - private array $parsers = []; /** @var array */ private array $connectionNodeIds = []; /** @var array> */ @@ -42,33 +39,38 @@ final class AgentListener private readonly int $maxSendBuffer = 64 * 1024 * 1024, private readonly int $backpressureHighWatermark = 32 * 1024 * 1024, private readonly int $dataChunkSize = 1024 * 1024, + private readonly string $agentTransportProtocol = 'tcp', ) { - $worker->onConnect = fn (TcpConnection $connection) => $this->onConnect($connection); - $worker->onMessage = fn (TcpConnection $connection, string $data) => $this->onMessage($connection, $data); - $worker->onClose = fn (TcpConnection $connection) => $this->onClose($connection); - $worker->onWorkerStart = fn () => Timer::add(10, fn () => $this->sweepHeartbeats()); - } - - private function onConnect(TcpConnection $connection): void - { - $connection->maxSendBufferSize = $this->maxSendBuffer; - $connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection); - $this->parsers[$connection->id] = new FrameParser(); - } - - private function onMessage(TcpConnection $connection, string $data): void - { - try { - foreach ($this->parsers[$connection->id]->push($data) as $frame) { - $this->handleFrame($connection, $frame); + (new FrameServerListenerFactory())->attach( + $this->agentTransportProtocol, + $worker, + $this->maxSendBuffer, + fn (FrameServerConnection $connection) => $this->onConnect($connection), + fn (FrameServerConnection $connection, Frame $frame) => $this->handleFrame($connection, $frame), + fn (FrameServerConnection $connection) => $this->onClose($connection), + fn (FrameServerConnection $connection) => $this->resumeTargetsForAgent($connection), + fn (FrameServerConnection $connection, \Throwable $e) => $this->onInvalidFrame($connection), + ); + $transportWorkerStart = $worker->onWorkerStart; + $worker->onWorkerStart = function () use ($transportWorkerStart): void { + if ($transportWorkerStart !== null) { + $transportWorkerStart(); } - } catch (\Throwable $e) { - $this->send($connection, new Frame(FrameType::ERROR, null, ['reason' => 'invalid_frame'])); - $connection->close(); - } + Timer::add(10, fn () => $this->sweepHeartbeats()); + }; } - private function handleFrame(TcpConnection $connection, Frame $frame): void + private function onConnect(FrameServerConnection $connection): void + { + } + + private function onInvalidFrame(FrameServerConnection $connection): void + { + $this->send($connection, new Frame(FrameType::ERROR, null, ['reason' => 'invalid_frame'])); + $connection->close(); + } + + private function handleFrame(FrameServerConnection $connection, Frame $frame): void { if ($frame->type === FrameType::AUTH) { $result = $this->authenticator->authenticate($frame->payload); @@ -86,7 +88,7 @@ final class AgentListener $connection, ); $this->nodes->register($node); - $this->connectionNodeIds[$connection->id] = $nodeId; + $this->connectionNodeIds[$connection->id()] = $nodeId; $this->send($connection, new Frame(FrameType::AUTH_OK, null, [ 'node_id' => $nodeId, 'heartbeat_interval' => 10, @@ -95,8 +97,8 @@ final class AgentListener return; } - $nodeId = $this->connectionNodeIds[$connection->id] ?? null; - $node = is_string($nodeId) ? $this->nodes->get($nodeId) : null; + $nodeId = $this->connectionNodeIds[$connection->id()] ?? null; + $node = is_string($nodeId) ? $this->nodes->getByConnection($connection) : null; if (!is_string($nodeId) || $node === null) { $this->send($connection, new Frame(FrameType::ERROR, $frame->sessionId, ['reason' => 'invalid_auth'])); $connection->close(); @@ -133,7 +135,7 @@ final class AgentListener }; } - private function forwardUdpDatagram(TcpConnection $agentConnection, string $nodeId, Frame $frame): void + private function forwardUdpDatagram(FrameServerConnection $agentConnection, string $nodeId, Frame $frame): void { if ($frame->sessionId === null) { $this->send($agentConnection, new Frame(FrameType::ERROR, null, ['reason' => 'invalid_frame'])); @@ -177,7 +179,7 @@ final class AgentListener $target->connect(); } - private function openTargetForAgent(TcpConnection $agentConnection, string $nodeId, Frame $frame): void + private function openTargetForAgent(FrameServerConnection $agentConnection, string $nodeId, Frame $frame): void { if ($frame->sessionId === null || $this->sessions->get($frame->sessionId) !== null) { $this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $frame->sessionId, ['reason' => 'invalid_frame'])); @@ -248,7 +250,7 @@ final class AgentListener } if ($agentConnection->getSendBufferQueueSize() >= $this->backpressureHighWatermark) { $target->pauseRecv(); - $this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target; + $this->pausedTargetsByAgent[$agentConnection->id()][$session->sessionId] = $target; } }; $target->onClose = fn () => $this->closeSession($session, 'closed', null); @@ -282,7 +284,7 @@ final class AgentListener } } - private function failOpenSession(TcpConnection $agentConnection, TunnelSession $session, string $reason): void + private function failOpenSession(FrameServerConnection $agentConnection, TunnelSession $session, string $reason): void { if ($session->state === TunnelSession::OPENING) { $this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $session->sessionId, ['reason' => $reason])); @@ -299,7 +301,7 @@ final class AgentListener return base64_decode((string)($frame->payload['data'] ?? ''), true); } - private function rejectOpen(TcpConnection $agentConnection, Frame $frame, string $reason, string $userId, string $nodeId, ?string $policyId = null): void + private function rejectOpen(FrameServerConnection $agentConnection, Frame $frame, string $reason, string $userId, string $nodeId, ?string $policyId = null): void { $this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $frame->sessionId, ['reason' => $reason])); $this->audit->write([ @@ -330,7 +332,7 @@ final class AgentListener $session->state = TunnelSession::CLOSED; $this->sessions->remove($session->sessionId); if ($session->agent !== null) { - unset($this->pausedTargetsByAgent[$session->agent->id][$session->sessionId]); + unset($this->pausedTargetsByAgent[$session->agent->id()][$session->sessionId]); } if ($session->agent !== null) { @@ -359,11 +361,10 @@ final class AgentListener ]); } - private function onClose(TcpConnection $connection): void + private function onClose(FrameServerConnection $connection): void { - unset($this->parsers[$connection->id]); - unset($this->connectionNodeIds[$connection->id]); - unset($this->pausedTargetsByAgent[$connection->id]); + unset($this->connectionNodeIds[$connection->id()]); + unset($this->pausedTargetsByAgent[$connection->id()]); $node = $this->nodes->unregisterByConnection($connection); foreach ($this->sessions->all() as $session) { if ($session->agent === $connection) { @@ -386,12 +387,12 @@ final class AgentListener } } - private function send(TcpConnection $connection, Frame $frame): bool|null + private function send(FrameServerConnection $connection, Frame $frame): bool|null { - return $connection->send(FrameCodec::encode($frame)); + return $connection->send($frame); } - private function sendData(TcpConnection $connection, string $sessionId, string $data): bool + private function sendData(FrameServerConnection $connection, string $sessionId, string $data): bool { $length = strlen($data); for ($offset = 0; $offset < $length; $offset += $this->dataChunkSize) { @@ -406,14 +407,14 @@ final class AgentListener return true; } - private function resumeTargetsForAgent(TcpConnection $connection): void + private function resumeTargetsForAgent(FrameServerConnection $connection): void { - foreach ($this->pausedTargetsByAgent[$connection->id] ?? [] as $sessionId => $target) { + foreach ($this->pausedTargetsByAgent[$connection->id()] ?? [] as $sessionId => $target) { $target->resumeRecv(); - unset($this->pausedTargetsByAgent[$connection->id][$sessionId]); + unset($this->pausedTargetsByAgent[$connection->id()][$sessionId]); } - if (($this->pausedTargetsByAgent[$connection->id] ?? []) === []) { - unset($this->pausedTargetsByAgent[$connection->id]); + if (($this->pausedTargetsByAgent[$connection->id()] ?? []) === []) { + unset($this->pausedTargetsByAgent[$connection->id()]); } } diff --git a/src/Server/PopServer.php b/src/Server/PopServer.php index 565728b..1250ea4 100644 --- a/src/Server/PopServer.php +++ b/src/Server/PopServer.php @@ -36,20 +36,29 @@ final class PopServer public function boot(): void { - $agentWorker = new Worker('tcp://' . $this->agentListen); - $agentWorker->name = 'laylink-pop-agent-listener'; - $agentWorker->count = 1; - new AgentListener( - $agentWorker, - new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports), - new ClientAuthenticator(), - new RouteResolver(new PolicyChecker($this->policies), $this->nodes), - $this->nodes, - $this->sessions, - $this->audit, - $this->maxSendBuffer, - $this->backpressureHighWatermark, - $this->dataChunkSize, - ); + $implementedTransports = array_values(array_intersect($this->allowedAgentTransports, ['tcp', 'kcp'])); + if ($implementedTransports === []) { + throw new \RuntimeException('no_implemented_pop_transport_enabled'); + } + + foreach ($implementedTransports as $transport) { + $scheme = $transport === 'kcp' ? 'udp' : 'tcp'; + $agentWorker = new Worker($scheme . '://' . $this->agentListen); + $agentWorker->name = 'laylink-pop-agent-listener-' . $transport; + $agentWorker->count = 1; + new AgentListener( + $agentWorker, + new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports), + new ClientAuthenticator(), + new RouteResolver(new PolicyChecker($this->policies), $this->nodes), + $this->nodes, + $this->sessions, + $this->audit, + $this->maxSendBuffer, + $this->backpressureHighWatermark, + $this->dataChunkSize, + $transport, + ); + } } } diff --git a/src/Session/TunnelSession.php b/src/Session/TunnelSession.php index 4b044c0..f892834 100644 --- a/src/Session/TunnelSession.php +++ b/src/Session/TunnelSession.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace LayLink\Session; +use LayLink\Transport\FrameServerConnection; use Workerman\Connection\AsyncTcpConnection; use Workerman\Connection\TcpConnection; @@ -18,7 +19,7 @@ final class TunnelSession public string $state = self::NEW; public ?TcpConnection $client = null; - public ?TcpConnection $agent = null; + public ?FrameServerConnection $agent = null; public ?AsyncTcpConnection $target = null; public ?string $nodeId = null; public string $startTime; diff --git a/src/Transport/FrameClientTransport.php b/src/Transport/FrameClientTransport.php new file mode 100644 index 0000000..ea7a922 --- /dev/null +++ b/src/Transport/FrameClientTransport.php @@ -0,0 +1,22 @@ + new TcpFrameClientTransport( + $address, + $maxSendBuffer, + $onConnect, + $onFrame, + $onClose, + $onBufferDrain, + $onInvalidFrame, + ), + 'kcp' => new KcpFrameClientTransport( + $address, + $maxSendBuffer, + $onConnect, + $onFrame, + $onClose, + $onBufferDrain, + $onInvalidFrame, + ), + 'udp' => throw new InvalidArgumentException('agent_transport_not_implemented'), + default => throw new InvalidArgumentException('unsupported_agent_transport'), + }; + } +} diff --git a/src/Transport/FrameServerConnection.php b/src/Transport/FrameServerConnection.php new file mode 100644 index 0000000..b72be71 --- /dev/null +++ b/src/Transport/FrameServerConnection.php @@ -0,0 +1,24 @@ + new TcpFrameServerListener( + $worker, + $maxSendBuffer, + $onConnect, + $onFrame, + $onClose, + $onBufferDrain, + $onInvalidFrame, + ), + 'kcp' => new KcpFrameServerListener( + $worker, + $maxSendBuffer, + $onConnect, + $onFrame, + $onClose, + $onBufferDrain, + $onInvalidFrame, + ), + 'udp' => throw new InvalidArgumentException('pop_transport_not_implemented'), + default => throw new InvalidArgumentException('unsupported_pop_transport'), + }; + } +} diff --git a/src/Transport/KcpFrameClientTransport.php b/src/Transport/KcpFrameClientTransport.php new file mode 100644 index 0000000..e89498a --- /dev/null +++ b/src/Transport/KcpFrameClientTransport.php @@ -0,0 +1,176 @@ +conv = random_int(1, 0x7fffffff); + } + + public function connect(): void + { + $this->connected = false; + $this->session = null; + $connection = new AsyncUdpConnection($this->normalizeAddress($this->address)); + $this->connection = $connection; + + $connection->onConnect = function () use ($connection): void { + $connection->send(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::SYN, + 'conv' => $this->conv, + ])); + }; + $connection->onMessage = function (AsyncUdpConnection $connection, string $data): void { + $this->handlePacket($data); + }; + $connection->onClose = function () use ($connection): void { + if ($this->connection === $connection) { + $this->connection = null; + } + $this->stopTimer(); + ($this->onClose)($this); + }; + $connection->connect(); + + $this->timerId = Timer::add(0.02, fn () => $this->tick()); + } + + public function send(Frame $frame): bool|null + { + return $this->session?->sendFrame($frame, $this->maxSendBuffer) ?? false; + } + + public function close(): void + { + $this->session?->close(); + $this->connection?->close(); + } + + public function pauseRecv(): void + { + $this->session?->pause(); + } + + public function resumeRecv(): void + { + $this->session?->resume(); + } + + public function getSendBufferQueueSize(): int + { + return $this->session?->getSendBufferQueueSize() ?? 0; + } + + private function handlePacket(string $data): void + { + $packet = KcpPacketCodec::decode($data); + if ($packet === null) { + $this->session?->receive($data); + return; + } + + if ($packet['conv'] !== $this->conv) { + return; + } + + if ($packet['type'] === KcpPacketCodec::SYN_ACK && !$this->connected) { + $this->connected = true; + $this->session = $this->createSession(); + ($this->onConnect)($this); + return; + } + + if ($packet['type'] === KcpPacketCodec::CLOSE) { + $this->connection?->close(); + return; + } + + $this->session?->receive($packet); + if ($this->session?->isClosed()) { + $this->connection?->close(); + } + } + + private function tick(): void + { + if ($this->connection === null) { + $this->stopTimer(); + return; + } + + if (!$this->connected) { + $this->connection->send(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::SYN, + 'conv' => $this->conv, + ])); + return; + } + + $before = $this->getSendBufferQueueSize(); + $this->session?->tick(); + if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) { + ($this->onBufferDrain)($this); + } + } + + private function createSession(): KcpReliableSession|NativeKcpSession + { + $backend = strtolower(trim((string)(getenv('LAYLINK_KCP_BACKEND') ?: 'ffi'))); + $libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so'); + $args = [ + $this->conv, + fn (string $packet): bool|null => $this->connection?->send($packet), + fn (Frame $frame) => ($this->onFrame)($this, $frame), + fn (\Throwable $e) => ($this->onInvalidFrame)($e), + ]; + + if ($backend === 'php') { + return new KcpReliableSession(...$args); + } + + return new NativeKcpSession(...[...$args, $libraryPath]); + } + + private function stopTimer(): void + { + if ($this->timerId === null) { + return; + } + + Timer::del($this->timerId); + $this->timerId = null; + } + + private function normalizeAddress(string $address): string + { + if (str_starts_with($address, 'udp://')) { + return $address; + } + if (str_starts_with($address, 'tcp://')) { + return 'udp://' . substr($address, 6); + } + + return 'udp://' . $address; + } +} diff --git a/src/Transport/KcpFrameServerConnection.php b/src/Transport/KcpFrameServerConnection.php new file mode 100644 index 0000000..6026b55 --- /dev/null +++ b/src/Transport/KcpFrameServerConnection.php @@ -0,0 +1,75 @@ +id; + } + + public function send(Frame $frame): bool|null + { + if ($this->closed) { + return false; + } + + return $this->session->sendFrame($frame, $this->maxSendBuffer); + } + + public function close(): void + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->session->close(); + } + + public function pauseRecv(): void + { + $this->session->pause(); + } + + public function resumeRecv(): void + { + $this->session->resume(); + } + + public function getSendBufferQueueSize(): int + { + return $this->session->getSendBufferQueueSize(); + } + + public function getRemoteIp(): string + { + return $this->connection->getRemoteIp(); + } + + public function updateConnection(UdpConnection $connection): void + { + $this->connection = $connection; + } + + public function isClosed(): bool + { + return $this->closed || $this->session->isClosed(); + } +} diff --git a/src/Transport/KcpFrameServerListener.php b/src/Transport/KcpFrameServerListener.php new file mode 100644 index 0000000..dc70e73 --- /dev/null +++ b/src/Transport/KcpFrameServerListener.php @@ -0,0 +1,177 @@ + */ + private array $connections = []; + /** @var array */ + private array $sessions = []; + private ?int $timerId = null; + private int $nextConnectionId = 1_000_000; + + public function __construct( + Worker $worker, + private readonly int $maxSendBuffer, + private readonly \Closure $onConnect, + private readonly \Closure $onFrame, + private readonly \Closure $onClose, + private readonly \Closure $onBufferDrain, + private readonly \Closure $onInvalidFrame, + ) { + $worker->onMessage = fn (UdpConnection $connection, string $data) => $this->handleMessage($connection, $data); + $worker->onWorkerStart = function (): void { + $this->timerId = Timer::add(0.02, fn () => $this->tick()); + }; + $worker->onWorkerStop = function (): void { + if ($this->timerId !== null) { + Timer::del($this->timerId); + $this->timerId = null; + } + }; + } + + private function handleMessage(UdpConnection $connection, string $data): void + { + $packet = KcpPacketCodec::decode($data); + if ($packet === null) { + $conv = KcpPacketCodec::rawKcpConv($data); + if ($conv === null) { + return; + } + $key = $this->key($connection, $conv); + $wrapped = $this->connections[$key] ?? null; + $session = $this->sessions[$key] ?? null; + if ($wrapped === null || $session === null) { + return; + } + + $wrapped->updateConnection($connection); + $wasFull = $session->getSendBufferQueueSize() >= $this->maxSendBuffer; + $session->receive($data); + if ($session->isClosed()) { + $this->closeConnection($key); + return; + } + if ($wasFull && $session->getSendBufferQueueSize() < $this->maxSendBuffer) { + ($this->onBufferDrain)($wrapped); + } + return; + } + + $key = $this->key($connection, $packet['conv']); + if ($packet['type'] === KcpPacketCodec::SYN) { + $this->handleSyn($connection, $key, $packet['conv']); + return; + } + + $wrapped = $this->connections[$key] ?? null; + $session = $this->sessions[$key] ?? null; + if ($wrapped === null || $session === null) { + return; + } + + if ($packet['type'] === KcpPacketCodec::CLOSE) { + $this->closeConnection($key); + return; + } + + $wrapped->updateConnection($connection); + $wasFull = $session->getSendBufferQueueSize() >= $this->maxSendBuffer; + $session->receive($packet); + if ($session->isClosed()) { + $this->closeConnection($key); + return; + } + if ($wasFull && $session->getSendBufferQueueSize() < $this->maxSendBuffer) { + ($this->onBufferDrain)($wrapped); + } + } + + private function handleSyn(UdpConnection $connection, string $key, int $conv): void + { + if (isset($this->connections[$key])) { + $connection->send(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::SYN_ACK, + 'conv' => $conv, + ])); + return; + } + + $session = $this->createSession( + $conv, + fn (string $packet): bool|null => $connection->send($packet), + function (Frame $frame) use ($key): void { + $wrapped = $this->connections[$key] ?? null; + if ($wrapped !== null) { + ($this->onFrame)($wrapped, $frame); + } + }, + function (\Throwable $e) use ($key): void { + $wrapped = $this->connections[$key] ?? null; + if ($wrapped !== null) { + ($this->onInvalidFrame)($wrapped, $e); + } + }, + ); + $wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $this->maxSendBuffer); + $this->sessions[$key] = $session; + $this->connections[$key] = $wrapped; + + $connection->send(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::SYN_ACK, + 'conv' => $conv, + ])); + ($this->onConnect)($wrapped); + } + + private function tick(): void + { + foreach ($this->sessions as $key => $session) { + $wrapped = $this->connections[$key] ?? null; + if ($wrapped === null || $wrapped->isClosed()) { + $this->closeConnection($key); + continue; + } + + $before = $session->getSendBufferQueueSize(); + $session->tick(); + if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) { + ($this->onBufferDrain)($wrapped); + } + } + } + + private function closeConnection(string $key): void + { + $wrapped = $this->connections[$key] ?? null; + unset($this->connections[$key], $this->sessions[$key]); + if ($wrapped !== null) { + ($this->onClose)($wrapped); + } + } + + private function createSession(int $conv, \Closure $sendPacket, \Closure $onFrame, \Closure $onInvalidFrame): KcpReliableSession|NativeKcpSession + { + $backend = strtolower(trim((string)(getenv('LAYLINK_KCP_BACKEND') ?: 'ffi'))); + if ($backend === 'php') { + return new KcpReliableSession($conv, $sendPacket, $onFrame, $onInvalidFrame); + } + + $libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so'); + return new NativeKcpSession($conv, $sendPacket, $onFrame, $onInvalidFrame, $libraryPath); + } + + private function key(UdpConnection $connection, int $conv): string + { + return $connection->getRemoteAddress() . '#' . $conv; + } +} diff --git a/src/Transport/KcpPacketCodec.php b/src/Transport/KcpPacketCodec.php new file mode 100644 index 0000000..08415ed --- /dev/null +++ b/src/Transport/KcpPacketCodec.php @@ -0,0 +1,71 @@ + (int)$header['type'], + 'conv' => (int)$header['conv'], + 'seq' => (int)$header['seq'], + 'ack' => (int)$header['ack'], + 'message_id' => (int)$header['message_id'], + 'fragment_index' => (int)$header['fragment_index'], + 'fragment_count' => (int)$header['fragment_count'], + 'payload' => substr($bytes, self::HEADER_LENGTH), + ]; + } + + public static function rawKcpConv(string $bytes): ?int + { + if (strlen($bytes) < 4 || substr($bytes, 0, 4) === self::MAGIC) { + return null; + } + + $decoded = unpack('Vconv', substr($bytes, 0, 4)); + return $decoded === false ? null : (int)$decoded['conv']; + } +} diff --git a/src/Transport/KcpReliableSession.php b/src/Transport/KcpReliableSession.php new file mode 100644 index 0000000..4433810 --- /dev/null +++ b/src/Transport/KcpReliableSession.php @@ -0,0 +1,235 @@ + */ + private array $unacked = []; + /** @var array */ + private array $pendingSegments = []; + /** @var array> */ + private array $messageFragments = []; + /** @var array */ + private array $messageFragmentCounts = []; + /** @var Frame[] */ + private array $pausedFrames = []; + private bool $paused = false; + private bool $closed = false; + + public function __construct( + private readonly int $conv, + private readonly \Closure $sendPacket, + private readonly \Closure $onFrame, + private readonly \Closure $onInvalidFrame, + ) { + $this->parser = new FrameParser(); + } + + public function conv(): int + { + return $this->conv; + } + + public function sendFrame(Frame $frame, int $maxSendBuffer): bool|null + { + if ($this->closed || $this->getSendBufferQueueSize() >= $maxSendBuffer) { + return false; + } + + $bytes = FrameCodec::encode($frame); + $payloadSize = self::MTU - 25; + $fragments = str_split($bytes, $payloadSize); + $messageId = $this->nextMessageId++; + $fragmentCount = count($fragments); + + foreach ($fragments as $fragmentIndex => $payload) { + $seq = $this->nextSeq++; + $packet = KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::DATA, + 'conv' => $this->conv, + 'seq' => $seq, + 'message_id' => $messageId, + 'fragment_index' => $fragmentIndex, + 'fragment_count' => $fragmentCount, + 'payload' => $payload, + ]); + $this->unacked[$seq] = [ + 'packet' => $packet, + 'sent_at' => $this->nowMs(), + 'bytes' => strlen($packet), + ]; + ($this->sendPacket)($packet); + } + + return true; + } + + /** + * @param array{type:int, conv:int, seq:int, ack:int, message_id:int, fragment_index:int, fragment_count:int, payload:string}|string $packet + */ + public function receive(array|string $packet): void + { + if ($this->closed || is_string($packet)) { + return; + } + + if ($packet['type'] === KcpPacketCodec::ACK) { + unset($this->unacked[$packet['ack']]); + return; + } + + if ($packet['type'] === KcpPacketCodec::CLOSE) { + $this->closed = true; + return; + } + + if ($packet['type'] !== KcpPacketCodec::DATA) { + return; + } + + $this->ack($packet['seq']); + if ($packet['seq'] < $this->expectedSeq) { + return; + } + + $this->pendingSegments[$packet['seq']] = [ + 'message_id' => $packet['message_id'], + 'fragment_index' => $packet['fragment_index'], + 'fragment_count' => $packet['fragment_count'], + 'payload' => $packet['payload'], + ]; + $this->drainOrderedSegments(); + } + + public function tick(): void + { + if ($this->closed) { + return; + } + + $now = $this->nowMs(); + foreach ($this->unacked as $seq => $pending) { + if ($now - $pending['sent_at'] < self::RESEND_AFTER_MS) { + continue; + } + + $this->unacked[$seq]['sent_at'] = $now; + ($this->sendPacket)($pending['packet']); + } + } + + public function pause(): void + { + $this->paused = true; + } + + public function resume(): void + { + $this->paused = false; + while (!$this->paused && $this->pausedFrames !== []) { + $frame = array_shift($this->pausedFrames); + ($this->onFrame)($frame); + } + } + + public function close(): void + { + if ($this->closed) { + return; + } + + $this->closed = true; + ($this->sendPacket)(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::CLOSE, + 'conv' => $this->conv, + ])); + } + + public function isClosed(): bool + { + return $this->closed; + } + + public function getSendBufferQueueSize(): int + { + $bytes = 0; + foreach ($this->unacked as $pending) { + $bytes += $pending['bytes']; + } + + return $bytes; + } + + /** + * @param array{message_id:int, fragment_index:int, fragment_count:int, payload:string} $segment + */ + private function receiveOrderedSegment(array $segment): void + { + $messageId = $segment['message_id']; + $fragmentIndex = $segment['fragment_index']; + $fragmentCount = $segment['fragment_count']; + if ($fragmentCount < 1 || $fragmentIndex < 0 || $fragmentIndex >= $fragmentCount) { + return; + } + + $this->messageFragmentCounts[$messageId] = $fragmentCount; + $this->messageFragments[$messageId][$fragmentIndex] = $segment['payload']; + if (count($this->messageFragments[$messageId]) !== $fragmentCount) { + return; + } + + ksort($this->messageFragments[$messageId]); + $bytes = implode('', $this->messageFragments[$messageId]); + unset($this->messageFragments[$messageId], $this->messageFragmentCounts[$messageId]); + + try { + foreach ($this->parser->push($bytes) as $frame) { + if ($this->paused) { + $this->pausedFrames[] = $frame; + continue; + } + ($this->onFrame)($frame); + } + } catch (\Throwable $e) { + ($this->onInvalidFrame)($e); + } + } + + private function drainOrderedSegments(): void + { + while (isset($this->pendingSegments[$this->expectedSeq])) { + $segment = $this->pendingSegments[$this->expectedSeq]; + unset($this->pendingSegments[$this->expectedSeq]); + $this->expectedSeq++; + $this->receiveOrderedSegment($segment); + } + } + + private function ack(int $seq): void + { + ($this->sendPacket)(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::ACK, + 'conv' => $this->conv, + 'ack' => $seq, + ])); + } + + private function nowMs(): float + { + return microtime(true) * 1000; + } +} diff --git a/src/Transport/NativeKcpLibrary.php b/src/Transport/NativeKcpLibrary.php new file mode 100644 index 0000000..9b9d048 --- /dev/null +++ b/src/Transport/NativeKcpLibrary.php @@ -0,0 +1,52 @@ +ffi = NativeKcpLibrary::load($this->libraryPath); + $this->session = $this->ffi->laylink_kcp_create($conv); + if ($this->session === null) { + throw new \RuntimeException('kcp_create_failed'); + } + + $this->parser = new FrameParser(); + } + + public function __destruct() + { + if (!$this->closed) { + $this->closed = true; + $this->ffi->laylink_kcp_release($this->session); + } + } + + public function conv(): int + { + return $this->conv; + } + + public function sendFrame(Frame $frame, int $maxSendBuffer): bool|null + { + if ($this->closed || $this->queuedBytes >= $maxSendBuffer) { + return false; + } + + $bytes = FrameCodec::encode($frame); + $result = $this->ffi->laylink_kcp_send($this->session, $bytes, strlen($bytes)); + if ($result < 0) { + return false; + } + + $this->queuedBytes += strlen($bytes); + $this->ffi->laylink_kcp_flush($this->session); + $this->drainOutput(); + return true; + } + + public function receive(string|array $packet): void + { + if ($this->closed || !is_string($packet)) { + return; + } + + if ($this->ffi->laylink_kcp_input($this->session, $packet, strlen($packet)) < 0) { + return; + } + + $this->drainFrames(); + $this->drainOutput(); + } + + public function tick(): void + { + if ($this->closed) { + return; + } + + $this->ffi->laylink_kcp_update($this->session, $this->nowMs()); + $this->drainFrames(); + $this->drainOutput(); + } + + public function pause(): void + { + $this->paused = true; + } + + public function resume(): void + { + $this->paused = false; + while (!$this->paused && $this->pausedFrames !== []) { + $frame = array_shift($this->pausedFrames); + ($this->onFrame)($frame); + } + } + + public function close(): void + { + if ($this->closed) { + return; + } + + $this->closed = true; + ($this->sendPacket)(KcpPacketCodec::encode([ + 'type' => KcpPacketCodec::CLOSE, + 'conv' => $this->conv, + ])); + $this->ffi->laylink_kcp_release($this->session); + } + + public function isClosed(): bool + { + return $this->closed; + } + + public function getSendBufferQueueSize(): int + { + return max($this->queuedBytes, $this->pendingOutputBytes()); + } + + private function drainFrames(): void + { + while (($size = $this->ffi->laylink_kcp_peeksize($this->session)) > 0) { + if ($size > self::RECEIVE_BUFFER_BYTES) { + ($this->onInvalidFrame)(new \RuntimeException('kcp_message_too_large')); + $this->close(); + return; + } + + $buffer = $this->ffi->new("char[$size]"); + $read = $this->ffi->laylink_kcp_recv($this->session, $buffer, $size); + if ($read <= 0) { + return; + } + + $this->queuedBytes = max(0, $this->queuedBytes - $read); + $bytes = FFI::string($buffer, $read); + try { + foreach ($this->parser->push($bytes) as $frame) { + if ($this->paused) { + $this->pausedFrames[] = $frame; + continue; + } + ($this->onFrame)($frame); + } + } catch (\Throwable $e) { + ($this->onInvalidFrame)($e); + } + } + } + + private function drainOutput(): void + { + while (($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) { + $buffer = $this->ffi->new("char[$size]"); + $read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size); + if ($read <= 0) { + return; + } + ($this->sendPacket)(FFI::string($buffer, $read)); + } + } + + private function pendingOutputBytes(): int + { + $size = $this->ffi->laylink_kcp_pending_output_size($this->session); + return $size > 0 ? $size : 0; + } + + private function nowMs(): int + { + return (int)floor(microtime(true) * 1000); + } +} diff --git a/src/Transport/TcpFrameClientTransport.php b/src/Transport/TcpFrameClientTransport.php new file mode 100644 index 0000000..566984c --- /dev/null +++ b/src/Transport/TcpFrameClientTransport.php @@ -0,0 +1,85 @@ +parser = new FrameParser(); + } + + public function connect(): void + { + $this->parser = new FrameParser(); + $connection = new AsyncTcpConnection($this->address); + $connection->maxSendBufferSize = $this->maxSendBuffer; + $this->connection = $connection; + + $connection->onConnect = function () use ($connection): void { + ($this->onConnect)($this); + }; + $connection->onMessage = function (AsyncTcpConnection $connection, string $data): void { + try { + foreach ($this->parser->push($data) as $frame) { + ($this->onFrame)($this, $frame); + } + } catch (\Throwable $e) { + ($this->onInvalidFrame)($e); + $connection->close(); + } + }; + $connection->onBufferDrain = function () use ($connection): void { + ($this->onBufferDrain)($this); + }; + $connection->onClose = function () use ($connection): void { + if ($this->connection === $connection) { + $this->connection = null; + } + ($this->onClose)($this); + }; + $connection->connect(); + } + + public function send(Frame $frame): bool|null + { + return $this->connection?->send(FrameCodec::encode($frame)); + } + + public function close(): void + { + $this->connection?->close(); + } + + public function pauseRecv(): void + { + $this->connection?->pauseRecv(); + } + + public function resumeRecv(): void + { + $this->connection?->resumeRecv(); + } + + public function getSendBufferQueueSize(): int + { + return $this->connection?->getSendBufferQueueSize() ?? 0; + } +} diff --git a/src/Transport/TcpFrameServerConnection.php b/src/Transport/TcpFrameServerConnection.php new file mode 100644 index 0000000..f64851c --- /dev/null +++ b/src/Transport/TcpFrameServerConnection.php @@ -0,0 +1,63 @@ +parser = new FrameParser(); + } + + public function id(): int + { + return $this->connection->id; + } + + /** + * @return Frame[] + */ + public function push(string $data): array + { + return $this->parser->push($data); + } + + public function send(Frame $frame): bool|null + { + return $this->connection->send(FrameCodec::encode($frame)); + } + + public function close(): void + { + $this->connection->close(); + } + + public function pauseRecv(): void + { + $this->connection->pauseRecv(); + } + + public function resumeRecv(): void + { + $this->connection->resumeRecv(); + } + + public function getSendBufferQueueSize(): int + { + return $this->connection->getSendBufferQueueSize(); + } + + public function getRemoteIp(): string + { + return $this->connection->getRemoteIp(); + } +} diff --git a/src/Transport/TcpFrameServerListener.php b/src/Transport/TcpFrameServerListener.php new file mode 100644 index 0000000..eec8cf8 --- /dev/null +++ b/src/Transport/TcpFrameServerListener.php @@ -0,0 +1,66 @@ + */ + private array $connections = []; + + public function __construct( + Worker $worker, + private readonly int $maxSendBuffer, + private readonly \Closure $onConnect, + private readonly \Closure $onFrame, + private readonly \Closure $onClose, + private readonly \Closure $onBufferDrain, + private readonly \Closure $onInvalidFrame, + ) { + $worker->onConnect = fn (TcpConnection $connection) => $this->handleConnect($connection); + $worker->onMessage = fn (TcpConnection $connection, string $data) => $this->handleMessage($connection, $data); + $worker->onClose = fn (TcpConnection $connection) => $this->handleClose($connection); + } + + private function handleConnect(TcpConnection $connection): void + { + $connection->maxSendBufferSize = $this->maxSendBuffer; + $wrapped = new TcpFrameServerConnection($connection); + $this->connections[$connection->id] = $wrapped; + $connection->onBufferDrain = function () use ($wrapped): void { + ($this->onBufferDrain)($wrapped); + }; + ($this->onConnect)($wrapped); + } + + private function handleMessage(TcpConnection $connection, string $data): void + { + $wrapped = $this->connections[$connection->id] ?? null; + if ($wrapped === null) { + return; + } + + try { + foreach ($wrapped->push($data) as $frame) { + ($this->onFrame)($wrapped, $frame); + } + } catch (\Throwable $e) { + ($this->onInvalidFrame)($wrapped, $e); + } + } + + private function handleClose(TcpConnection $connection): void + { + $wrapped = $this->connections[$connection->id] ?? null; + unset($this->connections[$connection->id]); + if ($wrapped === null) { + return; + } + + ($this->onClose)($wrapped); + } +} diff --git a/src/Util/Env.php b/src/Util/Env.php index fc73bb2..c27fc65 100644 --- a/src/Util/Env.php +++ b/src/Util/Env.php @@ -74,6 +74,17 @@ final class Env return $default; } + $value = trim($value); + if (str_starts_with($value, '[')) { + $decoded = json_decode($value, true); + if (is_array($decoded)) { + return array_values(array_filter(array_map( + static fn (mixed $item): string => strtolower(trim((string)$item)), + $decoded, + ))); + } + } + return array_values(array_filter(array_map( static fn (string $item): string => strtolower(trim($item)), explode(',', $value),