Add KCP support

This commit is contained in:
EchoNoch 2026-05-29 01:06:18 +08:00
parent 6d9299eeb0
commit 661ed401da
31 changed files with 3829 additions and 174 deletions

View File

@ -15,6 +15,10 @@ LAYLINK_MAX_SEND_BUFFER_BYTES=67108864
# 单连接发送缓冲区上限;单位字节。大文件下载建议 33554432 或 67108864内存紧张时调小。
LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432
# 背压触发水位;单位字节。应小于 LAYLINK_MAX_SEND_BUFFER_BYTES达到后会暂停上游读取直到缓冲排空。
LAYLINK_KCP_BACKEND=ffi
# KCP 实现后端;可选 ffi、php。ffi 使用 native ikcp.c 动态库生产建议使用php 是调试回退实现。
LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so
# native KCP 动态库路径LAYLINK_KCP_BACKEND=ffi 时使用。相对路径按项目根目录解析,先运行 scripts/build-kcp-ffi.sh 构建。
[client-agent]
NODE_ID=client-01
@ -26,9 +30,11 @@ NODE_TOKEN=CHANGE_ME
NODE_ZONE=default
# 当前 Agent 所在逻辑区域;可按部署场景填写,例如 local、corp、restricted-a。
POP_SERVER_ADDRESS=tcp://127.0.0.1:9001
# Agent 出站连接 POP Server 的地址;格式为 tcp://host:port例如 tcp://10.1.0.2:9001。
# Agent 出站连接 POP Server 的地址;格式为 tcp://host:port例如 tcp://10.1.0.2:9001AGENT_TRANSPORT_PROTOCOL=kcp 时会使用同一 host:port 的 UDP
AGENT_TRANSPORT_PROTOCOL=tcp
# 当前 Agent 到 POP Server 使用的传输协议;可选值 tcp、udp、kcp必须被 POP_ALLOWED_AGENT_TRANSPORTS 允许,当前可运行值为 tcp。
# 当前 Agent 到 POP Server 使用的传输协议;可选值 tcp、udp、kcp必须被 POP_ALLOWED_AGENT_TRANSPORTS 允许,当前可运行值为 tcp、kcp。
CLIENT_AGENT_POP_CONNECTIONS=1
# Client Agent 到 POP Server 的并行长连接数量;默认 1。提高到 2、4 可分摊多会话,当前 tcp/kcp 生效。
CLIENT_AGENT_AUTH_TOKEN=dev-token
# Client Agent 为 SOCKS5/HTTP 代理入口生成 OPEN 帧时使用的客户端认证 token当前 MVP 默认 dev-token。
CLIENT_AGENT_USER_ID=admin
@ -67,5 +73,5 @@ CLIENT_AGENT_RAW_JSON_LISTEN_PORT=9000
[pop-server]
POP_AGENT_LISTEN=0.0.0.0:9001
# POP Server 监听 Agent 长连接的地址;格式为 host:port例如 0.0.0.0:9001 或 127.0.0.1:9001。
POP_ALLOWED_AGENT_TRANSPORTS=tcp
# POP Server 允许 Client Agent 使用的传输协议;逗号分隔,可选值 tcp、udp、kcp当前已实现 tcpudp/kcp 为预留实现。
POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp
# POP Server 允许 Client Agent 使用的传输协议;可写逗号数组 tcp,kcp也可写 JSON 数组 ["tcp","kcp"];当前已实现 tcp、kcpudp 为预留实现。

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
.env
runtime/*
runtime/*
native/kcp/*.so

View File

@ -42,6 +42,7 @@ $bootAgent = function (string $protocol, string $listen, string $name) use ($nod
Env::int('LAYLINK_MAX_SEND_BUFFER_BYTES', 64 * 1024 * 1024, 1024 * 1024),
Env::int('LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES', 32 * 1024 * 1024, 512 * 1024),
Env::int('LAYLINK_DATA_CHUNK_BYTES', 1024 * 1024, 16 * 1024, 8 * 1024 * 1024),
Env::int('CLIENT_AGENT_POP_CONNECTIONS', 1, 1, 16),
);
$agent->boot($name);
};

View File

@ -2,7 +2,7 @@
## Implementation Status
Last updated: 2026-05-28 Asia/Shanghai.
Last updated: 2026-05-29 Asia/Shanghai.
Current phase: MVP bootstrap in progress.
@ -24,7 +24,7 @@ Direction update under evaluation:
* POP Server uses `POP_ALLOWED_AGENT_TRANSPORTS` to allow one or more Agent-to-POP transports.
* Agent uses `AGENT_TRANSPORT_PROTOCOL` to choose one concrete transport.
* Allowed names are `tcp`, `udp`, and `kcp`.
* Current runnable implementation is `tcp`; `udp` and `kcp` are reserved and must be implemented behind a transport abstraction.
* Current runnable implementations are `tcp` and experimental `kcp`; `udp` is reserved.
* Feasibility:
* Workerman supports long-running async TCP servers and custom protocols; it is suitable for the framed fallback/control channel.
* KCP itself is a UDP-based reliable ARQ protocol, so adding KCP means adding a UDP transport layer and session demultiplexing below the existing LayLink frame protocol.
@ -33,7 +33,7 @@ Direction update under evaluation:
1. Complete Client Agent naming migration in code, docs, config, and entrypoints.
2. Implement TCP-framed Client Agent -> POP -> public target path.
3. Define `TransportInterface` so frame protocol can run over TCP now and KCP later.
4. Add KCP-over-UDP transport via extension/FFI/proxy after the TCP framed path is stable.
4. Add KCP-over-UDP transport behind the transport abstraction after the TCP framed path is stable.
* Main risk:
* KCP is not a socket by itself. It needs UDP I/O, timers, packet flush/update scheduling, MTU handling, retransmission tuning, and connection/session management.
* PHP-only KCP may work as a prototype but is likely CPU-heavy under concurrency.
@ -177,6 +177,47 @@ Completed in this checkpoint:
* Client Agent now treats POP `CLOSE` as a graceful remote EOF and waits for the local client send buffer to drain before closing the local socket.
* TCP `DATA` is split into configurable chunks, defaulting to 1 MiB, to reduce frame overhead while avoiding oversized frames.
* POP refreshes Agent activity on any valid frame, not only `PING`, reducing heartbeat false positives during heavy traffic.
* Started Agent-to-POP transport abstraction:
* Added `FrameClientTransport`.
* Added `TcpFrameClientTransport` as the current TCP implementation.
* `AgentClient` now sends and receives LayLink frames through the transport interface instead of directly owning `AsyncTcpConnection` and `FrameParser`.
* This preserves current TCP behavior while preparing a `KcpFrameClientTransport` implementation.
* Added POP-side frame transport abstraction:
* Added `FrameServerConnection`.
* Added `TcpFrameServerConnection`.
* Added `TcpFrameServerListener`.
* `AgentListener`, `NodeConnection`, `NodeRegistry`, and `TunnelSession` now hold Agent connections through `FrameServerConnection`.
* TCP listener decode/encode details are isolated from POP session, policy, heartbeat, and relay logic.
* Added transport factory/config selection:
* `FrameClientTransportFactory` maps `AGENT_TRANSPORT_PROTOCOL=tcp` to `TcpFrameClientTransport`.
* `FrameServerListenerFactory` maps the implemented POP transport `tcp` to `TcpFrameServerListener`.
* `FrameClientTransportFactory` maps `AGENT_TRANSPORT_PROTOCOL=kcp` to `KcpFrameClientTransport`.
* `FrameServerListenerFactory` maps POP transport `kcp` to `KcpFrameServerListener`.
* `udp` still fails at factory boundaries with explicit not-implemented errors instead of leaking into business logic.
* Added experimental multi-connection Client Agent -> POP support:
* `CLIENT_AGENT_POP_CONNECTIONS` controls how many parallel Agent-to-POP long connections a Client Agent opens.
* New local TCP sessions are distributed round-robin across authenticated POP transports.
* Each session stays bound to its selected POP transport for the whole session lifetime.
* POP `NodeRegistry` now supports multiple live connections under the same `NODE_ID`.
* Heartbeat activity and offline cleanup are tracked per Agent connection.
* KCP/UDP implementation decision:
* Start with `kcp` before raw `udp` for Agent-to-POP frame transport.
* Existing TCP tunnel sessions require ordered, reliable byte-stream semantics; raw UDP would need retransmission, ordering, MTU fragmentation, congestion/window handling, and session cleanup.
* Implementing raw UDP as a general Frame transport would effectively recreate a weaker KCP.
* Keep the existing SOCKS5 `UDP ASSOCIATE`/`UDP_DATA` feature separate: it is application datagram relay over the current reliable Agent-to-POP channel, not the Agent-to-POP transport itself.
* Recommended KCP path is a transport implementation behind `FrameClientTransport` / `FrameServerConnection`, backed by a native extension, FFI binding, or sidecar process rather than pure PHP for production throughput.
* Added KCP Agent-to-POP transport:
* `KcpPacketCodec` defines UDP packet types for `SYN`, `SYN_ACK`, `DATA`, `ACK`, and `CLOSE`.
* `KcpFrameClientTransport` runs Client Agent frames over UDP while preserving the existing `FrameClientTransport` interface.
* `KcpFrameServerListener` and `KcpFrameServerConnection` expose KCP/UDP sessions to POP as `FrameServerConnection`.
* POP can now listen on both TCP and KCP when `POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp`.
* `NativeKcpSession` uses PHP FFI to call native upstream `ikcp.c` through `native/kcp/liblaylink_kcp.so`.
* `scripts/build-kcp-ffi.sh` builds the native shared library from vendored `native/kcp/ikcp.c`.
* `LAYLINK_KCP_BACKEND=ffi` selects the native KCP backend; `LAYLINK_KCP_BACKEND=php` remains as a debugging fallback through `KcpReliableSession`.
* `LAYLINK_KCP_FFI_LIB` can point to a custom native KCP library path.
* Added array-style env parsing:
* `Env::csv()` accepts traditional comma-separated values such as `tcp,kcp`.
* `Env::csv()` also accepts JSON arrays such as `["tcp","kcp"]`.
Known MVP limitations:
@ -190,7 +231,7 @@ Known MVP limitations:
* No TLS yet.
* No production-grade client identity yet; `dev-token` is hardcoded for MVP development.
* No automated integration test harness yet.
* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames, chunking, graceful EOF, and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work.
* TCP stream forwarding can now use multiple Agent-to-POP connections per Client Agent, but a single TCP session is still pinned to one POP transport. Binary `DATA` frames, chunking, graceful EOF, and backpressure reduce per-byte overhead and buffer blowups; KCP is experimental and still needs throughput/loss tuning, while multipath and per-session flow-control tuning are future performance work.
* No explicit idle timeout or connect timeout enforcement yet.
* UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work.
* HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented.
@ -203,13 +244,18 @@ Next recommended tasks:
4. Add more detailed buffer overflow audit reasons and metrics.
5. Add README quickstart with exact local commands.
6. Add a reproducible throughput benchmark script for direct-vs-LayLink comparisons.
7. Add multi-connection Agent-to-POP support so multiple Client Agent workers can spread concurrent sessions safely.
8. Add KCP or another UDP-based reliable transport behind the transport abstraction.
9. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection.
10. Optimize UDP relay with POP-side UDP socket pooling.
11. Add UDP association idle timeouts and cleanup.
12. Aggregate UDP audit records per association instead of per datagram.
13. Add UDP and per-user rate limiting.
7. Keep TCP tuning as an ongoing task:
* benchmark `LAYLINK_DATA_CHUNK_BYTES` at `524288`, `1048576`, `2097152`, and `4194304`
* benchmark buffer pairs such as `64MiB/32MiB` and `128MiB/64MiB`
* record direct-vs-LayLink throughput, CPU, memory, and disconnect behavior
8. Benchmark and tune `CLIENT_AGENT_POP_CONNECTIONS` for 1, 2, 4, and 8 long connections under mixed single-download and multi-session workloads.
9. Benchmark native FFI `kcp` against `tcp` under latency, loss, and high-throughput workloads; tune KCP nodelay, window, MTU, resend, and interval settings.
10. Add raw UDP Agent-to-POP transport only for explicitly datagram-oriented frame classes, or after a reliability/window design exists.
11. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection.
12. Optimize UDP relay with POP-side UDP socket pooling.
13. Add UDP association idle timeouts and cleanup.
14. Aggregate UDP audit records per association instead of per datagram.
15. Add UDP and per-user rate limiting.
## 0. Project Name

1423
native/kcp/ikcp.c Normal file

File diff suppressed because it is too large Load Diff

452
native/kcp/ikcp.h Normal file
View File

@ -0,0 +1,452 @@
//=====================================================================
//
// KCP - A Better ARQ Protocol Implementation
// skywind3000 (at) gmail.com, 2010-2011
//
// Features:
// + Average RTT reduce 30% - 40% vs traditional ARQ like tcp.
// + Maximum RTT reduce three times vs tcp.
// + Lightweight, distributed as a single source file.
//
//=====================================================================
#ifndef _IKCP_H_
#define _IKCP_H_
#include <stddef.h>
#include <stdlib.h>
#include <assert.h>
//=====================================================================
// 32BIT INTEGER DEFINITION
//=====================================================================
#ifndef __INTEGER_32_BITS__
#define __INTEGER_32_BITS__
#if defined(_WIN64) || defined(WIN64) || defined(__amd64__) || \
defined(__x86_64) || defined(__x86_64__) || defined(_M_IA64) || \
defined(_M_AMD64)
typedef unsigned int ISTDUINT32;
typedef int ISTDINT32;
#elif defined(_WIN32) || defined(WIN32) || defined(__i386__) || \
defined(__i386) || defined(_M_X86)
typedef unsigned long ISTDUINT32;
typedef long ISTDINT32;
#elif defined(__MACOS__)
typedef UInt32 ISTDUINT32;
typedef SInt32 ISTDINT32;
#elif defined(__APPLE__) && defined(__MACH__)
#include <sys/types.h>
typedef u_int32_t ISTDUINT32;
typedef int32_t ISTDINT32;
#elif defined(__BEOS__)
#include <sys/inttypes.h>
typedef u_int32_t ISTDUINT32;
typedef int32_t ISTDINT32;
#elif (defined(_MSC_VER) || defined(__BORLANDC__)) && (!defined(__MSDOS__))
typedef unsigned __int32 ISTDUINT32;
typedef __int32 ISTDINT32;
#elif defined(__GNUC__)
#include <stdint.h>
typedef uint32_t ISTDUINT32;
typedef int32_t ISTDINT32;
#else
typedef unsigned long ISTDUINT32;
typedef long ISTDINT32;
#endif
#endif
//=====================================================================
// Integer Definition
//=====================================================================
#ifndef __IINT8_DEFINED
#define __IINT8_DEFINED
typedef char IINT8;
#endif
#ifndef __IUINT8_DEFINED
#define __IUINT8_DEFINED
typedef unsigned char IUINT8;
#endif
#ifndef __IUINT16_DEFINED
#define __IUINT16_DEFINED
typedef unsigned short IUINT16;
#endif
#ifndef __IINT16_DEFINED
#define __IINT16_DEFINED
typedef short IINT16;
#endif
#ifndef __IINT32_DEFINED
#define __IINT32_DEFINED
typedef ISTDINT32 IINT32;
#endif
#ifndef __IUINT32_DEFINED
#define __IUINT32_DEFINED
typedef ISTDUINT32 IUINT32;
#endif
#ifndef __IINT64_DEFINED
#define __IINT64_DEFINED
#if defined(_MSC_VER) || defined(__BORLANDC__)
typedef __int64 IINT64;
#else
typedef long long IINT64;
#endif
#endif
#ifndef __IUINT64_DEFINED
#define __IUINT64_DEFINED
#if defined(_MSC_VER) || defined(__BORLANDC__)
typedef unsigned __int64 IUINT64;
#else
typedef unsigned long long IUINT64;
#endif
#endif
#ifndef INLINE
#if defined(__GNUC__)
#if (__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 1))
#define INLINE __inline__ __attribute__((always_inline))
#else
#define INLINE __inline__
#endif
#elif (defined(_MSC_VER) || defined(__BORLANDC__) || defined(__WATCOMC__))
#define INLINE __inline
#else
#define INLINE
#endif
#endif
#if (!defined(__cplusplus)) && (!defined(inline))
#define inline INLINE
#endif
//=====================================================================
// QUEUE DEFINITION
//=====================================================================
#ifndef __IQUEUE_DEF__
#define __IQUEUE_DEF__
struct IQUEUEHEAD {
struct IQUEUEHEAD *next, *prev;
};
typedef struct IQUEUEHEAD iqueue_head;
//---------------------------------------------------------------------
// queue init
//---------------------------------------------------------------------
#define IQUEUE_HEAD_INIT(name) { &(name), &(name) }
#define IQUEUE_HEAD(name) \
struct IQUEUEHEAD name = IQUEUE_HEAD_INIT(name)
#define IQUEUE_INIT(ptr) ( \
(ptr)->next = (ptr), (ptr)->prev = (ptr))
#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
#define ICONTAINEROF(ptr, type, member) ( \
(type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) )
#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member)
//---------------------------------------------------------------------
// queue operation
//---------------------------------------------------------------------
#define IQUEUE_ADD(node, head) ( \
(node)->prev = (head), (node)->next = (head)->next, \
(head)->next->prev = (node), (head)->next = (node))
#define IQUEUE_ADD_TAIL(node, head) ( \
(node)->prev = (head)->prev, (node)->next = (head), \
(head)->prev->next = (node), (head)->prev = (node))
#define IQUEUE_DEL_BETWEEN(p, n) ((n)->prev = (p), (p)->next = (n))
#define IQUEUE_DEL(entry) (\
(entry)->next->prev = (entry)->prev, \
(entry)->prev->next = (entry)->next, \
(entry)->next = 0, (entry)->prev = 0)
#define IQUEUE_DEL_INIT(entry) do { \
IQUEUE_DEL(entry); IQUEUE_INIT(entry); } while (0)
#define IQUEUE_IS_EMPTY(entry) ((entry) == (entry)->next)
#define iqueue_init IQUEUE_INIT
#define iqueue_entry IQUEUE_ENTRY
#define iqueue_add IQUEUE_ADD
#define iqueue_add_tail IQUEUE_ADD_TAIL
#define iqueue_del IQUEUE_DEL
#define iqueue_del_init IQUEUE_DEL_INIT
#define iqueue_is_empty IQUEUE_IS_EMPTY
#define IQUEUE_FOREACH(iterator, head, TYPE, MEMBER) \
for ((iterator) = iqueue_entry((head)->next, TYPE, MEMBER); \
&((iterator)->MEMBER) != (head); \
(iterator) = iqueue_entry((iterator)->MEMBER.next, TYPE, MEMBER))
#define iqueue_foreach(iterator, head, TYPE, MEMBER) \
IQUEUE_FOREACH(iterator, head, TYPE, MEMBER)
#define iqueue_foreach_entry(pos, head) \
for( (pos) = (head)->next; (pos) != (head) ; (pos) = (pos)->next )
#define __iqueue_splice(list, head) do { \
iqueue_head *first = (list)->next, *last = (list)->prev; \
iqueue_head *at = (head)->next; \
(first)->prev = (head), (head)->next = (first); \
(last)->next = (at), (at)->prev = (last); } while (0)
#define iqueue_splice(list, head) do { \
if (!iqueue_is_empty(list)) __iqueue_splice(list, head); } while (0)
#define iqueue_splice_init(list, head) do { \
iqueue_splice(list, head); iqueue_init(list); } while (0)
#ifdef _MSC_VER
#pragma warning(disable:4311)
#pragma warning(disable:4312)
#pragma warning(disable:4996)
#endif
#endif
//---------------------------------------------------------------------
// BYTE ORDER & ALIGNMENT
//---------------------------------------------------------------------
#ifndef IWORDS_BIG_ENDIAN
#ifdef _BIG_ENDIAN_
#if _BIG_ENDIAN_
#define IWORDS_BIG_ENDIAN 1
#endif
#endif
#ifndef IWORDS_BIG_ENDIAN
#if defined(__hppa__) || \
defined(__m68k__) || defined(mc68000) || defined(_M_M68K) || \
(defined(__MIPS__) && defined(__MIPSEB__)) || \
defined(__ppc__) || defined(__POWERPC__) || defined(_M_PPC) || \
defined(__sparc__) || defined(__powerpc__) || \
defined(__mc68000__) || defined(__s390x__) || defined(__s390__)
#define IWORDS_BIG_ENDIAN 1
#endif
#endif
#ifndef IWORDS_BIG_ENDIAN
#define IWORDS_BIG_ENDIAN 0
#endif
#endif
#ifndef IWORDS_MUST_ALIGN
#if defined(__i386__) || defined(__i386) || defined(_i386_)
#define IWORDS_MUST_ALIGN 0
#elif defined(_M_IX86) || defined(_X86_) || defined(__x86_64__)
#define IWORDS_MUST_ALIGN 0
#elif defined(__amd64) || defined(__amd64__)
#define IWORDS_MUST_ALIGN 0
#else
#define IWORDS_MUST_ALIGN 1
#endif
#endif
//=====================================================================
// Predefine struct
//=====================================================================
struct IKCPCB;
typedef struct IKCPCB ikcpcb;
//=====================================================================
// SEGMENT
//=====================================================================
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv;
IUINT32 cmd;
IUINT32 frg;
IUINT32 wnd;
IUINT32 ts;
IUINT32 sn;
IUINT32 una;
IUINT32 len;
IUINT32 resendts;
IUINT32 rto;
IUINT32 fastack;
IUINT32 xmit;
char data[1];
};
//---------------------------------------------------------------------
// IKCPOPS - pluggable congestion control operations
//---------------------------------------------------------------------
struct IKCPOPS
{
const char *name;
int (*init)(ikcpcb *kcp);
void (*release)(ikcpcb *kcp);
void (*on_ack)(ikcpcb *kcp, IUINT32 acked_segs, IUINT32 acked_bytes,
IUINT32 prior_in_flight);
void (*on_fast_retransmit)(ikcpcb *kcp, IUINT32 fast_retrans,
IUINT32 inflight, IUINT32 prior_cwnd);
void (*on_timeout)(ikcpcb *kcp, IUINT32 prior_cwnd);
void (*on_tick)(ikcpcb *kcp);
void (*on_app_limited)(ikcpcb *kcp, IUINT32 inflight);
void (*on_rtt)(ikcpcb *kcp, IINT32 rtt);
void (*on_pkt_sent)(ikcpcb *kcp, IUINT32 sn, IUINT32 ts,
IUINT32 len, IUINT32 inflight, IUINT32 xmit);
void (*on_pkt_acked)(ikcpcb *kcp, IUINT32 sn, IUINT32 ts,
IUINT32 len, IINT32 rtt, IUINT32 xmit);
IUINT32 (*get_info)(ikcpcb *kcp, void *buf, IUINT32 bufsize);
IUINT32 (*pacing_rate)(ikcpcb *kcp);
};
//---------------------------------------------------------------------
// IKCPCB
//---------------------------------------------------------------------
struct IKCPCB
{
IUINT32 conv, mtu, mss, state;
IUINT32 snd_una, snd_nxt, rcv_nxt;
IUINT32 ts_recent, ts_lastack, ssthresh;
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
IUINT32 current, interval, ts_flush, xmit;
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
IUINT32 nodelay, updated;
IUINT32 ts_probe, probe_wait;
IUINT32 dead_link, incr;
struct IQUEUEHEAD snd_queue;
struct IQUEUEHEAD rcv_queue;
struct IQUEUEHEAD snd_buf;
struct IQUEUEHEAD rcv_buf;
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;
IUINT32 ackedlen;
void *user;
char *buffer;
int fastresend;
int fastlimit;
int nocwnd, stream;
const struct IKCPOPS *ccops;
void *congest;
int logmask;
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};
#define IKCP_LOG_OUTPUT 1
#define IKCP_LOG_INPUT 2
#define IKCP_LOG_SEND 4
#define IKCP_LOG_RECV 8
#define IKCP_LOG_IN_DATA 16
#define IKCP_LOG_IN_ACK 32
#define IKCP_LOG_IN_PROBE 64
#define IKCP_LOG_IN_WINS 128
#define IKCP_LOG_OUT_DATA 256
#define IKCP_LOG_OUT_ACK 512
#define IKCP_LOG_OUT_PROBE 1024
#define IKCP_LOG_OUT_WINS 2048
#ifdef __cplusplus
extern "C" {
#endif
//---------------------------------------------------------------------
// interface
//---------------------------------------------------------------------
// create a new kcp control object, 'conv' must be equal in both endpoints
// of the same connection. 'user' will be passed to the output callback.
// output callback can be set up like this: 'kcp->output = my_udp_output'
ikcpcb* ikcp_create(IUINT32 conv, void *user);
// release kcp control object
void ikcp_release(ikcpcb *kcp);
// set output callback, which will be invoked by kcp
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));
// user/upper level recv: returns size, returns below zero for EAGAIN
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
// user/upper level send, returns below zero for error
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
void ikcp_update(ikcpcb *kcp, IUINT32 current);
// Determines when you should invoke ikcp_update next:
// returns the timestamp (in milliseconds) at which you should call
// ikcp_update, assuming no ikcp_input/_send calls occur in between.
// You can call ikcp_update at that time instead of calling it repeatedly.
// Important for reducing unnecessary ikcp_update invocations. Use it to
// schedule ikcp_update (e.g., implementing an epoll-like mechanism,
// or optimizing ikcp_update when handling massive kcp connections).
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);
// when you receive a low-level packet (e.g., UDP packet), call this
int ikcp_input(ikcpcb *kcp, const char *data, long size);
// flush pending data
void ikcp_flush(ikcpcb *kcp);
// check the size of next message in the recv queue
int ikcp_peeksize(const ikcpcb *kcp);
// change MTU size, default is 1400
int ikcp_setmtu(ikcpcb *kcp, int mtu);
// set maximum window size: sndwnd=32, rcvwnd=32 by default
int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
// get how many packets are waiting to be sent
int ikcp_waitsnd(const ikcpcb *kcp);
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable (default), 1:enable
// interval: internal update timer interval in ms, default is 100ms
// resend: 0:disable fast resend (default), 1:enable fast resend
// nc: 0:normal congestion control (default), 1:disable congestion control
int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc);
// install congestion control algorithm, NULL restores builtin
int ikcp_setcc(ikcpcb *kcp, const struct IKCPOPS *ops);
// write log with kcp->writelog
void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...);
// setup allocator
void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*));
// read conv
IUINT32 ikcp_getconv(const void *ptr);
#ifdef __cplusplus
}
#endif
#endif

193
native/kcp/laylink_kcp.c Normal file
View File

@ -0,0 +1,193 @@
#include "laylink_kcp.h"
#include "ikcp.h"
#include <stdlib.h>
#include <string.h>
typedef struct laylink_packet {
int len;
char* data;
struct laylink_packet* next;
} laylink_packet;
struct laylink_kcp {
ikcpcb* kcp;
laylink_packet* output_head;
laylink_packet* output_tail;
};
static void laylink_packet_free(laylink_packet* packet) {
if (packet == NULL) {
return;
}
free(packet->data);
free(packet);
}
static int laylink_kcp_output(const char* buffer, int len, ikcpcb* kcp, void* user) {
(void)kcp;
laylink_kcp* session = (laylink_kcp*)user;
if (session == NULL || len <= 0) {
return -1;
}
laylink_packet* packet = (laylink_packet*)malloc(sizeof(laylink_packet));
if (packet == NULL) {
return -2;
}
packet->data = (char*)malloc((size_t)len);
if (packet->data == NULL) {
free(packet);
return -3;
}
memcpy(packet->data, buffer, (size_t)len);
packet->len = len;
packet->next = NULL;
if (session->output_tail != NULL) {
session->output_tail->next = packet;
} else {
session->output_head = packet;
}
session->output_tail = packet;
return 0;
}
laylink_kcp* laylink_kcp_create(unsigned int conv) {
laylink_kcp* session = (laylink_kcp*)calloc(1, sizeof(laylink_kcp));
if (session == NULL) {
return NULL;
}
session->kcp = ikcp_create(conv, session);
if (session->kcp == NULL) {
free(session);
return NULL;
}
session->kcp->output = laylink_kcp_output;
ikcp_nodelay(session->kcp, 1, 10, 2, 1);
ikcp_wndsize(session->kcp, 1024, 1024);
ikcp_setmtu(session->kcp, 1350);
return session;
}
void laylink_kcp_release(laylink_kcp* session) {
if (session == NULL) {
return;
}
laylink_packet* packet = session->output_head;
while (packet != NULL) {
laylink_packet* next = packet->next;
laylink_packet_free(packet);
packet = next;
}
if (session->kcp != NULL) {
ikcp_release(session->kcp);
}
free(session);
}
int laylink_kcp_nodelay(laylink_kcp* session, int nodelay, int interval, int resend, int nc) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_nodelay(session->kcp, nodelay, interval, resend, nc);
}
int laylink_kcp_wndsize(laylink_kcp* session, int sndwnd, int rcvwnd) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_wndsize(session->kcp, sndwnd, rcvwnd);
}
int laylink_kcp_setmtu(laylink_kcp* session, int mtu) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_setmtu(session->kcp, mtu);
}
int laylink_kcp_send(laylink_kcp* session, const char* buffer, int len) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_send(session->kcp, buffer, len);
}
int laylink_kcp_input(laylink_kcp* session, const char* buffer, long size) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_input(session->kcp, buffer, size);
}
void laylink_kcp_update(laylink_kcp* session, unsigned int current) {
if (session == NULL || session->kcp == NULL) {
return;
}
ikcp_update(session->kcp, current);
}
unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current) {
if (session == NULL || session->kcp == NULL) {
return current + 10;
}
return ikcp_check(session->kcp, current);
}
int laylink_kcp_peeksize(laylink_kcp* session) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_peeksize(session->kcp);
}
int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len) {
if (session == NULL || session->kcp == NULL) {
return -1;
}
return ikcp_recv(session->kcp, buffer, len);
}
void laylink_kcp_flush(laylink_kcp* session) {
if (session == NULL || session->kcp == NULL) {
return;
}
ikcp_flush(session->kcp);
}
int laylink_kcp_pending_output_size(laylink_kcp* session) {
if (session == NULL || session->output_head == NULL) {
return 0;
}
return session->output_head->len;
}
int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len) {
if (session == NULL || session->output_head == NULL) {
return 0;
}
laylink_packet* packet = session->output_head;
if (len < packet->len) {
return -packet->len;
}
memcpy(buffer, packet->data, (size_t)packet->len);
session->output_head = packet->next;
if (session->output_head == NULL) {
session->output_tail = NULL;
}
int packet_len = packet->len;
laylink_packet_free(packet);
return packet_len;
}

29
native/kcp/laylink_kcp.h Normal file
View File

@ -0,0 +1,29 @@
#ifndef LAYLINK_KCP_H
#define LAYLINK_KCP_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct laylink_kcp laylink_kcp;
laylink_kcp* laylink_kcp_create(unsigned int conv);
void laylink_kcp_release(laylink_kcp* session);
int laylink_kcp_nodelay(laylink_kcp* session, int nodelay, int interval, int resend, int nc);
int laylink_kcp_wndsize(laylink_kcp* session, int sndwnd, int rcvwnd);
int laylink_kcp_setmtu(laylink_kcp* session, int mtu);
int laylink_kcp_send(laylink_kcp* session, const char* buffer, int len);
int laylink_kcp_input(laylink_kcp* session, const char* buffer, long size);
void laylink_kcp_update(laylink_kcp* session, unsigned int current);
unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current);
int laylink_kcp_peeksize(laylink_kcp* session);
int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len);
void laylink_kcp_flush(laylink_kcp* session);
int laylink_kcp_pending_output_size(laylink_kcp* session);
int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -74,7 +74,7 @@ POP Server 需要配置这些 `.env`
```env
APP_ENV=dev
POP_AGENT_LISTEN=0.0.0.0:9001
POP_ALLOWED_AGENT_TRANSPORTS=tcp
POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp
AUDIT_LOG=runtime/audit.log
LOG_LEVEL=debug
```
@ -87,7 +87,7 @@ LOG_LEVEL=debug
| `LAYLINK_FRAME_ENCRYPTION` | Agent 与 POP Server 之间 Frame 加密方式,两端必须一致。 | `none`、`chacha20` |
| `LAYLINK_FRAME_ENCRYPTION_KEY` | Frame 加密密钥,启用 `chacha20` 时必填。 | 普通口令、`hex:...`、`base64:...` |
| `POP_AGENT_LISTEN` | POP Server 给 Client Agent 连接的监听地址。Agent 的 `POP_SERVER_ADDRESS` 应指向这里。 | `0.0.0.0:9001`、`127.0.0.1:9001` |
| `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议,逗号分隔。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`tcp,udp,kcp` |
| `POP_ALLOWED_AGENT_TRANSPORTS` | POP Server 允许 Agent 使用的底层传输协议。支持逗号数组,也支持 JSON 数组。Agent 认证时会上报自己的选择,不在列表内会被拒绝。 | `tcp`、`tcp,kcp`、`["tcp","kcp"]` |
| `AUDIT_LOG` | 审计日志路径。MVP 使用 JSON Lines 追加写入。 | `runtime/audit.log` |
| `LOG_LEVEL` | 日志级别预留配置。当前 MVP 主要为后续日志工厂使用。 | `debug`、`info`、`warning`、`error` |
@ -121,6 +121,7 @@ NODE_ID=client-01
NODE_TYPE=client
NODE_TOKEN=CHANGE_ME
AGENT_TRANSPORT_PROTOCOL=tcp
CLIENT_AGENT_POP_CONNECTIONS=1
CLIENT_AGENT_AUTH_TOKEN=dev-token
CLIENT_AGENT_USER_ID=admin
CLIENT_AGENT_SOCKS5_ENABLED=true
@ -151,6 +152,7 @@ LOG_LEVEL=debug
| `NODE_TYPE` | 当前节点类型。Client Agent 必须配置为 `client`。 | `client` |
| `NODE_TOKEN` | 当前节点认证密钥。必须和 `config/nodes.php` 中同一 `NODE_ID``token` 一致。 | 强随机字符串,开发时可临时用 `CHANGE_ME` |
| `AGENT_TRANSPORT_PROTOCOL` | 当前 Agent 到 POP Server 使用的底层传输协议。必须被 POP Server 的 `POP_ALLOWED_AGENT_TRANSPORTS` 允许。 | `tcp`、`udp`、`kcp` |
| `CLIENT_AGENT_POP_CONNECTIONS` | Client Agent 到 POP Server 的并行长连接数量。新 TCP 会话会在已认证连接间轮询分配,适合多并发请求或多线程测速。 | `1`、`2`、`4` |
| `CLIENT_AGENT_AUTH_TOKEN` | SOCKS5/HTTP 代理入口生成 `OPEN` 帧时使用的客户端认证 token。 | `dev-token`,生产应替换 |
| `CLIENT_AGENT_USER_ID` | SOCKS5/HTTP 代理入口生成 `OPEN` 帧时使用的默认用户 ID。 | `admin`、`normal-user` |
| `CLIENT_AGENT_SOCKS5_ENABLED` | 是否启用 SOCKS5 本地入口。 | `true`、`false` |
@ -309,7 +311,7 @@ CLIENT_AGENT_SOCKS5_PASSWORD=change-this-password
```env
APP_ENV=dev
POP_AGENT_LISTEN=127.0.0.1:9001
POP_ALLOWED_AGENT_TRANSPORTS=tcp
POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp
NODE_ID=client-01
NODE_TYPE=client
NODE_TOKEN=CHANGE_ME
@ -346,23 +348,42 @@ Agent 到 POP Server 的业务数据始终使用 LayLink 自定义 Frame 协议
| --- | --- | --- |
| `tcp` | Frame over TCP最容易部署和调试。 | 已实现 |
| `udp` | Frame over UDP需要额外处理可靠性、顺序和丢包。 | 已预留,未实现 |
| `kcp` | Frame over KCP/UDP用 KCP 做可靠、低延迟传输。 | 已预留,未实现 |
| `kcp` | Frame over KCP/UDP默认通过 FFI 调用 native `ikcp.c`。 | 已实现,需构建动态库 |
POP Server 用 `POP_ALLOWED_AGENT_TRANSPORTS` 控制允许哪些传输协议。例如:
```env
POP_ALLOWED_AGENT_TRANSPORTS=tcp,kcp
# 也可以写成:
POP_ALLOWED_AGENT_TRANSPORTS=["tcp","kcp"]
```
Client Agent 用 `AGENT_TRANSPORT_PROTOCOL` 选择自己实际使用哪种协议。例如:
```env
AGENT_TRANSPORT_PROTOCOL=tcp
AGENT_TRANSPORT_PROTOCOL=kcp
```
如果 Agent 选择的协议不在 POP 允许列表中POP 会在认证阶段返回 `AUTH_FAIL`,原因是 `transport_not_allowed`
当前代码只实现了 `tcp`。如果 Agent 配置为 `udp``kcp`,进程会启动失败并明确提示该传输尚未实现。
`kcp` 默认使用 FFI 调用 native `ikcp.c`。首次使用前需要构建动态库:
```bash
scripts/build-kcp-ffi.sh
```
然后两端配置:
```env
LAYLINK_KCP_BACKEND=ffi
LAYLINK_KCP_FFI_LIB=native/kcp/liblaylink_kcp.so
```
使用 `kcp`POP Server 会在 `POP_AGENT_LISTEN` 的同一 host:port 上监听 UDPClient Agent 的 `POP_SERVER_ADDRESS` 仍填写同一地址即可。服务器防火墙需要放行同端口 UDP。
如果运行环境暂时不能启用 FFI可以配置 `LAYLINK_KCP_BACKEND=php` 使用调试回退实现;该实现不适合作为生产高吞吐路径。
如果 Agent 配置为 `udp`,进程会启动失败并明确提示该传输尚未实现。
启动 POP Server
@ -394,7 +415,7 @@ LAYLINK_BACKPRESSURE_HIGH_WATERMARK_BYTES=33554432
当 POP 收到目标站关闭连接时Client Agent 会先等待本地客户端发送缓冲区排空,再关闭本地 socket避免大文件尾部数据还在缓冲区里时被提前截断。TCP `DATA` 默认按 1 MiB 分片发送,以减少帧开销;可通过 `LAYLINK_DATA_CHUNK_BYTES` 调整。
当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。
Client Agent 默认使用 1 条 Agent-to-POP TCP 长连接。可通过 `CLIENT_AGENT_POP_CONNECTIONS` 增加并行长连接数,新 TCP 会话会在已认证 POP 连接之间轮询分配,并在会话生命周期内固定使用同一条连接。它主要改善多并发请求、多线程下载或测速场景;单个 TCP 下载是否变快取决于客户端和目标站是否本身使用多连接。背压可以保护进程不堵死,但单条会话仍受单 TCP 流限制KCP、per-session window 和更细粒度 TCP 调优仍是后续性能优化方向。
验证 SOCKS5 HTTPS 联通性和出口 IP

14
scripts/build-kcp-ffi.sh Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
SRC_DIR="${ROOT_DIR}/native/kcp"
OUT="${SRC_DIR}/liblaylink_kcp.so"
cc="${CC:-gcc}"
"${cc}" -O3 -fPIC -shared \
"${SRC_DIR}/ikcp.c" \
"${SRC_DIR}/laylink_kcp.c" \
-o "${OUT}"
echo "${OUT}"

View File

@ -5,11 +5,10 @@ declare(strict_types=1);
namespace LayLink\Agent;
use LayLink\Protocol\Frame;
use LayLink\Protocol\FrameCodec;
use LayLink\Protocol\FrameParser;
use LayLink\Protocol\FrameType;
use LayLink\Transport\FrameClientTransport;
use LayLink\Transport\FrameClientTransportFactory;
use LayLink\Util\Uuid;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\UdpConnection;
use Workerman\Timer;
@ -17,8 +16,13 @@ use Workerman\Worker;
final class AgentClient
{
private ?AsyncTcpConnection $pop = null;
private ?FrameParser $parser = null;
/** @var array<int, FrameClientTransport> */
private array $pops = [];
/** @var array<int, true> */
private array $authenticatedPops = [];
/** @var array<string, FrameClientTransport> */
private array $sessionTransports = [];
private int $nextTransportCursor = 0;
private bool $authenticated = false;
/** @var array<int, string> */
private array $initialBuffers = [];
@ -66,6 +70,7 @@ final class AgentClient
private readonly int $maxSendBuffer = 64 * 1024 * 1024,
private readonly int $backpressureHighWatermark = 32 * 1024 * 1024,
private readonly int $dataChunkSize = 1024 * 1024,
private readonly int $popConnectionCount = 1,
) {
}
@ -75,9 +80,6 @@ final class AgentClient
$worker->name = $workerName;
$worker->count = 1;
$worker->onWorkerStart = function (): void {
if ($this->transportProtocol !== 'tcp') {
throw new \RuntimeException("Agent transport '{$this->transportProtocol}' is configured but not implemented yet.");
}
$this->connect();
Timer::add(10, fn () => $this->heartbeat());
};
@ -95,59 +97,53 @@ final class AgentClient
private function connect(): void
{
$this->parser = new FrameParser();
$this->authenticated = false;
$connection = new AsyncTcpConnection($this->popAddress);
$connection->maxSendBufferSize = $this->maxSendBuffer;
$this->pop = $connection;
$connection->onConnect = function (AsyncTcpConnection $connection): void {
$this->send(new Frame(FrameType::AUTH, null, [
'node_id' => $this->nodeId,
'node_type' => $this->nodeType,
'node_zone' => $this->nodeZone,
'node_token' => $this->nodeToken,
'transport_protocol' => $this->transportProtocol,
'supported_protocols' => ['tcp'],
'supported_transports' => ['tcp', 'udp', 'kcp'],
]));
};
$connection->onMessage = function (AsyncTcpConnection $connection, string $data): void {
try {
foreach ($this->parser?->push($data) ?? [] as $frame) {
$this->handleFrame($frame);
}
} catch (\Throwable $e) {
$connection->close();
}
};
$connection->onBufferDrain = fn (AsyncTcpConnection $connection) => $this->resumeClientsForPop();
$connection->onClose = function (): void {
$this->authenticated = false;
foreach ($this->clients as $client) {
$client->close();
}
$this->initialBuffers = [];
$this->connectionSessionIds = [];
$this->clients = [];
$this->sessionStates = [];
$this->pendingData = [];
$this->connectionStages = [];
$this->sessionIngressProtocols = [];
$this->pausedClientsForPop = [];
$this->clientsPausingPop = [];
$this->pendingClientCloses = [];
$this->suppressClientCloseFrames = [];
Timer::add(3, fn () => $this->connect(), [], false);
};
$connection->connect();
$this->pops = [];
$this->authenticatedPops = [];
for ($i = 0; $i < $this->popConnectionCount; $i++) {
$this->connectOne();
}
}
private function handleFrame(Frame $frame): void
private function connectOne(): void
{
$transport = (new FrameClientTransportFactory())->create(
$this->transportProtocol,
$this->popAddress,
$this->maxSendBuffer,
function (FrameClientTransport $transport): void {
$transport->send(new Frame(FrameType::AUTH, null, [
'node_id' => $this->nodeId,
'node_type' => $this->nodeType,
'node_zone' => $this->nodeZone,
'node_token' => $this->nodeToken,
'transport_protocol' => $this->transportProtocol,
'supported_protocols' => ['tcp'],
'supported_transports' => ['tcp', 'udp', 'kcp'],
]));
},
function (FrameClientTransport $transport, Frame $frame): void {
$this->handleFrame($transport, $frame);
},
function (FrameClientTransport $transport): void {
$this->handlePopClose($transport);
Timer::add(3, fn () => $this->connectOne(), [], false);
},
function (FrameClientTransport $transport): void {
$this->resumeClientsForPop($transport);
},
function (\Throwable $e): void {
},
);
$this->pops[spl_object_id($transport)] = $transport;
$transport->connect();
}
private function handleFrame(FrameClientTransport $transport, Frame $frame): void
{
match ($frame->type) {
FrameType::AUTH_OK => $this->authenticated = true,
FrameType::AUTH_FAIL => $this->pop?->close(),
FrameType::AUTH_OK => $this->markPopAuthenticated($transport),
FrameType::AUTH_FAIL => $transport->close(),
FrameType::PONG => null,
FrameType::OPEN_OK => $this->openClientSession($frame),
FrameType::OPEN_FAIL => $this->failClientSession($frame),
@ -184,7 +180,8 @@ final class AgentClient
$this->closeClient($sessionId);
return;
}
if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= $this->backpressureHighWatermark) {
$transport = $this->sessionTransports[$sessionId] ?? null;
if ($transport !== null && $transport->getSendBufferQueueSize() >= $this->backpressureHighWatermark) {
$connection->pauseRecv();
$this->pausedClientsForPop[$connection->id] = $connection;
}
@ -447,7 +444,8 @@ final class AgentClient
private function startPopSession(TcpConnection $connection, array $request, string $payloadBytes, string $ingressProtocol): void
{
if (!$this->authenticated || $this->pop === null) {
$transport = $this->selectTransport();
if (!$this->authenticated || $transport === null) {
$this->failOpeningLocalClient($connection, $ingressProtocol, 'pop_not_connected');
return;
}
@ -455,13 +453,14 @@ final class AgentClient
$sessionId = Uuid::v4();
$this->connectionSessionIds[$connection->id] = $sessionId;
$this->clients[$sessionId] = $connection;
$this->sessionTransports[$sessionId] = $transport;
$this->sessionStates[$sessionId] = 'opening';
$this->sessionIngressProtocols[$sessionId] = $ingressProtocol;
if ($payloadBytes !== '') {
$this->pendingData[$sessionId] = $payloadBytes;
}
$this->send(new Frame(FrameType::OPEN, $sessionId, [
$transport->send(new Frame(FrameType::OPEN, $sessionId, [
'auth_token' => (string)($request['auth_token'] ?? ''),
'user_id' => (string)($request['user_id'] ?? ''),
'target_host' => (string)($request['target_host'] ?? ''),
@ -486,7 +485,7 @@ final class AgentClient
private function onSocks5UdpMessage(UdpConnection $connection, string $data): void
{
if (!$this->authenticated || $this->pop === null || strlen($data) < 10) {
if (!$this->authenticated || $this->selectTransport() === null || strlen($data) < 10) {
return;
}
@ -638,7 +637,7 @@ final class AgentClient
$sessionId = $this->connectionSessionIds[$connection->id];
unset($this->pendingClientCloses[$sessionId]);
unset($this->connectionSessionIds[$connection->id]);
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]);
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId], $this->sessionTransports[$sessionId]);
if ($suppressCloseFrame) {
return;
}
@ -668,7 +667,7 @@ final class AgentClient
}
$connection = $this->clients[$sessionId];
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]);
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId], $this->sessionTransports[$sessionId]);
unset($this->connectionSessionIds[$connection->id]);
unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id], $this->pendingClientCloses[$sessionId]);
$this->suppressClientCloseFrames[$connection->id] = true;
@ -812,21 +811,23 @@ final class AgentClient
private function heartbeat(): void
{
if (!$this->authenticated || $this->pop === null) {
if (!$this->authenticated) {
return;
}
$this->send(new Frame(FrameType::PING, null, [
'node_id' => $this->nodeId,
'active_sessions' => count($this->clients),
'load' => sys_getloadavg()[0] ?? 0.0,
'timestamp' => time(),
]));
foreach ($this->authenticatedTransports() as $transport) {
$transport->send(new Frame(FrameType::PING, null, [
'node_id' => $this->nodeId,
'active_sessions' => $this->activeSessionsForTransport($transport),
'load' => sys_getloadavg()[0] ?? 0.0,
'timestamp' => time(),
]));
}
}
private function send(Frame $frame): bool|null
{
return $this->pop?->send(FrameCodec::encode($frame));
return $this->transportForSession($frame->sessionId)?->send($frame);
}
private function sendData(string $sessionId, string $data): bool
@ -844,9 +845,12 @@ final class AgentClient
return true;
}
private function resumeClientsForPop(): void
private function resumeClientsForPop(FrameClientTransport $transport): void
{
foreach ($this->pausedClientsForPop as $connectionId => $client) {
if ($this->transportForConnection($client) !== $transport) {
continue;
}
$client->resumeRecv();
unset($this->pausedClientsForPop[$connectionId]);
}
@ -854,15 +858,20 @@ final class AgentClient
private function pausePopForClient(TcpConnection $connection): void
{
$transport = $this->transportForConnection($connection);
if ($transport === null) {
return;
}
$this->clientsPausingPop[$connection->id] = true;
$this->pop?->pauseRecv();
$transport->pauseRecv();
}
private function resumePopForClient(TcpConnection $connection): void
{
unset($this->clientsPausingPop[$connection->id]);
if ($this->clientsPausingPop === []) {
$this->pop?->resumeRecv();
$transport = $this->transportForConnection($connection);
if ($transport !== null && !$this->hasClientsPausingTransport($transport)) {
$transport->resumeRecv();
}
}
@ -874,4 +883,91 @@ final class AgentClient
$this->finalizeClientClose($sessionId);
}
}
private function markPopAuthenticated(FrameClientTransport $transport): void
{
$this->authenticatedPops[spl_object_id($transport)] = true;
$this->authenticated = true;
}
private function handlePopClose(FrameClientTransport $transport): void
{
$transportId = spl_object_id($transport);
unset($this->pops[$transportId], $this->authenticatedPops[$transportId]);
$this->authenticated = $this->authenticatedPops !== [];
foreach ($this->sessionTransports as $sessionId => $sessionTransport) {
if ($sessionTransport !== $transport || !isset($this->clients[$sessionId])) {
continue;
}
$client = $this->clients[$sessionId];
$this->suppressClientCloseFrames[$client->id] = true;
$client->close();
}
}
private function selectTransport(): ?FrameClientTransport
{
$transports = $this->authenticatedTransports();
if ($transports === []) {
return null;
}
$transport = $transports[$this->nextTransportCursor % count($transports)];
$this->nextTransportCursor++;
return $transport;
}
/**
* @return FrameClientTransport[]
*/
private function authenticatedTransports(): array
{
$transports = [];
foreach ($this->authenticatedPops as $transportId => $_) {
if (isset($this->pops[$transportId])) {
$transports[] = $this->pops[$transportId];
}
}
return $transports;
}
private function transportForSession(?string $sessionId): ?FrameClientTransport
{
if ($sessionId !== null && isset($this->sessionTransports[$sessionId])) {
return $this->sessionTransports[$sessionId];
}
return $this->selectTransport();
}
private function transportForConnection(TcpConnection $connection): ?FrameClientTransport
{
$sessionId = $this->connectionSessionIds[$connection->id] ?? null;
return is_string($sessionId) ? ($this->sessionTransports[$sessionId] ?? null) : null;
}
private function hasClientsPausingTransport(FrameClientTransport $transport): bool
{
foreach ($this->clientsPausingPop as $connectionId => $_) {
$sessionId = $this->connectionSessionIds[$connectionId] ?? null;
if (is_string($sessionId) && ($this->sessionTransports[$sessionId] ?? null) === $transport) {
return true;
}
}
return false;
}
private function activeSessionsForTransport(FrameClientTransport $transport): int
{
$count = 0;
foreach ($this->sessionTransports as $sessionId => $sessionTransport) {
if ($sessionTransport === $transport && isset($this->clients[$sessionId])) {
$count++;
}
}
return $count;
}
}

