This commit is contained in:
walkor 2016-10-31 22:43:03 +08:00
parent 4e805e0b43
commit ddf8d1359e
27 changed files with 11 additions and 6240 deletions

3
.gitignore vendored
View File

@ -1,3 +0,0 @@
.buildpath
.project
.settings/org.eclipse.php.core.prefs

View File

@ -1,69 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman;
/**
* Autoload.
*/
class Autoloader
{
/**
* Autoload root path.
*
* @var string
*/
protected static $_autoloadRootPath = '';
/**
* Set autoload root path.
*
* @param string $root_path
* @return void
*/
public static function setRootPath($root_path)
{
self::$_autoloadRootPath = $root_path;
}
/**
* Load files by namespace.
*
* @param string $name
* @return boolean
*/
public static function loadByNamespace($name)
{
$class_path = str_replace('\\', DIRECTORY_SEPARATOR, $name);
if (strpos($name, 'Workerman\\') === 0) {
$class_file = __DIR__ . substr($class_path, strlen('Workerman')) . '.php';
} else {
if (self::$_autoloadRootPath) {
$class_file = self::$_autoloadRootPath . DIRECTORY_SEPARATOR . $class_path . '.php';
}
if (empty($class_file) || !is_file($class_file)) {
$class_file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . "$class_path.php";
}
}
if (is_file($class_file)) {
require_once($class_file);
if (class_exists($name, false)) {
return true;
}
}
return false;
}
}
spl_autoload_register('\Workerman\Autoloader::loadByNamespace');

View File

