天天看點

Android多線程斷點下載下傳器

一直想找一個精簡的Android多線程下載下傳的架構用到項目中,找了許久還是沒有找到一個功能完善比較精簡的,最近閑暇之餘抽時間自己寫了一個自認為功能比較完善的下載下傳器,自己動手風衣足食嘛。

一開始還不覺得,在寫的過程中多線程之間的排程,和壓力測試之下bug層出不窮….,後來一步一步的完善,經過測試之後,自認為總算是可以拿出來見人了,最後來記錄一下新路曆程和大家分享

首先介紹一下Downloader具備以下功能:

  • 多線程,自定義下載下傳線程數
  • 支援斷點下載下傳
  • 完善的狀态回調

設計思路

想要的效果是這樣的:

調用簡單,無第三方依賴,lamda函數式接口,支援中斷/繼續下載下傳

下面來說說具體遇到的問題和解決方案吧,這兩個是最重要的問題:

  1. 多線程分塊下載下傳,每個線程的下載下傳狀态,下載下傳進度的統計
  2. 使用何種資料結構表示下載下傳的狀态,并且需要精準的同步到本地

解決方案是這樣的,一個一個來說,歡迎大家吐槽~~~

-線程狀态排程

既然是多線程首先想到的當然是ExecutorService線程池,這裡就用了Executors.newCachedThreadPool(),有線程的地方就往裡面扔吧。

上代碼,來看看Downloader的入口:

