使用多線程下載資源後,分片上傳至阿里雲 OSS(主流對象存儲都支援分片上傳),實現邊下載邊上傳。
注意:下載的資源 URL 必須支援部分內容返回(HTTP Range 請求),對應的 HTTP 狀態碼為 206。
下載執行類 DownloadTask
該類實作 Callable 接口,負責下載資源的某一部分
使用 CloseableHttpClient 進行下載
package com.tinet.clink.chat.web.service.rtc;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
/**
 * Downloads one byte range of a remote resource and hands it back as an in-memory stream.
 *
 * <p>The range is requested with an HTTP {@code Range: bytes=lowerBound-upperBound} header.
 * Any response status other than 206 (Partial Content) is reported as an {@link IOException}.
 * The whole range is buffered in memory before it is returned.
 *
 * @author dengsx
 * @create 2021-06-11
 **/
@Slf4j
class DownloadTask implements Callable<InputStream> {

    private final String url;
    /** First byte offset of the range to download (sent as the start of the Range header). */
    private final long lowerBound;
    /** Last byte offset of the range to download (sent as the end of the Range header). */
    private final long upperBound;
    private final CloseableHttpClient client;

    DownloadTask(String url, long lowerBound, long upperBound, CloseableHttpClient client) {
        this.url = url;
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.client = client;
    }

    /**
     * Downloads the configured range.
     *
     * @return a stream over the downloaded bytes; it is left open for the caller to consume
     * @throws IOException if the request fails or the server does not answer with status 206
     */
    @Override
    public InputStream call() throws Exception {
        // The stream is deliberately NOT opened in a try-with-resources: it is the return value,
        // and closing it before handing it to the caller would only work by accident.
        InputStream input = connect();
        log.info("線程{}下載下傳完成,下載下傳區間:{}-{},下載下傳位元組:{}", Thread.currentThread().getName(),
                this.lowerBound, this.upperBound, input.available());
        return input;
    }

    /**
     * Sends the ranged GET request and buffers the response body.
     *
     * @return an in-memory stream holding the response body
     * @throws IOException on network failure or when the status code is not 206; the original
     *                     exception is kept as the cause
     */
    private InputStream connect() throws IOException {
        HttpGet get = new HttpGet(url);
        get.setHeader("Range", "bytes=" + lowerBound + "-" + upperBound);
        // The response is closed here so the pooled connection is released once the body is buffered.
        try (CloseableHttpResponse response = client.execute(get)) {
            log.info("線程{}連接配接成功開始下載下傳", Thread.currentThread().getName());
            int statusCode = response.getStatusLine().getStatusCode();
            if (HttpStatus.SC_PARTIAL_CONTENT != statusCode) {
                throw new IOException("下載下傳連接配接不支援多線程下載下傳:" + statusCode);
            }
            return new ByteArrayInputStream(EntityUtils.toByteArray(response.getEntity()));
        } catch (IOException e) {
            // Same message as before, but the cause is chained so the stack trace is not lost.
            throw new IOException("下出錯:" + e.getMessage(), e);
        }
    }
}
上傳任務類 UploadTask
該類負責將下載的資源上傳至 OSS,使用阿里雲 OSS 的分片上傳
package com.tinet.clink.chat.web.service.rtc;
import com.tinet.clink.common.model.PartETagAdapter;
import com.tinet.clink.common.objectstorage.ObjectStorageClient;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;
/**
 * Uploads a single part of a multipart upload through the {@link ObjectStorageClient}.
 *
 * <p>All parameters are captured at construction time; {@link #call()} simply forwards them
 * to {@code uploadPartObject} and returns whatever that call returns.
 *
 * @author dengsx
 * @create 2021-06-11
 **/
@Slf4j
public class UploadTask implements Callable<PartETagAdapter> {

    /** Client used to perform the part upload. */
    private final ObjectStorageClient objectStorageClient;
    /** Target bucket. */
    private final String fileBucket;
    /** Object key inside the bucket. */
    private final String key;
    /** Id of the multipart upload this part belongs to. */
    private final String uploadId;
    /** Part number passed to the storage client. */
    private final int order;
    /** Part size passed to the storage client. */
    private final long partSize;
    /** Source of the part's bytes. */
    private final InputStream inputStream;

