*/ class Worker { /** * 版本号 * @var string */ const VERSION = '3.0.9'; /** * 状态 启动中 * @var int */ const STATUS_STARTING = 1; /** * 状态 运行中 * @var int */ const STATUS_RUNNING = 2; /** * 状态 停止 * @var int */ const STATUS_SHUTDOWN = 4; /** * 状态 平滑重启中 * @var int */ const STATUS_RELOADING = 8; /** * 给子进程发送重启命令 KILL_WORKER_TIMER_TIME 秒后 * 如果对应进程仍然未重启则强行杀死 * @var int */ const KILL_WORKER_TIMER_TIME = 1; /** * 默认的backlog,即内核中用于存放未被进程认领(accept)的连接队列长度 * @var int */ const DEFAUL_BACKLOG = 1024; /** * udp最大包长 * @var int */ const MAX_UDP_PACKEG_SIZE = 65535; /** * worker的名称,用于在运行status命令时标记进程 * @var string */ public $name = 'none'; /** * 设置当前worker实例的进程数 * @var int */ public $count = 1; /** * 设置当前worker进程的运行用户,启动时需要root超级权限 * @var string */ public $user = ''; /** * 当前worker进程是否可以平滑重启 * @var bool */ public $reloadable = true; /** * 当worker进程启动时,如果设置了$onWorkerStart回调函数,则运行 * 此钩子函数一般用于进程启动后初始化工作 * @var callback */ public $onWorkerStart = null; /** * 当有客户端连接时,如果设置了$onConnect回调函数,则运行 * @var callback */ public $onConnect = null; /** * 当客户端连接上发来数据时,如果设置了$onMessage回调,则运行 * @var callback */ public $onMessage = null; /** * 当客户端的连接关闭时,如果设置了$onClose回调,则运行 * @var callback */ public $onClose = null; /** * 当客户端的连接发生错误时,如果设置了$onError回调,则运行 * 错误一般为客户端断开连接导致数据发送失败 * 具体错误码及错误详情会以参数的形式传递给回调,参见手册 * @var callback */ public $onError = null; /** * 当连接的发送缓冲区满时,如果设置了$onBufferFull回调,则执行 * @var callback */ public $onBufferFull = null; /** * 当链接的发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行 * @var callback */ public $onBufferDrain = null; /** * 当前进程退出时(由于平滑重启或者服务停止导致),如果设置了此回调,则运行 * @var callback */ public $onWorkerStop = null; /** * 传输层协议 * @var string */ public $transport = 'tcp'; /** * 所有的客户端连接 * @var array */ public $connections = array(); /** * 应用层协议,由初始化worker时指定 * 例如 new worker('http://0.0.0.0:8080');指定使用http协议 * @var string */ protected $_protocol = ''; /** * 当前worker实例初始化目录位置,用于设置应用自动加载的根目录 * @var string */ protected $_appInitPath = ''; /** * 是否以守护进程的方式运行。运行start时加上-d参数会自动以守护进程方式运行 * 例如 php start.php start -d * @var bool */ public static $daemonize = false; /** * 重定向标准输出,即将所有echo、var_dump等终端输出写到对应文件中 * 注意 此参数只有在以守护进程方式运行时有效 * @var string */ public static $stdoutFile = '/dev/null'; /** * pid文件的路径及名称 * 例如 Worker::$pidFile = '/tmp/workerman.pid'; * 注意 此属性一般不必手动设置,默认会放到php临时目录中 * @var string */ public static $pidFile = ''; /** * 日志目录,默认在workerman根目录下,与Applications同级 * 可以手动设置 * 例如 Worker::$logFile = '/tmp/workerman.log'; * @var unknown_type */ public static $logFile = ''; /** * 全局事件轮询库,用于监听所有资源的可读可写事件 * @var Select/Libevent */ public static $globalEvent = null; /** * 主进程pid * @var int */ protected static $_masterPid = 0; /** * 监听的socket * @var stream */ protected $_mainSocket = null; /** * socket名称,包括应用层协议+ip+端口号,在初始化worker时设置 * 值类似 http://0.0.0.0:80 * @var string */ protected $_socketName = ''; /** * socket的上下文,具体选项设置可以在初始化worker时传递 * @var context */ protected $_context = null; /** * 所有的worker实例 * @var array */ protected static $_workers = array(); /** * 所有worker进程的pid * 格式为 [worker_id=>[pid=>pid, pid=>pid, ..], ..] * @var array */ protected static $_pidMap = array(); /** * 所有需要重启的进程pid * 格式为 [pid=>pid, pid=>pid] * @var array */ protected static $_pidsToRestart = array(); /** * 当前worker状态 * @var int */ protected static $_status = self::STATUS_STARTING; /** * 所有worke名称(name属性)中的最大长度,用于在运行 status 命令时格式化输出 * @var int */ protected static $_maxWorkerNameLength = 12; /** * 所有socket名称(_socketName属性)中的最大长度,用于在运行 status 命令时格式化输出 * @var int */ protected static $_maxSocketNameLength = 12; /** * 所有user名称(user属性)中的最大长度,用于在运行 status 命令时格式化输出 * @var int */ protected static $_maxUserNameLength = 12; /** * 运行 status 命令时用于保存结果的文件名 * @var string */ protected static $_statisticsFile = ''; /** * 启动的全局入口文件 * 例如 php start.php start ,则入口文件为start.php * @var string */ protected static $_startFile = ''; /** * 全局统计数据,用于在运行 status 命令时展示 * 统计的内容包括 workerman启动的时间戳及每组worker进程的退出次数及退出状态码 * @var array */ protected static $_globalStatistics = array( 'start_timestamp' => 0, 'worker_exit_info' => array() ); /** * 运行所有worker实例 * @return void */ public static function runAll() { // 初始化环境变量 self::init(); // 解析命令 self::parseCommand(); // 尝试以守护进程模式运行 self::daemonize(); // 初始化所有worker实例,主要是监听端口 self::initWorkers(); // 初始化所有信号处理函数 self::installSignal(); // 展示启动界面 self::displayUI(); // 尝试重定向标准输入输出 self::resetStd(); // 保存主进程pid self::saveMasterPid(); // 创建子进程(worker进程)并运行 self::forkWorkers(); // 监控所有子进程(worker进程) self::monitorWorkers(); } /** * 初始化一些环境变量 * @return void */ public static function init() { // 如果没设置$pidFile,则生成默认值 if(empty(self::$pidFile)) { $backtrace = debug_backtrace(); self::$_startFile = $backtrace[count($backtrace)-1]['file']; self::$pidFile = sys_get_temp_dir()."/workerman.".str_replace('/', '_', self::$_startFile).".pid"; } // 没有设置日志文件,则生成一个默认值 if(empty(self::$logFile)) { self::$logFile = __DIR__ . '/../workerman.log'; } // 标记状态为启动中 self::$_status = self::STATUS_STARTING; // 启动时间戳 self::$_globalStatistics['start_timestamp'] = time(); // 设置status文件位置 self::$_statisticsFile = sys_get_temp_dir().'/workerman.status'; // 尝试设置进程名称(需要php>=5.5或者安装了proctitle扩展) self::setProcessTitle('WorkerMan: master process start_file=' . self::$_startFile); // 初始化定时器 Timer::init(); } /** * 初始化所有的worker实例,主要工作为获得格式化所需数据及监听端口 * @return void */ protected static function initWorkers() { foreach(self::$_workers as $worker) { // 没有设置worker名称,则使用none代替 if(empty($worker->name)) { $worker->name = 'none'; } // 获得所有worker名称中最大长度 $worker_name_length = strlen($worker->name); if(self::$_maxWorkerNameLength < $worker_name_length) { self::$_maxWorkerNameLength = $worker_name_length; } // 获得所有_socketName中最大长度 $socket_name_length = strlen($worker->getSocketName()); if(self::$_maxSocketNameLength < $socket_name_length) { self::$_maxSocketNameLength = $socket_name_length; } // 获得运行用户名的最大长度 if(empty($worker->user) || posix_getuid() !== 0) { $worker->user = self::getCurrentUser(); } $user_name_length = strlen($worker->user); if(self::$_maxUserNameLength < $user_name_length) { self::$_maxUserNameLength = $user_name_length; } // 监听端口 $worker->listen(); } } /** * 获得运行当前进程的用户名 * @return string */ protected static function getCurrentUser() { $user_info = posix_getpwuid(posix_getuid()); return $user_info['name']; } /** * 展示启动界面 * @return void */ protected static function displayUI() { echo "\033[1A\n\033[K-----------------------\033[47;30m WORKERMAN \033[0m-----------------------------\n\033[0m"; echo 'Workerman version:' . Worker::VERSION . " PHP version:".PHP_VERSION."\n"; echo "------------------------\033[47;30m WORKERS \033[0m-------------------------------\n"; echo "\033[47;30muser\033[0m",str_pad('', self::$_maxUserNameLength+2-strlen('user')), "\033[47;30mworker\033[0m",str_pad('', self::$_maxWorkerNameLength+2-strlen('worker')), "\033[47;30mlisten\033[0m",str_pad('', self::$_maxSocketNameLength+2-strlen('listen')), "\033[47;30mprocesses\033[0m \033[47;30m","status\033[0m\n"; foreach(self::$_workers as $worker) { echo str_pad($worker->user, self::$_maxUserNameLength+2),str_pad($worker->name, self::$_maxWorkerNameLength+2),str_pad($worker->getSocketName(), self::$_maxSocketNameLength+2), str_pad(' '.$worker->count, 9), " \033[32;40m [OK] \033[0m\n";; } echo "----------------------------------------------------------------\n"; } /** * 解析运行命令 * php yourfile.php start | stop | restart | reload | status * @return void */ public static function parseCommand() { // 检查运行命令的参数 global $argv; $start_file = $argv[0]; if(!isset($argv[1])) { exit("Usage: php yourfile.php {start|stop|restart|reload|status}\n"); } // 命令 $command = trim($argv[1]); // 子命令,目前只支持-d $command2 = isset($argv[2]) ? $argv[2] : ''; // 记录日志 self::log("Workerman[$start_file] $command"); // 检查主进程是否在运行 $master_pid = @file_get_contents(self::$pidFile); $master_is_alive = $master_pid && @posix_kill($master_pid, 0); if($master_is_alive) { if($command === 'start') { self::log("Workerman[$start_file] is running"); } } elseif($command !== 'start' && $command !== 'restart') { self::log("Workerman[$start_file] not run"); } // 根据命令做相应处理 switch($command) { // 启动 workerman case 'start': if($command2 == '-d') { Worker::$daemonize = true; } break; // 显示 workerman 运行状态 case 'status': // 尝试删除统计文件,避免脏数据 if(is_file(self::$_statisticsFile)) { @unlink(self::$_statisticsFile); } // 向主进程发送 SIGUSR2 信号 ,然后主进程会向所有子进程发送 SIGUSR2 信号 // 所有进程收到 SIGUSR2 信号后会向 $_statisticsFile 写入自己的状态 posix_kill($master_pid, SIGUSR2); // 睡眠100毫秒,等待子进程将自己的状态写入$_statisticsFile指定的文件 usleep(100000); // 展示状态 readfile(self::$_statisticsFile); exit(0); // 重启 workerman case 'restart': // 停止 workeran case 'stop': self::log("Workerman[$start_file] is stoping ..."); // 想主进程发送SIGINT信号,主进程会向所有子进程发送SIGINT信号 $master_pid && posix_kill($master_pid, SIGINT); // 如果 $timeout 秒后主进程没有退出则展示失败界面 $timeout = 5; $start_time = time(); while(1) { // 检查主进程是否存活 $master_is_alive = $master_pid && posix_kill($master_pid, 0); if($master_is_alive) { // 检查是否超过$timeout时间 if(time() - $start_time >= $timeout) { self::log("Workerman[$start_file] stop fail"); exit; } usleep(10000); continue; } self::log("Workerman[$start_file] stop success"); // 是restart命令 if($command === 'stop') { exit(0); } // -d 说明是以守护进程的方式启动 if($command2 == '-d') { Worker::$daemonize = true; } break; } break; // 平滑重启 workerman case 'reload': posix_kill($master_pid, SIGUSR1); self::log("Workerman[$start_file] reload"); exit; // 未知命令 default : exit("Usage: php yourfile.php {start|stop|restart|reload|status}\n"); } } /** * 安装信号处理函数 * @return void */ protected static function installSignal() { // stop pcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false); // reload pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false); // status pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false); // ignore pcntl_signal(SIGPIPE, SIG_IGN, false); } /** * 为子进程重新安装信号处理函数,使用全局事件轮询监听信号 * @return void */ protected static function reinstallSignal() { // uninstall stop signal handler pcntl_signal(SIGINT, SIG_IGN, false); // uninstall reload signal handler pcntl_signal(SIGUSR1, SIG_IGN, false); // uninstall status signal handler pcntl_signal(SIGUSR2, SIG_IGN, false); // reinstall stop signal handler self::$globalEvent->add(SIGINT, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler')); // uninstall reload signal handler self::$globalEvent->add(SIGUSR1, EventInterface::EV_SIGNAL,array('\Workerman\Worker', 'signalHandler')); // uninstall status signal handler self::$globalEvent->add(SIGUSR2, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler')); } /** * 信号处理函数 * @param int $signal */ public static function signalHandler($signal) { switch($signal) { // stop case SIGINT: self::stopAll(); break; // reload case SIGUSR1: self::$_pidsToRestart = self::getAllWorkerPids();; self::reload(); break; // show status case SIGUSR2: self::writeStatisticsToStatusFile(); break; } } /** * 尝试以守护进程的方式运行 * @throws Exception */ protected static function daemonize() { if(!self::$daemonize) { return; } umask(0); $pid = pcntl_fork(); if(-1 == $pid) { throw new Exception('fork fail'); } elseif($pid > 0) { exit(0); } if(-1 == posix_setsid()) { throw new Exception("setsid fail"); } // fork again avoid SVR4 system regain the control of terminal $pid = pcntl_fork(); if(-1 == $pid) { throw new Exception("fork fail"); } elseif(0 !== $pid) { exit(0); } } /** * 重定向标准输入输出 * @throws Exception */ protected static function resetStd() { if(!self::$daemonize) { return; } global $STDOUT, $STDERR; $handle = fopen(self::$stdoutFile,"a"); if($handle) { unset($handle); @fclose(STDOUT); @fclose(STDERR); $STDOUT = fopen(self::$stdoutFile,"a"); $STDERR = fopen(self::$stdoutFile,"a"); } else { throw new Exception('can not open stdoutFile ' . self::$stdoutFile); } } /** * 保存pid到文件中,方便运行命令时查找主进程pid * @throws Exception */ protected static function saveMasterPid() { self::$_masterPid = posix_getpid(); if(false === @file_put_contents(self::$pidFile, self::$_masterPid)) { throw new Exception('can not save pid to ' . self::$pidFile); } } /** * 获得所有子进程的pid * @return array */ protected static function getAllWorkerPids() { $pid_array = array(); foreach(self::$_pidMap as $worker_pid_array) { foreach($worker_pid_array as $worker_pid) { $pid_array[$worker_pid] = $worker_pid; } } return $pid_array; } /** * 创建子进程 * @return void */ protected static function forkWorkers() { foreach(self::$_workers as $worker) { // 启动过程中需要得到运行用户名的最大长度,在status时格式化展示 if(self::$_status === self::STATUS_STARTING) { if(empty($worker->name)) { $worker->name = $worker->getSocketName(); } $worker_name_length = strlen($worker->name); if(self::$_maxWorkerNameLength < $worker_name_length) { self::$_maxWorkerNameLength = $worker_name_length; } } // 创建子进程 while(count(self::$_pidMap[$worker->workerId]) < $worker->count) { self::forkOneWorker($worker); } } } /** * 创建一个子进程 * @param Worker $worker * @throws Exception */ protected static function forkOneWorker($worker) { $pid = pcntl_fork(); // 主进程记录子进程pid if($pid > 0) { self::$_pidMap[$worker->workerId][$pid] = $pid; } // 子进程运行 elseif(0 === $pid) { self::$_pidMap = array(); self::$_workers = array($worker->workerId => $worker); Timer::delAll(); self::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName()); self::setProcessUser($worker->user); $worker->run(); exit(250); } else { throw new Exception("forkOneWorker fail"); } } /** * 尝试设置运行当前进程的用户 * @return void */ protected static function setProcessUser($user_name) { if(empty($user_name) || posix_getuid() !== 0) { return; } $user_info = posix_getpwnam($user_name); if($user_info['uid'] != posix_getuid() || $user_info['gid'] != posix_getgid()) { if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid'])) { self::log( 'Notice : Can not run woker as '.$user_name." , You shuld be root\n", true); } } } /** * 设置当前进程的名称,在ps aux命令中有用 * 注意 需要php>=5.5或者安装了protitle扩展 * @param string $title * @return void */ protected static function setProcessTitle($title) { // >=php 5.5 if (function_exists('cli_set_process_title')) { @cli_set_process_title($title); } // 需要扩展 elseif(extension_loaded('proctitle') && function_exists('setproctitle')) { @setproctitle($title); } } /** * 监控所有子进程的退出事件及退出码 * @return void */ protected static function monitorWorkers() { self::$_status = self::STATUS_RUNNING; while(1) { // 如果有信号到来,尝试触发信号处理函数 pcntl_signal_dispatch(); // 挂起进程,直到有子进程退出或者被信号打断 $status = 0; $pid = pcntl_wait($status, WUNTRACED); // 有子进程退出 if($pid > 0) { // 查找是哪个进程组的,然后再启动新的进程补上 foreach(self::$_pidMap as $worker_id => $worker_pid_array) { if(isset($worker_pid_array[$pid])) { $worker = self::$_workers[$worker_id]; // 检查退出状态 if($status !== 0) { self::log("worker[".$worker->name.":$pid] exit with status $status"); } // 统计,运行status命令时使用 if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status])) { self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0; } self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++; // 清除子进程信息 unset(self::$_pidMap[$worker_id][$pid]); break; } } // 如果不是关闭状态,则补充新的进程 if(self::$_status !== self::STATUS_SHUTDOWN) { self::forkWorkers(); // 如果该进程是因为运行reload命令退出,则继续执行reload流程 if(isset(self::$_pidsToRestart[$pid])) { unset(self::$_pidsToRestart[$pid]); self::reload(); } } else { // 如果是关闭状态,并且所有进程退出完毕,则主进程退出 if(!self::getAllWorkerPids()) { self::exitAndClearAll(); } } } else { // 如果是关闭状态,并且所有进程退出完毕,则主进程退出 if(self::$_status === self::STATUS_SHUTDOWN && !self::getAllWorkerPids()) { self::exitAndClearAll(); } } } } /** * 退出当前进程 * @return void */ protected static function exitAndClearAll() { @unlink(self::$pidFile); self::log("Workerman[".basename(self::$_startFile)."] has been stopped"); exit(0); } /** * 执行平滑重启流程 * @return void */ protected static function reload() { // 主进程部分 if(self::$_masterPid === posix_getpid()) { // 设置为平滑重启状态 if(self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN) { self::log("Workerman[".basename(self::$_startFile)."] reloading"); self::$_status = self::STATUS_RELOADING; } // 如果有worker设置了reloadable=false,则过滤掉 $reloadable_pid_array = array(); foreach(self::$_pidMap as $worker_id =>$worker_pid_array) { $worker = self::$_workers[$worker_id]; if($worker->reloadable) { foreach($worker_pid_array as $pid) { $reloadable_pid_array[$pid] = $pid; } } } // 得到所有可以重启的进程 self::$_pidsToRestart = array_intersect(self::$_pidsToRestart , $reloadable_pid_array); // 平滑重启完毕 if(empty(self::$_pidsToRestart)) { if(self::$_status !== self::STATUS_SHUTDOWN) { self::$_status = self::STATUS_RUNNING; } return; } // 继续执行平滑重启流程 $one_worker_pid = current(self::$_pidsToRestart ); // 给子进程发送平滑重启信号 posix_kill($one_worker_pid, SIGUSR1); // 定时器,如果子进程在KILL_WORKER_TIMER_TIME秒后没有退出,则强行杀死 Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false); } // 子进程部分 else { // 如果当前worker的reloadable属性为真,则执行退出 $worker = current(self::$_workers); if($worker->reloadable) { self::stopAll(); } } } /** * 执行关闭流程 * @return void */ public static function stopAll() { self::$_status = self::STATUS_SHUTDOWN; // 主进程部分 if(self::$_masterPid === posix_getpid()) { self::log("Workerman[".basename(self::$_startFile)."] Stopping ..."); $worker_pid_array = self::getAllWorkerPids(); // 向所有子进程发送SIGINT信号,表明关闭服务 foreach($worker_pid_array as $worker_pid) { posix_kill($worker_pid, SIGINT); Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL),false); } } // 子进程部分 else { // 执行stop逻辑 foreach(self::$_workers as $worker) { $worker->stop(); } exit(0); } } /** * 将当前进程的统计信息写入到统计文件 * @return void */ protected static function writeStatisticsToStatusFile() { // 主进程部分 if(self::$_masterPid === posix_getpid()) { $loadavg = sys_getloadavg(); file_put_contents(self::$_statisticsFile, "---------------------------------------GLOBAL STATUS--------------------------------------------\n"); file_put_contents(self::$_statisticsFile, 'Workerman version:' . Worker::VERSION . " PHP version:".PHP_VERSION."\n", FILE_APPEND); file_put_contents(self::$_statisticsFile, 'start time:'. date('Y-m-d H:i:s', self::$_globalStatistics['start_timestamp']).' run ' . floor((time()-self::$_globalStatistics['start_timestamp'])/(24*60*60)). ' days ' . floor(((time()-self::$_globalStatistics['start_timestamp'])%(24*60*60))/(60*60)) . " hours \n", FILE_APPEND); file_put_contents(self::$_statisticsFile, 'load average: ' . implode(", ", $loadavg) . "\n", FILE_APPEND); file_put_contents(self::$_statisticsFile, count(self::$_pidMap) . ' workers ' . count(self::getAllWorkerPids())." processes\n", FILE_APPEND); file_put_contents(self::$_statisticsFile, str_pad('worker_name', self::$_maxWorkerNameLength) . " exit_status exit_count\n", FILE_APPEND); foreach(self::$_pidMap as $worker_id =>$worker_pid_array) { $worker = self::$_workers[$worker_id]; if(isset(self::$_globalStatistics['worker_exit_info'][$worker_id])) { foreach(self::$_globalStatistics['worker_exit_info'][$worker_id] as $worker_exit_status=>$worker_exit_count) { file_put_contents(self::$_statisticsFile, str_pad($worker->name, self::$_maxWorkerNameLength) . " " . str_pad($worker_exit_status, 16). " $worker_exit_count\n", FILE_APPEND); } } else { file_put_contents(self::$_statisticsFile, str_pad($worker->name, self::$_maxWorkerNameLength) . " " . str_pad(0, 16). " 0\n", FILE_APPEND); } } file_put_contents(self::$_statisticsFile, "---------------------------------------PROCESS STATUS-------------------------------------------\n", FILE_APPEND); file_put_contents(self::$_statisticsFile, "pid\tmemory ".str_pad('listening', self::$_maxSocketNameLength)." ".str_pad('worker_name', self::$_maxWorkerNameLength)." connections ".str_pad('total_request', 13)." ".str_pad('send_fail', 9)." ".str_pad('throw_exception', 15)."\n", FILE_APPEND); chmod(self::$_statisticsFile, 0722); foreach(self::getAllWorkerPids() as $worker_pid) { posix_kill($worker_pid, SIGUSR2); } return; } // 子进程部分 $worker = current(self::$_workers); $wrker_status_str = posix_getpid()."\t".str_pad(round(memory_get_usage()/(1024*1024),2)."M", 7)." " .str_pad($worker->getSocketName(), self::$_maxSocketNameLength) ." ".str_pad(($worker->name == $worker->getSocketName() ? 'none' : $worker->name), self::$_maxWorkerNameLength)." "; $wrker_status_str .= str_pad(ConnectionInterface::$statistics['connection_count'], 11)." ".str_pad(ConnectionInterface::$statistics['total_request'], 14)." ".str_pad(ConnectionInterface::$statistics['send_fail'],9)." ".str_pad(ConnectionInterface::$statistics['throw_exception'],15)."\n"; file_put_contents(self::$_statisticsFile, $wrker_status_str, FILE_APPEND); } /** * 检查错误 * @return void */ public static function checkErrors() { if(self::STATUS_SHUTDOWN != self::$_status) { $error_msg = "WORKER EXIT UNEXPECTED "; $errors = error_get_last(); if($errors && ($errors['type'] == E_ERROR || $errors['type'] == E_PARSE || $errors['type'] == E_CORE_ERROR || $errors['type'] == E_COMPILE_ERROR || $errors['type'] == E_RECOVERABLE_ERROR )) { $error_msg .= self::getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}"; } self::log($error_msg); } } /** * 获取错误类型对应的意义 * @param integer $type * @return string */ protected static function getErrorType($type) { switch($type) { case E_ERROR: // 1 // return 'E_ERROR'; case E_WARNING: // 2 // return 'E_WARNING'; case E_PARSE: // 4 // return 'E_PARSE'; case E_NOTICE: // 8 // return 'E_NOTICE'; case E_CORE_ERROR: // 16 // return 'E_CORE_ERROR'; case E_CORE_WARNING: // 32 // return 'E_CORE_WARNING'; case E_COMPILE_ERROR: // 64 // return 'E_COMPILE_ERROR'; case E_COMPILE_WARNING: // 128 // return 'E_COMPILE_WARNING'; case E_USER_ERROR: // 256 // return 'E_USER_ERROR'; case E_USER_WARNING: // 512 // return 'E_USER_WARNING'; case E_USER_NOTICE: // 1024 // return 'E_USER_NOTICE'; case E_STRICT: // 2048 // return 'E_STRICT'; case E_RECOVERABLE_ERROR: // 4096 // return 'E_RECOVERABLE_ERROR'; case E_DEPRECATED: // 8192 // return 'E_DEPRECATED'; case E_USER_DEPRECATED: // 16384 // return 'E_USER_DEPRECATED'; } return ""; } /** * 记录日志 * @param string $msg * @return void */ protected static function log($msg) { $msg = $msg."\n"; if(self::$_status === self::STATUS_STARTING || !self::$daemonize) { echo $msg; } file_put_contents(self::$logFile, date('Y-m-d H:i:s') . " " . $msg, FILE_APPEND | LOCK_EX); } /** * worker构造函数 * @param string $socket_name * @return void */ public function __construct($socket_name = '', $context_option = array()) { // 保存worker实例 $this->workerId = spl_object_hash($this); self::$_workers[$this->workerId] = $this; self::$_pidMap[$this->workerId] = array(); // 获得实例化文件路径,用于自动加载设置根目录 $backrace = debug_backtrace(); $this->_appInitPath = dirname($backrace[0]['file']); // 设置socket上下文 if($socket_name) { $this->_socketName = $socket_name; if(!isset($context_option['socket']['backlog'])) { $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG; } $this->_context = stream_context_create($context_option); } } /** * 监听端口 * @throws Exception */ public function listen() { // 设置自动加载根目录 Autoloader::setRootPath($this->_appInitPath); if(!$this->_socketName) { return; } // 获得应用层通讯协议以及监听的地址 list($scheme, $address) = explode(':', $this->_socketName, 2); // 如果有指定应用层协议,则检查对应的协议类是否存在 if($scheme != 'tcp' && $scheme != 'udp') { $scheme = ucfirst($scheme); $this->_protocol = '\\Protocols\\'.$scheme; if(!class_exists($this->_protocol)) { $this->_protocol = "\\Workerman\\Protocols\\$scheme"; if(!class_exists($this->_protocol)) { throw new Exception("class \\Protocols\\$scheme not exist"); } } } elseif($scheme === 'udp') { $this->transport = 'udp'; } // flag $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN; $this->_mainSocket = stream_socket_server($this->transport.":".$address, $errno, $errmsg, $flags, $this->_context); if(!$this->_mainSocket) { throw new Exception($errmsg); } // 尝试打开tcp的keepalive,关闭TCP Nagle算法 if(function_exists('socket_import_stream')) { $socket = socket_import_stream($this->_mainSocket ); @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); @socket_set_option($socket, SOL_SOCKET, TCP_NODELAY, 1); } // 设置非阻塞 stream_set_blocking($this->_mainSocket, 0); // 放到全局事件轮询中监听_mainSocket可读事件(客户端连接事件) if(self::$globalEvent) { if($this->transport !== 'udp') { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } } } /** * 获得 socket name * @return string */ public function getSocketName() { return $this->_socketName ? $this->_socketName : 'none'; } /** * 运行worker实例 */ public function run() { // 注册进程退出回调,用来检查是否有错误 register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors')); // 设置自动加载根目录 Autoloader::setRootPath($this->_appInitPath); // 如果没有全局事件轮询,则创建一个 if(!self::$globalEvent) { if(extension_loaded('libevent')) { self::$globalEvent = new Libevent(); } else { self::$globalEvent = new Select(); } // 监听_mainSocket上的可读事件(客户端连接事件) if($this->_socketName) { if($this->transport !== 'udp') { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } } } // 重新安装事件处理函数,使用全局事件轮询监听信号事件 self::reinstallSignal(); // 用全局事件轮询初始化定时器 Timer::init(self::$globalEvent); // 如果有设置进程启动回调,则执行 if($this->onWorkerStart) { call_user_func($this->onWorkerStart, $this); } // 子进程主循环 self::$globalEvent->loop(); } /** * 停止当前worker实例 * @return void */ public function stop() { // 如果有设置进程终止回调,则执行 if($this->onWorkerStop) { call_user_func($this->onWorkerStop, $this); } // 删除相关监听事件,关闭_mainSocket self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ); @fclose($this->_mainSocket); } /** * 接收一个客户端连接 * @param resources $socket * @return void */ public function acceptConnection($socket) { // 获得客户端连接 $new_socket = @stream_socket_accept($socket, 0); // 惊群现象,忽略 if(false === $new_socket) { return; } // 统计数据 ConnectionInterface::$statistics['connection_count']++; // 初始化连接对象 $connection = new TcpConnection($new_socket); $connection->worker = $this; $connection->protocol = $this->_protocol; $connection->onMessage = $this->onMessage; $connection->onClose = $this->onClose; $connection->onError = $this->onError; $connection->onBufferDrain = $this->onBufferDrain; $connection->onBufferFull = $this->onBufferFull; $this->connections[(int)$new_socket] = $connection; // 如果有设置连接回调,则执行 if($this->onConnect) { try { call_user_func($this->onConnect, $connection); } catch(Exception $e) { ConnectionInterface::$statistics['throw_exception']++; self::log($e); } } } /** * 处理udp连接(udp其实是无连接的,这里为保证和tcp连接接口一致) * @param resource $socket */ public function acceptUdpConnection($socket) { $recv_buffer = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $remote_address); if(false === $recv_buffer || empty($remote_address)) { return false; } // 模拟一个连接对象 $connection = new UdpConnection($socket, $remote_address); if($this->onMessage) { $parser = $this->_protocol; ConnectionInterface::$statistics['total_request']++; try { call_user_func($this->onMessage, $connection, $parser::decode($recv_buffer, $connection)); } catch(Exception $e) { ConnectionInterface::$statistics['throw_exception']++; } } } }