天天看點

多線程下載資源後分片上傳至 OSS

使用多線程下載資源後,分片上傳至阿里雲 OSS(主流對象存儲都支援分片上傳),實現邊下載邊上傳。
           

注意:下載的資源 URL 必須支援部分內容返回(HTTP Range 請求),即伺服器需回應 HTTP 狀態碼 206(Partial Content)。

下載執行類 DownloadTask

該類實現 Callable 接口,負責下載資源的某一部分

使用 CloseableHttpClient 進行下載

package com.tinet.clink.chat.web.service.rtc;


import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;


/**
 * Downloads one byte range of a remote resource into memory.
 *
 * <p>The task sends a single HTTP {@code Range} request for {@code lowerBound-upperBound}
 * and buffers the complete response body, so the stream it returns is backed by a byte
 * array and holds no network resources.
 *
 * @author dengsx
 * @create 2021-06-11
 **/
@Slf4j
class DownloadTask implements Callable<InputStream> {
    private final String url;
    /** First byte offset of the range to download. */
    private final long lowerBound;
    /** Last byte offset of the range to download, sent verbatim in the Range header. */
    private final long upperBound;
    private final CloseableHttpClient client;

    /**
     * @param url        resource to download; the server must answer Range requests with 206
     * @param lowerBound first byte offset of the range
     * @param upperBound last byte offset of the range
     * @param client     HTTP client used to execute the request
     */
    DownloadTask(String url, long lowerBound, long upperBound, CloseableHttpClient client) {
        this.url = url;
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.client = client;
    }

    /**
     * Downloads the configured range and exposes it as an in-memory stream.
     *
     * @return a stream over the downloaded bytes
     * @throws Exception (an {@link IOException}) when the request fails or the server does
     *                   not answer with 206 Partial Content
     */
    @Override
    public InputStream call() throws Exception {
        // No re-wrapping here: the exception raised by download() already carries the
        // message callers saw before, and now also its original cause and stack trace.
        byte[] content = download();
        log.info("線程{}下載下傳完成,下載下傳區間:{}-{},下載下傳位元組:{}", Thread.currentThread().getName(),
                this.lowerBound, this.upperBound, content.length);
        return new ByteArrayInputStream(content);
    }

    /**
     * Executes the Range request and reads the whole response body.
     *
     * @return the bytes of the requested range
     * @throws IOException on network errors, on any status other than 206, or when the
     *                     response has no body
     */
    private byte[] download() throws IOException {
        HttpGet get = new HttpGet(url);
        get.setHeader("Range", "bytes=" + lowerBound + "-" + upperBound);
        // try-with-resources releases the connection once the body is fully buffered
        try (CloseableHttpResponse response = client.execute(get)) {
            log.info("線程{}連接配接成功開始下載下傳", Thread.currentThread().getName());
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatus.SC_PARTIAL_CONTENT != statusCode) {
                throw new IOException("下載下傳連接配接不支援多線程下載下傳:" + statusCode);
            }
            if (response.getEntity() == null) {
                throw new IOException("empty response body for range " + lowerBound + "-" + upperBound);
            }
            return EntityUtils.toByteArray(response.getEntity());
        } catch (IOException e) {
            // keep the original exception as the cause so the root failure stays diagnosable
            throw new IOException("下出錯:" + e.getMessage(), e);
        }
    }
}
           

上傳任務類 UploadTask

該類負責將下載的資源上傳至 OSS,使用阿里雲 OSS 的分片上傳

package com.tinet.clink.chat.web.service.rtc;

import com.tinet.clink.common.model.PartETagAdapter;
import com.tinet.clink.common.objectstorage.ObjectStorageClient;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;


/**
 * Uploads a single part of a multipart upload through {@link ObjectStorageClient}.
 *
 * <p>The task owns the stream it is given: the stream is closed once the upload call
 * returns, whether it succeeded or failed.
 *
 * @author dengsx
 * @create 2021-06-11
 **/
@Slf4j
public class UploadTask implements Callable<PartETagAdapter> {
    private final InputStream inputStream;
    private final ObjectStorageClient objectStorageClient;
    private final int order;
    private final long partSize;
    private final String uploadId;
    private final String fileBucket;
    private final String key;

