天天看點

[docker]通過rsyslog記錄日志并轉發nginx日志到python程式關鍵點1 nginx日志配置關鍵點2 rsyslog配置與Dockerfile關鍵點3 python程式通過tcp的方式讀取rsyslog

記錄我是如何把rsyslog做成docker鏡像,擷取nginx的accesslog并且轉發到python的

關鍵點1 nginx日志配置

nginx日志要設定成json格式輸出,nginx.conf如下圖所示,這個可以在docker鏡像中通過volume把nginx.conf挂載進去,然後把/var/log/nginx/access.log挂載到本地

user  root;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {
    worker_connections  1024;
}
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
    log_format main '{"time_local": "$time_local", '
   '"path": "$request_uri", '
   '"ip": "$remote_addr", '
   '"time": "$time_iso8601", '
   '"user_agent": "$http_user_agent", '
   '"user_id_got": "$uid_got", '
   '"user_id_set": "$uid_set", '
   '"remote_user": "$remote_user", '
   '"request": "$request", '
   '"status": "$status", '
   '"body_bytes_sent": "$body_bytes_sent", '
   '"request_time": "$request_time", '
   '"http_referrer": "$http_referer" }';
    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    #tcp_nopush     on;
    keepalive_timeout  65;
    #gzip  on;
    include /etc/nginx/conf.d/*.conf;
}      

關鍵點2 rsyslog配置與Dockerfile

編寫一個51-nginx-forward.conf檔案放置在/etc/rsyslog.d/下即可

module(load="imfile")
input(type="imfile"
      File="/var/log/nginx/access.log"
      Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
if( $syslogtag == 'mywebsite:')  then {
  action(type="omfwd" target="python伺服器IP位址" port="6000" protocol="tcp"
            action.resumeRetryCount="100"
            queue.type="linkedList" queue.size="10000")
}      

我們可以用一個Dockerfile來運作rsyslog,docker run的時候注意日志的挂載

FROM ubuntu:16.04
RUN apt-get update && apt-get install -y rsyslog; \
    rm -rf /var/lib/apt/lists/*
ADD 51-nginx-forward.conf /etc/rsyslog.d/.
# RUN cat /dev/null> /var/log/mail.log
CMD service rsyslog start && tail -f /var/log/syslog      

關鍵點3 python程式通過tcp的方式讀取rsyslog

python程式與rsyslog建立tcp連接配接,可以實時的進行資料庫的插入語句

import asyncio
import json
import time
import database_init
class LogAnalyser:
    def __init__(self):
        pass
    def process(self, str_input):
        # print(str_input)
        str_input = str_input.decode("utf-8", errors="ignore")
        # Add your processing steps here
        # ...
        try:
            # Extract created_at from the log string
            str_splits = str_input.split("{", 1)
            json_text = "{" + str_splits[1]
            data = json.loads(json_text)
            created_at = data["time"]
            request_all = data["request"].split(" /", 1)
            http_type = request_all[0]
            path = data["path"]
            request_time = data["request_time"]
            if PREFIX in data["path"]:
                path = data["path"]
                return http_type, path, created_at,request_time  # The order is relevant for INSERT query params
        except Exception as e:
            print("error in read_rsylog.py,Class LogAnalyser,function process")
            print(e)
        return None
@asyncio.coroutine
def handle_echo(reader, writer):
    log_filter = LogAnalyser()
    while True:
        line = yield from reader.readline()
        if not line:
            break
        params = log_filter.process(line)
        if params:
            # 進行一堆操作,例如進行資料庫的插入
            # execute_sql(params=params)
if __name__ == '__main__':
    CURSOR = database_init.DBConnect().CURSOR
    CONN = database_init.DBConnect().CONN
    PREFIX = database_init.DBConnect().CONFIG["TEST_SWAGGER"]["PREFIX"]
    database_init.DBConnect().create_table()
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, None, 6000, loop=loop)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    print("Closing the server.")
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    CURSOR.close()
    CONN.close()