commit 37f4dcf4dc9e50b2c9125db3ec576cbcc0fbca50 Author: walkor Date: Sat Apr 4 21:46:31 2015 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..48b7711 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.buildpath +.project +.settings/org.eclipse.php.core.prefs \ No newline at end of file diff --git a/Applications/PassWall/start.php b/Applications/PassWall/start.php new file mode 100644 index 0000000..af20367 --- /dev/null +++ b/Applications/PassWall/start.php @@ -0,0 +1,15 @@ + + */ +class Autoloader +{ + // 应用的初始化目录,作为加载类文件的参考目录 + protected static $_appInitPath = ''; + + /** + * 设置应用初始化目录 + * @param string $root_path + * @return void + */ + public static function setRootPath($root_path) + { + self::$_appInitPath = $root_path; + } + + /** + * 根据命名空间加载文件 + * @param string $name + * @return boolean + */ + public static function loadByNamespace($name) + { + // 相对路径 + $class_path = str_replace('\\', DIRECTORY_SEPARATOR ,$name); + // 先尝试在应用目录寻找文件 + $class_file = self::$_appInitPath . '/' . $class_path.'.php'; + // 文件不存在,则在workerman根目录中寻找 + if(!is_file($class_file)) + { + $class_file = WORKERMAN_ROOT_DIR . DIRECTORY_SEPARATOR . "$class_path.php"; + } + // 找到文件 + if(is_file($class_file)) + { + // 加载 + require_once($class_file); + if(class_exists($name, false)) + { + return true; + } + } + return false; + } +} +// 设置类自动加载回调函数 +spl_autoload_register('\Workerman\Autoloader::loadByNamespace'); \ No newline at end of file diff --git a/Workerman/Connection/AsyncTcpConnection.php b/Workerman/Connection/AsyncTcpConnection.php new file mode 100644 index 0000000..8466844 --- /dev/null +++ b/Workerman/Connection/AsyncTcpConnection.php @@ -0,0 +1,128 @@ + + */ +class AsyncTcpConnection extends TcpConnection +{ + /** + * 连接状态 连接中 + * @var int + */ + protected $_status = self::STATUS_CONNECTING; + + /** + * 当连接成功时,如果设置了连接成功回调,则执行 + * @var callback + */ + public $onConnect = null; + + /** + * 构造函数,创建连接 + * @param resource $socket + * @param EventInterface $event + */ + public function __construct($remote_address) + { + // 获得协议及远程地址 + list($scheme, $address) = explode(':', $remote_address, 2); + if($scheme != 'tcp') + { + // 判断协议类是否存在 + $scheme = ucfirst($scheme); + $this->protocol = '\\Protocols\\'.$scheme; + if(!class_exists($this->protocol)) + { + $this->protocol = '\\Workerman\\Protocols\\' . $scheme; + if(!class_exists($this->protocol)) + { + throw new Exception("class \\Protocols\\$scheme not exist"); + } + } + } + // 创建异步连接 + $this->_socket = stream_socket_client("tcp:$address", $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT); + // 如果失败尝试触发失败回调(如果有回调的话) + if(!$this->_socket) + { + $this->emitError(WORKERMAN_CONNECT_FAIL, $errstr); + return; + } + // 监听连接可写事件(可写意味着连接已经建立或者已经出错) + Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection')); + } + + /** + * 尝试触发失败回调 + * @param int $code + * @param string $msg + * @return void + */ + protected function emitError($code, $msg) + { + if($this->onError) + { + try{ + call_user_func($this->onError, $this, $code, $msg); + } + catch(Exception $e) + { + echo $e; + } + } + } + + /** + * 检查连接状态,连接成功还是失败 + * @param resource $socket + * @return void + */ + public function checkConnection($socket) + { + // 删除连接可写监听 + Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE); + // 需要判断两次连接是否已经断开 + if(!feof($this->_socket) && !feof($this->_socket)) + { + // 设置非阻塞 + stream_set_blocking($this->_socket, 0); + // 监听可读事件 + Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead')); + // 如果发送缓冲区有数据则执行发送 + if($this->_sendBuffer) + { + Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite')); + } + // 标记状态为连接已经建立 + $this->_status = self::STATUS_ESTABLISH; + // 为status 命令统计数据 + ConnectionInterface::$statistics['connection_count']++; + // 如果有设置onConnect回调,则执行 + if($this->onConnect) + { + try + { + call_user_func($this->onConnect, $this); + } + catch(Exception $e) + { + self::$statistics['throw_exception']++; + echo $e; + } + } + } + else + { + // 连接未建立成功 + $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect fail'); + } + } +} diff --git a/Workerman/Connection/ConnectionInterface.php b/Workerman/Connection/ConnectionInterface.php new file mode 100644 index 0000000..fcce854 --- /dev/null +++ b/Workerman/Connection/ConnectionInterface.php @@ -0,0 +1,68 @@ + + */ +abstract class ConnectionInterface +{ + /** + * status命令的统计数据 + * @var array + */ + public static $statistics = array( + 'connection_count'=>0, + 'total_request' => 0, + 'throw_exception' => 0, + 'send_fail' => 0, + ); + + /** + * 当收到数据时,如果有设置$onMessage回调,则执行 + * @var callback + */ + public $onMessage = null; + + /** + * 当连接关闭时,如果设置了$onClose回调,则执行 + * @var callback + */ + public $onClose = null; + + /** + * 当出现错误时,如果设置了$onError回调,则执行 + * @var callback + */ + public $onError = null; + + /** + * 发送数据给对端 + * @param string $send_buffer + * @return void|boolean + */ + abstract public function send($send_buffer); + + /** + * 获得远端ip + * @return string + */ + abstract public function getRemoteIp(); + + /** + * 获得远端端口 + * @return int + */ + abstract public function getRemotePort(); + + /** + * 关闭连接,为了保持接口一致,udp保留了此方法,当是udp时调用此方法无任何作用 + * @void + */ + abstract public function close($data = null); +} diff --git a/Workerman/Connection/TcpConnection.php b/Workerman/Connection/TcpConnection.php new file mode 100644 index 0000000..6c5852b --- /dev/null +++ b/Workerman/Connection/TcpConnection.php @@ -0,0 +1,585 @@ + + */ +class TcpConnection extends ConnectionInterface +{ + /** + * 当数据可读时,从socket缓冲区读取多少字节数据 + * @var int + */ + const READ_BUFFER_SIZE = 8192; + + /** + * 连接状态 连接中 + * @var int + */ + const STATUS_CONNECTING = 1; + + /** + * 连接状态 已经建立连接 + * @var int + */ + const STATUS_ESTABLISH = 2; + + /** + * 连接状态 连接关闭中,标识调用了close方法,但是发送缓冲区中任然有数据 + * 等待发送缓冲区的数据发送完毕(写入到socket写缓冲区)后执行关闭 + * @var int + */ + const STATUS_CLOSING = 4; + + /** + * 连接状态 已经关闭 + * @var int + */ + const STATUS_CLOSED = 8; + + /** + * 当对端发来数据时,如果设置了$onMessage回调,则执行 + * @var callback + */ + public $onMessage = null; + + /** + * 当连接关闭时,如果设置了$onClose回调,则执行 + * @var callback + */ + public $onClose = null; + + /** + * 当出现错误是,如果设置了$onError回调,则执行 + * @var callback + */ + public $onError = null; + + /** + * 当发送缓冲区满时,如果设置了$onBufferFull回调,则执行 + * @var callback + */ + public $onBufferFull = null; + + /** + * 当发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行 + * @var callback + */ + public $onBufferDrain = null; + + /** + * 使用的应用层协议,是协议类的名称 + * 值类似于 Workerman\\Protocols\\Http + * @var string + */ + public $protocol = ''; + + /** + * 属于哪个worker + * @var Worker + */ + public $worker = null; + + /** + * 发送缓冲区大小,当发送缓冲区满时,会尝试触发onBufferFull回调(如果有设置的话) + * 如果没设置onBufferFull回调,由于发送缓冲区满,则后续发送的数据将被丢弃, + * 直到发送缓冲区有空的位置 + * 注意 此值可以动态设置 + * 例如 Workerman\Connection\TcpConnection::$maxSendBufferSize=1024000; + * @var int + */ + public static $maxSendBufferSize = 1048576; + + /** + * 能接受的最大数据包,为了防止恶意攻击,当数据包的大小大于此值时执行断开 + * 注意 此值可以动态设置 + * 例如 Workerman\Connection\TcpConnection::$maxPackageSize=1024000; + * @var int + */ + public static $maxPackageSize = 10485760; + + /** + * 实际的socket资源 + * @var resource + */ + protected $_socket = null; + + /** + * 发送缓冲区 + * @var string + */ + protected $_sendBuffer = ''; + + /** + * 接收缓冲区 + * @var string + */ + protected $_recvBuffer = ''; + + /** + * 当前正在处理的数据包的包长(此值是协议的intput方法的返回值) + * @var int + */ + protected $_currentPackageLength = 0; + + /** + * 当前的连接状态 + * @var int + */ + protected $_status = self::STATUS_ESTABLISH; + + /** + * 对端ip + * @var string + */ + protected $_remoteIp = ''; + + /** + * 对端端口 + * @var int + */ + protected $_remotePort = 0; + + /** + * 对端的地址 ip+port + * 值类似于 192.168.1.100:3698 + * @var string + */ + protected $_remoteAddress = ''; + + /** + * 是否是停止接收数据 + * @var bool + */ + protected $_isPaused = false; + + /** + * 构造函数 + * @param resource $socket + * @param EventInterface $event + */ + public function __construct($socket) + { + $this->_socket = $socket; + stream_set_blocking($this->_socket, 0); + Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead')); + } + + /** + * 发送数据给对端 + * @param string $send_buffer + * @param bool $raw + * @return void|boolean + */ + public function send($send_buffer, $raw = false) + { + // 如果当前状态是连接中,则把数据放入发送缓冲区 + if($this->_status === self::STATUS_CONNECTING) + { + $this->_sendBuffer .= $send_buffer; + return null; + } + // 如果当前连接是关闭,则返回false + elseif($this->_status == self::STATUS_CLOSED) + { + return false; + } + + // 如果没有设置以原始数据发送,并且有设置协议则按照协议编码 + if(false === $raw && $this->protocol) + { + $parser = $this->protocol; + $send_buffer = $parser::encode($send_buffer, $this); + } + // 如果发送缓冲区为空,尝试直接发送 + if($this->_sendBuffer === '') + { + // 直接发送 + $len = @fwrite($this->_socket, $send_buffer); + // 所有数据都发送完毕 + if($len === strlen($send_buffer)) + { + return true; + } + // 只有部分数据发送成功 + if($len > 0) + { + // 未发送成功部分放入发送缓冲区 + $this->_sendBuffer = substr($send_buffer, $len); + } + else + { + // 如果连接断开 + if(feof($this->_socket)) + { + // status统计发送失败次数 + self::$statistics['send_fail']++; + // 如果有设置失败回调,则执行 + if($this->onError) + { + try + { + call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed'); + } + catch(Exception $e) + { + echo $e; + } + } + // 销毁连接 + $this->destroy(); + return false; + } + // 连接未断开,发送失败,则把所有数据放入发送缓冲区 + $this->_sendBuffer = $send_buffer; + } + // 监听对端可写事件 + Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite')); + // 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调 + $this->checkBufferIsFull(); + return null; + } + else + { + // 缓冲区已经标记为满,仍然然有数据发送,则丢弃数据包 + if(self::$maxSendBufferSize <= strlen($this->_sendBuffer)) + { + // 为status命令统计发送失败次数 + self::$statistics['send_fail']++; + // 如果有设置失败回调,则执行 + if($this->onError) + { + try + { + call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package'); + } + catch(Exception $e) + { + echo $e; + } + } + return false; + } + // 将数据放入放缓冲区 + $this->_sendBuffer .= $send_buffer; + // 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调 + $this->checkBufferIsFull(); + } + } + + /** + * 获得对端ip + * @return string + */ + public function getRemoteIp() + { + if(!$this->_remoteIp) + { + $this->_remoteAddress = stream_socket_get_name($this->_socket, true); + if($this->_remoteAddress) + { + list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2); + $this->_remotePort = (int)$this->_remotePort; + } + } + return $this->_remoteIp; + } + + /** + * 获得对端端口 + * @return int + */ + public function getRemotePort() + { + if(!$this->_remotePort) + { + $this->_remoteAddress = stream_socket_get_name($this->_socket, true); + if($this->_remoteAddress) + { + list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2); + $this->_remotePort = (int)$this->_remotePort; + } + } + return $this->_remotePort; + } + + /** + * 暂停接收数据,一般用于控制上传流量 + * @return void + */ + public function pauseRecv() + { + Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ); + $this->_isPaused = true; + } + + /** + * 恢复接收数据,一般用户控制上传流量 + * @return void + */ + public function resumeRecv() + { + if($this->_isPaused == true) + { + Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead')); + $this->_isPaused = false; + $this->baseRead($this->_socket); + } + } + + /** + * 当socket可读时的回调 + * @param resource $socket + * @return void + */ + public function baseRead($socket) + { + while($buffer = fread($socket, self::READ_BUFFER_SIZE)) + { + $this->_recvBuffer .= $buffer; + } + + if($this->_recvBuffer) + { + if(!$this->onMessage) + { + return ; + } + + // 如果设置了协议 + if($this->protocol) + { + $parser = $this->protocol; + while($this->_recvBuffer && !$this->_isPaused) + { + // 当前包的长度已知 + if($this->_currentPackageLength) + { + // 数据不够一个包 + if($this->_currentPackageLength > strlen($this->_recvBuffer)) + { + break; + } + } + else + { + // 获得当前包长 + $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this); + // 数据不够,无法获得包长 + if($this->_currentPackageLength === 0) + { + break; + } + elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize) + { + // 数据不够一个包 + if($this->_currentPackageLength > strlen($this->_recvBuffer)) + { + break; + } + } + // 包错误 + else + { + $this->close('error package. package_length='.var_export($this->_currentPackageLength, true)); + } + } + + // 数据足够一个包长 + self::$statistics['total_request']++; + // 从缓冲区中获取一个完整的包 + $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength); + // 将当前包从接受缓冲区中去掉 + $this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength); + // 重置当前包长为0 + $this->_currentPackageLength = 0; + // 处理数据包 + try + { + call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this)); + } + catch(Exception $e) + { + self::$statistics['throw_exception']++; + echo $e; + } + } + if($this->_status !== self::STATUS_CLOSED && feof($socket)) + { + $this->destroy(); + } + return; + } + // 没有设置协议,则直接把接收的数据当做一个包处理 + self::$statistics['total_request']++; + try + { + call_user_func($this->onMessage, $this, $this->_recvBuffer); + } + catch(Exception $e) + { + self::$statistics['throw_exception']++; + echo $e; + } + // 清空缓冲区 + $this->_recvBuffer = ''; + // 判断连接是否已经断开 + if($this->_status !== self::STATUS_CLOSED && feof($socket)) + { + $this->destroy(); + return; + } + } + // 没收到数据,判断连接是否已经断开 + else if(feof($socket)) + { + $this->destroy(); + return; + } + } + + /** + * socket可写时的回调 + * @return void + */ + public function baseWrite() + { + $len = @fwrite($this->_socket, $this->_sendBuffer); + if($len === strlen($this->_sendBuffer)) + { + Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE); + $this->_sendBuffer = ''; + // 发送缓冲区的数据被发送完毕,尝试触发onBufferDrain回调 + if($this->onBufferDrain) + { + try + { + call_user_func($this->onBufferDrain, $this); + } + catch(Exception $e) + { + echo $e; + } + } + // 如果连接状态为关闭,则销毁连接 + if($this->_status == self::STATUS_CLOSING) + { + $this->destroy(); + } + return true; + } + if($len > 0) + { + $this->_sendBuffer = substr($this->_sendBuffer, $len); + } + else + { + if(feof($this->_socket)) + { + self::$statistics['send_fail']++; + $this->destroy(); + } + } + } + + /** + * 从缓冲区中消费掉$length长度的数据 + * @param int $length + * @return void + */ + public function consumeRecvBuffer($length) + { + $this->_recvBuffer = substr($this->_recvBuffer, $length); + } + + /** + * 关闭连接 + * @param mixed $data + * @void + */ + public function close($data = null) + { + if($this->_status == self::STATUS_CLOSING) + { + return false; + } + else + { + $this->_status = self::STATUS_CLOSING; + } + if($data !== null) + { + $this->send($data); + } + if($this->_sendBuffer === '') + { + $this->destroy(); + } + } + + /** + * 获得socket连接 + * @return resource + */ + public function getSocket() + { + return $this->_socket; + } + + /** + * 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调 + * @return void + */ + protected function checkBufferIsFull() + { + if(self::$maxSendBufferSize <= strlen($this->_sendBuffer)) + { + if($this->onBufferFull) + { + try + { + call_user_func($this->onBufferFull, $this); + } + catch(Exception $e) + { + echo $e; + } + } + } + } + /** + * 销毁连接 + * @void + */ + protected function destroy() + { + self::$statistics['connection_count']--; + if($this->onClose) + { + try + { + call_user_func($this->onClose, $this); + } + catch (Exception $e) + { + self::$statistics['throw_exception']++; + echo $e; + } + } + if($this->worker) + { + unset($this->worker->connections[(int)$this->_socket]); + } + Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ); + Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE); + @fclose($this->_socket); + $this->_status = self::STATUS_CLOSED; + } +} diff --git a/Workerman/Connection/UdpConnection.php b/Workerman/Connection/UdpConnection.php new file mode 100644 index 0000000..3fda857 --- /dev/null +++ b/Workerman/Connection/UdpConnection.php @@ -0,0 +1,106 @@ + + */ +class UdpConnection extends ConnectionInterface +{ + /** + * 应用层协议 + * 值类似于 Workerman\\Protocols\\Http + * @var string + */ + public $protocol = ''; + + /** + * udp socket 资源 + * @var resource + */ + protected $_socket = null; + + /** + * 对端 ip + * @var string + */ + protected $_remoteIp = ''; + + /** + * 对端 端口 + * @var int + */ + protected $_remotePort = 0; + + /** + * 对端 地址 + * 值类似于 192.168.10.100:3698 + * @var string + */ + protected $_remoteAddress = ''; + + /** + * 构造函数 + * @param resource $socket + * @param string $remote_address + */ + public function __construct($socket, $remote_address) + { + $this->_socket = $socket; + $this->_remoteAddress = $remote_address; + } + + /** + * 发送数据给对端 + * @param string $send_buffer + * @return void|boolean + */ + public function send($send_buffer) + { + return strlen($send_buffer) === stream_socket_sendto($this->_socket, $send_buffer, 0, $this->_remoteAddress); + } + + /** + * 获得对端 ip + * @return string + */ + public function getRemoteIp() + { + if(!$this->_remoteIp) + { + list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2); + } + return $this->_remoteIp; + } + + /** + * 获得对端端口 + */ + public function getRemotePort() + { + if(!$this->_remotePort) + { + list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2); + } + return $this->_remotePort; + } + + /** + * 关闭连接(此处为了保持与TCP接口一致,提供了close方法) + * @void + */ + public function close($data = null) + { + if($data !== null) + { + $this->send($data); + } + return true; + } +} diff --git a/Workerman/Events/EventInterface.php b/Workerman/Events/EventInterface.php new file mode 100644 index 0000000..3643b9d --- /dev/null +++ b/Workerman/Events/EventInterface.php @@ -0,0 +1,64 @@ + + */ +class Libevent implements EventInterface +{ + /** + * eventBase + * @var object + */ + protected $_eventBase = null; + + /** + * 所有的事件 + * @var array + */ + protected $_allEvents = array(); + + /** + * 所有的信号事件 + * @var array + */ + protected $_eventSignal = array(); + + /** + * 所有的定时事件 + * [func, args, event, flag, time_interval] + * @var array + */ + protected $_eventTimer = array(); + + /** + * 构造函数 + * @return void + */ + public function __construct() + { + $this->_eventBase = event_base_new(); + } + + /** + * 添加事件 + * @see EventInterface::add() + */ + public function add($fd, $flag, $func, $args=null) + { + switch($flag) + { + case self::EV_SIGNAL: + $fd_key = (int)$fd; + $real_flag = EV_SIGNAL | EV_PERSIST; + $this->_eventSignal[$fd_key] = event_new(); + if(!event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null)) + { + return false; + } + if(!event_base_set($this->_eventSignal[$fd_key], $this->_eventBase)) + { + return false; + } + if(!event_add($this->_eventSignal[$fd_key])) + { + return false; + } + return true; + case self::EV_TIMER: + case self::EV_TIMER_ONCE: + $event = event_new(); + $timer_id = (int)$event; + if(!event_set($event, 0, EV_TIMEOUT, array($this, 'timerCallback'), $timer_id)) + { + return false; + } + + if(!event_base_set($event, $this->_eventBase)) + { + return false; + } + + $time_interval = $fd*1000000; + if(!event_add($event, $time_interval)) + { + return false; + } + $this->_eventTimer[$timer_id] = array($func, (array)$args, $event, $flag, $time_interval); + return $timer_id; + + default : + $fd_key = (int)$fd; + $real_flag = $flag == self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST; + + $event = event_new(); + + if(!event_set($event, $fd, $real_flag, $func, null)) + { + return false; + } + + if(!event_base_set($event, $this->_eventBase)) + { + return false; + } + + if(!event_add($event)) + { + return false; + } + + $this->_allEvents[$fd_key][$flag] = $event; + + return true; + } + + } + + /** + * 删除事件 + * @see Events\EventInterface::del() + */ + public function del($fd ,$flag) + { + switch($flag) + { + case self::EV_READ: + case self::EV_WRITE: + $fd_key = (int)$fd; + if(isset($this->_allEvents[$fd_key][$flag])) + { + event_del($this->_allEvents[$fd_key][$flag]); + unset($this->_allEvents[$fd_key][$flag]); + } + if(empty($this->_allEvents[$fd_key])) + { + unset($this->_allEvents[$fd_key]); + } + break; + case self::EV_SIGNAL: + $fd_key = (int)$fd; + if(isset($this->_eventSignal[$fd_key])) + { + event_del($this->_eventSignal[$fd_key]); + unset($this->_eventSignal[$fd_key]); + } + break; + case self::EV_TIMER: + case self::EV_TIMER_ONCE: + // 这里 fd 为timerid + if(isset($this->_eventTimer[$fd])) + { + event_del($this->_eventTimer[$fd][2]); + unset($this->_eventTimer[$fd]); + } + break; + } + return true; + } + + /** + * 定时器回调 + * @param null $_null + * @param null $_null + * @param int $timer_id + */ + protected function timerCallback($_null, $_null, $timer_id) + { + // 如果是连续的定时任务,再把任务加进去 + if($this->_eventTimer[$timer_id][3] == self::EV_TIMER) + { + event_add($this->_eventTimer[$timer_id][2], $this->_eventTimer[$timer_id][4]); + } + try + { + // 执行任务 + call_user_func_array($this->_eventTimer[$timer_id][0], $this->_eventTimer[$timer_id][1]); + } + catch(\Exception $e) + { + echo $e; + } + } + + /** + * 删除所有定时器 + * @return void + */ + public function clearAllTimer() + { + foreach($this->_eventTimer as $task_data) + { + event_del($task_data[2]); + } + $this->_eventTimer = array(); + } + + + /** + * 事件循环 + * @see EventInterface::loop() + */ + public function loop() + { + event_base_loop($this->_eventBase); + } +} + diff --git a/Workerman/Events/Select.php b/Workerman/Events/Select.php new file mode 100644 index 0000000..3389c97 --- /dev/null +++ b/Workerman/Events/Select.php @@ -0,0 +1,267 @@ +channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); + if($this->channel) + { + stream_set_blocking($this->channel[0], 0); + $this->_readFds[0] = $this->channel[0]; + } + // 初始化优先队列(最大堆) + $this->_scheduler = new \SplPriorityQueue(); + $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH); + } + + /** + * 添加事件及处理函数 + * @see Events\EventInterface::add() + */ + public function add($fd, $flag, $func, $args = null) + { + switch ($flag) + { + case self::EV_READ: + $fd_key = (int)$fd; + $this->_allEvents[$fd_key][$flag] = array($func, $fd); + $this->_readFds[$fd_key] = $fd; + break; + case self::EV_WRITE: + $fd_key = (int)$fd; + $this->_allEvents[$fd_key][$flag] = array($func, $fd); + $this->_writeFds[$fd_key] = $fd; + break; + case self::EV_SIGNAL: + $fd_key = (int)$fd; + $this->_signalEvents[$fd_key][$flag] = array($func, $fd); + pcntl_signal($fd, array($this, 'signalHandler')); + break; + case self::EV_TIMER: + case self::EV_TIMER_ONCE: + // $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒 + $run_time = microtime(true)+$fd; + $this->_scheduler->insert($this->_timerId, -$run_time); + $this->_task[$this->_timerId] = array($func, $args, $flag, $fd); + $this->tick(); + return $this->_timerId++; + } + + return true; + } + + /** + * 信号处理函数 + * @param int $signal + */ + public function signalHandler($signal) + { + call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal)); + } + + /** + * 删除某个描述符的某类事件的监听 + * @see Events\EventInterface::del() + */ + public function del($fd ,$flag) + { + $fd_key = (int)$fd; + switch ($flag) + { + case self::EV_READ: + unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]); + if(empty($this->_allEvents[$fd_key])) + { + unset($this->_allEvents[$fd_key]); + } + return true; + case self::EV_WRITE: + unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]); + if(empty($this->_allEvents[$fd_key])) + { + unset($this->_allEvents[$fd_key]); + } + return true; + case self::EV_SIGNAL: + unset($this->_signalEvents[$fd_key]); + pcntl_signal($fd, SIG_IGN); + break; + case self::EV_TIMER: + case self::EV_TIMER_ONCE; + // $fd_key为要删除的定时器id,即timerId + unset($this->_task[$fd_key]); + return true; + } + return false;; + } + + /** + * 检查是否有可执行的定时任务,有的话执行 + * @return void + */ + protected function tick() + { + while(!$this->_scheduler->isEmpty()) + { + $scheduler_data = $this->_scheduler->top(); + $timer_id = $scheduler_data['data']; + $next_run_time = -$scheduler_data['priority']; + $time_now = microtime(true); + if($time_now >= $next_run_time) + { + $this->_scheduler->extract(); + + // 如果任务不存在,则是对应的定时器已经删除 + if(!isset($this->_task[$timer_id])) + { + continue; + } + + // 任务数据[func, args, flag, timer_interval] + $task_data = $this->_task[$timer_id]; + // 如果是持续的定时任务,再把任务加到定时队列 + if($task_data[2] == self::EV_TIMER) + { + $next_run_time = $time_now+$task_data[3]; + $this->_scheduler->insert($timer_id, -$next_run_time); + } + // 尝试执行任务 + try + { + call_user_func_array($task_data[0], $task_data[1]); + } + catch(\Exception $e) + { + echo $e; + } + continue; + } + else + { + // 设定超时时间 + $this->_selectTimeout = ($next_run_time - $time_now)*1000000; + return; + } + } + $this->_selectTimeout = 100000000; + } + + /** + * 删除所有定时器 + * @return void + */ + public function clearAllTimer() + { + $this->_scheduler = new \SplPriorityQueue(); + $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH); + $this->_task = array(); + } + + /** + * 主循环 + * @see Events\EventInterface::loop() + */ + public function loop() + { + $e = null; + while (1) + { + // 如果有信号,尝试执行信号处理函数 + pcntl_signal_dispatch(); + + $read = $this->_readFds; + $write = $this->_writeFds; + // 等待可读或者可写事件 + @stream_select($read, $write, $e, 0, $this->_selectTimeout); + + // 这些描述符可读,执行对应描述符的读回调函数 + if($read) + { + foreach($read as $fd) + { + $fd_key = (int) $fd; + if(isset($this->_allEvents[$fd_key][self::EV_READ])) + { + call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1])); + } + } + } + + // 这些描述符可写,执行对应描述符的写回调函数 + if($write) + { + foreach($write as $fd) + { + $fd_key = (int) $fd; + if(isset($this->_allEvents[$fd_key][self::EV_WRITE])) + { + call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1])); + } + } + } + + // 尝试执行定时任务 + if(!$this->_scheduler->isEmpty()) + { + $this->tick(); + } + } + } +} diff --git a/Workerman/Lib/Constants.php b/Workerman/Lib/Constants.php new file mode 100644 index 0000000..46e0b20 --- /dev/null +++ b/Workerman/Lib/Constants.php @@ -0,0 +1,13 @@ +example: + *
+ * 
+ * Workerman\Lib\Timer::init();
+ * Workerman\Lib\Timer::add($time_interval, callback, array($arg1, $arg2..));
+ * 
+ * 
+* @author walkor + */ +class Timer +{ + /** + * 基于ALARM信号的任务 + * [ + * run_time => [[$func, $args, $persistent, time_interval],[$func, $args, $persistent, time_interval],..]], + * run_time => [[$func, $args, $persistent, time_interval],[$func, $args, $persistent, time_interval],..]], + * .. + * ] + * @var array + */ + protected static $_tasks = array(); + + /** + * event + * @var event + */ + protected static $_event = null; + + + /** + * 初始化 + * @return void + */ + public static function init($event = null) + { + if($event) + { + self::$_event = $event; + } + else + { + pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false); + } + } + + /** + * 信号处理函数,只处理ALARM事件 + * @return void + */ + public static function signalHandle() + { + if(!self::$_event) + { + pcntl_alarm(1); + self::tick(); + } + } + + + /** + * 添加一个定时器 + * @param int $time_interval + * @param callback $func + * @param mix $args + * @return void + */ + public static function add($time_interval, $func, $args = array(), $persistent = true) + { + if($time_interval <= 0) + { + echo new Exception("bad time_interval"); + return false; + } + + if(self::$_event) + { + return self::$_event->add($time_interval, $persistent ? EventInterface::EV_TIMER : EventInterface::EV_TIMER_ONCE , $func, $args); + } + + if(!is_callable($func)) + { + echo new Exception("not callable"); + return false; + } + + if(empty(self::$_tasks)) + { + pcntl_alarm(1); + } + + $time_now = time(); + $run_time = $time_now + $time_interval; + if(!isset(self::$_tasks[$run_time])) + { + self::$_tasks[$run_time] = array(); + } + self::$_tasks[$run_time][] = array($func, $args, $persistent, $time_interval); + return true; + } + + + /** + * 尝试触发定时回调 + * @return void + */ + public static function tick() + { + if(empty(self::$_tasks)) + { + pcntl_alarm(0); + return; + } + + $time_now = time(); + foreach (self::$_tasks as $run_time=>$task_data) + { + if($time_now >= $run_time) + { + foreach($task_data as $index=>$one_task) + { + $task_func = $one_task[0]; + $task_args = $one_task[1]; + $persistent = $one_task[2]; + $time_interval = $one_task[3]; + try + { + call_user_func_array($task_func, $task_args); + } + catch(\Exception $e) + { + echo $e; + } + if($persistent) + { + self::add($time_interval, $task_func, $task_args); + } + } + unset(self::$_tasks[$run_time]); + } + } + } + + /** + * 删除定时器 + * @param $timer_id + */ + public static function del($timer_id) + { + if(self::$_event) + { + return self::$_event->del($timer_id, EventInterface::EV_TIMER); + } + } + + /** + * 删除所有定时 + */ + public static function delAll() + { + self::$_tasks = array(); + pcntl_alarm(0); + if(self::$_event) + { + self::$_event->clearAllTimer(); + } + } +} diff --git a/Workerman/Protocols/GatewayProtocol.php b/Workerman/Protocols/GatewayProtocol.php new file mode 100644 index 0000000..107eefd --- /dev/null +++ b/Workerman/Protocols/GatewayProtocol.php @@ -0,0 +1,151 @@ + + */ + +class GatewayProtocol +{ + // 发给worker,gateway有一个新的连接 + const CMD_ON_CONNECTION = 1; + + // 发给worker的,客户端有消息 + const CMD_ON_MESSAGE = 3; + + // 发给worker上的关闭链接事件 + const CMD_ON_CLOSE = 4; + + // 发给gateway的向单个用户发送数据 + const CMD_SEND_TO_ONE = 5; + + // 发给gateway的向所有用户发送数据 + const CMD_SEND_TO_ALL = 6; + + // 发给gateway的踢出用户 + const CMD_KICK = 7; + + // 发给gateway,通知用户session更改 + const CMD_UPDATE_SESSION = 9; + + // 获取在线状态 + const CMD_GET_ONLINE_STATUS = 10; + + // 判断是否在线 + const CMD_IS_ONLINE = 11; + + // 包体是标量 + const FLAG_BODY_IS_SCALAR = 0x01; + + /** + * 包头长度 + * @var integer + */ + const HEAD_LEN = 26; + + public static $empty = array( + 'cmd' => 0, + 'local_ip' => '0.0.0.0', + 'local_port' => 0, + 'client_ip' => '0.0.0.0', + 'client_port' => 0, + 'client_id' => 0, + 'flag' => 0, + 'ext_data' => '', + 'body' => '', + ); + + /** + * 返回包长度 + * @param string $buffer + * @return int return current package length + */ + public static function input($buffer) + { + if(strlen($buffer) < self::HEAD_LEN) + { + return 0; + } + + $data = unpack("Npack_len", $buffer); + return $data['pack_len']; + } + + /** + * 获取整个包的buffer + * @param array $data + * @return string + */ + public static function encode($data) + { + $flag = (int)is_scalar($data['body']); + if(!$flag) + { + $data['body'] = serialize($data['body']); + } + $ext_len = strlen($data['ext_data']); + $package_len = self::HEAD_LEN + $ext_len + strlen($data['body']); + return pack("NCNnNnNNC", $package_len, + $data['cmd'], ip2long($data['local_ip']), + $data['local_port'], ip2long($data['client_ip']), + $data['client_port'], $data['client_id'], + $ext_len, $flag) . $data['ext_data'] . $data['body']; + } + + /** + * 从二进制数据转换为数组 + * @param string $buffer + * @return array + */ + public static function decode($buffer) + { + $data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nclient_ip/nclient_port/Nclient_id/Next_len/Cflag", $buffer); + $data['local_ip'] = long2ip($data['local_ip']); + $data['client_ip'] = long2ip($data['client_ip']); + if($data['ext_len'] > 0) + { + $data['ext_data'] = substr($buffer, self::HEAD_LEN, $data['ext_len']); + if($data['flag'] & self::FLAG_BODY_IS_SCALAR) + { + $data['body'] = substr($buffer, self::HEAD_LEN + $data['ext_len']); + } + else + { + $data['body'] = unserialize(substr($buffer, self::HEAD_LEN + $data['ext_len'])); + } + } + else + { + $data['ext_data'] = ''; + if($data['flag'] & self::FLAG_BODY_IS_SCALAR) + { + $data['body'] = substr($buffer, self::HEAD_LEN); + } + else + { + $data['body'] = unserialize(substr($buffer, self::HEAD_LEN)); + } + } + return $data; + } +} + + + diff --git a/Workerman/Protocols/Http.php b/Workerman/Protocols/Http.php new file mode 100644 index 0000000..98e7215 --- /dev/null +++ b/Workerman/Protocols/Http.php @@ -0,0 +1,552 @@ + + */ +class Http +{ + /** + * 判断包长 + * @param string $recv_buffer + * @param TcpConnection $connection + * @return int + */ + public static function input($recv_buffer, TcpConnection $connection) + { + if(!strpos($recv_buffer, "\r\n\r\n")) + { + // 无法获得包长,避免客户端传递超大头部的数据包 + if(strlen($recv_buffer)>=TcpConnection::$maxPackageSize) + { + $connection->close(); + return 0; + } + return 0; + } + + list($header, $body) = explode("\r\n\r\n", $recv_buffer, 2); + if(0 === strpos($recv_buffer, "POST")) + { + // find Content-Length + $match = array(); + if(preg_match("/\r\nContent-Length: ?(\d+)/", $header, $match)) + { + $content_lenght = $match[1]; + return $content_lenght + strlen($header) + 4; + } + else + { + return 0; + } + } + else + { + return strlen($header)+4; + } + } + + /** + * 从http数据包中解析$_POST、$_GET、$_COOKIE等 + * @param string $recv_buffer + * @param TcpConnection $connection + * @return void + */ + public static function decode($recv_buffer, TcpConnection $connection) + { + // 初始化 + $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES = array(); + $GLOBALS['HTTP_RAW_POST_DATA'] = ''; + // 清空上次的数据 + HttpCache::$header = array(); + HttpCache::$instance = new HttpCache(); + // 需要设置的变量名 + $_SERVER = array ( + 'QUERY_STRING' => '', + 'REQUEST_METHOD' => '', + 'REQUEST_URI' => '', + 'SERVER_PROTOCOL' => '', + 'SERVER_SOFTWARE' => 'workerman/3.0', + 'SERVER_NAME' => '', + 'HTTP_HOST' => '', + 'HTTP_USER_AGENT' => '', + 'HTTP_ACCEPT' => '', + 'HTTP_ACCEPT_LANGUAGE' => '', + 'HTTP_ACCEPT_ENCODING' => '', + 'HTTP_COOKIE' => '', + 'HTTP_CONNECTION' => '', + 'REMOTE_ADDR' => '', + 'REMOTE_PORT' => '0', + ); + + // 将header分割成数组 + list($http_header, $http_body) = explode("\r\n\r\n", $recv_buffer, 2); + $header_data = explode("\r\n", $http_header); + + list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]); + + unset($header_data[0]); + foreach($header_data as $content) + { + // \r\n\r\n + if(empty($content)) + { + continue; + } + list($key, $value) = explode(':', $content, 2); + $key = strtolower($key); + $value = trim($value); + switch($key) + { + // HTTP_HOST + case 'host': + $_SERVER['HTTP_HOST'] = $value; + $tmp = explode(':', $value); + $_SERVER['SERVER_NAME'] = $tmp[0]; + if(isset($tmp[1])) + { + $_SERVER['SERVER_PORT'] = $tmp[1]; + } + break; + // cookie + case 'cookie': + $_SERVER['HTTP_COOKIE'] = $value; + parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE); + break; + // user-agent + case 'user-agent': + $_SERVER['HTTP_USER_AGENT'] = $value; + break; + // accept + case 'accept': + $_SERVER['HTTP_ACCEPT'] = $value; + break; + // accept-language + case 'accept-language': + $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value; + break; + // accept-encoding + case 'accept-encoding': + $_SERVER['HTTP_ACCEPT_ENCODING'] = $value; + break; + // connection + case 'connection': + $_SERVER['HTTP_CONNECTION'] = $value; + break; + case 'referer': + $_SERVER['HTTP_REFERER'] = $value; + break; + case 'if-modified-since': + $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value; + break; + case 'if-none-match': + $_SERVER['HTTP_IF_NONE_MATCH'] = $value; + break; + case 'content-type': + if(!preg_match('/boundary="?(\S+)"?/', $value, $match)) + { + $_SERVER['CONTENT_TYPE'] = $value; + } + else + { + $_SERVER['CONTENT_TYPE'] = 'multipart/form-data'; + $http_post_boundary = '--'.$match[1]; + } + break; + } + } + + // 需要解析$_POST + if($_SERVER['REQUEST_METHOD'] == 'POST') + { + if(isset($_SERVER['CONTENT_TYPE']) && $_SERVER['CONTENT_TYPE'] == 'multipart/form-data') + { + self::parseUploadFiles($http_body, $http_post_boundary); + } + else + { + parse_str($http_body, $_POST); + // $GLOBALS['HTTP_RAW_POST_DATA'] + $GLOBALS['HTTP_RAW_POST_DATA'] = $http_body; + } + } + + // QUERY_STRING + $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY); + if($_SERVER['QUERY_STRING']) + { + // $GET + parse_str($_SERVER['QUERY_STRING'], $_GET); + } + else + { + $_SERVER['QUERY_STRING'] = ''; + } + + // REQUEST + $_REQUEST = array_merge($_GET, $_POST); + + // REMOTE_ADDR REMOTE_PORT + $_SERVER['REMOTE_ADDR'] = $connection->getRemoteIp(); + $_SERVER['REMOTE_PORT'] = $connection->getRemotePort(); + + return $recv_buffer; + } + + /** + * 编码,增加HTTP头 + * @param string $content + * @param TcpConnection $connection + * @return string + */ + public static function encode($content, TcpConnection $connection) + { + // 没有http-code默认给个 + if(!isset(HttpCache::$header['Http-Code'])) + { + $header = "HTTP/1.1 200 OK\r\n"; + } + else + { + $header = HttpCache::$header['Http-Code']."\r\n"; + unset(HttpCache::$header['Http-Code']); + } + + // Content-Type + if(!isset(HttpCache::$header['Content-Type'])) + { + $header .= "Content-Type: text/html;charset=utf-8\r\n"; + } + + // other headers + foreach(HttpCache::$header as $key=>$item) + { + if('Set-Cookie' == $key && is_array($item)) + { + foreach($item as $it) + { + $header .= $it."\r\n"; + } + } + else + { + $header .= $item."\r\n"; + } + } + + // header + $header .= "Server: WorkerMan/3.0\r\nContent-Length: ".strlen($content)."\r\n\r\n"; + + // save session + self::sessionWriteClose(); + + // the whole http package + return $header.$content; + } + + /** + * 设置http头 + * @return bool + */ + public static function header($content, $replace = true, $http_response_code = 0) + { + if(PHP_SAPI != 'cli') + { + return $http_response_code ? header($content, $replace, $http_response_code) : header($content, $replace); + } + if(strpos($content, 'HTTP') === 0) + { + $key = 'Http-Code'; + } + else + { + $key = strstr($content, ":", true); + if(empty($key)) + { + return false; + } + } + + if('location' == strtolower($key) && !$http_response_code) + { + return self::header($content, true, 302); + } + + if(isset(HttpCache::$codes[$http_response_code])) + { + HttpCache::$header['Http-Code'] = "HTTP/1.1 $http_response_code " . HttpCache::$codes[$http_response_code]; + if($key == 'Http-Code') + { + return true; + } + } + + if($key == 'Set-Cookie') + { + HttpCache::$header[$key][] = $content; + } + else + { + HttpCache::$header[$key] = $content; + } + + return true; + } + + /** + * 删除一个header + * @param string $name + * @return void + */ + public static function headerRemove($name) + { + if(PHP_SAPI != 'cli') + { + return header_remove($name); + } + unset( HttpCache::$header[$name]); + } + + /** + * 设置cookie + * @param string $name + * @param string $value + * @param integer $maxage + * @param string $path + * @param string $domain + * @param bool $secure + * @param bool $HTTPOnly + */ + public static function setcookie($name, $value = '', $maxage = 0, $path = '', $domain = '', $secure = false, $HTTPOnly = false) { + if(PHP_SAPI != 'cli') + { + return setcookie($name, $value, $maxage, $path, $domain, $secure, $HTTPOnly); + } + return self::header( + 'Set-Cookie: ' . $name . '=' . rawurlencode($value) + . (empty($domain) ? '' : '; Domain=' . $domain) + . (empty($maxage) ? '' : '; Max-Age=' . $maxage) + . (empty($path) ? '' : '; Path=' . $path) + . (!$secure ? '' : '; Secure') + . (!$HTTPOnly ? '' : '; HttpOnly'), false); + } + + /** + * sessionStart + * @return bool + */ + public static function sessionStart() + { + if(PHP_SAPI != 'cli') + { + return session_start(); + } + if(HttpCache::$instance->sessionStarted) + { + echo "already sessionStarted\nn"; + return true; + } + HttpCache::$instance->sessionStarted = true; + // 没有sid,则创建一个session文件,生成一个sid + if(!isset($_COOKIE[HttpCache::$sessionName]) || !is_file(HttpCache::$sessionPath . '/sess_' . $_COOKIE[HttpCache::$sessionName])) + { + $file_name = tempnam(HttpCache::$sessionPath, 'sess_'); + if(!$file_name) + { + return false; + } + HttpCache::$instance->sessionFile = $file_name; + $session_id = substr(basename($file_name), strlen('sess_')); + return self::setcookie( + HttpCache::$sessionName + , $session_id + , ini_get('session.cookie_lifetime') + , ini_get('session.cookie_path') + , ini_get('session.cookie_domain') + , ini_get('session.cookie_secure') + , ini_get('session.cookie_httponly') + ); + } + if(!HttpCache::$instance->sessionFile) + { + HttpCache::$instance->sessionFile = HttpCache::$sessionPath . '/sess_' . $_COOKIE[HttpCache::$sessionName]; + } + // 有sid则打开文件,读取session值 + if(HttpCache::$instance->sessionFile) + { + $raw = file_get_contents(HttpCache::$instance->sessionFile); + if($raw) + { + session_decode($raw); + } + } + } + + /** + * 保存session + * @return bool + */ + public static function sessionWriteClose() + { + if(PHP_SAPI != 'cli') + { + return session_write_close(); + } + if(!empty(HttpCache::$instance->sessionStarted) && !empty($_SESSION)) + { + $session_str = session_encode(); + if($session_str && HttpCache::$instance->sessionFile) + { + return file_put_contents(HttpCache::$instance->sessionFile, $session_str); + } + } + return empty($_SESSION); + } + + /** + * 退出 + * @param string $msg + * @throws \Exception + */ + public static function end($msg = '') + { + if(PHP_SAPI != 'cli') + { + exit($msg); + } + if($msg) + { + echo $msg; + } + throw new \Exception('jump_exit'); + } + + /** + * get mime types + */ + public static function getMimeTypesFile() + { + return __DIR__.'/Http/mime.types'; + } + + /** + * 解析$_FILES + */ + protected function parseUploadFiles($http_body, $http_post_boundary) + { + $http_body = substr($http_body, 0, strlen($http_body) - (strlen($http_post_boundary) + 4)); + $boundary_data_array = explode($http_post_boundary."\r\n", $http_body); + if($boundary_data_array[0] === '') + { + unset($boundary_data_array[0]); + } + foreach($boundary_data_array as $boundary_data_buffer) + { + list($boundary_header_buffer, $boundary_value) = explode("\r\n\r\n", $boundary_data_buffer, 2); + // 去掉末尾\r\n + $boundary_value = substr($boundary_value, 0, -2); + foreach (explode("\r\n", $boundary_header_buffer) as $item) + { + list($header_key, $header_value) = explode(": ", $item); + $header_key = strtolower($header_key); + switch ($header_key) + { + case "content-disposition": + // 是文件 + if(preg_match('/name=".*?"; filename="(.*?)"$/', $header_value, $match)) + { + $_FILES[] = array( + 'file_name' => $match[1], + 'file_data' => $boundary_value, + 'file_size' => strlen($boundary_value), + ); + continue; + } + // 是post field + else + { + // 收集post + if(preg_match('/name="(.*?)"$/', $header_value, $match)) + { + $_POST[$match[1]] = $boundary_value; + } + } + break; + } + } + } + } +} + +/** + * 解析http协议数据包 缓存先关 + * @author walkor + */ +class HttpCache +{ + public static $codes = array( + 100 => 'Continue', + 101 => 'Switching Protocols', + 200 => 'OK', + 201 => 'Created', + 202 => 'Accepted', + 203 => 'Non-Authoritative Information', + 204 => 'No Content', + 205 => 'Reset Content', + 206 => 'Partial Content', + 300 => 'Multiple Choices', + 301 => 'Moved Permanently', + 302 => 'Found', + 303 => 'See Other', + 304 => 'Not Modified', + 305 => 'Use Proxy', + 306 => '(Unused)', + 307 => 'Temporary Redirect', + 400 => 'Bad Request', + 401 => 'Unauthorized', + 402 => 'Payment Required', + 403 => 'Forbidden', + 404 => 'Not Found', + 405 => 'Method Not Allowed', + 406 => 'Not Acceptable', + 407 => 'Proxy Authentication Required', + 408 => 'Request Timeout', + 409 => 'Conflict', + 410 => 'Gone', + 411 => 'Length Required', + 412 => 'Precondition Failed', + 413 => 'Request Entity Too Large', + 414 => 'Request-URI Too Long', + 415 => 'Unsupported Media Type', + 416 => 'Requested Range Not Satisfiable', + 417 => 'Expectation Failed', + 422 => 'Unprocessable Entity', + 423 => 'Locked', + 500 => 'Internal Server Error', + 501 => 'Not Implemented', + 502 => 'Bad Gateway', + 503 => 'Service Unavailable', + 504 => 'Gateway Timeout', + 505 => 'HTTP Version Not Supported', + ); + public static $instance = null; + public static $header = array(); + public static $sessionPath = ''; + public static $sessionName = ''; + public $sessionStarted = false; + public $sessionFile = ''; + + public static function init() + { + self::$sessionName = ini_get('session.name'); + self::$sessionPath = session_save_path(); + if(!self::$sessionPath) + { + self::$sessionPath = sys_get_temp_dir(); + } + @\session_start(); + } +} diff --git a/Workerman/Protocols/Http/mime.types b/Workerman/Protocols/Http/mime.types new file mode 100644 index 0000000..8a218b2 --- /dev/null +++ b/Workerman/Protocols/Http/mime.types @@ -0,0 +1,80 @@ + +types { + text/html html htm shtml; + text/css css; + text/xml xml; + image/gif gif; + image/jpeg jpeg jpg; + application/x-javascript js; + application/atom+xml atom; + application/rss+xml rss; + + text/mathml mml; + text/plain txt; + text/vnd.sun.j2me.app-descriptor jad; + text/vnd.wap.wml wml; + text/x-component htc; + + image/png png; + image/tiff tif tiff; + image/vnd.wap.wbmp wbmp; + image/x-icon ico; + image/x-jng jng; + image/x-ms-bmp bmp; + image/svg+xml svg svgz; + image/webp webp; + + application/java-archive jar war ear; + application/mac-binhex40 hqx; + application/msword doc; + application/pdf pdf; + application/postscript ps eps ai; + application/rtf rtf; + application/vnd.ms-excel xls; + application/vnd.ms-powerpoint ppt; + application/vnd.wap.wmlc wmlc; + application/vnd.google-earth.kml+xml kml; + application/vnd.google-earth.kmz kmz; + application/x-7z-compressed 7z; + application/x-cocoa cco; + application/x-java-archive-diff jardiff; + application/x-java-jnlp-file jnlp; + application/x-makeself run; + application/x-perl pl pm; + application/x-pilot prc pdb; + application/x-rar-compressed rar; + application/x-redhat-package-manager rpm; + application/x-sea sea; + application/x-shockwave-flash swf; + application/x-stuffit sit; + application/x-tcl tcl tk; + application/x-x509-ca-cert der pem crt; + application/x-xpinstall xpi; + application/xhtml+xml xhtml; + application/zip zip; + + application/octet-stream bin exe dll; + application/octet-stream deb; + application/octet-stream dmg; + application/octet-stream eot; + application/octet-stream iso img; + application/octet-stream msi msp msm; + + audio/midi mid midi kar; + audio/mpeg mp3; + audio/ogg ogg; + audio/x-m4a m4a; + audio/x-realaudio ra; + + video/3gpp 3gpp 3gp; + video/mp4 mp4; + video/mpeg mpeg mpg; + video/quicktime mov; + video/webm webm; + video/x-flv flv; + video/x-m4v m4v; + video/x-mng mng; + video/x-ms-asf asx asf; + video/x-ms-wmv wmv; + video/x-msvideo avi; +} diff --git a/Workerman/Protocols/ProtocolInterface.php b/Workerman/Protocols/ProtocolInterface.php new file mode 100644 index 0000000..b953633 --- /dev/null +++ b/Workerman/Protocols/ProtocolInterface.php @@ -0,0 +1,44 @@ + + */ +interface ProtocolInterface +{ + /** + * 用于分包,即在接收的buffer中返回当前请求的长度(字节) + * 如果可以在$recv_buffer中得到请求包的长度则返回长度 + * 否则返回0,表示需要更多的数据才能得到当前请求包的长度 + * 如果返回false或者负数,则代表请求不符合协议,则连接会断开 + * @param ConnectionInterface $connection + * @param string $recv_buffer + * @return int|false + */ + public static function input($recv_buffer, ConnectionInterface $connection); + + /** + * 用于请求解包 + * input返回值大于0,并且WorkerMan收到了足够的数据,则自动调用decode + * 然后触发onMessage回调,并将decode解码后的数据传递给onMessage回调的第二个参数 + * 也就是说当收到完整的客户端请求时,会自动调用decode解码,无需业务代码中手动调用 + * @param ConnectionInterface $connection + * @param string $recv_buffer + * @return mixed + */ + public static function decode($recv_buffer, ConnectionInterface $connection); + + /** + * 用于请求打包 + * 当需要向客户端发送数据即调用$connection->send($data);时 + * 会自动把$data用encode打包一次,变成符合协议的数据格式,然后再发送给客户端 + * 也就是说发送给客户端的数据会自动encode打包,无需业务代码中手动调用 + * @param ConnectionInterface $connection + * @param mixed $data + * @return string + */ + public static function encode($data, ConnectionInterface $connection); +} diff --git a/Workerman/Protocols/Text.php b/Workerman/Protocols/Text.php new file mode 100644 index 0000000..68c6a72 --- /dev/null +++ b/Workerman/Protocols/Text.php @@ -0,0 +1,60 @@ + + */ + +class Text +{ + /** + * 检查包的完整性 + * 如果能够得到包长,则返回包的长度,否则返回0继续等待数据 + * @param string $buffer + */ + public static function input($buffer ,TcpConnection $connection) + { + // 由于没有包头,无法预先知道包长,不能无限制的接收数据, + // 所以需要判断当前接收的数据是否超过限定值 + if(strlen($buffer)>=TcpConnection::$maxPackageSize) + { + $connection->close(); + return 0; + } + // 获得换行字符"\n"位置 + $pos = strpos($buffer, "\n"); + // 没有换行符,无法得知包长,返回0继续等待数据 + if($pos === false) + { + return 0; + } + // 有换行符,返回当前包长,包含换行符 + return $pos+1; + } + + /** + * 打包,当向客户端发送数据的时候会自动调用 + * @param string $buffer + * @return string + */ + public static function encode($buffer) + { + // 加上换行 + return $buffer."\n"; + } + + /** + * 解包,当接收到的数据字节数等于input返回的值(大于0的值)自动调用 + * 并传递给onMessage回调函数的$data参数 + * @param string $buffer + * @return string + */ + public static function decode($buffer) + { + // 去掉换行 + return trim($buffer); + } +} diff --git a/Workerman/Protocols/Websocket.php b/Workerman/Protocols/Websocket.php new file mode 100644 index 0000000..746b077 --- /dev/null +++ b/Workerman/Protocols/Websocket.php @@ -0,0 +1,390 @@ + + */ + +use Workerman\Connection\ConnectionInterface; + +class Websocket implements \Workerman\Protocols\ProtocolInterface +{ + /** + * websocket头部最小长度 + * @var int + */ + const MIN_HEAD_LEN = 6; + + /** + * websocket blob类型 + * @var char + */ + const BINARY_TYPE_BLOB = "\x81"; + + /** + * websocket arraybuffer类型 + * @var char + */ + const BINARY_TYPE_ARRAYBUFFER = "\x82"; + + /** + * 检查包的完整性 + * @param string $buffer + */ + public static function input($buffer, ConnectionInterface $connection) + { + // 数据长度 + $recv_len = strlen($buffer); + // 长度不够 + if($recv_len < self::MIN_HEAD_LEN) + { + return 0; + } + + // 还没有握手 + if(empty($connection->websocketHandshake)) + { + return self::dealHandshake($buffer, $connection); + } + + // $connection->websocketCurrentFrameLength有值说明当前fin为0,则缓冲websocket帧数据 + if($connection->websocketCurrentFrameLength) + { + // 如果当前帧数据未收全,则继续收 + if($connection->websocketCurrentFrameLength > $recv_len) + { + // 返回0,因为不清楚完整的数据包长度,需要等待fin=1的帧 + return 0; + } + } + else + { + $data_len = ord($buffer[1]) & 127; + $firstbyte = ord($buffer[0]); + $is_fin_frame = $firstbyte>>7; + $opcode = $firstbyte & 0xf; + switch($opcode) + { + // 附加数据帧 @todo 实现附加数据帧 + case 0x0: + break; + // 文本数据帧 + case 0x1: + break; + // 二进制数据帧 + case 0x2: + break; + // 关闭的包 + case 0x8: + // 如果有设置onWebSocketClose回调,尝试执行 + if(isset($connection->onWebSocketClose)) + { + call_user_func($connection->onWebSocketClose, $connection); + } + // 默认行为是关闭连接 + else + { + $connection->close(); + } + return 0; + // ping的包 + case 0x9: + // 如果有设置onWebSocketPing回调,尝试执行 + if(isset($connection->onWebSocketPing)) + { + call_user_func($connection->onWebSocketPing, $connection); + } + // 默认发送pong + else + { + $connection->send(pack('H*', '8a00'), true); + } + // 从接受缓冲区中消费掉该数据包 + if(!$data_len) + { + $connection->consumeRecvBuffer(self::MIN_HEAD_LEN); + return 0; + } + break; + // pong的包 + case 0xa: + // 如果有设置onWebSocketPong回调,尝试执行 + if(isset($connection->onWebSocketPong)) + { + call_user_func($connection->onWebSocketPong, $connection); + } + // 从接受缓冲区中消费掉该数据包 + if(!$data_len) + { + $connection->consumeRecvBuffer(self::MIN_HEAD_LEN); + return 0; + } + break; + // 错误的opcode + default : + echo "error opcode $opcode and close websocket connection\n"; + $connection->close(); + return 0; + } + + // websocket二进制数据 + $head_len = self::MIN_HEAD_LEN; + if ($data_len === 126) { + $head_len = 8; + if($head_len > $recv_len) + { + return 0; + } + $pack = unpack('ntotal_len', substr($buffer, 2, 2)); + $data_len = $pack['total_len']; + } else if ($data_len === 127) { + $head_len = 14; + if($head_len > $recv_len) + { + return 0; + } + $arr = unpack('N2', substr($buffer, 2, 8)); + $data_len = $arr[1]*4294967296 + $arr[2]; + } + $current_frame_length = $head_len + $data_len; + if($is_fin_frame) + { + return $current_frame_length; + } + else + { + $connection->websocketCurrentFrameLength = $current_frame_length; + } + } + + // 收到的数据刚好是一个frame + if($connection->websocketCurrentFrameLength == $recv_len) + { + self::decode($buffer, $connection); + $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength); + $connection->websocketCurrentFrameLength = 0; + return 0; + } + // 收到的数据大于一个frame + elseif($connection->websocketCurrentFrameLength < $recv_len) + { + self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection); + $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength); + $current_frame_length = $connection->websocketCurrentFrameLength; + $connection->websocketCurrentFrameLength = 0; + // 继续读取下一个frame + return self::input(substr($buffer, $current_frame_length), $connection); + } + // 收到的数据不足一个frame + else + { + return 0; + } + } + + /** + * 打包 + * @param string $buffer + * @return string + */ + public static function encode($buffer, ConnectionInterface $connection) + { + $len = strlen($buffer); + // 还没握手不能发数据 + if(empty($connection->websocketHandshake)) + { + $connection->send("HTTP/1.1 400 Bad Request\r\n\r\n400 Bad Request
Send data before handshake. ", true); + $connection->close(); + return false; + } + $first_byte = $connection->websocketType; + + if($len<=125) + { + return $first_byte.chr($len).$buffer; + } + else if($len<=65535) + { + return $first_byte.chr(126).pack("n", $len).$buffer; + } + else + { + return $first_byte.chr(127).pack("xxxxN", $len).$buffer; + } + } + + /** + * 解包 + * @param string $buffer + * @return string + */ + public static function decode($buffer, ConnectionInterface $connection) + { + $len = $masks = $data = $decoded = null; + $len = ord($buffer[1]) & 127; + if ($len === 126) { + $masks = substr($buffer, 4, 4); + $data = substr($buffer, 8); + } else if ($len === 127) { + $masks = substr($buffer, 10, 4); + $data = substr($buffer, 14); + } else { + $masks = substr($buffer, 2, 4); + $data = substr($buffer, 6); + } + for ($index = 0; $index < strlen($data); $index++) { + $decoded .= $data[$index] ^ $masks[$index % 4]; + } + if($connection->websocketCurrentFrameLength) + { + $connection->websocketDataBuffer .= $decoded; + return $connection->websocketDataBuffer; + } + else + { + $decoded = $connection->websocketDataBuffer . $decoded; + $connection->websocketDataBuffer = ''; + return $decoded; + } + } + + /** + * 处理websocket握手 + * @param string $buffer + * @param TcpConnection $connection + * @return int + */ + protected static function dealHandshake($buffer, $connection) + { + // 握手阶段客户端发送HTTP协议 + if(0 === strpos($buffer, 'GET')) + { + // 判断\r\n\r\n边界 + $heder_end_pos = strpos($buffer, "\r\n\r\n"); + if(!$heder_end_pos) + { + return 0; + } + + // 解析Sec-WebSocket-Key + $Sec_WebSocket_Key = ''; + if(preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/", $buffer, $match)) + { + $Sec_WebSocket_Key = $match[1]; + } + else + { + $connection->send("HTTP/1.1 400 Bad Request\r\n\r\n400 Bad Request
Sec-WebSocket-Key not found", true); + $connection->close(); + return 0; + } + $new_key = base64_encode(sha1($Sec_WebSocket_Key."258EAFA5-E914-47DA-95CA-C5AB0DC85B11",true)); + // 握手返回的数据 + $new_message = "HTTP/1.1 101 Switching Protocols\r\n"; + $new_message .= "Upgrade: websocket\r\n"; + $new_message .= "Sec-WebSocket-Version: 13\r\n"; + $new_message .= "Connection: Upgrade\r\n"; + $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n"; + $connection->websocketHandshake = true; + $connection->websocketDataBuffer = ''; + $connection->websocketCurrentFrameLength = 0; + $connection->websocketCurrentFrameBuffer = ''; + $connection->consumeRecvBuffer(strlen($buffer)); + $connection->send($new_message, true); + // blob or arraybuffer + $connection->websocketType = self::BINARY_TYPE_BLOB; + // 如果有设置onWebSocketConnect回调,尝试执行 + if(isset($connection->onWebSocketConnect)) + { + self::parseHttpHeader($buffer); + try + { + call_user_func($connection->onWebSocketConnect, $connection, $buffer); + } + catch(\Exception $e) + { + echo $e; + } + $_GET = $_COOKIE = $_SERVER = array(); + } + return 0; + } + // 如果是flash的policy-file-request + elseif(0 === strpos($buffer,'send($policy_xml, true); + $connection->consumeRecvBuffer(strlen($buffer)); + return 0; + } + // 出错 + $connection->send("HTTP/1.1 400 Bad Request\r\n\r\n400 Bad Request
Invalid handshake data for websocket. ", true); + $connection->close(); + return 0; + } + + /** + * 从header中获取 + * @param string $buffer + * @return void + */ + protected static function parseHttpHeader($buffer) + { + $header_data = explode("\r\n", $buffer); + $_SERVER = array(); + list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]); + unset($header_data[0]); + foreach($header_data as $content) + { + // \r\n\r\n + if(empty($content)) + { + continue; + } + list($key, $value) = explode(':', $content, 2); + $key = strtolower($key); + $value = trim($value); + switch($key) + { + // HTTP_HOST + case 'host': + $_SERVER['HTTP_HOST'] = $value; + $tmp = explode(':', $value); + $_SERVER['SERVER_NAME'] = $tmp[0]; + if(isset($tmp[1])) + { + $_SERVER['SERVER_PORT'] = $tmp[1]; + } + break; + // HTTP_COOKIE + case 'cookie': + $_SERVER['HTTP_COOKIE'] = $value; + parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE); + break; + // HTTP_USER_AGENT + case 'user-agent': + $_SERVER['HTTP_USER_AGENT'] = $value; + break; + // HTTP_REFERER + case 'referer': + $_SERVER['HTTP_REFERER'] = $value; + break; + case 'origin': + $_SERVER['HTTP_ORIGIN'] = $value; + break; + } + } + + // QUERY_STRING + $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY); + if($_SERVER['QUERY_STRING']) + { + // $GET + parse_str($_SERVER['QUERY_STRING'], $_GET); + } + else + { + $_SERVER['QUERY_STRING'] = ''; + } + } +} diff --git a/Workerman/WebServer.php b/Workerman/WebServer.php new file mode 100644 index 0000000..f715cc0 --- /dev/null +++ b/Workerman/WebServer.php @@ -0,0 +1,258 @@ + + */ +class WebServer extends Worker +{ + /** + * 默认mime类型 + * @var string + */ + protected static $defaultMimeType = 'text/html; charset=utf-8'; + + /** + * 服务器名到文件路径的转换 + * @var array ['workerman.net'=>'/home', 'www.workerman.net'=>'home/www'] + */ + protected $serverRoot = array(); + + /** + * mime类型映射关系 + * @var array + */ + protected static $mimeTypeMap = array(); + + + /** + * 用来保存用户设置的onWorkerStart回调 + * @var callback + */ + protected $_onWorkerStart = null; + + /** + * 添加站点域名与站点目录的对应关系,类似nginx的 + * @param string $domain + * @param string $root_path + * @return void + */ + public function addRoot($domain, $root_path) + { + $this->serverRoot[$domain] = $root_path; + } + + /** + * 构造函数 + * @param string $socket_name + * @param array $context_option + */ + public function __construct($socket_name, $context_option = array()) + { + list($scheme, $address) = explode(':', $socket_name, 2); + parent::__construct('http:'.$address, $context_option); + $this->name = 'WebServer'; + } + + /** + * 运行 + * @see Workerman.Worker::run() + */ + public function run() + { + $this->_onWorkerStart = $this->onWorkerStart; + $this->onWorkerStart = array($this, 'onWorkerStart'); + $this->onMessage = array($this, 'onMessage'); + parent::run(); + } + + /** + * 进程启动的时候一些初始化工作 + * @throws \Exception + */ + public function onWorkerStart() + { + if(empty($this->serverRoot)) + { + throw new \Exception('server root not set, please use WebServer::addRoot($domain, $root_path) to set server root path'); + } + // 初始化HttpCache + HttpCache::init(); + // 初始化mimeMap + $this->initMimeTypeMap(); + + // 尝试执行开发者设定的onWorkerStart回调 + if($this->_onWorkerStart) + { + call_user_func($this->_onWorkerStart, $this); + } + } + + /** + * 初始化mimeType + * @return void + */ + public function initMimeTypeMap() + { + $mime_file = Http::getMimeTypesFile(); + if(!is_file($mime_file)) + { + $this->notice("$mime_file mime.type file not fond"); + return; + } + $items = file($mime_file, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES); + if(!is_array($items)) + { + $this->log("get $mime_file mime.type content fail"); + return; + } + foreach($items as $content) + { + if(preg_match("/\s*(\S+)\s+(\S.+)/", $content, $match)) + { + $mime_type = $match[1]; + $extension_var = $match[2]; + $extension_array = explode(' ', substr($extension_var, 0, -1)); + foreach($extension_array as $extension) + { + self::$mimeTypeMap[$extension] = $mime_type; + } + } + } + } + + /** + * 当接收到完整的http请求后的处理逻辑 + * 1、如果请求的是以php为后缀的文件,则尝试加载 + * 2、如果请求的url没有后缀,则尝试加载对应目录的index.php + * 3、如果请求的是非php为后缀的文件,尝试读取原始数据并发送 + * 4、如果请求的文件不存在,则返回404 + * @param TcpConnection $connection + * @param mixed $data + * @return void + */ + public function onMessage($connection, $data) + { + // 请求的文件 + $url_info = parse_url($_SERVER['REQUEST_URI']); + if(!$url_info) + { + Http::header('HTTP/1.1 400 Bad Request'); + return $connection->close('

400 Bad Request

'); + } + + $path = $url_info['path']; + + $path_info = pathinfo($path); + $extension = isset($path_info['extension']) ? $path_info['extension'] : '' ; + if($extension == '') + { + $path = ($len = strlen($path)) && $path[$len -1] == '/' ? $path.'index.php' : $path . '/index.php'; + $extension = 'php'; + } + + $root_dir = isset($this->serverRoot[$_SERVER['HTTP_HOST']]) ? $this->serverRoot[$_SERVER['HTTP_HOST']] : current($this->serverRoot); + + $file = "$root_dir/$path"; + + // 对应的php文件不存在则直接使用根目录的index.php + if($extension == 'php' && !is_file($file)) + { + $file = "$root_dir/index.php"; + } + + // 请求的文件存在 + if(is_file($file)) + { + // 判断是否是站点目录里的文件 + if((!($request_realpath = realpath($file)) || !($root_dir_realpath = realpath($root_dir))) || 0 !== strpos($request_realpath, $root_dir_realpath)) + { + Http::header('HTTP/1.1 400 Bad Request'); + return $connection->close('

400 Bad Request

'); + } + + $file = realpath($file); + + // 如果请求的是php文件 + if($extension == 'php') + { + $cwd = getcwd(); + chdir($root_dir); + ini_set('display_errors', 'off'); + // 缓冲输出 + ob_start(); + // 载入php文件 + try + { + // $_SERVER变量 + $_SERVER['REMOTE_ADDR'] = $connection->getRemoteIp(); + $_SERVER['REMOTE_PORT'] = $connection->getRemotePort(); + include $file; + } + catch(\Exception $e) + { + // 如果不是exit + if($e->getMessage() != 'jump_exit') + { + echo $e; + } + } + $content = ob_get_clean(); + ini_set('display_errors', 'on'); + $connection->close($content); + chdir($cwd); + return ; + } + + // 请求的是静态资源文件 + if(isset(self::$mimeTypeMap[$extension])) + { + Http::header('Content-Type: '. self::$mimeTypeMap[$extension]); + } + else + { + Http::header('Content-Type: '. self::$defaultMimeType); + } + + // 获取文件信息 + $info = stat($file); + + $modified_time = $info ? date('D, d M Y H:i:s', $info['mtime']) . ' GMT' : ''; + + // 如果有$_SERVER['HTTP_IF_MODIFIED_SINCE'] + if(!empty($_SERVER['HTTP_IF_MODIFIED_SINCE']) && $info) + { + // 文件没有更改则直接304 + if($modified_time === $_SERVER['HTTP_IF_MODIFIED_SINCE']) + { + // 304 + Http::header('HTTP/1.1 304 Not Modified'); + // 发送给客户端 + return $connection->close(''); + } + } + + if($modified_time) + { + Http::header("Last-Modified: $modified_time"); + } + // 发送给客户端 + return $connection->close(file_get_contents($file)); + } + else + { + // 404 + Http::header("HTTP/1.1 404 Not Found"); + return $connection->close('404 页面不存在

404 Not Found

'); + } + } +} diff --git a/Workerman/Worker.php b/Workerman/Worker.php new file mode 100644 index 0000000..fb05b05 --- /dev/null +++ b/Workerman/Worker.php @@ -0,0 +1,1337 @@ + + */ +class Worker +{ + /** + * 版本号 + * @var string + */ + const VERSION = '3.0.9'; + + /** + * 状态 启动中 + * @var int + */ + const STATUS_STARTING = 1; + + /** + * 状态 运行中 + * @var int + */ + const STATUS_RUNNING = 2; + + /** + * 状态 停止 + * @var int + */ + const STATUS_SHUTDOWN = 4; + + /** + * 状态 平滑重启中 + * @var int + */ + const STATUS_RELOADING = 8; + + /** + * 给子进程发送重启命令 KILL_WORKER_TIMER_TIME 秒后 + * 如果对应进程仍然未重启则强行杀死 + * @var int + */ + const KILL_WORKER_TIMER_TIME = 1; + + /** + * 默认的backlog,即内核中用于存放未被进程认领(accept)的连接队列长度 + * @var int + */ + const DEFAUL_BACKLOG = 1024; + + /** + * udp最大包长 + * @var int + */ + const MAX_UDP_PACKEG_SIZE = 65535; + + /** + * worker的名称,用于在运行status命令时标记进程 + * @var string + */ + public $name = 'none'; + + /** + * 设置当前worker实例的进程数 + * @var int + */ + public $count = 1; + + /** + * 设置当前worker进程的运行用户,启动时需要root超级权限 + * @var string + */ + public $user = ''; + + /** + * 当前worker进程是否可以平滑重启 + * @var bool + */ + public $reloadable = true; + + /** + * 当worker进程启动时,如果设置了$onWorkerStart回调函数,则运行 + * 此钩子函数一般用于进程启动后初始化工作 + * @var callback + */ + public $onWorkerStart = null; + + /** + * 当有客户端连接时,如果设置了$onConnect回调函数,则运行 + * @var callback + */ + public $onConnect = null; + + /** + * 当客户端连接上发来数据时,如果设置了$onMessage回调,则运行 + * @var callback + */ + public $onMessage = null; + + /** + * 当客户端的连接关闭时,如果设置了$onClose回调,则运行 + * @var callback + */ + public $onClose = null; + + /** + * 当客户端的连接发生错误时,如果设置了$onError回调,则运行 + * 错误一般为客户端断开连接导致数据发送失败 + * 具体错误码及错误详情会以参数的形式传递给回调,参见手册 + * @var callback + */ + public $onError = null; + + /** + * 当连接的发送缓冲区满时,如果设置了$onBufferFull回调,则执行 + * @var callback + */ + public $onBufferFull = null; + + /** + * 当链接的发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行 + * @var callback + */ + public $onBufferDrain = null; + + /** + * 当前进程退出时(由于平滑重启或者服务停止导致),如果设置了此回调,则运行 + * @var callback + */ + public $onWorkerStop = null; + + /** + * 传输层协议 + * @var string + */ + public $transport = 'tcp'; + + /** + * 所有的客户端连接 + * @var array + */ + public $connections = array(); + + /** + * 应用层协议,由初始化worker时指定 + * 例如 new worker('http://0.0.0.0:8080');指定使用http协议 + * @var string + */ + protected $_protocol = ''; + + /** + * 当前worker实例初始化目录位置,用于设置应用自动加载的根目录 + * @var string + */ + protected $_appInitPath = ''; + + /** + * 是否以守护进程的方式运行。运行start时加上-d参数会自动以守护进程方式运行 + * 例如 php start.php start -d + * @var bool + */ + public static $daemonize = false; + + /** + * 重定向标准输出,即将所有echo、var_dump等终端输出写到对应文件中 + * 注意 此参数只有在以守护进程方式运行时有效 + * @var string + */ + public static $stdoutFile = '/dev/null'; + + /** + * pid文件的路径及名称 + * 例如 Worker::$pidFile = '/tmp/workerman.pid'; + * 注意 此属性一般不必手动设置,默认会放到php临时目录中 + * @var string + */ + public static $pidFile = ''; + + /** + * 日志目录,默认在workerman根目录下,与Applications同级 + * 可以手动设置 + * 例如 Worker::$logFile = '/tmp/workerman.log'; + * @var unknown_type + */ + public static $logFile = ''; + + /** + * 全局事件轮询库,用于监听所有资源的可读可写事件 + * @var Select/Libevent + */ + public static $globalEvent = null; + + /** + * 主进程pid + * @var int + */ + protected static $_masterPid = 0; + + /** + * 监听的socket + * @var stream + */ + protected $_mainSocket = null; + + /** + * socket名称,包括应用层协议+ip+端口号,在初始化worker时设置 + * 值类似 http://0.0.0.0:80 + * @var string + */ + protected $_socketName = ''; + + /** + * socket的上下文,具体选项设置可以在初始化worker时传递 + * @var context + */ + protected $_context = null; + + /** + * 所有的worker实例 + * @var array + */ + protected static $_workers = array(); + + /** + * 所有worker进程的pid + * 格式为 [worker_id=>[pid=>pid, pid=>pid, ..], ..] + * @var array + */ + protected static $_pidMap = array(); + + /** + * 所有需要重启的进程pid + * 格式为 [pid=>pid, pid=>pid] + * @var array + */ + protected static $_pidsToRestart = array(); + + /** + * 当前worker状态 + * @var int + */ + protected static $_status = self::STATUS_STARTING; + + /** + * 所有worke名称(name属性)中的最大长度,用于在运行 status 命令时格式化输出 + * @var int + */ + protected static $_maxWorkerNameLength = 12; + + /** + * 所有socket名称(_socketName属性)中的最大长度,用于在运行 status 命令时格式化输出 + * @var int + */ + protected static $_maxSocketNameLength = 12; + + /** + * 所有user名称(user属性)中的最大长度,用于在运行 status 命令时格式化输出 + * @var int + */ + protected static $_maxUserNameLength = 12; + + /** + * 运行 status 命令时用于保存结果的文件名 + * @var string + */ + protected static $_statisticsFile = ''; + + /** + * 启动的全局入口文件 + * 例如 php start.php start ,则入口文件为start.php + * @var string + */ + protected static $_startFile = ''; + + /** + * 全局统计数据,用于在运行 status 命令时展示 + * 统计的内容包括 workerman启动的时间戳及每组worker进程的退出次数及退出状态码 + * @var array + */ + protected static $_globalStatistics = array( + 'start_timestamp' => 0, + 'worker_exit_info' => array() + ); + + /** + * 运行所有worker实例 + * @return void + */ + public static function runAll() + { + // 初始化环境变量 + self::init(); + // 解析命令 + self::parseCommand(); + // 尝试以守护进程模式运行 + self::daemonize(); + // 初始化所有worker实例,主要是监听端口 + self::initWorkers(); + // 初始化所有信号处理函数 + self::installSignal(); + // 展示启动界面 + self::displayUI(); + // 尝试重定向标准输入输出 + self::resetStd(); + // 保存主进程pid + self::saveMasterPid(); + // 创建子进程(worker进程)并运行 + self::forkWorkers(); + // 监控所有子进程(worker进程) + self::monitorWorkers(); + } + + /** + * 初始化一些环境变量 + * @return void + */ + public static function init() + { + // 如果没设置$pidFile,则生成默认值 + if(empty(self::$pidFile)) + { + $backtrace = debug_backtrace(); + self::$_startFile = $backtrace[count($backtrace)-1]['file']; + self::$pidFile = sys_get_temp_dir()."/workerman.".str_replace('/', '_', self::$_startFile).".pid"; + } + // 没有设置日志文件,则生成一个默认值 + if(empty(self::$logFile)) + { + self::$logFile = __DIR__ . '/../workerman.log'; + } + // 标记状态为启动中 + self::$_status = self::STATUS_STARTING; + // 启动时间戳 + self::$_globalStatistics['start_timestamp'] = time(); + // 设置status文件位置 + self::$_statisticsFile = sys_get_temp_dir().'/workerman.status'; + // 尝试设置进程名称(需要php>=5.5或者安装了proctitle扩展) + self::setProcessTitle('WorkerMan: master process start_file=' . self::$_startFile); + + // 初始化定时器 + Timer::init(); + } + + /** + * 初始化所有的worker实例,主要工作为获得格式化所需数据及监听端口 + * @return void + */ + protected static function initWorkers() + { + foreach(self::$_workers as $worker) + { + // 没有设置worker名称,则使用none代替 + if(empty($worker->name)) + { + $worker->name = 'none'; + } + // 获得所有worker名称中最大长度 + $worker_name_length = strlen($worker->name); + if(self::$_maxWorkerNameLength < $worker_name_length) + { + self::$_maxWorkerNameLength = $worker_name_length; + } + // 获得所有_socketName中最大长度 + $socket_name_length = strlen($worker->getSocketName()); + if(self::$_maxSocketNameLength < $socket_name_length) + { + self::$_maxSocketNameLength = $socket_name_length; + } + // 获得运行用户名的最大长度 + if(empty($worker->user) || posix_getuid() !== 0) + { + $worker->user = self::getCurrentUser(); + } + $user_name_length = strlen($worker->user); + if(self::$_maxUserNameLength < $user_name_length) + { + self::$_maxUserNameLength = $user_name_length; + } + // 监听端口 + $worker->listen(); + } + } + + /** + * 获得运行当前进程的用户名 + * @return string + */ + protected static function getCurrentUser() + { + $user_info = posix_getpwuid(posix_getuid()); + return $user_info['name']; + } + + /** + * 展示启动界面 + * @return void + */ + protected static function displayUI() + { + echo "\033[1A\n\033[K-----------------------\033[47;30m WORKERMAN \033[0m-----------------------------\n\033[0m"; + echo 'Workerman version:' . Worker::VERSION . " PHP version:".PHP_VERSION."\n"; + echo "------------------------\033[47;30m WORKERS \033[0m-------------------------------\n"; + echo "\033[47;30muser\033[0m",str_pad('', self::$_maxUserNameLength+2-strlen('user')), "\033[47;30mworker\033[0m",str_pad('', self::$_maxWorkerNameLength+2-strlen('worker')), "\033[47;30mlisten\033[0m",str_pad('', self::$_maxSocketNameLength+2-strlen('listen')), "\033[47;30mprocesses\033[0m \033[47;30m","status\033[0m\n"; + foreach(self::$_workers as $worker) + { + echo str_pad($worker->user, self::$_maxUserNameLength+2),str_pad($worker->name, self::$_maxWorkerNameLength+2),str_pad($worker->getSocketName(), self::$_maxSocketNameLength+2), str_pad(' '.$worker->count, 9), " \033[32;40m [OK] \033[0m\n";; + } + echo "----------------------------------------------------------------\n"; + } + + /** + * 解析运行命令 + * php yourfile.php start | stop | restart | reload | status + * @return void + */ + public static function parseCommand() + { + // 检查运行命令的参数 + global $argv; + $start_file = $argv[0]; + if(!isset($argv[1])) + { + exit("Usage: php yourfile.php {start|stop|restart|reload|status}\n"); + } + + // 命令 + $command = trim($argv[1]); + + // 子命令,目前只支持-d + $command2 = isset($argv[2]) ? $argv[2] : ''; + + // 记录日志 + self::log("Workerman[$start_file] $command"); + + // 检查主进程是否在运行 + $master_pid = @file_get_contents(self::$pidFile); + $master_is_alive = $master_pid && @posix_kill($master_pid, 0); + if($master_is_alive) + { + if($command === 'start') + { + self::log("Workerman[$start_file] is running"); + } + } + elseif($command !== 'start' && $command !== 'restart') + { + self::log("Workerman[$start_file] not run"); + } + + // 根据命令做相应处理 + switch($command) + { + // 启动 workerman + case 'start': + if($command2 == '-d') + { + Worker::$daemonize = true; + } + break; + // 显示 workerman 运行状态 + case 'status': + // 尝试删除统计文件,避免脏数据 + if(is_file(self::$_statisticsFile)) + { + @unlink(self::$_statisticsFile); + } + // 向主进程发送 SIGUSR2 信号 ,然后主进程会向所有子进程发送 SIGUSR2 信号 + // 所有进程收到 SIGUSR2 信号后会向 $_statisticsFile 写入自己的状态 + posix_kill($master_pid, SIGUSR2); + // 睡眠100毫秒,等待子进程将自己的状态写入$_statisticsFile指定的文件 + usleep(100000); + // 展示状态 + readfile(self::$_statisticsFile); + exit(0); + // 重启 workerman + case 'restart': + // 停止 workeran + case 'stop': + self::log("Workerman[$start_file] is stoping ..."); + // 想主进程发送SIGINT信号,主进程会向所有子进程发送SIGINT信号 + $master_pid && posix_kill($master_pid, SIGINT); + // 如果 $timeout 秒后主进程没有退出则展示失败界面 + $timeout = 5; + $start_time = time(); + while(1) + { + // 检查主进程是否存活 + $master_is_alive = $master_pid && posix_kill($master_pid, 0); + if($master_is_alive) + { + // 检查是否超过$timeout时间 + if(time() - $start_time >= $timeout) + { + self::log("Workerman[$start_file] stop fail"); + exit; + } + usleep(10000); + continue; + } + self::log("Workerman[$start_file] stop success"); + // 是restart命令 + if($command === 'stop') + { + exit(0); + } + // -d 说明是以守护进程的方式启动 + if($command2 == '-d') + { + Worker::$daemonize = true; + } + break; + } + break; + // 平滑重启 workerman + case 'reload': + posix_kill($master_pid, SIGUSR1); + self::log("Workerman[$start_file] reload"); + exit; + // 未知命令 + default : + exit("Usage: php yourfile.php {start|stop|restart|reload|status}\n"); + } + } + + /** + * 安装信号处理函数 + * @return void + */ + protected static function installSignal() + { + // stop + pcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false); + // reload + pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false); + // status + pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false); + // ignore + pcntl_signal(SIGPIPE, SIG_IGN, false); + } + + /** + * 为子进程重新安装信号处理函数,使用全局事件轮询监听信号 + * @return void + */ + protected static function reinstallSignal() + { + // uninstall stop signal handler + pcntl_signal(SIGINT, SIG_IGN, false); + // uninstall reload signal handler + pcntl_signal(SIGUSR1, SIG_IGN, false); + // uninstall status signal handler + pcntl_signal(SIGUSR2, SIG_IGN, false); + // reinstall stop signal handler + self::$globalEvent->add(SIGINT, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler')); + // uninstall reload signal handler + self::$globalEvent->add(SIGUSR1, EventInterface::EV_SIGNAL,array('\Workerman\Worker', 'signalHandler')); + // uninstall status signal handler + self::$globalEvent->add(SIGUSR2, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler')); + } + + /** + * 信号处理函数 + * @param int $signal + */ + public static function signalHandler($signal) + { + switch($signal) + { + // stop + case SIGINT: + self::stopAll(); + break; + // reload + case SIGUSR1: + self::$_pidsToRestart = self::getAllWorkerPids();; + self::reload(); + break; + // show status + case SIGUSR2: + self::writeStatisticsToStatusFile(); + break; + } + } + + /** + * 尝试以守护进程的方式运行 + * @throws Exception + */ + protected static function daemonize() + { + if(!self::$daemonize) + { + return; + } + umask(0); + $pid = pcntl_fork(); + if(-1 == $pid) + { + throw new Exception('fork fail'); + } + elseif($pid > 0) + { + exit(0); + } + if(-1 == posix_setsid()) + { + throw new Exception("setsid fail"); + } + // fork again avoid SVR4 system regain the control of terminal + $pid = pcntl_fork(); + if(-1 == $pid) + { + throw new Exception("fork fail"); + } + elseif(0 !== $pid) + { + exit(0); + } + } + + /** + * 重定向标准输入输出 + * @throws Exception + */ + protected static function resetStd() + { + if(!self::$daemonize) + { + return; + } + global $STDOUT, $STDERR; + $handle = fopen(self::$stdoutFile,"a"); + if($handle) + { + unset($handle); + @fclose(STDOUT); + @fclose(STDERR); + $STDOUT = fopen(self::$stdoutFile,"a"); + $STDERR = fopen(self::$stdoutFile,"a"); + } + else + { + throw new Exception('can not open stdoutFile ' . self::$stdoutFile); + } + } + + /** + * 保存pid到文件中,方便运行命令时查找主进程pid + * @throws Exception + */ + protected static function saveMasterPid() + { + self::$_masterPid = posix_getpid(); + if(false === @file_put_contents(self::$pidFile, self::$_masterPid)) + { + throw new Exception('can not save pid to ' . self::$pidFile); + } + } + + /** + * 获得所有子进程的pid + * @return array + */ + protected static function getAllWorkerPids() + { + $pid_array = array(); + foreach(self::$_pidMap as $worker_pid_array) + { + foreach($worker_pid_array as $worker_pid) + { + $pid_array[$worker_pid] = $worker_pid; + } + } + return $pid_array; + } + + /** + * 创建子进程 + * @return void + */ + protected static function forkWorkers() + { + foreach(self::$_workers as $worker) + { + // 启动过程中需要得到运行用户名的最大长度,在status时格式化展示 + if(self::$_status === self::STATUS_STARTING) + { + if(empty($worker->name)) + { + $worker->name = $worker->getSocketName(); + } + $worker_name_length = strlen($worker->name); + if(self::$_maxWorkerNameLength < $worker_name_length) + { + self::$_maxWorkerNameLength = $worker_name_length; + } + } + + // 创建子进程 + while(count(self::$_pidMap[$worker->workerId]) < $worker->count) + { + self::forkOneWorker($worker); + } + } + } + + /** + * 创建一个子进程 + * @param Worker $worker + * @throws Exception + */ + protected static function forkOneWorker($worker) + { + $pid = pcntl_fork(); + // 主进程记录子进程pid + if($pid > 0) + { + self::$_pidMap[$worker->workerId][$pid] = $pid; + } + // 子进程运行 + elseif(0 === $pid) + { + self::$_pidMap = array(); + self::$_workers = array($worker->workerId => $worker); + Timer::delAll(); + self::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName()); + self::setProcessUser($worker->user); + $worker->run(); + exit(250); + } + else + { + throw new Exception("forkOneWorker fail"); + } + } + + /** + * 尝试设置运行当前进程的用户 + * @return void + */ + protected static function setProcessUser($user_name) + { + if(empty($user_name) || posix_getuid() !== 0) + { + return; + } + $user_info = posix_getpwnam($user_name); + if($user_info['uid'] != posix_getuid() || $user_info['gid'] != posix_getgid()) + { + if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid'])) + { + self::log( 'Notice : Can not run woker as '.$user_name." , You shuld be root\n", true); + } + } + } + + + /** + * 设置当前进程的名称,在ps aux命令中有用 + * 注意 需要php>=5.5或者安装了protitle扩展 + * @param string $title + * @return void + */ + protected static function setProcessTitle($title) + { + // >=php 5.5 + if (function_exists('cli_set_process_title')) + { + @cli_set_process_title($title); + } + // 需要扩展 + elseif(extension_loaded('proctitle') && function_exists('setproctitle')) + { + @setproctitle($title); + } + } + + /** + * 监控所有子进程的退出事件及退出码 + * @return void + */ + protected static function monitorWorkers() + { + self::$_status = self::STATUS_RUNNING; + while(1) + { + // 如果有信号到来,尝试触发信号处理函数 + pcntl_signal_dispatch(); + // 挂起进程,直到有子进程退出或者被信号打断 + $status = 0; + $pid = pcntl_wait($status, WUNTRACED); + // 有子进程退出 + if($pid > 0) + { + // 查找是哪个进程组的,然后再启动新的进程补上 + foreach(self::$_pidMap as $worker_id => $worker_pid_array) + { + if(isset($worker_pid_array[$pid])) + { + $worker = self::$_workers[$worker_id]; + // 检查退出状态 + if($status !== 0) + { + self::log("worker[".$worker->name.":$pid] exit with status $status"); + } + + // 统计,运行status命令时使用 + if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status])) + { + self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0; + } + self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++; + + // 清除子进程信息 + unset(self::$_pidMap[$worker_id][$pid]); + + break; + } + } + // 如果不是关闭状态,则补充新的进程 + if(self::$_status !== self::STATUS_SHUTDOWN) + { + self::forkWorkers(); + // 如果该进程是因为运行reload命令退出,则继续执行reload流程 + if(isset(self::$_pidsToRestart[$pid])) + { + unset(self::$_pidsToRestart[$pid]); + self::reload(); + } + } + else + { + // 如果是关闭状态,并且所有进程退出完毕,则主进程退出 + if(!self::getAllWorkerPids()) + { + self::exitAndClearAll(); + } + } + } + else + { + // 如果是关闭状态,并且所有进程退出完毕,则主进程退出 + if(self::$_status === self::STATUS_SHUTDOWN && !self::getAllWorkerPids()) + { + self::exitAndClearAll(); + } + } + } + } + + /** + * 退出当前进程 + * @return void + */ + protected static function exitAndClearAll() + { + @unlink(self::$pidFile); + self::log("Workerman[".basename(self::$_startFile)."] has been stopped"); + exit(0); + } + + /** + * 执行平滑重启流程 + * @return void + */ + protected static function reload() + { + // 主进程部分 + if(self::$_masterPid === posix_getpid()) + { + // 设置为平滑重启状态 + if(self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN) + { + self::log("Workerman[".basename(self::$_startFile)."] reloading"); + self::$_status = self::STATUS_RELOADING; + } + + // 如果有worker设置了reloadable=false,则过滤掉 + $reloadable_pid_array = array(); + foreach(self::$_pidMap as $worker_id =>$worker_pid_array) + { + $worker = self::$_workers[$worker_id]; + if($worker->reloadable) + { + foreach($worker_pid_array as $pid) + { + $reloadable_pid_array[$pid] = $pid; + } + } + } + + // 得到所有可以重启的进程 + self::$_pidsToRestart = array_intersect(self::$_pidsToRestart , $reloadable_pid_array); + + // 平滑重启完毕 + if(empty(self::$_pidsToRestart)) + { + if(self::$_status !== self::STATUS_SHUTDOWN) + { + self::$_status = self::STATUS_RUNNING; + } + return; + } + // 继续执行平滑重启流程 + $one_worker_pid = current(self::$_pidsToRestart ); + // 给子进程发送平滑重启信号 + posix_kill($one_worker_pid, SIGUSR1); + // 定时器,如果子进程在KILL_WORKER_TIMER_TIME秒后没有退出,则强行杀死 + Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false); + } + // 子进程部分 + else + { + // 如果当前worker的reloadable属性为真,则执行退出 + $worker = current(self::$_workers); + if($worker->reloadable) + { + self::stopAll(); + } + } + } + + /** + * 执行关闭流程 + * @return void + */ + public static function stopAll() + { + self::$_status = self::STATUS_SHUTDOWN; + // 主进程部分 + if(self::$_masterPid === posix_getpid()) + { + self::log("Workerman[".basename(self::$_startFile)."] Stopping ..."); + $worker_pid_array = self::getAllWorkerPids(); + // 向所有子进程发送SIGINT信号,表明关闭服务 + foreach($worker_pid_array as $worker_pid) + { + posix_kill($worker_pid, SIGINT); + Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL),false); + } + } + // 子进程部分 + else + { + // 执行stop逻辑 + foreach(self::$_workers as $worker) + { + $worker->stop(); + } + exit(0); + } + } + + /** + * 将当前进程的统计信息写入到统计文件 + * @return void + */ + protected static function writeStatisticsToStatusFile() + { + // 主进程部分 + if(self::$_masterPid === posix_getpid()) + { + $loadavg = sys_getloadavg(); + file_put_contents(self::$_statisticsFile, "---------------------------------------GLOBAL STATUS--------------------------------------------\n"); + file_put_contents(self::$_statisticsFile, 'Workerman version:' . Worker::VERSION . " PHP version:".PHP_VERSION."\n", FILE_APPEND); + file_put_contents(self::$_statisticsFile, 'start time:'. date('Y-m-d H:i:s', self::$_globalStatistics['start_timestamp']).' run ' . floor((time()-self::$_globalStatistics['start_timestamp'])/(24*60*60)). ' days ' . floor(((time()-self::$_globalStatistics['start_timestamp'])%(24*60*60))/(60*60)) . " hours \n", FILE_APPEND); + file_put_contents(self::$_statisticsFile, 'load average: ' . implode(", ", $loadavg) . "\n", FILE_APPEND); + file_put_contents(self::$_statisticsFile, count(self::$_pidMap) . ' workers ' . count(self::getAllWorkerPids())." processes\n", FILE_APPEND); + file_put_contents(self::$_statisticsFile, str_pad('worker_name', self::$_maxWorkerNameLength) . " exit_status exit_count\n", FILE_APPEND); + foreach(self::$_pidMap as $worker_id =>$worker_pid_array) + { + $worker = self::$_workers[$worker_id]; + if(isset(self::$_globalStatistics['worker_exit_info'][$worker_id])) + { + foreach(self::$_globalStatistics['worker_exit_info'][$worker_id] as $worker_exit_status=>$worker_exit_count) + { + file_put_contents(self::$_statisticsFile, str_pad($worker->name, self::$_maxWorkerNameLength) . " " . str_pad($worker_exit_status, 16). " $worker_exit_count\n", FILE_APPEND); + } + } + else + { + file_put_contents(self::$_statisticsFile, str_pad($worker->name, self::$_maxWorkerNameLength) . " " . str_pad(0, 16). " 0\n", FILE_APPEND); + } + } + file_put_contents(self::$_statisticsFile, "---------------------------------------PROCESS STATUS-------------------------------------------\n", FILE_APPEND); + file_put_contents(self::$_statisticsFile, "pid\tmemory ".str_pad('listening', self::$_maxSocketNameLength)." ".str_pad('worker_name', self::$_maxWorkerNameLength)." connections ".str_pad('total_request', 13)." ".str_pad('send_fail', 9)." ".str_pad('throw_exception', 15)."\n", FILE_APPEND); + + chmod(self::$_statisticsFile, 0722); + + foreach(self::getAllWorkerPids() as $worker_pid) + { + posix_kill($worker_pid, SIGUSR2); + } + return; + } + + // 子进程部分 + $worker = current(self::$_workers); + $wrker_status_str = posix_getpid()."\t".str_pad(round(memory_get_usage()/(1024*1024),2)."M", 7)." " .str_pad($worker->getSocketName(), self::$_maxSocketNameLength) ." ".str_pad(($worker->name == $worker->getSocketName() ? 'none' : $worker->name), self::$_maxWorkerNameLength)." "; + $wrker_status_str .= str_pad(ConnectionInterface::$statistics['connection_count'], 11)." ".str_pad(ConnectionInterface::$statistics['total_request'], 14)." ".str_pad(ConnectionInterface::$statistics['send_fail'],9)." ".str_pad(ConnectionInterface::$statistics['throw_exception'],15)."\n"; + file_put_contents(self::$_statisticsFile, $wrker_status_str, FILE_APPEND); + } + + /** + * 检查错误 + * @return void + */ + public static function checkErrors() + { + if(self::STATUS_SHUTDOWN != self::$_status) + { + $error_msg = "WORKER EXIT UNEXPECTED "; + $errors = error_get_last(); + if($errors && ($errors['type'] == E_ERROR || + $errors['type'] == E_PARSE || + $errors['type'] == E_CORE_ERROR || + $errors['type'] == E_COMPILE_ERROR || + $errors['type'] == E_RECOVERABLE_ERROR )) + { + $error_msg .= self::getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}"; + } + self::log($error_msg); + } + } + + /** + * 获取错误类型对应的意义 + * @param integer $type + * @return string + */ + protected static function getErrorType($type) + { + switch($type) + { + case E_ERROR: // 1 // + return 'E_ERROR'; + case E_WARNING: // 2 // + return 'E_WARNING'; + case E_PARSE: // 4 // + return 'E_PARSE'; + case E_NOTICE: // 8 // + return 'E_NOTICE'; + case E_CORE_ERROR: // 16 // + return 'E_CORE_ERROR'; + case E_CORE_WARNING: // 32 // + return 'E_CORE_WARNING'; + case E_COMPILE_ERROR: // 64 // + return 'E_COMPILE_ERROR'; + case E_COMPILE_WARNING: // 128 // + return 'E_COMPILE_WARNING'; + case E_USER_ERROR: // 256 // + return 'E_USER_ERROR'; + case E_USER_WARNING: // 512 // + return 'E_USER_WARNING'; + case E_USER_NOTICE: // 1024 // + return 'E_USER_NOTICE'; + case E_STRICT: // 2048 // + return 'E_STRICT'; + case E_RECOVERABLE_ERROR: // 4096 // + return 'E_RECOVERABLE_ERROR'; + case E_DEPRECATED: // 8192 // + return 'E_DEPRECATED'; + case E_USER_DEPRECATED: // 16384 // + return 'E_USER_DEPRECATED'; + } + return ""; + } + + /** + * 记录日志 + * @param string $msg + * @return void + */ + protected static function log($msg) + { + $msg = $msg."\n"; + if(self::$_status === self::STATUS_STARTING || !self::$daemonize) + { + echo $msg; + } + file_put_contents(self::$logFile, date('Y-m-d H:i:s') . " " . $msg, FILE_APPEND | LOCK_EX); + } + + /** + * worker构造函数 + * @param string $socket_name + * @return void + */ + public function __construct($socket_name = '', $context_option = array()) + { + // 保存worker实例 + $this->workerId = spl_object_hash($this); + self::$_workers[$this->workerId] = $this; + self::$_pidMap[$this->workerId] = array(); + + // 获得实例化文件路径,用于自动加载设置根目录 + $backrace = debug_backtrace(); + $this->_appInitPath = dirname($backrace[0]['file']); + + // 设置socket上下文 + if($socket_name) + { + $this->_socketName = $socket_name; + if(!isset($context_option['socket']['backlog'])) + { + $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG; + } + $this->_context = stream_context_create($context_option); + } + } + + /** + * 监听端口 + * @throws Exception + */ + public function listen() + { + // 设置自动加载根目录 + Autoloader::setRootPath($this->_appInitPath); + + if(!$this->_socketName) + { + return; + } + // 获得应用层通讯协议以及监听的地址 + list($scheme, $address) = explode(':', $this->_socketName, 2); + // 如果有指定应用层协议,则检查对应的协议类是否存在 + if($scheme != 'tcp' && $scheme != 'udp') + { + $scheme = ucfirst($scheme); + $this->_protocol = '\\Protocols\\'.$scheme; + if(!class_exists($this->_protocol)) + { + $this->_protocol = "\\Workerman\\Protocols\\$scheme"; + if(!class_exists($this->_protocol)) + { + throw new Exception("class \\Protocols\\$scheme not exist"); + } + } + } + elseif($scheme === 'udp') + { + $this->transport = 'udp'; + } + + // flag + $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN; + $this->_mainSocket = stream_socket_server($this->transport.":".$address, $errno, $errmsg, $flags, $this->_context); + if(!$this->_mainSocket) + { + throw new Exception($errmsg); + } + + // 尝试打开tcp的keepalive,关闭TCP Nagle算法 + if(function_exists('socket_import_stream')) + { + $socket = socket_import_stream($this->_mainSocket ); + @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); + @socket_set_option($socket, SOL_SOCKET, TCP_NODELAY, 1); + } + + // 设置非阻塞 + stream_set_blocking($this->_mainSocket, 0); + + // 放到全局事件轮询中监听_mainSocket可读事件(客户端连接事件) + if(self::$globalEvent) + { + if($this->transport !== 'udp') + { + self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); + } + else + { + self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); + } + } + } + + /** + * 获得 socket name + * @return string + */ + public function getSocketName() + { + return $this->_socketName ? $this->_socketName : 'none'; + } + + /** + * 运行worker实例 + */ + public function run() + { + // 注册进程退出回调,用来检查是否有错误 + register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors')); + + // 设置自动加载根目录 + Autoloader::setRootPath($this->_appInitPath); + + // 如果没有全局事件轮询,则创建一个 + if(!self::$globalEvent) + { + if(extension_loaded('libevent')) + { + self::$globalEvent = new Libevent(); + } + else + { + self::$globalEvent = new Select(); + } + // 监听_mainSocket上的可读事件(客户端连接事件) + if($this->_socketName) + { + if($this->transport !== 'udp') + { + self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); + } + else + { + self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); + } + } + } + + // 重新安装事件处理函数,使用全局事件轮询监听信号事件 + self::reinstallSignal(); + + // 用全局事件轮询初始化定时器 + Timer::init(self::$globalEvent); + + // 如果有设置进程启动回调,则执行 + if($this->onWorkerStart) + { + call_user_func($this->onWorkerStart, $this); + } + + // 子进程主循环 + self::$globalEvent->loop(); + } + + /** + * 停止当前worker实例 + * @return void + */ + public function stop() + { + // 如果有设置进程终止回调,则执行 + if($this->onWorkerStop) + { + call_user_func($this->onWorkerStop, $this); + } + // 删除相关监听事件,关闭_mainSocket + self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ); + @fclose($this->_mainSocket); + } + + /** + * 接收一个客户端连接 + * @param resources $socket + * @return void + */ + public function acceptConnection($socket) + { + // 获得客户端连接 + $new_socket = @stream_socket_accept($socket, 0); + // 惊群现象,忽略 + if(false === $new_socket) + { + return; + } + // 统计数据 + ConnectionInterface::$statistics['connection_count']++; + // 初始化连接对象 + $connection = new TcpConnection($new_socket); + $connection->worker = $this; + $connection->protocol = $this->_protocol; + $connection->onMessage = $this->onMessage; + $connection->onClose = $this->onClose; + $connection->onError = $this->onError; + $connection->onBufferDrain = $this->onBufferDrain; + $connection->onBufferFull = $this->onBufferFull; + $this->connections[(int)$new_socket] = $connection; + + // 如果有设置连接回调,则执行 + if($this->onConnect) + { + try + { + call_user_func($this->onConnect, $connection); + } + catch(Exception $e) + { + ConnectionInterface::$statistics['throw_exception']++; + self::log($e); + } + } + } + + /** + * 处理udp连接(udp其实是无连接的,这里为保证和tcp连接接口一致) + * @param resource $socket + */ + public function acceptUdpConnection($socket) + { + $recv_buffer = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $remote_address); + if(false === $recv_buffer || empty($remote_address)) + { + return false; + } + // 模拟一个连接对象 + $connection = new UdpConnection($socket, $remote_address); + if($this->onMessage) + { + $parser = $this->_protocol; + ConnectionInterface::$statistics['total_request']++; + try + { + call_user_func($this->onMessage, $connection, $parser::decode($recv_buffer, $connection)); + } + catch(Exception $e) + { + ConnectionInterface::$statistics['throw_exception']++; + } + } + } +} diff --git a/start.php b/start.php new file mode 100644 index 0000000..af92f44 --- /dev/null +++ b/start.php @@ -0,0 +1,32 @@ +