586 lines
16 KiB
PHP
586 lines
16 KiB
PHP
<?php
|
||
namespace Workerman\Connection;
|
||
|
||
use Workerman\Events\Libevent;
|
||
use Workerman\Events\Select;
|
||
use Workerman\Events\EventInterface;
|
||
use Workerman\Worker;
|
||
use \Exception;
|
||
|
||
/**
|
||
* Tcp连接类
|
||
* @author walkor<walkor@workerman.net>
|
||
*/
|
||
class TcpConnection extends ConnectionInterface
|
||
{
|
||
/**
|
||
* 当数据可读时,从socket缓冲区读取多少字节数据
|
||
* @var int
|
||
*/
|
||
const READ_BUFFER_SIZE = 8192;
|
||
|
||
/**
|
||
* 连接状态 连接中
|
||
* @var int
|
||
*/
|
||
const STATUS_CONNECTING = 1;
|
||
|
||
/**
|
||
* 连接状态 已经建立连接
|
||
* @var int
|
||
*/
|
||
const STATUS_ESTABLISH = 2;
|
||
|
||
/**
|
||
* 连接状态 连接关闭中,标识调用了close方法,但是发送缓冲区中任然有数据
|
||
* 等待发送缓冲区的数据发送完毕(写入到socket写缓冲区)后执行关闭
|
||
* @var int
|
||
*/
|
||
const STATUS_CLOSING = 4;
|
||
|
||
/**
|
||
* 连接状态 已经关闭
|
||
* @var int
|
||
*/
|
||
const STATUS_CLOSED = 8;
|
||
|
||
/**
|
||
* 当对端发来数据时,如果设置了$onMessage回调,则执行
|
||
* @var callback
|
||
*/
|
||
public $onMessage = null;
|
||
|
||
/**
|
||
* 当连接关闭时,如果设置了$onClose回调,则执行
|
||
* @var callback
|
||
*/
|
||
public $onClose = null;
|
||
|
||
/**
|
||
* 当出现错误是,如果设置了$onError回调,则执行
|
||
* @var callback
|
||
*/
|
||
public $onError = null;
|
||
|
||
/**
|
||
* 当发送缓冲区满时,如果设置了$onBufferFull回调,则执行
|
||
* @var callback
|
||
*/
|
||
public $onBufferFull = null;
|
||
|
||
/**
|
||
* 当发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行
|
||
* @var callback
|
||
*/
|
||
public $onBufferDrain = null;
|
||
|
||
/**
|
||
* 使用的应用层协议,是协议类的名称
|
||
* 值类似于 Workerman\\Protocols\\Http
|
||
* @var string
|
||
*/
|
||
public $protocol = '';
|
||
|
||
/**
|
||
* 属于哪个worker
|
||
* @var Worker
|
||
*/
|
||
public $worker = null;
|
||
|
||
/**
|
||
* 发送缓冲区大小,当发送缓冲区满时,会尝试触发onBufferFull回调(如果有设置的话)
|
||
* 如果没设置onBufferFull回调,由于发送缓冲区满,则后续发送的数据将被丢弃,
|
||
* 直到发送缓冲区有空的位置
|
||
* 注意 此值可以动态设置
|
||
* 例如 Workerman\Connection\TcpConnection::$maxSendBufferSize=1024000;
|
||
* @var int
|
||
*/
|
||
public static $maxSendBufferSize = 1048576;
|
||
|
||
/**
|
||
* 能接受的最大数据包,为了防止恶意攻击,当数据包的大小大于此值时执行断开
|
||
* 注意 此值可以动态设置
|
||
* 例如 Workerman\Connection\TcpConnection::$maxPackageSize=1024000;
|
||
* @var int
|
||
*/
|
||
public static $maxPackageSize = 10485760;
|
||
|
||
/**
|
||
* 实际的socket资源
|
||
* @var resource
|
||
*/
|
||
protected $_socket = null;
|
||
|
||
/**
|
||
* 发送缓冲区
|
||
* @var string
|
||
*/
|
||
protected $_sendBuffer = '';
|
||
|
||
/**
|
||
* 接收缓冲区
|
||
* @var string
|
||
*/
|
||
protected $_recvBuffer = '';
|
||
|
||
/**
|
||
* 当前正在处理的数据包的包长(此值是协议的intput方法的返回值)
|
||
* @var int
|
||
*/
|
||
protected $_currentPackageLength = 0;
|
||
|
||
/**
|
||
* 当前的连接状态
|
||
* @var int
|
||
*/
|
||
protected $_status = self::STATUS_ESTABLISH;
|
||
|
||
/**
|
||
* 对端ip
|
||
* @var string
|
||
*/
|
||
protected $_remoteIp = '';
|
||
|
||
/**
|
||
* 对端端口
|
||
* @var int
|
||
*/
|
||
protected $_remotePort = 0;
|
||
|
||
/**
|
||
* 对端的地址 ip+port
|
||
* 值类似于 192.168.1.100:3698
|
||
* @var string
|
||
*/
|
||
protected $_remoteAddress = '';
|
||
|
||
/**
|
||
* 是否是停止接收数据
|
||
* @var bool
|
||
*/
|
||
protected $_isPaused = false;
|
||
|
||
/**
|
||
* 构造函数
|
||
* @param resource $socket
|
||
* @param EventInterface $event
|
||
*/
|
||
public function __construct($socket)
|
||
{
|
||
$this->_socket = $socket;
|
||
stream_set_blocking($this->_socket, 0);
|
||
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
|
||
}
|
||
|
||
/**
|
||
* 发送数据给对端
|
||
* @param string $send_buffer
|
||
* @param bool $raw
|
||
* @return void|boolean
|
||
*/
|
||
public function send($send_buffer, $raw = false)
|
||
{
|
||
// 如果当前状态是连接中,则把数据放入发送缓冲区
|
||
if($this->_status === self::STATUS_CONNECTING)
|
||
{
|
||
$this->_sendBuffer .= $send_buffer;
|
||
return null;
|
||
}
|
||
// 如果当前连接是关闭,则返回false
|
||
elseif($this->_status == self::STATUS_CLOSED)
|
||
{
|
||
return false;
|
||
}
|
||
|
||
// 如果没有设置以原始数据发送,并且有设置协议则按照协议编码
|
||
if(false === $raw && $this->protocol)
|
||
{
|
||
$parser = $this->protocol;
|
||
$send_buffer = $parser::encode($send_buffer, $this);
|
||
}
|
||
// 如果发送缓冲区为空,尝试直接发送
|
||
if($this->_sendBuffer === '')
|
||
{
|
||
// 直接发送
|
||
$len = @fwrite($this->_socket, $send_buffer);
|
||
// 所有数据都发送完毕
|
||
if($len === strlen($send_buffer))
|
||
{
|
||
return true;
|
||
}
|
||
// 只有部分数据发送成功
|
||
if($len > 0)
|
||
{
|
||
// 未发送成功部分放入发送缓冲区
|
||
$this->_sendBuffer = substr($send_buffer, $len);
|
||
}
|
||
else
|
||
{
|
||
// 如果连接断开
|
||
if(feof($this->_socket))
|
||
{
|
||
// status统计发送失败次数
|
||
self::$statistics['send_fail']++;
|
||
// 如果有设置失败回调,则执行
|
||
if($this->onError)
|
||
{
|
||
try
|
||
{
|
||
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
|
||
}
|
||
catch(Exception $e)
|
||
{
|
||
echo $e;
|
||
}
|
||
}
|
||
// 销毁连接
|
||
$this->destroy();
|
||
return false;
|
||
}
|
||
// 连接未断开,发送失败,则把所有数据放入发送缓冲区
|
||
$this->_sendBuffer = $send_buffer;
|
||
}
|
||
// 监听对端可写事件
|
||
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
||
// 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
|
||
$this->checkBufferIsFull();
|
||
return null;
|
||
}
|
||
else
|
||
{
|
||
// 缓冲区已经标记为满,仍然然有数据发送,则丢弃数据包
|
||
if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
|
||
{
|
||
// 为status命令统计发送失败次数
|
||
self::$statistics['send_fail']++;
|
||
// 如果有设置失败回调,则执行
|
||
if($this->onError)
|
||
{
|
||
try
|
||
{
|
||
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
|
||
}
|
||
catch(Exception $e)
|
||
{
|
||
echo $e;
|
||
}
|
||
}
|
||
return false;
|
||
}
|
||
// 将数据放入放缓冲区
|
||
$this->_sendBuffer .= $send_buffer;
|
||
// 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
|
||
$this->checkBufferIsFull();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获得对端ip
|
||
* @return string
|
||
*/
|
||
public function getRemoteIp()
|
||
{
|
||
if(!$this->_remoteIp)
|
||
{
|
||
$this->_remoteAddress = stream_socket_get_name($this->_socket, true);
|
||
if($this->_remoteAddress)
|
||
{
|
||
list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
|
||
$this->_remotePort = (int)$this->_remotePort;
|
||
}
|
||
}
|
||
return $this->_remoteIp;
|
||
}
|
||
|
||
/**
|
||
* 获得对端端口
|
||
* @return int
|
||
*/
|
||
public function getRemotePort()
|
||
{
|
||
if(!$this->_remotePort)
|
||
{
|
||
$this->_remoteAddress = stream_socket_get_name($this->_socket, true);
|
||
if($this->_remoteAddress)
|
||
{
|
||
list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
|
||
$this->_remotePort = (int)$this->_remotePort;
|
||
}
|
||
}
|
||
return $this->_remotePort;
|
||
}
|
||
|
||
/**
|
||
* 暂停接收数据,一般用于控制上传流量
|
||
* @return void
|
||
*/
|
||
public function pauseRecv()
|
||
{
|
||
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
|
||
$this->_isPaused = true;
|
||
}
|
||
|
||
/**
|
||
* 恢复接收数据,一般用户控制上传流量
|
||
* @return void
|
||
*/
|
||
public function resumeRecv()
|
||
{
|
||
if($this->_isPaused == true)
|
||
{
|
||
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
|
||
$this->_isPaused = false;
|
||
$this->baseRead($this->_socket);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 当socket可读时的回调
|
||
* @param resource $socket
|
||
* @return void
|
||
*/
|
||
public function baseRead($socket)
|
||
{
|
||
while($buffer = fread($socket, self::READ_BUFFER_SIZE))
|
||
{
|
||
$this->_recvBuffer .= $buffer;
|
||
}
|
||
|
||
if($this->_recvBuffer)
|
||
{
|
||
if(!$this->onMessage)
|
||
{
|
||
return ;
|
||
}
|
||
|
||
// 如果设置了协议
|
||
if($this->protocol)
|
||
{
|
||
$parser = $this->protocol;
|
||
while($this->_recvBuffer && !$this->_isPaused)
|
||
{
|
||
// 当前包的长度已知
|
||
if($this->_currentPackageLength)
|
||
{
|
||
// 数据不够一个包
|
||
if($this->_currentPackageLength > strlen($this->_recvBuffer))
|
||
{
|
||
break;
|
||
}
|
||
}
|
||
else
|
||
{
|
||
// 获得当前包长
|
||
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
|
||
// 数据不够,无法获得包长
|
||
if($this->_currentPackageLength === 0)
|
||
{
|
||
break;
|
||
}
|
||
elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
|
||
{
|
||
// 数据不够一个包
|
||
if($this->_currentPackageLength > strlen($this->_recvBuffer))
|
||
{
|
||
break;
|
||
}
|
||
}
|
||
// 包错误
|
||
else
|
||
{
|
||
$this->close('error package. package_length='.var_export($this->_currentPackageLength, true));
|
||
}
|
||
}
|
||
|
||
// 数据足够一个包长
|
||
self::$statistics['total_request']++;
|
||
// 从缓冲区中获取一个完整的包
|
||
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
|
||
// 将当前包从接受缓冲区中去掉
|
||
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
|
||
// 重置当前包长为0
|
||
$this->_currentPackageLength = 0;
|
||
// 处理数据包
|
||
try
|
||
{
|
||
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
|
||
}
|
||
catch(Exception $e)
|
||
{
|
||
self::$statistics['throw_exception']++;
|
||
echo $e;
|
||
}
|
||
}
|
||
if($this->_status !== self::STATUS_CLOSED && feof($socket))
|
||
{
|
||
$this->destroy();
|
||
}
|
||
return;
|
||
}
|
||
// 没有设置协议,则直接把接收的数据当做一个包处理
|
||
self::$statistics['total_request']++;
|
||
try
|
||
{
|
||
call_user_func($this->onMessage, $this, $this->_recvBuffer);
|
||
}
|
||
catch(Exception $e)
|
||
{
|
||
self::$statistics['throw_exception']++;
|
||
echo $e;
|
||
}
|
||
// 清空缓冲区
|
||
$this->_recvBuffer = '';
|
||
// 判断连接是否已经断开
|
||
if($this->_status !== self::STATUS_CLOSED && feof($socket))
|
||
{
|
||
$this->destroy();
|
||
return;
|
||
}
|
||
}
|
||
// 没收到数据,判断连接是否已经断开
|
||
else if(feof($socket))
|
||
{
|
||
$this->destroy();
|
||
return;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* socket可写时的回调
|
||
* @return void
|
||
*/
|
||
public function baseWrite()
|
||
{
|
||
$len = @fwrite($this->_socket, $this->_sendBuffer);
|
||
if($len === strlen($this->_sendBuffer))
|
||
{
|
||
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
||
$this->_sendBuffer = '';
|
||
// 发送缓冲区的数据被发送完毕,尝试触发onBufferDrain回调
|
||
if($this->onBufferDrain)
|
||
{
|
||
try
|
||
{
|
||
call_user_func($this->onBufferDrain, $this);
|
||
}
|
||
catch(Exception $e)
|
||
{
|
||
echo $e;
|
||
}
|
||
}
|
||
// 如果连接状态为关闭,则销毁连接
|
||
if($this->_status == self::STATUS_CLOSING)
|
||
{
|
||
$this->destroy();
|
||
}
|
||
return true;
|
||
}
|
||
if($len > 0)
|
||
{
|
||
$this->_sendBuffer = substr($this->_sendBuffer, $len);
|
||
}
|
||
else
|
||
{
|
||
if(feof($this->_socket))
|
||
{
|
||
self::$statistics['send_fail']++;
|
||
$this->destroy();
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 从缓冲区中消费掉$length长度的数据
|
||
* @param int $length
|
||
* @return void
|
||
*/
|
||
public function consumeRecvBuffer($length)
|
||
{
|
||
$this->_recvBuffer = substr($this->_recvBuffer, $length);
|
||
}
|
||
|
||
/**
|
||
* 关闭连接
|
||
* @param mixed $data
|
||
* @void
|
||
*/
|
||
public function close($data = null)
|
||
{
|
||
if($this->_status == self::STATUS_CLOSING)
|
||
{
|
||
return false;
|
||
}
|
||
else
|
||
{
|
||
$this->_status = self::STATUS_CLOSING;
|
||
}
|
||
if($data !== null)
|
||
{
|
||
$this->send($data);
|
||
}
|
||
if($this->_sendBuffer === '')
|
||
{
|
||
$this->destroy();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获得socket连接
|
||
* @return resource
|
||
*/
|
||
public function getSocket()
|
||
{
|
||
return $this->_socket;
|
||
}
|
||
|
||
/**
|
||
* 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
|
||
* @return void
|
||
*/
|
||
protected function checkBufferIsFull()
|
||
{
|
||
if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
|
||
{
|
||
if($this->onBufferFull)
|
||
{
|
||
try
|
||
{
|
||
call_user_func($this->onBufferFull, $this);
|
||
}
|
||
catch(Exception $e)
|
||
{
|
||
echo $e;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
/**
|
||
* 销毁连接
|
||
* @void
|
||
*/
|
||
protected function destroy()
|
||
{
|
||
self::$statistics['connection_count']--;
|
||
if($this->onClose)
|
||
{
|
||
try
|
||
{
|
||
call_user_func($this->onClose, $this);
|
||
}
|
||
catch (Exception $e)
|
||
{
|
||
self::$statistics['throw_exception']++;
|
||
echo $e;
|
||
}
|
||
}
|
||
if($this->worker)
|
||
{
|
||
unset($this->worker->connections[(int)$this->_socket]);
|
||
}
|
||
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
|
||
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
||
@fclose($this->_socket);
|
||
$this->_status = self::STATUS_CLOSED;
|
||
}
|
||
}
|