<?php
|
|
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
|
|
namespace Workerman\Connection;
|
|
|
|
use Workerman\Events\EventInterface;
|
|
use Workerman\Worker;
|
|
use Exception;
|
|
|
|
/**
|
|
* TcpConnection.
|
|
*/
|
|
class TcpConnection extends ConnectionInterface
|
|
{
|
|
/**
 * Maximum number of bytes requested from the socket by a single read.
 *
 * @var int
 */
const READ_BUFFER_SIZE = 65535;

/**
 * Connection status: initial. Data passed to send() is queued in this state.
 *
 * @var int
 */
const STATUS_INITIAL = 0;

/**
 * Connection status: connecting. Data passed to send() is queued in this state.
 *
 * @var int
 */
const STATUS_CONNECTING = 1;

/**
 * Connection status: established (the default status of a new instance).
 *
 * @var int
 */
const STATUS_ESTABLISH = 2;

/**
 * Connection status: closing. close() was called; the connection is destroyed
 * once the send buffer has been written out.
 *
 * @var int
 */
const STATUS_CLOSING = 4;

/**
 * Connection status: closed. Set by destroy().
 *
 * @var int
 */
const STATUS_CLOSED = 8;
|
|
|
|
/**
 * Callback invoked with (connection, data) for every received package
 * (or every received chunk when no protocol is set).
 *
 * @var callback
 */
public $onMessage = null;

/**
 * Callback invoked with (connection) when the connection is destroyed.
 *
 * @var callback
 */
public $onClose = null;

/**
 * Callback invoked with (connection, code, message) when sending fails.
 *
 * @var callback
 */
public $onError = null;

/**
 * Callback invoked with (connection) when the send buffer reaches
 * $maxSendBufferSize.
 *
 * @var callback
 */
public $onBufferFull = null;

/**
 * Callback invoked with (connection) when the send buffer has been
 * written out completely.
 *
 * @var callback
 */
public $onBufferDrain = null;

/**
 * Application layer protocol.
 * The value is a protocol class name such as Workerman\\Protocols\\Http;
 * its static input()/decode()/encode() methods are used.
 *
 * @var \Workerman\Protocols\ProtocolInterface
 */
public $protocol = null;

/**
 * The worker this connection belongs to. When set, destroy() removes the
 * connection from $worker->connections.
 *
 * @var Worker
 */
public $worker = null;

/**
 * Connection id, assigned from the id recorder in the constructor.
 *
 * @var int
 */
public $id = 0;

/**
 * A copy of the connection id taken at construction time. It is used as the
 * key when the connection is removed from $worker->connections.
 *
 * @var int
 */
protected $_id = 0;

/**
 * Maximum send buffer size for this connection.
 * The onBufferFull callback is emitted when the send buffer reaches this size.
 *
 * @var int
 */
public $maxSendBufferSize = 1048576;

/**
 * Default maximum send buffer size, copied into $maxSendBufferSize by the
 * constructor.
 *
 * @var int
 */
public static $defaultMaxSendBufferSize = 1048576;

/**
 * Maximum acceptable package size. A connection whose protocol reports a
 * larger package length is destroyed.
 *
 * @var int
 */
public static $maxPackageSize = 10485760;

/**
 * Id recorder: the id that will be given to the next connection.
 *
 * @var int
 */
protected static $_idRecorder = 1;

/**
 * The underlying socket stream.
 *
 * @var resource
 */
protected $_socket = null;

/**
 * Data waiting to be written to the socket.
 *
 * @var string
 */
protected $_sendBuffer = '';

/**
 * Data read from the socket that has not been dispatched yet.
 *
 * @var string
 */
protected $_recvBuffer = '';

/**
 * Length of the package currently being received, 0 when unknown.
 *
 * @var int
 */
protected $_currentPackageLength = 0;

/**
 * Connection status, one of the STATUS_* constants.
 *
 * @var int
 */
protected $_status = self::STATUS_ESTABLISH;

/**
 * Remote address as passed to the constructor.
 *
 * @var string
 */
protected $_remoteAddress = '';

/**
 * Whether reading has been paused by pauseRecv().
 *
 * @var bool
 */
protected $_isPaused = false;
|
|
|
|
/**
 * Wraps an accepted socket: assigns the connection id, switches the socket to
 * non-blocking mode and registers the read handler with the global event loop.
 *
 * @param resource $socket
 * @param string   $remote_address
 */
public function __construct($socket, $remote_address = '')
{
    ++self::$statistics['connection_count'];

    // Both the public id and its protected copy receive the same fresh value.
    $connection_id = self::$_idRecorder++;
    $this->_id     = $connection_id;
    $this->id      = $connection_id;

    $this->_socket = $socket;
    stream_set_blocking($this->_socket, 0);

    // stream_set_read_buffer() is not available everywhere (e.g. hhvm).
    if (function_exists('stream_set_read_buffer')) {
        stream_set_read_buffer($this->_socket, 0);
    }

    Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));

    $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
    $this->_remoteAddress    = $remote_address;
}
|
|
|
|
/**
 * Sends data on the connection.
 *
 * Unless $raw is true and when a protocol is set, the data is first passed
 * through protocol::encode(). The data is then written to the socket directly
 * when possible; whatever cannot be written immediately is queued in the send
 * buffer and flushed by baseWrite().
 *
 * @param string $send_buffer Data to send.
 * @param bool   $raw         When true the protocol encoder is skipped.
 * @return bool|null true when all data was written immediately, null when the
 *                   data (or part of it) was queued or the encoder produced an
 *                   empty string, false when the data could not be accepted
 *                   (connection closing/closed, peer gone, send buffer full).
 */
