天天看點

資料導入任務并行化

當我們需要做入庫操作的時候,一般采取導入的方式,而導入的方式中,一般采取excel導入的方式比較多,而當excel中資料量很大的時候,導入的時長就不受控制,是以我們需要考慮異步并行化處理。

如何實作異步

我們可以考慮在使用者上傳導入檔案的時候,在檔案上傳完畢的時候,直接傳回結果,并提示資料導入中。然後利用@Async注解實作異步。

以下面的方法為例:

@PutMapping("/vulns/add")
    @ResponseBody
    public Result importVulns(MultipartFile file, @BaseApiVisible VisibleUser visibleUser,
                              @Nullable @RequestParam("force_import") Boolean forceImport) {
        // TODO: 2022/9/7 校驗檔案是否正在使用
        String md5 = MD5Utils.compMd5(file);
        if (fileService.inProcessFileMd5s.contains(md5)) {
            if (Objects.isNull(forceImport) || (Objects.nonNull(forceImport) && !forceImport)) {
                return Result.failure(CommonResultStatus.REPEAT_FILE);
            }
        }
        fileService.inProcessFileMd5s.add(md5);
        List<ImportAddVulnQuery> queries = VulnPersonFormatUtils.getDataFromExcel(file);
        Long fileId = -1L;
        try {
            String name = minioUtil.uploadFile(file, "demo");
                    .build());
            Long hisId = vulnImportHistoryRepository.save(VulnImportHistory.builder()

                    .build());
            Long importDataProcessId = importDataProcessRepository.save(ImportDataProcess.builder()
                    .build());
            demo1(visibleUser, file, md5, hisId, importDataProcessId, fileId);
            return Result.success("導入檔案上傳成功,請到導入管理檢視導入情況");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Result.failure("導入檔案上傳失敗");
    }      

我們真正處理導入是在demo1這個函數中執行的,而我們隻需要給該函數加入@Async注解,即可實作異步調用。

@Async
public void demo1(){
    
}      

不過,想要使該注解生效,必須在啟動類加上@EnableAsync的配置。

如何實作并行化

在實作并行化之前,我們需要了解一個方法或者一個導入流程中哪些情況比較耗時。我總結了以下串行化需要轉變為并行化的場景:

1、資料庫反複通路

2、反複調用外部接口

3、爬蟲反複抓取外部頁面

等等

一句話描述就是:需要反複進行網絡傳輸的部分都需要考慮并行化。

即,原本我們需要反複執行n次的步驟,如下圖:

![1aee65714e3949f1a681f6fd1b6ad15.jpg](https://img-blog.csdnimg.cn/img_convert/41620980021d1fe85ad5152c0d2583e6.jpeg#clientId=ue9901bba-4996-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=712&id=u057604c4&margin=[object Object]&name=1aee65714e3949f1a681f6fd1b6ad15.jpg&originHeight=2532&originWidth=1170&originalType=binary&ratio=1&rotation=0&showTitle=false&size=61401&status=done&style=none&taskId=u854fafde-ee43-43d7-b5a7-f3f8f5e0859&title=&width=329)

在局部并行化之後,我們就隻需要執行n/并發數 次了。可以節省等待的時間。

![8b24d1a93fe0527682a3b01c9ec8f90.jpg](https://img-blog.csdnimg.cn/img_convert/9d12600127994107d139956cf2133529.jpeg#clientId=ue9901bba-4996-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=760&id=u01cf63f4&margin=[object Object]&name=8b24d1a93fe0527682a3b01c9ec8f90.jpg&originHeight=2532&originWidth=1170&originalType=binary&ratio=1&rotation=0&showTitle=false&size=66968&status=done&style=none&taskId=ue6fcd22e-739c-4886-857b-7a61e7cd797&title=&width=351)

那麼代碼怎麼寫呢?

假設原來的寫法為

for (A a : as) {
    handle(a)
}      

那麼現在可以寫為

ExecutorService updatePool = Executors.newFixedThreadPool(10);
List<Callable<Integer>> updateCallers = new ArrayList<>();
for (A a : as) {
    updateCallers.add(() -> {
        handle(a);
        return null;
    });
}
try {
    updatePool.invokeAll(callers);
    updatePool.shutdown();
} catch (InterruptedException e) {
    e.printStackTrace();
}      

這樣,就以10個并發進行執行了。

但是這裡需要注意的點就是,要確定你的執行沒有先後順序,如果有,就不能這麼改。

疊代建議