天天看点

Android多线程断点下载器

一直想找一个精简的Android多线程下载的框架用到项目中,找了许久还是没有找到一个功能完善比较精简的,最近闲暇之余抽时间自己写了一个自认为功能比较完善的下载器,自己动手风衣足食嘛。

一开始还不觉得,在写的过程中多线程之间的调度,和压力测试之下bug层出不穷….,后来一步一步的完善,经过测试之后,自认为总算是可以拿出来见人了,最后来记录一下新路历程和大家分享

首先介绍一下Downloader具备以下功能:

  • 多线程,自定义下载线程数
  • 支持断点下载
  • 完善的状态回调

设计思路

想要的效果是这样的:

调用简单,无第三方依赖,lamda函数式接口,支持中断/继续下载

下面来说说具体遇到的问题和解决方案吧,这两个是最重要的问题:

  1. 多线程分块下载,每个线程的下载状态,下载进度的统计
  2. 使用何种数据结构表示下载的状态,并且需要精准的同步到本地

解决方案是这样的,一个一个来说,欢迎大家吐槽~~~

-线程状态调度

既然是多线程首先想到的当然是ExecutorService线程池,这里就用了Executors.newCachedThreadPool(),有线程的地方就往里面扔吧。

上代码,来看看Downloader的入口:

//整个下载逻辑从这里开始,细节就要看其中的每一个功能函数的具体实现了
public Downloader open(final File file, final String url) {
        reset();// 先中断正在下载的线程,重置状态值
        if (null == url || "".equals(url) || null == file || file.isDirectory()) {
            dividerError("error, url or file is empty");
            return this;
        }
        //这里CancelableRunner是自定义的可以cancel的线程
        //在分配线程时是一个“监视线程”和多个“下载线程”
        //monitor是监视线程,来监视和检测下载线程的状态,进度
        monitor = new CancelableRunner() {
            @Override
            public void run() {
                //DownloadInfo是用来描述下载状态的数据结构
                //先通过下载的Path和Url来表示一个DownloadInfo,getDownloadInfo优先从本地读取记录,如果没有就新生成
                downloadInfo = getDownloadInfo(file, url);
                if (null == downloadInfo) {
                    dividerError("error, can't create downloadInfo, maybe the url response content length is -1");
                    return;
                }
                //这里开始根据“空白块”(未下载区域)分配下载线程了,具体的分配逻辑见后文
                allotWorker(downloadInfo.getSpaceBlocks());
                //监听线程在这里开始循环监听
                while (true) {
                    if (isCancelled()) {
                        break;
                    }
                    //合并每个下载线程的下载进度 && 检查下载线程的状态
                    if (mergeWorkerProgress() && checkWorkerState()) {
                        try {
                            Thread.sleep(monitorPeriod);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        reset();
                        break;
                    }
                }
            }
        }.submitIn(executor);//这里封装了一下,就是提交到线程池跑起来了
        return this;
    }
           

大概流程是这样的:

1.获取DownloadInfo(文件描述信息,包括数据块的状态)

2.根据”空白块”分配下载线程

3.监听线程开始监听下载线程的下载状态,合并每个下载线程的下载进度

//根据未下载区域“块”分配下载线程
private void allotWorker(List<Block> spaceBlocks) {
        if (null == downloadInfo || null == spaceBlocks || spaceBlocks.isEmpty()) {
            return;
        }
        //这里把原始的块 重新根据 最大线程数 划分更合理的块区域来分配给下载线程
        spaceBlocks = reSplitBlock(spaceBlocks);
        final int sizeOfBlock = spaceBlocks.size();
        final int workerSize = Math.min(maxThreadSize, sizeOfBlock);
        for (int i = ; i < workerSize; i++) {
            Block block = spaceBlocks.get(i);
            //DownloadWorker就是具体的下载线程
            DownloadWorker worker = new DownloadWorker(downloadInfo.url, downloadInfo.file, block);
            //跑起来
            worker.submitIn(executor);
            workers.add(worker);
        }
    }

    //根据最大线程数重新划分块大小
    private List<Block> reSplitBlock(List<Block> spaceBlocks) {
        //如果总文件大小不足1M就不用分配了....
        if (downloadInfo.contentLength < ) {// > 1M
            return spaceBlocks;
        }
        List<Block> tmp = new ArrayList<>();
        final long maxWorkerLength = downloadInfo.contentLength / maxThreadSize;
        for (Block block : spaceBlocks) {
            //继续走,splitBlock具体开始划分
            List<Block> subBlocks = splitBlock(block, maxWorkerLength);
            if (null == subBlocks) {
                tmp.add(block);
            } else {
                tmp.addAll(subBlocks);
            }
        }
        return tmp;
    }

    //根据 子块最大长度 划分出多个块
    private static List<Block> splitBlock(Block rawBlock, long subBlockMaxLength) {
        if (null == rawBlock || rawBlock.getLength() <= subBlockMaxLength) {
            return null;
        }
        int size = ;
        long length;
        while ((length = rawBlock.getLength() / size) > subBlockMaxLength) {
            size++;
        }
        //Block是具体表示“块”的类[begin-end]
        List<Block> subBlocks = new ArrayList<>(size);
        for (int i = ; i < size; i++) {
            long begin = rawBlock.begin + i * length;
            subBlocks.add(new Block((i ==  ? begin : begin + ), Math.min(begin + length, rawBlock.end)));
        }
        return subBlocks;
    }
           

DownloadWorker下载线程,具体就是发起网络连接获取指定区域的远程数据,这里应该就不用详述了,百度google就出来,就贴一下关键代码

@Override
    public void run() {
        state = State.DOWNLOADING;//状态
        RandomAccessFile accessFile = null;
        HttpURLConnection conn = null;
        InputStream inStream = null;
        try {
            accessFile = new RandomAccessFile(file, "rwd");
            accessFile.seek(block.begin);
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(TIME_OUT);
            conn.setReadTimeout(TIME_OUT);
            //这里分重要,默认使用Gzip是有时获取的ContentLength会为-1,所以加上这个identity,不使用Gzip
            conn.setRequestProperty("Accept-Encoding", "identity");
            //都懂的Range
            conn.setRequestProperty("Range", "bytes=" + block.begin + "-" + block.end);
            conn.connect();
            inStream = conn.getInputStream();
            byte[] buf = new byte[];
            int len;
            while ((len = inStream.read(buf)) != -) {
                if (isCancelled()) {
                    break;
                }
                accessFile.write(buf, , len);
                downloadedLength += len;
            }
            if (downloadedLength >= block.getLength() - ) {
                state = State.COMPLETE;
            } else {
                state = State.UN_COMPLETE;
            }
        } catch (Exception e) {
            state = State.ERROR;
        } finally {
            Common.close(inStream);
            Common.close(accessFile);
            Common.close(conn);
        }
    }
           

以上多线程的管理其实就差不多了,我这里的处理方法就是分为“监视线程”和“下载线程”,后者只关心下载,不处理统计进度收集状态,而“监视线程”就专门负责监视下载线程下载进度和状态。

另外一个想法是是用一个独立的线程专心来管理各个下载线程的下载进度可以屏蔽一些线程之间的数据同步问题。其实个人感觉不好的地方可能就是多开一个线程多消耗了一下系统资源,应该还有更好的方案,欢迎大家支招!~

-DownloadInfo文件描述数据结构

既然是多线程并且可以中断,必然下载的区域就存在“碎片”问题,比如:{ [0-10] [20-82] [122-102932932] },

这个方案多谢了同事的指点,有点类似迅雷,方案是这样的:保证一个”有序的“”无重复的“块描述信息

就像上面举例一样,那么问题来了,如何保证有序和无重复….

每个线程每次获取到新字节并写到文件中时,就将新下载区域用实体类Block来表示起点begin和终点end,并merge到一个列表中,这个列表是目前所有已下载的Block(一定时有序的)。所以问题转换成了,如何将Block合并(merge)到有序的集合中。

上代码,截取一些关键函数

final class DownloadInfo implements Serializable {
    ...
    private List<Block> downloadedBlocks;// 首先是 Block集合,每次合并保证有序
    ...

    //合并Block,并保证合并之后集合有序
    //具体逻辑是:新区域 链接 已有区域,
    //如果区域重复返回false合并失败,反之成功
    boolean merge(Block block) {
        if (block == null || block.end < block.begin) {
            return false;
        }
        boolean result = false;
        final int blockSize = downloadedBlocks.size();
        if (blockSize == ) {
            downloadedBlocks.add(block);
            result = true;
        } else {
            Block left, right;
            int removeIndex = -;
            for (int insertIndex = blockSize; insertIndex >= ; insertIndex--) {
                left = getBlock(insertIndex - );
                right = getBlock(insertIndex);

                if (left != null && right == null) {// 最右边
                    if (left.end +  > block.begin) {
                        result = false;// 错误,左侧区域重合 [1,3]&<3,5>
                    } else if (left.end +  == block.begin) {
                        left.end = block.end;
                        result = true;// 合并左侧 [1,2]&<3,5>
                        break;
                    } else {
                        downloadedBlocks.add(block);
                        result = true;// 正常添加至最右
                        break;
                    }
                } else if (left == null && right != null) {// 最左边
                    if (right.begin -  < block.end) {
                        result = false;// 错误,右侧区域重合 <3,5>&[5,8]
                    } else if (right.begin -  == block.end) {
                        right.begin = block.begin;
                        result = true;// 合并右侧 <3,5>&[6,8]
                        break;
                    } else {
                        downloadedBlocks.add(insertIndex, block);// insertIndex=0
                        result = true;// 正常添加至最左
                        break;
                    }
                } else if (left != null && right != null) {// 中间
                    if (left.end +  > block.begin || right.begin -  < block.end) {
                        result = false;// 错误,左右边界重合 [1,3]&<3,5>&[4,8]
                    } else if (left.end +  == block.begin && right.begin -  == block.end) {
                        left.end = right.end; // 合并左右两侧 [1,2]&<3,5>&[6,8]
                        removeIndex = insertIndex;
                        result = true;
                        break;
                    } else if (left.end +  == block.begin) {
                        left.end = block.end;// 合并左侧 [1,2]&<3,5>
                        result = true;
                        break;
                    } else if (right.begin -  == block.end) {
                        right.begin = block.begin;// 合并右侧 <3,5>&[6,8]
                        result = true;
                        break;
                    } else {
                        downloadedBlocks.add(insertIndex, block);// 添加至中间 [0,1]&<3,5>&[7,8]
                        result = true;
                        break;
                    }
                }
            }
            if (removeIndex >= ) {
                downloadedBlocks.remove(removeIndex);
            }
        }
        return result;
    }

    //这是上文在分配线程是用到的 获取所有未空白区域
    List<Block> getSpaceBlocks() {
        if (isCompleted()) {
            return null;
        }
        List<Block> spaceBlocks = new ArrayList<>();
        if (downloadedBlocks.isEmpty()) {
            spaceBlocks.add(new Block(, contentLength));
        } else {
            long begin, end;
            Block lastBlock = null;
            for (Block block : downloadedBlocks) {
                begin = lastBlock == null ?  : lastBlock.end + ;
                end = block.begin - ;
                if (begin < end) {
                    spaceBlocks.add(new Block(begin, end));
                }

                lastBlock = block;
            }
            Block endBlock = downloadedBlocks.get(downloadedBlocks.size() - );
            if (endBlock.end < contentLength) {
                spaceBlocks.add(new Block(endBlock.end + , contentLength));
            }
        }
        return spaceBlocks;
    }
}
           

在整个Downloader下载器的逻辑中DownLoadInfo描述了完整文件下载信息,所有无论是中断还是继续下载,只需要将下载信息Path+Url与DownloadInfo绑定起来,每次对应到相同的DownloadInfo就行了,所以所以在真正的下载路径中多了一个*.prop文件,比如自定义的是aaa.apk,那么在下载过程中会生成aaa.apk.prop文件,prop文件就是DownloadInfo序列化到文件中的产物了。

---------

到这里比较重要的两个要素就大概描述完了,在实现了Downloader主要逻辑之后,另外还添加了一些我个人认为比较方便的事件回调,可以支持lamda表达式的。

附上调用方式是这样的:

...
final String url = "https://qd.myapp.com/myapp/qqteam/AndroidQQ/mobileqq_android.apk";
        final File file = new File(Environment.getExternalStorageDirectory(), "QQMobile.apk");
        new Downloader()
                .setMaxThreadSize()//设置最大线程数是3
                .onComplete(arg0 -> logout(arg0 ? "success" : "error"))//下载完成回调arg0(Boolean)表示成功或失败
                .onProcess((arg0, arg1) -> logout("percent: " + (arg0 *  / arg1) + "% [" + BaseUtils.longSizeToStr(arg0) + "/" + BaseUtils.longSizeToStr(arg1) + "]"))//下载进度回调,arg0/arg1(当前下载进度/文件长度)
                .open(file, url);//下载入口,自动判断是否是继续下载
...
           

最后附上完成的Downlaoder代码,为了使用方便,这里把所有的类就整理到了一个java文件中,不关心内部逻辑的同学要使用就考一个文件就行了。

貌似不能上传文件,附上我个人平时开源的框架:

https://github.com/dnwang/android_agility_framework

类在这里:org.pinwheel.agility.tools.Downloader

不想使用整个框架的同学可以手动copy出来。