    /**
     * @param inputStream          content of the part; closed by this task after the upload
     * @param objectStorageService client that performs the part upload
     * @param fileBucket           target bucket
     * @param key                  target object key
     * @param uploadId             id of the multipart upload this part belongs to
     * @param order                part number passed to the storage client
     * @param partSize             number of bytes of this part
     */
    public UploadTask(InputStream inputStream, ObjectStorageClient objectStorageService, String fileBucket, String key, String uploadId, int order, long partSize) {
        this.inputStream = inputStream;
        this.objectStorageClient = objectStorageService;
        this.order = order;
        this.uploadId = uploadId;
        this.key = key;
        this.fileBucket = fileBucket;
        this.partSize = partSize;
    }

    /**
     * Uploads the part and releases the source stream.
     *
     * @return the result returned by {@code uploadPartObject} for this part
     * @throws IOException if the upload or closing the stream fails
     */
    @Override
    public PartETagAdapter call() throws IOException {
        log.info("開始上傳分片,分片order:{},分片大小:{}", order, partSize);
        // Nobody else closes the stream after hand-off, so close it here.
        try (InputStream part = inputStream) {
            return objectStorageClient.uploadPartObject(fileBucket, key, uploadId, part, order, partSize);
        }
    }
}

           

下載器類 Downloader

該類負責分配下載任務給線程、提交上傳任務,並合併上傳結果

package com.tinet.clink.chat.web.service.rtc;

import com.tinet.clink.common.model.CompleteMultipartUploadResultAdapter;
import com.tinet.clink.common.model.PartETagAdapter;
import com.tinet.clink.common.objectstorage.ObjectStorageClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Downloads a video from Tencent Cloud and uploads it to OSS.
 *
 * <p>The file is split into byte ranges that are downloaded concurrently; every range is
 * submitted for upload as one part of a multipart upload as soon as its download has
 * finished, so downloading and uploading overlap.
 *
 * @author dengsx
 * @create 2021-06-11
 **/
@Component
@Slf4j
public class Downloader implements InitializingBean {

    /**
     * Blocking coefficient, roughly 0.5.
     */
    private static final double BLOCK_FACTOR = 0.5;

    /**
     * Default worker count. The tasks are IO-bound, so cores / (1 - BLOCK_FACTOR) is used,
     * which is twice the number of cores.
     */
    private static final int DEFAULT_THREAD_COUNT =
            (int) (Runtime.getRuntime().availableProcessors() / (1 - BLOCK_FACTOR));

    /**
     * Smallest size allowed for any part except the last one (OSS limit: 100 KB).
     */
    private static final long MIN_PART_SIZE = 100L * 1024;

    /**
     * Pause between two scans of the pending download futures.
     */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /**
     * Keep-alive used when the server does not announce a usable timeout.
     */
    private static final long DEFAULT_KEEP_ALIVE_MILLIS = 5 * 1000L;

    private CloseableHttpClient httpClient;

    @Autowired
    @Qualifier("downloaderTaskExecutor")
    private ThreadPoolTaskExecutor downloaderTaskExecutor;

    /**
     * Bucket that chat-session files are uploaded to.
     */
    @Value("${object.storage.bucket.chat}")
    private String fileBucket;

    @Autowired
    private ObjectStorageClient objectStorageClient;

    /**
     * Transfers {@code url} to {@code fileKey} using the default thread count.
     *
     * @param url     resource to download; must support HTTP Range requests
     * @param fileKey object key to upload to
     * @return the result of completing the multipart upload
     * @throws Exception if the download, any part upload or the completion fails
     */
    public CompleteMultipartUploadResultAdapter start(String url, String fileKey) throws Exception {
        return this.start(url, DEFAULT_THREAD_COUNT, fileKey);
    }

