*/
    private array $initialBuffers = [];

    /** @var array<int, string> Session id assigned to each local TCP connection, keyed by connection id. */
    private array $connectionSessionIds = [];

    /** @var array<string, TcpConnection> Local TCP connections that own a session, keyed by session id. */
    private array $clients = [];

    /** @var array<string, string> Session state ('opening' until OPEN_OK arrives, then 'open'), keyed by session id. */
    private array $sessionStates = [];

    /** @var array<string, string> Client bytes held back until the session is open, keyed by session id. */
    private array $pendingData = [];

    /** @var array<int, string> Handshake stage ('init', 'auth', 'request', 'udp-associate'), keyed by connection id. */
    private array $connectionStages = [];

    /** @var array<string, string> Ingress protocol label used to format replies, keyed by session id. */
    private array $sessionIngressProtocols = [];

    /** @var array<string, UdpConnection> Most recent UDP connection object per client remote address. */
    private array $udpClients = [];

    /** @var array<string, array{client_address: string, target_host: string, target_port: int}> UDP flows keyed by session id. */
    private array $udpSessions = [];

    public function __construct(
        private readonly string $clientListen,
        private readonly string $ingressProtocol,
        private readonly string $popAddress,
        private readonly string $nodeId,
        private readonly string $nodeType,
        private readonly string $nodeToken,
        private readonly string $nodeZone,
        private readonly string $transportProtocol,
        private readonly string $clientAuthToken,
        private readonly string $defaultUserId,
        private readonly string $socks5AuthMode = 'no-auth',
        private readonly string $socks5Username = '',
        private readonly string $socks5Password = '',
        private readonly ?string $socks5UdpListen = null,
        private readonly string $socks5UdpAdvertiseIp = '127.0.0.1',
    ) {
    }

    /**
     * Registers the local listeners.
     *
     * A TCP worker is always created on $clientListen. A second UDP worker is
     * created only when the ingress protocol is 'socks5' and a UDP listen
     * address is configured. The upstream POP connection and the heartbeat
     * timer are started from the TCP worker's start callback.
     *
     * @param string $workerName Name given to the TCP worker; the UDP worker gets the '-udp' suffix.
     */
    public function boot(string $workerName): void
    {
        $worker = new Worker('tcp://' . $this->clientListen);
        $worker->name = $workerName;
        $worker->count = 1;

        $worker->onWorkerStart = function (): void {
            // Only the plain TCP transport to the POP is implemented.
            if ($this->transportProtocol !== 'tcp') {
                throw new \RuntimeException("Agent transport '{$this->transportProtocol}' is configured but not implemented yet.");
            }

            $this->connect();
            Timer::add(10, fn () => $this->heartbeat());
        };
        $worker->onConnect = fn (TcpConnection $connection) => $this->onClientConnect($connection);
        $worker->onMessage = fn (TcpConnection $connection, string $data) => $this->onClientMessage($connection, $data);
        $worker->onClose = fn (TcpConnection $connection) => $this->onClientClose($connection);

        if ($this->ingressProtocol === 'socks5' && $this->socks5UdpListen !== null) {
            $udpWorker = new Worker('udp://' . $this->socks5UdpListen);
            $udpWorker->name = $workerName . '-udp';
            $udpWorker->count = 1;
            $udpWorker->onMessage = fn (UdpConnection $connection, string $data) => $this->onSocks5UdpMessage($connection, $data);
        }
    }

    /**
     * Opens (or re-opens) the connection to the POP and wires its callbacks.
     *
     * On connect an AUTH frame is sent. When the connection closes, every
     * local client that owns a session is closed, all session state is
     * dropped, and a one-shot timer schedules a new connect() call.
     */
    private function connect(): void
    {
        $this->parser = new FrameParser();
        $this->authenticated = false;

        $connection = new AsyncTcpConnection($this->popAddress);
        $connection->maxSendBufferSize = 8 * 1024 * 1024;
        $this->pop = $connection;

        $connection->onConnect = function (AsyncTcpConnection $connection): void {
            $this->send(new Frame(FrameType::AUTH, null, [
                'node_id' => $this->nodeId,
                'node_type' => $this->nodeType,
                'node_zone' => $this->nodeZone,
                'node_token' => $this->nodeToken,
                'transport_protocol' => $this->transportProtocol,
                'supported_protocols' => ['tcp'],
                'supported_transports' => ['tcp', 'udp', 'kcp'],
            ]));
        };

        $connection->onMessage = function (AsyncTcpConnection $connection, string $data): void {
            try {
                foreach ($this->parser?->push($data) ?? [] as $frame) {
                    $this->handleFrame($frame);
                }
            } catch (\Throwable $e) {
                // Any parse/handling failure drops the POP link; onClose then reconnects.
                // NOTE(review): the exception is discarded here - consider logging it.
                $connection->close();
            }
        };

        $connection->onClose = function (): void {
            $this->authenticated = false;

            foreach ($this->clients as $client) {
                $client->close();
            }

            // Session-scoped state dies with the POP link. Per-connection
            // handshake state (initialBuffers, connectionStages) is left alone:
            // those connections are still open and onClientClose() cleans them
            // up; wiping it here would make a client that is mid-handshake be
            // re-parsed from stage 'init'.
            $this->connectionSessionIds = [];
            $this->clients = [];
            $this->sessionStates = [];
            $this->pendingData = [];
            $this->sessionIngressProtocols = [];

            // UDP flows are re-registered by the next client datagram.
            $this->udpClients = [];
            $this->udpSessions = [];

            Timer::add(3, fn () => $this->connect(), [], false);
        };

        $connection->connect();
    }

    /**
     * Dispatches one frame received from the POP by frame type.
     * Unknown frame types and PONG are ignored.
     */
    private function handleFrame(Frame $frame): void
    {
        match ($frame->type) {
            FrameType::AUTH_OK => $this->authenticated = true,
            FrameType::AUTH_FAIL => $this->pop?->close(),
            FrameType::PONG => null,
            FrameType::OPEN_OK => $this->openClientSession($frame),
            FrameType::OPEN_FAIL => $this->failClientSession($frame),
            FrameType::DATA => $this->forwardToClient($frame),
            FrameType::UDP_DATA => $this->forwardUdpToClient($frame),
            FrameType::CLOSE => $this->closeClient($frame->sessionId),
            default => null,
        };
    }

    /**
     * Initialises handshake state for a newly accepted local TCP connection.
     */
    private function onClientConnect(TcpConnection $connection): void
    {
        $connection->maxSendBufferSize = 8 * 1024 * 1024;
        $this->initialBuffers[$connection->id] = '';
        $this->connectionStages[$connection->id] = 'init';
    }

    /**
     * Handles bytes from a local TCP client.
     *
     * Before a session exists the bytes feed the ingress handshake. Once a
     * session exists they are buffered while it is still opening and forwarded
     * as DATA frames once it is open.
     */
    private function onClientMessage(TcpConnection $connection, string $data): void
    {
        if (!isset($this->connectionSessionIds[$connection->id])) {
            $this->handleInitialRequest($connection, $data);
            return;
        }

        $sessionId = $this->connectionSessionIds[$connection->id];

        if (($this->sessionStates[$sessionId] ?? null) !== 'open') {
            // NOTE(review): this buffer has no size cap while the session is opening.
            $this->pendingData[$sessionId] = ($this->pendingData[$sessionId] ?? '') . $data;
            return;
        }

        $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data]));
    }

    /**
     * Appends $data to the connection's handshake buffer and runs the parser
     * for the configured ingress protocol.
     */
    private function handleInitialRequest(TcpConnection $connection, string $data): void
    {
        $buffer = ($this->initialBuffers[$connection->id] ?? '') . $data;
        $this->initialBuffers[$connection->id] = $buffer;

        match ($this->ingressProtocol) {
            'raw-json' => $this->handleRawJsonInitialRequest($connection),
            'socks5' => $this->handleSocks5InitialRequest($connection),
            'http-proxy' => $this->handleHttpProxyInitialRequest($connection),
            default => $this->failLocalClient($connection, 'unsupported_ingress_protocol'),
        };
    }

    /**
     * Parses the 'raw-json' handshake: one JSON object, normally terminated by
     * a newline, optionally followed by payload bytes.
     *
     * Without a newline the buffer is accepted only if it already decodes to a
     * non-empty JSON array/object; otherwise the method waits for more data
     * and rejects the client once the buffer exceeds 8192 bytes.
     */
    private function handleRawJsonInitialRequest(TcpConnection $connection): void
    {
        $buffer = $this->initialBuffers[$connection->id] ?? '';
        $payloadBytes = '';
        $requestText = null;

        if (($pos = strpos($buffer, "\n")) !== false) {
            $requestText = substr($buffer, 0, $pos);
            $payloadBytes = substr($buffer, $pos + 1);
        } elseif (($decoded = json_decode(trim($buffer), true)) && is_array($decoded)) {
            $requestText = trim($buffer);
        }

        if ($requestText === null) {
            if (strlen($buffer) > 8192) {
                $connection->send("ERR invalid_frame\n");
                $connection->close();
            } else {
                $this->initialBuffers[$connection->id] = $buffer;
            }

            return;
        }

        unset($this->initialBuffers[$connection->id]);

        $request = json_decode(trim($requestText), true);
        if (!is_array($request)) {
            $connection->send("ERR invalid_frame\n");
            $connection->close();
            return;
        }

        $this->startPopSession($connection, [
            'auth_token' => (string)($request['auth_token'] ?? ''),
            'user_id' => (string)($request['user_id'] ?? ''),
            'target_host' => (string)($request['target_host'] ?? ''),
            'target_port' => (int)($request['target_port'] ?? 0),
            'protocol' => (string)($request['protocol'] ?? 'tcp'),
            'route_hint' => $request['route_hint'] ?? null,
        ], $payloadBytes, 'raw-json');
    }

    /**
     * Incremental SOCKS5 handshake parser (method negotiation, optional
     * username/password sub-negotiation, then the request).
     *
     * The method returns without side effects whenever the buffered bytes are
     * not yet enough for the current stage and is re-entered on the next
     * message. CONNECT (1) starts a POP session; UDP ASSOCIATE (3) replies
     * with the UDP relay address; every other command is rejected.
     */
    private function handleSocks5InitialRequest(TcpConnection $connection): void
    {
        $buffer = $this->initialBuffers[$connection->id] ?? '';
        $stage = $this->connectionStages[$connection->id] ?? 'init';

        if ($stage === 'udp-associate') {
            // The control connection of a UDP association carries no further
            // requests; discard stray bytes instead of re-parsing them as a
            // new greeting.
            unset($this->initialBuffers[$connection->id]);
            return;
        }

        if ($stage === 'init') {
            if (strlen($buffer) < 2) {
                return;
            }

            $version = ord($buffer[0]);
            $methods = ord($buffer[1]);

            if ($version !== 5) {
                $this->failLocalClient($connection, 'invalid_socks_version');
                return;
            }

            if (strlen($buffer) < 2 + $methods) {
                return;
            }

            $offeredMethods = array_map(ord(...), str_split(substr($buffer, 2, $methods)));
            $selectedMethod = $this->selectSocks5AuthMethod($offeredMethods);

            if ($selectedMethod === null) {
                // 0xFF: no acceptable methods.
                $connection->send("\x05\xff");
                $connection->close();
                return;
            }

            $connection->send("\x05" . chr($selectedMethod));

            $buffer = substr($buffer, 2 + $methods);
            $stage = $selectedMethod === 2 ? 'auth' : 'request';
            $this->initialBuffers[$connection->id] = $buffer;
            $this->connectionStages[$connection->id] = $stage;

            if ($buffer === '') {
                return;
            }
        }

        if ($stage === 'auth') {
            if (strlen($buffer) < 2) {
                return;
            }

            // The sub-negotiation version byte must be 1.
            if (ord($buffer[0]) !== 1) {
                $connection->send("\x01\x01");
                $connection->close();
                return;
            }

            $usernameLength = ord($buffer[1]);
            if (strlen($buffer) < 2 + $usernameLength + 1) {
                return;
            }

            $username = substr($buffer, 2, $usernameLength);
            $passwordLengthOffset = 2 + $usernameLength;
            $passwordLength = ord($buffer[$passwordLengthOffset]);

            if (strlen($buffer) < $passwordLengthOffset + 1 + $passwordLength) {
                return;
            }

            $password = substr($buffer, $passwordLengthOffset + 1, $passwordLength);

            if (!hash_equals($this->socks5Username, $username) || !hash_equals($this->socks5Password, $password)) {
                $connection->send("\x01\x01");
                $connection->close();
                return;
            }

            $connection->send("\x01\x00");

            $buffer = substr($buffer, $passwordLengthOffset + 1 + $passwordLength);
            $this->initialBuffers[$connection->id] = $buffer;
            $this->connectionStages[$connection->id] = 'request';

            if ($buffer === '') {
                return;
            }
        }

        // Request stage: VER, CMD, RSV, ATYP, then address and port.
        if (strlen($buffer) < 4) {
            return;
        }

        $version = ord($buffer[0]);
        $command = ord($buffer[1]);
        $reserved = ord($buffer[2]);
        $addressType = ord($buffer[3]);

        if ($version !== 5 || $reserved !== 0) {
            $connection->send($this->socks5Reply(1));
            $connection->close();
            return;
        }

        if ($command !== 1 && $command !== 3) {
            $connection->send($this->socks5Reply(7));
            $connection->close();
            return;
        }

        if (!in_array($addressType, [1, 3, 4], true)) {
            $connection->send($this->socks5Reply(8));
            $connection->close();
            return;
        }

        // With the address type validated above, null can only mean
        // "address/port not fully received yet".
        $parsed = $this->parseSocksAddress($buffer, 4, $addressType);
        if ($parsed === null) {
            return;
        }

        [$host, $port, $offset] = $parsed;

        unset($this->initialBuffers[$connection->id], $this->connectionStages[$connection->id]);

        if ($command === 3) {
            $connection->send($this->socks5UdpAssociateReply());

            if ($this->socks5UdpListen === null) {
                // The reply above was a failure (UDP relay disabled); a SOCKS5
                // server must close the connection after a failure reply.
                $connection->close();
                return;
            }

            // Keep the control connection open but stop parsing it.
            $this->connectionStages[$connection->id] = 'udp-associate';
            return;
        }

        $this->startPopSession($connection, [
            'auth_token' => $this->clientAuthToken,
            'user_id' => $this->defaultUserId,
            'target_host' => $host,
            'target_port' => $port,
            'protocol' => 'tcp',
        ], substr($buffer, $offset), 'socks5');
    }

    /**
     * Parses the first HTTP proxy request on a connection.
     *
     * CONNECT opens a tunnel to host:port (default port 443). Any other method
     * must carry an absolute URL; its request line is rewritten to origin form
     * and the whole request is queued as the first payload of the session.
     * Header blocks larger than 65536 bytes without a terminator are rejected.
     */
    private function handleHttpProxyInitialRequest(TcpConnection $connection): void
    {
        $buffer = $this->initialBuffers[$connection->id] ?? '';
        $headerEnd = strpos($buffer, "\r\n\r\n");

        if ($headerEnd === false) {
            if (strlen($buffer) > 65536) {
                $this->failLocalClient($connection, 'invalid_http_proxy_request');
            }

            return;
        }

        $headers = substr($buffer, 0, $headerEnd + 4);
        $body = substr($buffer, $headerEnd + 4);
        $lineEnd = strpos($headers, "\r\n");

        if ($lineEnd === false) {
            $this->failLocalClient($connection, 'invalid_http_proxy_request');
            return;
        }

        $requestLine = substr($headers, 0, $lineEnd);
        $parts = explode(' ', $requestLine, 3);

        if (count($parts) !== 3) {
            $this->failLocalClient($connection, 'invalid_http_proxy_request');
            return;
        }

        [$method, $target, $version] = $parts;

        unset($this->initialBuffers[$connection->id], $this->connectionStages[$connection->id]);

        if (strtoupper($method) === 'CONNECT') {
            [$host, $port] = $this->splitHostPort($target, 443);

            // Bytes that arrived after the CONNECT headers already belong to
            // the tunnel; queue them instead of dropping them.
            $this->startPopSession($connection, [
                'auth_token' => $this->clientAuthToken,
                'user_id' => $this->defaultUserId,
                'target_host' => $host,
                'target_port' => $port,
                'protocol' => 'tcp',
            ], $body, 'http-connect');
            return;
        }

        $url = parse_url($target);
        if (!is_array($url) || !isset($url['host'])) {
            $this->failLocalClient($connection, 'unsupported_http_proxy_request');
            return;
        }

        $scheme = strtolower((string)($url['scheme'] ?? 'http'));
        $port = (int)($url['port'] ?? ($scheme === 'https' ? 443 : 80));
        $path = (string)($url['path'] ?? '/');

        if (isset($url['query'])) {
            $path .= '?' . $url['query'];
        }

        // Replace the absolute-form request line, keep headers and body as received.
        $rewritten = $method . ' ' . $path . ' ' . $version . substr($headers, $lineEnd) . $body;

        $this->startPopSession($connection, [
            'auth_token' => $this->clientAuthToken,
            'user_id' => $this->defaultUserId,
            'target_host' => (string)$url['host'],
            'target_port' => $port,
            'protocol' => 'tcp',
        ], $rewritten, 'http-proxy');
    }

    /**
     * Allocates a session for a local connection and sends OPEN to the POP.
     *
     * @param array  $request         Keys read: auth_token, user_id, target_host, target_port, protocol, route_hint.
     * @param string $payloadBytes    Bytes to forward once the session is open ('' for none).
     * @param string $ingressProtocol Label that selects the reply format for this session.
     */
    private function startPopSession(TcpConnection $connection, array $request, string $payloadBytes, string $ingressProtocol): void
    {
        if (!$this->authenticated || $this->pop === null) {
            $this->failOpeningLocalClient($connection, $ingressProtocol, 'pop_not_connected');
            return;
        }

        $sessionId = Uuid::v4();

        $this->connectionSessionIds[$connection->id] = $sessionId;
        $this->clients[$sessionId] = $connection;
        $this->sessionStates[$sessionId] = 'opening';
        $this->sessionIngressProtocols[$sessionId] = $ingressProtocol;

        if ($payloadBytes !== '') {
            $this->pendingData[$sessionId] = $payloadBytes;
        }

        $this->send(new Frame(FrameType::OPEN, $sessionId, [
            'auth_token' => (string)($request['auth_token'] ?? ''),
            'user_id' => (string)($request['user_id'] ?? ''),
            'target_host' => (string)($request['target_host'] ?? ''),
            'target_port' => (int)($request['target_port'] ?? 0),
            'protocol' => (string)($request['protocol'] ?? 'tcp'),
            'route_hint' => $request['route_hint'] ?? null,
            'source_ip' => (string)($connection->getRemoteIp() ?? ''),
        ]));
    }

    /**
     * Rejects a client whose handshake parsed correctly but for which no POP
     * session could be started, using the reply format of $ingressProtocol,
     * then closes the connection.
     */
    private function failOpeningLocalClient(TcpConnection $connection, string $ingressProtocol, string $reason): void
    {
        if ($ingressProtocol === 'socks5') {
            $connection->send($this->socks5Reply(1));
        } elseif (str_starts_with($ingressProtocol, 'http')) {
            $connection->send("HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\nX-LayLink-Error: {$reason}\r\n\r\n");
        } else {
            $connection->send("ERR {$reason}\n");
        }

        $connection->close();
    }

    /**
     * Handles one SOCKS5 UDP datagram from a local client and relays its
     * payload to the POP as a UDP_DATA frame.
     *
     * Datagrams are dropped silently when the POP is not authenticated, when
     * they are shorter than 10 bytes, when the reserved bytes are non-zero,
     * when they are fragmented, or when the address cannot be parsed.
     */
    private function onSocks5UdpMessage(UdpConnection $connection, string $data): void
    {
        if (!$this->authenticated || $this->pop === null || strlen($data) < 10) {
            return;
        }

        $clientAddress = $connection->getRemoteAddress();
        // NOTE(review): udpClients/udpSessions have no expiry; they are only
        // cleared when the POP connection closes.
        $this->udpClients[$clientAddress] = $connection;

        // Datagram header: RSV(2) FRAG(1) ATYP(1), then address, port, payload.
        $reserved = substr($data, 0, 2);
        $fragment = ord($data[2]);
        $addressType = ord($data[3]);
        $offset = 4;

        if ($reserved !== "\x00\x00" || $fragment !== 0) {
            return;
        }

        $parsed = $this->parseSocksAddress($data, $offset, $addressType);
        if ($parsed === null) {
            return;
        }

        [$host, $port, $offset] = $parsed;
        $payload = substr($data, $offset);
        $sessionId = $this->udpSessionId($clientAddress, $host, $port);

        $this->udpSessions[$sessionId] = [
            'client_address' => $clientAddress,
            'target_host' => $host,
            'target_port' => $port,
        ];

        $this->send(new Frame(FrameType::UDP_DATA, $sessionId, [
            'auth_token' => $this->clientAuthToken,
            'user_id' => $this->defaultUserId,
            'target_host' => $host,
            'target_port' => $port,
            'protocol' => 'udp',
            'source_ip' => $connection->getRemoteIp(),
            'data' => base64_encode($payload),
        ]));
    }

    /**
     * Delivers a UDP_DATA frame from the POP to the local UDP client that owns
     * the session. Frames for unknown sessions, unknown clients or with
     * invalid base64 data are dropped.
     */
    private function forwardUdpToClient(Frame $frame): void
    {
        if ($frame->sessionId === null || !isset($this->udpSessions[$frame->sessionId])) {
            return;
        }

        $session = $this->udpSessions[$frame->sessionId];
        $client = $this->udpClients[$session['client_address']] ?? null;
        $data = base64_decode((string)($frame->payload['data'] ?? ''), true);

        if ($client === null || $data === false) {
            return;
        }

        // The frame may override the source host/port; otherwise the
        // session's original target is reported to the client.
        $client->send($this->packSocks5UdpPacket(
            (string)($frame->payload['target_host'] ?? $session['target_host']),
            (int)($frame->payload['target_port'] ?? $session['target_port']),
            $data,
        ));
    }

    /**
     * Handles OPEN_OK: marks the session open, sends the protocol-specific
     * success reply to the client and flushes any buffered client bytes.
     */
    private function openClientSession(Frame $frame): void
    {
        if ($frame->sessionId === null || !isset($this->clients[$frame->sessionId])) {
            return;
        }

        $sessionId = $frame->sessionId;
        $client = $this->clients[$sessionId];
        $this->sessionStates[$sessionId] = 'open';

        $ingressProtocol = $this->sessionIngressProtocols[$sessionId] ?? 'raw-json';

        // 'http-proxy' sessions get no reply: the response comes from the target.
        if ($ingressProtocol === 'raw-json') {
            $client->send("OK\n");
        } elseif ($ingressProtocol === 'socks5') {
            $client->send($this->socks5Reply(0));
        } elseif ($ingressProtocol === 'http-connect') {
            $client->send("HTTP/1.1 200 Connection Established\r\n\r\n");
        }

        $pending = $this->pendingData[$sessionId] ?? '';
        unset($this->pendingData[$sessionId]);

        if ($pending !== '') {
            $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $pending]));
        }
    }

    /**
     * Handles OPEN_FAIL: reports the failure to the client in its ingress
     * protocol's format and closes the session.
     */
    private function failClientSession(Frame $frame): void
    {
        if ($frame->sessionId === null || !isset($this->clients[$frame->sessionId])) {
            return;
        }

        $client = $this->clients[$frame->sessionId];
        $reason = (string)($frame->payload['reason'] ?? 'open_failed');
        $ingressProtocol = $this->sessionIngressProtocols[$frame->sessionId] ?? 'raw-json';

        if ($ingressProtocol === 'socks5') {
            $client->send($this->socks5Reply($this->socks5ReplyCodeForReason($reason)));
        } elseif (str_starts_with($ingressProtocol, 'http')) {
            $client->send("HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\nX-LayLink-Error: {$reason}\r\n\r\n");
        } else {
            $client->send("ERR {$reason}\n");
        }

        $this->closeClient($frame->sessionId);
    }

    /**
     * Handles DATA from the POP: writes the payload to the owning client, or
     * answers with an ERROR frame when the session is unknown or the payload
     * cannot be decoded.
     */
    private function forwardToClient(Frame $frame): void
    {
        if ($frame->sessionId === null || !isset($this->clients[$frame->sessionId])) {
            $this->send(new Frame(FrameType::ERROR, $frame->sessionId, ['reason' => 'session_not_found']));
            return;
        }

        $data = $this->frameData($frame);
        if ($data === false) {
            $this->send(new Frame(FrameType::ERROR, $frame->sessionId, ['reason' => 'invalid_frame']));
            return;
        }

        $this->clients[$frame->sessionId]->send($data);
    }

    /**
     * Extracts the payload bytes of a DATA frame.
     *
     * @return string|false The 'data_raw' string when present, otherwise the strictly
     *                      base64-decoded 'data' field; false when that decoding fails.
     */
    private function frameData(Frame $frame): string|false
    {
        if (isset($frame->payload['data_raw']) && is_string($frame->payload['data_raw'])) {
            return $frame->payload['data_raw'];
        }

        return base64_decode((string)($frame->payload['data'] ?? ''), true);
    }

    /**
     * Cleans up after a local TCP client disconnects and, when it owned a
     * session, tells the POP with a CLOSE frame.
     */
    private function onClientClose(TcpConnection $connection): void
    {
        unset($this->initialBuffers[$connection->id]);
        unset($this->connectionStages[$connection->id]);

        if (!isset($this->connectionSessionIds[$connection->id])) {
            return;
        }

        $sessionId = $this->connectionSessionIds[$connection->id];
        unset($this->connectionSessionIds[$connection->id]);
        unset(
            $this->clients[$sessionId],
            $this->sessionStates[$sessionId],
            $this->pendingData[$sessionId],
            $this->sessionIngressProtocols[$sessionId],
        );

        $this->send(new Frame(FrameType::CLOSE, $sessionId, ['reason' => 'client_closed']));
    }

    /**
     * Closes the local client of a session at the POP's request.
     *
     * All bookkeeping is removed before close() so that onClientClose() finds
     * no session for the connection and does not echo a CLOSE frame back.
     */
    private function closeClient(?string $sessionId): void
    {
        if ($sessionId === null || !isset($this->clients[$sessionId])) {
            return;
        }

        $connection = $this->clients[$sessionId];
        unset(
            $this->clients[$sessionId],
            $this->sessionStates[$sessionId],
            $this->pendingData[$sessionId],
            $this->sessionIngressProtocols[$sessionId],
        );
        unset($this->connectionSessionIds[$connection->id]);

        $connection->close();
    }

    /**
     * Rejects a client whose handshake is invalid, using the reply format of
     * the configured ingress protocol, then closes the connection.
     */
    private function failLocalClient(TcpConnection $connection, string $reason): void
    {
        if ($this->ingressProtocol === 'socks5') {
            $connection->send($this->socks5Reply(1));
        } elseif ($this->ingressProtocol === 'http-proxy') {
            $connection->send("HTTP/1.1 400 Bad Request\r\nConnection: close\r\nX-LayLink-Error: {$reason}\r\n\r\n");
        } else {
            $connection->send("ERR {$reason}\n");
        }

        $connection->close();
    }

    /**
     * Picks the SOCKS5 authentication method to answer with.
     *
     * @param int[] $offeredMethods Method codes offered by the client.
     *
     * @return int|null 2 (username/password) when the configured mode requires credentials,
     *                  0 (no authentication) otherwise; null when the client did not offer
     *                  the required method.
     */
    private function selectSocks5AuthMethod(array $offeredMethods): ?int
    {
        $mode = strtolower($this->socks5AuthMode);
        $requiredMethod = in_array($mode, ['userpass', 'username-password', 'password'], true) ? 2 : 0;

        return in_array($requiredMethod, $offeredMethods, true) ? $requiredMethod : null;
    }

    /**
     * Builds a SOCKS5 reply with the given REP code and an all-zero IPv4
     * bound address and port.
     */
    private function socks5Reply(int $replyCode): string
    {
        return "\x05" . chr($replyCode) . "\x00\x01\x00\x00\x00\x00\x00\x00";
    }

    /**
     * Builds the reply to UDP ASSOCIATE: success with the advertised relay
     * address, or REP 7 when no UDP listen address is configured.
     */
    private function socks5UdpAssociateReply(): string
    {
        if ($this->socks5UdpListen === null) {
            return $this->socks5Reply(7);
        }

        [$listenIp, $listenPort] = $this->splitHostPort($this->socks5UdpListen, 0);
        $ip = $this->socks5UdpAdvertiseIp !== '' ? $this->socks5UdpAdvertiseIp : $listenIp;

        return "\x05\x00\x00" . $this->packSocksAddress($ip, $listenPort);
    }

    /**
     * Reads a SOCKS5 address (IPv4 = 1, domain = 3, IPv6 = 4) followed by a
     * 2-byte big-endian port from $buffer, starting at $offset.
     *
     * @return array{0: string, 1: int, 2: int}|null Host, port and the offset just past the port;
     *                                                null when the address type is unsupported or
     *                                                the buffer is too short.
     */
    private function parseSocksAddress(string $buffer, int $offset, int $addressType): ?array
    {
        if ($addressType === 1) {
            if (strlen($buffer) < $offset + 4 + 2) {
                return null;
            }

            $host = inet_ntop(substr($buffer, $offset, 4));
            $offset += 4;
        } elseif ($addressType === 3) {
            if (strlen($buffer) < $offset + 1) {
                return null;
            }

            // Domain names are prefixed with a single length byte.
            $length = ord($buffer[$offset]);
            $offset++;

            if (strlen($buffer) < $offset + $length + 2) {
                return null;
            }

            $host = substr($buffer, $offset, $length);
            $offset += $length;
        } elseif ($addressType === 4) {
            if (strlen($buffer) < $offset + 16 + 2) {
                return null;
            }

            $host = inet_ntop(substr($buffer, $offset, 16));
            $offset += 16;
        } else {
            return null;
        }

        $port = unpack('nport', substr($buffer, $offset, 2))['port'];
        $offset += 2;

        return [(string)$host, (int)$port, $offset];
    }

    /**
     * Wraps $data in a SOCKS5 UDP datagram header (RSV = 0, FRAG = 0) that
     * carries the given host and port.
     */
    private function packSocks5UdpPacket(string $host, int $port, string $data): string
    {
        return "\x00\x00\x00" . $this->packSocksAddress($host, $port) . $data;
    }

    /**
     * Encodes host and port as ATYP + address + big-endian port. Hosts that
     * are not IP literals are encoded as length-prefixed domain names.
     */
    private function packSocksAddress(string $host, int $port): string
    {
        $packedPort = pack('n', $port);

        if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4)) {
            return "\x01" . inet_pton($host) . $packedPort;
        }

        if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
            return "\x04" . inet_pton($host) . $packedPort;
        }

        // NOTE(review): the length prefix is one byte; hosts longer than 255 bytes are not guarded here.
        return "\x03" . chr(strlen($host)) . $host . $packedPort;
    }

    /**
     * Derives a deterministic session id for a UDP flow from the node id, the
     * client address and the target host/port.
     */
    private function udpSessionId(string $clientAddress, string $host, int $port): string
    {
        return hash('sha256', $this->nodeId . '|' . $clientAddress . '|' . $host . '|' . $port);
    }

    /**
     * Maps an OPEN_FAIL reason string to a SOCKS5 REP code; unknown reasons
     * map to 1.
     */
    private function socks5ReplyCodeForReason(string $reason): int
    {
        return match ($reason) {
            'policy_denied', 'invalid_auth' => 2,
            'node_offline', 'route_not_found' => 3,
            'target_connection_refused' => 5,
            'protocol_not_supported', 'route_not_supported' => 7,
            default => 1,
        };
    }

    /**
     * Splits "host:port" or "[ipv6]:port" into host and port.
     *
     * Without brackets the text after the last colon is taken as the port.
     *
     * @return array{0: string, 1: int} Host and port; $defaultPort when no port is present.
     */
    private function splitHostPort(string $target, int $defaultPort): array
    {
        if (str_starts_with($target, '[')) {
            $end = strpos($target, ']');

            if ($end !== false) {
                $host = substr($target, 1, $end - 1);
                $port = substr($target, $end + 1);

                return [$host, str_starts_with($port, ':') ? (int)substr($port, 1) : $defaultPort];
            }
        }

        $parts = explode(':', $target);

        if (count($parts) >= 2) {
            $port = array_pop($parts);

            return [implode(':', $parts), (int)$port];
        }

        return [$target, $defaultPort];
    }

    /**
     * Sends a PING frame with node id, session count, load average and
     * timestamp to the POP; does nothing while the POP link is not
     * authenticated.
     */
    private function heartbeat(): void
    {
        if (!$this->authenticated || $this->pop === null) {
            return;
        }

        $this->send(new Frame(FrameType::PING, null, [
            'node_id' => $this->nodeId,
            'active_sessions' => count($this->clients),
            'load' => sys_getloadavg()[0] ?? 0.0,
            'timestamp' => time(),
        ]));
    }

    /**
     * Encodes a frame and writes it to the POP connection, if there is one.
     */
    private function send(Frame $frame): void
    {
        $this->pop?->send(FrameCodec::encode($frame));
    }
}