View File

@ -4,7 +4,7 @@ declare(strict_types=1);
namespace LayLink\Node;
use Workerman\Connection\TcpConnection;
use LayLink\Transport\FrameServerConnection;
final class NodeConnection
{
@ -15,7 +15,7 @@ final class NodeConnection
public readonly string $nodeId,
public readonly string $nodeType,
public readonly string $nodeZone,
public readonly TcpConnection $connection,
public readonly FrameServerConnection $connection,
) {
$this->lastHeartbeat = time();
}

View File

@ -4,23 +4,30 @@ declare(strict_types=1);
namespace LayLink\Node;
use Workerman\Connection\TcpConnection;
use LayLink\Transport\FrameServerConnection;
final class NodeRegistry
{
/** @var array<string, NodeConnection> */
/** @var array<string, array<int, NodeConnection>> */
private array $nodes = [];
public function register(NodeConnection $node): void
{
$this->nodes[$node->nodeId] = $node;
$this->nodes[$node->nodeId][$node->connection->id()] = $node;
}
public function unregisterByConnection(TcpConnection $connection): ?NodeConnection
public function unregisterByConnection(FrameServerConnection $connection): ?NodeConnection
{
foreach ($this->nodes as $nodeId => $node) {
if ($node->connection === $connection) {
unset($this->nodes[$nodeId]);
foreach ($this->nodes as $nodeId => $connections) {
foreach ($connections as $connectionId => $node) {
if ($node->connection !== $connection) {
continue;
}
unset($this->nodes[$nodeId][$connectionId]);
if ($this->nodes[$nodeId] === []) {
unset($this->nodes[$nodeId]);
}
return $node;
}
}
@ -30,12 +37,26 @@ final class NodeRegistry
public function get(string $nodeId): ?NodeConnection
{
return $this->nodes[$nodeId] ?? null;
$connections = $this->nodes[$nodeId] ?? [];
return $connections === [] ? null : array_values($connections)[0];
}
public function getByConnection(FrameServerConnection $connection): ?NodeConnection
{
foreach ($this->nodes as $connections) {
foreach ($connections as $node) {
if ($node->connection === $connection) {
return $node;
}
}
}
return null;
}
public function isOnline(string $nodeId): bool
{
return isset($this->nodes[$nodeId]);
return ($this->nodes[$nodeId] ?? []) !== [];
}
/**
@ -43,6 +64,13 @@ final class NodeRegistry
*/
public function all(): array
{
return array_values($this->nodes);
$nodes = [];
foreach ($this->nodes as $connections) {
foreach ($connections as $node) {
$nodes[] = $node;
}
}
return $nodes;
}
}

View File

@ -10,22 +10,19 @@ use LayLink\Auth\NodeAuthenticator;
use LayLink\Node\NodeConnection;
use LayLink\Node\NodeRegistry;
use LayLink\Protocol\Frame;
use LayLink\Protocol\FrameCodec;
use LayLink\Protocol\FrameParser;
use LayLink\Protocol\FrameType;
use LayLink\Route\RouteResolver;
use LayLink\Session\SessionManager;
use LayLink\Session\TunnelSession;
use LayLink\Transport\FrameServerConnection;
use LayLink\Transport\FrameServerListenerFactory;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\TcpConnection;
use Workerman\Timer;
use Workerman\Worker;
final class AgentListener
{
/** @var array<int, FrameParser> */
private array $parsers = [];
/** @var array<int, string> */
private array $connectionNodeIds = [];
/** @var array<int, array<string, AsyncTcpConnection>> */
@ -42,33 +39,38 @@ final class AgentListener
private readonly int $maxSendBuffer = 64 * 1024 * 1024,
private readonly int $backpressureHighWatermark = 32 * 1024 * 1024,
private readonly int $dataChunkSize = 1024 * 1024,
private readonly string $agentTransportProtocol = 'tcp',
) {
$worker->onConnect = fn (TcpConnection $connection) => $this->onConnect($connection);
$worker->onMessage = fn (TcpConnection $connection, string $data) => $this->onMessage($connection, $data);
$worker->onClose = fn (TcpConnection $connection) => $this->onClose($connection);
$worker->onWorkerStart = fn () => Timer::add(10, fn () => $this->sweepHeartbeats());
}
private function onConnect(TcpConnection $connection): void
{
$connection->maxSendBufferSize = $this->maxSendBuffer;
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection);
$this->parsers[$connection->id] = new FrameParser();
}
private function onMessage(TcpConnection $connection, string $data): void
{
try {
foreach ($this->parsers[$connection->id]->push($data) as $frame) {
$this->handleFrame($connection, $frame);
(new FrameServerListenerFactory())->attach(
$this->agentTransportProtocol,
$worker,
$this->maxSendBuffer,
fn (FrameServerConnection $connection) => $this->onConnect($connection),
fn (FrameServerConnection $connection, Frame $frame) => $this->handleFrame($connection, $frame),
fn (FrameServerConnection $connection) => $this->onClose($connection),
fn (FrameServerConnection $connection) => $this->resumeTargetsForAgent($connection),
fn (FrameServerConnection $connection, \Throwable $e) => $this->onInvalidFrame($connection),
);
$transportWorkerStart = $worker->onWorkerStart;
$worker->onWorkerStart = function () use ($transportWorkerStart): void {
if ($transportWorkerStart !== null) {
$transportWorkerStart();
}
} catch (\Throwable $e) {
$this->send($connection, new Frame(FrameType::ERROR, null, ['reason' => 'invalid_frame']));
$connection->close();
}
Timer::add(10, fn () => $this->sweepHeartbeats());
};
}
private function handleFrame(TcpConnection $connection, Frame $frame): void
private function onConnect(FrameServerConnection $connection): void
{
}
private function onInvalidFrame(FrameServerConnection $connection): void
{
$this->send($connection, new Frame(FrameType::ERROR, null, ['reason' => 'invalid_frame']));
$connection->close();
}
private function handleFrame(FrameServerConnection $connection, Frame $frame): void
{
if ($frame->type === FrameType::AUTH) {
$result = $this->authenticator->authenticate($frame->payload);
@ -86,7 +88,7 @@ final class AgentListener
$connection,
);
$this->nodes->register($node);
$this->connectionNodeIds[$connection->id] = $nodeId;
$this->connectionNodeIds[$connection->id()] = $nodeId;
$this->send($connection, new Frame(FrameType::AUTH_OK, null, [
'node_id' => $nodeId,
'heartbeat_interval' => 10,
@ -95,8 +97,8 @@ final class AgentListener
return;
}
$nodeId = $this->connectionNodeIds[$connection->id] ?? null;
$node = is_string($nodeId) ? $this->nodes->get($nodeId) : null;
$nodeId = $this->connectionNodeIds[$connection->id()] ?? null;
$node = is_string($nodeId) ? $this->nodes->getByConnection($connection) : null;
if (!is_string($nodeId) || $node === null) {
$this->send($connection, new Frame(FrameType::ERROR, $frame->sessionId, ['reason' => 'invalid_auth']));
$connection->close();
@ -133,7 +135,7 @@ final class AgentListener
};
}
private function forwardUdpDatagram(TcpConnection $agentConnection, string $nodeId, Frame $frame): void
private function forwardUdpDatagram(FrameServerConnection $agentConnection, string $nodeId, Frame $frame): void
{
if ($frame->sessionId === null) {
$this->send($agentConnection, new Frame(FrameType::ERROR, null, ['reason' => 'invalid_frame']));
@ -177,7 +179,7 @@ final class AgentListener
$target->connect();
}
private function openTargetForAgent(TcpConnection $agentConnection, string $nodeId, Frame $frame): void
private function openTargetForAgent(FrameServerConnection $agentConnection, string $nodeId, Frame $frame): void
{
if ($frame->sessionId === null || $this->sessions->get($frame->sessionId) !== null) {
$this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $frame->sessionId, ['reason' => 'invalid_frame']));
@ -248,7 +250,7 @@ final class AgentListener
}
if ($agentConnection->getSendBufferQueueSize() >= $this->backpressureHighWatermark) {
$target->pauseRecv();
$this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target;
$this->pausedTargetsByAgent[$agentConnection->id()][$session->sessionId] = $target;
}
};
$target->onClose = fn () => $this->closeSession($session, 'closed', null);
@ -282,7 +284,7 @@ final class AgentListener
}
}
private function failOpenSession(TcpConnection $agentConnection, TunnelSession $session, string $reason): void
private function failOpenSession(FrameServerConnection $agentConnection, TunnelSession $session, string $reason): void
{
if ($session->state === TunnelSession::OPENING) {
$this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $session->sessionId, ['reason' => $reason]));
@ -299,7 +301,7 @@ final class AgentListener
return base64_decode((string)($frame->payload['data'] ?? ''), true);
}
private function rejectOpen(TcpConnection $agentConnection, Frame $frame, string $reason, string $userId, string $nodeId, ?string $policyId = null): void
private function rejectOpen(FrameServerConnection $agentConnection, Frame $frame, string $reason, string $userId, string $nodeId, ?string $policyId = null): void
{
$this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $frame->sessionId, ['reason' => $reason]));
$this->audit->write([
@ -330,7 +332,7 @@ final class AgentListener
$session->state = TunnelSession::CLOSED;
$this->sessions->remove($session->sessionId);
if ($session->agent !== null) {
unset($this->pausedTargetsByAgent[$session->agent->id][$session->sessionId]);
unset($this->pausedTargetsByAgent[$session->agent->id()][$session->sessionId]);
}
if ($session->agent !== null) {
@ -359,11 +361,10 @@ final class AgentListener
]);
}
private function onClose(TcpConnection $connection): void
private function onClose(FrameServerConnection $connection): void
{
unset($this->parsers[$connection->id]);
unset($this->connectionNodeIds[$connection->id]);
unset($this->pausedTargetsByAgent[$connection->id]);
unset($this->connectionNodeIds[$connection->id()]);
unset($this->pausedTargetsByAgent[$connection->id()]);
$node = $this->nodes->unregisterByConnection($connection);
foreach ($this->sessions->all() as $session) {
if ($session->agent === $connection) {
@ -386,12 +387,12 @@ final class AgentListener
}
}
private function send(TcpConnection $connection, Frame $frame): bool|null
private function send(FrameServerConnection $connection, Frame $frame): bool|null
{
return $connection->send(FrameCodec::encode($frame));
return $connection->send($frame);
}
private function sendData(TcpConnection $connection, string $sessionId, string $data): bool
private function sendData(FrameServerConnection $connection, string $sessionId, string $data): bool
{
$length = strlen($data);
for ($offset = 0; $offset < $length; $offset += $this->dataChunkSize) {
@ -406,14 +407,14 @@ final class AgentListener
return true;
}
private function resumeTargetsForAgent(TcpConnection $connection): void
private function resumeTargetsForAgent(FrameServerConnection $connection): void
{
foreach ($this->pausedTargetsByAgent[$connection->id] ?? [] as $sessionId => $target) {
foreach ($this->pausedTargetsByAgent[$connection->id()] ?? [] as $sessionId => $target) {
$target->resumeRecv();
unset($this->pausedTargetsByAgent[$connection->id][$sessionId]);
unset($this->pausedTargetsByAgent[$connection->id()][$sessionId]);
}
if (($this->pausedTargetsByAgent[$connection->id] ?? []) === []) {
unset($this->pausedTargetsByAgent[$connection->id]);
if (($this->pausedTargetsByAgent[$connection->id()] ?? []) === []) {
unset($this->pausedTargetsByAgent[$connection->id()]);
}
}

View File

@ -36,20 +36,29 @@ final class PopServer
public function boot(): void
{
$agentWorker = new Worker('tcp://' . $this->agentListen);
$agentWorker->name = 'laylink-pop-agent-listener';
$agentWorker->count = 1;
new AgentListener(
$agentWorker,
new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports),
new ClientAuthenticator(),
new RouteResolver(new PolicyChecker($this->policies), $this->nodes),
$this->nodes,
$this->sessions,
$this->audit,
$this->maxSendBuffer,
$this->backpressureHighWatermark,
$this->dataChunkSize,
);
$implementedTransports = array_values(array_intersect($this->allowedAgentTransports, ['tcp', 'kcp']));
if ($implementedTransports === []) {
throw new \RuntimeException('no_implemented_pop_transport_enabled');
}
foreach ($implementedTransports as $transport) {
$scheme = $transport === 'kcp' ? 'udp' : 'tcp';
$agentWorker = new Worker($scheme . '://' . $this->agentListen);
$agentWorker->name = 'laylink-pop-agent-listener-' . $transport;
$agentWorker->count = 1;
new AgentListener(
$agentWorker,
new NodeAuthenticator($this->nodeConfig, $this->allowedAgentTransports),
new ClientAuthenticator(),
new RouteResolver(new PolicyChecker($this->policies), $this->nodes),
$this->nodes,
$this->sessions,
$this->audit,
$this->maxSendBuffer,
$this->backpressureHighWatermark,
$this->dataChunkSize,
$transport,
);
}
}
}

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace LayLink\Session;
use LayLink\Transport\FrameServerConnection;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
@ -18,7 +19,7 @@ final class TunnelSession
public string $state = self::NEW;
public ?TcpConnection $client = null;
public ?TcpConnection $agent = null;
public ?FrameServerConnection $agent = null;
public ?AsyncTcpConnection $target = null;
public ?string $nodeId = null;
public string $startTime;

View File

@ -0,0 +1,22 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
interface FrameClientTransport
{
public function connect(): void;
public function send(Frame $frame): bool|null;
public function close(): void;
public function pauseRecv(): void;
public function resumeRecv(): void;
public function getSendBufferQueueSize(): int;
}

View File

@ -0,0 +1,44 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use InvalidArgumentException;
final class FrameClientTransportFactory
{
public function create(
string $protocol,
string $address,
int $maxSendBuffer,
\Closure $onConnect,
\Closure $onFrame,
\Closure $onClose,
\Closure $onBufferDrain,
\Closure $onInvalidFrame,
): FrameClientTransport {
return match (strtolower(trim($protocol))) {
'tcp' => new TcpFrameClientTransport(
$address,
$maxSendBuffer,
$onConnect,
$onFrame,
$onClose,
$onBufferDrain,
$onInvalidFrame,
),
'kcp' => new KcpFrameClientTransport(
$address,
$maxSendBuffer,
$onConnect,
$onFrame,
$onClose,
$onBufferDrain,
$onInvalidFrame,
),
'udp' => throw new InvalidArgumentException('agent_transport_not_implemented'),
default => throw new InvalidArgumentException('unsupported_agent_transport'),
};
}
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
interface FrameServerConnection
{
public function id(): int;
public function send(Frame $frame): bool|null;
public function close(): void;
public function pauseRecv(): void;
public function resumeRecv(): void;
public function getSendBufferQueueSize(): int;
public function getRemoteIp(): string;
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use InvalidArgumentException;
use Workerman\Worker;
final class FrameServerListenerFactory
{
public function attach(
string $protocol,
Worker $worker,
int $maxSendBuffer,
\Closure $onConnect,
\Closure $onFrame,
\Closure $onClose,
\Closure $onBufferDrain,
\Closure $onInvalidFrame,
): void {
match (strtolower(trim($protocol))) {
'tcp' => new TcpFrameServerListener(
$worker,
$maxSendBuffer,
$onConnect,
$onFrame,
$onClose,
$onBufferDrain,
$onInvalidFrame,
),
'kcp' => new KcpFrameServerListener(
$worker,
$maxSendBuffer,
$onConnect,
$onFrame,
$onClose,
$onBufferDrain,
$onInvalidFrame,
),
'udp' => throw new InvalidArgumentException('pop_transport_not_implemented'),
default => throw new InvalidArgumentException('unsupported_pop_transport'),
};
}
}

View File

@ -0,0 +1,176 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Timer;
final class KcpFrameClientTransport implements FrameClientTransport
{
private ?AsyncUdpConnection $connection = null;
private KcpReliableSession|NativeKcpSession|null $session = null;
private ?int $timerId = null;
private int $conv;
private bool $connected = false;
public function __construct(
private readonly string $address,
private readonly int $maxSendBuffer,
private readonly \Closure $onConnect,
private readonly \Closure $onFrame,
private readonly \Closure $onClose,
private readonly \Closure $onBufferDrain,
private readonly \Closure $onInvalidFrame,
) {
$this->conv = random_int(1, 0x7fffffff);
}
public function connect(): void
{
$this->connected = false;
$this->session = null;
$connection = new AsyncUdpConnection($this->normalizeAddress($this->address));
$this->connection = $connection;
$connection->onConnect = function () use ($connection): void {
$connection->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN,
'conv' => $this->conv,
]));
};
$connection->onMessage = function (AsyncUdpConnection $connection, string $data): void {
$this->handlePacket($data);
};
$connection->onClose = function () use ($connection): void {
if ($this->connection === $connection) {
$this->connection = null;
}
$this->stopTimer();
($this->onClose)($this);
};
$connection->connect();
$this->timerId = Timer::add(0.02, fn () => $this->tick());
}
public function send(Frame $frame): bool|null
{
return $this->session?->sendFrame($frame, $this->maxSendBuffer) ?? false;
}
public function close(): void
{
$this->session?->close();
$this->connection?->close();
}
public function pauseRecv(): void
{
$this->session?->pause();
}
public function resumeRecv(): void
{
$this->session?->resume();
}
public function getSendBufferQueueSize(): int
{
return $this->session?->getSendBufferQueueSize() ?? 0;
}
private function handlePacket(string $data): void
{
$packet = KcpPacketCodec::decode($data);
if ($packet === null) {
$this->session?->receive($data);
return;
}
if ($packet['conv'] !== $this->conv) {
return;
}
if ($packet['type'] === KcpPacketCodec::SYN_ACK && !$this->connected) {
$this->connected = true;
$this->session = $this->createSession();
($this->onConnect)($this);
return;
}
if ($packet['type'] === KcpPacketCodec::CLOSE) {
$this->connection?->close();
return;
}
$this->session?->receive($packet);
if ($this->session?->isClosed()) {
$this->connection?->close();
}
}
private function tick(): void
{
if ($this->connection === null) {
$this->stopTimer();
return;
}
if (!$this->connected) {
$this->connection->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN,
'conv' => $this->conv,
]));
return;
}
$before = $this->getSendBufferQueueSize();
$this->session?->tick();
if ($before >= $this->maxSendBuffer && $this->getSendBufferQueueSize() < $this->maxSendBuffer) {
($this->onBufferDrain)($this);
}
}
private function createSession(): KcpReliableSession|NativeKcpSession
{
$backend = strtolower(trim((string)(getenv('LAYLINK_KCP_BACKEND') ?: 'ffi')));
$libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so');
$args = [
$this->conv,
fn (string $packet): bool|null => $this->connection?->send($packet),
fn (Frame $frame) => ($this->onFrame)($this, $frame),
fn (\Throwable $e) => ($this->onInvalidFrame)($e),
];
if ($backend === 'php') {
return new KcpReliableSession(...$args);
}
return new NativeKcpSession(...[...$args, $libraryPath]);
}
private function stopTimer(): void
{
if ($this->timerId === null) {
return;
}
Timer::del($this->timerId);
$this->timerId = null;
}
private function normalizeAddress(string $address): string
{
if (str_starts_with($address, 'udp://')) {
return $address;
}
if (str_starts_with($address, 'tcp://')) {
return 'udp://' . substr($address, 6);
}
return 'udp://' . $address;
}
}