@ -1,274 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Connection;
use Workerman\Events\EventInterface;
use Workerman\Worker;
use Exception;
/**
* AsyncTcpConnection.
*/
class AsyncTcpConnection extends TcpConnection
{
/**
* Emitted when socket connection is successfully established.
*
* @var callback
*/
public $onConnect = null;
/**
* Transport layer protocol.
*
* @var string
*/
public $transport = 'tcp';
/**
* Status.
*
* @var int
*/
protected $_status = self::STATUS_INITIAL;
/**
* Remote host.
*
* @var string
*/
protected $_remoteHost = '';
/**
* Connect start time.
*
* @var string
*/
protected $_connectStartTime = 0;
/**
* Remote URI.
*
* @var string
*/
protected $_remoteURI = '';
/**
* PHP built-in protocols.
*
* @var array
*/
protected static $_builtinTransports = array(
'tcp' => 'tcp',
'udp' => 'udp',
'unix' => 'unix',
'ssl' => 'ssl',
'sslv2' => 'sslv2',
'sslv3' => 'sslv3',
'tls' => 'tls'
);
/**
* Construct.
*
* @param string $remote_address
* @throws Exception
*/
public function __construct($remote_address)
{
$address_info = parse_url($remote_address);
if (!$address_info) {
echo new \Exception('bad remote_address');
$this->_remoteAddress = $remote_address;
} else {
if (!isset($address_info['port'])) {
$address_info['port'] = 80;
}
if (!isset($address_info['path'])) {
$address_info['path'] = '/';
}
if (!isset($address_info['query'])) {
$address_info['query'] = '';
} else {
$address_info['query'] = '?' . $address_info['query'];
}
$this->_remoteAddress = "{$address_info['host']}:{$address_info['port']}";
$this->_remoteHost = $address_info['host'];
$this->_remoteURI = "{$address_info['path']}{$address_info['query']}";
$scheme = isset($address_info['scheme']) ? $address_info['scheme'] : 'tcp';
}
$this->id = self::$_idRecorder++;
// Check application layer protocol class.
if (!isset(self::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
} else {
$this->transport = self::$_builtinTransports[$scheme];
}
// For statistics.
self::$statistics['connection_count']++;
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
}
/**
* Do connect.
*
* @return void
*/
public function connect()
{
if ($this->_status !== self::STATUS_INITIAL && $this->_status !== self::STATUS_CLOSING && $this->_status !== self::STATUS_CLOSED) {
return;
}
$this->_status = self::STATUS_CONNECTING;
$this->_connectStartTime = microtime(true);
// Open socket connection asynchronously.
$this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteAddress}", $errno, $errstr, 0,
STREAM_CLIENT_ASYNC_CONNECT);
// If failed attempt to emit onError callback.
if (!$this->_socket) {
$this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
if ($this->_status === self::STATUS_CLOSING) {
$this->destroy();
}
if ($this->_status === self::STATUS_CLOSED) {
$this->onConnect = null;
}
return;
}
// Add socket to global event loop waiting connection is successfully established or faild.
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
}
/**
* Get remote address.
*
* @return string
*/
public function getRemoteHost()
{
return $this->_remoteHost;
}
/**
* Get remote URI.
*
* @return string
*/
public function getRemoteURI()
{
return $this->_remoteURI;
}
/**
* Try to emit onError callback.
*
* @param int $code
* @param string $msg
* @return void
*/
protected function emitError($code, $msg)
{
$this->_status = self::STATUS_CLOSING;
if ($this->onError) {
try {
call_user_func($this->onError, $this, $code, $msg);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
}
/**
* Check connection is successfully established or faild.
*
* @param resource $socket
* @return void
*/
public function checkConnection($socket)
{
// Check socket state.
if ($address = stream_socket_get_name($socket, true)) {
// Remove write listener.
Worker::$globalEvent->del($socket, EventInterface::EV_WRITE);
// Nonblocking.
stream_set_blocking($socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($socket, 0);
}
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && $this->transport === 'tcp') {
$raw_socket = socket_import_stream($socket);
socket_set_option($raw_socket, SOL_SOCKET, SO_KEEPALIVE, 1);
socket_set_option($raw_socket, SOL_TCP, TCP_NODELAY, 1);
}
// Register a listener waiting read event.
Worker::$globalEvent->add($socket, EventInterface::EV_READ, array($this, 'baseRead'));
// There are some data waiting to send.
if ($this->_sendBuffer) {
Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
}
$this->_status = self::STATUS_ESTABLISH;
$this->_remoteAddress = $address;
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
call_user_func($this->onConnect, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
// Try to emit protocol::onConnect
if (method_exists($this->protocol, 'onConnect')) {
try {
call_user_func(array($this->protocol, 'onConnect'), $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
} else {
// Connection failed.
$this->emitError(WORKERMAN_CONNECT_FAIL, 'connect ' . $this->_remoteAddress . ' fail after ' . round(microtime(true) - $this->_connectStartTime, 4) . ' seconds');
if ($this->_status === self::STATUS_CLOSING) {
$this->destroy();
}
if ($this->_status === self::STATUS_CLOSED) {
$this->onConnect = null;
}
}
}
}

View File

@ -1,83 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Connection;
/**
* ConnectionInterface.
*/
abstract class ConnectionInterface
{
/**
* Statistics for status command.
*
* @var array
*/
public static $statistics = array(
'connection_count' => 0,
'total_request' => 0,
'throw_exception' => 0,
'send_fail' => 0,
);
/**
* Emitted when data is received.
*
* @var callback
*/
public $onMessage = null;
/**
* Emitted when the other end of the socket sends a FIN packet.
*
* @var callback
*/
public $onClose = null;
/**
* Emitted when an error occurs with connection.
*
* @var callback
*/
public $onError = null;
/**
* Sends data on the connection.
*
* @param string $send_buffer
* @return void|boolean
*/
abstract public function send($send_buffer);
/**
* Get remote IP.
*
* @return string
*/
abstract public function getRemoteIp();
/**
* Get remote port.
*
* @return int
*/
abstract public function getRemotePort();
/**
* Close connection.
*
* @param $data
* @return void
*/
abstract public function close($data = null);
}

View File

@ -1,652 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Connection;
use Workerman\Events\EventInterface;
use Workerman\Worker;
use Exception;
/**
* TcpConnection.
*/
class TcpConnection extends ConnectionInterface
{
/**
* Read buffer size.
*
* @var int
*/
const READ_BUFFER_SIZE = 65535;
/**
* Status initial.
*
* @var int
*/
const STATUS_INITIAL = 0;
/**
* Status connecting.
*
* @var int
*/
const STATUS_CONNECTING = 1;
/**
* Status connection established.
*
* @var int
*/
const STATUS_ESTABLISH = 2;
/**
* Status closing.
*
* @var int
*/
const STATUS_CLOSING = 4;
/**
* Status closed.
*
* @var int
*/
const STATUS_CLOSED = 8;
/**
* Emitted when data is received.
*
* @var callback
*/
public $onMessage = null;
/**
* Emitted when the other end of the socket sends a FIN packet.
*
* @var callback
*/
public $onClose = null;
/**
* Emitted when an error occurs with connection.
*
* @var callback
*/
public $onError = null;
/**
* Emitted when the send buffer becomes full.
*
* @var callback
*/
public $onBufferFull = null;
/**
* Emitted when the send buffer becomes empty.
*
* @var callback
*/
public $onBufferDrain = null;
/**
* Application layer protocol.
* The format is like this Workerman\\Protocols\\Http.
*
* @var \Workerman\Protocols\ProtocolInterface
*/
public $protocol = null;
/**
* Which worker belong to.
*
* @var Worker
*/
public $worker = null;
/**
* Connection->id.
*
* @var int
*/
public $id = 0;
/**
* A copy of $worker->id which used to clean up the connection in worker->connections
*
* @var int
*/
protected $_id = 0;
/**
* Sets the maximum send buffer size for the current connection.
* OnBufferFull callback will be emited When the send buffer is full.
*
* @var int
*/
public $maxSendBufferSize = 1048576;
/**
* Default send buffer size.
*
* @var int
*/
public static $defaultMaxSendBufferSize = 1048576;
/**
* Maximum acceptable packet size.
*
* @var int
*/
public static $maxPackageSize = 10485760;
/**
* Id recorder.
*
* @var int
*/
protected static $_idRecorder = 1;
/**
* Socket
*
* @var resource
*/
protected $_socket = null;
/**
* Send buffer.
*
* @var string
*/
protected $_sendBuffer = '';
/**
* Receive buffer.
*
* @var string
*/
protected $_recvBuffer = '';
/**
* Current package length.
*
* @var int
*/
protected $_currentPackageLength = 0;
/**
* Connection status.
*
* @var int
*/
protected $_status = self::STATUS_ESTABLISH;
/**
* Remote address.
*
* @var string
*/
protected $_remoteAddress = '';
/**
* Is paused.
*
* @var bool
*/
protected $_isPaused = false;
/**
* Construct.
*
* @param resource $socket
* @param string $remote_address
*/
public function __construct($socket, $remote_address = '')
{
self::$statistics['connection_count']++;
$this->id = $this->_id = self::$_idRecorder++;
$this->_socket = $socket;
stream_set_blocking($this->_socket, 0);
// Compatible with hhvm
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($this->_socket, 0);
}
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
$this->_remoteAddress = $remote_address;
}
/**
* Sends data on the connection.
*
* @param string $send_buffer
* @param bool $raw
* @return void|bool|null
*/
public function send($send_buffer, $raw = false)
{
// Try to call protocol::encode($send_buffer) before sending.
if (false === $raw && $this->protocol) {
$parser = $this->protocol;
$send_buffer = $parser::encode($send_buffer, $this);
if ($send_buffer === '') {
return null;
}
}
if ($this->_status === self::STATUS_INITIAL || $this->_status === self::STATUS_CONNECTING) {
$this->_sendBuffer .= $send_buffer;
return null;
} elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
return false;
}
// Attempt to send data directly.
if ($this->_sendBuffer === '') {
$len = @fwrite($this->_socket, $send_buffer);
// send successful.
if ($len === strlen($send_buffer)) {
return true;
}
// Send only part of the data.
if ($len > 0) {
$this->_sendBuffer = substr($send_buffer, $len);
} else {
// Connection closed?
if (!is_resource($this->_socket) || feof($this->_socket)) {
self::$statistics['send_fail']++;
if ($this->onError) {
try {
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
$this->destroy();
return false;
}
$this->_sendBuffer = $send_buffer;
}
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
// Check if the send buffer is full.
$this->checkBufferIsFull();
return null;
} else {
// Buffer has been marked as full but still has data to send the packet is discarded.
if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
self::$statistics['send_fail']++;
if ($this->onError) {
try {
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
return false;
}
$this->_sendBuffer .= $send_buffer;
// Check if the send buffer is full.
$this->checkBufferIsFull();
}
}
/**
* Get remote IP.
*
* @return string
*/
public function getRemoteIp()
{
$pos = strrpos($this->_remoteAddress, ':');
if ($pos) {
return trim(substr($this->_remoteAddress, 0, $pos), '[]');
}
return '';
}
/**
* Get remote port.
*
* @return int
*/
public function getRemotePort()
{
if ($this->_remoteAddress) {
return (int)substr(strrchr($this->_remoteAddress, ':'), 1);
}
return 0;
}
/**
* Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
*
* @return void
*/
public function pauseRecv()
{
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
$this->_isPaused = true;
}
/**
* Resumes reading after a call to pauseRecv.
*
* @return void
*/
public function resumeRecv()
{
if ($this->_isPaused === true) {
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
$this->_isPaused = false;
$this->baseRead($this->_socket, false);
}
}
/**
* Base read handler.
*
* @param resource $socket
* @param bool $check_eof
* @return void
*/
public function baseRead($socket, $check_eof = true)
{
$buffer = fread($socket, self::READ_BUFFER_SIZE);
// Check connection closed.
if ($buffer === '' || $buffer === false) {
if ($check_eof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
$this->destroy();
return;
}
} else {
$this->_recvBuffer .= $buffer;
}
// If the application layer protocol has been set up.
if ($this->protocol) {
$parser = $this->protocol;
while ($this->_recvBuffer !== '' && !$this->_isPaused) {
// The current packet length is known.
if ($this->_currentPackageLength) {
// Data is not enough for a package.
if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
break;
}
} else {
// Get current package length.
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
// The packet length is unknown.
if ($this->_currentPackageLength === 0) {
break;
} elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize) {
// Data is not enough for a package.
if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
break;
}
} // Wrong package.
else {
echo 'error package. package_length=' . var_export($this->_currentPackageLength, true);
$this->destroy();
return;
}
}
// The data is enough for a packet.
self::$statistics['total_request']++;
// The current packet length is equal to the length of the buffer.
if (strlen($this->_recvBuffer) === $this->_currentPackageLength) {
$one_request_buffer = $this->_recvBuffer;
$this->_recvBuffer = '';
} else {
// Get a full package from the buffer.
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
// Remove the current package from the receive buffer.
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
}
// Reset the current packet length to 0.
$this->_currentPackageLength = 0;
if (!$this->onMessage) {
continue;
}
try {
// Decode request buffer before Emitting onMessage callback.
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
return;
}
if ($this->_recvBuffer === '' || $this->_isPaused) {
return;
}
// Applications protocol is not set.
self::$statistics['total_request']++;
if (!$this->onMessage) {
$this->_recvBuffer = '';
return;
}
try {
call_user_func($this->onMessage, $this, $this->_recvBuffer);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
// Clean receive buffer.
$this->_recvBuffer = '';
}
/**
* Base write handler.
*
* @return void|bool
*/
public function baseWrite()
{
$len = @fwrite($this->_socket, $this->_sendBuffer);
if ($len === strlen($this->_sendBuffer)) {
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
$this->_sendBuffer = '';
// Try to emit onBufferDrain callback when the send buffer becomes empty.
if ($this->onBufferDrain) {
try {
call_user_func($this->onBufferDrain, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
if ($this->_status === self::STATUS_CLOSING) {
$this->destroy();
}
return true;
}
if ($len > 0) {
$this->_sendBuffer = substr($this->_sendBuffer, $len);
} else {
self::$statistics['send_fail']++;
$this->destroy();
}
}
/**
* This method pulls all the data out of a readable stream, and writes it to the supplied destination.
*
* @param TcpConnection $dest
* @return void
*/
public function pipe($dest)
{
$source = $this;
$this->onMessage = function ($source, $data) use ($dest) {
$dest->send($data);
};
$this->onClose = function ($source) use ($dest) {
$dest->destroy();
};
$dest->onBufferFull = function ($dest) use ($source) {
$source->pauseRecv();
};
$dest->onBufferDrain = function ($dest) use ($source) {
$source->resumeRecv();
};
}
/**
* Remove $length of data from receive buffer.
*
* @param int $length
* @return void
*/
public function consumeRecvBuffer($length)
{
$this->_recvBuffer = substr($this->_recvBuffer, $length);
}
/**
* Close connection.
*
* @param mixed $data
* @param bool $raw
* @return void
*/
public function close($data = null, $raw = false)
{
if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
return;
} else {
if ($data !== null) {
$this->send($data, $raw);
}
$this->_status = self::STATUS_CLOSING;
}
if ($this->_sendBuffer === '') {
$this->destroy();
}
}
/**
* Get the real socket.
*
* @return resource
*/
public function getSocket()
{
return $this->_socket;
}
/**
* Check whether the send buffer is full.
*
* @return void
*/
protected function checkBufferIsFull()
{
if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
if ($this->onBufferFull) {
try {
call_user_func($this->onBufferFull, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
}
}
/**
* Destroy connection.
*
* @return void
*/
public function destroy()
{
// Avoid repeated calls.
if ($this->_status === self::STATUS_CLOSED) {
return;
}
// Remove event listener.
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
// Close socket.
@fclose($this->_socket);
// Remove from worker->connections.
if ($this->worker) {
unset($this->worker->connections[$this->_id]);
}
$this->_status = self::STATUS_CLOSED;
// Try to emit onClose callback.
if ($this->onClose) {
try {
call_user_func($this->onClose, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
// Try to emit protocol::onClose
if (method_exists($this->protocol, 'onClose')) {
try {
call_user_func(array($this->protocol, 'onClose'), $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
if ($this->_status === self::STATUS_CLOSED) {
// Cleaning up the callback to avoid memory leaks.
$this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
}
}
/**
* Destruct.
*
* @return void
*/
public function __destruct()
{
self::$statistics['connection_count']--;
}
}

View File

@ -1,115 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Connection;
/**
* UdpConnection.
*/
class UdpConnection extends ConnectionInterface
{
/**
* Application layer protocol.
* The format is like this Workerman\\Protocols\\Http.
*
* @var \Workerman\Protocols\ProtocolInterface
*/
public $protocol = null;
/**
* Udp socket.
*
* @var resource
*/
protected $_socket = null;
/**
* Remote address.
*
* @var string
*/
protected $_remoteAddress = '';
/**
* Construct.
*
* @param resource $socket
* @param string $remote_address
*/
public function __construct($socket, $remote_address)
{
$this->_socket = $socket;
$this->_remoteAddress = $remote_address;
}
/**
* Sends data on the connection.
*
* @param string $send_buffer
* @param bool $raw
* @return void|boolean
*/
public function send($send_buffer, $raw = false)
{
if (false === $raw && $this->protocol) {
$parser = $this->protocol;
$send_buffer = $parser::encode($send_buffer, $this);
if ($send_buffer === '') {
return null;
}
}
return strlen($send_buffer) === stream_socket_sendto($this->_socket, $send_buffer, 0, $this->_remoteAddress);
}
/**
* Get remote IP.
*
* @return string
*/
public function getRemoteIp()
{
$pos = strrpos($this->_remoteAddress, ':');
if ($pos) {
return trim(substr($this->_remoteAddress, 0, $pos), '[]');
}
return '';
}
/**
* Get remote port.
*
* @return int
*/
public function getRemotePort()
{
if ($this->_remoteAddress) {
return (int)substr(strrchr($this->_remoteAddress, ':'), 1);
}
return 0;
}
/**
* Close connection.
*
* @param mixed $data
* @param bool $raw
* @return bool
*/
public function close($data = null, $raw = false)
{
if ($data !== null) {
$this->send($data, $raw);
}
return true;
}
}

View File

@ -1,173 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author 有个鬼<42765633@qq.com>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
use Workerman\Worker;
/**
* ev eventloop
*/
class Ev implements EventInterface
{
/**
* All listeners for read/write event.
*
* @var array
*/
protected $_allEvents = array();
/**
* Event listeners of signal.
*
* @var array
*/
protected $_eventSignal = array();
/**
* All timer event listeners.
* [func, args, event, flag, time_interval]
*
* @var array
*/
protected $_eventTimer = array();
/**
* Timer id.
*
* @var int
*/
protected static $_timerId = 1;
/**
* Add a timer.
* {@inheritdoc}
*/
public function add($fd, $flag, $func, $args = null)
{
$callback = function ($event, $socket) use ($fd, $func) {
try {
call_user_func($func, $fd);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
};
switch ($flag) {
case self::EV_SIGNAL:
$event = new \EvSignal($fd, $callback);
$this->_eventSignal[$fd] = $event;
return true;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$repeat = $flag == self::EV_TIMER_ONCE ? 0 : $fd;
$param = array($func, (array)$args, $flag, $fd, self::$_timerId);
$event = new \EvTimer($fd, $repeat, array($this, 'timerCallback'), $param);
$this->_eventTimer[self::$_timerId] = $event;
return self::$_timerId++;
default :
$fd_key = (int)$fd;
$real_flag = $flag === self::EV_READ ? \Ev::READ : \Ev::WRITE;
$event = new \EvIo($fd, $real_flag, $callback);
$this->_allEvents[$fd_key][$flag] = $event;
return true;
}
}
/**
* Remove a timer.
* {@inheritdoc}
*/
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][$flag])) {
$this->_allEvents[$fd_key][$flag]->stop();
unset($this->_allEvents[$fd_key][$flag]);
}
if (empty($this->_allEvents[$fd_key])) {
unset($this->_allEvents[$fd_key]);
}
break;
case self::EV_SIGNAL:
$fd_key = (int)$fd;
if (isset($this->_eventSignal[$fd_key])) {
$this->_allEvents[$fd_key][$flag]->stop();
unset($this->_eventSignal[$fd_key]);
}
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
if (isset($this->_eventTimer[$fd])) {
$this->_eventTimer[$fd]->stop();
unset($this->_eventTimer[$fd]);
}
break;
}
return true;
}
/**
* Timer callback.
*
* @param \EvWatcher $event
*/
public function timerCallback($event)
{
$param = $event->data;
$timer_id = $param[4];
if ($param[2] === self::EV_TIMER_ONCE) {
$this->_eventTimer[$timer_id]->stop();
unset($this->_eventTimer[$timer_id]);
}
try {
call_user_func_array($param[0], $param[1]);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
/**
* Remove all timers.
*
* @return void
*/
public function clearAllTimer()
{
foreach ($this->_eventTimer as $event) {
$event->stop();
}
$this->_eventTimer = array();
}
/**
* Main loop.
*
* @see EventInterface::loop()
*/
public function loop()
{
\Ev::run();
}
}

View File

@ -1,188 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author 有个鬼<42765633@qq.com>
* @copyright 有个鬼<42765633@qq.com>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
use Workerman\Worker;
/**
* libevent eventloop
*/
class Event implements EventInterface
{
/**
* Event base.
* @var object
*/
protected $_eventBase = null;
/**
* All listeners for read/write event.
* @var array
*/
protected $_allEvents = array();
/**
* Event listeners of signal.
* @var array
*/
protected $_eventSignal = array();
/**
* All timer event listeners.
* [func, args, event, flag, time_interval]
* @var array
*/
protected $_eventTimer = array();
/**
* Timer id.
* @var int
*/
protected static $_timerId = 1;
/**
* construct
* @return void
*/
public function __construct()
{
$this->_eventBase = new \EventBase();
}
/**
* @see EventInterface::add()
*/
public function add($fd, $flag, $func, $args=array())
{
switch ($flag) {
case self::EV_SIGNAL:
$fd_key = (int)$fd;
$event = \Event::signal($this->_eventBase, $fd, $func);
if (!$event||!$event->add()) {
return false;
}
$this->_eventSignal[$fd_key] = $event;
return true;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$param = array($func, (array)$args, $flag, $fd, self::$_timerId);
$event = new \Event($this->_eventBase, -1, \Event::TIMEOUT|\Event::PERSIST, array($this, "timerCallback"), $param);
if (!$event||!$event->addTimer($fd)) {
return false;
}
$this->_eventTimer[self::$_timerId] = $event;
return self::$_timerId++;
default :
$fd_key = (int)$fd;
$real_flag = $flag === self::EV_READ ? \Event::READ | \Event::PERSIST : \Event::WRITE | \Event::PERSIST;
$event = new \Event($this->_eventBase, $fd, $real_flag, $func, $fd);
if (!$event||!$event->add()) {
return false;
}
$this->_allEvents[$fd_key][$flag] = $event;
return true;
}
}
/**
* @see Events\EventInterface::del()
*/
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][$flag])) {
$this->_allEvents[$fd_key][$flag]->del();
unset($this->_allEvents[$fd_key][$flag]);
}
if (empty($this->_allEvents[$fd_key])) {
unset($this->_allEvents[$fd_key]);
}
break;
case self::EV_SIGNAL:
$fd_key = (int)$fd;
if (isset($this->_eventSignal[$fd_key])) {
$this->_allEvents[$fd_key][$flag]->del();
unset($this->_eventSignal[$fd_key]);
}
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
if (isset($this->_eventTimer[$fd])) {
$this->_eventTimer[$fd]->del();
unset($this->_eventTimer[$fd]);
}
break;
}
return true;
}
/**
* Timer callback.
* @param null $fd
* @param int $what
* @param int $timer_id
*/
public function timerCallback($fd, $what, $param)
{
$timer_id = $param[4];
if ($param[2] === self::EV_TIMER_ONCE) {
$this->_eventTimer[$timer_id]->del();
unset($this->_eventTimer[$timer_id]);
}
try {
call_user_func_array($param[0], $param[1]);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
/**
* @see Events\EventInterface::clearAllTimer()
* @return void
*/
public function clearAllTimer()
{
foreach ($this->_eventTimer as $event) {
$event->del();
}
$this->_eventTimer = array();
}
/**
* @see EventInterface::loop()
*/
public function loop()
{
$this->_eventBase->loop();
}
}

View File

@ -1,86 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
interface EventInterface
{
/**
* Read event.
*
* @var int
*/
const EV_READ = 1;
/**
* Write event.
*
* @var int
*/
const EV_WRITE = 2;
/**
* Signal event.
*
* @var int
*/
const EV_SIGNAL = 4;
/**
* Timer event.
*
* @var int
*/
const EV_TIMER = 8;
/**
* Timer once event.
*
* @var int
*/
const EV_TIMER_ONCE = 16;
/**
* Add event listener to event loop.
*
* @param mixed $fd
* @param int $flag
* @param callable $func
* @param mixed $args
* @return bool
*/
public function add($fd, $flag, $func, $args = null);
/**
* Remove event listener from event loop.
*
* @param mixed $fd
* @param int $flag
* @return bool
*/
public function del($fd, $flag);
/**
* Remove all timers.
*
* @return void
*/
public function clearAllTimer();
/**
* Main loop.
*
* @return void
*/
public function loop();
}

View File

@ -1,205 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
use Workerman\Worker;
/**
* libevent eventloop
*/
class Libevent implements EventInterface
{
/**
* Event base.
*
* @var resource
*/
protected $_eventBase = null;
/**
* All listeners for read/write event.
*
* @var array
*/
protected $_allEvents = array();
/**
* Event listeners of signal.
*
* @var array
*/
protected $_eventSignal = array();
/**
* All timer event listeners.
* [func, args, event, flag, time_interval]
*
* @var array
*/
protected $_eventTimer = array();
/**
* construct
*/
public function __construct()
{
$this->_eventBase = event_base_new();
}
/**
* {@inheritdoc}
*/
public function add($fd, $flag, $func, $args = array())
{
switch ($flag) {
case self::EV_SIGNAL:
$fd_key = (int)$fd;
$real_flag = EV_SIGNAL | EV_PERSIST;
$this->_eventSignal[$fd_key] = event_new();
if (!event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null)) {
return false;
}
if (!event_base_set($this->_eventSignal[$fd_key], $this->_eventBase)) {
return false;
}
if (!event_add($this->_eventSignal[$fd_key])) {
return false;
}
return true;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$event = event_new();
$timer_id = (int)$event;
if (!event_set($event, 0, EV_TIMEOUT, array($this, 'timerCallback'), $timer_id)) {
return false;
}
if (!event_base_set($event, $this->_eventBase)) {
return false;
}
$time_interval = $fd * 1000000;
if (!event_add($event, $time_interval)) {
return false;
}
$this->_eventTimer[$timer_id] = array($func, (array)$args, $event, $flag, $time_interval);
return $timer_id;
default :
$fd_key = (int)$fd;
$real_flag = $flag === self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST;
$event = event_new();
if (!event_set($event, $fd, $real_flag, $func, null)) {
return false;
}
if (!event_base_set($event, $this->_eventBase)) {
return false;
}
if (!event_add($event)) {
return false;
}
$this->_allEvents[$fd_key][$flag] = $event;
return true;
}
}
/**
* {@inheritdoc}
*/
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][$flag])) {
event_del($this->_allEvents[$fd_key][$flag]);
unset($this->_allEvents[$fd_key][$flag]);
}
if (empty($this->_allEvents[$fd_key])) {
unset($this->_allEvents[$fd_key]);
}
break;
case self::EV_SIGNAL:
$fd_key = (int)$fd;
if (isset($this->_eventSignal[$fd_key])) {
event_del($this->_eventSignal[$fd_key]);
unset($this->_eventSignal[$fd_key]);
}
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
// 这里 fd 为timerid
if (isset($this->_eventTimer[$fd])) {
event_del($this->_eventTimer[$fd][2]);
unset($this->_eventTimer[$fd]);
}
break;
}
return true;
}
/**
* Timer callback.
*
* @param mixed $_null1
* @param int $_null2
* @param mixed $timer_id
*/
protected function timerCallback($_null1, $_null2, $timer_id)
{
if ($this->_eventTimer[$timer_id][3] === self::EV_TIMER) {
event_add($this->_eventTimer[$timer_id][2], $this->_eventTimer[$timer_id][4]);
}
try {
call_user_func_array($this->_eventTimer[$timer_id][0], $this->_eventTimer[$timer_id][1]);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
if (isset($this->_eventTimer[$timer_id]) && $this->_eventTimer[$timer_id][3] === self::EV_TIMER_ONCE) {
$this->del($timer_id, self::EV_TIMER_ONCE);
}
}
/**
* {@inheritdoc}
*/
public function clearAllTimer()
{
foreach ($this->_eventTimer as $task_data) {
event_del($task_data[2]);
}
$this->_eventTimer = array();
}
/**
* {@inheritdoc}
*/
public function loop()
{
event_base_loop($this->_eventBase);
}
}

View File

@ -1,263 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
/**
* select eventloop
*/
class Select implements EventInterface
{
/**
* All listeners for read/write event.
*
* @var array
*/
public $_allEvents = array();
/**
* Event listeners of signal.
*
* @var array
*/
public $_signalEvents = array();
/**
* Fds waiting for read event.
*
* @var array
*/
protected $_readFds = array();
/**
* Fds waiting for write event.
*
* @var array
*/
protected $_writeFds = array();
/**
* Timer scheduler.
* {['data':timer_id, 'priority':run_timestamp], ..}
*
* @var \SplPriorityQueue
*/
protected $_scheduler = null;
/**
* All timer event listeners.
* [[func, args, flag, timer_interval], ..]
*
* @var array
*/
protected $_task = array();
/**
* Timer id.
*
* @var int
*/
protected $_timerId = 1;
/**
* Select timeout.
*
* @var int
*/
protected $_selectTimeout = 100000000;
/**
* Paired socket channels
*
* @var array
*/
protected $channel = array();
/**
* Construct.
*/
public function __construct()
{
// Create a pipeline and put into the collection of the read to read the descriptor to avoid empty polling.
$this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($this->channel) {
stream_set_blocking($this->channel[0], 0);
$this->_readFds[0] = $this->channel[0];
}
// Init SplPriorityQueue.
$this->_scheduler = new \SplPriorityQueue();
$this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
}
/**
* {@inheritdoc}
*/
public function add($fd, $flag, $func, $args = array())
{
switch ($flag) {
case self::EV_READ:
$fd_key = (int)$fd;
$this->_allEvents[$fd_key][$flag] = array($func, $fd);
$this->_readFds[$fd_key] = $fd;
break;
case self::EV_WRITE:
$fd_key = (int)$fd;
$this->_allEvents[$fd_key][$flag] = array($func, $fd);
$this->_writeFds[$fd_key] = $fd;
break;
case self::EV_SIGNAL:
$fd_key = (int)$fd;
$this->_signalEvents[$fd_key][$flag] = array($func, $fd);
pcntl_signal($fd, array($this, 'signalHandler'));
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$run_time = microtime(true) + $fd;
$this->_scheduler->insert($this->_timerId, -$run_time);
$this->_task[$this->_timerId] = array($func, (array)$args, $flag, $fd);
$this->tick();
return $this->_timerId++;
}
return true;
}
/**
* Signal handler.
*
* @param int $signal
*/
public function signalHandler($signal)
{
call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
}
/**
* {@inheritdoc}
*/
public function del($fd, $flag)
{
$fd_key = (int)$fd;
switch ($flag) {
case self::EV_READ:
unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
if (empty($this->_allEvents[$fd_key])) {
unset($this->_allEvents[$fd_key]);
}
return true;
case self::EV_WRITE:
unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
if (empty($this->_allEvents[$fd_key])) {
unset($this->_allEvents[$fd_key]);
}
return true;
case self::EV_SIGNAL:
unset($this->_signalEvents[$fd_key]);
pcntl_signal($fd, SIG_IGN);
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE;
unset($this->_task[$fd_key]);
return true;
}
return false;
}
/**
* Tick for timer.
*
* @return void
*/
protected function tick()
{
while (!$this->_scheduler->isEmpty()) {
$scheduler_data = $this->_scheduler->top();
$timer_id = $scheduler_data['data'];
$next_run_time = -$scheduler_data['priority'];
$time_now = microtime(true);
$this->_selectTimeout = ($next_run_time - $time_now) * 1000000;
if ($this->_selectTimeout <= 0) {
$this->_scheduler->extract();
if (!isset($this->_task[$timer_id])) {
continue;
}
// [func, args, flag, timer_interval]
$task_data = $this->_task[$timer_id];
if ($task_data[2] === self::EV_TIMER) {
$next_run_time = $time_now + $task_data[3];
$this->_scheduler->insert($timer_id, -$next_run_time);
}
call_user_func_array($task_data[0], $task_data[1]);
if (isset($this->_task[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE) {
$this->del($timer_id, self::EV_TIMER_ONCE);
}
continue;
}
return;
}
$this->_selectTimeout = 100000000;
}
/**
* {@inheritdoc}
*/
public function clearAllTimer()
{
$this->_scheduler = new \SplPriorityQueue();
$this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
$this->_task = array();
}
/**
* {@inheritdoc}
*/
public function loop()
{
$e = null;
while (1) {
// Calls signal handlers for pending signals
pcntl_signal_dispatch();
$read = $this->_readFds;
$write = $this->_writeFds;
// Waiting read/write/signal/timeout events.
$ret = @stream_select($read, $write, $e, 0, $this->_selectTimeout);
if (!$this->_scheduler->isEmpty()) {
$this->tick();
}
if (!$ret) {
continue;
}
foreach ($read as $fd) {
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0],
array($this->_allEvents[$fd_key][self::EV_READ][1]));
}
}
foreach ($write as $fd) {
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0],
array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
}
}
}
}
}

View File

@ -1,35 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
// Date.timezone
if (!ini_get('date.timezone')) {
date_default_timezone_set('Asia/Shanghai');
}
// Display errors.
ini_set('display_errors', 'on');
// Reporting all.
error_reporting(E_ALL);
// For onError callback.
define('WORKERMAN_CONNECT_FAIL', 1);
// For onError callback.
define('WORKERMAN_SEND_FAIL', 2);
// Compatible with php7
if(!class_exists('Error'))
{
class Error extends Exception
{
}
}

View File

@ -1,176 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Lib;
use Workerman\Events\EventInterface;
use Exception;
/**
* Timer.
*
* example:
* Workerman\Lib\Timer::add($time_interval, callback, array($arg1, $arg2..));
*/
class Timer
{
/**
* Tasks that based on ALARM signal.
* [
* run_time => [[$func, $args, $persistent, time_interval],[$func, $args, $persistent, time_interval],..]],
* run_time => [[$func, $args, $persistent, time_interval],[$func, $args, $persistent, time_interval],..]],
* ..
* ]
*
* @var array
*/
protected static $_tasks = array();
/**
* event
*
* @var \Workerman\Events\EventInterface
*/
protected static $_event = null;
/**
* Init.
*
* @param \Workerman\Events\EventInterface $event
* @return void
*/
public static function init($event = null)
{
if ($event) {
self::$_event = $event;
} else {
pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
}
}
/**
* ALARM signal handler.
*
* @return void
*/
public static function signalHandle()
{
if (!self::$_event) {
pcntl_alarm(1);
self::tick();
}
}
/**
* Add a timer.
*
* @param int $time_interval
* @param callback $func
* @param mixed $args
* @param bool $persistent
* @return bool
*/
public static function add($time_interval, $func, $args = array(), $persistent = true)
{
if ($time_interval <= 0) {
echo new Exception("bad time_interval");
return false;
}
if (self::$_event) {
return self::$_event->add($time_interval,
$persistent ? EventInterface::EV_TIMER : EventInterface::EV_TIMER_ONCE, $func, $args);
}
if (!is_callable($func)) {
echo new Exception("not callable");
return false;
}
if (empty(self::$_tasks)) {
pcntl_alarm(1);
}
$time_now = time();
$run_time = $time_now + $time_interval;
if (!isset(self::$_tasks[$run_time])) {
self::$_tasks[$run_time] = array();
}
self::$_tasks[$run_time][] = array($func, (array)$args, $persistent, $time_interval);
return true;
}
/**
* Tick.
*
* @return void
*/
public static function tick()
{
if (empty(self::$_tasks)) {
pcntl_alarm(0);
return;
}
$time_now = time();
foreach (self::$_tasks as $run_time => $task_data) {
if ($time_now >= $run_time) {
foreach ($task_data as $index => $one_task) {
$task_func = $one_task[0];
$task_args = $one_task[1];
$persistent = $one_task[2];
$time_interval = $one_task[3];
try {
call_user_func_array($task_func, $task_args);
} catch (\Exception $e) {
echo $e;
}
if ($persistent) {
self::add($time_interval, $task_func, $task_args);
}
}
unset(self::$_tasks[$run_time]);
}
}
}
/**
* Remove a timer.
*
* @param mixed $timer_id
* @return bool
*/
public static function del($timer_id)
{
if (self::$_event) {
return self::$_event->del($timer_id, EventInterface::EV_TIMER);
}
return false;
}
/**
* Remove all timers.
*
* @return void
*/
public static function delAll()
{
self::$_tasks = array();
pcntl_alarm(0);
if (self::$_event) {
self::$_event->clearAllTimer();
}
}
}

View File

@ -1,21 +0,0 @@
The MIT License
Copyright (c) 2009-2015 walkor<walkor@workerman.net> and contributors (see https://github.com/walkor/workerman/contributors)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,61 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Protocols;
use Workerman\Connection\TcpConnection;
/**
* Frame Protocol.
*/
class Frame
{
/**
* Check the integrity of the package.
*
* @param string $buffer
* @param TcpConnection $connection
* @return int
*/
public static function input($buffer, TcpConnection $connection)
{
if (strlen($buffer) < 4) {
return 0;
}
$unpack_data = unpack('Ntotal_length', $buffer);
return $unpack_data['total_length'];
}
/**
* Decode.
*
* @param string $buffer
* @return string
*/
public static function decode($buffer)
{
return substr($buffer, 4);
}
/**
* Encode.
*
* @param string $buffer
* @return string
*/
public static function encode($buffer)
{
$total_length = 4 + strlen($buffer);
return pack('N', $total_length) . $buffer;
}
}

View File

@ -1,509 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Protocols;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
/**
* http protocol
*/
class Http
{
/**
* Check the integrity of the package.
*
* @param string $recv_buffer
* @param TcpConnection $connection
* @return int
*/
public static function input($recv_buffer, TcpConnection $connection)
{
if (!strpos($recv_buffer, "\r\n\r\n")) {
// Judge whether the package length exceeds the limit.
if (strlen($recv_buffer) >= TcpConnection::$maxPackageSize) {
$connection->close();
return 0;
}
return 0;
}
list($header,) = explode("\r\n\r\n", $recv_buffer, 2);
if (0 === strpos($recv_buffer, "POST")) {
// find Content-Length
$match = array();
if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header, $match)) {
$content_length = $match[1];
return $content_length + strlen($header) + 4;
} else {
return 0;
}
} elseif (0 === strpos($recv_buffer, "GET")) {
return strlen($header) + 4;
} else {
$connection->send("HTTP/1.1 400 Bad Request\r\n\r\n", true);
return 0;
}
}
/**
* Parse $_POST、$_GET、$_COOKIE.
*
* @param string $recv_buffer
* @param TcpConnection $connection
* @return array
*/
public static function decode($recv_buffer, TcpConnection $connection)
{
// Init.
$_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES = array();
$GLOBALS['HTTP_RAW_POST_DATA'] = '';
// Clear cache.
HttpCache::$header = array('Connection' => 'Connection: keep-alive');
HttpCache::$instance = new HttpCache();
// $_SERVER
$_SERVER = array(
'QUERY_STRING' => '',
'REQUEST_METHOD' => '',
'REQUEST_URI' => '',
'SERVER_PROTOCOL' => '',
'SERVER_SOFTWARE' => 'workerman/'.Worker::VERSION,
'SERVER_NAME' => '',
'HTTP_HOST' => '',
'HTTP_USER_AGENT' => '',
'HTTP_ACCEPT' => '',
'HTTP_ACCEPT_LANGUAGE' => '',
'HTTP_ACCEPT_ENCODING' => '',
'HTTP_COOKIE' => '',
'HTTP_CONNECTION' => '',
'REMOTE_ADDR' => '',
'REMOTE_PORT' => '0',
);
// Parse headers.
list($http_header, $http_body) = explode("\r\n\r\n", $recv_buffer, 2);
$header_data = explode("\r\n", $http_header);
list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ',
$header_data[0]);
$http_post_boundary = '';
unset($header_data[0]);
foreach ($header_data as $content) {
// \r\n\r\n
if (empty($content)) {
continue;
}
list($key, $value) = explode(':', $content, 2);
$key = str_replace('-', '_', strtoupper($key));
$value = trim($value);
$_SERVER['HTTP_' . $key] = $value;
switch ($key) {
// HTTP_HOST
case 'HOST':
$tmp = explode(':', $value);
$_SERVER['SERVER_NAME'] = $tmp[0];
if (isset($tmp[1])) {
$_SERVER['SERVER_PORT'] = $tmp[1];
}
break;
// cookie
case 'COOKIE':
parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
break;
// content-type
case 'CONTENT_TYPE':
if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) {
$_SERVER['CONTENT_TYPE'] = $value;
} else {
$_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
$http_post_boundary = '--' . $match[1];
}
break;
case 'CONTENT_LENGTH':
$_SERVER['CONTENT_LENGTH'] = $value;
break;
}
}
// Parse $_POST.
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
if (isset($_SERVER['CONTENT_TYPE']) && $_SERVER['CONTENT_TYPE'] === 'multipart/form-data') {
self::parseUploadFiles($http_body, $http_post_boundary);
} else {
parse_str($http_body, $_POST);
// $GLOBALS['HTTP_RAW_POST_DATA']
$GLOBALS['HTTP_RAW_POST_DATA'] = $http_body;
}
}
// QUERY_STRING
$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
if ($_SERVER['QUERY_STRING']) {
// $GET
parse_str($_SERVER['QUERY_STRING'], $_GET);
} else {
$_SERVER['QUERY_STRING'] = '';
}
// REQUEST
$_REQUEST = array_merge($_GET, $_POST);
// REMOTE_ADDR REMOTE_PORT
$_SERVER['REMOTE_ADDR'] = $connection->getRemoteIp();
$_SERVER['REMOTE_PORT'] = $connection->getRemotePort();
return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
}
/**
* Http encode.
*
* @param string $content
* @param TcpConnection $connection
* @return string
*/
public static function encode($content, TcpConnection $connection)
{
// Default http-code.
if (!isset(HttpCache::$header['Http-Code'])) {
$header = "HTTP/1.1 200 OK\r\n";
} else {
$header = HttpCache::$header['Http-Code'] . "\r\n";
unset(HttpCache::$header['Http-Code']);
}
// Content-Type
if (!isset(HttpCache::$header['Content-Type'])) {
$header .= "Content-Type: text/html;charset=utf-8\r\n";
}
// other headers
foreach (HttpCache::$header as $key => $item) {
if ('Set-Cookie' === $key && is_array($item)) {
foreach ($item as $it) {
$header .= $it . "\r\n";
}
} else {
$header .= $item . "\r\n";
}
}
// header
$header .= "Server: workerman/" . Worker::VERSION . "\r\nContent-Length: " . strlen($content) . "\r\n\r\n";
// save session
self::sessionWriteClose();
// the whole http package
return $header . $content;
}
/**
* 设置http头
*
* @return bool|void
*/
public static function header($content, $replace = true, $http_response_code = 0)
{
if (PHP_SAPI != 'cli') {
return $http_response_code ? header($content, $replace, $http_response_code) : header($content, $replace);
}
if (strpos($content, 'HTTP') === 0) {
$key = 'Http-Code';
} else {
$key = strstr($content, ":", true);
if (empty($key)) {
return false;
}
}
if ('location' === strtolower($key) && !$http_response_code) {
return self::header($content, true, 302);
}
if (isset(HttpCache::$codes[$http_response_code])) {
HttpCache::$header['Http-Code'] = "HTTP/1.1 $http_response_code " . HttpCache::$codes[$http_response_code];
if ($key === 'Http-Code') {
return true;
}
}
if ($key === 'Set-Cookie') {
HttpCache::$header[$key][] = $content;
} else {
HttpCache::$header[$key] = $content;
}
return true;
}
/**
* Remove header.
*
* @param string $name
* @return void
*/
public static function headerRemove($name)
{
if (PHP_SAPI != 'cli') {
header_remove($name);
return;
}
unset(HttpCache::$header[$name]);
}
/**
* Set cookie.
*
* @param string $name
* @param string $value
* @param integer $maxage
* @param string $path
* @param string $domain
* @param bool $secure
* @param bool $HTTPOnly
* @return bool|void
*/
public static function setcookie(
$name,
$value = '',
$maxage = 0,
$path = '',
$domain = '',
$secure = false,
$HTTPOnly = false
) {
if (PHP_SAPI != 'cli') {
return setcookie($name, $value, $maxage, $path, $domain, $secure, $HTTPOnly);
}
return self::header(
'Set-Cookie: ' . $name . '=' . rawurlencode($value)
. (empty($domain) ? '' : '; Domain=' . $domain)
. (empty($maxage) ? '' : '; Max-Age=' . $maxage)
. (empty($path) ? '' : '; Path=' . $path)
. (!$secure ? '' : '; Secure')
. (!$HTTPOnly ? '' : '; HttpOnly'), false);
}
/**
* sessionStart
*
* @return bool
*/
public static function sessionStart()
{
if (PHP_SAPI != 'cli') {
return session_start();
}
if (HttpCache::$instance->sessionStarted) {
echo "already sessionStarted\nn";
return true;
}
HttpCache::$instance->sessionStarted = true;
// Generate a SID.
if (!isset($_COOKIE[HttpCache::$sessionName]) || !is_file(HttpCache::$sessionPath . '/ses' . $_COOKIE[HttpCache::$sessionName])) {
$file_name = tempnam(HttpCache::$sessionPath, 'ses');
if (!$file_name) {
return false;
}
HttpCache::$instance->sessionFile = $file_name;
$session_id = substr(basename($file_name), strlen('ses'));
return self::setcookie(
HttpCache::$sessionName
, $session_id
, ini_get('session.cookie_lifetime')
, ini_get('session.cookie_path')
, ini_get('session.cookie_domain')
, ini_get('session.cookie_secure')
, ini_get('session.cookie_httponly')
);
}
if (!HttpCache::$instance->sessionFile) {
HttpCache::$instance->sessionFile = HttpCache::$sessionPath . '/ses' . $_COOKIE[HttpCache::$sessionName];
}
// Read session from session file.
if (HttpCache::$instance->sessionFile) {
$raw = file_get_contents(HttpCache::$instance->sessionFile);
if ($raw) {
session_decode($raw);
}
}
return true;
}
/**
* Save session.
*
* @return bool
*/
public static function sessionWriteClose()
{
if (PHP_SAPI != 'cli') {
return session_write_close();
}
if (!empty(HttpCache::$instance->sessionStarted) && !empty($_SESSION)) {
$session_str = session_encode();
if ($session_str && HttpCache::$instance->sessionFile) {
return file_put_contents(HttpCache::$instance->sessionFile, $session_str);
}
}
return empty($_SESSION);
}
/**
* End, like call exit in php-fpm.
*
* @param string $msg
* @throws \Exception
*/
public static function end($msg = '')
{
if (PHP_SAPI != 'cli') {
exit($msg);
}
if ($msg) {
echo $msg;
}
throw new \Exception('jump_exit');
}
/**
* Get mime types.
*
* @return string
*/
public static function getMimeTypesFile()
{
return __DIR__ . '/Http/mime.types';
}
/**
* Parse $_FILES.
*
* @param string $http_body
* @param string $http_post_boundary
* @return void
*/
protected static function parseUploadFiles($http_body, $http_post_boundary)
{
$http_body = substr($http_body, 0, strlen($http_body) - (strlen($http_post_boundary) + 4));
$boundary_data_array = explode($http_post_boundary . "\r\n", $http_body);
if ($boundary_data_array[0] === '') {
unset($boundary_data_array[0]);
}
foreach ($boundary_data_array as $boundary_data_buffer) {
list($boundary_header_buffer, $boundary_value) = explode("\r\n\r\n", $boundary_data_buffer, 2);
// Remove \r\n from the end of buffer.
$boundary_value = substr($boundary_value, 0, -2);
foreach (explode("\r\n", $boundary_header_buffer) as $item) {
list($header_key, $header_value) = explode(": ", $item);
$header_key = strtolower($header_key);
switch ($header_key) {
case "content-disposition":
// Is file data.
if (preg_match('/name=".*?"; filename="(.*?)"$/', $header_value, $match)) {
// Parse $_FILES.
$_FILES[] = array(
'file_name' => $match[1],
'file_data' => $boundary_value,
'file_size' => strlen($boundary_value),
);
continue;
} // Is post field.
else {
// Parse $_POST.
if (preg_match('/name="(.*?)"$/', $header_value, $match)) {
$_POST[$match[1]] = $boundary_value;
}
}
break;
}
}
}
}
}
/**
* Http cache for the current http response.
*/
class HttpCache
{
public static $codes = array(
100 => 'Continue',
101 => 'Switching Protocols',
200 => 'OK',
201 => 'Created',
202 => 'Accepted',
203 => 'Non-Authoritative Information',
204 => 'No Content',
205 => 'Reset Content',
206 => 'Partial Content',
300 => 'Multiple Choices',
301 => 'Moved Permanently',
302 => 'Found',
303 => 'See Other',
304 => 'Not Modified',
305 => 'Use Proxy',
306 => '(Unused)',
307 => 'Temporary Redirect',
400 => 'Bad Request',
401 => 'Unauthorized',
402 => 'Payment Required',
403 => 'Forbidden',
404 => 'Not Found',
405 => 'Method Not Allowed',
406 => 'Not Acceptable',
407 => 'Proxy Authentication Required',
408 => 'Request Timeout',
409 => 'Conflict',
410 => 'Gone',
411 => 'Length Required',
412 => 'Precondition Failed',
413 => 'Request Entity Too Large',
414 => 'Request-URI Too Long',
415 => 'Unsupported Media Type',
416 => 'Requested Range Not Satisfiable',
417 => 'Expectation Failed',
422 => 'Unprocessable Entity',
423 => 'Locked',
500 => 'Internal Server Error',
501 => 'Not Implemented',
502 => 'Bad Gateway',
503 => 'Service Unavailable',
504 => 'Gateway Timeout',
505 => 'HTTP Version Not Supported',
);
/**
* @var HttpCache
*/
public static $instance = null;
public static $header = array();
public static $sessionPath = '';
public static $sessionName = '';
public $sessionStarted = false;
public $sessionFile = '';
public static function init()
{
self::$sessionName = ini_get('session.name');
self::$sessionPath = session_save_path();
if (!self::$sessionPath) {
self::$sessionPath = sys_get_temp_dir();
}
@\session_start();
}
}