//整個下載下傳邏輯從這裡開始,細節就要看其中的每一個功能函數的具體實作了
public Downloader open(final File file, final String url) {
        reset();// 先中斷正在下載下傳的線程,重置狀态值
        if (null == url || "".equals(url) || null == file || file.isDirectory()) {
            dividerError("error, url or file is empty");
            return this;
        }
        //這裡CancelableRunner是自定義的可以cancel的線程
        //在配置設定線程時是一個“監視線程”和多個“下載下傳線程”
        //monitor是監視線程,來監視和檢測下載下傳線程的狀态,進度
        monitor = new CancelableRunner() {
            @Override
            public void run() {
                //DownloadInfo是用來描述下載下傳狀态的資料結構
                //先通過下載下傳的Path和Url來表示一個DownloadInfo,getDownloadInfo優先從本地讀取記錄,如果沒有就新生成
                downloadInfo = getDownloadInfo(file, url);
                if (null == downloadInfo) {
                    dividerError("error, can't create downloadInfo, maybe the url response content length is -1");
                    return;
                }
                //這裡開始根據“空白塊”(未下載下傳區域)配置設定下載下傳線程了,具體的配置設定邏輯見後文
                allotWorker(downloadInfo.getSpaceBlocks());
                //監聽線程在這裡開始循環監聽
                while (true) {
                    if (isCancelled()) {
                        break;
                    }
                    //合并每個下載下傳線程的下載下傳進度 && 檢查下載下傳線程的狀态
                    if (mergeWorkerProgress() && checkWorkerState()) {
                        try {
                            Thread.sleep(monitorPeriod);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        reset();
                        break;
                    }
                }
            }
        }.submitIn(executor);//這裡封裝了一下,就是送出到線程池跑起來了
        return this;
    }
           

大概流程是這樣的:

1.擷取DownloadInfo(檔案描述資訊,包括資料塊的狀态)

2.根據”空白塊”配置設定下載下傳線程

3.監聽線程開始監聽下載下傳線程的下載下傳狀态,合并每個下載下傳線程的下載下傳進度

//根據未下載下傳區域“塊”配置設定下載下傳線程
private void allotWorker(List<Block> spaceBlocks) {
        if (null == downloadInfo || null == spaceBlocks || spaceBlocks.isEmpty()) {
            return;
        }
        //這裡把原始的塊 重新根據 最大線程數 劃分更合理的塊區域來配置設定給下載下傳線程
        spaceBlocks = reSplitBlock(spaceBlocks);
        final int sizeOfBlock = spaceBlocks.size();
        final int workerSize = Math.min(maxThreadSize, sizeOfBlock);
        for (int i = ; i < workerSize; i++) {
            Block block = spaceBlocks.get(i);
            //DownloadWorker就是具體的下載下傳線程
            DownloadWorker worker = new DownloadWorker(downloadInfo.url, downloadInfo.file, block);
            //跑起來
            worker.submitIn(executor);
            workers.add(worker);
        }
    }

    //根據最大線程數重新劃分塊大小
    private List<Block> reSplitBlock(List<Block> spaceBlocks) {
        //如果總檔案大小不足1M就不用配置設定了....
        if (downloadInfo.contentLength < ) {// > 1M
            return spaceBlocks;
        }
        List<Block> tmp = new ArrayList<>();
        final long maxWorkerLength = downloadInfo.contentLength / maxThreadSize;
        for (Block block : spaceBlocks) {
            //繼續走,splitBlock具體開始劃分
            List<Block> subBlocks = splitBlock(block, maxWorkerLength);
            if (null == subBlocks) {
                tmp.add(block);
            } else {
                tmp.addAll(subBlocks);
            }
        }
        return tmp;
    }

    //根據 子塊最大長度 劃分出多個塊
    private static List<Block> splitBlock(Block rawBlock, long subBlockMaxLength) {
        if (null == rawBlock || rawBlock.getLength() <= subBlockMaxLength) {
            return null;
        }
        int size = ;
        long length;
        while ((length = rawBlock.getLength() / size) > subBlockMaxLength) {
            size++;
        }
        //Block是具體表示“塊”的類[begin-end]
        List<Block> subBlocks = new ArrayList<>(size);
        for (int i = ; i < size; i++) {
            long begin = rawBlock.begin + i * length;
            subBlocks.add(new Block((i ==  ? begin : begin + ), Math.min(begin + length, rawBlock.end)));
        }
        return subBlocks;
    }
           

DownloadWorker下載下傳線程,具體就是發起網絡連接配接擷取指定區域的遠端資料,這裡應該就不用詳述了,百度google就出來,就貼一下關鍵代碼

@Override
    public void run() {
        state = State.DOWNLOADING;//狀态
        RandomAccessFile accessFile = null;
        HttpURLConnection conn = null;
        InputStream inStream = null;
        try {
            accessFile = new RandomAccessFile(file, "rwd");
            accessFile.seek(block.begin);
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(TIME_OUT);
            conn.setReadTimeout(TIME_OUT);
            //這裡分重要,預設使用Gzip是有時擷取的ContentLength會為-1,是以加上這個identity,不使用Gzip
            conn.setRequestProperty("Accept-Encoding", "identity");
            //都懂的Range
            conn.setRequestProperty("Range", "bytes=" + block.begin + "-" + block.end);
            conn.connect();
            inStream = conn.getInputStream();
            byte[] buf = new byte[];
            int len;
            while ((len = inStream.read(buf)) != -) {
                if (isCancelled()) {
                    break;
                }
                accessFile.write(buf, , len);
                downloadedLength += len;
            }
            if (downloadedLength >= block.getLength() - ) {
                state = State.COMPLETE;
            } else {
                state = State.UN_COMPLETE;
            }
        } catch (Exception e) {
            state = State.ERROR;
        } finally {
            Common.close(inStream);
            Common.close(accessFile);
            Common.close(conn);
        }
    }
           

以上多線程的管理其實就差不多了,我這裡的處理方法就是分為“監視線程”和“下載下傳線程”,後者隻關心下載下傳,不處理統計進度收集狀态,而“監視線程”就專門負責監視下載下傳線程下載下傳進度和狀态。

另外一個想法是是用一個獨立的線程專心來管理各個下載下傳線程的下載下傳進度可以屏蔽一些線程之間的資料同步問題。其實個人感覺不好的地方可能就是多開一個線程多消耗了一下系統資源,應該還有更好的方案,歡迎大家支招!~

-DownloadInfo檔案描述資料結構

既然是多線程并且可以中斷,必然下載下傳的區域就存在“碎片”問題,比如:{ [0-10] [20-82] [122-102932932] },

這個方案多謝了同僚的指點,有點類似迅雷,方案是這樣的:保證一個”有序的“”無重複的“塊描述資訊

就像上面舉例一樣,那麼問題來了,如何保證有序和無重複….

每個線程每次擷取到新位元組并寫到檔案中時,就将新下載下傳區域用實體類Block來表示起點begin和終點end,并merge到一個清單中,這個清單是目前所有已下載下傳的Block(一定時有序的)。是以問題轉換成了,如何将Block合并(merge)到有序的集合中。

上代碼,截取一些關鍵函數

final class DownloadInfo implements Serializable {
    ...
    private List<Block> downloadedBlocks;// 首先是 Block集合,每次合并保證有序
    ...

    //合并Block,并保證合并之後集合有序
    //具體邏輯是:新區域 連結 已有區域,
    //如果區域重複傳回false合并失敗,反之成功
    boolean merge(Block block) {
        if (block == null || block.end < block.begin) {
            return false;
        }
        boolean result = false;
        final int blockSize = downloadedBlocks.size();
        if (blockSize == ) {
            downloadedBlocks.add(block);
            result = true;
        } else {
            Block left, right;
            int removeIndex = -;
            for (int insertIndex = blockSize; insertIndex >= ; insertIndex--) {
                left = getBlock(insertIndex - );
                right = getBlock(insertIndex);

                if (left != null && right == null) {// 最右邊
                    if (left.end +  > block.begin) {
                        result = false;// 錯誤,左側區域重合 [1,3]&<3,5>
                    } else if (left.end +  == block.begin) {
                        left.end = block.end;
                        result = true;// 合并左側 [1,2]&<3,5>
                        break;
                    } else {
                        downloadedBlocks.add(block);
                        result = true;// 正常添加至最右
                        break;
                    }
                } else if (left == null && right != null) {// 最左邊
                    if (right.begin -  < block.end) {
                        result = false;// 錯誤,右側區域重合 <3,5>&[5,8]
                    } else if (right.begin -  == block.end) {
                        right.begin = block.begin;
                        result = true;// 合并右側 <3,5>&[6,8]
                        break;
                    } else {
                        downloadedBlocks.add(insertIndex, block);// insertIndex=0
                        result = true;// 正常添加至最左
                        break;
                    }
                } else if (left != null && right != null) {// 中間
                    if (left.end +  > block.begin || right.begin -  < block.end) {
                        result = false;// 錯誤,左右邊界重合 [1,3]&<3,5>&[4,8]
                    } else if (left.end +  == block.begin && right.begin -  == block.end) {
                        left.end = right.end; // 合并左右兩側 [1,2]&<3,5>&[6,8]
                        removeIndex = insertIndex;
                        result = true;
                        break;
                    } else if (left.end +  == block.begin) {
                        left.end = block.end;// 合并左側 [1,2]&<3,5>
                        result = true;
                        break;
                    } else if (right.begin -  == block.end) {
                        right.begin = block.begin;// 合并右側 <3,5>&[6,8]
                        result = true;
                        break;
                    } else {
                        downloadedBlocks.add(insertIndex, block);// 添加至中間 [0,1]&<3,5>&[7,8]
                        result = true;
                        break;
                    }
                }
            }
            if (removeIndex >= ) {
                downloadedBlocks.remove(removeIndex);
            }
        }
        return result;
    }

    //這是上文在配置設定線程是用到的 擷取所有未空白區域
    List<Block> getSpaceBlocks() {
        if (isCompleted()) {
            return null;
        }
        List<Block> spaceBlocks = new ArrayList<>();
        if (downloadedBlocks.isEmpty()) {
            spaceBlocks.add(new Block(, contentLength));
        } else {
            long begin, end;
            Block lastBlock = null;
            for (Block block : downloadedBlocks) {
                begin = lastBlock == null ?  : lastBlock.end + ;
                end = block.begin - ;
                if (begin < end) {
                    spaceBlocks.add(new Block(begin, end));
                }

                lastBlock = block;
            }
            Block endBlock = downloadedBlocks.get(downloadedBlocks.size() - );
            if (endBlock.end < contentLength) {
                spaceBlocks.add(new Block(endBlock.end + , contentLength));
            }
        }
        return spaceBlocks;
    }
}
           

在整個Downloader下載下傳器的邏輯中DownLoadInfo描述了完整檔案下載下傳資訊,所有無論是中斷還是繼續下載下傳,隻需要将下載下傳資訊Path+Url與DownloadInfo綁定起來,每次對應到相同的DownloadInfo就行了,是以是以在真正的下載下傳路徑中多了一個*.prop檔案,比如自定義的是aaa.apk,那麼在下載下傳過程中會生成aaa.apk.prop檔案,prop檔案就是DownloadInfo序列化到檔案中的産物了。

---------

到這裡比較重要的兩個要素就大概描述完了,在實作了Downloader主要邏輯之後,另外還添加了一些我個人認為比較友善的事件回調,可以支援lamda表達式的。

附上調用方式是這樣的:

...
final String url = "https://qd.myapp.com/myapp/qqteam/AndroidQQ/mobileqq_android.apk";
        final File file = new File(Environment.getExternalStorageDirectory(), "QQMobile.apk");
        new Downloader()
                .setMaxThreadSize()//設定最大線程數是3
                .onComplete(arg0 -> logout(arg0 ? "success" : "error"))//下載下傳完成回調arg0(Boolean)表示成功或失敗
                .onProcess((arg0, arg1) -> logout("percent: " + (arg0 *  / arg1) + "% [" + BaseUtils.longSizeToStr(arg0) + "/" + BaseUtils.longSizeToStr(arg1) + "]"))//下載下傳進度回調,arg0/arg1(目前下載下傳進度/檔案長度)
                .open(file, url);//下載下傳入口,自動判斷是否是繼續下載下傳
...
           

最後附上完成的Downlaoder代碼,為了使用友善,這裡把所有的類就整理到了一個java檔案中,不關心内部邏輯的同學要使用就考一個檔案就行了。

貌似不能上傳檔案,附上我個人平時開源的架構:

https://github.com/dnwang/android_agility_framework

類在這裡:org.pinwheel.agility.tools.Downloader

不想使用整個架構的同學可以手動copy出來。