天天看點

【實戰小項目】python開發自動化運維工具--批量操作主機

有很多開源自動化運維工具都很好用如ansible/salt stack等,完全不用重複造輪子。隻不過,很多運維同學學習Python之後,苦于沒小項目訓練,本篇示範用Python寫一個批量操作主機的工具,大家空餘時候可以試着寫寫,完善完善。

1 思路分析

在運維工作中,古老的方式部署環境、上線代碼可能都需要手動在伺服器上敲指令,不勝其煩。是以,腳本,自動化工具等還是很有必要的。我覺得一個批量操作工具應該考慮以下幾點:

(1)本質上,就是到遠端主機上執行指令并傳回結果。

(2)做到批量。也就是要并發對多台機器進行操作。

(3)将傳回的結果,清晰地展示給使用者。

通常開源的主機批量管理工具有兩類,一類是有agent,如SaltStack、Puppet等;另一類是無agent如ansible。雖然我們沒必要重複造輪子,但是可以試着寫一寫,一是加深對這類軟體原理的了解,二是練習Python。建議如果伺服器規模在1000台以内的用無agent的方式也能hold住;如果超過1000台,用有agent的會好太多。

接下來我們一起看看怎麼具體實作。

2 到遠端機器上執行指令

到遠端機器上執行指令,并傳回結果,至少有兩種方式:一是用paramiko子產品;而是可以建立機器互信,從中控執行ssh指令。

下面我把自己封裝好的代碼貼一下,是基于paramiko子產品封裝的,ssh的大家可以自己實作:

import paramiko

class SSHParamiko(object):

    err = "argument passwd or rsafile can not be None"

    def __init__(self, host, port, user, passwd=None, rsafile=None):
        self.h = host
        self.p = port
        self.u = user
        self.w = passwd
        self.rsa = rsafile

    def _connect(self):
        if self.w:
            return self.pwd_connect()
        elif self.rsa:
            return self.rsa_connect()
        else:
            raise ConnectionError(self.err)

    def _transfer(self):
        if self.w:
            return self.pwd_transfer()
        elif self.rsa:
            return self.rsa_transfer()
        else:
            raise ConnectionError(self.err)

    def pwd_connect(self):
        conn = paramiko.SSHClient()
        conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        conn.connect(self.h, self.p, self.u, self.w)
        return conn

    def rsa_connect(self):
        pkey = paramiko.RSAKey.from_private_key_file(self.rsa)
        conn = paramiko.SSHClient()
        conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        conn.connect(hostname=self.h, port=self.p, username=self.u, pkey=pkey)
        return conn

    def pwd_transfer(self):
        transport = paramiko.Transport(self.h, self.p)
        transport.connect(username=self.u, password=self.w)
        sftp = paramiko.SFTPClient.from_transport(transport)
        return sftp, transport

    def rsa_transfer(self):
        pkey = paramiko.RSAKey.from_private_key_file(self.rsa)
        transport = paramiko.Transport(self.h, self.p)
        transport.connect(username=self.u, pkey=pkey)
        sftp = paramiko.SFTPClient.from_transport(transport)
        return sftp, transport

    def run_cmd(self, cmd):
        conn = self._connect()
        stdin, stdout, stderr = conn.exec_command(cmd)
        code = stdout.channel.recv_exit_status()
        stdout, stderr = stdout.read(), stderr.read()
        conn.close()
        if not stderr:
            return code, stdout.decode()
        else:
            return code, stderr.decode()

    def get_file(self, remote, local):
        sftp, conn = self._transfer()
        sftp.get(remote, local)
        conn.close()

    def put_file(self, local, remote):
        sftp, conn = self._transfer()
        sftp.put(local, remote)
        conn.close()      

當然,代碼還可以重構一下哈。接下來我們看下效果:

if __name__ == '__main__':
    h = "我的測試機IP"
    p = 22
    u = "我的使用者名"
    w = "我的密碼"

    obj = SSHParamiko(h, p, u, w)
    r = obj.run_cmd("df -h")
    print(r[0])
    print(r[1])        

執行之後的結果是:

0
Filesystem      Size  Used Avail Use% Mounted on
/dev/vda1        40G  3.4G   34G   9% /
devtmpfs        3.9G     0  3.9G   0% /dev
tmpfs           3.9G     0  3.9G   0% /dev/shm
tmpfs           3.9G  410M  3.5G  11% /run
tmpfs           3.9G     0  3.9G   0% /sys/fs/cgroup
/dev/vdb        300G   12G  289G   4% /search/odin
tmpfs           783M     0  783M   0% /run/user/0
tmpfs           783M     0  783M   0% /run/user/1000        