View File

@ -0,0 +1,75 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
use Workerman\Connection\UdpConnection;
final class KcpFrameServerConnection implements FrameServerConnection
{
private bool $closed = false;
public function __construct(
private readonly int $id,
private UdpConnection $connection,
private readonly KcpReliableSession|NativeKcpSession $session,
private readonly int $maxSendBuffer,
) {
}
public function id(): int
{
return $this->id;
}
public function send(Frame $frame): bool|null
{
if ($this->closed) {
return false;
}
return $this->session->sendFrame($frame, $this->maxSendBuffer);
}
public function close(): void
{
if ($this->closed) {
return;
}
$this->closed = true;
$this->session->close();
}
public function pauseRecv(): void
{
$this->session->pause();
}
public function resumeRecv(): void
{
$this->session->resume();
}
public function getSendBufferQueueSize(): int
{
return $this->session->getSendBufferQueueSize();
}
public function getRemoteIp(): string
{
return $this->connection->getRemoteIp();
}
public function updateConnection(UdpConnection $connection): void
{
$this->connection = $connection;
}
public function isClosed(): bool
{
return $this->closed || $this->session->isClosed();
}
}

View File

@ -0,0 +1,177 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
use Workerman\Connection\UdpConnection;
use Workerman\Timer;
use Workerman\Worker;
final class KcpFrameServerListener
{
/** @var array<string, KcpFrameServerConnection> */
private array $connections = [];
/** @var array<string, KcpReliableSession|NativeKcpSession> */
private array $sessions = [];
private ?int $timerId = null;
private int $nextConnectionId = 1_000_000;
public function __construct(
Worker $worker,
private readonly int $maxSendBuffer,
private readonly \Closure $onConnect,
private readonly \Closure $onFrame,
private readonly \Closure $onClose,
private readonly \Closure $onBufferDrain,
private readonly \Closure $onInvalidFrame,
) {
$worker->onMessage = fn (UdpConnection $connection, string $data) => $this->handleMessage($connection, $data);
$worker->onWorkerStart = function (): void {
$this->timerId = Timer::add(0.02, fn () => $this->tick());
};
$worker->onWorkerStop = function (): void {
if ($this->timerId !== null) {
Timer::del($this->timerId);
$this->timerId = null;
}
};
}
private function handleMessage(UdpConnection $connection, string $data): void
{
$packet = KcpPacketCodec::decode($data);
if ($packet === null) {
$conv = KcpPacketCodec::rawKcpConv($data);
if ($conv === null) {
return;
}
$key = $this->key($connection, $conv);
$wrapped = $this->connections[$key] ?? null;
$session = $this->sessions[$key] ?? null;
if ($wrapped === null || $session === null) {
return;
}
$wrapped->updateConnection($connection);
$wasFull = $session->getSendBufferQueueSize() >= $this->maxSendBuffer;
$session->receive($data);
if ($session->isClosed()) {
$this->closeConnection($key);
return;
}
if ($wasFull && $session->getSendBufferQueueSize() < $this->maxSendBuffer) {
($this->onBufferDrain)($wrapped);
}
return;
}
$key = $this->key($connection, $packet['conv']);
if ($packet['type'] === KcpPacketCodec::SYN) {
$this->handleSyn($connection, $key, $packet['conv']);
return;
}
$wrapped = $this->connections[$key] ?? null;
$session = $this->sessions[$key] ?? null;
if ($wrapped === null || $session === null) {
return;
}
if ($packet['type'] === KcpPacketCodec::CLOSE) {
$this->closeConnection($key);
return;
}
$wrapped->updateConnection($connection);
$wasFull = $session->getSendBufferQueueSize() >= $this->maxSendBuffer;
$session->receive($packet);
if ($session->isClosed()) {
$this->closeConnection($key);
return;
}
if ($wasFull && $session->getSendBufferQueueSize() < $this->maxSendBuffer) {
($this->onBufferDrain)($wrapped);
}
}
private function handleSyn(UdpConnection $connection, string $key, int $conv): void
{
if (isset($this->connections[$key])) {
$connection->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN_ACK,
'conv' => $conv,
]));
return;
}
$session = $this->createSession(
$conv,
fn (string $packet): bool|null => $connection->send($packet),
function (Frame $frame) use ($key): void {
$wrapped = $this->connections[$key] ?? null;
if ($wrapped !== null) {
($this->onFrame)($wrapped, $frame);
}
},
function (\Throwable $e) use ($key): void {
$wrapped = $this->connections[$key] ?? null;
if ($wrapped !== null) {
($this->onInvalidFrame)($wrapped, $e);
}
},
);
$wrapped = new KcpFrameServerConnection($this->nextConnectionId++, $connection, $session, $this->maxSendBuffer);
$this->sessions[$key] = $session;
$this->connections[$key] = $wrapped;
$connection->send(KcpPacketCodec::encode([
'type' => KcpPacketCodec::SYN_ACK,
'conv' => $conv,
]));
($this->onConnect)($wrapped);
}
private function tick(): void
{
foreach ($this->sessions as $key => $session) {
$wrapped = $this->connections[$key] ?? null;
if ($wrapped === null || $wrapped->isClosed()) {
$this->closeConnection($key);
continue;
}
$before = $session->getSendBufferQueueSize();
$session->tick();
if ($before >= $this->maxSendBuffer && $session->getSendBufferQueueSize() < $this->maxSendBuffer) {
($this->onBufferDrain)($wrapped);
}
}
}
private function closeConnection(string $key): void
{
$wrapped = $this->connections[$key] ?? null;
unset($this->connections[$key], $this->sessions[$key]);
if ($wrapped !== null) {
($this->onClose)($wrapped);
}
}
private function createSession(int $conv, \Closure $sendPacket, \Closure $onFrame, \Closure $onInvalidFrame): KcpReliableSession|NativeKcpSession
{
$backend = strtolower(trim((string)(getenv('LAYLINK_KCP_BACKEND') ?: 'ffi')));
if ($backend === 'php') {
return new KcpReliableSession($conv, $sendPacket, $onFrame, $onInvalidFrame);
}
$libraryPath = (string)(getenv('LAYLINK_KCP_FFI_LIB') ?: dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so');
return new NativeKcpSession($conv, $sendPacket, $onFrame, $onInvalidFrame, $libraryPath);
}
private function key(UdpConnection $connection, int $conv): string
{
return $connection->getRemoteAddress() . '#' . $conv;
}
}

