phpsocks5/Workerman/Connection/TcpConnection.php
2015-04-04 21:46:31 +08:00

586 lines
16 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace Workerman\Connection;
use Workerman\Events\Libevent;
use Workerman\Events\Select;
use Workerman\Events\EventInterface;
use Workerman\Worker;
use \Exception;
/**
* Tcp连接类
* @author walkor<walkor@workerman.net>
*/
class TcpConnection extends ConnectionInterface
{
/**
* 当数据可读时从socket缓冲区读取多少字节数据
* @var int
*/
const READ_BUFFER_SIZE = 8192;
/**
* 连接状态 连接中
* @var int
*/
const STATUS_CONNECTING = 1;
/**
* 连接状态 已经建立连接
* @var int
*/
const STATUS_ESTABLISH = 2;
/**
* 连接状态 连接关闭中标识调用了close方法但是发送缓冲区中任然有数据
* 等待发送缓冲区的数据发送完毕写入到socket写缓冲区后执行关闭
* @var int
*/
const STATUS_CLOSING = 4;
/**
* 连接状态 已经关闭
* @var int
*/
const STATUS_CLOSED = 8;
/**
* 当对端发来数据时,如果设置了$onMessage回调则执行
* @var callback
*/
public $onMessage = null;
/**
* 当连接关闭时,如果设置了$onClose回调则执行
* @var callback
*/
public $onClose = null;
/**
* 当出现错误是,如果设置了$onError回调则执行
* @var callback
*/
public $onError = null;
/**
* 当发送缓冲区满时,如果设置了$onBufferFull回调则执行
* @var callback
*/
public $onBufferFull = null;
/**
* 当发送缓冲区被清空时,如果设置了$onBufferDrain回调则执行
* @var callback
*/
public $onBufferDrain = null;
/**
* 使用的应用层协议,是协议类的名称
* 值类似于 Workerman\\Protocols\\Http
* @var string
*/
public $protocol = '';
/**
* 属于哪个worker
* @var Worker
*/
public $worker = null;
/**
* 发送缓冲区大小当发送缓冲区满时会尝试触发onBufferFull回调如果有设置的话
* 如果没设置onBufferFull回调由于发送缓冲区满则后续发送的数据将被丢弃
* 直到发送缓冲区有空的位置
* 注意 此值可以动态设置
* 例如 Workerman\Connection\TcpConnection::$maxSendBufferSize=1024000;
* @var int
*/
public static $maxSendBufferSize = 1048576;
/**
* 能接受的最大数据包,为了防止恶意攻击,当数据包的大小大于此值时执行断开
* 注意 此值可以动态设置
* 例如 Workerman\Connection\TcpConnection::$maxPackageSize=1024000;
* @var int
*/
public static $maxPackageSize = 10485760;
/**
* 实际的socket资源
* @var resource
*/
protected $_socket = null;
/**
* 发送缓冲区
* @var string
*/
protected $_sendBuffer = '';
/**
* 接收缓冲区
* @var string
*/
protected $_recvBuffer = '';
/**
* 当前正在处理的数据包的包长此值是协议的intput方法的返回值
* @var int
*/
protected $_currentPackageLength = 0;
/**
* 当前的连接状态
* @var int
*/
protected $_status = self::STATUS_ESTABLISH;
/**
* 对端ip
* @var string
*/
protected $_remoteIp = '';
/**
* 对端端口
* @var int
*/
protected $_remotePort = 0;
/**
* 对端的地址 ip+port
* 值类似于 192.168.1.100:3698
* @var string
*/
protected $_remoteAddress = '';
/**
* 是否是停止接收数据
* @var bool
*/
protected $_isPaused = false;
/**
* 构造函数
* @param resource $socket
* @param EventInterface $event
*/
public function __construct($socket)
{
$this->_socket = $socket;
stream_set_blocking($this->_socket, 0);
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
}
/**
* 发送数据给对端
* @param string $send_buffer
* @param bool $raw
* @return void|boolean
*/
public function send($send_buffer, $raw = false)
{
// 如果当前状态是连接中,则把数据放入发送缓冲区
if($this->_status === self::STATUS_CONNECTING)
{
$this->_sendBuffer .= $send_buffer;
return null;
}
// 如果当前连接是关闭则返回false
elseif($this->_status == self::STATUS_CLOSED)
{
return false;
}
// 如果没有设置以原始数据发送,并且有设置协议则按照协议编码
if(false === $raw && $this->protocol)
{
$parser = $this->protocol;
$send_buffer = $parser::encode($send_buffer, $this);
}
// 如果发送缓冲区为空,尝试直接发送
if($this->_sendBuffer === '')
{
// 直接发送
$len = @fwrite($this->_socket, $send_buffer);
// 所有数据都发送完毕
if($len === strlen($send_buffer))
{
return true;
}
// 只有部分数据发送成功
if($len > 0)
{
// 未发送成功部分放入发送缓冲区
$this->_sendBuffer = substr($send_buffer, $len);
}
else
{
// 如果连接断开
if(feof($this->_socket))
{
// status统计发送失败次数
self::$statistics['send_fail']++;
// 如果有设置失败回调,则执行
if($this->onError)
{
try
{
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
}
catch(Exception $e)
{
echo $e;
}
}
// 销毁连接
$this->destroy();
return false;
}
// 连接未断开,发送失败,则把所有数据放入发送缓冲区
$this->_sendBuffer = $send_buffer;
}
// 监听对端可写事件
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
// 检查发送缓冲区是否已满如果满了尝试触发onBufferFull回调
$this->checkBufferIsFull();
return null;
}
else
{
// 缓冲区已经标记为满,仍然然有数据发送,则丢弃数据包
if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
{
// 为status命令统计发送失败次数
self::$statistics['send_fail']++;
// 如果有设置失败回调,则执行
if($this->onError)
{
try
{
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
}
catch(Exception $e)
{
echo $e;
}
}
return false;
}
// 将数据放入放缓冲区
$this->_sendBuffer .= $send_buffer;
// 检查发送缓冲区是否已满如果满了尝试触发onBufferFull回调
$this->checkBufferIsFull();
}
}
/**
* 获得对端ip
* @return string
*/
public function getRemoteIp()
{
if(!$this->_remoteIp)
{
$this->_remoteAddress = stream_socket_get_name($this->_socket, true);
if($this->_remoteAddress)
{
list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
$this->_remotePort = (int)$this->_remotePort;
}
}
return $this->_remoteIp;
}
/**
* 获得对端端口
* @return int
*/
public function getRemotePort()
{
if(!$this->_remotePort)
{
$this->_remoteAddress = stream_socket_get_name($this->_socket, true);
if($this->_remoteAddress)
{
list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
$this->_remotePort = (int)$this->_remotePort;
}
}
return $this->_remotePort;
}
/**
* 暂停接收数据,一般用于控制上传流量
* @return void
*/
public function pauseRecv()
{
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
$this->_isPaused = true;
}
/**
* 恢复接收数据,一般用户控制上传流量
* @return void
*/
public function resumeRecv()
{
if($this->_isPaused == true)
{
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->_isPaused = false;
$this->baseRead($this->_socket);
}
}
/**
* 当socket可读时的回调
* @param resource $socket
* @return void
*/
public function baseRead($socket)
{
while($buffer = fread($socket, self::READ_BUFFER_SIZE))
{
$this->_recvBuffer .= $buffer;
}
if($this->_recvBuffer)
{
if(!$this->onMessage)
{
return ;
}
// 如果设置了协议
if($this->protocol)
{
$parser = $this->protocol;
while($this->_recvBuffer && !$this->_isPaused)
{
// 当前包的长度已知
if($this->_currentPackageLength)
{
// 数据不够一个包
if($this->_currentPackageLength > strlen($this->_recvBuffer))
{
break;
}
}
else
{
// 获得当前包长
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
// 数据不够,无法获得包长
if($this->_currentPackageLength === 0)
{
break;
}
elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
{
// 数据不够一个包
if($this->_currentPackageLength > strlen($this->_recvBuffer))
{
break;
}
}
// 包错误
else
{
$this->close('error package. package_length='.var_export($this->_currentPackageLength, true));
}
}
// 数据足够一个包长
self::$statistics['total_request']++;
// 从缓冲区中获取一个完整的包
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
// 将当前包从接受缓冲区中去掉
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
// 重置当前包长为0
$this->_currentPackageLength = 0;
// 处理数据包
try
{
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
}
catch(Exception $e)
{
self::$statistics['throw_exception']++;
echo $e;
}
}
if($this->_status !== self::STATUS_CLOSED && feof($socket))
{
$this->destroy();
}
return;
}
// 没有设置协议,则直接把接收的数据当做一个包处理
self::$statistics['total_request']++;
try
{
call_user_func($this->onMessage, $this, $this->_recvBuffer);
}
catch(Exception $e)
{
self::$statistics['throw_exception']++;
echo $e;
}
// 清空缓冲区
$this->_recvBuffer = '';
// 判断连接是否已经断开
if($this->_status !== self::STATUS_CLOSED && feof($socket))
{
$this->destroy();
return;
}
}
// 没收到数据,判断连接是否已经断开
else if(feof($socket))
{
$this->destroy();
return;
}
}
/**
* socket可写时的回调
* @return void
*/
public function baseWrite()
{
$len = @fwrite($this->_socket, $this->_sendBuffer);
if($len === strlen($this->_sendBuffer))
{
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
$this->_sendBuffer = '';
// 发送缓冲区的数据被发送完毕尝试触发onBufferDrain回调
if($this->onBufferDrain)
{
try
{
call_user_func($this->onBufferDrain, $this);
}
catch(Exception $e)
{
echo $e;
}
}
// 如果连接状态为关闭,则销毁连接
if($this->_status == self::STATUS_CLOSING)
{
$this->destroy();
}
return true;
}
if($len > 0)
{
$this->_sendBuffer = substr($this->_sendBuffer, $len);
}
else
{
if(feof($this->_socket))
{
self::$statistics['send_fail']++;
$this->destroy();
}
}
}
/**
* 从缓冲区中消费掉$length长度的数据
* @param int $length
* @return void
*/
public function consumeRecvBuffer($length)
{
$this->_recvBuffer = substr($this->_recvBuffer, $length);
}
/**
* 关闭连接
* @param mixed $data
* @void
*/
public function close($data = null)
{
if($this->_status == self::STATUS_CLOSING)
{
return false;
}
else
{
$this->_status = self::STATUS_CLOSING;
}
if($data !== null)
{
$this->send($data);
}
if($this->_sendBuffer === '')
{
$this->destroy();
}
}
/**
* 获得socket连接
* @return resource
*/
public function getSocket()
{
return $this->_socket;
}
/**
* 检查发送缓冲区是否已满如果满了尝试触发onBufferFull回调
* @return void
*/
protected function checkBufferIsFull()
{
if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
{
if($this->onBufferFull)
{
try
{
call_user_func($this->onBufferFull, $this);
}
catch(Exception $e)
{
echo $e;
}
}
}
}
/**
* 销毁连接
* @void
*/
protected function destroy()
{
self::$statistics['connection_count']--;
if($this->onClose)
{
try
{
call_user_func($this->onClose, $this);
}
catch (Exception $e)
{
self::$statistics['throw_exception']++;
echo $e;
}
}
if($this->worker)
{
unset($this->worker->connections[(int)$this->_socket]);
}
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
@fclose($this->_socket);
$this->_status = self::STATUS_CLOSED;
}
}