天天看點

基于 Fabric 部署分布式爬蟲的思考

Python:基于 Fabric 部署分布式爬蟲的思考

Fabric 本身是一款用于自動化管理,釋出任務和布署應用的工具,在自動化運維中比較常見

當然其他的 連接配接工具同樣優秀,比如 paramiko ,隻是 fabric 封裝的更好,文檔更全,使用也更簡單

中文文檔 https://fabric-chs.readthedocs.io/zh_CN/chs/

Fabric 是一個用 Python 編寫的指令行工具庫,它可以幫助系統管理者高效地執行某些任務

一個讓通過 SSH 執行 Shell 指令更加 容易 、 更符合 Python 風格 的指令庫

基于 Fabric 部署分布式爬蟲的思考

安裝

sudo pip3 install fabric3
           

最終這樣都可以運作

fab -f REQUESTS_HTML.py host_type # host_type 是任務函數

or

python3 REQUESTS_HTML.py  # 運作整個檔案
           

實戰案例

我是一個不怎麼愛說話的人(其實是文筆一般),是以直接貼代碼恐怕是最好的分享的方式了

第一個 Demo

from fabric.api import run,cd,env,hosts,execute
env.hosts=['[email protected]:22']
env.password='pwd2'

def host_type():
    with cd('../home'):
        run('ls')
        run('cd youboy_redis')
        run('cd Youboy && cd youboy && ls && python3 run.py')

print(execute(host_type)) # execute 執行任務
           

可以很清晰的看到函數執行的順序,進入 爬蟲 目錄并運作一個爬蟲腳本,就像在本地執行指令一樣,這裡支援 with 上下文

簡單做一個類封裝

from fabric.api import run,cd,env,hosts,execute
class H():
    def __init__(self,host,pwd):
        env.hosts=list(hosts)
        env.password=pwd

    def host_type(self):
        with cd('../home'):
            run('ls')
    def run(self):
        print(execute(self.host_type))

h = H('[email protected]:22','pwd2')
h.run()

           

嗯,沒有問題

并行執行

我們在介紹執行遠端指令時曾提到過多台機器的任務預設情況下是串行執行的。

Fabric 支援并行任務,當伺服器的任務之間沒有依賴時,并行可以有效的加快執行速度。

怎麼開啟并行執行呢?

在執行”fab”指令時加上”-P”參數

$ fab -P host_type
           

或者

設定 ”env.parallel” 環境參數為True

from fabric.api import env
env.parallel = True
           

如果,我們隻想對某一任務做并行的話,我們可以在任務函數上加上”@parallel”裝飾器:

from fabric.api import parallel
 
@parallel
def runs_in_parallel():
    pass
 
def runs_serially():
    pass
           

這樣即便并行未開啟,”runs_in_parallel()”任務也會并行執行。

反過來,我們可以在任務函數上加上”@serial”裝飾器:

from fabric.api import serial
 
def runs_in_parallel():
    pass
 
@serial
def runs_serially():
    pass
           

這樣即便并行已經開啟,”runs_serially()”任務也會串行執行。

試着做一些事情

配置檔案

使用 yaml 格式

[mysql]
host = 127.0.0.1
port = 3306
db = python
user = root
passwd = 123456
charset = utf8

[mongodb]
host = ip
port = 27017
db = QXB

[redis]
host = 127.0.0.1
port = 6379
db = 0

[server]
aliyun1_host = ["公網ip", "ssh密碼", 22]
aliyun2_host = ["公網ip", "ssh密碼", 22]

           

讀取配置

config.py

from configparser import ConfigParser
import json
config = ConfigParser()
config.read('./conf.yml')  # ['conf.ini'] ['conf.cfg]


# 擷取所有的section
# print(config.sections())  # ['mysql', redis]

conf_list = list()
for host in config.options('server'):
    str_host = config.get('server', host)
    json_host = json.loads(str_host)
    conf_list.append(json_host)

print(conf_list)
           

遠端連接配接伺服器以及一些常用操作

import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task