View File

@ -0,0 +1,71 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
final class KcpPacketCodec
{
public const MAGIC = "LLK1";
public const SYN = 1;
public const SYN_ACK = 2;
public const DATA = 3;
public const ACK = 4;
public const CLOSE = 5;
private const HEADER_LENGTH = 25;
/**
* @param array{type:int, conv:int, seq?:int, ack?:int, message_id?:int, fragment_index?:int, fragment_count?:int, payload?:string} $packet
*/
public static function encode(array $packet): string
{
return pack(
'a4CNNNNnn',
self::MAGIC,
$packet['type'],
$packet['conv'],
$packet['seq'] ?? 0,
$packet['ack'] ?? 0,
$packet['message_id'] ?? 0,
$packet['fragment_index'] ?? 0,
$packet['fragment_count'] ?? 0,
) . ($packet['payload'] ?? '');
}
/**
* @return array{type:int, conv:int, seq:int, ack:int, message_id:int, fragment_index:int, fragment_count:int, payload:string}|null
*/
public static function decode(string $bytes): ?array
{
if (strlen($bytes) < self::HEADER_LENGTH || substr($bytes, 0, 4) !== self::MAGIC) {
return null;
}
$header = unpack('a4magic/Ctype/Nconv/Nseq/Nack/Nmessage_id/nfragment_index/nfragment_count', substr($bytes, 0, self::HEADER_LENGTH));
if ($header === false) {
return null;
}
return [
'type' => (int)$header['type'],
'conv' => (int)$header['conv'],
'seq' => (int)$header['seq'],
'ack' => (int)$header['ack'],
'message_id' => (int)$header['message_id'],
'fragment_index' => (int)$header['fragment_index'],
'fragment_count' => (int)$header['fragment_count'],
'payload' => substr($bytes, self::HEADER_LENGTH),
];
}
public static function rawKcpConv(string $bytes): ?int
{
if (strlen($bytes) < 4 || substr($bytes, 0, 4) === self::MAGIC) {
return null;
}
$decoded = unpack('Vconv', substr($bytes, 0, 4));
return $decoded === false ? null : (int)$decoded['conv'];
}
}

