Windows-to-Linux folder synchronization written in Python: automatic sync, faster transfer of large numbers of small files, and richer upload filter settings

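The existing tools fall short. I had been relying on PyCharm's automatic upload, but it does not push new files pulled down by git to the Linux machine; only files I have edited myself or saved manually with Ctrl + S are synced automatically. To avoid missing files I often had to do a full upload, which is very slow.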

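I frequently use a Linux interpreter directly from PyCharm on Windows and need to test quickly, so repeatedly running git push and git pull between the local machine and Linux is inconvenient. The test environment does use git, but during development a directly mapped, synchronized folder is more convenient than git.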

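With a connection pool, uploading many very small files is dozens of times faster than uploading them one by one over a single Linux connection in a single thread.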
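
A minimal sketch of the idea, not the code from this post: a fixed set of connections sits in a queue.Queue, and each worker thread borrows one, uploads a file, and hands it back. FakeConnection, ConnectionPool and upload_all are hypothetical names, and FakeConnection only records the paths it is given, standing in for a real SFTP client so the example runs without a server.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class FakeConnection:
    """Stand-in for an SFTP client (hypothetical); it only records what it was asked to upload."""

    def __init__(self, name):
        self.name = name
        self.uploaded = []

    def put(self, local_path, remote_path):
        self.uploaded.append((local_path, remote_path))


class ConnectionPool:
    """Holds a fixed number of connections; threads borrow one and hand it back."""

    def __init__(self, size):
        self._free = queue.Queue(size)
        self.connections = [FakeConnection(f'conn-{i}') for i in range(size)]
        for conn in self.connections:
            self._free.put(conn)

    @contextmanager
    def borrow(self):
        conn = self._free.get()      # blocks until a connection is free
        try:
            yield conn
        finally:
            self._free.put(conn)     # always return it, even if the upload raised


def upload_all(pool, files, remote_dir, workers):
    def upload_one(local_path):
        with pool.borrow() as conn:
            conn.put(local_path, remote_dir + '/' + local_path)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        # list() waits for every upload and re-raises any worker exception
        list(executor.map(upload_one, files))


pool = ConnectionPool(size=4)
files = [f'file_{i}.py' for i in range(100)]
upload_all(pool, files, '/home/user/project', workers=8)
total = sum(len(conn.uploaded) for conn in pool.connections)
print(total)
```

Because a connection is only ever held by one thread at a time, the number of worker threads can exceed the pool size; the extra threads simply wait in borrow() until a connection is returned.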

Uploading over a single Linux connection.

"""
Automatically synchronize a folder to a Linux machine.
"""
import json
import os
import queue
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler


class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
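        :param file_suffix_tuple_exluded: Exclude files whose names end with any of these suffixes.
        :param file_volume_limit: Maximum file size in bytes; larger files are not uploaded.
        :param path_pattern_exluded_tuple: A more powerful and flexible exclusion filter than suffix matching; each entry is searched for in the full file path.
        :param only_upload_within_the_last_modify_time: Only upload files modified within this many seconds before the current time.
        :param cycle_interval: Number of seconds between scans for files to upload.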
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        self.logger.warning('Establishing the Linux connection')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)

        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} uploaded, size {round(os.path.getsize(file) / 1024)} kb, upload took {round(time.time() - time_start, 2)} s')
                break
            except FileNotFoundError:
                # The remote parent directory does not exist yet: create it, then retry the upload.
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                stdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                self.build_connect()     # reconnect after e.g. "OSError: Socket is closed"

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        self.logger.warning(f'{len(self.filename__filesize_map)} files need to be uploaded, total size {round(total_volume / 1024, 2)} kb, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warning('Done')

Uploading with a connection pool plus multiple threads.

"""
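Automatically synchronize a folder to a Linux machine.
This version is faster: it uses a connection pool plus a thread pool, which greatly speeds up uploading large numbers of small files.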
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor


class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    Synchronization between two local folders.
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = 'local Windows copy'

    def upload(self, file: str):
        file_remote = file.replace(self._local_dir, self._remote_dir)
        if not Path(file_remote).parent.exists():
            os.makedirs(str(Path(file_remote).parent))
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'Copied {file} to {file_remote}, size {round(os.path.getsize(file) / 1024)} kb')
        else:
            self.logger.debug(f'{file} not copied to {file_remote}: no changes.')

    @staticmethod
    def get_file_md5(file):
        m = hashlib.md5()
        while True:
            # If the file were not opened in binary mode, the data would need to be encoded first
            # data = f.read(1024).encode('utf-8')
            data = file.read(1024)  # read the file in chunks
            if not data:
                break
            m.update(data)
        return m.hexdigest()


@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password):  # Flyweight pattern: one shared pool per distinct set of connection parameters.
        self.logger_extra_suffix = host
        self.logger.warning(f'Initializing the Linux connection pool for {host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)
        self.queue_ssh_free = queue.Queue(100)
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        self.logger.warning('Establishing a Linux SFTP connection...')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'Establishing the Linux SFTP connection took {round(time.time() - t_start, 2)} s')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def bulid_ssh(self):
        self.logger.warning('Establishing a Linux SSH connection...')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'Establishing the Linux SSH connection took {round(time.time() - t_start, 2)} s')

    def build_connect(self):
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.bulid_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        self.queue_ssh_free.put(ssh)


class LinuxRemoteUploader(LocalCopier):
    """
    Synchronize from Windows to Linux.
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
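        try:
            stdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            # The borrowed connection is broken: discard it and add a fresh one to the pool.
            self.linux_conn_pool.bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
            # Replace the connection here as well, otherwise the pool shrinks by one on every failure.
            self.linux_conn_pool.bulid_ssh()
        else:
            stderr_bytes = stderr.read()
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            self.linux_conn_pool.back_ssh(ssh)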

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        self.logger.debug(f'free sftp connections: {self.linux_conn_pool.queue_sftp_free.qsize()}, free ssh connections: {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))

        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} uploaded, size {round(os.path.getsize(file) / 1024)} kb, upload took {round(time.time() - time_start, 2)} s')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # the borrowed connection is dead (e.g. "OSError: Socket is closed"); add a fresh one to the pool


class Synchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
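        :param file_suffix_tuple_exluded: Exclude files whose names end with any of these suffixes.
        :param file_volume_limit: Maximum file size in bytes; larger files are not uploaded.
        :param path_pattern_exluded_tuple: A more powerful and flexible exclusion filter than suffix matching; each entry is a Python regular expression searched for in the full file path.
        :param only_upload_within_the_last_modify_time: Only upload files modified within this many seconds before the current time.
        :param cycle_interval: Number of seconds between scans for files to upload.
        :param just_windows_copy: Copy between folders on Windows only; do not upload to Linux.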
        """
        self.logger_extra_suffix = host if not just_windows_copy else 'local'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

    @staticmethod
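    def _compute_result(sth: Union[str, int]):
        # Config values may be ints or arithmetic strings such as '30 * 24 * 3600'.
        # eval executes arbitrary code, so only load config files you trust.
        return sth if isinstance(sth, int) else eval(sth)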

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if (time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time
                            and volume < self._file_volume_limit
                            and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60)):
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'{len(self.filename__filesize_map)} files need to be {"copied" if self._just_windows_copy else "uploaded"}, total size {round(total_volume / 1024, 2)} kb, '
                                f'file scan took {round(time.time() - t_start, 2)} s, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)


# noinspection PyPep8
if __name__ == '__main__':
    """
    The config file has the following format; several folder mappings can be synchronized.
    [
      {
        "host": "112.90.xx.xx",
        "port": 10005,
        "username": "root",
        "password": "@0^Lc97MewI3i7xxxxxx",
        "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
        "remote_dir": "/home/ydf/hotelf15",
        "file_suffix_tuple_exluded": [
          ".pyc",
          ".log",
          ".gz"
        ],
        "path_pattern_exluded_tuple": [
          "/.git/",
          "/.idea/",
          "cnbooking_cn_all.json"
        ],
        "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
        "file_volume_limit": "2 * 1000 * 1000",
        "cycle_interval": 10
      }
    ]
    """

    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

    # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
    # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # This file can be packaged with pyinstaller. First set the PYTHONPATH variable, then run this command from another folder.
    # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

    # cd ..
    # set PYTHONPATH=D:\coding2\hotel_fares
    # pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # Test update.

The contents of the configuration file are as follows.

[
  {
    "host": "112.xx.89.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
    "remote_dir": "/home/ydf/hotelf18",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  },
  {
    "host": "112.90.xx.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
    "remote_dir": "/home/ydf/movie_data2",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  }
]      
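
The config stores durations and size limits as arithmetic strings such as "30 * 24 * 3600", which the script evaluates with eval. As a narrower alternative, here is a minimal sketch that accepts only numeric literals combined with +, -, * and //. The name compute_number is hypothetical and not part of the script; it could stand in for _compute_result if the config file is not fully trusted.

```python
import ast
import operator

# Only these binary operators are accepted; anything else raises ValueError.
_ALLOWED_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.FloorDiv: operator.floordiv,
}


def compute_number(value):
    """Return value if it is already a number, else evaluate a string like '30 * 24 * 3600'."""
    if isinstance(value, (int, float)):
        return value

    def _eval(node):
        # A bare numeric literal (bool is excluded on purpose)
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        # A binary operation whose operator is on the allow-list
        if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_OPS:
            return _ALLOWED_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        raise ValueError(f'unsupported expression: {value!r}')

    return _eval(ast.parse(value, mode='eval').body)


print(compute_number('30 * 24 * 3600'))    # 2592000
print(compute_number('2 * 1000 * 1000'))   # 2000000
print(compute_number(10))                  # 10
```

Function calls, attribute access and names are rejected with ValueError, so a string such as '__import__("os")' never runs.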

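On the first run, every file modified within the configured time window is uploaded in full. After that, the folder is scanned every 2 seconds by default (the interval is set by cycle_interval in the JSON file), and files changed within the last 10 minutes are uploaded to Linux.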
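
The selection rule described above can be sketched as a standalone function. This is a simplified illustration rather than the script itself: select_files and its parameter names are hypothetical, and the 10-minute window is passed in as recent_window.

```python
import os
import re
import tempfile
import time


def select_files(root, max_age, size_limit, seen_mtimes,
                 excluded_suffixes=('.pyc', '.log', '.gz'),
                 excluded_patterns=('/.git/', '/.idea/'),
                 recent_window=10 * 60):
    """Return the files under root that should be uploaded in this scan.

    seen_mtimes maps path -> mtime recorded when the file was last selected;
    it is updated in place, so an unchanged file is not selected twice.
    """
    now = time.time()
    selected = []
    for parent, _dirs, names in os.walk(root):
        for name in names:
            path = os.path.join(parent, name).replace('\\', '/')
            if path.endswith(tuple(excluded_suffixes)):
                continue
            if any(re.search(p, path) for p in excluded_patterns):
                continue
            stat = os.stat(path)
            age = now - stat.st_mtime
            if age >= max_age or stat.st_size >= size_limit:
                continue
            # First scan: everything inside the time window qualifies.
            is_new = path not in seen_mtimes
            # Later scans: only files modified recently whose mtime has changed.
            changed = (not is_new and age < recent_window
                       and seen_mtimes[path] != stat.st_mtime)
            if is_new or changed:
                seen_mtimes[path] = stat.st_mtime
                selected.append(path)
    return selected


with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, '.git'))
    for rel in ('a.py', 'b.log', '.git/config'):
        with open(os.path.join(root, rel), 'w') as f:
            f.write('x')
    seen = {}
    limits = dict(max_age=7 * 24 * 3600, size_limit=1000 * 1000, seen_mtimes=seen)
    first = select_files(root, **limits)     # full pass: only a.py survives the filters
    second = select_files(root, **limits)    # nothing changed since the first pass
    # Simulate an edit by moving the modification time of a.py
    os.utime(os.path.join(root, 'a.py'), (time.time(), time.time() - 60))
    third = select_files(root, **limits)     # a.py is picked up again

print([os.path.basename(p) for p in first], second, [os.path.basename(p) for p in third])
```

Keeping the recorded modification times in a dictionary is what lets later scans stay cheap: a file is re-sent only when its mtime differs from the one stored at its last upload.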

用python做的windows和linx檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。 用python做的windows和linx檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。
"""
自動同步檔案夾到linux機器
"""
import json
import os
import queue
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler


class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: 排除以這些結尾的檔案
        :param file_volume_limit: 最大檔案容量能夠限制,如果超過此大小,則該檔案不上傳
        :param path_pattern_exluded_tuple: 更強大的檔案排除功能,比光排除以什麼字尾結尾更強大靈活
        :param only_upload_within_the_last_modify_time: 隻上傳離目前時間最晚修改時間以後的檔案
        :param cycle_interval: 每隔多少秒掃描一次需要上傳的檔案。
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        self.logger.warning('建立linux連接配接')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)

        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}')
                break
            except FileNotFoundError:
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                tdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                # self.logger.debug(stderr_bytes)
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                pass
                self.build_connect()     # OSError: Socket is closed

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        self.logger.warning(f'需要上傳的所有檔案數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,檔案分别是 {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warn('完成')      
用python做的windows和linx檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。 用python做的windows和linx檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。

用python做的windows和linx檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。 用python做的windows和linx檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。
"""
自動同步檔案夾到linux機器
這個更犀利,采用了連接配接池 加線程池,上傳大量碎片檔案的速度大幅提升。
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor


class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    本地的兩個檔案夾之間的同步
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = '本地windows間複制'

    def upload(self, file: str):
        file_remote = file.replace(self._local_dir, self._remote_dir)
        if not Path(file_remote).parent.exists():
            os.makedirs(str(Path(file_remote).parent))
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'從 {file} 複制成功到{file_remote} ,大小是 {round(os.path.getsize(file) / 1024)} kb')
        else:
            self.logger.debug(f'{file} 不複制到 {file_remote} 沒有變化。')

    @staticmethod
    def get_file_md5(file):
        m = hashlib.md5()
        while True:
            # 如果不用二進制打開檔案,則需要先編碼
            # data = f.read(1024).encode('utf-8')
            data = file.read(1024)  # 将檔案分塊讀取
            if not data:
                break
            m.update(data)
        return m.hexdigest()


@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password):  # 對相同的連結參數做了享元模式儲存連接配接池。
        self.logger_extra_suffix = host
        self.logger.warning(f'初始化linux連接配接池{host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)
        self.queue_ssh_free = queue.Queue(100)
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        self.logger.warning(f'建立linux sftp連接配接中。。。')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'建立linux sftp連接配接耗時 {round(time.time() - t_start, 2)}')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def bulid_ssh(self):
        self.logger.warning(f'建立linux ssh連接配接中。。。。')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'建立linux ssh連接配接耗時 {round(time.time() - t_start, 2)}')

    def build_connect(self):
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.bulid_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        self.queue_ssh_free.put(ssh)


class LinuxRemoteUploader(LocalCopier):
    """
    Synchronizes files from Windows to Linux.
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        # Create the remote parent directory so that a later sftp.put can succeed.
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
        try:
            stdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            # The borrowed connection is broken: drop it and build a replacement for the pool.
            self.linux_conn_pool.bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
            self.linux_conn_pool.back_ssh(ssh)  # return the connection so the pool does not shrink
        else:
            stderr_bytes = stderr.read()
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            self.linux_conn_pool.back_ssh(ssh)

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        self.logger.debug(f'free sftp connections: {self.linux_conn_pool.queue_sftp_free.qsize()}, free ssh connections: {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))

        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} uploaded, size {round(os.path.getsize(file) / 1024)} KB, upload time {round(time.time() - time_start, 2)} s')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # OSError: Socket is closed


class Synchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
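        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: Files ending with any of these suffixes are excluded.
        :param file_volume_limit: Maximum file size in bytes; files larger than this are not uploaded. May be an int or an arithmetic expression string.
        :param path_pattern_exluded_tuple: A more powerful and flexible exclusion than suffix matching; each entry is a Python regular expression searched in the full file path.
        :param only_upload_within_the_last_modify_time: Only files modified within this many seconds before the current time are uploaded. May be an int or an arithmetic expression string.
        :param cycle_interval: Interval in seconds between scans for files that need uploading.
        :param just_windows_copy: Copy between Windows folders only, without uploading to Linux.
        """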
        self.logger_extra_suffix = host if not just_windows_copy else 'local'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

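    @staticmethod
    def _compute_result(sth: Union[str, int]):
        # Accepts an int, or a string holding an arithmetic expression such as '30 * 24 * 3600'.
        # Note: eval() executes arbitrary code, so the config file must come from a trusted source.
        return sth if isinstance(sth, int) else eval(sth)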

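    def _judge_need_filter_a_file(self, filename: str):
        """Return True if the file should be skipped (excluded suffix or matching path pattern)."""
        # str.endswith needs a tuple; the value may be a list when loaded from JSON.
        if filename.endswith(tuple(self._file_suffix_tuple_exluded)):
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False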

    def find_all_files_meet_the_conditions(self):
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
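                    now = time.time()
                    is_recent = now - file_st_mtime < self._only_upload_within_the_last_modify_time
                    is_small = volume < self._file_volume_limit
                    # A file that was already seen is only re-checked if it was modified within the last 10 minutes.
                    is_candidate = file_full_name not in self.filename__st_mtime_map or now - file_st_mtime < 10 * 60
                    if is_recent and is_small and is_candidate: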
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        # Re-order the pending files by their last-modified time string, ascending.
        filename__filesize_map_ordered_by_last_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_last_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_last_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'Number of files to {"copy" if self._just_windows_copy else "upload"}: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} KB, '
                                f'scan took {round(time.time() - t_start, 2)} s, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)


# noinspection PyPep8
if __name__ == '__main__':
    """
    The config file has the following format; multiple folder mappings can be synchronized.
    [
      {
        "host": "112.90.xx.xx",
        "port": 10005,
        "username": "root",
        "password": "@0^Lc97MewI3i7xxxxxx",
        "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
        "remote_dir": "/home/ydf/hotelf15",
        "file_suffix_tuple_exluded": [
          ".pyc",
          ".log",
          ".gz"
        ],
        "path_pattern_exluded_tuple": [
          "/.git/",
          "/.idea/",
          "cnbooking_cn_all.json"
        ],
        "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
        "file_volume_limit": "2 * 1000 * 1000",
        "cycle_interval": 10
      }
    ]
    """

    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

    # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
    # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # This file can be packaged with pyinstaller. First set the PYTHONPATH variable, then run the command from a different folder.
    # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

    # cd ..
    # set PYTHONPATH=D:\coding2\hotel_fares
    # pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
[
  {
    "host": "112.xx.89.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
    "remote_dir": "/home/ydf/hotelf18",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  },
  {
    "host": "112.90.xx.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
    "remote_dir": "/home/ydf/movie_data2",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  }
]      
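The config values `only_upload_within_the_last_modify_time` and `file_volume_limit` are written as arithmetic strings such as `"30 * 24 * 3600"`, which `_compute_result` passes to `eval()`. The following is a minimal sketch of one possible alternative that evaluates such strings without `eval()`, using the standard `ast` module. The function name `compute_number` and the set of allowed operators are assumptions made for illustration, not part of the original tool, and `ast.Constant` assumes Python 3.8 or later.

```python
import ast
import operator

# Only plain integer arithmetic is allowed (an assumption for this sketch).
_ALLOWED_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.FloorDiv: operator.floordiv,
}


def compute_number(value):
    """Return ints unchanged; evaluate strings such as '30 * 24 * 3600' without eval()."""
    if isinstance(value, int):
        return value

    def _walk(node):
        # Integer literal (bool is excluded because it is a subclass of int).
        if isinstance(node, ast.Constant) and isinstance(node.value, int) and not isinstance(node.value, bool):
            return node.value
        # Binary operation with an allowed operator: evaluate both sides recursively.
        if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_OPS:
            return _ALLOWED_OPS[type(node.op)](_walk(node.left), _walk(node.right))
        # Anything else (names, calls, attribute access, ...) is rejected.
        raise ValueError(f'unsupported expression: {value!r}')

    return _walk(ast.parse(value, mode='eval').body)
```

With this sketch, `compute_number("2 * 1000 * 1000")` yields the same number that `eval` would, while a string containing a function call raises `ValueError` instead of being executed.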
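In `LinuxConnectionPool`, every caller has to pair `borrow_sftp` / `borrow_ssh` with the matching `back_*` call on each code path, and has to build a replacement when a connection turns out to be dead. One way to keep the pool size constant is to wrap borrowing in a context manager. The sketch below illustrates the idea with a generic pool; `SimplePool`, `borrowed` and `free_count` are hypothetical names, and treating `OSError` as "the connection is dead" is an assumption based on the `OSError: Socket is closed` comment in `upload`.

```python
import queue
from contextlib import contextmanager


class SimplePool:
    """Hypothetical stand-in for LinuxConnectionPool; holds any reusable objects."""

    def __init__(self, factory, size):
        self._factory = factory  # callable that builds one new connection
        self._free = queue.Queue(size)
        for _ in range(size):
            self._free.put(factory())

    @contextmanager
    def borrowed(self):
        conn = self._free.get()  # blocks until a connection is free
        try:
            yield conn
        except OSError:
            # Assume the connection is dead: build a replacement, then re-raise for the caller.
            conn = self._factory()
            raise
        finally:
            self._free.put(conn)  # runs on every path, so the pool never shrinks

    def free_count(self):
        return self._free.qsize()
```

A caller would then write `with pool.borrowed() as sftp: sftp.put(file, file_remote)`, and the connection goes back to the queue whether the upload succeeds or fails.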