天天看點

SpringBoot 整合 MinIO 實作視訊的分片上傳/斷點續傳(親測可行)

作者:老誠不bug

1、前言

之前做了一個慕課網上的仿短視訊開發,裡面有很多比較粗糙的實作,比如視訊上傳部分是直接由前端上傳雲服務,沒考慮到客戶的網絡環境品質等問題,如果一個視訊快上傳完了,但是網斷了沒有上傳完成需要客戶重新上傳,這對于使用者體驗是極差的。

那麼我們對于視訊檔案的上傳可以采取斷點續傳,上傳過程中,如果出現網絡異常或程式崩潰導緻檔案上傳失敗時,将從斷點記錄處繼續上傳未上傳完成的部分,斷點續傳依賴于MD5和分片上傳,對于本demo分片上傳的流程如圖

SpringBoot 整合 MinIO 實作視訊的分片上傳/斷點續傳(親測可行)

通過檔案唯一辨別MD5,在資料庫中查詢此前是否建立過該SysUploadTask,如果存在,直接傳回TaskInfo;如果不存在,通過amazonS3擷取到UploadId并建立一個SysUploadTask傳回。

前端将檔案分好片後,通過伺服器得到每一片的一個預位址,然後由前端直接向minio伺服器發起真正的上傳請求,避免上傳時占用應用伺服器的帶寬,影響系統穩定。最後再向後端伺服器發起合并請求。

2、資料庫結構

SpringBoot 整合 MinIO 實作視訊的分片上傳/斷點續傳(親測可行)

3、後端實作

3.1、根據MD5擷取是否存在相同檔案

Controller層

/**
 * 查詢是否上傳過,若存在,傳回TaskInfoDTO
 * @param identifier 檔案md5
 * @return
 */
@GetMapping("/{identifier}")
public GraceJSONResult taskInfo (@PathVariable("identifier") String identifier) {
    return GraceJSONResult.ok(sysUploadTaskService.getTaskInfo(identifier));
}
           

Service層

/**
 * 查詢是否上傳過,若存在,傳回TaskInfoDTO
 * @param identifier
 * @return
 */
public TaskInfoDTO getTaskInfo(String identifier) {
    SysUploadTask task = getByIdentifier(identifier);
    if (task == null) {
        return null;
    }
    TaskInfoDTO result = new TaskInfoDTO().setFinished(true).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(task.getBucketName(), task.getObjectKey()));

    boolean doesObjectExist = amazonS3.doesObjectExist(task.getBucketName(), task.getObjectKey());
    if (!doesObjectExist) {
        // 未上傳完,傳回已上傳的分片
        ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
        PartListing partListing = amazonS3.listParts(listPartsRequest);
        result.setFinished(false).getTaskRecord().setExitPartList(partListing.getParts());
    }
    return result;
}
           

3.2、初始化一個上傳任務

Controller層

/**
 * 建立一個上傳任務
 * @return
 */
@PostMapping
public GraceJSONResult initTask (@Valid @RequestBody InitTaskParam param) {
    return GraceJSONResult.ok(sysUploadTaskService.initTask(param));
}
           

Service層

/**
 * 初始化一個任務
 */
public TaskInfoDTO initTask(InitTaskParam param) {

    Date currentDate = new Date();
    String bucketName = minioProperties.getBucketName();
    String fileName = param.getFileName();
    String suffix = fileName.substring(fileName.lastIndexOf(".")+1, fileName.length());
    String key = StrUtil.format("{}/{}.{}", DateUtil.format(currentDate, "YYYY-MM-dd"), IdUtil.randomUUID(), suffix);
    String contentType = MediaTypeFactory.getMediaType(key).orElse(MediaType.APPLICATION_OCTET_STREAM).toString();
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentType(contentType);
    InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3
            .initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key).withObjectMetadata(objectMetadata));
    String uploadId = initiateMultipartUploadResult.getUploadId();

    SysUploadTask task = new SysUploadTask();
    int chunkNum = (int) Math.ceil(param.getTotalSize() * 1.0 / param.getChunkSize());
    task.setBucketName(minioProperties.getBucketName())
            .setChunkNum(chunkNum)
            .setChunkSize(param.getChunkSize())
            .setTotalSize(param.getTotalSize())
            .setFileIdentifier(param.getIdentifier())
            .setFileName(fileName)
            .setObjectKey(key)
            .setUploadId(uploadId);
    sysUploadTaskMapper.insert(task);
    return new TaskInfoDTO().setFinished(false).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(bucketName, key));
}
           

