monitor/vendor/textalk/websocket/lib/Connection.php
2024-11-10 21:16:01 +08:00

519 lines
16 KiB
PHP

<?php
/**
* Copyright (C) 2014-2022 Textalk/Abicart and contributors.
*
* This file is part of Websocket PHP and is free software under the ISC License.
* License text: https://raw.githubusercontent.com/Textalk/websocket-php/master/COPYING
*/
namespace WebSocket;
use Psr\Log\{
LoggerAwareInterface,
LoggerAwareTrait,
LoggerInterface, NullLogger
};
use WebSocket\Message\{
Factory,
Message
};
class Connection implements LoggerAwareInterface
{
use LoggerAwareTrait;
use OpcodeTrait;
private $stream;
private $read_buffer;
private $msg_factory;
private $options = [];
protected $is_closing = false;
protected $close_status = null;
private $uid;
/* ---------- Construct & Destruct ----------------------------------------------- */
public function __construct($stream, array $options = [])
{
$this->stream = $stream;
$this->setOptions($options);
$this->setLogger(new NullLogger());
$this->msg_factory = new Factory();
}
public function __destruct()
{
if ($this->getType() === 'stream') {
fclose($this->stream);
}
}
public function setOptions(array $options = []): void
{
$this->options = array_merge($this->options, $options);
}
public function getCloseStatus(): ?int
{
return $this->close_status;
}
/**
* Tell the socket to close.
*
* @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
* @param string $message A closing message, max 125 bytes.
*/
public function close(int $status = 1000, string $message = 'ttfn'): void
{
if (!$this->isConnected()) {
return;
}
$status_binstr = sprintf('%016b', $status);
$status_str = '';
foreach (str_split($status_binstr, 8) as $binstr) {
$status_str .= chr(bindec($binstr));
}
$message = $this->msg_factory->create('close', $status_str . $message);
$this->pushMessage($message, true);
$this->logger->debug("Closing with status: {$status}.");
$this->is_closing = true;
while (true) {
$message = $this->pullMessage();
if ($message->getOpcode() == 'close') {
break;
}
}
}
/* ---------- Message methods ---------------------------------------------------- */
// Push a message to stream
public function pushMessage(Message $message, bool $masked = true): void
{
$frames = $message->getFrames($masked, $this->options['fragment_size']);
foreach ($frames as $frame) {
$this->pushFrame($frame);
}
$this->logger->info("[connection] Pushed {$message}", [
'opcode' => $message->getOpcode(),
'content-length' => $message->getLength(),
'frames' => count($frames),
]);
}
// Pull a message from stream
public function pullMessage(): Message
{
do {
$frame = $this->pullFrame();
$frame = $this->autoRespond($frame);
list ($final, $payload, $opcode, $masked) = $frame;
if ($opcode == 'close') {
$this->close();
}
// Continuation and factual opcode
$continuation = $opcode == 'continuation';
$payload_opcode = $continuation ? $this->read_buffer['opcode'] : $opcode;
// First continuation frame, create buffer
if (!$final && !$continuation) {
$this->read_buffer = ['opcode' => $opcode, 'payload' => $payload, 'frames' => 1];
continue; // Continue reading
}
// Subsequent continuation frames, add to buffer
if ($continuation) {
$this->read_buffer['payload'] .= $payload;
$this->read_buffer['frames']++;
}
} while (!$final);
// Final, return payload
$frames = 1;
if ($continuation) {
$payload = $this->read_buffer['payload'];
$frames = $this->read_buffer['frames'];
$this->read_buffer = null;
}
$message = $this->msg_factory->create($payload_opcode, $payload);
$this->logger->info("[connection] Pulled {$message}", [
'opcode' => $payload_opcode,
'content-length' => strlen($payload),
'frames' => $frames,
]);
return $message;
}
/* ---------- Frame I/O methods -------------------------------------------------- */
// Pull frame from stream
private function pullFrame(): array
{
// Read the fragment "header" first, two bytes.
$data = $this->read(2);
list ($byte_1, $byte_2) = array_values(unpack('C*', $data));
$final = (bool)($byte_1 & 0b10000000); // Final fragment marker.
$rsv = $byte_1 & 0b01110000; // Unused bits, ignore
// Parse opcode
$opcode_int = $byte_1 & 0b00001111;
$opcode_ints = array_flip(self::$opcodes);
if (!array_key_exists($opcode_int, $opcode_ints)) {
$warning = "Bad opcode in websocket frame: {$opcode_int}";
$this->logger->warning($warning);
throw new ConnectionException($warning, ConnectionException::BAD_OPCODE);
}
$opcode = $opcode_ints[$opcode_int];
// Masking bit
$masked = (bool)($byte_2 & 0b10000000);
$payload = '';
// Payload length
$payload_length = $byte_2 & 0b01111111;
if ($payload_length > 125) {
if ($payload_length === 126) {
$data = $this->read(2); // 126: Payload is a 16-bit unsigned int
$payload_length = current(unpack('n', $data));
} else {
$data = $this->read(8); // 127: Payload is a 64-bit unsigned int
$payload_length = current(unpack('J', $data));
}
}
// Get masking key.
if ($masked) {
$masking_key = $this->read(4);
}
// Get the actual payload, if any (might not be for e.g. close frames.
if ($payload_length > 0) {
$data = $this->read($payload_length);
if ($masked) {
// Unmask payload.
for ($i = 0; $i < $payload_length; $i++) {
$payload .= ($data[$i] ^ $masking_key[$i % 4]);
}
} else {
$payload = $data;
}
}
$this->logger->debug("[connection] Pulled '{opcode}' frame", [
'opcode' => $opcode,
'final' => $final,
'content-length' => strlen($payload),
]);
return [$final, $payload, $opcode, $masked];
}
// Push frame to stream
private function pushFrame(array $frame): void
{
list ($final, $payload, $opcode, $masked) = $frame;
$data = '';
$byte_1 = $final ? 0b10000000 : 0b00000000; // Final fragment marker.
$byte_1 |= self::$opcodes[$opcode]; // Set opcode.
$data .= pack('C', $byte_1);
$byte_2 = $masked ? 0b10000000 : 0b00000000; // Masking bit marker.
// 7 bits of payload length...
$payload_length = strlen($payload);
if ($payload_length > 65535) {
$data .= pack('C', $byte_2 | 0b01111111);
$data .= pack('J', $payload_length);
} elseif ($payload_length > 125) {
$data .= pack('C', $byte_2 | 0b01111110);
$data .= pack('n', $payload_length);
} else {
$data .= pack('C', $byte_2 | $payload_length);
}
// Handle masking
if ($masked) {
// generate a random mask:
$mask = '';
for ($i = 0; $i < 4; $i++) {
$mask .= chr(rand(0, 255));
}
$data .= $mask;
// Append payload to frame:
for ($i = 0; $i < $payload_length; $i++) {
$data .= $payload[$i] ^ $mask[$i % 4];
}
} else {
$data .= $payload;
}
$this->write($data);
$this->logger->debug("[connection] Pushed '{$opcode}' frame", [
'opcode' => $opcode,
'final' => $final,
'content-length' => strlen($payload),
]);
}
// Trigger auto response for frame
private function autoRespond(array $frame)
{
list ($final, $payload, $opcode, $masked) = $frame;
$payload_length = strlen($payload);
switch ($opcode) {
case 'ping':
// If we received a ping, respond with a pong
$this->logger->debug("[connection] Received 'ping', sending 'pong'.");
$message = $this->msg_factory->create('pong', $payload);
$this->pushMessage($message, $masked);
return [$final, $payload, $opcode, $masked];
case 'close':
// If we received close, possibly acknowledge and close connection
$status_bin = '';
$status = '';
if ($payload_length > 0) {
$status_bin = $payload[0] . $payload[1];
$status = current(unpack('n', $payload));
$this->close_status = $status;
}
// Get additional close message
if ($payload_length >= 2) {
$payload = substr($payload, 2);
}
$this->logger->debug("[connection] Received 'close', status: {$status}.");
if (!$this->is_closing) {
$ack = "{$status_bin}Close acknowledged: {$status}";
$message = $this->msg_factory->create('close', $ack);
$this->pushMessage($message, $masked);
} else {
$this->is_closing = false; // A close response, all done.
}
$this->disconnect();
return [$final, $payload, $opcode, $masked];
default:
return [$final, $payload, $opcode, $masked];
}
}
/* ---------- Stream I/O methods ------------------------------------------------- */
/**
* Close connection stream.
* @return bool
*/
public function disconnect(): bool
{
$this->logger->debug('Closing connection');
return fclose($this->stream);
}
/**
* If connected to stream.
* @return bool
*/
public function isConnected(): bool
{
return in_array($this->getType(), ['stream', 'persistent stream']);
}
/**
* Return type of connection.
* @return string|null Type of connection or null if invalid type.
*/
public function getType(): ?string
{
return get_resource_type($this->stream);
}
/**
* Get name of local socket, or null if not connected.
* @return string|null
*/
public function getName(): ?string
{
return stream_socket_get_name($this->stream, false);
}
/**
* Get name of remote socket, or null if not connected.
* @return string|null
*/
public function getRemoteName(): ?string
{
return stream_socket_get_name($this->stream, true);
}
/**
* Get meta data for connection.
* @return array
*/
public function getMeta(): array
{
return stream_get_meta_data($this->stream);
}
/**
* Returns current position of stream pointer.
* @return int
* @throws ConnectionException
*/
public function tell(): int
{
$tell = ftell($this->stream);
if ($tell === false) {
$this->throwException('Could not resolve stream pointer position');
}
return $tell;
}
/**
* If stream pointer is at end of file.
* @return bool
*/
public function eof(): int
{
return feof($this->stream);
}
/* ---------- Stream option methods ---------------------------------------------- */
/**
* Set time out on connection.
* @param int $seconds Timeout part in seconds
* @param int $microseconds Timeout part in microseconds
* @return bool
*/
public function setTimeout(int $seconds, int $microseconds = 0): bool
{
$this->logger->debug("Setting timeout {$seconds}:{$microseconds} seconds");
return stream_set_timeout($this->stream, $seconds, $microseconds);
}
/* ---------- Stream read/write methods ------------------------------------------ */
/**
* Read line from stream.
* @param int $length Maximum number of bytes to read
* @param string $ending Line delimiter
* @return string Read data
*/
public function getLine(int $length, string $ending): string
{
$line = stream_get_line($this->stream, $length, $ending);
if ($line === false) {
$this->throwException('Could not read from stream');
}
$read = strlen($line);
$this->logger->debug("Read {$read} bytes of line.");
return $line;
}
/**
* Read line from stream.
* @param int $length Maximum number of bytes to read
* @return string Read data
*/
public function gets(int $length): string
{
$line = fgets($this->stream, $length);
if ($line === false) {
$this->throwException('Could not read from stream');
}
$read = strlen($line);
$this->logger->debug("Read {$read} bytes of line.");
return $line;
}
/**
* Read characters from stream.
* @param int $length Maximum number of bytes to read
* @return string Read data
*/
public function read(string $length): string
{
$data = '';
while (strlen($data) < $length) {
$buffer = fread($this->stream, $length - strlen($data));
if (!$buffer) {
$meta = stream_get_meta_data($this->stream);
if (!empty($meta['timed_out'])) {
$message = 'Client read timeout';
$this->logger->error($message, $meta);
throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
}
}
if ($buffer === false) {
$read = strlen($data);
$this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
}
if ($buffer === '') {
$this->throwException("Empty read; connection dead?");
}
$data .= $buffer;
$read = strlen($data);
$this->logger->debug("Read {$read} of {$length} bytes.");
}
return $data;
}
/**
* Write characters to stream.
* @param string $data Data to read
*/
public function write(string $data): void
{
$length = strlen($data);
$written = fwrite($this->stream, $data);
if ($written === false) {
$this->throwException("Failed to write {$length} bytes.");
}
if ($written < strlen($data)) {
$this->throwException("Could only write {$written} out of {$length} bytes.");
}
$this->logger->debug("Wrote {$written} of {$length} bytes.");
}
/* ---------- Internal helper methods -------------------------------------------- */
private function throwException(string $message, int $code = 0): void
{
$meta = ['closed' => true];
if ($this->isConnected()) {
$meta = $this->getMeta();
$this->disconnect();
if (!empty($meta['timed_out'])) {
$this->logger->error($message, $meta);
throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
}
if (!empty($meta['eof'])) {
$code = ConnectionException::EOF;
}
}
$this->logger->error($message, $meta);
throw new ConnectionException($message, $code, $meta);
}
}