可以清晰看到第一行是指令執行的狀态碼,0表示成功,非0表示失敗;第二行開始就是我們的指令傳回結果。是不是比較清晰呢? 

3 并發執行并展示輸出結果

并發執行通常用Python3自帶的線程子產品就行,這裡我用的from concurrent.futures import ThreadPoolExecutor。并且當拿到結果之後,我還做了一些格式化輸出,比如綠色輸出表示成功,紅色輸出表示指令執行失敗,黃色表示提醒等。廢話不多說,直接看代碼吧!

from concurrent.futures import ThreadPoolExecutor

class AllRun(object):
    def __init__(self, ssh_objs, cmds, max_worker=50):
        self.objs = [o for o in ssh_objs]
        self.cmds = [c for c in cmds]
        self.max_worker = max_worker  # 最大并發線程數

        self.success_hosts = []       # 存放成功機器數目
        self.failed_hosts = []        # 存放失敗的機器IP
        self.mode = None
        self.func = None

    def serial_exec(self, obj):
        """單台機器上串行執行指令,并傳回結果至字典"""
        result = list()
        for c in self.cmds:
            r = obj.run_cmd(c)
            result.append([c, r])
        return obj, result

    def concurrent_run(self):
        """并發執行"""
        future = ThreadPoolExecutor(self.max_worker)
        for obj in self.objs:
            try:
                future.submit(self.serial_exec, obj).add_done_callback(self.callback)
            except Exception as err:
                err = self.color_str(err, "red")
                print(err)
        future.shutdown(wait=True)

    def callback(self, future_obj):
        """回調函數,處理傳回結果"""
        ssh_obj, rlist = future_obj.result()
        print(self.color_str("{} execute detail:".format(ssh_obj.h), "yellow"))
        is_success = True
        for item in rlist:
            cmd, [code, res] = item
            info = f"{cmd} | code => {code}\nResult:\n{res}"
            if code != 0:
                info = self.color_str(info, "red")
                is_success = False
                if ssh_obj.h not in self.failed_hosts:
                    self.failed_hosts.append(ssh_obj.h)
            else:
                info = self.color_str(info, "green")
            print(info)
        if is_success:
            self.success_hosts.append(ssh_obj.h)
            if ssh_obj.h in self.failed_hosts:
                self.failed_hosts.remove(ssh_obj.h)

    def overview(self):
        """展示總的執行結果"""
        for i in self.success_hosts:
            print(self.color_str(i, "green"))
        print("-" * 30)
        for j in self.failed_hosts:
            print(self.color_str(j, "red"))
        info = "Success hosts {}; Failed hosts {}."
        s, f = len(self.success_hosts), len(self.failed_hosts)
        info = self.color_str(info.format(s, f), "yellow")
        print(info)

    @staticmethod
    def color_str(old, color=None):
        """給字元串添加顔色"""
        if color == "red":
            new = "\033[31;1m{}\033[0m".format(old)
        elif color == "yellow":
            new = "\033[33;1m{}\033[0m".format(old)
        elif color == "blue":
            new = "\033[34;1m{}\033[0m".format(old)
        elif color == "green":
            new = "\033[36;1m{}\033[0m".format(old)
        else:
            new = old
        return new

if __name__ == '__main__':
    h1 = "adime01.shouji.sjs.ted"
    p1 = 22
    u1 = "odin"
    w1 = "*****"

    h = "10.129.206.97"
    p = 22
    u = "root"
    w = "*****"

    obj1 = SSHParamiko(h1, p1, u1, w1)
    obj = SSHParamiko(h, p, u, w)

    cmds = ["df -h", "ls"]

    all_obj = AllRun([obj1, obj], cmds)
    all_obj.concurrent_run()
    all_obj.overview()      

 上述代碼運作的結果:

【實戰小項目】python開發自動化運維工具--批量操作主機

從執行結果來看,高亮顯示,清新明了。既顯示了各個主機的各個指令執行狀态碼,傳回結果,最後還彙總結果,成功了多少台機器和失敗了多少台機器。

我們還可以換一下執行的指令,讓指令執行失敗看看:

【實戰小項目】python開發自動化運維工具--批量操作主機

後期還可以包裝一下,将主機、密碼、批量執行的指令寫在配置檔案中;或再根據需要包裝成指令行工具,在日常運維工作中可以适當減少人肉敲指令的繁瑣。

作者:ZingpLiu

出處:http://www.cnblogs.com/zingp/

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接。

繼續閱讀