View File

@ -1,80 +0,0 @@
types {
text/html html htm shtml;
text/css css;
text/xml xml;
image/gif gif;
image/jpeg jpeg jpg;
application/x-javascript js;
application/atom+xml atom;
application/rss+xml rss;
text/mathml mml;
text/plain txt;
text/vnd.sun.j2me.app-descriptor jad;
text/vnd.wap.wml wml;
text/x-component htc;
image/png png;
image/tiff tif tiff;
image/vnd.wap.wbmp wbmp;
image/x-icon ico;
image/x-jng jng;
image/x-ms-bmp bmp;
image/svg+xml svg svgz;
image/webp webp;
application/java-archive jar war ear;
application/mac-binhex40 hqx;
application/msword doc;
application/pdf pdf;
application/postscript ps eps ai;
application/rtf rtf;
application/vnd.ms-excel xls;
application/vnd.ms-powerpoint ppt;
application/vnd.wap.wmlc wmlc;
application/vnd.google-earth.kml+xml kml;
application/vnd.google-earth.kmz kmz;
application/x-7z-compressed 7z;
application/x-cocoa cco;
application/x-java-archive-diff jardiff;
application/x-java-jnlp-file jnlp;
application/x-makeself run;
application/x-perl pl pm;
application/x-pilot prc pdb;
application/x-rar-compressed rar;
application/x-redhat-package-manager rpm;
application/x-sea sea;
application/x-shockwave-flash swf;
application/x-stuffit sit;
application/x-tcl tcl tk;
application/x-x509-ca-cert der pem crt;
application/x-xpinstall xpi;
application/xhtml+xml xhtml;
application/zip zip;
application/octet-stream bin exe dll;
application/octet-stream deb;
application/octet-stream dmg;
application/octet-stream eot;
application/octet-stream iso img;
application/octet-stream msi msp msm;
audio/midi mid midi kar;
audio/mpeg mp3;
audio/ogg ogg;
audio/x-m4a m4a;
audio/x-realaudio ra;
video/3gpp 3gpp 3gp;
video/mp4 mp4;
video/mpeg mpeg mpg;
video/quicktime mov;
video/webm webm;
video/x-flv flv;
video/x-m4v m4v;
video/x-mng mng;
video/x-ms-asf asx asf;
video/x-ms-wmv wmv;
video/x-msvideo avi;
}

