天天看點

Go操作supervisor xml rpc接口及注意事項

Go操作supervisor xml rpc接口及注意事項

文章目錄

  • ​​Go操作supervisor xml rpc接口及注意事項​​
  • ​​1. 前言​​
  • ​​2. 管理web​​
  • ​​3. go處理庫​​
  • ​​4. 實時日志處理代碼片段​​

1. 前言

之前提到過目前我們的程序都是通過supervisor(http://supervisord.org/)這樣一個程序管理軟體進行管理的,也專門做過專題翻譯過supervisor的一些内容

我們會發現3.0以上的版本會有xml-rpc接口(http://supervisord.org/xmlrpc.html)可以通過對應接口控制supervisor管理的程序,包括擷取對應的日志、運作狀态等功能,這在實際開發過程中擷取這些資訊在web上進行控制、查詢也是非常有幫助的,是以這裡對go如何進行supervisor管理程序的資訊的處理做簡單的總結。

2. 管理web

一般在配置檔案中添加:

[inet_http_server]
port=9001      

即可通過9001端口通路一個web頁面:

Go操作supervisor xml rpc接口及注意事項

而通過xml-rpc可以擷取狀态,對這些程序進行控制管理,檢視對應日志等。

注意:處于安全可能會需要配置該web的使用者名和密碼,但是為了友善進行程式管理,最好不要配置鑒權,否則程式可能由于鑒權失敗無法進行控制。

3. go處理庫

這裡給個go-supervisor的處理庫:https://github.com/abrander/go-supervisord

​​https://pkg.go.dev/github.com/abrander/go-supervisord#section-readme​​

import "github.com/abrander/go-supervisord"
  
func main() {
    c, err := supervisord.NewClient("http://127.0.0.1:9001/RPC2")
    if err != nil {
        panic(err.Error())
    }
    
    err = c.ClearLog()
    if err != nil {
        panic(err.Error())
    }
    
    err = c.Restart()
    if err != nil {
        panic(err.Error())
    }
}      

4. 實時日志處理代碼片段

func (s *businessLogService) TailLog(name string, ws *ghttp.WebSocket) error {
    c, err := supervisord.NewClient("http://127.0.0.1:9001/RPC2")
    if err != nil {
        return err
    }
    defer c.Close()
    processInfo, err := c.GetProcessInfo(name)
    if err != nil {
        logger.Error(err)
        return err
    }
    filename = processInfo.StdoutLogfile
    logger.Debug(filename)
    s.serveWs(ws)
    return nil
}

/***
編譯時需要安裝以下依賴:
go get github.com/gorilla/websocket
go get github.com/hpcloud/tail
*/
const (
    // Time allowed to write the file to the client.
    //writeWait = 1 * time.Second
    writeWait = 100 * time.Millisecond

    // Time allowed to read the next pong message from the client.
    //pongWait = 24 * time.Hour
    pongWait = 60 * time.Second

    // Send pings to client with this period. Must be less than pongWait.
    pingPeriod = (pongWait * 9) / 10

    // Poll file for changes with this period.
    filePeriod = 1 * time.Second
)

var (
    filename string
)

func (s *businessLogService) readFileIfModified(lastMod time.Time) ([]byte, time.Time, error) {
    fi, err := os.Stat(filename)
    if err != nil {
        return nil, lastMod, err
    }
    if !fi.ModTime().After(lastMod) {
        return nil, lastMod, nil
    }
    p, err := ioutil.ReadFile(filename)
    if err != nil {
        return nil, fi.ModTime(), err
    }
    return p, fi.ModTime(), nil
}

func (s *businessLogService) reader(ws *ghttp.WebSocket) {
    defer ws.Close()
    ws.SetReadLimit(512)
    ws.SetReadDeadline(time.Now().Add(pongWait))
    ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
    for {
        _, _, err := ws.ReadMessage()
        if err != nil {
            logger.Warn(err)
            break
        }
    }
}

func (s *businessLogService) tailFile() *tail.Tail {
    tailFd, err := tail.TailFile(filename, tail.Config{
        ReOpen:    true,                                 // 檔案被移除或被打包,需要重新打開
        Follow:    true,                                 // 實時跟蹤
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 如果程式出現異常,儲存上次讀取的位置,避免重新讀取。
        MustExist: false,                                // 如果檔案不存在,是否推出程式,false是不退出
        Poll:      true,
    })

    if err != nil {
        logger.Error("tail file failed, err:", err)
        return nil
    }
    return tailFd
}

func (s *businessLogService) writer(ws *ghttp.WebSocket) {
    tailFd := s.tailFile()
    pingTicker := time.NewTicker(pingPeriod)
    fileTicker := time.NewTicker(filePeriod)
    maxTimeout := time.NewTicker(time.Duration(1) * time.Minute)
    defer func() {
        pingTicker.Stop()
        fileTicker.Stop()
        ws.Close()
    }()

    for {
        select {
        case msg, ok := <-tailFd.Lines:
            if ok {
                ws.SetWriteDeadline(time.Now().Add(writeWait))
                logger.Debug("read file content: %s\n", msg)
                if err := ws.WriteMessage(websocket.TextMessage, []byte(msg.Text)); err != nil {
                    return
                }
            }
        case <-pingTicker.C:
            ws.SetWriteDeadline(time.Now().Add(writeWait))
            if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
                return
            }
        case <-maxTimeout.C:
            ws.WriteMessage(websocket.TextMessage, []byte("Maximum timeout"))
            ws.Close()
        }
    }
}

func (s *businessLogService) serveWs(ws *ghttp.WebSocket) {
    go s.writer(ws)
    s.reader(ws)
}