天天看點

位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

作者:架構師成長曆程
位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

前言

前幾天看到一個文章,感觸很深

位元組跳動面試官:請你實作一個大檔案上傳和斷點續傳

作者從0實作了大檔案的切片上傳,斷點續傳,秒傳,暫停等功能,深入淺出的把這個面試題進行了全面的剖析

彩虹屁不多吹,我決定蹭蹭熱點,錄錄視訊,把作者完整寫代碼的過程加進去,并且接着這篇文章寫,是以請看完上面的文章後再食用,我做了一些擴充如下

  1. 計算hash耗時的問題,不僅可以通過web-workder,還可以參考React的FFiber架構,通過requestIdleCallback來利用浏覽器的空閑時間計算,也不會卡死主線程
  2. 檔案hash的計算,是為了判斷檔案是否存在,進而實作秒傳的功能,是以我們可以參考布隆過濾器的理念, 犧牲一點點的識别率來換取時間,比如我們可以抽樣算hash
  3. 文中通過web-workder讓hash計算不卡頓主線程,但是大檔案由于切片過多,過多的HTTP連結過去,也會把浏覽器打挂 (我試了4個G的,直接卡死了), 我們可以通過控制異步請求的并發數來解決,我記得這也是頭條的一個面試題
  4. 每個切片的上傳進度不需要用表格來顯示,我們換成方塊進度條更直覺一些(如圖)
  5. 并發上傳中,報錯如何重試,比如每個切片我們允許重試兩次,三次再終止
  6. 由于檔案大小不一,我們每個切片的大小設定成固定的也有點略顯笨拙,我們可以參考TCP協定的慢啟動政策, 設定一個初始大小,根據上傳任務完成的時候,來動态調整下一個切片的大小, 確定檔案切片的大小和目前網速比對
  7. 小的體驗優化,比如上傳的時候
  8. 檔案碎片清理
位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

已經存在的秒傳的切片就是綠的,正在上傳的是藍色的,并發量是4,廢話不多說,我們一起代碼開花

時間切片計算檔案hash

其實就是time-slice概念,React中Fiber架構的核心理念,利用浏覽器的空閑時間,計算大的diff過程,中途又任何的高優先級任務,比如動畫和輸入,都會中斷diff任務, 雖然整個計算量沒有減小,但是大大提高了使用者的互動體驗

這可能是最通俗的 React Fiber(時間分片) 打開方式

位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

requestIdleCallback

window.requestIdleCallback()方法将在浏覽器的空閑時段内調用的函數排隊。這使開發者能夠在主事件循環上執行背景和低優先級工作 requestIdelCallback執行的方法,會傳遞一個deadline參數,能夠知道目前幀的剩餘時間,用法如下

scss複制代碼    requestIdelCallback(myNonEssentialWork);
    
    
    function myNonEssentialWork (deadline) {
    
      // deadline.timeRemaining()可以擷取到目前幀剩餘時間
      // 目前幀還有時間 并且任務隊列不為空
      while (deadline.timeRemaining() > 0 && tasks.length > 0) {
        doWorkIfNeeded();
      }
      if (tasks.length > 0){
        requestIdleCallback(myNonEssentialWork);
      }
    }

           

deadline的結構如下

java複制代碼interface Dealine {
  didTimeout: boolean // 表示任務執行是否超過約定時間
  timeRemaining(): DOMHighResTimeStamp // 任務可供執行的剩餘時間
}
           
位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

該圖中的兩個幀,在每一幀内部,TASK和redering隻花費了一部分時間,并沒有占據整個幀,那麼這個時候,如圖中idle period的部分就是空閑時間,而每一幀中的空閑時間,根據該幀中處理事情的多少,複雜度等,消耗不等,是以空閑時間也不等。

而對于每一個deadline.timeRemaining()的傳回值,就是如圖中,Idle Callback到所在幀結尾的時間(ms級)

時間切片計算

我們接着之前文章的代碼,改造一下calculateHash