public function send($send_buffer, $raw = false)
{
    // Nothing can be delivered on a closing or closed connection, so bail out
    // before running the protocol encoder on data that would be dropped anyway.
    if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
        return false;
    }

    // Try to call protocol::encode($send_buffer) before sending.
    if (false === $raw && $this->protocol) {
        $parser      = $this->protocol;
        $send_buffer = $parser::encode($send_buffer, $this);
        if ($send_buffer === '') {
            return null;
        }
    }

    if ($this->_status === self::STATUS_INITIAL || $this->_status === self::STATUS_CONNECTING) {
        // Not connected yet: queue the data.
        $this->_sendBuffer .= $send_buffer;
        return null;
    } elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
        // Checked again because the status may have changed while the encoder ran.
        return false;
    }

    // Attempt to send data directly.
    if ($this->_sendBuffer === '') {
        $len = @fwrite($this->_socket, $send_buffer);
        // Send successful.
        if ($len === strlen($send_buffer)) {
            return true;
        }
        if ($len > 0) {
            // Only part of the data was written; queue the remainder.
            $this->_sendBuffer = substr($send_buffer, $len);
        } else {
            // Nothing was written. Connection closed?
            if (!is_resource($this->_socket) || feof($this->_socket)) {
                self::$statistics['send_fail']++;
                if ($this->onError) {
                    try {
                        call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
                    } catch (\Exception $e) {
                        Worker::log($e);
                        exit(250);
                    } catch (\Error $e) {
                        Worker::log($e);
                        exit(250);
                    }
                }
                $this->destroy();
                return false;
            }
            // The socket is still open, so queue everything for a later write.
            $this->_sendBuffer = $send_buffer;
        }
        Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
        // Check if the send buffer is full.
        $this->checkBufferIsFull();
        return null;
    }

    // Data is already queued. If the buffer is full the package is discarded.
    if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
        self::$statistics['send_fail']++;
        if ($this->onError) {
            try {
                call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
            } catch (\Exception $e) {
                Worker::log($e);
                exit(250);
            } catch (\Error $e) {
                Worker::log($e);
                exit(250);
            }
        }
        return false;
    }

    $this->_sendBuffer .= $send_buffer;
    // Check if the send buffer is full.
    $this->checkBufferIsFull();
    return null;
}
|
|
|
|
/**
 * Returns the IP part of the remote address: everything before the last ':'
 * with surrounding square brackets removed.
 *
 * @return string The IP, or '' when the address has no usable ':' separator.
 */
