天天看點

PySide6(PyQt)實作多層級目錄結構檔案下載下傳器

作者:SZ深呼吸

  我們有時候從檔案伺服器上下載下傳檔案時,會遇到多層級目錄結構的檔案。如圖所示:

PySide6(PyQt)實作多層級目錄結構檔案下載下傳器

  如果我們想一次性把所有檔案下載下傳下來,怎麼辦?一個檔案一個檔案手動下載下傳太麻煩了。用Python來解決這個問題很友善,我用PySide6寫了一個檔案下載下傳器。

PySide6(PyQt)實作多層級目錄結構檔案下載下傳器
PySide6(PyQt)實作多層級目錄結構檔案下載下傳器

我們來對比一下伺服器上的檔案和下載下傳的檔案:

PySide6(PyQt)實作多層級目錄結構檔案下載下傳器
PySide6(PyQt)實作多層級目錄結構檔案下載下傳器

檔案大小、占用空間、檔案和檔案夾個數完全一緻。

實作代碼如下:

import os
import re
import sys
import requests
from contextlib import closing
from PySide6 import QtCore
from PySide6.QtCore import QThread, Slot, Signal, QDir
from PySide6.QtWidgets import QWidget, QVBoxLayout, QApplication, QLineEdit, \
    QLabel, QHBoxLayout, QPushButton, QTextEdit, QFileDialog, QStyleFactory

REQUEST_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'


class MainWindow(QWidget):

    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)
        self.downloadUrl = None
        self.savePath = None
        self.resize(960, 600)
        self.setWindowTitle('多層級目錄結構檔案下載下傳器')

        self.verticalLayout = QVBoxLayout(self)
        self.horizontalLayout = QHBoxLayout()
        self.horizontalLayout.addWidget(QLabel('下載下傳位址:'))
        self.downloadUrlLineEdit = QLineEdit(self)
        self.downloadUrlLineEdit.setText('http://127.0.0.1/UpdateDemo1.0.1/')
        self.horizontalLayout.addWidget(self.downloadUrlLineEdit)
        self.downloadButton = self.createButton('下載下傳', self.download)
        self.horizontalLayout.addWidget(self.downloadButton)
        self.verticalLayout.addLayout(self.horizontalLayout)

        self.horizontalLayout_2 = QHBoxLayout()
        self.horizontalLayout_2.addWidget(QLabel('儲存路徑:'))
        self.savePathLineEdit = QLineEdit(self)
        self.horizontalLayout_2.addWidget(self.savePathLineEdit)

        self.browserButton = self.createButton('浏覽', self.browse)
        self.horizontalLayout_2.addWidget(self.browserButton)
        self.verticalLayout.addLayout(self.horizontalLayout_2)

        self.textEdit = QTextEdit(self)
        self.verticalLayout.addWidget(self.textEdit)
        self.tipLabel = QLabel(self)
        self.verticalLayout.addWidget(self.tipLabel)

        self.downloadThread = DownloadThread(self)

    def createButton(self, text, member):
        button = QPushButton(text)
        button.clicked.connect(member)
        return button

    def download(self):
        self.downloadUrl = self.downloadUrlLineEdit.text()
        self.savePath = self.savePathLineEdit.text()
        self.textEdit.clear()
        self.downloadThread.start()

    def browse(self):
        directory = QFileDialog.getExistingDirectory(self, "檔案儲存目錄", QDir.currentPath())
        if not directory.endswith('/'):
            directory = directory + '/'
        self.savePathLineEdit.setText(directory)

    @Slot(str)
    def showLog(self, message):
        self.textEdit.append(message)

    @Slot(str)
    def showTip(self, message):
        self.tipLabel.setText(message)


# 信号對象
class Communicate(QtCore.QObject):
    # 建立一個信号
    showTipSignal = Signal(str)
    showLogSignal = Signal(str)


class DownloadThread(QThread):
    def __init__(self, parent=None):
        QThread.__init__(self, parent)
        self.parent = parent
        self.dirCount = 0
        self.fileCount = 0
        self.signals = Communicate()
        self.signals.showLogSignal.connect(parent.showLog)
        self.signals.showTipSignal.connect(parent.showTip)

    def run(self):
        self.signals.showTipSignal.emit('正在下載下傳更新檔案...')
        self.download_walk(self.parent.downloadUrl, self.parent.savePath)

    def download_walk(self, root_url, root_path):

        with closing(requests.get(root_url, headers={"User-Agent": REQUEST_USER_AGENT},
                                  stream=True)) as response:
            file_list = re.findall(r'<a href="(.*)">(.*)</a> (.*) (.*)\r', response.text)
            # print(file_list)

            for file in file_list:
                file_url = f'{root_url}{file[0]}'
                file_name = file[1].strip()
                date_time = file[2].strip()
                file_size = file[3]
                save_path = f'{root_path}{file_name}'
                print(file_name)

                if file_name.endswith('/'):
                    if not os.path.exists(save_path):
                        os.mkdir(save_path)
                    self.download_walk(file_url, save_path)
                    self.dirCount = self.dirCount + 1
                else:
                    self.download_file(file_url, save_path)
                    self.fileCount = self.fileCount + 1
                    self.signals.showTipSignal.emit(f'下載下傳完畢. 包含:{self.dirCount}個檔案夾,{self.fileCount}個檔案')

    def download_file(self, file_url, save_path):
        with closing(requests.get(file_url, headers={"User-Agent": REQUEST_USER_AGENT},
                                  stream=True)) as response:
            chunk_size = 1024  # 單次請求最大值
            content_size = int(response.headers['content-length'])  # 内容體總大小
            data_count = 0
            with open(save_path, "wb") as f:
                for data in response.iter_content(chunk_size=chunk_size):
                    f.write(data)
                    data_count = data_count + len(data)
                    now_jd = (data_count / content_size) * 100
                    # print("\r 檔案下載下傳進度:%d%%(%d/%d) - %s" % (now_jd, data_count, content_size, save_path), end=" ")
                    self.signals.showTipSignal.emit(f'正在下載下傳:{file_url} ({data_count}位元組/{content_size}位元組) {round(now_jd, 2)}%')
                    if now_jd == 100:
                        self.signals.showLogSignal.emit(f'{save_path}')
                # print('\r')


if __name__ == "__main__":
    app = QApplication([])
    app.setStyle(QStyleFactory.create('Fusion'))
    window = MainWindow()
    window.show()
    sys.exit(app.exec())