ini複制代碼
    async calculateHashIdle(chunks) {
      return new Promise(resolve => {
        const spark = new SparkMD5.ArrayBuffer();
        let count = 0;
        // 根據檔案内容追加計算
        const appendToSpark = async file => {
          return new Promise(resolve => {
            const reader = new FileReader();
            reader.readAsArrayBuffer(file);
            reader.onload = e => {
              spark.append(e.target.result);
              resolve();
            };
          });
        };
        const workLoop = async deadline => {
          // 有任務,并且目前幀還沒結束
          while (count < chunks.length && deadline.timeRemaining() > 1) {
            await appendToSpark(chunks[count].file);
            count++;
            // 沒有了 計算完畢
            if (count < chunks.length) {
              // 計算中
              this.hashProgress = Number(
                ((100 * count) / chunks.length).toFixed(2)
              );
              // console.log(this.hashProgress)
            } else {
              // 計算完畢
              this.hashProgress = 100;
              resolve(spark.end());
            }
          }
          window.requestIdleCallback(workLoop);
        };
        window.requestIdleCallback(workLoop);
      });
    },
           

計算過程中,頁面放個輸入框,輸入無壓力,時間切片的威力

位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

上圖是React15和Fiber架構的對比,可以看出下圖任務量沒變,但是變得零散了,不混卡頓主線程

抽樣hash

計算檔案md5值的作用,無非就是為了判定檔案是否存在,我們可以考慮設計一個抽樣的hash,犧牲一些命中率的同時,提升效率,設計思路如下

  1. 檔案切成2M的切片
  2. 第一個和最後一個切片全部内容,其他切片的取 首中尾三個地方各2個位元組
  3. 合并後的内容,計算md5,稱之為影分身Hash
  4. 這個hash的結果,就是檔案存在,有小機率誤判,但是如果不存在,是100%準的的 ,和布隆過濾器的思路有些相似, 可以考慮兩個hash配合使用
  5. 我在自己電腦上試了下1.5G的檔案,全量大概要20秒,抽樣大概1秒還是很不錯的, 可以先用來判斷檔案是不是不存在
  6. 我真是個小機靈
位元組跳動面試官,我也實作了大檔案上傳和斷點續傳
抽樣md5: 1028.006103515625ms

全量md5: 21745.13916015625ms

           
async calculateHashSample() {
      return new Promise(resolve => {
        const spark = new SparkMD5.ArrayBuffer();
        const reader = new FileReader();
        const file = this.container.file;
        // 檔案大小
        const size = this.container.file.size;
        let offset = 2 * 1024 * 1024;

        let chunks = [file.slice(0, offset)];

        // 前面100K

        let cur = offset;
        while (cur < size) {
          // 最後一塊全部加進來
          if (cur + offset >= size) {
            chunks.push(file.slice(cur, cur + offset));
          } else {
            // 中間的 前中後去兩個位元組
            const mid = cur + offset / 2;
            const end = cur + offset;
            chunks.push(file.slice(cur, cur + 2));
            chunks.push(file.slice(mid, mid + 2));
            chunks.push(file.slice(end - 2, end));
          }
          // 前取兩個位元組
          cur += offset;
        }
        // 拼接
        reader.readAsArrayBuffer(new Blob(chunks));
        reader.onload = e => {
          spark.append(e.target.result);

          resolve(spark.end());
        };
      });
    }
           

網絡請求并發控制

大檔案hash計算後,一次發幾百個http請求,計算哈希沒卡,結果TCP建立的過程就把浏覽器弄死了,而且我記得本身異步請求并發數的控制,本身就是頭條的一個面試題

位元組跳動面試官,我也實作了大檔案上傳和斷點續傳

思路其實也不難,就是我們把異步請求放在一個隊列裡,比如并發數是3,就先同時發起3個請求,然後有請求結束了,再發起下一個請求即可, 思路清楚,代碼也就呼之欲出了

我們通過并發數max來管理并發數,發起一個請求max--,結束一個請求max++即可

javascript複制代碼
+async sendRequest(forms, max=4) {
+  return new Promise(resolve => {
+    const len = forms.length;
+    let idx = 0;
+    let counter = 0;
+    const start = async ()=> {
+      // 有請求,有通道
+      while (idx < len && max > 0) {
+        max--; // 占用通道
+        console.log(idx, "start");
+        const form = forms[idx].form;
+        const index = forms[idx].index;
+        idx++
+        request({
+          url: '/upload',
+          data: form,
+          onProgress: this.createProgresshandler(this.chunks[index]),
+          requestList: this.requestList
+        }).then(() => {
+          max++; // 釋放通道
+          counter++;
+          if (counter === len) {
+            resolve();
+          } else {
+            start();
+          }
+        });
+      }
+    }
+    start();
+  });
+}