public function getRemoteIp()
{
    $colon = strrpos($this->_remoteAddress, ':');
    // Both "no colon" (false) and "colon at offset 0" yield an empty result.
    if (!$colon) {
        return '';
    }
    $host = substr($this->_remoteAddress, 0, $colon);
    return trim($host, '[]');
}
|
|
|
|
/**
 * Returns the port part of the remote address: the integer value of whatever
 * follows the last ':'.
 *
 * @return int The port, or 0 when the address is empty or has no ':'.
 */
public function getRemotePort()
{
    if (!$this->_remoteAddress) {
        return 0;
    }
    $colon = strrpos($this->_remoteAddress, ':');
    if ($colon === false) {
        return 0;
    }
    return (int)substr($this->_remoteAddress, $colon + 1);
}
|
|
|
|
/**
 * Stops reading from the socket by removing its read listener from the event
 * loop and marks the connection as paused, so onMessage is no longer emitted.
 * Useful to throttle back an upload.
 *
 * @return void
 */
public function pauseRecv()
{
    Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
    $this->_isPaused = true;
}
|
|
|
|
/**
 * Resumes reading after a call to pauseRecv(). Does nothing when the
 * connection is not paused.
 *
 * @return void
 */
public function resumeRecv()
{
    if ($this->_isPaused !== true) {
        return;
    }

    Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
    $this->_isPaused = false;

    // Run the read handler once right away, without the end-of-file check.
    $this->baseRead($this->_socket, false);
}
|
|
|
|
/**
 * Base read handler.
 *
 * Reads up to READ_BUFFER_SIZE bytes from the socket into the receive buffer.
 * With a protocol set, the buffer is split into packages using
 * protocol::input() and each decoded package is passed to onMessage. Without a
 * protocol the whole buffer is passed to onMessage and then cleared.
 *
 * @param resource $socket
 * @param bool     $check_eof When true, an empty or failed read on a socket
 *                            that is at end-of-file (or no longer a resource)
 *                            destroys the connection.
 * @return void
 */
public function baseRead($socket, $check_eof = true)
{
    $buffer = fread($socket, self::READ_BUFFER_SIZE);

    // Check connection closed.
    if ($buffer === '' || $buffer === false) {
        if ($check_eof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
            $this->destroy();
            return;
        }
    } else {
        $this->_recvBuffer .= $buffer;
    }

    // If the application layer protocol has been set up.
    if ($this->protocol) {
        $parser = $this->protocol;
        // Stop as soon as a callback has destroyed the connection: there is
        // nobody left to receive the remaining packages.
        while ($this->_recvBuffer !== '' && !$this->_isPaused && $this->_status !== self::STATUS_CLOSED) {
            if (!$this->_currentPackageLength) {
                // The package length is not known yet, ask the protocol.
                $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
                // The packet length is still unknown, wait for more data.
                if ($this->_currentPackageLength === 0) {
                    break;
                }
                // Wrong package: not a positive length or larger than allowed.
                if (!($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)) {
                    echo 'error package. package_length=' . var_export($this->_currentPackageLength, true);
                    $this->destroy();
                    return;
                }
            }

            // Data is not enough for a package.
            if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                break;
            }

            // The data is enough for a packet.
            self::$statistics['total_request']++;
            if (strlen($this->_recvBuffer) === $this->_currentPackageLength) {
                // The buffer holds exactly one package. Handled separately so
                // that substr() is never called with an offset at the end of
                // the string.
                $one_request_buffer = $this->_recvBuffer;
                $this->_recvBuffer  = '';
            } else {
                // Get a full package from the buffer and remove it from there.
                $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
                $this->_recvBuffer  = substr($this->_recvBuffer, $this->_currentPackageLength);
            }
            // Reset the current packet length to 0.
            $this->_currentPackageLength = 0;

            // Without a listener the package is simply dropped.
            if (!$this->onMessage) {
                continue;
            }
            try {
                // Decode request buffer before emitting onMessage callback.
                call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
            } catch (\Exception $e) {
                Worker::log($e);
                exit(250);
            } catch (\Error $e) {
                Worker::log($e);
                exit(250);
            }
        }
        return;
    }

    if ($this->_recvBuffer === '' || $this->_isPaused) {
        return;
    }

    // Applications protocol is not set.
    self::$statistics['total_request']++;
    if (!$this->onMessage) {
        $this->_recvBuffer = '';
        return;
    }
    try {
        call_user_func($this->onMessage, $this, $this->_recvBuffer);
    } catch (\Exception $e) {
        Worker::log($e);
        exit(250);
    } catch (\Error $e) {
        Worker::log($e);
        exit(250);
    }
    // Clean receive buffer.
    $this->_recvBuffer = '';
}
|
|
|
|
/**
 * Base write handler: tries to write the queued send buffer to the socket.
 *
 * When everything was written the write listener is removed, onBufferDrain is
 * emitted and a connection in the closing state is destroyed. A partial write
 * keeps the remainder queued; a write of nothing destroys the connection.
 *
 * @return void|bool true when the buffer was written completely.
 */