View File

@ -0,0 +1,235 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
use LayLink\Protocol\FrameCodec;
use LayLink\Protocol\FrameParser;
final class KcpReliableSession
{
private const MTU = 1200;
private const RESEND_AFTER_MS = 120;
private FrameParser $parser;
private int $nextSeq = 1;
private int $expectedSeq = 1;
private int $nextMessageId = 1;
/** @var array<int, array{packet:string, sent_at:float, bytes:int}> */
private array $unacked = [];
/** @var array<int, array{message_id:int, fragment_index:int, fragment_count:int, payload:string}> */
private array $pendingSegments = [];
/** @var array<int, array<int, string>> */
private array $messageFragments = [];
/** @var array<int, int> */
private array $messageFragmentCounts = [];
/** @var Frame[] */
private array $pausedFrames = [];
private bool $paused = false;
private bool $closed = false;
public function __construct(
private readonly int $conv,
private readonly \Closure $sendPacket,
private readonly \Closure $onFrame,
private readonly \Closure $onInvalidFrame,
) {
$this->parser = new FrameParser();
}
public function conv(): int
{
return $this->conv;
}
public function sendFrame(Frame $frame, int $maxSendBuffer): bool|null
{
if ($this->closed || $this->getSendBufferQueueSize() >= $maxSendBuffer) {
return false;
}
$bytes = FrameCodec::encode($frame);
$payloadSize = self::MTU - 25;
$fragments = str_split($bytes, $payloadSize);
$messageId = $this->nextMessageId++;
$fragmentCount = count($fragments);
foreach ($fragments as $fragmentIndex => $payload) {
$seq = $this->nextSeq++;
$packet = KcpPacketCodec::encode([
'type' => KcpPacketCodec::DATA,
'conv' => $this->conv,
'seq' => $seq,
'message_id' => $messageId,
'fragment_index' => $fragmentIndex,
'fragment_count' => $fragmentCount,
'payload' => $payload,
]);
$this->unacked[$seq] = [
'packet' => $packet,
'sent_at' => $this->nowMs(),
'bytes' => strlen($packet),
];
($this->sendPacket)($packet);
}
return true;
}
/**
* @param array{type:int, conv:int, seq:int, ack:int, message_id:int, fragment_index:int, fragment_count:int, payload:string}|string $packet
*/
public function receive(array|string $packet): void
{
if ($this->closed || is_string($packet)) {
return;
}
if ($packet['type'] === KcpPacketCodec::ACK) {
unset($this->unacked[$packet['ack']]);
return;
}
if ($packet['type'] === KcpPacketCodec::CLOSE) {
$this->closed = true;
return;
}
if ($packet['type'] !== KcpPacketCodec::DATA) {
return;
}
$this->ack($packet['seq']);
if ($packet['seq'] < $this->expectedSeq) {
return;
}
$this->pendingSegments[$packet['seq']] = [
'message_id' => $packet['message_id'],
'fragment_index' => $packet['fragment_index'],
'fragment_count' => $packet['fragment_count'],
'payload' => $packet['payload'],
];
$this->drainOrderedSegments();
}
public function tick(): void
{
if ($this->closed) {
return;
}
$now = $this->nowMs();
foreach ($this->unacked as $seq => $pending) {
if ($now - $pending['sent_at'] < self::RESEND_AFTER_MS) {
continue;
}
$this->unacked[$seq]['sent_at'] = $now;
($this->sendPacket)($pending['packet']);
}
}
public function pause(): void
{
$this->paused = true;
}
public function resume(): void
{
$this->paused = false;
while (!$this->paused && $this->pausedFrames !== []) {
$frame = array_shift($this->pausedFrames);
($this->onFrame)($frame);
}
}
public function close(): void
{
if ($this->closed) {
return;
}
$this->closed = true;
($this->sendPacket)(KcpPacketCodec::encode([
'type' => KcpPacketCodec::CLOSE,
'conv' => $this->conv,
]));
}
public function isClosed(): bool
{
return $this->closed;
}
public function getSendBufferQueueSize(): int
{
$bytes = 0;
foreach ($this->unacked as $pending) {
$bytes += $pending['bytes'];
}
return $bytes;
}
/**
* @param array{message_id:int, fragment_index:int, fragment_count:int, payload:string} $segment
*/
private function receiveOrderedSegment(array $segment): void
{
$messageId = $segment['message_id'];
$fragmentIndex = $segment['fragment_index'];
$fragmentCount = $segment['fragment_count'];
if ($fragmentCount < 1 || $fragmentIndex < 0 || $fragmentIndex >= $fragmentCount) {
return;
}
$this->messageFragmentCounts[$messageId] = $fragmentCount;
$this->messageFragments[$messageId][$fragmentIndex] = $segment['payload'];
if (count($this->messageFragments[$messageId]) !== $fragmentCount) {
return;
}
ksort($this->messageFragments[$messageId]);
$bytes = implode('', $this->messageFragments[$messageId]);
unset($this->messageFragments[$messageId], $this->messageFragmentCounts[$messageId]);
try {
foreach ($this->parser->push($bytes) as $frame) {
if ($this->paused) {
$this->pausedFrames[] = $frame;
continue;
}
($this->onFrame)($frame);
}
} catch (\Throwable $e) {
($this->onInvalidFrame)($e);
}
}
private function drainOrderedSegments(): void
{
while (isset($this->pendingSegments[$this->expectedSeq])) {
$segment = $this->pendingSegments[$this->expectedSeq];
unset($this->pendingSegments[$this->expectedSeq]);
$this->expectedSeq++;
$this->receiveOrderedSegment($segment);
}
}
private function ack(int $seq): void
{
($this->sendPacket)(KcpPacketCodec::encode([
'type' => KcpPacketCodec::ACK,
'conv' => $this->conv,
'ack' => $seq,
]));
}
private function nowMs(): float
{
return microtime(true) * 1000;
}
}