async uploadChunks(uploadedList = []) {
  // 這裡一起上傳,碰見大檔案就是災難
  // 沒被hash計算打到,被一次性的tcp連結把浏覽器稿挂了
  // 異步并發控制政策,我記得這個也是頭條一個面試題
  // 比如并發量控制成4
  const list = this.chunks
    .filter(chunk => uploadedList.indexOf(chunk.hash) == -1)
    .map(({ chunk, hash, index }, i) => {
      const form = new FormData();
      form.append("chunk", chunk);
      form.append("hash", hash);
      form.append("filename", this.container.file.name);
      form.append("fileHash", this.container.hash);
      return { form, index };
    })
-     .map(({ form, index }) =>
-       request({
-           url: "/upload",
-         data: form,
-         onProgress: this.createProgresshandler(this.chunks[index]),
-         requestList: this.requestList
-       })
-     );
-   // 直接全量并發
-   await Promise.all(list);
     // 控制并發  
+   const ret =  await this.sendRequest(list,4)

  if (uploadedList.length + list.length === this.chunks.length) {
    // 上傳和已經存在之和 等于全部的再合并
    await this.mergeRequest();
  }
},
           

話說位元組跳動另外一個面試題我也做出來的,不知道能不能通過他們的一面

慢啟動政策實作

TCP擁塞控制的問題 其實就是根據目前網絡情況,動态調整切片的大小

  1. chunk中帶上size值,不過進度條數量不确定了,修改createFileChunk, 請求加上時間統計)
  2. 比如我們理想是30秒傳遞一個
  3. 初始大小定為1M,如果上傳花了10秒,那下一個區塊大小變成3M
  4. 如果上傳花了60秒,那下一個區塊大小變成500KB 以此類推
  5. 并發+慢啟動的邏輯有些複雜,我自己還沒繞明白,囧是以先一次隻傳一個切片,來示範這個邏輯,建立一個handleUpload1函數
javascript複制代碼async handleUpload1(){
      // @todo資料縮放的比率 可以更平緩  
      // @todo 并發+慢啟動

      // 慢啟動上傳邏輯 
      const file = this.container.file
      if (!file) return;
      this.status = Status.uploading;
      const fileSize = file.size
      let offset = 1024*1024 
      let cur = 0 
      let count =0
      this.container.hash = await this.calculateHashSample();

      while(cur<fileSize){
        // 切割offfset大小
        const chunk = file.slice(cur, cur+offset)
        cur+=offset
        const chunkName = this.container.hash + "-" + count;
        const form = new FormData();
        form.append("chunk", chunk);
        form.append("hash", chunkName);
        form.append("filename", file.name);
        form.append("fileHash", this.container.hash);
        form.append("size", chunk.size);

        let start = new Date().getTime()
        await request({ url: '/upload',data: form })
        const now = new Date().getTime()
        
        const time = ((now -start)/1000).toFixed(4)
        let rate = time/30
        // 速率有最大2和最小0.5
        if(rate<0.5) rate=0.5
        if(rate>2) rate=2
        // 新的切片大小等比變化
        console.log(`切片${count}大小是${this.format(offset)},耗時${time}秒,是30秒的${rate}倍,修正大小為${this.format(offset/rate)}`)
        // 動态調整offset
        offset = parseInt(offset/rate)
        // if(time)
        count++
      }
    }

           

調整下slow 3G網速 看下效果

複制代碼切片0大小是1024.00KB,耗時13.2770秒,是30秒的0.5倍,修正大小為2.00MB
切片1大小是2.00MB,耗時25.4130秒,是30秒的0.8471倍,修正大小為2.36MB
切片2大小是2.36MB,耗時14.1260秒,是30秒的0.5倍,修正大小為4.72MB
           

搞定

并發重試+報錯

  1. 請求出錯.catch 把任務重新放在隊列中
  2. 出錯後progress設定為-1 進度條顯示紅色
  3. 數組存儲每個檔案hash請求的重試次數,做累加 比如[1,0,2],就是第0個檔案切片報錯1次,第2個報錯2次
  4. 超過3的直接reject