public function baseWrite()
{
    $written = @fwrite($this->_socket, $this->_sendBuffer);

    if ($written !== strlen($this->_sendBuffer)) {
        if ($written > 0) {
            // Keep what could not be written yet.
            $this->_sendBuffer = substr($this->_sendBuffer, $written);
        } else {
            ++self::$statistics['send_fail'];
            $this->destroy();
        }
        return;
    }

    // The whole buffer went out: stop listening for writability.
    Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
    $this->_sendBuffer = '';

    // Try to emit onBufferDrain callback when the send buffer becomes empty.
    if ($this->onBufferDrain) {
        try {
            call_user_func($this->onBufferDrain, $this);
        } catch (\Exception $e) {
            Worker::log($e);
            exit(250);
        } catch (\Error $e) {
            Worker::log($e);
            exit(250);
        }
    }

    // close() was waiting for the buffer to be flushed.
    if ($this->_status === self::STATUS_CLOSING) {
        $this->destroy();
    }
    return true;
}
|
|
|
|
/**
 * Forwards everything received on this connection to $dest.
 *
 * Installs callbacks on both connections: data received here is sent to $dest,
 * closing this connection destroys $dest, and $dest's send buffer filling up
 * or draining pauses or resumes reading on this connection.
 *
 * @param TcpConnection $dest
 * @return void
 */
public function pipe($dest)
{
    // Closures cannot rely on $this here, so the source is captured explicitly.
    $source = $this;

    $forward = function ($connection, $data) use ($dest) {
        $dest->send($data);
    };
    $shutdown = function ($connection) use ($dest) {
        $dest->destroy();
    };
    $throttle = function ($connection) use ($source) {
        $source->pauseRecv();
    };
    $unthrottle = function ($connection) use ($source) {
        $source->resumeRecv();
    };

    $this->onMessage     = $forward;
    $this->onClose       = $shutdown;
    $dest->onBufferFull  = $throttle;
    $dest->onBufferDrain = $unthrottle;
}
|
|
|
|
/**
 * Removes the first $length bytes from the receive buffer.
 *
 * @param int $length Number of bytes to remove.
 * @return void
 */