View File

@ -1,52 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Protocols;
use Workerman\Connection\ConnectionInterface;
/**
* Protocol interface
*/
interface ProtocolInterface
{
/**
* Check the integrity of the package.
* Please return the length of package.
* If length is unknow please return 0 that mean wating more data.
* If the package has something wrong please return false the connection will be closed.
*
* @param ConnectionInterface $connection
* @param string $recv_buffer
* @return int|false
*/
public static function input($recv_buffer, ConnectionInterface $connection);
/**
* Decode package and emit onMessage($message) callback, $message is the result that decode returned.
*
* @param ConnectionInterface $connection
* @param string $recv_buffer
* @return mixed
*/
public static function decode($recv_buffer, ConnectionInterface $connection);
/**
* Encode package brefore sending to client.
*
* @param ConnectionInterface $connection
* @param mixed $data
* @return string
*/
public static function encode($data, ConnectionInterface $connection);
}

View File

@ -1,70 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Protocols;
use Workerman\Connection\TcpConnection;
/**
* Text Protocol.
*/
class Text
{
/**
* Check the integrity of the package.
*
* @param string $buffer
* @param TcpConnection $connection
* @return int
*/
public static function input($buffer, TcpConnection $connection)
{
// Judge whether the package length exceeds the limit.
if (strlen($buffer) >= TcpConnection::$maxPackageSize) {
$connection->close();
return 0;
}
// Find the position of "\n".
$pos = strpos($buffer, "\n");
// No "\n", packet length is unknown, continue to wait for the data so return 0.
if ($pos === false) {
return 0;
}
// Return the current package length.
return $pos + 1;
}
/**
* Encode.
*
* @param string $buffer
* @return string
*/
public static function encode($buffer)
{
// Add "\n"
return $buffer . "\n";
}
/**
* Decode.
*
* @param string $buffer
* @return string
*/
public static function decode($buffer)
{
// Remove "\n"
return trim($buffer);
}
}