View File

@ -0,0 +1,52 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use FFI;
final class NativeKcpLibrary
{
private const CDEF = <<<'CDEF'
typedef struct laylink_kcp laylink_kcp;
laylink_kcp* laylink_kcp_create(unsigned int conv);
void laylink_kcp_release(laylink_kcp* session);
int laylink_kcp_nodelay(laylink_kcp* session, int nodelay, int interval, int resend, int nc);
int laylink_kcp_wndsize(laylink_kcp* session, int sndwnd, int rcvwnd);
int laylink_kcp_setmtu(laylink_kcp* session, int mtu);
int laylink_kcp_send(laylink_kcp* session, const char* buffer, int len);
int laylink_kcp_input(laylink_kcp* session, const char* buffer, long size);
void laylink_kcp_update(laylink_kcp* session, unsigned int current);
unsigned int laylink_kcp_check(laylink_kcp* session, unsigned int current);
int laylink_kcp_peeksize(laylink_kcp* session);
int laylink_kcp_recv(laylink_kcp* session, char* buffer, int len);
void laylink_kcp_flush(laylink_kcp* session);
int laylink_kcp_pending_output_size(laylink_kcp* session);
int laylink_kcp_pop_output(laylink_kcp* session, char* buffer, int len);
CDEF;
private static ?FFI $ffi = null;
public static function load(?string $libraryPath = null): FFI
{
if (self::$ffi !== null) {
return self::$ffi;
}
if (!class_exists(FFI::class)) {
throw new \RuntimeException('ffi_extension_not_loaded');
}
$libraryPath ??= dirname(__DIR__, 2) . '/native/kcp/liblaylink_kcp.so';
if (!str_starts_with($libraryPath, '/')) {
$libraryPath = dirname(__DIR__, 2) . '/' . $libraryPath;
}
if (!is_file($libraryPath)) {
throw new \RuntimeException('kcp_ffi_library_not_found');
}
self::$ffi = FFI::cdef(self::CDEF, $libraryPath);
return self::$ffi;
}
}

