压力优化
This commit is contained in:
parent
9c07b9fadc
commit
068d3f4be9
25
contract.md
25
contract.md
@ -160,6 +160,15 @@ Completed in this checkpoint:
|
|||||||
* TCP `DATA` payloads now use binary frame encoding when both ends run the updated code.
|
* TCP `DATA` payloads now use binary frame encoding when both ends run the updated code.
|
||||||
* This removes base64 expansion and JSON string encoding from the hot TCP data path.
|
* This removes base64 expansion and JSON string encoding from the hot TCP data path.
|
||||||
* Verified binary TCP `DATA` frame encode/decode under both `none` and `chacha20`.
|
* Verified binary TCP `DATA` frame encode/decode under both `none` and `chacha20`.
|
||||||
|
* Added POP-side target DNS pre-resolution:
|
||||||
|
* Domain resolution failures return `OPEN_FAIL` with `dns_resolution_failed`.
|
||||||
|
* POP no longer lets common target DNS failures bubble up as raw `stream_socket_client()` warnings.
|
||||||
|
* Added TCP backpressure for large transfers:
|
||||||
|
* POP pauses target reads when the Agent connection send buffer crosses the high watermark.
|
||||||
|
* POP resumes target reads when the Agent connection drains.
|
||||||
|
* Client Agent pauses local client reads when the POP connection send buffer crosses the high watermark.
|
||||||
|
* Client Agent pauses POP reads while a local client output buffer is full.
|
||||||
|
* Send buffer limits are raised to 32 MiB with a 16 MiB backpressure high watermark.
|
||||||
|
|
||||||
Known MVP limitations:
|
Known MVP limitations:
|
||||||
|
|
||||||
@ -173,7 +182,7 @@ Known MVP limitations:
|
|||||||
* No TLS yet.
|
* No TLS yet.
|
||||||
* No production-grade client identity yet; `dev-token` is hardcoded for MVP development.
|
* No production-grade client identity yet; `dev-token` is hardcoded for MVP development.
|
||||||
* No automated integration test harness yet.
|
* No automated integration test harness yet.
|
||||||
* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames reduce per-byte overhead, but KCP/multipath/parallel transport and flow-control tuning are still future performance work.
|
* TCP stream forwarding is still single Agent-to-POP connection based; binary `DATA` frames and backpressure reduce per-byte overhead and buffer blowups, but KCP/multipath/parallel transport and per-session flow-control tuning are still future performance work.
|
||||||
* No explicit idle timeout or connect timeout enforcement yet.
|
* No explicit idle timeout or connect timeout enforcement yet.
|
||||||
* UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work.
|
* UDP relay is datagram-oriented and currently creates short-lived POP-side UDP sockets per outbound datagram; pooling and stronger timeout accounting are still future work.
|
||||||
* HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented.
|
* HTTP proxy supports `CONNECT` and ordinary absolute URL HTTP requests; advanced proxy auth and full HTTP/2 proxying are not implemented.
|
||||||
@ -183,14 +192,16 @@ Next recommended tasks:
|
|||||||
1. Add a local integration harness that starts POP, Client Agent, and a mock TCP echo target, then verifies authorized tunnel, policy denial, and agent local denial.
|
1. Add a local integration harness that starts POP, Client Agent, and a mock TCP echo target, then verifies authorized tunnel, policy denial, and agent local denial.
|
||||||
2. Add configurable client auth token or JWT-ready auth interface.
|
2. Add configurable client auth token or JWT-ready auth interface.
|
||||||
3. Add target connect timeout and session idle timeout.
|
3. Add target connect timeout and session idle timeout.
|
||||||
4. Add buffer full/drain handling with audit result `buffer_overflow`.
|
4. Add more detailed buffer overflow audit reasons and metrics.
|
||||||
5. Add README quickstart with exact local commands.
|
5. Add README quickstart with exact local commands.
|
||||||
6. Add a reproducible throughput benchmark script for direct-vs-LayLink comparisons.
|
6. Add a reproducible throughput benchmark script for direct-vs-LayLink comparisons.
|
||||||
7. Add KCP or another UDP-based reliable transport behind the transport abstraction.
|
7. Add multi-connection Agent-to-POP support so multiple Client Agent workers can spread concurrent sessions safely.
|
||||||
8. Optimize UDP relay with POP-side UDP socket pooling.
|
8. Add KCP or another UDP-based reliable transport behind the transport abstraction.
|
||||||
9. Add UDP association idle timeouts and cleanup.
|
9. Add per-session flow-control windows to reduce head-of-line blocking on one Agent connection.
|
||||||
10. Aggregate UDP audit records per association instead of per datagram.
|
10. Optimize UDP relay with POP-side UDP socket pooling.
|
||||||
11. Add UDP and per-user rate limiting.
|
11. Add UDP association idle timeouts and cleanup.
|
||||||
|
12. Aggregate UDP audit records per association instead of per datagram.
|
||||||
|
13. Add UDP and per-user rate limiting.
|
||||||
|
|
||||||
## 0. Project Name
|
## 0. Project Name
|
||||||
|
|
||||||
|
|||||||
@ -380,6 +380,10 @@ php bin/client-agent.php start
|
|||||||
|
|
||||||
TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`ERROR` 等控制帧仍使用 JSON 编码。启用 `chacha20` 时,二进制和 JSON Frame body 都会被加密。
|
TCP 大流量 `DATA` 帧使用二进制帧编码;`AUTH`、`OPEN`、`CLOSE`、`ERROR` 等控制帧仍使用 JSON 编码。启用 `chacha20` 时,二进制和 JSON Frame body 都会被加密。
|
||||||
|
|
||||||
|
大文件下载时,LayLink 会使用 Workerman 的 `pauseRecv()` / `resumeRecv()` 做背压:当下游发送缓冲区过高时暂停上游读取,缓冲排空后继续读取。这可以避免单个慢连接无限堆积内存或因为发送缓冲区满而断联。
|
||||||
|
|
||||||
|
当前每个 Client Agent worker 仍然通过单条 Agent-to-POP TCP 长连接承载多个会话。背压可以保护进程不堵死,但单条 TCP 长连接仍可能产生队头阻塞;多 worker、多 POP 长连接、KCP 或 per-session window 是后续性能优化方向。
|
||||||
|
|
||||||
验证 SOCKS5 HTTPS 联通性和出口 IP:
|
验证 SOCKS5 HTTPS 联通性和出口 IP:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
@ -17,6 +17,9 @@ use Workerman\Worker;
|
|||||||
|
|
||||||
final class AgentClient
|
final class AgentClient
|
||||||
{
|
{
|
||||||
|
private const MAX_SEND_BUFFER = 32 * 1024 * 1024;
|
||||||
|
private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024;
|
||||||
|
|
||||||
private ?AsyncTcpConnection $pop = null;
|
private ?AsyncTcpConnection $pop = null;
|
||||||
private ?FrameParser $parser = null;
|
private ?FrameParser $parser = null;
|
||||||
private bool $authenticated = false;
|
private bool $authenticated = false;
|
||||||
@ -38,6 +41,10 @@ final class AgentClient
|
|||||||
private array $udpClients = [];
|
private array $udpClients = [];
|
||||||
/** @var array<string, array{client_address: string, target_host: string, target_port: int}> */
|
/** @var array<string, array{client_address: string, target_host: string, target_port: int}> */
|
||||||
private array $udpSessions = [];
|
private array $udpSessions = [];
|
||||||
|
/** @var array<int, TcpConnection> */
|
||||||
|
private array $pausedClientsForPop = [];
|
||||||
|
/** @var array<int, true> */
|
||||||
|
private array $clientsPausingPop = [];
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
private readonly string $clientListen,
|
private readonly string $clientListen,
|
||||||
@ -87,7 +94,7 @@ final class AgentClient
|
|||||||
$this->parser = new FrameParser();
|
$this->parser = new FrameParser();
|
||||||
$this->authenticated = false;
|
$this->authenticated = false;
|
||||||
$connection = new AsyncTcpConnection($this->popAddress);
|
$connection = new AsyncTcpConnection($this->popAddress);
|
||||||
$connection->maxSendBufferSize = 8 * 1024 * 1024;
|
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
||||||
$this->pop = $connection;
|
$this->pop = $connection;
|
||||||
|
|
||||||
$connection->onConnect = function (AsyncTcpConnection $connection): void {
|
$connection->onConnect = function (AsyncTcpConnection $connection): void {
|
||||||
@ -110,6 +117,7 @@ final class AgentClient
|
|||||||
$connection->close();
|
$connection->close();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
$connection->onBufferDrain = fn (AsyncTcpConnection $connection) => $this->resumeClientsForPop();
|
||||||
$connection->onClose = function (): void {
|
$connection->onClose = function (): void {
|
||||||
$this->authenticated = false;
|
$this->authenticated = false;
|
||||||
foreach ($this->clients as $client) {
|
foreach ($this->clients as $client) {
|
||||||
@ -122,6 +130,8 @@ final class AgentClient
|
|||||||
$this->pendingData = [];
|
$this->pendingData = [];
|
||||||
$this->connectionStages = [];
|
$this->connectionStages = [];
|
||||||
$this->sessionIngressProtocols = [];
|
$this->sessionIngressProtocols = [];
|
||||||
|
$this->pausedClientsForPop = [];
|
||||||
|
$this->clientsPausingPop = [];
|
||||||
Timer::add(3, fn () => $this->connect(), [], false);
|
Timer::add(3, fn () => $this->connect(), [], false);
|
||||||
};
|
};
|
||||||
$connection->connect();
|
$connection->connect();
|
||||||
@ -144,7 +154,9 @@ final class AgentClient
|
|||||||
|
|
||||||
private function onClientConnect(TcpConnection $connection): void
|
private function onClientConnect(TcpConnection $connection): void
|
||||||
{
|
{
|
||||||
$connection->maxSendBufferSize = 8 * 1024 * 1024;
|
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
||||||
|
$connection->onBufferFull = fn (TcpConnection $connection) => $this->pausePopForClient($connection);
|
||||||
|
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumePopForClient($connection);
|
||||||
$this->initialBuffers[$connection->id] = '';
|
$this->initialBuffers[$connection->id] = '';
|
||||||
$this->connectionStages[$connection->id] = 'init';
|
$this->connectionStages[$connection->id] = 'init';
|
||||||
}
|
}
|
||||||
@ -162,7 +174,15 @@ final class AgentClient
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data]));
|
$sent = $this->send(new Frame(FrameType::DATA, $sessionId, ['data_raw' => $data]));
|
||||||
|
if ($sent === false) {
|
||||||
|
$this->closeClient($sessionId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ($this->pop !== null && $this->pop->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) {
|
||||||
|
$connection->pauseRecv();
|
||||||
|
$this->pausedClientsForPop[$connection->id] = $connection;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function handleInitialRequest(TcpConnection $connection, string $data): void
|
private function handleInitialRequest(TcpConnection $connection, string $data): void
|
||||||
@ -579,7 +599,15 @@ final class AgentClient
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->clients[$frame->sessionId]->send($data);
|
$client = $this->clients[$frame->sessionId];
|
||||||
|
$sent = $client->send($data);
|
||||||
|
if ($sent === false) {
|
||||||
|
$this->closeClient($frame->sessionId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ($client->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) {
|
||||||
|
$this->pausePopForClient($client);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function frameData(Frame $frame): string|false
|
private function frameData(Frame $frame): string|false
|
||||||
@ -595,6 +623,7 @@ final class AgentClient
|
|||||||
{
|
{
|
||||||
unset($this->initialBuffers[$connection->id]);
|
unset($this->initialBuffers[$connection->id]);
|
||||||
unset($this->connectionStages[$connection->id]);
|
unset($this->connectionStages[$connection->id]);
|
||||||
|
unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]);
|
||||||
if (!isset($this->connectionSessionIds[$connection->id])) {
|
if (!isset($this->connectionSessionIds[$connection->id])) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -614,6 +643,7 @@ final class AgentClient
|
|||||||
$connection = $this->clients[$sessionId];
|
$connection = $this->clients[$sessionId];
|
||||||
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]);
|
unset($this->clients[$sessionId], $this->sessionStates[$sessionId], $this->pendingData[$sessionId], $this->sessionIngressProtocols[$sessionId]);
|
||||||
unset($this->connectionSessionIds[$connection->id]);
|
unset($this->connectionSessionIds[$connection->id]);
|
||||||
|
unset($this->pausedClientsForPop[$connection->id], $this->clientsPausingPop[$connection->id]);
|
||||||
$connection->close();
|
$connection->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -766,8 +796,30 @@ final class AgentClient
|
|||||||
]));
|
]));
|
||||||
}
|
}
|
||||||
|
|
||||||
private function send(Frame $frame): void
|
private function send(Frame $frame): bool|null
|
||||||
{
|
{
|
||||||
$this->pop?->send(FrameCodec::encode($frame));
|
return $this->pop?->send(FrameCodec::encode($frame));
|
||||||
|
}
|
||||||
|
|
||||||
|
private function resumeClientsForPop(): void
|
||||||
|
{
|
||||||
|
foreach ($this->pausedClientsForPop as $connectionId => $client) {
|
||||||
|
$client->resumeRecv();
|
||||||
|
unset($this->pausedClientsForPop[$connectionId]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function pausePopForClient(TcpConnection $connection): void
|
||||||
|
{
|
||||||
|
$this->clientsPausingPop[$connection->id] = true;
|
||||||
|
$this->pop?->pauseRecv();
|
||||||
|
}
|
||||||
|
|
||||||
|
private function resumePopForClient(TcpConnection $connection): void
|
||||||
|
{
|
||||||
|
unset($this->clientsPausingPop[$connection->id]);
|
||||||
|
if ($this->clientsPausingPop === []) {
|
||||||
|
$this->pop?->resumeRecv();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -24,10 +24,15 @@ use Workerman\Worker;
|
|||||||
|
|
||||||
final class AgentListener
|
final class AgentListener
|
||||||
{
|
{
|
||||||
|
private const MAX_SEND_BUFFER = 32 * 1024 * 1024;
|
||||||
|
private const BACKPRESSURE_HIGH_WATERMARK = 16 * 1024 * 1024;
|
||||||
|
|
||||||
/** @var array<int, FrameParser> */
|
/** @var array<int, FrameParser> */
|
||||||
private array $parsers = [];
|
private array $parsers = [];
|
||||||
/** @var array<int, string> */
|
/** @var array<int, string> */
|
||||||
private array $connectionNodeIds = [];
|
private array $connectionNodeIds = [];
|
||||||
|
/** @var array<int, array<string, AsyncTcpConnection>> */
|
||||||
|
private array $pausedTargetsByAgent = [];
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
Worker $worker,
|
Worker $worker,
|
||||||
@ -46,7 +51,8 @@ final class AgentListener
|
|||||||
|
|
||||||
private function onConnect(TcpConnection $connection): void
|
private function onConnect(TcpConnection $connection): void
|
||||||
{
|
{
|
||||||
$connection->maxSendBufferSize = 8 * 1024 * 1024;
|
$connection->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
||||||
|
$connection->onBufferDrain = fn (TcpConnection $connection) => $this->resumeTargetsForAgent($connection);
|
||||||
$this->parsers[$connection->id] = new FrameParser();
|
$this->parsers[$connection->id] = new FrameParser();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,6 +210,12 @@ final class AgentListener
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$targetAddress = $this->resolveTargetAddress($host, $port);
|
||||||
|
if ($targetAddress === null) {
|
||||||
|
$this->rejectOpen($agentConnection, $frame, 'dns_resolution_failed', (string)$auth['user_id'], $nodeId, $decision->policyId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
$session = new TunnelSession(
|
$session = new TunnelSession(
|
||||||
$frame->sessionId,
|
$frame->sessionId,
|
||||||
(string)$auth['user_id'],
|
(string)$auth['user_id'],
|
||||||
@ -219,8 +231,8 @@ final class AgentListener
|
|||||||
$session->state = TunnelSession::OPENING;
|
$session->state = TunnelSession::OPENING;
|
||||||
$this->sessions->add($session);
|
$this->sessions->add($session);
|
||||||
|
|
||||||
$target = new AsyncTcpConnection("tcp://{$host}:{$port}");
|
$target = new AsyncTcpConnection($targetAddress);
|
||||||
$target->maxSendBufferSize = 8 * 1024 * 1024;
|
$target->maxSendBufferSize = self::MAX_SEND_BUFFER;
|
||||||
$session->target = $target;
|
$session->target = $target;
|
||||||
|
|
||||||
$target->onConnect = function () use ($session, $agentConnection): void {
|
$target->onConnect = function () use ($session, $agentConnection): void {
|
||||||
@ -232,9 +244,17 @@ final class AgentListener
|
|||||||
};
|
};
|
||||||
$target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void {
|
$target->onMessage = function (AsyncTcpConnection $target, string $data) use ($session, $agentConnection): void {
|
||||||
$session->bytesTargetToClient += strlen($data);
|
$session->bytesTargetToClient += strlen($data);
|
||||||
$this->send($agentConnection, new Frame(FrameType::DATA, $session->sessionId, [
|
$sent = $this->send($agentConnection, new Frame(FrameType::DATA, $session->sessionId, [
|
||||||
'data_raw' => $data,
|
'data_raw' => $data,
|
||||||
]));
|
]));
|
||||||
|
if ($sent === false) {
|
||||||
|
$this->closeSession($session, 'failed', 'agent_buffer_overflow');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ($agentConnection->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK) {
|
||||||
|
$target->pauseRecv();
|
||||||
|
$this->pausedTargetsByAgent[$agentConnection->id][$session->sessionId] = $target;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
$target->onClose = fn () => $this->closeSession($session, 'closed', null);
|
$target->onClose = fn () => $this->closeSession($session, 'closed', null);
|
||||||
$target->onError = fn () => $this->failOpenSession($agentConnection, $session, 'target_connection_refused');
|
$target->onError = fn () => $this->failOpenSession($agentConnection, $session, 'target_connection_refused');
|
||||||
@ -250,12 +270,28 @@ final class AgentListener
|
|||||||
}
|
}
|
||||||
|
|
||||||
$session->bytesClientToTarget += strlen($data);
|
$session->bytesClientToTarget += strlen($data);
|
||||||
$session->target?->send($data);
|
if ($session->target === null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$sent = $session->target->send($data);
|
||||||
|
if ($sent === false) {
|
||||||
|
$this->closeSession($session, 'failed', 'target_buffer_overflow');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ($session->target->getSendBufferQueueSize() >= self::BACKPRESSURE_HIGH_WATERMARK && $session->agent !== null) {
|
||||||
|
$session->agent->pauseRecv();
|
||||||
|
$session->target->onBufferDrain = function (AsyncTcpConnection $target) use ($session): void {
|
||||||
|
$session->agent?->resumeRecv();
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function failOpenSession(TcpConnection $agentConnection, TunnelSession $session, string $reason): void
|
private function failOpenSession(TcpConnection $agentConnection, TunnelSession $session, string $reason): void
|
||||||
{
|
{
|
||||||
$this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $session->sessionId, ['reason' => $reason]));
|
if ($session->state === TunnelSession::OPENING) {
|
||||||
|
$this->send($agentConnection, new Frame(FrameType::OPEN_FAIL, $session->sessionId, ['reason' => $reason]));
|
||||||
|
}
|
||||||
$this->closeSession($session, 'failed', $reason);
|
$this->closeSession($session, 'failed', $reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,6 +334,9 @@ final class AgentListener
|
|||||||
|
|
||||||
$session->state = TunnelSession::CLOSED;
|
$session->state = TunnelSession::CLOSED;
|
||||||
$this->sessions->remove($session->sessionId);
|
$this->sessions->remove($session->sessionId);
|
||||||
|
if ($session->agent !== null) {
|
||||||
|
unset($this->pausedTargetsByAgent[$session->agent->id][$session->sessionId]);
|
||||||
|
}
|
||||||
|
|
||||||
if ($session->agent !== null) {
|
if ($session->agent !== null) {
|
||||||
$this->send($session->agent, new Frame(FrameType::CLOSE, $session->sessionId, ['reason' => $reason ?? $result]));
|
$this->send($session->agent, new Frame(FrameType::CLOSE, $session->sessionId, ['reason' => $reason ?? $result]));
|
||||||
@ -329,6 +368,7 @@ final class AgentListener
|
|||||||
{
|
{
|
||||||
unset($this->parsers[$connection->id]);
|
unset($this->parsers[$connection->id]);
|
||||||
unset($this->connectionNodeIds[$connection->id]);
|
unset($this->connectionNodeIds[$connection->id]);
|
||||||
|
unset($this->pausedTargetsByAgent[$connection->id]);
|
||||||
$node = $this->nodes->unregisterByConnection($connection);
|
$node = $this->nodes->unregisterByConnection($connection);
|
||||||
foreach ($this->sessions->all() as $session) {
|
foreach ($this->sessions->all() as $session) {
|
||||||
if ($session->agent === $connection) {
|
if ($session->agent === $connection) {
|
||||||
@ -351,8 +391,47 @@ final class AgentListener
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function send(TcpConnection $connection, Frame $frame): void
|
private function send(TcpConnection $connection, Frame $frame): bool|null
|
||||||
{
|
{
|
||||||
$connection->send(FrameCodec::encode($frame));
|
return $connection->send(FrameCodec::encode($frame));
|
||||||
|
}
|
||||||
|
|
||||||
|
private function resumeTargetsForAgent(TcpConnection $connection): void
|
||||||
|
{
|
||||||
|
foreach ($this->pausedTargetsByAgent[$connection->id] ?? [] as $sessionId => $target) {
|
||||||
|
$target->resumeRecv();
|
||||||
|
unset($this->pausedTargetsByAgent[$connection->id][$sessionId]);
|
||||||
|
}
|
||||||
|
if (($this->pausedTargetsByAgent[$connection->id] ?? []) === []) {
|
||||||
|
unset($this->pausedTargetsByAgent[$connection->id]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function resolveTargetAddress(string $host, int $port): ?string
|
||||||
|
{
|
||||||
|
if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) !== false) {
|
||||||
|
return "tcp://{$host}:{$port}";
|
||||||
|
}
|
||||||
|
if (filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) !== false) {
|
||||||
|
return "tcp://[{$host}]:{$port}";
|
||||||
|
}
|
||||||
|
|
||||||
|
$records = @dns_get_record($host, DNS_A | DNS_AAAA);
|
||||||
|
if ($records === false || $records === []) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($records as $record) {
|
||||||
|
if (isset($record['ip']) && is_string($record['ip'])) {
|
||||||
|
return "tcp://{$record['ip']}:{$port}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
foreach ($records as $record) {
|
||||||
|
if (isset($record['ipv6']) && is_string($record['ipv6'])) {
|
||||||
|
return "tcp://[{$record['ipv6']}]:{$port}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user