首先後端模拟報錯

javascript複制代碼      if(Math.random()<0.5){
        // 機率報錯
        console.log('機率報錯了')
        res.statusCode=500
        res.end()
        return 
      }

           
ini複制代碼async sendRequest(urls, max=4) {
-      return new Promise(resolve => {
+      return new Promise((resolve,reject) => {
         const len = urls.length;
         let idx = 0;
         let counter = 0;
+        const retryArr = []

 
         const start = async ()=> {
           // 有請求,有通道
-          while (idx < len && max > 0) {
+          while (counter < len && max > 0) {
             max--; // 占用通道
             console.log(idx, "start");
-            const form = urls[idx].form;
-            const index = urls[idx].index;
-            idx++
+            // 任務不能僅僅累加擷取,而是要根據狀态
+            // wait和error的可以送出請求 友善重試
+            const i = urls.findIndex(v=>v.status==Status.wait || v.status==Status.error )// 等待或者error
+            urls[i].status = Status.uploading
+            const form = urls[i].form;
+            const index = urls[i].index;
+            if(typeof retryArr[index]=='number'){
+              console.log(index,'開始重試')
+            }
             request({
               url: '/upload',
               data: form,
               onProgress: this.createProgresshandler(this.chunks[index]),
               requestList: this.requestList
             }).then(() => {
+               urls[i].status = Status.done
               max++; // 釋放通道
               counter++;
+              urls[counter].done=true
               if (counter === len) {
                 resolve();
               } else {
                 start();
               }
-            });
+            }).catch(()=>{
+               urls[i].status = Status.error
+               if(typeof retryArr[index]!=='number'){
+                  retryArr[index] = 0
+               }
+              // 次數累加
+              retryArr[index]++
+              // 一個請求報錯3次的
+              if(retryArr[index]>=2){
+                return reject()
+              }
+              console.log(index, retryArr[index],'次報錯')
+              // 3次報錯以内的 重新開機
+              this.chunks[index].progress = -1 // 報錯的進度條
+              max++; // 釋放目前占用的通道,但是counter不累加
+              
+              start()
+            })
           }
         }
         start();

}

           

如圖所示,報錯後會區塊變紅,但是會重試

檔案碎片清理

如果很多人傳了一半就離開了,這些切片存在就沒意義了,可以考慮定期清理,當然 ,我們可以使用node-schedule來管理定時任務 比如我們每天掃一次target,如果檔案的修改時間是一個月以前了,就直接删除把

scss複制代碼// 為了友善測試,我改成每5秒掃一次, 過期1鐘的删除做示範
const fse = require('fs-extra')
const path = require('path')
const schedule = require('node-schedule')


// 空目錄删除
function remove(file,stats){
    const now = new Date().getTime()
    const offset = now - stats.ctimeMs 
    if(offset>1000*60){
        // 大于60秒的碎片
        console.log(file,'過期了,浪費空間的玩意,删除')
        fse.unlinkSync(file)
    }
}

async function scan(dir,callback){
    const files = fse.readdirSync(dir)
    files.forEach(filename=>{
        const fileDir = path.resolve(dir,filename)
        const stats = fse.statSync(fileDir)
        if(stats.isDirectory()){
            return scan(fileDir,remove)
        }
        if(callback){
            callback(fileDir,stats)
        }
    })
}
// *    *    *    *    *    *
// ┬    ┬    ┬    ┬    ┬    ┬
// │    │    │    │    │    │
// │    │    │    │    │    └ day of week (0 - 7) (0 or 7 is Sun)
// │    │    │    │    └───── month (1 - 12)
// │    │    │    └────────── day of month (1 - 31)
// │    │    └─────────────── hour (0 - 23)
// │    └──────────────────── minute (0 - 59)
// └───────────────────────── second (0 - 59, OPTIONAL)
let start = function(UPLOAD_DIR){
    // 每5秒
    schedule.scheduleJob("*/5 * * * * *",function(){
        console.log('開始掃描')
        scan(UPLOAD_DIR)
    })
}
exports.start = start

           
bash複制代碼開始掃描
/upload/target/625c.../625c...-0 過期了,删除
/upload/target/625c.../625c...-1 過期了,删除
/upload/target/625c.../625c...-10 過期了,删除
/upload/target/625c.../625c...-11 過期了,删除
/upload/target/625c.../625c...-12 過期了,删除


           

後續擴充和思考

留幾個思考題,下次寫文章再實作 友善繼續蹭熱度

  1. requestIdleCallback相容性,如何自己實作一個react也是自己寫的排程邏輯,以後有機會寫個文章介紹React自己實作的requestIdleCallback
  2. 并發+慢啟動配合
  3. 抽樣hash+全量哈希+時間切片配合
  4. 大檔案切片下載下傳一樣的切片邏輯,通過axios.head請求擷取content-Length使用http的Range這個header就可以切片下載下傳了,其他邏輯和上傳差不多
  5. 小的體驗優化比如離開頁面的提醒 等等小tips
  6. 慢啟動的變化應該更平滑,比如使用三角函數,把變化率平滑的限制在0.5~1.5之間
  7. websocket推送進度

思考和總結

  1. 任何看似簡單的需求,量級提升後,都變得很複雜,人生也是這樣
  2. 檔案上傳簡單,大檔案就複雜,單機簡單,分布式難
  3. 就連一個簡單的leftPad函數(左邊補齊字元),考慮到性能,二分法性能都秒殺數組join ,引起大讨論論left-pad函數的實作任何一個知識點 都值得深挖
  4. 産品經理下次再說啥需求簡單,就kan他
  5. 我準備結合上一篇,錄一個大檔案上傳的手摸手剖析視訊 敬請期待

代碼

https://github.com/shengxinjing/upload

# 1、什麼是秒傳

通俗的說,你把要上傳的東西上傳,伺服器會先做MD5校驗,如果伺服器上有一樣的東西,它就直接給你個新位址,其實你下載下傳的都是伺服器上的同一個檔案,想要不秒傳,其實隻要讓MD5改變,就是對檔案本身做一下修改(改名字不行),例如一個文本檔案,你多加幾個字,MD5就變了,就不會秒傳了.

# 2、本文實作的秒傳核心邏輯

a、利用redis的set方法存放檔案上傳狀态,其中key為檔案上傳的md5,value為是否上傳完成的标志位,

b、當标志位true為上傳已經完成,此時如果有相同檔案上傳,則進入秒傳邏輯。如果标志位為false,則說明還沒上傳完成,此時需要在調用set的方法,儲存塊号檔案記錄的路徑,其中key為上傳檔案md5加一個固定字首,value為塊号檔案記錄路徑

# 分片上傳

# 1、什麼是分片上傳

分片上傳,就是将所要上傳的檔案,按照一定的大小,将整個檔案分隔成多個資料塊(我們稱之為Part)來進行分别上傳,上傳完之後再由服務端對所有上傳的檔案進行彙總整合成原始的檔案。

# 2、分片上傳的場景

1.大檔案上傳

2.網絡環境環境不好,存在需要重傳風險的場景

# 斷點續傳

# 1、什麼是斷點續傳

斷點續傳是在下載下傳或上傳時,将下載下傳或上傳任務(一個檔案或一個壓縮包)人為的劃分為幾個部分,每一個部分采用一個線程進行上傳或下載下傳,如果碰到網絡故障,可以從已經上傳或下載下傳的部分開始繼續上傳或者下載下傳未完成的部分,而沒有必要從頭開始上傳或者下載下傳。本文的斷點續傳主要是針對斷點上傳場景。

# 2、應用場景

斷點續傳可以看成是分片上傳的一個衍生,是以可以使用分片上傳的場景,都可以使用斷點續傳。

# 3、實作斷點續傳的核心邏輯

在分片上傳的過程中,如果因為系統崩潰或者網絡中斷等異常因素導緻上傳中斷,這時候用戶端需要記錄上傳的進度。在之後支援再次上傳時,可以繼續從上次上傳中斷的地方進行繼續上傳。

為了避免用戶端在上傳之後的進度資料被删除而導緻重新開始從頭上傳的問題,服務端也可以提供相應的接口便于用戶端對已經上傳的分片資料進行查詢,進而使用戶端知道已經上傳的分片資料,進而從下一個分片資料開始繼續上傳。

# 4、實作流程步驟

a、方案一,正常步驟

  • 将需要上傳的檔案按照一定的分割規則,分割成相同大小的資料塊;
  • 初始化一個分片上傳任務,傳回本次分片上傳唯一辨別;
  • 按照一定的政策(串行或并行)發送各個分片資料塊;
  • 發送完成後,服務端根據判斷資料上傳是否完整,如果完整,則進行資料塊合成得到原始檔案。

b、方案二、本文實作的步驟

  • 前端(用戶端)需要根據固定大小對檔案進行分片,請求後端(服務端)時要帶上分片序号和大小
  • 服務端建立conf檔案用來記錄分塊位置,conf檔案長度為總分片數,每上傳一個分塊即向conf檔案中寫入一個127,那麼沒上傳的位置就是預設的0,已上傳的就是Byte.MAX_VALUE 127(這步是實作斷點續傳和秒傳的核心步驟)
  • 伺服器按照請求資料中給的分片序号和每片分塊大小(分片大小是固定且一樣的)算出開始位置,與讀取到的檔案片段資料,寫入檔案。

# 5、分片上傳/斷點上傳代碼實作

a、前端采用百度提供的webuploader的插件,進行分片。因本文主要介紹服務端代碼實作,webuploader如何進行分片,具體實作可以檢視如下連結:

http://fex.baidu.com/webuploader/getting-started.htmlopen in new window

b、後端用兩種方式實作檔案寫入,一種是用RandomAccessFile,如果對RandomAccessFile不熟悉的朋友,可以檢視如下連結:

https://blog.csdn.net/dimudan2015/article/details/81910690open in new window

另一種是使用MappedByteBuffer,對MappedByteBuffer不熟悉的朋友,可以檢視如下連結進行了解:

https://www.jianshu.com/p/f90866dcbffcopen in new window

# 後端進行寫入操作的核心代碼

# 1、RandomAccessFile實作方式

@UploadMode(mode = UploadModeEnum.RANDOM_ACCESS)  
@Slf4j  
public class RandomAccessUploadStrategy extends SliceUploadTemplate {  
  
  @Autowired  
  private FilePathUtil filePathUtil;  
  
  @Value("${upload.chunkSize}")  
  private long defaultChunkSize;  
  
  @Override  
  public boolean upload(FileUploadRequestDTO param) {  
    RandomAccessFile accessTmpFile = null;  
    try {  
      String uploadDirPath = filePathUtil.getPath(param);  
      File tmpFile = super.createTmpFile(param);  
      accessTmpFile = new RandomAccessFile(tmpFile, "rw");  
      //這個必須與前端設定的值一緻  
      long chunkSize = Objects.isNull(param.getChunkSize()) ? defaultChunkSize * 1024 * 1024  
          : param.getChunkSize();  
      long offset = chunkSize * param.getChunk();  
      //定位到該分片的偏移量  
      accessTmpFile.seek(offset);  
      //寫入該分片資料  
      accessTmpFile.write(param.getFile().getBytes());  
      boolean isOk = super.checkAndSetUploadProgress(param, uploadDirPath);  
      return isOk;  
    } catch (IOException e) {  
      log.error(e.getMessage(), e);  
    } finally {  
      FileUtil.close(accessTmpFile);  
    }  
   return false;  
  }  
  
}  
           

# 2、MappedByteBuffer實作方式

@UploadMode(mode = UploadModeEnum.MAPPED_BYTEBUFFER)  
@Slf4j  
public class MappedByteBufferUploadStrategy extends SliceUploadTemplate {  
  
  @Autowired  
  private FilePathUtil filePathUtil;  
  
  @Value("${upload.chunkSize}")  
  private long defaultChunkSize;  
  
  @Override  
  public boolean upload(FileUploadRequestDTO param) {  
  
    RandomAccessFile tempRaf = null;  
    FileChannel fileChannel = null;  
    MappedByteBuffer mappedByteBuffer = null;  
    try {  
      String uploadDirPath = filePathUtil.getPath(param);  
      File tmpFile = super.createTmpFile(param);  
      tempRaf = new RandomAccessFile(tmpFile, "rw");  
      fileChannel = tempRaf.getChannel();  
  
      long chunkSize = Objects.isNull(param.getChunkSize()) ? defaultChunkSize * 1024 * 1024  
          : param.getChunkSize();  
      //寫入該分片資料  
      long offset = chunkSize * param.getChunk();  
      byte[] fileData = param.getFile().getBytes();  
      mappedByteBuffer = fileChannel  
.map(FileChannel.MapMode.READ_WRITE, offset, fileData.length);  
      mappedByteBuffer.put(fileData);  
      boolean isOk = super.checkAndSetUploadProgress(param, uploadDirPath);  
      return isOk;  
  
    } catch (IOException e) {  
      log.error(e.getMessage(), e);  
    } finally {  
  
      FileUtil.freedMappedByteBuffer(mappedByteBuffer);  
      FileUtil.close(fileChannel);  
      FileUtil.close(tempRaf);  
  
    }  
  
    return false;  
  }  
  
}  
           

# 3、檔案操作核心模闆類代碼

@Slf4j  
public abstract class SliceUploadTemplate implements SliceUploadStrategy {  
  
  public abstract boolean upload(FileUploadRequestDTO param);  
  
  protected File createTmpFile(FileUploadRequestDTO param) {  
  
    FilePathUtil filePathUtil = SpringContextHolder.getBean(FilePathUtil.class);  
    param.setPath(FileUtil.withoutHeadAndTailDiagonal(param.getPath()));  
    String fileName = param.getFile().getOriginalFilename();  
    String uploadDirPath = filePathUtil.getPath(param);  
    String tempFileName = fileName + "_tmp";  
    File tmpDir = new File(uploadDirPath);  
    File tmpFile = new File(uploadDirPath, tempFileName);  
    if (!tmpDir.exists()) {  
      tmpDir.mkdirs();  
    }  
    return tmpFile;  
  }  
  
  @Override  
  public FileUploadDTO sliceUpload(FileUploadRequestDTO param) {  
  
    boolean isOk = this.upload(param);  
    if (isOk) {  
      File tmpFile = this.createTmpFile(param);  
      FileUploadDTO fileUploadDTO = this.saveAndFileUploadDTO(param.getFile().getOriginalFilename(), tmpFile);  
      return fileUploadDTO;  
    }  
    String md5 = FileMD5Util.getFileMD5(param.getFile());  
  
    Map<Integer, String> map = new HashMap<>();  
    map.put(param.getChunk(), md5);  
    return FileUploadDTO.builder().chunkMd5Info(map).build();  
  }  
  
  /**  
   * 檢查并修改檔案上傳進度  
   */  
  public boolean checkAndSetUploadProgress(FileUploadRequestDTO param, String uploadDirPath) {  
  
    String fileName = param.getFile().getOriginalFilename();  
    File confFile = new File(uploadDirPath, fileName + ".conf");  
    byte isComplete = 0;  
    RandomAccessFile accessConfFile = null;  
    try {  
      accessConfFile = new RandomAccessFile(confFile, "rw");  
      //把該分段标記為 true 表示完成  
      System.out.println("set part " + param.getChunk() + " complete");  
      //建立conf檔案檔案長度為總分片數,每上傳一個分塊即向conf檔案中寫入一個127,那麼沒上傳的位置就是預設0,已上傳的就是Byte.MAX_VALUE 127  
      accessConfFile.setLength(param.getChunks());  
      accessConfFile.seek(param.getChunk());  
      accessConfFile.write(Byte.MAX_VALUE);  
  
      //completeList 檢查是否全部完成,如果數組裡是否全部都是127(全部分片都成功上傳)  
      byte[] completeList = FileUtils.readFileToByteArray(confFile);  
      isComplete = Byte.MAX_VALUE;  
      for (int i = 0; i < completeList.length && isComplete == Byte.MAX_VALUE; i++) {  
        //與運算, 如果有部分沒有完成則 isComplete 不是 Byte.MAX_VALUE  
        isComplete = (byte) (isComplete & completeList[i]);  
        System.out.println("check part " + i + " complete?:" + completeList[i]);  
      }  
  
    } catch (IOException e) {  
      log.error(e.getMessage(), e);  
    } finally {  
      FileUtil.close(accessConfFile);  
    }  
 boolean isOk = setUploadProgress2Redis(param, uploadDirPath, fileName, confFile, isComplete);  
    return isOk;  
  }  
  
  /**  
   * 把上傳進度資訊存進redis  
   */  
  private boolean setUploadProgress2Redis(FileUploadRequestDTO param, String uploadDirPath,  
      String fileName, File confFile, byte isComplete) {  
  
    RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);  
    if (isComplete == Byte.MAX_VALUE) {  
      redisUtil.hset(FileConstant.FILE_UPLOAD_STATUS, param.getMd5(), "true");  
      redisUtil.del(FileConstant.FILE_MD5_KEY + param.getMd5());  
      confFile.delete();  
      return true;  
    } else {  
      if (!redisUtil.hHasKey(FileConstant.FILE_UPLOAD_STATUS, param.getMd5())) {  
        redisUtil.hset(FileConstant.FILE_UPLOAD_STATUS, param.getMd5(), "false");  
        redisUtil.set(FileConstant.FILE_MD5_KEY + param.getMd5(),  
            uploadDirPath + FileConstant.FILE_SEPARATORCHAR + fileName + ".conf");  
      }  
  
      return false;  
    }  
  }  
/**  
   * 儲存檔案操作  
   */  
  public FileUploadDTO saveAndFileUploadDTO(String fileName, File tmpFile) {  
  
    FileUploadDTO fileUploadDTO = null;  
  
    try {  
  
      fileUploadDTO = renameFile(tmpFile, fileName);  
      if (fileUploadDTO.isUploadComplete()) {  
        System.out  
            .println("upload complete !!" + fileUploadDTO.isUploadComplete() + " name=" + fileName);  
        //TODO 儲存檔案資訊到資料庫  
  
      }  
  
    } catch (Exception e) {  
      log.error(e.getMessage(), e);  
    } finally {  
  
    }  
    return fileUploadDTO;  
  }  
/**  
   * 檔案重命名  
   *  
   * @param toBeRenamed 将要修改名字的檔案  
   * @param toFileNewName 新的名字  
   */  
  private FileUploadDTO renameFile(File toBeRenamed, String toFileNewName) {  
    //檢查要重命名的檔案是否存在,是否是檔案  
    FileUploadDTO fileUploadDTO = new FileUploadDTO();  
    if (!toBeRenamed.exists() || toBeRenamed.isDirectory()) {  
      log.info("File does not exist: {}", toBeRenamed.getName());  
      fileUploadDTO.setUploadComplete(false);  
      return fileUploadDTO;  
    }  
    String ext = FileUtil.getExtension(toFileNewName);  
    String p = toBeRenamed.getParent();  
    String filePath = p + FileConstant.FILE_SEPARATORCHAR + toFileNewName;  
    File newFile = new File(filePath);  
    //修改檔案名  
    boolean uploadFlag = toBeRenamed.renameTo(newFile);  
  
    fileUploadDTO.setMtime(DateUtil.getCurrentTimeStamp());  
    fileUploadDTO.setUploadComplete(uploadFlag);  
    fileUploadDTO.setPath(filePath);  
    fileUploadDTO.setSize(newFile.length());  
    fileUploadDTO.setFileExt(ext);  
    fileUploadDTO.setFileId(toFileNewName);  
  
    return fileUploadDTO;  
  }  
}  
           

# 總結

在實作分片上傳的過程,需要前端和後端配合,比如前後端的上傳塊号的檔案大小,前後端必須得要一緻,否則上傳就會有問題。其次檔案相關操作正常都是要搭建一個檔案伺服器的,比如使用fastdfs、hdfs等。

本示例代碼在電腦組態為4核記憶體8G情況下,上傳24G大小的檔案,上傳時間需要30多分鐘,主要時間耗費在前端的md5值計算,後端寫入的速度還是比較快。

如果項目組覺得自建檔案伺服器太花費時間,且項目的需求僅僅隻是上傳下載下傳,那麼推薦使用阿裡的oss伺服器,其介紹可以檢視官網:

https://help.aliyun.com/product/31815.htmlopen in new window

阿裡的oss它本質是一個對象存儲伺服器,而非檔案伺服器,是以如果有涉及到大量删除或者修改檔案的需求,oss可能就不是一個好的選擇。

文末提供一個oss表單上傳的連結demo,通過oss表單上傳,可以直接從前端把檔案上傳到oss伺服器,把上傳的壓力都推給oss伺服器:

繼續閱讀