3.3、擷取每個分片的預簽名上傳位址

Controller層

/**
 * 擷取每個分片的預簽名上傳位址
 * @param identifier
 * @param partNumber
 * @return
 */
@GetMapping("/{identifier}/{partNumber}")
public GraceJSONResult preSignUploadUrl (@PathVariable("identifier") String identifier, @PathVariable("partNumber") Integer partNumber) {
    SysUploadTask task = sysUploadTaskService.getByIdentifier(identifier);
    if (task == null) {
        return GraceJSONResult.error("分片任務不存在");
    }
    Map<String, String> params = new HashMap<>();
    params.put("partNumber", partNumber.toString());
    params.put("uploadId", task.getUploadId());
    return GraceJSONResult.ok(sysUploadTaskService.genPreSignUploadUrl(task.getBucketName(), task.getObjectKey(), params));
}
           

Service層

/**
 * 生成預簽名上傳url
 * @param bucket 桶名
 * @param objectKey 對象的key
 * @param params 額外的參數
 * @return
 */
public String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params) {
    Date currentDate = new Date();
    Date expireDate = DateUtil.offsetMillisecond(currentDate, PRE_SIGN_URL_EXPIRE.intValue());
    GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, objectKey)
            .withExpiration(expireDate).withMethod(HttpMethod.PUT);
    if (params != null) {
        params.forEach((key, val) -> request.addRequestParameter(key, val));
    }
    URL preSignedUrl = amazonS3.generatePresignedUrl(request);
    return preSignedUrl.toString();
}
           

3.4、合并分片

Controller層

/**
 * 合并分片
 * @param identifier
 * @return
 */
@PostMapping("/merge/{identifier}")
public GraceJSONResult merge (@PathVariable("identifier") String identifier) {
    sysUploadTaskService.merge(identifier);
    return GraceJSONResult.ok();
}
           

Service層

/**
 * 合并分片
 * @param identifier
 */
public void merge(String identifier) {
    SysUploadTask task = getByIdentifier(identifier);
    if (task == null) {
        throw new RuntimeException("分片任務不存");
    }

    ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());
    PartListing partListing = amazonS3.listParts(listPartsRequest);
    List<PartSummary> parts = partListing.getParts();
    if (!task.getChunkNum().equals(parts.size())) {
        // 已上傳分塊數量與記錄中的數量不對應,不能合并分塊
        throw new RuntimeException("分片缺失,請重新上傳");
    }
    CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest()
            .withUploadId(task.getUploadId())
            .withKey(task.getObjectKey())
            .withBucketName(task.getBucketName())
            .withPartETags(parts.stream().map(partSummary -> new PartETag(partSummary.getPartNumber(), partSummary.getETag())).collect(Collectors.toList()));
    CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);
}
           

4、分片檔案清理問題

視訊上傳一半不上傳了,怎麼清理碎片分片。

可以考慮在sys_upload_task表中新加一個status字段,表示是否合并分片,預設為false,merge請求結束後變更為true,通過一個定時任務定期清理為status為false的記錄。另外MinIO自身對于臨時上傳的分片,會實施定時清理。

Demo位址

https://github.com/robinsyn/MinIO_Demo
來源:blog.csdn.net/weixin_44153131/article/details/129249169