天天看点

Go操作supervisor xml rpc接口及注意事项

Go操作supervisor xml rpc接口及注意事项

文章目录

  • ​​Go操作supervisor xml rpc接口及注意事项​​
  • ​​1. 前言​​
  • ​​2. 管理web​​
  • ​​3. go处理库​​
  • ​​4. 实时日志处理代码片段​​

1. 前言

之前提到过目前我们的进程都是通过supervisor(http://supervisord.org/)这样一个进程管理软件进行管理的,也专门做过专题翻译过supervisor的一些内容

我们会发现3.0以上的版本会有xml-rpc接口(http://supervisord.org/xmlrpc.html)可以通过对应接口控制supervisor管理的进程,包括获取对应的日志、运行状态等功能,这在实际开发过程中获取这些信息在web上进行控制、查询也是非常有帮助的,所以这里对go如何进行supervisor管理进程的信息的处理做简单的总结。

2. 管理web

一般在配置文件中添加:

[inet_http_server]
port=9001      

即可通过9001端口访问一个web页面:

Go操作supervisor xml rpc接口及注意事项

而通过xml-rpc可以获取状态,对这些进程进行控制管理,查看对应日志等。

注意:处于安全可能会需要配置该web的用户名和密码,但是为了方便进行程序管理,最好不要配置鉴权,否则程序可能由于鉴权失败无法进行控制。

3. go处理库

这里给个go-supervisor的处理库:https://github.com/abrander/go-supervisord

​​https://pkg.go.dev/github.com/abrander/go-supervisord#section-readme​​

import "github.com/abrander/go-supervisord"
  
func main() {
    c, err := supervisord.NewClient("http://127.0.0.1:9001/RPC2")
    if err != nil {
        panic(err.Error())
    }
    
    err = c.ClearLog()
    if err != nil {
        panic(err.Error())
    }
    
    err = c.Restart()
    if err != nil {
        panic(err.Error())
    }
}      

4. 实时日志处理代码片段

func (s *businessLogService) TailLog(name string, ws *ghttp.WebSocket) error {
    c, err := supervisord.NewClient("http://127.0.0.1:9001/RPC2")
    if err != nil {
        return err
    }
    defer c.Close()
    processInfo, err := c.GetProcessInfo(name)
    if err != nil {
        logger.Error(err)
        return err
    }
    filename = processInfo.StdoutLogfile
    logger.Debug(filename)
    s.serveWs(ws)
    return nil
}

/***
编译时需要安装以下依赖:
go get github.com/gorilla/websocket
go get github.com/hpcloud/tail
*/
const (
    // Time allowed to write the file to the client.
    //writeWait = 1 * time.Second
    writeWait = 100 * time.Millisecond

    // Time allowed to read the next pong message from the client.
    //pongWait = 24 * time.Hour
    pongWait = 60 * time.Second

    // Send pings to client with this period. Must be less than pongWait.
    pingPeriod = (pongWait * 9) / 10

    // Poll file for changes with this period.
    filePeriod = 1 * time.Second
)

var (
    filename string
)

func (s *businessLogService) readFileIfModified(lastMod time.Time) ([]byte, time.Time, error) {
    fi, err := os.Stat(filename)
    if err != nil {
        return nil, lastMod, err
    }
    if !fi.ModTime().After(lastMod) {
        return nil, lastMod, nil
    }
    p, err := ioutil.ReadFile(filename)
    if err != nil {
        return nil, fi.ModTime(), err
    }
    return p, fi.ModTime(), nil
}

func (s *businessLogService) reader(ws *ghttp.WebSocket) {
    defer ws.Close()
    ws.SetReadLimit(512)
    ws.SetReadDeadline(time.Now().Add(pongWait))
    ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
    for {
        _, _, err := ws.ReadMessage()
        if err != nil {
            logger.Warn(err)
            break
        }
    }
}

func (s *businessLogService) tailFile() *tail.Tail {
    tailFd, err := tail.TailFile(filename, tail.Config{
        ReOpen:    true,                                 // 文件被移除或被打包,需要重新打开
        Follow:    true,                                 // 实时跟踪
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 如果程序出现异常,保存上次读取的位置,避免重新读取。
        MustExist: false,                                // 如果文件不存在,是否推出程序,false是不退出
        Poll:      true,
    })

    if err != nil {
        logger.Error("tail file failed, err:", err)
        return nil
    }
    return tailFd
}

func (s *businessLogService) writer(ws *ghttp.WebSocket) {
    tailFd := s.tailFile()
    pingTicker := time.NewTicker(pingPeriod)
    fileTicker := time.NewTicker(filePeriod)
    maxTimeout := time.NewTicker(time.Duration(1) * time.Minute)
    defer func() {
        pingTicker.Stop()
        fileTicker.Stop()
        ws.Close()
    }()

    for {
        select {
        case msg, ok := <-tailFd.Lines:
            if ok {
                ws.SetWriteDeadline(time.Now().Add(writeWait))
                logger.Debug("read file content: %s\n", msg)
                if err := ws.WriteMessage(websocket.TextMessage, []byte(msg.Text)); err != nil {
                    return
                }
            }
        case <-pingTicker.C:
            ws.SetWriteDeadline(time.Now().Add(writeWait))
            if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
                return
            }
        case <-maxTimeout.C:
            ws.WriteMessage(websocket.TextMessage, []byte("Maximum timeout"))
            ws.Close()
        }
    }
}

func (s *businessLogService) serveWs(ws *ghttp.WebSocket) {
    go s.writer(ws)
    s.reader(ws)
}