用python做的windows和linux檔案夾同步。解決自動同步、加快傳輸大量小檔案的速度、更豐富的檔案上傳過濾設定。
現在工具不好用,用的pycharm自動同步,但對于git拉下來的新檔案不能自動上傳到linux,隻有自己編輯過或者手動ctrl + s的檔案才會自動同步。導緻為了不遺漏檔案,經常需要全量上傳,速度非常慢。
由于經常需要在windows的pycharm上直接使用linux解釋器,要快速測試,頻繁在本機和linux用git push pull不方便,測試環境是 用的git,但開發時候還是直接映射檔案夾同步比使用git更方便。
采用了連接池的方式,比單線程單linux連接,一個一個的上傳體積很小的碎片時候,檔案上傳速度提高了數十倍。
單linux連接上傳。

"""
自動同步檔案夾到linux機器
"""
import json
import os
import queue
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler
class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    """Continuously sync a local directory tree to a Linux host over a single
    paramiko SFTP connection, scanning for changed files on a fixed interval."""

    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """
        :param host: ssh host of the Linux machine
        :param port: ssh port
        :param username: ssh username
        :param password: ssh password
        :param local_dir: local source folder to watch
        :param remote_dir: destination folder on the Linux machine
        :param file_suffix_tuple_exluded: skip files ending with any of these suffixes
        :param file_volume_limit: size cap in bytes; files larger than this are not uploaded
        :param path_pattern_exluded_tuple: stronger/more flexible exclusion than plain suffixes (regex patterns matched against the full path)
        :param only_upload_within_the_last_modify_time: only upload files modified within this many seconds of now
        :param cycle_interval: seconds between scans for files that need uploading
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        # Normalize to forward slashes so local paths can be string-replaced against remote ones.
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()  # files selected for upload in the current scan
        self.filename__st_mtime_map = dict()  # last seen mtime per file, persists across scans
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        """(Re)build the single SFTP and SSH connections; also called after a socket error."""
        self.logger.warning('建立linux連接配接')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        """Upload one file over sftp; creates the remote parent directory on demand
        and rebuilds the connection on socket errors, retrying up to 10 times."""
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}')
                break
            except FileNotFoundError:
                # Remote parent directory is missing -- create it, then retry the put on the next pass.
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                tdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                # self.logger.debug(stderr_bytes)
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                pass
                self.build_connect()  # OSError: Socket is closed

    def _judge_need_filter_a_file(self, filename: str):
        """Return True when the file should be skipped, by suffix or regex path pattern."""
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        """Scan the local tree and rebuild filename__filesize_map, ordered oldest modification first."""
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    # Select files modified recently enough and under the size cap; files already seen
                    # in a previous scan are re-selected only when modified in the last 10 minutes.
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        self.logger.warning(f'需要上傳的所有檔案數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,檔案分别是 {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        """Start the scan-and-upload cycle in a background thread, repeating every cycle_interval seconds."""
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        # One scan pass: find candidates, then upload them one by one over the single connection.
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warn('完成')

采用了連接池 加多線程上傳

"""
自動同步檔案夾到linux機器
這個更犀利,采用了連接配接池 加線程池,上傳大量碎片檔案的速度大幅提升。
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor
class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    Sync between two local folders (Windows-to-Windows copy).
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        # *args/**kwargs absorb the host/port/username/password that Synchronizer also passes
        # when it constructs the Linux uploader, so both uploader kinds share one call site.
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = '本地windows間複制'

    def upload(self, file: str):
        """Copy one file into the mirror folder, but only when it is new or its content changed.

        :param file: full path of the source file (forward slashes).
        """
        file_remote = file.replace(self._local_dir, self._remote_dir)
        # exist_ok avoids the check-then-create race: another upload thread may create the
        # directory between an exists() test and makedirs(), which used to raise.
        os.makedirs(str(Path(file_remote).parent), exist_ok=True)
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'從 {file} 複制成功到{file_remote} ,大小是 {round(os.path.getsize(file) / 1024)} kb')
        else:
            self.logger.debug(f'{file} 不複制到 {file_remote} 沒有變化。')

    @staticmethod
    def get_file_md5(file):
        """Return the hex md5 digest of an already-opened binary file object, reading 1 KB chunks."""
        m = hashlib.md5()
        while True:
            # File must be opened in binary mode; otherwise the data would need encoding first.
            data = file.read(1024)  # read the file in chunks
            if not data:
                break
            m.update(data)
        return m.hexdigest()