    /**
     * Transfers {@code url} to {@code fileKey}.
     *
     * <p>The number of parts is the requested thread count, capped by the default thread
     * count and reduced further for small files so that no part falls below the minimum
     * part size.
     *
     * @param url         resource to download; must support HTTP Range requests
     * @param threadCount requested number of concurrent range downloads, must be positive
     * @param fileKey     object key to upload to
     * @return the result of completing the multipart upload
     * @throws IllegalArgumentException if {@code threadCount} is null or not positive
     * @throws Exception                if the download, any part upload or the completion fails,
     *                                  or the calling thread is interrupted while waiting
     */
    public CompleteMultipartUploadResultAdapter start(String url, Integer threadCount, String fileKey) throws Exception {

        // 0 must be rejected as well: it would end in a division by zero when sizing parts
        if (threadCount == null || threadCount <= 0) {
            throw new IllegalArgumentException("invalid threadCount number!");
        }

        long fileSize = getFileSize(url);
        if (fileSize <= 0) {
            throw new IOException("nothing to download, reported file size is " + fileSize + ": " + url);
        }

        int partCount = partCount(fileSize, threadCount);

        // first placeholder is labelled kb, so convert the byte count accordingly
        log.info("開始下載下傳,檔案大小(kb):{},fileSize(Mb):{},下載下傳線程數:{}", fileSize / 1024.0, fileSize / 1024.0 / 1024, partCount);

        // record the start time
        long beginTime = System.currentTimeMillis();

        Map<Integer, Future<InputStream>> downloadFuture = new HashMap<>();
        // sorted by part number so the completion request lists the parts in ascending order
        Map<Integer, Future<PartETagAdapter>> uploadFuture = new TreeMap<>();
        try {
            // hand the byte ranges to the download workers
            dispatcher(url, fileSize, partCount, downloadFuture);

            // id of the multipart upload
            String uploadId = objectStorageClient.getMultipartUploadId(fileBucket, fileKey);

            // submit an upload for every finished download
            handleDownloadFuture(downloadFuture, uploadFuture, uploadId, fileKey, partCount, fileSize);

            // wait for the uploads
            List<PartETagAdapter> uploadResult = handleUploadFuture(uploadFuture);

            // merge the parts
            CompleteMultipartUploadResultAdapter result = objectStorageClient.completePartlyUpload(fileBucket, fileKey, uploadId, uploadResult);

            log.info("下載下傳成功并上傳oss成功:本次用時:{}秒,上傳結果{},", (System.currentTimeMillis() - beginTime) / 1000, result.getKey());
            return result;
        } catch (Exception e) {
            // Do not leave orphaned work behind on the shared executor.
            // NOTE(review): the multipart upload itself is not aborted here because no abort
            // operation of ObjectStorageClient is visible from this class - consider adding one.
            cancelAll(downloadFuture.values());
            cancelAll(uploadFuture.values());
            throw e;
        }
    }

    /**
     * Decides how many parts the file is split into.
     *
     * @return at least 1; at most the smaller of the requested and the default thread count,
     * and small enough that every part has at least {@link #MIN_PART_SIZE} bytes
     */
    private static int partCount(long fileSize, int requestedThreads) {
        long byThreads = Math.min(DEFAULT_THREAD_COUNT, requestedThreads);
        long bySize = Math.max(1L, fileSize / MIN_PART_SIZE);
        return (int) Math.min(byThreads, bySize);
    }

    /**
     * Size in bytes of the part with the given 1-based number; the last part takes the remainder.
     */
    private static long partSize(int partNumber, int partCount, long fileSize) {
        long blockSize = fileSize / partCount;
        return partNumber == partCount ? fileSize - (partCount - 1) * blockSize : blockSize;
    }

    /**
     * Cancels every future that has not completed yet.
     */
    private static void cancelAll(Collection<? extends Future<?>> futures) {
        for (Future<?> future : futures) {
            future.cancel(true);
        }
    }