View File

@ -0,0 +1,188 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use FFI;
use LayLink\Protocol\Frame;
use LayLink\Protocol\FrameCodec;
use LayLink\Protocol\FrameParser;
final class NativeKcpSession
{
private const RECEIVE_BUFFER_BYTES = 8 * 1024 * 1024;
private \FFI $ffi;
private mixed $session;
private FrameParser $parser;
private bool $paused = false;
private bool $closed = false;
/** @var Frame[] */
private array $pausedFrames = [];
private int $queuedBytes = 0;
public function __construct(
private readonly int $conv,
private readonly \Closure $sendPacket,
private readonly \Closure $onFrame,
private readonly \Closure $onInvalidFrame,
private readonly ?string $libraryPath = null,
) {
$this->ffi = NativeKcpLibrary::load($this->libraryPath);
$this->session = $this->ffi->laylink_kcp_create($conv);
if ($this->session === null) {
throw new \RuntimeException('kcp_create_failed');
}
$this->parser = new FrameParser();
}
public function __destruct()
{
if (!$this->closed) {
$this->closed = true;
$this->ffi->laylink_kcp_release($this->session);
}
}
public function conv(): int
{
return $this->conv;
}
public function sendFrame(Frame $frame, int $maxSendBuffer): bool|null
{
if ($this->closed || $this->queuedBytes >= $maxSendBuffer) {
return false;
}
$bytes = FrameCodec::encode($frame);
$result = $this->ffi->laylink_kcp_send($this->session, $bytes, strlen($bytes));
if ($result < 0) {
return false;
}
$this->queuedBytes += strlen($bytes);
$this->ffi->laylink_kcp_flush($this->session);
$this->drainOutput();
return true;
}
public function receive(string|array $packet): void
{
if ($this->closed || !is_string($packet)) {
return;
}
if ($this->ffi->laylink_kcp_input($this->session, $packet, strlen($packet)) < 0) {
return;
}
$this->drainFrames();
$this->drainOutput();
}
public function tick(): void
{
if ($this->closed) {
return;
}
$this->ffi->laylink_kcp_update($this->session, $this->nowMs());
$this->drainFrames();
$this->drainOutput();
}
public function pause(): void
{
$this->paused = true;
}
public function resume(): void
{
$this->paused = false;
while (!$this->paused && $this->pausedFrames !== []) {
$frame = array_shift($this->pausedFrames);
($this->onFrame)($frame);
}
}
public function close(): void
{
if ($this->closed) {
return;
}
$this->closed = true;
($this->sendPacket)(KcpPacketCodec::encode([
'type' => KcpPacketCodec::CLOSE,
'conv' => $this->conv,
]));
$this->ffi->laylink_kcp_release($this->session);
}
public function isClosed(): bool
{
return $this->closed;
}
public function getSendBufferQueueSize(): int
{
return max($this->queuedBytes, $this->pendingOutputBytes());
}
private function drainFrames(): void
{
while (($size = $this->ffi->laylink_kcp_peeksize($this->session)) > 0) {
if ($size > self::RECEIVE_BUFFER_BYTES) {
($this->onInvalidFrame)(new \RuntimeException('kcp_message_too_large'));
$this->close();
return;
}
$buffer = $this->ffi->new("char[$size]");
$read = $this->ffi->laylink_kcp_recv($this->session, $buffer, $size);
if ($read <= 0) {
return;
}
$this->queuedBytes = max(0, $this->queuedBytes - $read);
$bytes = FFI::string($buffer, $read);
try {
foreach ($this->parser->push($bytes) as $frame) {
if ($this->paused) {
$this->pausedFrames[] = $frame;
continue;
}
($this->onFrame)($frame);
}
} catch (\Throwable $e) {
($this->onInvalidFrame)($e);
}
}
}
private function drainOutput(): void
{
while (($size = $this->ffi->laylink_kcp_pending_output_size($this->session)) > 0) {
$buffer = $this->ffi->new("char[$size]");
$read = $this->ffi->laylink_kcp_pop_output($this->session, $buffer, $size);
if ($read <= 0) {
return;
}
($this->sendPacket)(FFI::string($buffer, $read));
}
}
private function pendingOutputBytes(): int
{
$size = $this->ffi->laylink_kcp_pending_output_size($this->session);
return $size > 0 ? $size : 0;
}
private function nowMs(): int
{
return (int)floor(microtime(true) * 1000);
}
}