class HA():
    def __init__(self):
        self.host = "root@{host}:{port}"
        self.ssh = "root@{host}:{port}"
        self.env = env
        self.env.warn_only = True # 這樣寫比較痛快
        self.env.hosts = [
            self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
        self.env.passwords = {
            self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}

 
        print(self.env["hosts"])

    # def Hide_all(self):
    #     with settings(hide('everything'), warn_only=True):  # 關閉顯示
    #         result = run('ls')
    #         print(result)  # 指令執行的結果
    #         print(result.return_code)

    # def Show_all(self):
    #     with settings(show('everything'), warn_only=True):  # 顯示所有
    #         result = run('docker')
    #         print(str(result.return_code))  # 傳回碼,0表示正确執行,1表示錯誤
    #         print(str(result.failed))

    # @task
    # def Prefix(self): # 字首,它接受一個指令作為參數,表示在其内部執行的代碼塊,都要先執行prefix的指令參數。
    #     with cd('../home'):
    #         with prefix('echo 123'):
    #             run('echo caonima')


    # def Shell_env(self): # 設定shell腳本的環境變量 
    #     with shell_env(HTTP_PROXY='1.1.1.1'):
    #         run('echo $HTTP_PROXY')


    # def Path_env(self): # 配置遠端伺服器PATH環境變量,隻對目前會話有效,不會影響遠端伺服器的其他操作,path的修改支援多種模式
    #     with path('/tmp', 'prepend'):
    #         run("echo $PATH")
    #     run("echo $PATH")


    # def Mongo(self): # 嘗試連接配接mongodb資料庫  不知道為什麼制定端口就不行了
    #     # with remote_tunnel(27017):
    #     run('mongo')


    # def Mysql(self):  # 嘗試連接配接mysql資料庫
    #     with remote_tunnel(3306):
    #         run('mysql -u root -p password')

    '''
    指定host時,可以同時指定使用者名和端口号: [email protected]:port
    通過指令行指定要多哪些hosts執行人物:fab mytask:hosts="host1;host2"
    通過hosts裝飾器指定要對哪些hosts執行目前task
    通過env.reject_unkown_hosts控制未知host的行為,預設True,類似于SSH的StrictHostKeyChecking的選項設定為no,不進行公鑰确認。
    '''

    # @hosts('[email protected]:22')
    # @task
    # def Get_Ip(self):
    #     run('ifconfig') 
    #     # return run("ip a")

    # @hosts("[email protected]:22")
    # @runs_once
    # def Get_One_Ip(self):
    #     run('ifconfig')

    '''
    role是對伺服器進行分類的手段,通過role可以定義伺服器的角色,
    以便對不同的伺服器執行不同的操作,Role邏輯上将伺服器進行了分類,
    分類以後,我們可以對某一類伺服器指定一個role名即可。
    進行task任務時,對role進行控制。
    '''

    # @roles('web')  # 隻對role為db的主機進行操作
    # @task
    # def Roles_Get_Ip():
    #     run('ifconfig')
        

    # def Confirm(self): # 有時候我們在某一步執行錯誤,會給使用者提示,是否繼續執行時,confirm就非常有用了,它包含在 fabric.contrib.console中
    #     result = confirm('Continue Anyway?')
    #     print(result)

    # def run_python(self):
    #     run("python3 trigger.py")

    @task
    @parallel
    def celery_call(): # 執行celery任務
        with cd('../home'):
            warn(yellow('----->Celery'))
            puts(green('----->puts'))
            run('cd ./celery_1 && celery -A Celery worker -l info')
            time.sleep(3)
            run('python3 run_tasks.py')
    

    # @task
    # def update_file(): # 上傳檔案到伺服器
    #     with settings(warn_only=True):
    #         local("tar -czf test.tar.gz config.py")
    #         result = put("test.tar.gz", "/home/test.tar.gz")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("put test.tar.gz failed")

    #     with settings(warn_only=True):
    #         local_file_md5 = local("md5sum test.tar.gz",capture=True).split(" ")[0]
    #         remote_file_md5 = run("md5sum /home/test.tar.gz").split(" ")[0]
    #     if local_file_md5 == remote_file_md5:
    #         print(green("local_file == remote_file"))
    #     else:
    #         print(red("local_file != remote"))
    #     run("mkdir /home/test")
    #     run("tar -zxf /home/test.tar.gz -C /home/scp")

    '''
    有一個地方很神奇,self和@task裝飾器在類中不能共用,否則會報錯
    '''

    # @task
    # def downloads_file(): # get檔案到本地
    #     with settings(warn_only=True):
    #         result = get("/home/celery_1", "./")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("get test.tar.gz failed")
    #     local("mkdir ./test")
    #     local("tar zxf ./hh.tar.gz -C ./test")

    # @task
    # @parallel
    # def scp_docker_file():
    #     with settings(warn_only=True):
    #         local("tar -czf docker.tar.gz ../docker")
    #         result = put("docker.tar.gz", "/home/docker.tar.gz")
    #     if result.failed and not confirm("continue[y/n]?"):
    #         abort("put dockerfile failed")
    #     run("mkdir /home/docker")
    #     run("tar -zxf /home/docker.tar.gz -C /home")


    def Run(self):
        execute(self.celery_call)
    

h = HA()
h.Run()
           

我盡可能添加一些代碼注釋,更多解釋還請參考 fabric 文檔啊

看看 docker

隻要能夠連接配接到伺服器,那麼在這些伺服器上安裝服務也就在情理之中了,比如 docker

來看具體的代碼實作

import warnings
warnings.filterwarnings("ignore")
import time
from fabric.api import * # run,cd,env,hosts,execute,sudo,settings,hide
from fabric.colors import *
from fabric.contrib.console import confirm
import config
import json
from fabric.tasks import Task

class HA():
    def __init__(self):
        self.host = "root@{host}:{port}"
        self.ssh = "root@{host}:{port}"
        self.env = env
        self.env.warn_only = True # 這樣寫比較痛快
        self.env.hosts = [
            self.host.format(host=host[0],port=host[2]) for host in config.conf_list]
        self.env.passwords = {
            self.ssh.format(host=host[0], port=host[2]):host[1] for host in config.conf_list}


        print(self.env["hosts"])

    
    
    @task
    def get_docker_v(): # 檢視docker版本
        with cd('../home'):
            run('docker version')

    @task
    def pull_images(images_name):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker pull {}".format(images_name))
                except:
                    abort("docker pull failed")

    @task
    def push_images(images_name,username_repository,tag):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker tag {image_name} {username_repository}:{tag}".format(images_name=images_name,username_repository=username_repository,tag=tag))
                    run("docker push {username_repository}:{tag}".format(username_repository=username_repository,tag=tag))
                except:
                    abort("docker push failed")

    @task
    def run_docker_images(images_name_tag):
        with settings(warn_only=True):
            with cd("../home/"):
                try:
                    run("docker run -p 4000:80 {}".format(images_name_tag))
                except:
                    abort("docker run failed")


    @task
    @parallel
    def execute_docker_compose():
        with settings(warn_only=True):
            with cd("../home/flask_app"):
                run("docker-compose up")


    @task
    def create_docker_service(service_name,images_name,num=4):
        with settings(warn_only=True):
            with cd("../home/"):
                run("docker service create --name {service_name} -p 4000:80 {images_name}".format(service_name=service_name,images_name=images_name))
                run("docker service scale {service_name}={num}".format(service_name=service_name,num=num))
    
    
    @task
    def stop_docker_service(service_name):
        with settings(warn_only=True):
            with cd("../home/"):
                run("docker service rm {}".format(service_name))

    def Run(self):
        # execute(self.create_docker_service,"demo","3417947630/py:hello")
        execute(self.execute_docker_compose)

h = HA()
h.Run()
           

嘿嘿,挺好

總結

基于python第三方庫 fabric 實作遠端ssh分布式排程部署應用,是一種很不錯的選擇,那麼如果用于部署 爬蟲的應用呢?

如果你是使用 scrapy 架構編寫的爬蟲(或者是其他架構,各種腳本也是一樣),那麼可以直接運作檔案上傳的方法把完整目錄拷貝到目标伺服器(當然是批量的)

然後鍵入爬取的指令,記住 fabric 是支援并行的,就能達到多機協作抓取的目的了

其實在 scrapy 中也可能用 scrapyd 來打包部署分布式爬蟲,但是打包過程略為繁瑣,而 SSH 連接配接則比較直接,操作簡單

然鵝說到底 fabric 也隻是一種自動化運維的工具,本質上也隻是把代碼拷貝到目标伺服器和執行相應的指令而已,并沒有像 scrapyd 提供爬蟲管理的可視化界面

是以這樣看來, Fabric 至少算得上是部署分布式爬蟲的一種選擇,就是因為部署簡單

以上就是我對這個 py 庫的一些看法,它為我們日後部署應用和服務提供了更多的選擇,多多實戰吧 !!

歡迎轉載,但要聲明出處,不然我順着網線過去就是一拳。

個人技術部落格:http://www.gzky.live