    /**
     * Dispatcher: decides which byte range each worker downloads and submits the download tasks.
     *
     * @param sink receives the submitted futures keyed by 1-based part number; it is filled
     *             incrementally so the caller can cancel what was submitted if a later
     *             submission is rejected
     */
    private void dispatcher(String url, long fileSize, int threadCount, Map<Integer, Future<InputStream>> sink) {
        long blockSize = fileSize / threadCount; // bytes per part, the last part takes the remainder
        for (int partNumber = 1; partNumber <= threadCount; partNumber++) {
            long lowerBound = (partNumber - 1) * blockSize;
            // HTTP byte ranges are inclusive on both ends, hence the -1
            long upperBound = lowerBound + partSize(partNumber, threadCount, fileSize) - 1;
            // submit the download asynchronously
            sink.put(partNumber, downloaderTaskExecutor.submit(new DownloadTask(url, lowerBound, upperBound, httpClient)));
        }
    }

    /**
     * Asks the server for the size of the file with a HEAD request.
     *
     * @return the value of the Content-Length header
     * @throws IOException if the request fails or the header is missing or not a number
     */
    private long getFileSize(String url) throws IOException {
        HttpHead head = new HttpHead(url);
        // the response-handler variant of execute releases the connection in every case
        Long fileSize = httpClient.execute(head, response -> {
            if (response.getFirstHeader("Content-Length") == null) {
                throw new IOException("HEAD response carries no Content-Length header: " + url);
            }
            String lengthValue = response.getFirstHeader("Content-Length").getValue();
            try {
                return Long.parseLong(lengthValue);
            } catch (NumberFormatException e) {
                throw new IOException("invalid Content-Length '" + lengthValue + "': " + url, e);
            }
        });
        return fileSize;
    }

    /**
     * Polls the download futures and submits an upload task for every part whose download
     * has finished, until no download is pending any more.
     *
     * @param downloadFuture pending downloads keyed by part number; entries are removed once handled
     * @param uploadFuture   receives the submitted upload futures keyed by part number
     * @throws Exception if a download failed or the thread is interrupted while waiting
     */
    private void handleDownloadFuture(Map<Integer, Future<InputStream>> downloadFuture,
                                      Map<Integer, Future<PartETagAdapter>> uploadFuture,
                                      String uploadId, String key,
                                      int threadCount, long fileSize) throws Exception {
        while (!downloadFuture.isEmpty()) {
            for (Iterator<Map.Entry<Integer, Future<InputStream>>> it = downloadFuture.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<Integer, Future<InputStream>> currentFuture = it.next();
                if (!currentFuture.getValue().isDone()) {
                    continue;
                }
                int partNumber = currentFuture.getKey();
                // get() rethrows a failed download wrapped in an ExecutionException
                InputStream inputStream = currentFuture.getValue().get();

                // submit the upload asynchronously
                uploadFuture.put(partNumber, downloaderTaskExecutor.submit(new UploadTask(inputStream,
                        objectStorageClient, fileBucket, key, uploadId, partNumber,
                        partSize(partNumber, threadCount, fileSize))));
                it.remove();
            }
            if (!downloadFuture.isEmpty()) {
                // An interrupt is propagated instead of swallowed so the transfer can be aborted.
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            }
        }
        log.info("下載下傳結果處理完成,上傳任務送出完成");
    }

    /**
     * Waits for all uploads and returns their results in ascending part-number order.
     *
     * @throws Exception if an upload failed or the thread is interrupted while waiting
     */
    private List<PartETagAdapter> handleUploadFuture(Map<Integer, Future<PartETagAdapter>> uploadFuture) throws Exception {
        List<PartETagAdapter> uploadResult = new ArrayList<>(uploadFuture.size());
        // every upload has to finish anyway, so block on each one in turn instead of polling
        for (Future<PartETagAdapter> future : uploadFuture.values()) {
            uploadResult.add(future.get());
        }
        log.info("上傳任務結果處理完成");
        return uploadResult;
    }

