天天看點

WorkerMan::Worker類解析(3)

// 把運作的腳本變為守護程序
    protected static function daemonize()
    {
        if (!self::$daemonize) {
            return;
        }
        umask();
        // 建立子程序.
        $pid = pcntl_fork();
        if (- === $pid) {
            throw new Exception('fork fail');
        } elseif ($pid > ) {
            // 結束父程序.
            exit();
        }
        // 使目前子程序成為會話組長.
        if (- === posix_setsid()) {
            throw new Exception("setsid fail");
        }
        // Fork again avoid SVR4 system regain the control of terminal.
        // 再次fork子程序,使程序完全脫離終端,成為守護程序.
        $pid = pcntl_fork();
        if (- === $pid) {
            throw new Exception("fork fail");
        } elseif ( !== $pid) {
            exit();
        }
    }

核心代碼:
$pid = pcntl_fork();
posix_setsid()
$pid = pcntl_fork();
           
// 初始化worker資料.
    protected static function initWorkers()
    {
        // 周遊所有的worker.
        foreach (self::$_workers as $worker) {
            // 如果worker名字不存在,則給予一個初始值.
            if (empty($worker->name)) {
                $worker->name = 'none';
            }

            // 擷取最大的worker名稱長度.
            $worker_name_length = strlen($worker->name);
            if (self::$_maxWorkerNameLength < $worker_name_length) {
                self::$_maxWorkerNameLength = $worker_name_length;
            }

            // 擷取最長的socket名稱長度.
            $socket_name_length = strlen($worker->getSocketName());
            if (self::$_maxSocketNameLength < $socket_name_length) {
                self::$_maxSocketNameLength = $socket_name_length;
            }

            // Get unix user of the worker process.
            // 如果運作使用者沒有設定,那麼把目前運作使用者作為運作使用者.
            if (empty($worker->user)) {
                $worker->user = self::getCurrentUser();
            } else {
                // 如果不是root使用者,并且設定的使用者和目前運作腳本使用者不比對,則給出一個警告資訊.
                // posix_getuid() 擷取目前運作使用者的id, root使用者的id=0.
                if (posix_getuid() !==  && $worker->user != self::getCurrentUser()) {
                    self::log('Warning: You must have the root privileges to change uid and gid.');
                }
            }

            // Get maximum length of unix user name.
            $user_name_length = strlen($worker->user);
            if (self::$_maxUserNameLength < $user_name_length) {
                self::$_maxUserNameLength = $user_name_length;
            }

            // Listen.
            // 建立worker監聽.
            if (!$worker->reusePort) {
                $worker->listen();
            }
        }
    }

 posix_getuid() 擷取運作腳本的目前uid (root使用者的uid=)

    // 擷取目前運作腳本的使用者名.
    protected static function getCurrentUser()
    {
        // 擷取目前運作使用者的使用者名.
        // posix_getpwuid 根據使用者id擷取使用者資訊.
        $user_info = posix_getpwuid(posix_getuid());
        return $user_info['name'];
    }

 posix_getpwuid() 根據使用者的uid擷取使用者的資訊.
           
// worker進行監聽.
    public function listen()
    {
        if (!$this->_socketName) {
            return;
        }

        // Autoload.
        // 設定自動加載路徑.
        Autoloader::setRootPath($this->_autoloadRootPath);

        if (!$this->_mainSocket) {
            // Get the application layer communication protocol and listening address.
            // 切割出協定和位址.
            list($scheme, $address) = explode(':', $this->_socketName, );
            // Check application layer protocol class.
            // 是否為内置協定,不是内置協定需要加載協定的定義.
            if (!isset(self::$_builtinTransports[$scheme])) {
                $scheme         = ucfirst($scheme);
                /**
                 * 協定可能存在的目錄.
                 * Project\Protocols\...
                 * Project\WorkerMan\Protocols\...
                 */
                $this->protocol = '\\Protocols\\' . $scheme;
                if (!class_exists($this->protocol)) {
                    $this->protocol = "\\Workerman\\Protocols\\$scheme";
                    // 自定義協定處理不存在,抛出錯誤.
                    if (!class_exists($this->protocol)) {
                        throw new Exception("class \\Protocols\\$scheme not exist");
                    }
                }

                // 自定義的協定預設都走tcp.
                if (!isset(self::$_builtinTransports[$this->transport])) {
                    throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
                }
            } else {
                $this->transport = $scheme;
            }

            $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;

            // Flag.
            $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
            $errno = ;
            $errmsg = '';
            // SO_REUSEPORT.
            // 端口複用-php7才能使用.
            if ($this->reusePort) {
                stream_context_set_option($this->_context, 'socket', 'so_reuseport', );
            }

            // Create an Internet or Unix domain server socket.
            // 建立服務端server的socket監聽.
            $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
            if (!$this->_mainSocket) {
                throw new Exception($errmsg);
            }

            if ($this->transport === 'ssl') {
                stream_socket_enable_crypto($this->_mainSocket, false);
            } elseif ($this->transport === 'unix') {
                $socketFile = substr($address, );
                if ($this->user) {
                    chown($socketFile, $this->user);
                }
                if ($this->group) {
                    chgrp($socketFile, $this->group);
                }
            }

            // Try to open keepalive for tcp and disable Nagle algorithm.
            //
            if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
                // 将stream轉換為socket對象.
                $socket = socket_import_stream($this->_mainSocket);
                // 設定socket屬性keepalive和禁用Nagle.
                @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, );
                @socket_set_option($socket, SOL_TCP, TCP_NODELAY, );
            }

            // 非阻塞模式.
            stream_set_blocking($this->_mainSocket, );
        }

        // 注冊監聽(self::$globalEvent但是這裡此變量=null,所有是不會執行的).
        $this->resumeAccept();
    }

 $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context); // 建立socket監聽
           

繼續閱讀