public function consumeRecvBuffer($length)
{
    $remaining = substr($this->_recvBuffer, $length);
    // Before PHP 8, substr() returns false instead of '' when $length reaches
    // or passes the end of the string. The receive buffer must stay a string,
    // because the read handler compares it with '' using !==.
    $this->_recvBuffer = ($remaining === false) ? '' : $remaining;
}
|
|
|
|
/**
 * Closes the connection, optionally sending some final data first.
 *
 * The connection is destroyed immediately when nothing is left in the send
 * buffer; otherwise it stays in the closing state until baseWrite() has
 * flushed the buffer. Calling close() on a closing or closed connection does
 * nothing.
 *
 * @param mixed $data Data to send before closing, or null for none.
 * @param bool  $raw  Passed through to send().
 * @return void
 */
public function close($data = null, $raw = false)
{
    if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
        return;
    }

    if ($data !== null) {
        $this->send($data, $raw);
        // send() destroys the connection itself when the peer is already
        // gone. The connection must not be put back into the closing state
        // and torn down a second time in that case.
        if ($this->_status === self::STATUS_CLOSED) {
            return;
        }
    }

    $this->_status = self::STATUS_CLOSING;
    if ($this->_sendBuffer === '') {
        $this->destroy();
    }
}
|
|
|
|
/**
 * Returns the underlying socket stream of this connection.
 *
 * @return resource
 */
public function getSocket()
{
    return $this->_socket;
}
|
|
|
|
/**
 * Emits the onBufferFull callback when the send buffer has reached
 * $maxSendBufferSize and a callback is set.
 *
 * @return void
 */
protected function checkBufferIsFull()
{
    // Still room in the buffer.
    if (strlen($this->_sendBuffer) < $this->maxSendBufferSize) {
        return;
    }
    if (!$this->onBufferFull) {
        return;
    }

    try {
        call_user_func($this->onBufferFull, $this);
    } catch (\Exception $e) {
        Worker::log($e);
        exit(250);
    } catch (\Error $e) {
        Worker::log($e);
        exit(250);
    }
}
|
|
|
|
/**
 * Destroys the connection.
 *
 * Removes the event listeners, closes the socket, removes the connection from
 * its worker, emits onClose and protocol::onClose, and finally releases the
 * buffers and callbacks. Calling it on an already closed connection does
 * nothing.
 *
 * @return void
 */
public function destroy()
{
    // Avoid repeated calls.
    if ($this->_status === self::STATUS_CLOSED) {
        return;
    }

    // Remove event listener.
    Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
    Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);

    // Close socket.
    @fclose($this->_socket);

    // Remove from worker->connections.
    if ($this->worker) {
        unset($this->worker->connections[$this->_id]);
    }
    $this->_status = self::STATUS_CLOSED;

    // Try to emit onClose callback.
    if ($this->onClose) {
        try {
            call_user_func($this->onClose, $this);
        } catch (\Exception $e) {
            Worker::log($e);
            exit(250);
        } catch (\Error $e) {
            Worker::log($e);
            exit(250);
        }
    }

    // Try to emit protocol::onClose. The protocol is checked first because
    // method_exists() throws a TypeError on PHP 8 when given null.
    if ($this->protocol && method_exists($this->protocol, 'onClose')) {
        try {
            call_user_func(array($this->protocol, 'onClose'), $this);
        } catch (\Exception $e) {
            Worker::log($e);
            exit(250);
        } catch (\Error $e) {
            Worker::log($e);
            exit(250);
        }
    }

    // Drop buffered data and reset the receive state. A read loop that is
    // still running then stops, and resumeRecv() no longer touches the
    // closed socket.
    $this->_sendBuffer           = '';
    $this->_recvBuffer           = '';
    $this->_currentPackageLength = 0;
    $this->_isPaused             = false;

    if ($this->_status === self::STATUS_CLOSED) {
        // Cleaning up the callback to avoid memory leaks.
        $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
    }
}
|
|
|
|
/**
 * Decrements the connection counter in the statistics when the object is
 * released.
 *
 * @return void
 */
public function __destruct()
{
    --self::$statistics['connection_count'];
}
|
|
}
|