    /**
     * Builds the pooled HTTP client used for the HEAD request and all range downloads.
     */
    @Override
    public void afterPropertiesSet() {
        // keep-alive strategy for pooled connections
        ConnectionKeepAliveStrategy strategy = (response, context) -> {
            // Honor 'keep-alive' header
            HeaderElementIterator it = new BasicHeaderElementIterator(
                    response.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                HeaderElement he = it.nextElement();
                String param = he.getName();
                String value = he.getValue();
                if (value != null && "timeout".equalsIgnoreCase(param)) {
                    try {
                        return Long.parseLong(value) * 1000;
                    } catch (NumberFormatException ignored) {
                        // malformed timeout announced by the server: fall back to the default
                    }
                }
            }
            return DEFAULT_KEEP_ALIVE_MILLIS;
        };

        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(20);
        // NOTE(review): all ranges of one file go to the same route, so this limit of 5 caps
        // the number of ranges actually downloading at once - confirm this is intended.
        connectionManager.setDefaultMaxPerRoute(5);

        httpClient = HttpClients.custom()
                .setKeepAliveStrategy(strategy).setConnectionManager(connectionManager).build();
    }
}


           

阿里雲 OSS 分片上傳步驟

1. 獲取分片上傳 ID
/**
     * Initiates a multipart upload and returns its upload id.
     *
     * @param bucketName bucket the object is uploaded to
     * @param key        key of the object being uploaded
     * @return the upload id taken from the initiation result; it identifies this multipart
     * upload in follow-up operations such as uploading parts or completing the upload
     */
    public String getMultipartUploadId(String bucketName, String key) {
        // To choose a storage class at initiation time, attach an ObjectMetadata carrying the
        // OSSHeaders.OSS_STORAGE_CLASS header to the request before sending it.
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, key);

        InitiateMultipartUploadResult initResult = getInternalOSSClient().initiateMultipartUpload(initRequest);
        String uploadId = initResult.getUploadId();
        return uploadId;
    }
           
2. 上傳分片

只需要將分片的流等參數準備好即可

/**
     * Uploads one part of a multipart upload.
     *
     * @param bucketName  bucket the object is uploaded to
     * @param key         key of the object being uploaded
     * @param uploadId    id of the multipart upload this part belongs to
     * @param inputStream stream supplying the content of the part
     * @param order       part number; the parts are later assembled by this number
     * @param fileSize    size of this part in bytes (passed on as the part size)
     * @return an adapter holding the part ETag returned by the upload
     */
    public PartETagAdapter uploadPartObject(String bucketName, String key, String uploadId, InputStream inputStream, int order, long fileSize) throws IOException {
        UploadPartRequest partRequest = new UploadPartRequest();
        // where the part goes
        partRequest.setUploadId(uploadId);
        partRequest.setBucketName(bucketName);
        partRequest.setKey(key);
        // Part number: valid values are 1 to 10000, anything else is rejected by OSS with
        // InvalidArgument. Parts need not be uploaded in order, or even from the same
        // client; OSS assembles the object by part number.
        partRequest.setPartNumber(order);
        // Part size: every part except the last must be at least 100 KB.
        partRequest.setPartSize(fileSize);
        partRequest.setInputStream(inputStream);

        PartETagAdapter adapter = new PartETagAdapter();
        adapter.setOssPartETag(getInternalOSSClient().uploadPart(partRequest).getPartETag());
        return adapter;
    }
           
3. 合併上傳結果
/**
     * Completes a multipart upload.
     *
     * @param bucketName bucket the object is uploaded to
     * @param key        key of the object being uploaded
     * @param uploadId   id of the multipart upload to complete
     * @param partETags  results of all uploaded parts
     * @return an adapter holding the completion result
     */
    public CompleteMultipartUploadResultAdapter completePartlyUpload(String bucketName, String key, String uploadId, List<PartETagAdapter> partETags) {
        // Completing the upload needs every valid part ETag: OSS verifies each submitted
        // part and, once all of them pass, assembles the parts into the final object.
        CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
                bucketName,
                key,
                uploadId,
                partETags.stream()
                        .map(adapter -> adapter.getOssPartETag())
                        .collect(Collectors.toList()));

        // To set the object ACL while completing the upload, call setObjectACL on the
        // request, e.g. with CannedAccessControlList.PublicRead.

        CompleteMultipartUploadResultAdapter completed = new CompleteMultipartUploadResultAdapter();
        completed.setOssMultiPartUploadResult(getInternalOSSClient().completeMultipartUpload(completeRequest));
        return completed;
    }
           

end~