phpsocks5/Workerman/Connection/TcpConnection.php

586 lines
16 KiB
PHP
Raw Normal View History

2015-04-04 21:46:31 +08:00
<?php
namespace Workerman\Connection;
use Workerman\Events\Libevent;
use Workerman\Events\Select;
use Workerman\Events\EventInterface;
use Workerman\Worker;
use \Exception;
/**
* Tcp连接类
* @author walkor<walkor@workerman.net>
*/
class TcpConnection extends ConnectionInterface
{
/**
* 当数据可读时从socket缓冲区读取多少字节数据
* @var int
*/
const READ_BUFFER_SIZE = 8192;
/**
* 连接状态 连接中
* @var int
*/
const STATUS_CONNECTING = 1;
/**
* 连接状态 已经建立连接
* @var int
*/
const STATUS_ESTABLISH = 2;
/**
* 连接状态 连接关闭中标识调用了close方法但是发送缓冲区中任然有数据
* 等待发送缓冲区的数据发送完毕写入到socket写缓冲区后执行关闭
* @var int
*/
const STATUS_CLOSING = 4;
/**
* 连接状态 已经关闭
* @var int
*/
const STATUS_CLOSED = 8;
/**
* 当对端发来数据时,如果设置了$onMessage回调则执行
* @var callback
*/
public $onMessage = null;
/**
* 当连接关闭时,如果设置了$onClose回调则执行
* @var callback
*/
public $onClose = null;
/**
* 当出现错误是,如果设置了$onError回调则执行
* @var callback
*/
public $onError = null;
/**
* 当发送缓冲区满时,如果设置了$onBufferFull回调则执行
* @var callback
*/
public $onBufferFull = null;
/**
* 当发送缓冲区被清空时,如果设置了$onBufferDrain回调则执行
* @var callback
*/
public $onBufferDrain = null;
/**
* 使用的应用层协议,是协议类的名称
* 值类似于 Workerman\\Protocols\\Http
* @var string
*/
public $protocol = '';
/**
* 属于哪个worker
* @var Worker
*/
public $worker = null;
/**
* 发送缓冲区大小当发送缓冲区满时会尝试触发onBufferFull回调如果有设置的话
* 如果没设置onBufferFull回调由于发送缓冲区满则后续发送的数据将被丢弃
* 直到发送缓冲区有空的位置
* 注意 此值可以动态设置
* 例如 Workerman\Connection\TcpConnection::$maxSendBufferSize=1024000;
* @var int
*/
public static $maxSendBufferSize = 1048576;
/**
* 能接受的最大数据包,为了防止恶意攻击,当数据包的大小大于此值时执行断开
* 注意 此值可以动态设置
* 例如 Workerman\Connection\TcpConnection::$maxPackageSize=1024000;
* @var int
*/
public static $maxPackageSize = 10485760;
/**
* 实际的socket资源
* @var resource
*/
protected $_socket = null;
/**
* 发送缓冲区
* @var string
*/
protected $_sendBuffer = '';
/**
* 接收缓冲区
* @var string
*/
protected $_recvBuffer = '';
/**
* 当前正在处理的数据包的包长此值是协议的intput方法的返回值
* @var int
*/
protected $_currentPackageLength = 0;
/**
* 当前的连接状态
* @var int
*/
protected $_status = self::STATUS_ESTABLISH;
/**
* 对端ip
* @var string
*/
protected $_remoteIp = '';
/**
* 对端端口
* @var int
*/
protected $_remotePort = 0;
/**
* 对端的地址 ip+port
* 值类似于 192.168.1.100:3698
* @var string
*/
protected $_remoteAddress = '';
/**
* 是否是停止接收数据
* @var bool
*/
protected $_isPaused = false;
/**
* 构造函数
* @param resource $socket
* @param EventInterface $event
*/
public function __construct($socket)
{
$this->_socket = $socket;
stream_set_blocking($this->_socket, 0);
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
}
/**
* 发送数据给对端
* @param string $send_buffer
* @param bool $raw
* @return void|boolean
*/
public function send($send_buffer, $raw = false)
{
// 如果当前状态是连接中,则把数据放入发送缓冲区
if($this->_status === self::STATUS_CONNECTING)
{
$this->_sendBuffer .= $send_buffer;
return null;
}
// 如果当前连接是关闭则返回false
elseif($this->_status == self::STATUS_CLOSED)
{
return false;
}
// 如果没有设置以原始数据发送,并且有设置协议则按照协议编码
if(false === $raw && $this->protocol)
{
$parser = $this->protocol;
$send_buffer = $parser::encode($send_buffer, $this);
}
// 如果发送缓冲区为空,尝试直接发送
if($this->_sendBuffer === '')
{
// 直接发送
$len = @fwrite($this->_socket, $send_buffer);
// 所有数据都发送完毕
if($len === strlen($send_buffer))
{
return true;
}
// 只有部分数据发送成功
if($len > 0)
{
// 未发送成功部分放入发送缓冲区
$this->_sendBuffer = substr($send_buffer, $len);
}
else
{
// 如果连接断开
if(feof($this->_socket))
{
// status统计发送失败次数
self::$statistics['send_fail']++;
// 如果有设置失败回调,则执行
if($this->onError)
{
try
{
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
}
catch(Exception $e)
{
echo $e;
}
}
// 销毁连接
$this->destroy();
return false;
}
// 连接未断开,发送失败,则把所有数据放入发送缓冲区
$this->_sendBuffer = $send_buffer;
}
// 监听对端可写事件
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
// 检查发送缓冲区是否已满如果满了尝试触发onBufferFull回调
$this->checkBufferIsFull();
return null;
}
else
{
// 缓冲区已经标记为满,仍然然有数据发送,则丢弃数据包
if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
{
// 为status命令统计发送失败次数
self::$statistics['send_fail']++;
// 如果有设置失败回调,则执行
if($this->onError)
{
try
{
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
}
catch(Exception $e)
{
echo $e;
}
}
return false;
}
// 将数据放入放缓冲区
$this->_sendBuffer .= $send_buffer;
// 检查发送缓冲区是否已满如果满了尝试触发onBufferFull回调
$this->checkBufferIsFull();
}
}
/**
* 获得对端ip
* @return string
*/
public function getRemoteIp()
{
if(!$this->_remoteIp)
{
$this->_remoteAddress = stream_socket_get_name($this->_socket, true);
if($this->_remoteAddress)
{
list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
$this->_remotePort = (int)$this->_remotePort;
}
}
return $this->_remoteIp;
}
/**
* 获得对端端口
* @return int
*/
public function getRemotePort()
{
if(!$this->_remotePort)
{
$this->_remoteAddress = stream_socket_get_name($this->_socket, true);
if($this->_remoteAddress)
{
list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
$this->_remotePort = (int)$this->_remotePort;
}
}
return $this->_remotePort;
}
/**
* 暂停接收数据,一般用于控制上传流量
* @return void
*/
public function pauseRecv()
{
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
$this->_isPaused = true;
}
/**
* 恢复接收数据,一般用户控制上传流量
* @return void
*/
public function resumeRecv()
{
if($this->_isPaused == true)
{
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->_isPaused = false;
$this->baseRead($this->_socket);
}
}
/**
* 当socket可读时的回调
* @param resource $socket
* @return void
*/
public function baseRead($socket)
{
while($buffer = fread($socket, self::READ_BUFFER_SIZE))
{
$this->_recvBuffer .= $buffer;
}
if($this->_recvBuffer)
{
if(!$this->onMessage)
{
return ;
}
// 如果设置了协议
if($this->protocol)
{
$parser = $this->protocol;
while($this->_recvBuffer && !$this->_isPaused)
{
// 当前包的长度已知
if($this->_currentPackageLength)
{
// 数据不够一个包
if($this->_currentPackageLength > strlen($this->_recvBuffer))
{
break;
}
}
else
{
// 获得当前包长
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
// 数据不够,无法获得包长
if($this->_currentPackageLength === 0)
{
break;
}
elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
{
// 数据不够一个包
if($this->_currentPackageLength > strlen($this->_recvBuffer))
{
break;
}
}
// 包错误
else
{
$this->close('error package. package_length='.var_export($this->_currentPackageLength, true));
}
}
// 数据足够一个包长
self::$statistics['total_request']++;
// 从缓冲区中获取一个完整的包
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
// 将当前包从接受缓冲区中去掉
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
// 重置当前包长为0
$this->_currentPackageLength = 0;
// 处理数据包
try
{
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
}
catch(Exception $e)
{
self::$statistics['throw_exception']++;
echo $e;
}
}
if($this->_status !== self::STATUS_CLOSED && feof($socket))
{
$this->destroy();
}
return;
}
// 没有设置协议,则直接把接收的数据当做一个包处理
self::$statistics['total_request']++;
try
{
call_user_func($this->onMessage, $this, $this->_recvBuffer);
}
catch(Exception $e)
{
self::$statistics['throw_exception']++;
echo $e;
}
// 清空缓冲区
$this->_recvBuffer = '';
// 判断连接是否已经断开
if($this->_status !== self::STATUS_CLOSED && feof($socket))
{
$this->destroy();
return;
}
}
// 没收到数据,判断连接是否已经断开
else if(feof($socket))
{
$this->destroy();
return;
}
}
/**
* socket可写时的回调
* @return void
*/
public function baseWrite()
{
$len = @fwrite($this->_socket, $this->_sendBuffer);
if($len === strlen($this->_sendBuffer))
{
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
$this->_sendBuffer = '';
// 发送缓冲区的数据被发送完毕尝试触发onBufferDrain回调
if($this->onBufferDrain)
{
try
{
call_user_func($this->onBufferDrain, $this);
}
catch(Exception $e)
{
echo $e;
}
}
// 如果连接状态为关闭,则销毁连接
if($this->_status == self::STATUS_CLOSING)
{
$this->destroy();
}
return true;
}
if($len > 0)
{
$this->_sendBuffer = substr($this->_sendBuffer, $len);
}
else
{
if(feof($this->_socket))
{
self::$statistics['send_fail']++;
$this->destroy();
}
}
}
/**
* 从缓冲区中消费掉$length长度的数据
* @param int $length
* @return void
*/
public function consumeRecvBuffer($length)
{
$this->_recvBuffer = substr($this->_recvBuffer, $length);
}
/**
* 关闭连接
* @param mixed $data
* @void
*/
public function close($data = null)
{
if($this->_status == self::STATUS_CLOSING)
{
return false;
}
else
{
$this->_status = self::STATUS_CLOSING;
}
if($data !== null)
{
$this->send($data);
}
if($this->_sendBuffer === '')
{
$this->destroy();
}
}
/**
* 获得socket连接
* @return resource
*/
public function getSocket()
{
return $this->_socket;
}
/**
* 检查发送缓冲区是否已满如果满了尝试触发onBufferFull回调
* @return void
*/
protected function checkBufferIsFull()
{
if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
{
if($this->onBufferFull)
{
try
{
call_user_func($this->onBufferFull, $this);
}
catch(Exception $e)
{
echo $e;
}
}
}
}
/**
* 销毁连接
* @void
*/
protected function destroy()
{
self::$statistics['connection_count']--;
if($this->onClose)
{
try
{
call_user_func($this->onClose, $this);
}
catch (Exception $e)
{
self::$statistics['throw_exception']++;
echo $e;
}
}
if($this->worker)
{
unset($this->worker->connections[(int)$this->_socket]);
}
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
@fclose($this->_socket);
$this->_status = self::STATUS_CLOSED;
}
}