@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    """Pool of paramiko SFTP/SSH connections. The flyweight decorator makes identical
    connection parameters share a single pool instance across uploaders."""

    def __init__(self, host, port, username, password):  # flyweight: same connection params reuse one pool.
        self.logger_extra_suffix = host
        self.logger.warning(f'初始化linux連接配接池{host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)  # idle sftp connections available for borrowing
        self.queue_ssh_free = queue.Queue(100)  # idle ssh connections available for borrowing
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        """Open one SFTP connection and park it in the free queue; retried every 5s until it succeeds."""
        self.logger.warning(f'建立linux sftp連接配接中。。。')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'建立linux sftp連接配接耗時 {round(time.time() - t_start, 2)}')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def bulid_ssh(self):  # NOTE(review): name is a typo for build_ssh, but callers use this spelling -- keep it.
        """Open one SSH connection and park it in the free queue; retried every 5s until it succeeds."""
        self.logger.warning(f'建立linux ssh連接配接中。。。。')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'建立linux ssh連接配接耗時 {round(time.time() - t_start, 2)}')

    def build_connect(self):
        """Asynchronously fill the pool with 10 SFTP and 3 SSH connections."""
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)  # stagger the connection attempts slightly
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.bulid_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        """Take an idle SFTP connection; blocks until one is free."""
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        """Take an idle SSH connection; blocks until one is free."""
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        """Return a healthy SFTP connection to the pool."""
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        """Return a healthy SSH connection to the pool."""
        self.queue_ssh_free.put(ssh)