    public UploadTask(InputStream inputStream, ObjectStorageClient objectStorageService, String fileBucket,
                      String key, String uploadId, int order, long partSize) {
        this.objectStorageClient = objectStorageService;
        this.fileBucket = fileBucket;
        this.key = key;
        this.uploadId = uploadId;
        this.order = order;
        this.partSize = partSize;
        this.inputStream = inputStream;
    }

    /**
     * Logs the part being uploaded and delegates to the storage client.
     *
     * @return the part tag produced by the storage client
     * @throws IOException declared by the storage client's upload call
     */
    @Override
    public PartETagAdapter call() throws IOException {
        log.info("開始上傳分片,分片order:{},分片大小:{}", order, partSize);
        final PartETagAdapter uploadedPart =
                objectStorageClient.uploadPartObject(fileBucket, key, uploadId, inputStream, order, partSize);
        return uploadedPart;
    }
}
下載器類 Downloader
該類負責分配任務給線程進行下載、上傳,並合併結果
package com.tinet.clink.chat.web.service.rtc;
import com.tinet.clink.common.model.CompleteMultipartUploadResultAdapter;
import com.tinet.clink.common.model.PartETagAdapter;
import com.tinet.clink.common.objectstorage.ObjectStorageClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * Downloads a video from Tencent Cloud and uploads it to OSS.
 *
 * <p>The file is split into byte ranges that are downloaded concurrently. As soon as a range
 * has finished downloading, it is submitted as one part of a multipart upload, so downloading
 * and uploading overlap.
 *
 * <p>NOTE(review): when a transfer fails, the outstanding tasks are cancelled but the multipart
 * upload itself is not aborted, because no abort operation is visible on
 * {@code ObjectStorageClient} from here.
 *
 * @author dengsx
 * @create 2021-06-11
 **/
@Component
@Slf4j
public class Downloader implements InitializingBean {

    /**
     * Blocking coefficient, roughly 0.5.
     */
    private static final double BLOCK_FACTOR = 0.5;

    /**
     * Default (and maximum) number of parts transferred in parallel. The work is I/O bound,
     * so this is twice the number of CPU cores: cores / (1 - BLOCK_FACTOR).
     */
    private static final int DEFAULT_THREAD_COUNT =
            (int) (Runtime.getRuntime().availableProcessors() / (1 - BLOCK_FACTOR));

    /**
     * Smallest size allowed for every part except the last one (100 KB for OSS multipart uploads).
     * NOTE(review): confirm this limit if another storage backend sits behind ObjectStorageClient.
     */
    private static final long MIN_PART_SIZE_BYTES = 100L * 1024;

    /** Pause between two scans of the still-running download futures. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Keep-alive applied when the server does not send a usable {@code timeout} value. */
    private static final long DEFAULT_KEEP_ALIVE_MILLIS = 5_000L;

    private CloseableHttpClient httpClient;

    @Autowired
    @Qualifier("downloaderTaskExecutor")
    private ThreadPoolTaskExecutor downloaderTaskExecutor;

    /**
     * Bucket that chat files are uploaded to.
     */
    @Value("${object.storage.bucket.chat}")
    private String fileBucket;

    @Autowired
    private ObjectStorageClient objectStorageClient;

    /**
     * Transfers {@code url} to {@code fileKey} using the default degree of parallelism.
     *
     * @param url     source URL; must support HTTP range requests
     * @param fileKey object key to store the file under
     * @return the result of completing the multipart upload
     * @throws Exception if sizing, downloading or uploading fails
     */
    public CompleteMultipartUploadResultAdapter start(String url, String fileKey) throws Exception {
        return this.start(url, DEFAULT_THREAD_COUNT, fileKey);
    }