View File

@ -1,437 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Protocols;
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;
/**
* WebSocket protocol.
*/
class Websocket implements \Workerman\Protocols\ProtocolInterface
{
/**
* Minimum head length of websocket protocol.
*
* @var int
*/
const MIN_HEAD_LEN = 2;
/**
* Websocket blob type.
*
* @var string
*/
const BINARY_TYPE_BLOB = "\x81";
/**
* Websocket arraybuffer type.
*
* @var string
*/
const BINARY_TYPE_ARRAYBUFFER = "\x82";
/**
* Check the integrity of the package.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return int
*/
public static function input($buffer, ConnectionInterface $connection)
{
// Receive length.
$recv_len = strlen($buffer);
// We need more data.
if ($recv_len < self::MIN_HEAD_LEN) {
return 0;
}
// Has not yet completed the handshake.
if (empty($connection->websocketHandshake)) {
return self::dealHandshake($buffer, $connection);
}
// Buffer websocket frame data.
if ($connection->websocketCurrentFrameLength) {
// We need more frame data.
if ($connection->websocketCurrentFrameLength > $recv_len) {
// Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
return 0;
}
} else {
$data_len = ord($buffer[1]) & 127;
$firstbyte = ord($buffer[0]);
$is_fin_frame = $firstbyte >> 7;
$opcode = $firstbyte & 0xf;
switch ($opcode) {
case 0x0:
break;
// Blob type.
case 0x1:
break;
// Arraybuffer type.
case 0x2:
break;
// Close package.
case 0x8:
// Try to emit onWebSocketClose callback.
if (isset($connection->onWebSocketClose)) {
try {
call_user_func($connection->onWebSocketClose, $connection);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
} // Close connection.
else {
$connection->close();
}
return 0;
// Ping package.
case 0x9:
// Try to emit onWebSocketPing callback.
if (isset($connection->onWebSocketPing)) {
try {
call_user_func($connection->onWebSocketPing, $connection);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
} // Send pong package to client.
else {
$connection->send(pack('H*', '8a00'), true);
}
// Consume data from receive buffer.
if (!$data_len) {
$connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
if ($recv_len > self::MIN_HEAD_LEN) {
return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
}
return 0;
}
break;
// Pong package.
case 0xa:
// Try to emit onWebSocketPong callback.
if (isset($connection->onWebSocketPong)) {
try {
call_user_func($connection->onWebSocketPong, $connection);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
// Consume data from receive buffer.
if (!$data_len) {
$connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
if ($recv_len > self::MIN_HEAD_LEN) {
return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
}
return 0;
}
break;
// Wrong opcode.
default :
echo "error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n";
$connection->close();
return 0;
}
// Calculate packet length.
$head_len = 6;
if ($data_len === 126) {
$head_len = 8;
if ($head_len > $recv_len) {
return 0;
}
$pack = unpack('nn/ntotal_len', $buffer);
$data_len = $pack['total_len'];
} else {
if ($data_len === 127) {
$head_len = 14;
if ($head_len > $recv_len) {
return 0;
}
$arr = unpack('n/N2c', $buffer);
$data_len = $arr['c1']*4294967296 + $arr['c2'];
}
}
$current_frame_length = $head_len + $data_len;
if ($is_fin_frame) {
return $current_frame_length;
} else {
$connection->websocketCurrentFrameLength = $current_frame_length;
}
}
// Received just a frame length data.
if ($connection->websocketCurrentFrameLength === $recv_len) {
self::decode($buffer, $connection);
$connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
$connection->websocketCurrentFrameLength = 0;
return 0;
} // The length of the received data is greater than the length of a frame.
elseif ($connection->websocketCurrentFrameLength < $recv_len) {
self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
$connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
$current_frame_length = $connection->websocketCurrentFrameLength;
$connection->websocketCurrentFrameLength = 0;
// Continue to read next frame.
return self::input(substr($buffer, $current_frame_length), $connection);
} // The length of the received data is less than the length of a frame.
else {
return 0;
}
}
/**
* Websocket encode.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return string
*/
public static function encode($buffer, ConnectionInterface $connection)
{
if (!is_scalar($buffer)) {
throw new \Exception("You can't send(" . gettype($buffer) . ") to client, you need to convert it to a string. ");
}
$len = strlen($buffer);
if (empty($connection->websocketType)) {
$connection->websocketType = self::BINARY_TYPE_BLOB;
}
$first_byte = $connection->websocketType;
if ($len <= 125) {
$encode_buffer = $first_byte . chr($len) . $buffer;
} else {
if ($len <= 65535) {
$encode_buffer = $first_byte . chr(126) . pack("n", $len) . $buffer;
} else {
$encode_buffer = $first_byte . chr(127) . pack("xxxxN", $len) . $buffer;
}
}
// Handshake not completed so temporary buffer websocket data waiting for send.
if (empty($connection->websocketHandshake)) {
if (empty($connection->tmpWebsocketData)) {
$connection->tmpWebsocketData = '';
}
$connection->tmpWebsocketData .= $encode_buffer;
// Return empty string.
return '';
}
return $encode_buffer;
}
/**
* Websocket decode.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return string
*/
public static function decode($buffer, ConnectionInterface $connection)
{
$len = $masks = $data = $decoded = null;
$len = ord($buffer[1]) & 127;
if ($len === 126) {
$masks = substr($buffer, 4, 4);
$data = substr($buffer, 8);
} else {
if ($len === 127) {
$masks = substr($buffer, 10, 4);
$data = substr($buffer, 14);
} else {
$masks = substr($buffer, 2, 4);
$data = substr($buffer, 6);
}
}
for ($index = 0; $index < strlen($data); $index++) {
$decoded .= $data[$index] ^ $masks[$index % 4];
}
if ($connection->websocketCurrentFrameLength) {
$connection->websocketDataBuffer .= $decoded;
return $connection->websocketDataBuffer;
} else {
if ($connection->websocketDataBuffer !== '') {
$decoded = $connection->websocketDataBuffer . $decoded;
$connection->websocketDataBuffer = '';
}
return $decoded;
}
}
/**
* Websocket handshake.
*
* @param string $buffer
* @param \Workerman\Connection\TcpConnection $connection
* @return int
*/
protected static function dealHandshake($buffer, $connection)
{
// HTTP protocol.
if (0 === strpos($buffer, 'GET')) {
// Find \r\n\r\n.
$heder_end_pos = strpos($buffer, "\r\n\r\n");
if (!$heder_end_pos) {
return 0;
}
$header_length = $heder_end_pos + 4;
// Get Sec-WebSocket-Key.
$Sec_WebSocket_Key = '';
if (preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/i", $buffer, $match)) {
$Sec_WebSocket_Key = $match[1];
} else {
$connection->send("HTTP/1.1 400 Bad Request\r\n\r\n<b>400 Bad Request</b><br>Sec-WebSocket-Key not found.<br>This is a WebSocket service and can not be accessed via HTTP.<br>See <a href=\"http://wiki.workerman.net/Error1\">http://wiki.workerman.net/Error1</a>",
true);
$connection->close();
return 0;
}
// Calculation websocket key.
$new_key = base64_encode(sha1($Sec_WebSocket_Key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
// Handshake response data.
$handshake_message = "HTTP/1.1 101 Switching Protocols\r\n";
$handshake_message .= "Upgrade: websocket\r\n";
$handshake_message .= "Sec-WebSocket-Version: 13\r\n";
$handshake_message .= "Connection: Upgrade\r\n";
$handshake_message .= "Server: workerman/".Worker::VERSION."\r\n";
$handshake_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
// Mark handshake complete..
$connection->websocketHandshake = true;
// Websocket data buffer.
$connection->websocketDataBuffer = '';
// Current websocket frame length.
$connection->websocketCurrentFrameLength = 0;
// Current websocket frame data.
$connection->websocketCurrentFrameBuffer = '';
// Consume handshake data.
$connection->consumeRecvBuffer($header_length);
// Send handshake response.
$connection->send($handshake_message, true);
// There are data waiting to be sent.
if (!empty($connection->tmpWebsocketData)) {
$connection->send($connection->tmpWebsocketData, true);
$connection->tmpWebsocketData = '';
}
// blob or arraybuffer
if (empty($connection->websocketType)) {
$connection->websocketType = self::BINARY_TYPE_BLOB;
}
// Try to emit onWebSocketConnect callback.
if (isset($connection->onWebSocketConnect)) {
self::parseHttpHeader($buffer);
try {
call_user_func($connection->onWebSocketConnect, $connection, $buffer);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
if (!empty($_SESSION) && class_exists('\GatewayWorker\Lib\Context')) {
$connection->session = \GatewayWorker\Lib\Context::sessionEncode($_SESSION);
}
$_GET = $_SERVER = $_SESSION = $_COOKIE = array();
}
if (strlen($buffer) > $header_length) {
return self::input(substr($buffer, $header_length), $connection);
}
return 0;
} // Is flash policy-file-request.
elseif (0 === strpos($buffer, '<polic')) {
$policy_xml = '<?xml version="1.0"?><cross-domain-policy><site-control permitted-cross-domain-policies="all"/><allow-access-from domain="*" to-ports="*"/></cross-domain-policy>' . "\0";
$connection->send($policy_xml, true);
$connection->consumeRecvBuffer(strlen($buffer));
return 0;
}
// Bad websocket handshake request.
$connection->send("HTTP/1.1 400 Bad Request\r\n\r\n<b>400 Bad Request</b><br>Invalid handshake data for websocket. ",
true);
$connection->close();
return 0;
}
/**
* Parse http header.
*
* @param string $buffer
* @return void
*/
protected static function parseHttpHeader($buffer)
{
// Parse headers.
list($http_header, ) = explode("\r\n\r\n", $buffer, 2);
$header_data = explode("\r\n", $http_header);
if ($_SERVER) {
$_SERVER = array();
}
list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ',
$header_data[0]);
unset($header_data[0]);
foreach ($header_data as $content) {
// \r\n\r\n
if (empty($content)) {
continue;
}
list($key, $value) = explode(':', $content, 2);
$key = str_replace('-', '_', strtoupper($key));
$value = trim($value);
$_SERVER['HTTP_' . $key] = $value;
switch ($key) {
// HTTP_HOST
case 'HOST':
$tmp = explode(':', $value);
$_SERVER['SERVER_NAME'] = $tmp[0];
if (isset($tmp[1])) {
$_SERVER['SERVER_PORT'] = $tmp[1];
}
break;
// cookie
case 'COOKIE':
parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
break;
}
}
// QUERY_STRING
$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
if ($_SERVER['QUERY_STRING']) {
// $GET
parse_str($_SERVER['QUERY_STRING'], $_GET);
} else {
$_SERVER['QUERY_STRING'] = '';
}
}
}

View File

@ -1,383 +0,0 @@
<?php
namespace Workerman\Protocols;
use Workerman\Worker;
use Workerman\Lib\Timer;
/**
* Websocket protocol for client.
*/
class Ws
{
/**
* Minimum head length of websocket protocol.
*
* @var int
*/
const MIN_HEAD_LEN = 2;
/**
* Websocket blob type.
*
* @var string
*/
const BINARY_TYPE_BLOB = "\x81";
/**
* Websocket arraybuffer type.
*
* @var string
*/
const BINARY_TYPE_ARRAYBUFFER = "\x82";
/**
* Check the integrity of the package.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return int
*/
public static function input($buffer, $connection)
{
if (empty($connection->handshakeStep)) {
echo "recv data before handshake. Buffer:" . bin2hex($buffer) . "\n";
return false;
}
// Recv handshake response
if ($connection->handshakeStep === 1) {
return self::dealHandshake($buffer, $connection);
}
$recv_len = strlen($buffer);
if ($recv_len < self::MIN_HEAD_LEN) {
return 0;
}
// Buffer websocket frame data.
if ($connection->websocketCurrentFrameLength) {
// We need more frame data.
if ($connection->websocketCurrentFrameLength > $recv_len) {
// Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
return 0;
}
} else {
$data_len = ord($buffer[1]) & 127;
$firstbyte = ord($buffer[0]);
$is_fin_frame = $firstbyte >> 7;
$opcode = $firstbyte & 0xf;
switch ($opcode) {
case 0x0:
break;
// Blob type.
case 0x1:
break;
// Arraybuffer type.
case 0x2:
break;
// Close package.
case 0x8:
// Try to emit onWebSocketClose callback.
if (isset($connection->onWebSocketClose)) {
try {
call_user_func($connection->onWebSocketClose, $connection);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
} // Close connection.
else {
$connection->close();
}
return 0;
// Ping package.
case 0x9:
// Try to emit onWebSocketPing callback.
if (isset($connection->onWebSocketPing)) {
try {
call_user_func($connection->onWebSocketPing, $connection);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
} // Send pong package to client.
else {
$connection->send(pack('H*', '8a00'), true);
}
// Consume data from receive buffer.
if (!$data_len) {
$connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
if ($recv_len > self::MIN_HEAD_LEN) {
return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
}
return 0;
}
break;
// Pong package.
case 0xa:
// Try to emit onWebSocketPong callback.
if (isset($connection->onWebSocketPong)) {
try {
call_user_func($connection->onWebSocketPong, $connection);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
// Consume data from receive buffer.
if (!$data_len) {
$connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
if ($recv_len > self::MIN_HEAD_LEN) {
return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
}
return 0;
}
break;
// Wrong opcode.
default :
echo "error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n";
$connection->close();
return 0;
}
// Calculate packet length.
if ($data_len === 126) {
if (strlen($buffer) < 6) {
return 0;
}
$pack = unpack('nn/ntotal_len', $buffer);
$current_frame_length = $pack['total_len'] + 4;
} else if ($data_len === 127) {
if (strlen($buffer) < 10) {
return 0;
}
$arr = unpack('n/N2c', $buffer);
$current_frame_length = $arr['c1']*4294967296 + $arr['c2'] + 10;
} else {
$current_frame_length = $data_len + 2;
}
if ($is_fin_frame) {
return $current_frame_length;
} else {
$connection->websocketCurrentFrameLength = $current_frame_length;
}
}
// Received just a frame length data.
if ($connection->websocketCurrentFrameLength === $recv_len) {
self::decode($buffer, $connection);
$connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
$connection->websocketCurrentFrameLength = 0;
return 0;
} // The length of the received data is greater than the length of a frame.
elseif ($connection->websocketCurrentFrameLength < $recv_len) {
self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
$connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
$current_frame_length = $connection->websocketCurrentFrameLength;
$connection->websocketCurrentFrameLength = 0;
// Continue to read next frame.
return self::input(substr($buffer, $current_frame_length), $connection);
} // The length of the received data is less than the length of a frame.
else {
return 0;
}
}
/**
* Websocket encode.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return string
*/
public static function encode($payload, $connection)
{
if (empty($connection->websocketType)) {
$connection->websocketType = self::BINARY_TYPE_BLOB;
}
$payload = (string)$payload;
if (empty($connection->handshakeStep)) {
self::sendHandshake($connection);
}
$mask = 1;
$mask_key = "\x00\x00\x00\x00";
$pack = '';
$length = $length_flag = strlen($payload);
if (65535 < $length) {
$pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 32, $length & 0x00000000FFFFFFFF);
$length_flag = 127;
} else if (125 < $length) {
$pack = pack('n*', $length);
$length_flag = 126;
}
$head = ($mask << 7) | $length_flag;
$head = $connection->websocketType . chr($head) . $pack;
$frame = $head . $mask_key;
// append payload to frame:
for ($i = 0; $i < $length; $i++) {
$frame .= $payload[$i] ^ $mask_key[$i % 4];
}
if ($connection->handshakeStep === 1) {
$connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
return '';
}
return $frame;
}
/**
* Websocket decode.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return string
*/
public static function decode($bytes, $connection)
{
$masked = $bytes[1] >> 7;
$data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
$decoded_data = '';
if ($masked === true) {
if ($data_length === 126) {
$mask = substr($bytes, 4, 4);
$coded_data = substr($bytes, 8);
} else if ($data_length === 127) {
$mask = substr($bytes, 10, 4);
$coded_data = substr($bytes, 14);
} else {
$mask = substr($bytes, 2, 4);
$coded_data = substr($bytes, 6);
}
for ($i = 0; $i < strlen($coded_data); $i++) {
$decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
}
} else {
if ($data_length === 126) {
$decoded_data = substr($bytes, 4);
} else if ($data_length === 127) {
$decoded_data = substr($bytes, 10);
} else {
$decoded_data = substr($bytes, 2);
}
}
if ($connection->websocketCurrentFrameLength) {
$connection->websocketDataBuffer .= $decoded_data;
return $connection->websocketDataBuffer;
} else {
if ($connection->websocketDataBuffer !== '') {
$decoded_data = $connection->websocketDataBuffer . $decoded_data;
$connection->websocketDataBuffer = '';
}
return $decoded_data;
}
}
/**
* Send websocket handshake data.
*
* @return void
*/
public static function onConnect($connection)
{
self::sendHandshake($connection);
}
/**
* Clean
*
* @param $connection
*/
public static function onClose($connection)
{
$connection->handshakeStep = null;
$connection->websocketCurrentFrameLength = 0;
$connection->tmpWebsocketData = '';
$connection->websocketDataBuffer = '';
if (!empty($connection->websocketPingTimer)) {
Timer::del($connection->websocketPingTimer);
$connection->websocketPingTimer = null;
}
}
/**
* Send websocket handshake.
*
* @param \Workerman\Connection\TcpConnection $connection
* @return void
*/
public static function sendHandshake($connection)
{
if (!empty($connection->handshakeStep)) {
return;
}
// Get Host.
$port = $connection->getRemotePort();
$host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
// Handshake header.
$header = 'GET ' . $connection->getRemoteURI() . " HTTP/1.1\r\n".
"Host: $host\r\n".
"Connection: Upgrade\r\n".
"Upgrade: websocket\r\n".
"Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
"Sec-WebSocket-Version: 13\r\n".
"Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
$connection->send($header, true);
$connection->handshakeStep = 1;
$connection->websocketCurrentFrameLength = 0;
$connection->websocketDataBuffer = '';
}
/**
* Websocket handshake.
*
* @param string $buffer
* @param \Workerman\Connection\TcpConnection $connection
* @return int
*/
public static function dealHandshake($buffer, $connection)
{
$pos = strpos($buffer, "\r\n\r\n");
if ($pos) {
// handshake complete
$connection->handshakeStep = 2;
$handshake_response_length = $pos + 4;
// Try to emit onWebSocketConnect callback.
if (isset($connection->onWebSocketConnect)) {
try {
call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_response_length));
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}
// Headbeat.
if (!empty($connection->websocketPingInterval)) {
$connection->websocketPingTimer = Timer::add($connection->websocketPingInterval, function() use ($connection){
if (false === $connection->send(pack('H*', '8900'), true)) {
Timer::del($connection->websocketPingTimer);
$connection->websocketPingTimer = null;
}
});
}
$connection->consumeRecvBuffer($handshake_response_length);
if (!empty($connection->tmpWebsocketData)) {
$connection->send($connection->tmpWebsocketData, true);
$connection->tmpWebsocketData = '';
}
if (strlen($buffer) > $handshake_response_length) {
return self::input(substr($buffer, $handshake_response_length), $connection);
}
}
return 0;
}
}

View File

@ -1,400 +0,0 @@
# Workerman
[![Gitter](https://badges.gitter.im/walkor/Workerman.svg)](https://gitter.im/walkor/Workerman?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=body_badge)
## What is it
Workerman is a library for event-driven programming in PHP. It has a huge number of features. Each worker is able to handle thousands of connections.
## Requires
PHP 5.3 or Higher
A POSIX compatible operating system (Linux, OSX, BSD)
POSIX and PCNTL extensions for PHP
## Installation
```
composer require workerman/workerman
```
## Basic Usage
### A websocket server
test.php
```php
<?php
use Workerman\Worker;
require_once './Workerman/Autoloader.php';
// Create a Websocket server
$ws_worker = new Worker("websocket://0.0.0.0:2346");
// 4 processes
$ws_worker->count = 4;
// Emitted when new connection come
$ws_worker->onConnect = function($connection)
{
echo "New connection\n";
};
// Emitted when data received
$ws_worker->onMessage = function($connection, $data)
{
// Send hello $data
$connection->send('hello ' . $data);
};
// Emitted when connection closed
$ws_worker->onClose = function($connection)
{
echo "Connection closed\n";
};
// Run worker
Worker::runAll();
```
### An http server
test.php
```php
require_once './Workerman/Autoloader.php';
use Workerman\Worker;
// #### http worker ####
$http_worker = new Worker("http://0.0.0.0:2345");
// 4 processes
$http_worker->count = 4;
// Emitted when data received
$http_worker->onMessage = function($connection, $data)
{
// $_GET, $_POST, $_COOKIE, $_SESSION, $_SERVER, $_FILES are available
var_dump($_GET, $_POST, $_COOKIE, $_SESSION, $_SERVER, $_FILES);
// send data to client
$connection->send("hello world \n");
};
// run all workers
Worker::runAll();
```
### A WebServer
test.php
```php
require_once './Workerman/Autoloader.php';
use Workerman\WebServer;
use Workerman\Worker;
// WebServer
$web = new WebServer("http://0.0.0.0:80");
// 4 processes
$web->count = 4;
// Set the root of domains
$web->addRoot('www.your_domain.com', '/your/path/Web');
$web->addRoot('www.another_domain.com', '/another/path/Web');
// run all workers
Worker::runAll();
```
### A tcp server
test.php
```php
require_once './Workerman/Autoloader.php';
use Workerman\Worker;
// #### create socket and listen 1234 port ####
$tcp_worker = new Worker("tcp://0.0.0.0:1234");
// 4 processes
$tcp_worker->count = 4;
// Emitted when new connection come
$tcp_worker->onConnect = function($connection)
{
echo "New Connection\n";
};
// Emitted when data received
$tcp_worker->onMessage = function($connection, $data)
{
// send data to client
$connection->send("hello $data \n");
};
// Emitted when new connection come
$tcp_worker->onClose = function($connection)
{
echo "Connection closed\n";
};
Worker::runAll();
```
### Custom protocol
Protocols/MyTextProtocol.php
```php
namespace Protocols;
/**
* User defined protocol
* Format Text+"\n"
*/
class MyTextProtocol
{
public static function input($recv_buffer)
{
// Find the position of the first occurrence of "\n"
$pos = strpos($recv_buffer, "\n");
// Not a complete package. Return 0 because the length of package can not be calculated
if($pos === false)
{
return 0;
}
// Return length of the package
return $pos+1;
}
public static function decode($recv_buffer)
{
return trim($recv_buffer);
}
public static function encode($data)
{
return $data."\n";
}
}
```
test.php
```php
require_once './Workerman/Autoloader.php';
use Workerman\Worker;
// #### MyTextProtocol worker ####
$text_worker = new Worker("MyTextProtocol://0.0.0.0:5678");
$text_worker->onConnect = function($connection)
{
echo "New connection\n";
};
$text_worker->onMessage = function($connection, $data)
{
// send data to client
$connection->send("hello world \n");
};
$text_worker->onClose = function($connection)
{
echo "Connection closed\n";
};
// run all workers
Worker::runAll();
```
### Timer
test.php
```php
require_once './Workerman/Autoloader.php';
use Workerman\Worker;
use Workerman\Lib\Timer;
$task = new Worker();
$task->onWorkerStart = function($task)
{
// 2.5 seconds
$time_interval = 2.5;
$timer_id = Timer::add($time_interval,
function()
{
echo "Timer run\n";
}
);
};
// run all workers
Worker::runAll();
```
run with:
```php test.php start```
## Available commands
```php test.php start ```
```php test.php start -d ```
![workerman start](http://www.workerman.net/img/workerman-start.png)
```php test.php status ```
![workerman satus](http://www.workerman.net/img/workerman-status.png?a=123)
```php test.php stop ```
```php test.php restart ```
```php test.php reload ```
## Documentation
中文主页:[http://www.workerman.net](http://www.workerman.net)
中文文档: [http://doc3.workerman.net](http://doc3.workerman.net)
Documentation:[https://github.com/walkor/workerman-manual](https://github.com/walkor/workerman-manual/blob/master/english/src/SUMMARY.md)
# Benchmarks
```
CPU: Intel(R) Core(TM) i3-3220 CPU @ 3.30GHz and 4 processors totally
Memory: 8G
OS: Ubuntu 14.04 LTS
Software: ab
PHP: 5.5.9
```
**Codes**
```php
<?php
use Workerman\Worker;
$worker = new Worker('tcp://0.0.0.0:1234');
$worker->count=3;
$worker->onMessage = function($connection, $data)
{
$connection->send("HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nServer: workerman\r\nContent-Length: 5\r\n\r\nhello");
};
Worker::runAll();
```
**Result**
```shell
ab -n1000000 -c100 -k http://127.0.0.1:1234/
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking 127.0.0.1 (be patient)
Completed 100000 requests
Completed 200000 requests
Completed 300000 requests
Completed 400000 requests
Completed 500000 requests
Completed 600000 requests
Completed 700000 requests
Completed 800000 requests
Completed 900000 requests
Completed 1000000 requests
Finished 1000000 requests
Server Software: workerman/3.1.4
Server Hostname: 127.0.0.1
Server Port: 1234
Document Path: /
Document Length: 5 bytes
Concurrency Level: 100
Time taken for tests: 7.240 seconds
Complete requests: 1000000
Failed requests: 0
Keep-Alive requests: 1000000
Total transferred: 73000000 bytes
HTML transferred: 5000000 bytes
Requests per second: 138124.14 [#/sec] (mean)
Time per request: 0.724 [ms] (mean)
Time per request: 0.007 [ms] (mean, across all concurrent requests)
Transfer rate: 9846.74 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.0 0 5
Processing: 0 1 0.2 1 9
Waiting: 0 1 0.2 1 9
Total: 0 1 0.2 1 9
Percentage of the requests served within a certain time (ms)
50% 1
66% 1
75% 1
80% 1
90% 1
95% 1
98% 1
99% 1
100% 9 (longest request)
```
# Demos
## [tadpole](http://kedou.workerman.net/)
[Live demo](http://kedou.workerman.net/)
[Source code](https://github.com/walkor/workerman)
![workerman todpole](http://www.workerman.net/img/workerman-todpole.png)
## [BrowserQuest](http://www.workerman.net/demos/browserquest/)
[Live demo](http://www.workerman.net/demos/browserquest/)
[Source code](https://github.com/walkor/BrowserQuest-PHP)
![BrowserQuest width workerman](http://www.workerman.net/img/browserquest.jpg)
## [web vmstat](http://www.workerman.net/demos/vmstat/)
[Live demo](http://www.workerman.net/demos/vmstat/)
[Source code](https://github.com/walkor/workerman-vmstat)
![web vmstat](http://www.workerman.net/img/workerman-vmstat.png)
## [live-ascii-camera](https://github.com/walkor/live-ascii-camera)
[Live demo camera page](http://www.workerman.net/demos/live-ascii-camera/camera.html)
[Live demo receive page](http://www.workerman.net/demos/live-ascii-camera/)
[Source code](https://github.com/walkor/live-ascii-camera)
![live-ascii-camera](http://www.workerman.net/img/live-ascii-camera.png)
## [live-camera](https://github.com/walkor/live-camera)
[Live demo camera page](http://www.workerman.net/demos/live-camera/camera.html)
[Live demo receive page](http://www.workerman.net/demos/live-camera/)
[Source code](https://github.com/walkor/live-camera)
![live-camera](http://www.workerman.net/img/live-camera.jpg)
## [chat room](http://chat.workerman.net/)
[Live demo](http://chat.workerman.net/)
[Source code](https://github.com/walkor/workerman-chat)
![workerman-chat](http://www.workerman.net/img/workerman-chat.png)
## [PHPSocket.IO](https://github.com/walkor/phpsocket.io)
[Live demo](http://www.workerman.net/demos/phpsocketio-chat/)
[Source code](https://github.com/walkor/phpsocket.io)
![phpsocket.io](http://www.workerman.net/img/socket.io.png)
## [statistics](http://www.workerman.net:55757/)
[Live demo](http://www.workerman.net:55757/)
[Source code](https://github.com/walkor/workerman-statistics)
![workerman-statistics](http://www.workerman.net/img/workerman-statistics.png)
## [flappybird](http://workerman.net/demos/flappy-bird/)
[Live demo](http://workerman.net/demos/flappy-bird/)
[Source code](https://github.com/walkor/workerman-flappy-bird)
![workerman-statistics](http://www.workerman.net/img/workerman-flappy-bird.png)
## [jsonRpc](https://github.com/walkor/workerman-JsonRpc)
[Source code](https://github.com/walkor/workerman-JsonRpc)
![workerman-jsonRpc](http://www.workerman.net/img/workerman-json-rpc.png)
## [thriftRpc](https://github.com/walkor/workerman-thrift)
[Source code](https://github.com/walkor/workerman-thrift)
![workerman-thriftRpc](http://www.workerman.net/img/workerman-thrift.png)
## [web-msg-sender](https://github.com/walkor/web-msg-sender)
[Live demo send page](http://workerman.net:3333/)
[Live demo receive page](http://workerman.net/web-msg-sender.html)
[Source code](https://github.com/walkor/web-msg-sender)
![web-msg-sender](http://www.workerman.net/img/web-msg-sender.png)
## [shadowsocks-php](https://github.com/walkor/shadowsocks-php)
[Source code](https://github.com/walkor/shadowsocks-php)
![shadowsocks-php](http://www.workerman.net/img/shadowsocks-php.png)
## [queue](https://github.com/walkor/workerman-queue)
[Source code](https://github.com/walkor/workerman-queue)
## LICENSE
Workerman is released under the [MIT license](https://github.com/walkor/workerman/blob/master/MIT-LICENSE.txt).

View File

@ -1,301 +0,0 @@
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman;
use Workerman\Protocols\Http;
use Workerman\Protocols\HttpCache;
/**
* WebServer.
*/
class WebServer extends Worker
{
/**
* Virtual host to path mapping.
*
* @var array ['workerman.net'=>'/home', 'www.workerman.net'=>'home/www']
*/
protected $serverRoot = array();
/**
* Mime mapping.
*
* @var array
*/
protected static $mimeTypeMap = array();
/**
* Used to save user OnWorkerStart callback settings.
*
* @var callback
*/
protected $_onWorkerStart = null;
/**
* Add virtual host.
*
* @param string $domain
* @param string $root_path
* @return void
*/
public function addRoot($domain, $root_path)
{
$this->serverRoot[$domain] = $root_path;
}
/**
* Construct.
*
* @param string $socket_name
* @param array $context_option
*/
public function __construct($socket_name, $context_option = array())
{
list(, $address) = explode(':', $socket_name, 2);
parent::__construct('http:' . $address, $context_option);
$this->name = 'WebServer';
}
/**
* Run webserver instance.
*
* @see Workerman.Worker::run()
*/
public function run()
{
$this->_onWorkerStart = $this->onWorkerStart;
$this->onWorkerStart = array($this, 'onWorkerStart');
$this->onMessage = array($this, 'onMessage');
parent::run();
}
/**
* Emit when process start.
*
* @throws \Exception
*/
public function onWorkerStart()
{
if (empty($this->serverRoot)) {
throw new \Exception('server root not set, please use WebServer::addRoot($domain, $root_path) to set server root path');
}
// Init HttpCache.
HttpCache::init();
// Init mimeMap.
$this->initMimeTypeMap();
// Try to emit onWorkerStart callback.
if ($this->_onWorkerStart) {
try {
call_user_func($this->_onWorkerStart, $this);
} catch (\Exception $e) {
self::log($e);
exit(250);
} catch (\Error $e) {
self::log($e);
exit(250);
}
}
}
/**
* Init mime map.
*
* @return void
*/
public function initMimeTypeMap()
{
$mime_file = Http::getMimeTypesFile();
if (!is_file($mime_file)) {
$this->log("$mime_file mime.type file not fond");
return;
}
$items = file($mime_file, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
if (!is_array($items)) {
$this->log("get $mime_file mime.type content fail");
return;
}
foreach ($items as $content) {
if (preg_match("/\s*(\S+)\s+(\S.+)/", $content, $match)) {
$mime_type = $match[1];
$workerman_file_extension_var = $match[2];
$workerman_file_extension_array = explode(' ', substr($workerman_file_extension_var, 0, -1));
foreach ($workerman_file_extension_array as $workerman_file_extension) {
self::$mimeTypeMap[$workerman_file_extension] = $mime_type;
}
}
}
}
/**
* Emit when http message coming.
*
* @param Connection\TcpConnection $connection
* @return void
*/
public function onMessage($connection)
{
// REQUEST_URI.
$workerman_url_info = parse_url($_SERVER['REQUEST_URI']);
if (!$workerman_url_info) {
Http::header('HTTP/1.1 400 Bad Request');
$connection->close('<h1>400 Bad Request</h1>');
return;
}
$workerman_path = isset($workerman_url_info['path']) ? $workerman_url_info['path'] : '/';
$workerman_path_info = pathinfo($workerman_path);
$workerman_file_extension = isset($workerman_path_info['extension']) ? $workerman_path_info['extension'] : '';
if ($workerman_file_extension === '') {
$workerman_path = ($len = strlen($workerman_path)) && $workerman_path[$len - 1] === '/' ? $workerman_path . 'index.php' : $workerman_path . '/index.php';
$workerman_file_extension = 'php';
}
$workerman_root_dir = isset($this->serverRoot[$_SERVER['SERVER_NAME']]) ? $this->serverRoot[$_SERVER['SERVER_NAME']] : current($this->serverRoot);
$workerman_file = "$workerman_root_dir/$workerman_path";
if ($workerman_file_extension === 'php' && !is_file($workerman_file)) {
$workerman_file = "$workerman_root_dir/index.php";
if (!is_file($workerman_file)) {
$workerman_file = "$workerman_root_dir/index.html";
$workerman_file_extension = 'html';
}
}
// File exsits.
if (is_file($workerman_file)) {
// Security check.
if ((!($workerman_request_realpath = realpath($workerman_file)) || !($workerman_root_dir_realpath = realpath($workerman_root_dir))) || 0 !== strpos($workerman_request_realpath,
$workerman_root_dir_realpath)
) {
Http::header('HTTP/1.1 400 Bad Request');
$connection->close('<h1>400 Bad Request</h1>');
return;
}
$workerman_file = realpath($workerman_file);
// Request php file.
if ($workerman_file_extension === 'php') {
$workerman_cwd = getcwd();
chdir($workerman_root_dir);
ini_set('display_errors', 'off');
ob_start();
// Try to include php file.
try {
// $_SERVER.
$_SERVER['REMOTE_ADDR'] = $connection->getRemoteIp();
$_SERVER['REMOTE_PORT'] = $connection->getRemotePort();
include $workerman_file;
} catch (\Exception $e) {
// Jump_exit?
if ($e->getMessage() != 'jump_exit') {
echo $e;
}
}
$content = ob_get_clean();
ini_set('display_errors', 'on');
if (strtolower($_SERVER['HTTP_CONNECTION']) === "keep-alive") {
$connection->send($content);
} else {
$connection->close($content);
}
chdir($workerman_cwd);
return;
}
// Send file to client.
return self::sendFile($connection, $workerman_file);
} else {
// 404
Http::header("HTTP/1.1 404 Not Found");
$connection->close('<html><head><title>404 File not found</title></head><body><center><h3>404 Not Found</h3></center></body></html>');
return;
}
}
public static function sendFile($connection, $file_path)
{
// Check 304.
$info = stat($file_path);
$modified_time = $info ? date('D, d M Y H:i:s', $info['mtime']) . ' GMT' : '';
if (!empty($_SERVER['HTTP_IF_MODIFIED_SINCE']) && $info) {
// Http 304.
if ($modified_time === $_SERVER['HTTP_IF_MODIFIED_SINCE']) {
// 304
Http::header('HTTP/1.1 304 Not Modified');
// Send nothing but http headers..
$connection->close('');
return;
}
}
// Http header.
if ($modified_time) {
$modified_time = "Last-Modified: $modified_time\r\n";
}
$file_size = filesize($file_path);
$file_info = pathinfo($file_path);
$extension = isset($file_info['extension']) ? $file_info['extension'] : '';
$file_name = isset($file_info['filename']) ? $file_info['filename'] : '';
$header = "HTTP/1.1 200 OK\r\n";
if (isset(self::$mimeTypeMap[$extension])) {
$header .= "Content-Type: " . self::$mimeTypeMap[$extension] . "\r\n";
} else {
$header .= "Content-Type: application/octet-stream\r\n";
$header .= "Content-Disposition: attachment; filename=\"$file_name\"\r\n";
}
$header .= "Connection: keep-alive\r\n";
$header .= $modified_time;
$header .= "Content-Length: $file_size\r\n\r\n";
$trunk_limit_size = 1024*1024;
if ($file_size < $trunk_limit_size) {
return $connection->send($header.file_get_contents($file_path), true);
}
$connection->send($header, true);
// Read file content from disk piece by piece and send to client.
$connection->fileHandler = fopen($file_path, 'r');
$do_write = function()use($connection)
{
// Send buffer not full.
while(empty($connection->bufferFull))
{
// Read from disk.
$buffer = fread($connection->fileHandler, 8192);
// Read eof.
if($buffer === '' || $buffer === false)
{
return;
}
$connection->send($buffer, true);
}
};
// Send buffer full.
$connection->onBufferFull = function($connection)
{
$connection->bufferFull = true;
};
// Send buffer drain.
$connection->onBufferDrain = function($connection)use($do_write)
{
$connection->bufferFull = false;
$do_write();
};
$do_write();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,33 +0,0 @@
{
"name" : "workerman/workerman",
"type" : "project",
"keywords": ["event-loop", "asynchronous"],
"homepage": "http://www.workerman.net",
"license" : "MIT",
"description": "An asynchronous event driven PHP framework for easily building fast, scalable network applications.",
"authors" : [
{
"name" : "walkor",
"email" : "walkor@workerman.net",
"homepage" : "http://www.workerman.net",
"role": "Developer"
}
],
"support" : {
"email" : "walkor@workerman.net",
"issues": "https://github.com/walkor/workerman/issues",
"forum" : "http://wenda.workerman.net/",
"wiki" : "http://doc3.workerman.net/index.html",
"source": "https://github.com/walkor/workerman"
},
"require": {
"php": ">=5.3"
},
"suggest": {
"ext-libevent": "For better performance."
},
"autoload": {
"psr-4": {"Workerman\\": "./"}
},
"minimum-stability":"dev"
}

10
composer.json Normal file
View File

@ -0,0 +1,10 @@
{
"name" : "workerman/phptty",
"type" : "project",
"keywords": ["web","tty"],
"homepage": "http://www.workerman.net",
"license" : "MIT",
"require": {
"workerman/workerman" : ">=3.3.0"
}
}

View File

@ -5,7 +5,7 @@ use \Workerman\Connection\TcpConnection;
use \Workerman\Connection\AsyncTcpConnection; use \Workerman\Connection\AsyncTcpConnection;
// 自动加载类 // 自动加载类
require_once __DIR__ . '/Workerman/Autoloader.php'; require_once __DIR__ . '/vendor/autoload.php';
define('STAGE_INIT', 0); define('STAGE_INIT', 0);
define('STAGE_ADDR', 1); define('STAGE_ADDR', 1);