// 把運作的腳本變為守護程序
protected static function daemonize()
{
if (!self::$daemonize) {
return;
}
umask();
// 建立子程序.
$pid = pcntl_fork();
if (- === $pid) {
throw new Exception('fork fail');
} elseif ($pid > ) {
// 結束父程序.
exit();
}
// 使目前子程序成為會話組長.
if (- === posix_setsid()) {
throw new Exception("setsid fail");
}
// Fork again avoid SVR4 system regain the control of terminal.
// 再次fork子程序,使程序完全脫離終端,成為守護程序.
$pid = pcntl_fork();
if (- === $pid) {
throw new Exception("fork fail");
} elseif ( !== $pid) {
exit();
}
}
核心代碼:
$pid = pcntl_fork();
posix_setsid()
$pid = pcntl_fork();
// 初始化worker資料.
protected static function initWorkers()
{
// 周遊所有的worker.
foreach (self::$_workers as $worker) {
// 如果worker名字不存在,則給予一個初始值.
if (empty($worker->name)) {
$worker->name = 'none';
}
// 擷取最大的worker名稱長度.
$worker_name_length = strlen($worker->name);
if (self::$_maxWorkerNameLength < $worker_name_length) {
self::$_maxWorkerNameLength = $worker_name_length;
}
// 擷取最長的socket名稱長度.
$socket_name_length = strlen($worker->getSocketName());
if (self::$_maxSocketNameLength < $socket_name_length) {
self::$_maxSocketNameLength = $socket_name_length;
}
// Get unix user of the worker process.
// 如果運作使用者沒有設定,那麼把目前運作使用者作為運作使用者.
if (empty($worker->user)) {
$worker->user = self::getCurrentUser();
} else {
// 如果不是root使用者,并且設定的使用者和目前運作腳本使用者不比對,則給出一個警告資訊.
// posix_getuid() 擷取目前運作使用者的id, root使用者的id=0.
if (posix_getuid() !== && $worker->user != self::getCurrentUser()) {
self::log('Warning: You must have the root privileges to change uid and gid.');
}
}
// Get maximum length of unix user name.
$user_name_length = strlen($worker->user);
if (self::$_maxUserNameLength < $user_name_length) {
self::$_maxUserNameLength = $user_name_length;
}
// Listen.
// 建立worker監聽.
if (!$worker->reusePort) {
$worker->listen();
}
}
}
posix_getuid() 擷取運作腳本的目前uid (root使用者的uid=)
// 擷取目前運作腳本的使用者名.
protected static function getCurrentUser()
{
// 擷取目前運作使用者的使用者名.
// posix_getpwuid 根據使用者id擷取使用者資訊.
$user_info = posix_getpwuid(posix_getuid());
return $user_info['name'];
}
posix_getpwuid() 根據使用者的uid擷取使用者的資訊.
// worker進行監聽.
public function listen()
{
if (!$this->_socketName) {
return;
}
// Autoload.
// 設定自動加載路徑.
Autoloader::setRootPath($this->_autoloadRootPath);
if (!$this->_mainSocket) {
// Get the application layer communication protocol and listening address.
// 切割出協定和位址.
list($scheme, $address) = explode(':', $this->_socketName, );
// Check application layer protocol class.
// 是否為内置協定,不是内置協定需要加載協定的定義.
if (!isset(self::$_builtinTransports[$scheme])) {
$scheme = ucfirst($scheme);
/**
* 協定可能存在的目錄.
* Project\Protocols\...
* Project\WorkerMan\Protocols\...
*/
$this->protocol = '\\Protocols\\' . $scheme;
if (!class_exists($this->protocol)) {
$this->protocol = "\\Workerman\\Protocols\\$scheme";
// 自定義協定處理不存在,抛出錯誤.
if (!class_exists($this->protocol)) {
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
// 自定義的協定預設都走tcp.
if (!isset(self::$_builtinTransports[$this->transport])) {
throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
}
} else {
$this->transport = $scheme;
}
$local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = ;
$errmsg = '';
// SO_REUSEPORT.
// 端口複用-php7才能使用.
if ($this->reusePort) {
stream_context_set_option($this->_context, 'socket', 'so_reuseport', );
}
// Create an Internet or Unix domain server socket.
// 建立服務端server的socket監聽.
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if (!$this->_mainSocket) {
throw new Exception($errmsg);
}
if ($this->transport === 'ssl') {
stream_socket_enable_crypto($this->_mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($address, );
if ($this->user) {
chown($socketFile, $this->user);
}
if ($this->group) {
chgrp($socketFile, $this->group);
}
}
// Try to open keepalive for tcp and disable Nagle algorithm.
//
if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
// 将stream轉換為socket對象.
$socket = socket_import_stream($this->_mainSocket);
// 設定socket屬性keepalive和禁用Nagle.
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, );
@socket_set_option($socket, SOL_TCP, TCP_NODELAY, );
}
// 非阻塞模式.
stream_set_blocking($this->_mainSocket, );
}
// 注冊監聽(self::$globalEvent但是這裡此變量=null,所有是不會執行的).
$this->resumeAccept();
}
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); // 建立socket監聽