    /**
     * Transfers {@code url} to {@code fileKey}, splitting the file into at most
     * {@code threadCount} parts.
     *
     * <p>The effective number of parts is additionally capped by {@code DEFAULT_THREAD_COUNT}
     * and reduced for small files so that no part (other than the last) falls below the
     * minimum part size.
     *
     * @param url         source URL; must support HTTP range requests
     * @param threadCount requested number of parallel parts, must be positive
     * @param fileKey     object key to store the file under
     * @return the result of completing the multipart upload
     * @throws IllegalArgumentException if {@code threadCount} is null or not positive
     * @throws Exception                if sizing, downloading or uploading fails
     */
    public CompleteMultipartUploadResultAdapter start(String url, Integer threadCount, String fileKey) throws Exception {
        // Zero is rejected as well: it would otherwise cause a division by zero when splitting.
        if (threadCount == null || threadCount <= 0) {
            throw new IllegalArgumentException("invalid threadCount number!");
        }
        long fileSize = getFileSize(url);
        if (fileSize <= 0) {
            throw new IOException("Remote file is empty, nothing to transfer: " + url);
        }
        // Never create more parts than the file can fill with minimum-sized parts.
        long maxPartsBySize = Math.max(1L, fileSize / MIN_PART_SIZE_BYTES);
        int partCount = (int) Math.min(Math.min(DEFAULT_THREAD_COUNT, threadCount), maxPartsBySize);
        log.info("開始下載下傳,檔案大小(byte):{},fileSize(Mb):{},下載下傳線程數:{}", fileSize, fileSize / 1024.0 / 1024, partCount);
        // Remember the start time for the duration log below.
        long beginTime = System.currentTimeMillis();
        // Hand one byte range to each download task.
        Map<Integer, Future<InputStream>> downloadFuture = dispatcher(url, fileSize, partCount);
        try {
            // Id of the multipart upload.
            String uploadId = objectStorageClient.getMultipartUploadId(fileBucket, fileKey);
            // Turn finished downloads into upload tasks.
            List<Future<PartETagAdapter>> uploadFuture =
                    handleDownloadFuture(downloadFuture, uploadId, fileKey, partCount, fileSize);
            // Wait for the uploads.
            List<PartETagAdapter> uploadResult = handleUploadFuture(uploadFuture);
            // Merge the uploaded parts into the final object.
            CompleteMultipartUploadResultAdapter result =
                    objectStorageClient.completePartlyUpload(fileBucket, fileKey, uploadId, uploadResult);
            log.info("下載下傳成功并上傳oss成功:本次用時:{}秒,上傳結果{},", (System.currentTimeMillis() - beginTime) / 1000, result.getKey());
            return result;
        } catch (Exception e) {
            // Do not keep downloading ranges nobody will consume.
            cancelAll(downloadFuture.values());
            throw e;
        }
    }

    /**
     * Splits the file into {@code threadCount} consecutive byte ranges and submits one download
     * task per range.
     *
     * @return the download futures keyed by 1-based part number
     */
    private Map<Integer, Future<InputStream>> dispatcher(String url, long fileSize, int threadCount) {
        // Number of bytes each task downloads; the last task also takes the remainder.
        long blockSize = fileSize / threadCount;
        Map<Integer, Future<InputStream>> result = new HashMap<>();
        try {
            for (int i = 0; i < threadCount; i++) {
                long lowerBound = i * blockSize;
                // HTTP ranges are inclusive on both ends, hence the "- 1"; the last range ends
                // at the final byte of the file.
                long upperBound = (i == threadCount - 1) ? fileSize - 1 : lowerBound + blockSize - 1;
                // Submit the download asynchronously.
                Future<InputStream> future =
                        downloaderTaskExecutor.submit(new DownloadTask(url, lowerBound, upperBound, httpClient));
                result.put(i + 1, future);
            }
        } catch (RuntimeException e) {
            // A rejected submission must not leave earlier downloads running unattended.
            cancelAll(result.values());
            throw e;
        }
        return result;
    }