View File

@ -0,0 +1,85 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
use LayLink\Protocol\FrameCodec;
use LayLink\Protocol\FrameParser;
use Workerman\Connection\AsyncTcpConnection;
final class TcpFrameClientTransport implements FrameClientTransport
{
private ?AsyncTcpConnection $connection = null;
private FrameParser $parser;
public function __construct(
private readonly string $address,
private readonly int $maxSendBuffer,
private readonly \Closure $onConnect,
private readonly \Closure $onFrame,
private readonly \Closure $onClose,
private readonly \Closure $onBufferDrain,
private readonly \Closure $onInvalidFrame,
) {
$this->parser = new FrameParser();
}
public function connect(): void
{
$this->parser = new FrameParser();
$connection = new AsyncTcpConnection($this->address);
$connection->maxSendBufferSize = $this->maxSendBuffer;
$this->connection = $connection;
$connection->onConnect = function () use ($connection): void {
($this->onConnect)($this);
};
$connection->onMessage = function (AsyncTcpConnection $connection, string $data): void {
try {
foreach ($this->parser->push($data) as $frame) {
($this->onFrame)($this, $frame);
}
} catch (\Throwable $e) {
($this->onInvalidFrame)($e);
$connection->close();
}
};
$connection->onBufferDrain = function () use ($connection): void {
($this->onBufferDrain)($this);
};
$connection->onClose = function () use ($connection): void {
if ($this->connection === $connection) {
$this->connection = null;
}
($this->onClose)($this);
};
$connection->connect();
}
public function send(Frame $frame): bool|null
{
return $this->connection?->send(FrameCodec::encode($frame));
}
public function close(): void
{
$this->connection?->close();
}
public function pauseRecv(): void
{
$this->connection?->pauseRecv();
}
public function resumeRecv(): void
{
$this->connection?->resumeRecv();
}
public function getSendBufferQueueSize(): int
{
return $this->connection?->getSendBufferQueueSize() ?? 0;
}
}

View File

@ -0,0 +1,63 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use LayLink\Protocol\Frame;
use LayLink\Protocol\FrameCodec;
use LayLink\Protocol\FrameParser;
use Workerman\Connection\TcpConnection;
final class TcpFrameServerConnection implements FrameServerConnection
{
private FrameParser $parser;
public function __construct(private readonly TcpConnection $connection)
{
$this->parser = new FrameParser();
}
public function id(): int
{
return $this->connection->id;
}
/**
* @return Frame[]
*/
public function push(string $data): array
{
return $this->parser->push($data);
}
public function send(Frame $frame): bool|null
{
return $this->connection->send(FrameCodec::encode($frame));
}
public function close(): void
{
$this->connection->close();
}
public function pauseRecv(): void
{
$this->connection->pauseRecv();
}
public function resumeRecv(): void
{
$this->connection->resumeRecv();
}
public function getSendBufferQueueSize(): int
{
return $this->connection->getSendBufferQueueSize();
}
public function getRemoteIp(): string
{
return $this->connection->getRemoteIp();
}
}

View File

@ -0,0 +1,66 @@
<?php
declare(strict_types=1);
namespace LayLink\Transport;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
final class TcpFrameServerListener
{
/** @var array<int, TcpFrameServerConnection> */
private array $connections = [];
public function __construct(
Worker $worker,
private readonly int $maxSendBuffer,
private readonly \Closure $onConnect,
private readonly \Closure $onFrame,
private readonly \Closure $onClose,
private readonly \Closure $onBufferDrain,
private readonly \Closure $onInvalidFrame,
) {
$worker->onConnect = fn (TcpConnection $connection) => $this->handleConnect($connection);
$worker->onMessage = fn (TcpConnection $connection, string $data) => $this->handleMessage($connection, $data);
$worker->onClose = fn (TcpConnection $connection) => $this->handleClose($connection);
}
private function handleConnect(TcpConnection $connection): void
{
$connection->maxSendBufferSize = $this->maxSendBuffer;
$wrapped = new TcpFrameServerConnection($connection);
$this->connections[$connection->id] = $wrapped;
$connection->onBufferDrain = function () use ($wrapped): void {
($this->onBufferDrain)($wrapped);
};
($this->onConnect)($wrapped);
}
private function handleMessage(TcpConnection $connection, string $data): void
{
$wrapped = $this->connections[$connection->id] ?? null;
if ($wrapped === null) {
return;
}
try {
foreach ($wrapped->push($data) as $frame) {
($this->onFrame)($wrapped, $frame);
}
} catch (\Throwable $e) {
($this->onInvalidFrame)($wrapped, $e);
}
}
private function handleClose(TcpConnection $connection): void
{
$wrapped = $this->connections[$connection->id] ?? null;
unset($this->connections[$connection->id]);
if ($wrapped === null) {
return;
}
($this->onClose)($wrapped);
}
}

View File

@ -74,6 +74,17 @@ final class Env
return $default;
}
$value = trim($value);
if (str_starts_with($value, '[')) {
$decoded = json_decode($value, true);
if (is_array($decoded)) {
return array_values(array_filter(array_map(
static fn (mixed $item): string => strtolower(trim((string)$item)),
$decoded,
)));
}
}
return array_values(array_filter(array_map(
static fn (string $item): string => strtolower(trim($item)),
explode(',', $value),