天天看點

各個擊破,超大檔案分片上傳Vue.js3.0+Tornado6異步IO高效寫入

作者:劉悅技術分享

分治算法是一種很古老但很務實的方法。本意即使将一個較大的整體打碎分成小的局部,這樣每個小的局部都不足以對抗大的整體。戰國時期,秦國破壞合縱的連橫即是一種分而治之的手段;十九世紀,比利時殖民者占領盧旺達, 将盧旺達的種族分為胡圖族與圖西族,以圖進行分裂控制,莫不如是。

21世紀,人們往往會在Leetcode平台上刷分治算法題,但事實上,從工業角度上來看,算法如果不和實際業務場景相結合,算法就永遠是虛無缥缈的存在,它隻會出現在開發者的某一次不經意的面試中,而真實的算法,并不是虛空的,它應該能幫助我們解決實際問題,是的,它應該落地成為實體。

大檔案分片上傳就是這樣一個契合分治算法的場景,現而今,視訊檔案的體積越來越大,高清視訊體積大概2-4g不等,但4K視訊的分辨率是标準高清的四倍,需要四倍的存儲空間——隻需兩到三分鐘的未壓縮4K 電影,或者電影預告片的長度,就可以達到500GB。 8K視訊檔案更是大得難以想象,而現在12K正在出現,如此巨大的檔案,該怎樣設計一套合理的資料傳輸方案?這裡我們以前後端分離項目為例,前端使用Vue.js3.0配合ui庫Ant-desgin,後端采用并發異步架構Tornado實作大檔案的分片無阻塞傳輸與異步IO寫入服務。

前端分片

首先,安裝Vue3.0以上版本:

npm install -g @vue/cli           

安裝異步請求庫axios:

npm install axios --save           

随後,安裝Ant-desgin:

npm i --save ant-design-vue@next -S           

Ant-desgin雖然因為曾經的聖誕節“彩蛋門”事件而聲名狼藉,但客觀地說,它依然是業界不可多得的優秀UI架構之一。

接着在項目程式入口檔案引入使用:

import { createApp } from 'vue'
import App from './App.vue'
import { router } from './router/index'


import axios from 'axios'
import qs from 'qs'

import Antd from 'ant-design-vue';
import 'ant-design-vue/dist/antd.css';


const app = createApp(App)


app.config.globalProperties.axios = axios;
app.config.globalProperties.upload_dir = "https://localhost/static/";

app.config.globalProperties.weburl = "http://localhost:8000";

app.use(router);
app.use(Antd);

app.mount('#app')           

随後,參照Ant-desgin官方文檔:https://antdv.com/components/overview-cn 建構上傳控件:

<a-upload

    @change="fileupload"
    :before-upload="beforeUpload"
  >
    <a-button>
      <upload-outlined></upload-outlined>
      上傳檔案
    </a-button>
  </a-upload>           

注意這裡需要将綁定的before-upload強制傳回false,設定為手動上傳:

beforeUpload:function(file){


      return false;

}           

接着聲明分片方法:

fileupload:function(file){


      var size = file.file.size;//總大小
 

      var shardSize = 200 * 1024; //分片大小
 
      this.shardCount = Math.ceil(size / shardSize); //總片數

      console.log(this.shardCount);
 
 
      for (var i = 0; i < this.shardCount; ++i) {
 
        //計算每一片的起始與結束位置
 
        var start = i * shardSize;
 
        var end = Math.min(size, start + shardSize);

        var tinyfile = file.file.slice(start, end);

        let data = new FormData();
        data.append('file', tinyfile);
        data.append('count',i);
        data.append('filename',file.file.name);

        const axiosInstance = this.axios.create({withCredentials: false});

        axiosInstance({
            method: 'POST',
            url:'http://localhost:8000/upload/',  //上傳位址
            data:data
        }).then(data =>{

           this.finished += 1;

           console.log(this.finished);

           if(this.finished == this.shardCount){
            this.mergeupload(file.file.name);
           }

        }).catch(function(err) {
            //上傳失敗
        });



      }



    }           

具體分片邏輯是,大檔案總體積按照單片體積的大小做除法并向上取整,擷取到檔案的分片個數,這裡為了測試友善,将單片體積設定為200kb,可以随時做修改。

随後,分片過程中使用Math.min方法計算每一片的起始和結束位置,再通過slice方法進行切片操作,最後将分片的下标、檔案名、以及分片本體異步發送到背景。

當所有的分片請求都發送完畢後,封裝分片合并方法,請求後端發起合并分片操作:

mergeupload:function(filename){


      this.myaxios(this.weburl+"/upload/","put",{"filename":filename}).then(data =>{

              console.log(data);

        });

}           

至此,前端分片邏輯就完成了。

後端異步IO寫入