    /**
     * Issues a HEAD request and reads the {@code Content-Length} header.
     *
     * @return size of the file to download, in bytes
     * @throws IOException if the request fails, the status is not 2xx, or the header is
     *                     missing or not a number
     */
    private long getFileSize(String url) throws IOException {
        HttpHead head = new HttpHead(url);
        // The response-handler overload releases the pooled connection once the handler returns.
        Long size = httpClient.execute(head, response -> {
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode < 200 || statusCode >= 300) {
                throw new IOException("HEAD request for " + url + " returned status " + statusCode);
            }
            if (!response.containsHeader("Content-Length")) {
                throw new IOException("HEAD response for " + url + " has no Content-Length header");
            }
            String lengthValue = response.getFirstHeader("Content-Length").getValue();
            try {
                return Long.valueOf(lengthValue.trim());
            } catch (NumberFormatException e) {
                throw new IOException("Invalid Content-Length '" + lengthValue + "' for " + url, e);
            }
        });
        return size;
    }

    /**
     * Polls the download futures and submits an upload task for every range that has finished,
     * until no download is left.
     *
     * @param downloadFuture download futures keyed by part number; finished entries are removed
     * @return the upload futures in ascending part-number order
     * @throws Exception if a download failed, a submission was rejected, or the thread was
     *                   interrupted while waiting
     */
    private List<Future<PartETagAdapter>> handleDownloadFuture(Map<Integer, Future<InputStream>> downloadFuture,
                                                               String uploadId, String key,
                                                               int threadCount, long fileSize) throws Exception {
        long blockSize = fileSize / threadCount;
        // Sorted by part number so the returned list does not depend on completion order.
        Map<Integer, Future<PartETagAdapter>> uploadFuture = new TreeMap<>();
        try {
            while (!downloadFuture.isEmpty()) {
                for (Iterator<Map.Entry<Integer, Future<InputStream>>> it = downloadFuture.entrySet().iterator(); it.hasNext(); ) {
                    Map.Entry<Integer, Future<InputStream>> currentFuture = it.next();
                    if (!currentFuture.getValue().isDone()) {
                        continue;
                    }
                    int partNumber = currentFuture.getKey();
                    // The last part carries whatever is left after the equally sized blocks.
                    long partSize = (partNumber == threadCount)
                            ? fileSize - (threadCount - 1) * blockSize
                            : blockSize;
                    InputStream inputStream = currentFuture.getValue().get();
                    // Submit the upload asynchronously.
                    uploadFuture.put(partNumber, downloaderTaskExecutor.submit(new UploadTask(inputStream,
                            objectStorageClient, fileBucket, key, uploadId, partNumber, partSize)));
                    it.remove();
                }
                // Only wait when something is still downloading.
                if (!downloadFuture.isEmpty()) {
                    TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
                }
            }
        } catch (Exception e) {
            cancelAll(uploadFuture.values());
            throw e;
        }
        log.info("下載下傳結果處理完成,上傳任務送出完成");
        return new ArrayList<>(uploadFuture.values());
    }

    /**
     * Waits for every upload and collects the part tags in the order of the given list.
     *
     * @throws Exception if an upload failed or the thread was interrupted while waiting
     */
    private List<PartETagAdapter> handleUploadFuture(List<Future<PartETagAdapter>> partETagsFutures) throws Exception {
        List<PartETagAdapter> uploadResult = new ArrayList<>(partETagsFutures.size());
        try {
            // Every upload has to finish anyway, so block on each future in turn instead of polling.
            for (Future<PartETagAdapter> partFuture : partETagsFutures) {
                uploadResult.add(partFuture.get());
            }
        } catch (Exception e) {
            cancelAll(partETagsFutures);
            throw e;
        }
        log.info("上傳任務結果處理完成");
        return uploadResult;
    }

    /** Cancels every future in the collection; already completed futures are unaffected. */
    private void cancelAll(Collection<? extends Future<?>> futures) {
        for (Future<?> future : futures) {
            future.cancel(true);
        }
    }

    /**
     * Builds the pooled HTTP client once the bean's properties have been injected.
     */
    @Override
    public void afterPropertiesSet() {
        // Keep-alive strategy: honor the server's 'Keep-Alive: timeout=N' header (seconds),
        // otherwise fall back to the default.
        ConnectionKeepAliveStrategy strategy = (response, context) -> {
            HeaderElementIterator it = new BasicHeaderElementIterator(
                    response.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                HeaderElement he = it.nextElement();
                String param = he.getName();
                String value = he.getValue();
                if (value != null && "timeout".equalsIgnoreCase(param)) {
                    try {
                        return Long.parseLong(value) * 1000;
                    } catch (NumberFormatException ignored) {
                        // Malformed timeout value: keep looking, then use the default.
                    }
                }
            }
            return DEFAULT_KEEP_ALIVE_MILLIS;
        };
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(20);
        // All ranges of one file go to the same route, so this caps concurrent range requests at 5.
        connectionManager.setDefaultMaxPerRoute(5);
        httpClient = HttpClients.custom()
                .setKeepAliveStrategy(strategy).setConnectionManager(connectionManager).build();
    }
}
阿里雲 OSS 分片上傳步驟
1. 擷取分片上傳 id
/**
 * Initiates a multipart upload and returns its upload id.
 *
 * <p>The upload id uniquely identifies the multipart upload; later operations on it
 * (uploading parts, cancelling or querying the upload) are addressed by this id.
 *
 * @param bucketName bucket the object will be stored in
 * @param key        key of the object to create
 * @return the upload id of the newly initiated multipart upload
 */