class LinuxRemoteUploader(LocalCopier):
    """
    Sync from windows to linux, using the shared connection pool and a thread pool.
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        """Create the remote parent directory of file_remote via a pooled ssh connection."""
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
        try:
            tdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            # Dead connection: do not return it to the pool; schedule a replacement instead.
            self.linux_conn_pool.bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
        else:
            stderr_bytes = stderr.read()
            # self.logger.debug(stderr_bytes)
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            # Only a connection that completed the command is given back to the pool.
            self.linux_conn_pool.back_ssh(ssh)

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        """Upload one file in a worker thread, borrowing an sftp connection from the pool.
        Retries up to 10 times, creating remote dirs or replacing dead connections as needed."""
        self.logger.debug(f'sftp空閑連結數量 {self.linux_conn_pool.queue_sftp_free.qsize()}, ssh空閑連結數量 {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                # Remote parent dir missing: create it, give the connection back, retry the put.
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                # Broken connection is deliberately NOT returned; a replacement is built instead.
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # OSError: Socket is closed
class Synchronizer(LoggerMixinDefaultWithFileHandler):
    """Scans a local folder on a cycle and hands changed files to an uploader
    (Linux sftp pool, or local copier when just_windows_copy is set)."""

    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
        """
        :param host: ssh host of the Linux machine (ignored when just_windows_copy).
        :param port: ssh port.
        :param username: ssh username.
        :param password: ssh password.
        :param local_dir: local source folder to watch.
        :param remote_dir: destination folder (remote Linux path, or a local path when just_windows_copy).
        :param file_suffix_tuple_exluded: skip files ending with any of these suffixes.
        :param file_volume_limit: size cap in bytes (int, or an arithmetic expression string); larger files are not uploaded.
        :param path_pattern_exluded_tuple: stronger exclusion than suffixes, via python regular expressions.
        :param only_upload_within_the_last_modify_time: only upload files modified within this many seconds (int or expression string).
        :param cycle_interval: seconds between scans for files to upload.
        :param just_windows_copy: copy between local windows folders instead of uploading to linux.
        """
        self.logger_extra_suffix = host if not just_windows_copy else '本地'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()  # files selected for upload in the current scan
        self.filename__st_mtime_map = dict()  # last seen mtime per file, persists across scans
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

    @staticmethod
    def _compute_result(sth: Union[str, int]):
        # Allows arithmetic expression strings like '7 * 24 * 60 * 60' in the json config.
        # NOTE(review): eval executes arbitrary code -- acceptable only because the config file is local and trusted.
        return sth if isinstance(sth, int) else eval(sth)

    def _judge_need_filter_a_file(self, filename: str):
        """Return True when the file should be skipped, by suffix or regex path pattern."""
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        """Scan the local tree and rebuild filename__filesize_map with files whose mtime
        changed since last seen, ordered oldest modification first."""
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    # Recent enough, under the size cap, and either never seen or modified in the last 10 minutes.
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name
                                                                                                                                            not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        # Only select the file when its mtime actually changed since the last scan.
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'需要{"複制" if self._just_windows_copy else "上傳"} 的所有檔案數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,'
                                f'查找檔案耗時 {round(time.time() - t_start, 2)} 秒,檔案分别是 {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        """Start the scan-and-upload cycle in a background thread, repeating every cycle_interval seconds."""
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        # One scan pass: find candidates, then dispatch each to the uploader.
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)
# noinspection PyPep8
if __name__ == '__main__':
    """
    配置裡面的内容格式如下,支援同步多個檔案夾映射。
    [
        {
            "host": "112.90.xx.xx",
            "port": 10005,
            "username": "root",
            "password": "@0^Lc97MewI3i7xxxxxx",
            "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
            "remote_dir": "/home/ydf/hotelf15",
            "file_suffix_tuple_exluded": [
                ".pyc",
                ".log",
                ".gz"
            ],
            "path_pattern_exluded_tuple": [
                "/.git/",
                "/.idea/",
                "cnbooking_cn_all.json"
            ],
            "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
            "file_volume_limit": "2 * 1000 * 1000",
            "cycle_interval": 10
        }
    ]
    """
    # Start one background Synchronizer per folder mapping declared in the json config file.
    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

# sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
# pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
# This file can be packaged with pyinstaller: set the PYTHONPATH variable first, then run the command below from another folder.
# pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
# cd ..
# set PYTHONPATH=D:\coding2\hotel_fares
# pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
# test update......

配置裡面的内容如下。

[
{
"host": "112.xx.89.16",
"port": 10033,
"username": "root",
"password": "xxxx",
"local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
"remote_dir": "/home/ydf/hotelf18",
"file_suffix_tuple_exluded": [
".pyc",
".log",
".gz"
],
"path_pattern_exluded_tuple": [
"/.git/",
"/.idea/",
"cnbooking_cn_all.json"
],
"only_upload_within_the_last_modify_time": "30 * 24 * 3600",
"file_volume_limit": "2 * 1000 * 1000",
"cycle_interval": 1
},
{
"host": "112.90.xx.16",
"port": 10033,
"username": "root",
"password": "xxxx",
"local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
"remote_dir": "/home/ydf/movie_data2",
"file_suffix_tuple_exluded": [
".pyc",
".log",
".gz"
],
"path_pattern_exluded_tuple": [
"/.git/",
"/.idea/",
"cnbooking_cn_all.json"
],
"only_upload_within_the_last_modify_time": "30 * 24 * 3600",
"file_volume_limit": "2 * 1000 * 1000",
"cycle_interval": 1
}
]

第一次運作是對指定最晚修改間之内的檔案進行全量上傳,之後是每隔2秒(由json檔案動态配置)檢查一次,将最近 10分鐘之内變化的檔案,上傳到linux。
反對極端面向過程程式設計思維方式,喜歡面向對象和設計模式的解讀,喜歡對比極端面向過程程式設計和oop程式設計消耗代碼代碼行數的差別和原因。緻力于使用oop和36種設計模式寫出最高可複用的架構級代碼和使用最少的代碼行數完成任務,緻力于使用oop和設計模式來使部分代碼減少90%行,使絕大部分py檔案最低減少50%-80%行的寫法。

"""
自動同步檔案夾到linux機器
"""
import json
import os
import queue
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler
class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    """Continuously sync a local directory tree to a Linux host over a single
    paramiko SFTP connection, scanning for changed files on a fixed interval."""

    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """
        :param host: ssh host of the Linux machine
        :param port: ssh port
        :param username: ssh username
        :param password: ssh password
        :param local_dir: local source folder to watch
        :param remote_dir: destination folder on the Linux machine
        :param file_suffix_tuple_exluded: skip files ending with any of these suffixes
        :param file_volume_limit: size cap in bytes; files larger than this are not uploaded
        :param path_pattern_exluded_tuple: stronger/more flexible exclusion than plain suffixes (regex patterns matched against the full path)
        :param only_upload_within_the_last_modify_time: only upload files modified within this many seconds of now
        :param cycle_interval: seconds between scans for files that need uploading
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        # Normalize to forward slashes so local paths can be string-replaced against remote ones.
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()  # files selected for upload in the current scan
        self.filename__st_mtime_map = dict()  # last seen mtime per file, persists across scans
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        """(Re)build the single SFTP and SSH connections; also called after a socket error."""
        self.logger.warning('建立linux連接配接')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        """Upload one file over sftp; creates the remote parent directory on demand
        and rebuilds the connection on socket errors, retrying up to 10 times."""
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}')
                break
            except FileNotFoundError:
                # Remote parent directory is missing -- create it, then retry the put on the next pass.
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                tdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                # self.logger.debug(stderr_bytes)
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                pass
                self.build_connect()  # OSError: Socket is closed

    def _judge_need_filter_a_file(self, filename: str):
        """Return True when the file should be skipped, by suffix or regex path pattern."""
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        """Scan the local tree and rebuild filename__filesize_map, ordered oldest modification first."""
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    # Select files modified recently enough and under the size cap; files already seen
                    # in a previous scan are re-selected only when modified in the last 10 minutes.
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        self.logger.warning(f'需要上傳的所有檔案數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,檔案分别是 {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        """Start the scan-and-upload cycle in a background thread, repeating every cycle_interval seconds."""
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        # One scan pass: find candidates, then upload them one by one over the single connection.
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warn('完成')


"""
自動同步檔案夾到linux機器
這個更犀利,采用了連接配接池 加線程池,上傳大量碎片檔案的速度大幅提升。
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor
class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    Sync between two local folders (Windows-to-Windows copy).
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        # *args/**kwargs absorb the host/port/username/password that Synchronizer also passes
        # when it constructs the Linux uploader, so both uploader kinds share one call site.
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = '本地windows間複制'

    def upload(self, file: str):
        """Copy one file into the mirror folder, but only when it is new or its content changed.

        :param file: full path of the source file (forward slashes).
        """
        file_remote = file.replace(self._local_dir, self._remote_dir)
        # exist_ok avoids the check-then-create race: another upload thread may create the
        # directory between an exists() test and makedirs(), which used to raise.
        os.makedirs(str(Path(file_remote).parent), exist_ok=True)
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'從 {file} 複制成功到{file_remote} ,大小是 {round(os.path.getsize(file) / 1024)} kb')
        else:
            self.logger.debug(f'{file} 不複制到 {file_remote} 沒有變化。')

    @staticmethod
    def get_file_md5(file):
        """Return the hex md5 digest of an already-opened binary file object, reading 1 KB chunks."""
        m = hashlib.md5()
        while True:
            # File must be opened in binary mode; otherwise the data would need encoding first.
            data = file.read(1024)  # read the file in chunks
            if not data:
                break
            m.update(data)
        return m.hexdigest()
@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    """Pool of paramiko SFTP/SSH connections. The flyweight decorator makes identical
    connection parameters share a single pool instance across uploaders."""

    def __init__(self, host, port, username, password):  # flyweight: same connection params reuse one pool.
        self.logger_extra_suffix = host
        self.logger.warning(f'初始化linux連接配接池{host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)  # idle sftp connections available for borrowing
        self.queue_ssh_free = queue.Queue(100)  # idle ssh connections available for borrowing
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        """Open one SFTP connection and park it in the free queue; retried every 5s until it succeeds."""
        self.logger.warning(f'建立linux sftp連接配接中。。。')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'建立linux sftp連接配接耗時 {round(time.time() - t_start, 2)}')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def bulid_ssh(self):  # NOTE(review): name is a typo for build_ssh, but callers use this spelling -- keep it.
        """Open one SSH connection and park it in the free queue; retried every 5s until it succeeds."""
        self.logger.warning(f'建立linux ssh連接配接中。。。。')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'建立linux ssh連接配接耗時 {round(time.time() - t_start, 2)}')

    def build_connect(self):
        """Asynchronously fill the pool with 10 SFTP and 3 SSH connections."""
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)  # stagger the connection attempts slightly
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.bulid_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        """Take an idle SFTP connection; blocks until one is free."""
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        """Take an idle SSH connection; blocks until one is free."""
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        """Return a healthy SFTP connection to the pool."""
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        """Return a healthy SSH connection to the pool."""
        self.queue_ssh_free.put(ssh)
class LinuxRemoteUploader(LocalCopier):
    """
    Sync from windows to linux, using the shared connection pool and a thread pool.
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        """Create the remote parent directory of file_remote via a pooled ssh connection."""
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
        try:
            tdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            # Dead connection: do not return it to the pool; schedule a replacement instead.
            self.linux_conn_pool.bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
        else:
            stderr_bytes = stderr.read()
            # self.logger.debug(stderr_bytes)
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            # Only a connection that completed the command is given back to the pool.
            self.linux_conn_pool.back_ssh(ssh)

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        """Upload one file in a worker thread, borrowing an sftp connection from the pool.
        Retries up to 10 times, creating remote dirs or replacing dead connections as needed."""
        self.logger.debug(f'sftp空閑連結數量 {self.linux_conn_pool.queue_sftp_free.qsize()}, ssh空閑連結數量 {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} 上傳成功,大小是 {round(os.path.getsize(file) / 1024)} kb,上傳時間是 {round(time.time() - time_start, 2)}')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                # Remote parent dir missing: create it, give the connection back, retry the put.
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                # Broken connection is deliberately NOT returned; a replacement is built instead.
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # OSError: Socket is closed
class Synchronizer(LoggerMixinDefaultWithFileHandler):
    """Scans a local folder on a cycle and hands changed files to an uploader
    (Linux sftp pool, or local copier when just_windows_copy is set)."""

    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
        """
        :param host: ssh host of the Linux machine (ignored when just_windows_copy).
        :param port: ssh port.
        :param username: ssh username.
        :param password: ssh password.
        :param local_dir: local source folder to watch.
        :param remote_dir: destination folder (remote Linux path, or a local path when just_windows_copy).
        :param file_suffix_tuple_exluded: skip files ending with any of these suffixes.
        :param file_volume_limit: size cap in bytes (int, or an arithmetic expression string); larger files are not uploaded.
        :param path_pattern_exluded_tuple: stronger exclusion than suffixes, via python regular expressions.
        :param only_upload_within_the_last_modify_time: only upload files modified within this many seconds (int or expression string).
        :param cycle_interval: seconds between scans for files to upload.
        :param just_windows_copy: copy between local windows folders instead of uploading to linux.
        """
        self.logger_extra_suffix = host if not just_windows_copy else '本地'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()  # files selected for upload in the current scan
        self.filename__st_mtime_map = dict()  # last seen mtime per file, persists across scans
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

    @staticmethod
    def _compute_result(sth: Union[str, int]):
        # Allows arithmetic expression strings like '7 * 24 * 60 * 60' in the json config.
        # NOTE(review): eval executes arbitrary code -- acceptable only because the config file is local and trusted.
        return sth if isinstance(sth, int) else eval(sth)

    def _judge_need_filter_a_file(self, filename: str):
        """Return True when the file should be skipped, by suffix or regex path pattern."""
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        """Scan the local tree and rebuild filename__filesize_map with files whose mtime
        changed since last seen, ordered oldest modification first."""
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    # Recent enough, under the size cap, and either never seen or modified in the last 10 minutes.
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name
                                                                                                                                            not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        # Only select the file when its mtime actually changed since the last scan.
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'需要{"複制" if self._just_windows_copy else "上傳"} 的所有檔案數量是 {len(self.filename__filesize_map)} ,總大小是 {round(total_volume / 1024, 2)} kb ,'
                                f'查找檔案耗時 {round(time.time() - t_start, 2)} 秒,檔案分别是 {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        """Start the scan-and-upload cycle in a background thread, repeating every cycle_interval seconds."""
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        # One scan pass: find candidates, then dispatch each to the uploader.
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)
# noinspection PyPep8
if __name__ == '__main__':
    """
    配置裡面的内容格式如下,支援同步多個檔案夾映射。
    [
        {
            "host": "112.90.xx.xx",
            "port": 10005,
            "username": "root",
            "password": "@0^Lc97MewI3i7xxxxxx",
            "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
            "remote_dir": "/home/ydf/hotelf15",
            "file_suffix_tuple_exluded": [
                ".pyc",
                ".log",
                ".gz"
            ],
            "path_pattern_exluded_tuple": [
                "/.git/",
                "/.idea/",
                "cnbooking_cn_all.json"
            ],
            "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
            "file_volume_limit": "2 * 1000 * 1000",
            "cycle_interval": 10
        }
    ]
    """
    # Start one background Synchronizer per folder mapping declared in the json config file.
    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

# sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
# pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
# This file can be packaged with pyinstaller: set the PYTHONPATH variable first, then run the command below from another folder.
# pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
# cd ..
# set PYTHONPATH=D:\coding2\hotel_fares
# pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
# test update......


[
{
"host": "112.xx.89.16",
"port": 10033,
"username": "root",
"password": "xxxx",
"local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
"remote_dir": "/home/ydf/hotelf18",
"file_suffix_tuple_exluded": [
".pyc",
".log",
".gz"
],
"path_pattern_exluded_tuple": [
"/.git/",
"/.idea/",
"cnbooking_cn_all.json"
],
"only_upload_within_the_last_modify_time": "30 * 24 * 3600",
"file_volume_limit": "2 * 1000 * 1000",
"cycle_interval": 1
},
{
"host": "112.90.xx.16",
"port": 10033,
"username": "root",
"password": "xxxx",
"local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
"remote_dir": "/home/ydf/movie_data2",
"file_suffix_tuple_exluded": [
".pyc",
".log",
".gz"
],
"path_pattern_exluded_tuple": [
"/.git/",
"/.idea/",
"cnbooking_cn_all.json"
],
"only_upload_within_the_last_modify_time": "30 * 24 * 3600",
"file_volume_limit": "2 * 1000 * 1000",
"cycle_interval": 1
}
]