為了避免同步寫入引起的阻塞,安裝aiofiles庫:

pip3 install aiofiles           

aiofiles用于處理asyncio應用程式中的本地磁盤檔案,配合Tornado的異步非阻塞機制,可以有效的提升檔案寫入效率:

import aiofiles

# 分片上傳
class SliceUploadHandler(BaseHandler):
    
    async def post(self):


        file = self.request.files["file"][0]
        filename = self.get_argument("filename")
        count = self.get_argument("count")

        filename = '%s_%s' % (filename,count) # 構成該分片唯一辨別符

        contents = file['body'] #異步讀取檔案
        async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:
            await f.write(contents)

        return {"filename": file.filename,"errcode":0}           

這裡後端擷取到分片實體、檔案名、以及分片辨別後,将分片檔案以檔案名_分片辨別的格式異步寫入到系統目錄中,以一張378kb大小的png圖檔為例,分片檔案應該順序為200kb和178kb,如圖所示:

各個擊破,超大檔案分片上傳Vue.js3.0+Tornado6異步IO高效寫入

當分片檔案都寫入成功後,觸發分片合并接口:

import aiofiles

# 分片上傳
class SliceUploadHandler(BaseHandler):
    
    async def post(self):


        file = self.request.files["file"][0]
        filename = self.get_argument("filename")
        count = self.get_argument("count")

        filename = '%s_%s' % (filename,count) # 構成該分片唯一辨別符

        contents = file['body'] #異步讀取檔案
        async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:
            await f.write(contents)

        return {"filename": file.filename,"errcode":0}


    async def put(self):

        filename = self.get_argument("filename")
        chunk = 0

        async with aiofiles.open('./static/uploads/%s' % filename,'ab') as target_file:

            while True:
                try:
                    source_file = open('./static/uploads/%s_%s' % (filename,chunk), 'rb')
                    await target_file.write(source_file.read())
                    source_file.close()
                except Exception as e:
                    print(str(e))
                    break

                chunk = chunk + 1
        self.finish({"msg":"ok","errcode":0})           

這裡通過檔案名進行尋址,随後周遊合并,注意句柄寫入模式為增量位元組碼寫入,否則會逐層将分片檔案覆寫,同時也兼具了斷點續寫的功能。有些邏輯會将分片個數傳入後端,讓後端判斷分片合并個數,其實并不需要,因為如果尋址失敗,會自動抛出異常并且跳出循環,進而節約了一個參數的帶寬占用。

輪詢服務

在真實的超大檔案傳輸場景中,由于網絡或者其他因素,很可能導緻分片任務中斷,此時就需要通過降級快速響應,傳回托底資料,避免使用者的長時間等待,這裡我們使用基于Tornado的Apscheduler庫來排程分片任務:

pip install apscheduler           

随後編寫job.py輪詢服務檔案:

from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler


scheduler = None
job_ids   = []

# 初始化
def init_scheduler():
    global scheduler
    scheduler = TornadoScheduler()
    scheduler.start()
    print('[Scheduler Init]APScheduler has been started')

# 要執行的定時任務在這裡
def task1(options):
    print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))


class MainHandler(RequestHandler):
    def get(self):
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')


class SchedulerHandler(RequestHandler):
    def get(self):
        global job_ids
        job_id = self.get_query_argument('job_id', None)
        action = self.get_query_argument('action', None)
        if job_id:
            # add
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[TASK ADDED] - {}'.format(job_id))
                else:
                    self.write('[TASK EXISTS] - {}'.format(job_id))
            # remove
            elif 'remove' == action:
                if job_id in job_ids:
                    scheduler.remove_job(job_id)
                    job_ids.remove(job_id)
                    self.write('[TASK REMOVED] - {}'.format(job_id))
                else:
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))
        else:
            self.write('[INVALID PARAMS] INVALID job_id or action')


if __name__ == "__main__":
    routes    = [
        (r"/", MainHandler),
        (r"/scheduler/?", SchedulerHandler),
    ]
    init_scheduler()
    app       = Application(routes, debug=True)
    app.listen(8888)
    IOLoop.current().start()           

每一次分片接口被調用後,就建立定時任務對分片檔案進行監測,如果分片成功就删除分片檔案,同時删除任務,否則就啟用降級預案。

結語

分治法對超大檔案進行分片切割,同時并發異步發送,可以提高傳輸效率,降低傳輸時間,和之前的一篇:聚是一團火散作滿天星,前端Vue.js+elementUI結合後端FastAPI實作大檔案分片上傳,邏輯上有異曲同工之妙,但手法上卻略有不同,确是頗有互相借鏡之處,最後代碼開源于Github:https://github.com/zcxey2911/Tornado6_Vuejs3_Edu,與衆親同飨。

繼續閱讀