public String getMultipartUploadId(String bucketName, String key) {
    // To pick a storage class at initiation time, attach an ObjectMetadata carrying the
    // OSSHeaders.OSS_STORAGE_CLASS header to the request before sending it.
    InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, key);
    InitiateMultipartUploadResult initResult = getInternalOSSClient().initiateMultipartUpload(initRequest);
    return initResult.getUploadId();
}
2. 上傳分片
只需要將分片的流等參數準備好即可
/**
 * Uploads one part of a multipart upload.
 *
 * <p>Parts do not have to be uploaded in order, and may even be uploaded by different clients;
 * OSS assembles the complete file by sorting the parts by part number.
 *
 * @param bucketName  bucket of the target object
 * @param key         key of the target object
 * @param uploadId    id of the multipart upload the part belongs to
 * @param inputStream stream supplying the part's bytes
 * @param order       part number (valid range 1 to 10000; OSS answers InvalidArgument outside it)
 * @param fileSize    value set as the part size on the request (every part except the last
 *                    must be at least 100 KB)
 * @return an adapter wrapping the part tag returned by OSS
 */
public PartETagAdapter uploadPartObject(String bucketName, String key, String uploadId, InputStream inputStream, int order, long fileSize) throws IOException {
    UploadPartRequest partRequest = new UploadPartRequest();
    // Where the part goes.
    partRequest.setBucketName(bucketName);
    partRequest.setKey(key);
    partRequest.setUploadId(uploadId);
    // What the part contains.
    partRequest.setInputStream(inputStream);
    partRequest.setPartSize(fileSize);
    partRequest.setPartNumber(order);

    PartETagAdapter adapter = new PartETagAdapter();
    adapter.setOssPartETag(getInternalOSSClient().uploadPart(partRequest).getPartETag());
    return adapter;
}
3. 合併上傳結果
/**
 * Completes a multipart upload.
 *
 * <p>All valid part tags must be supplied. OSS verifies each part and, once every part has
 * passed, combines them into one complete file.
 *
 * @param bucketName bucket of the target object
 * @param key        key of the target object
 * @param uploadId   id of the multipart upload to complete
 * @param partETags  adapters wrapping the tags of the uploaded parts
 * @return an adapter wrapping the completion result returned by OSS
 */
public CompleteMultipartUploadResultAdapter completePartlyUpload(String bucketName, String key, String uploadId, List<PartETagAdapter> partETags) {
    // To set the object's ACL while completing the upload, call setObjectACL(...) on the
    // request (for example with CannedAccessControlList.PublicRead) before sending it.
    CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
            bucketName,
            key,
            uploadId,
            partETags.stream()
                    .map(PartETagAdapter::getOssPartETag)
                    .collect(Collectors.toList()));

    CompleteMultipartUploadResultAdapter wrapped = new CompleteMultipartUploadResultAdapter();
    wrapped.setOssMultiPartUploadResult(getInternalOSSClient().completeMultipartUpload(completeRequest));
    return wrapped;
}
end~