
With double async, optimize from 191s to 2s

Author: DBAplus Community

In development, we often need to import Excel data into a database.

1. How I would usually do it

  • Read the Excel file to be imported with POI;
  • Use the file name as the table name and the column headers as the column names, and splice the data into SQL statements;
  • Insert into the database via JDBC or MyBatis.

In practice, if there are many files and a large amount of data, this is very slow.

After calling the endpoint, it looked as if nothing was happening; in fact it was already reading and inserting, just slowly.

Reading a 100,000-row Excel file actually took 191s. I thought it was stuck!

private void readXls(String filePath, String filename) throws Exception {
    @SuppressWarnings("resource")
    XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
    // Read the first worksheet
    XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
    // Total number of rows
    int maxRow = sheet.getLastRowNum();


    StringBuilder insertBuilder = new StringBuilder();


    insertBuilder.append("insert into ").append(filename).append(" ( UUID,");


    XSSFRow row = sheet.getRow(0);
    for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
        insertBuilder.append(row.getCell(i)).append(",");
    }


    insertBuilder.deleteCharAt(insertBuilder.length() - 1);
    insertBuilder.append(" ) values ( ");




    StringBuilder stringBuilder = new StringBuilder();
    for (int i = 1; i <= maxRow; i++) {
        XSSFRow xssfRow = sheet.getRow(i);
        String id = "";
        String name = "";
        for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
            if (j == 0) {
                id = xssfRow.getCell(j) + "";
            } else if (j == 1) {
                name = xssfRow.getCell(j) + "";
            }
        }


        boolean flag = isExisted(id, name);
        if (!flag) {
            stringBuilder.append(insertBuilder);
            stringBuilder.append('\'').append(uuid()).append('\'').append(",");
            for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
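                // `value` was undefined in the original listing; the cell in column j is the value to insert
                stringBuilder.append('\'').append(xssfRow.getCell(j)).append('\'').append(",");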
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            stringBuilder.append(" )").append("\n");
        }
    }




    List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
    int sum = JdbcUtil.executeDML(collect);
}


private static boolean isExisted(String id, String name) {
    String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";
    String num = JdbcUtil.executeSelect(sql, "num");
    return Integer.valueOf(num) > 0;
}


private static String uuid() {
    return UUID.randomUUID().toString().replace("-", "");
}           

2. Who wrote this? Drag them out and chop them!

Optimization 1: Query all existing data once and cache it in memory, then check against the cache before each insert. This is much faster than running one query per row.
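As a rough illustration of Optimization 1, the sketch below loads the existing keys once and answers the duplicate check from memory. ExistingKeyCache and its constructor argument are hypothetical names, and the list of rows stands in for the result of a single query against the table; it is not the article's actual getUserInfo() implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: holds the "ID,NAME" keys that already exist in the table.
final class ExistingKeyCache {
    private final Set<String> keys = new HashSet<>();

    // rowsAlreadyInTable stands in for the result of one "select ID, NAME from ..." query.
    ExistingKeyCache(List<String[]> rowsAlreadyInTable) {
        for (String[] row : rowsAlreadyInTable) {
            keys.add(row[0] + "," + row[1]);
        }
    }

    // In-memory lookup that replaces the per-row "select count(1)" query.
    boolean isExisted(String id, String name) {
        return keys.contains(id + "," + name);
    }

    public static void main(String[] args) {
        ExistingKeyCache cache = new ExistingKeyCache(List.of(
                new String[] {"1", "Tom"},
                new String[] {"2", "Ann"}));
        System.out.println(cache.isExisted("1", "Tom")); // true
        System.out.println(cache.isExisted("3", "Bob")); // false
    }
}
```

The set is filled once and only read afterwards, so 100,000 rows cost 100,000 hash lookups instead of 100,000 round trips to the database.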

Optimization 2: If a single Excel file is too large, split its rows into batches and use asynchronous, multi-threaded tasks to read each batch and insert it into the database.


Optimization 3: If there are many files, handle each Excel file in its own asynchronous task as well. Together with the asynchronous batches, this gives a double-asynchronous read and insert.
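The shape of the double-asynchronous idea can be sketched with plain java.util.concurrent, without Spring. This is only an assumed illustration: DoubleAsyncSketch, STEP = 1000, the pool size of 4 and the counter that stands in for the database insert are all made up for the example. The first level runs one task per file, the second level one task per batch of rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DoubleAsyncSketch {
    static final int STEP = 1000; // hypothetical batch size

    // Level 2: one task per batch; "inserting" is simulated by counting rows.
    static CompletableFuture<Void> insertBatchAsync(int start, int end,
                                                    AtomicInteger inserted, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> inserted.addAndGet(end - start + 1), pool);
    }

    // Level 1: one task per file; it only splits the rows and hands the batches on,
    // so it never blocks a pool thread while waiting for its own batches.
    static CompletableFuture<Void> importFileAsync(int rowCount,
                                                   AtomicInteger inserted, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> {
                    List<CompletableFuture<Void>> batches = new ArrayList<>();
                    for (int start = 1; start <= rowCount; start += STEP) {
                        int end = Math.min(start + STEP - 1, rowCount);
                        batches.add(insertBatchAsync(start, end, inserted, pool));
                    }
                    return CompletableFuture.allOf(batches.toArray(new CompletableFuture[0]));
                }, pool)
                .thenCompose(allBatches -> allBatches);
    }

    // Starts one import per file and waits until every batch of every file is done.
    static int importAll(int[] rowCountsPerFile) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger inserted = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> files = new ArrayList<>();
            for (int rows : rowCountsPerFile) {
                files.add(importFileAsync(rows, inserted, pool));
            }
            CompletableFuture.allOf(files.toArray(new CompletableFuture[0])).join();
            return inserted.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(importAll(new int[] {2500, 1000, 1})); // 3501
    }
}
```

The file-level task returns as soon as its batches are submitted, so a small pool cannot deadlock on tasks waiting for other tasks in the same pool.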


With double async, the time dropped from 191s to 2s. Can you believe it?

The key code for reading Excel files asynchronously and for reading large Excel files in batches is shown below.

1. readExcelCacheAsync controller class

@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {
    String path = "G:\\测试\\data\\";
    try {
        // Cache all existing data before reading the Excel files
        USER_INFO_SET = getUserInfo();




        File file = new File(path);
        String[] xlsxArr = file.list();
        for (int i = 0; i < xlsxArr.length; i++) {
            File fileTemp = new File(path + "\\" + xlsxArr[i]);
            String filename = fileTemp.getName().replace(".xlsx", "");
            readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);
        }
    } catch (Exception e) {
        logger.error("|#ReadDBCsv|#Exception: ", e);
        return "error";
    }
    return "success";
}           

2. Read large Excel files in batches

@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {
    @SuppressWarnings("resource")
    XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
    // Read the first worksheet
    XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
    // Total number of rows
    int maxRow = sheet.getLastRowNum();
    logger.info(filename + ".xlsx: " + maxRow + " rows of data in total!");
    StringBuilder insertBuilder = new StringBuilder();


    insertBuilder.append("insert into ").append(filename).append(" ( UUID,");


    XSSFRow row = sheet.getRow(0);
    for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
        insertBuilder.append(row.getCell(i)).append(",");
    }


    insertBuilder.deleteCharAt(insertBuilder.length() - 1);
    insertBuilder.append(" ) values ( ");


    int times = maxRow / STEP + 1;
    //logger.info("Inserting " + maxRow + " rows into the database in " + times + " batches!");
    for (int time = 0; time < times; time++) {
        int start = STEP * time + 1;
        int end = STEP * time + STEP;


        if (time == times - 1) {
            end = maxRow;
        }


        if(end + 1 - start > 0){
            //logger.info("Batch " + (time + 1) + ": about to insert " + (end + 1 - start) + " rows!");
            //readExcelDataAsyncService.readXlsCacheAsync(sheet, row, start, end, insertBuilder);
            readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);
        }
    }
}           
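To make the batching arithmetic in readXls easier to follow, here is a small standalone sketch that computes the same inclusive [start, end] row ranges. BatchRanges and split are hypothetical names, and step plays the role of STEP.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchRanges {
    // Returns inclusive [start, end] row ranges, mirroring the loop in readXls.
    static List<int[]> split(int maxRow, int step) {
        List<int[]> ranges = new ArrayList<>();
        int times = maxRow / step + 1;
        for (int time = 0; time < times; time++) {
            int start = step * time + 1;
            // The last batch ends at maxRow instead of a full step
            int end = (time == times - 1) ? maxRow : step * time + step;
            // Skips the empty last batch when maxRow is an exact multiple of step
            if (end + 1 - start > 0) {
                ranges.add(new int[] {start, end});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : split(10, 4)) {
            System.out.println(range[0] + ".." + range[1]); // 1..4, 5..8, 9..10
        }
    }
}
```

With 100,000 rows and a step of 4,200 this produces 24 batches, the last one covering rows 96,601 to 100,000.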

3. Asynchronous batch insertion

@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {
    StringBuilder stringBuilder = new StringBuilder();
    for (int i = start; i <= end; i++) {
        XSSFRow xssfRow = sheet.getRow(i);
        String id = "";
        String name = "";
        for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
            if (j == 0) {
                id = xssfRow.getCell(j) + "";
            } else if (j == 1) {
                name = xssfRow.getCell(j) + "";
            }
        }
        // All existing data was cached before the Excel file was read; check against that cache
        boolean flag = isExisted(id, name);
        if (!flag) {
            stringBuilder.append(insertBuilder);
            stringBuilder.append('\'').append(uuid()).append('\'').append(",");
            for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
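                // `value` was undefined in the original listing; the cell in column j is the value to insert
                stringBuilder.append('\'').append(xssfRow.getCell(j)).append('\'').append(",");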
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            stringBuilder.append(" )").append("\n");
        }
    }


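    // "".split("\n") yields one empty string, so drop empty entries before executing
    List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).filter(s -> !s.isEmpty()).collect(Collectors.toList());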
    if (collect != null && collect.size() > 0) {
        int sum = JdbcUtil.executeDML(collect);
    }
}


private boolean isExisted(String id, String name) {
    return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}           

4. Asynchronous thread pool tool class

@Async makes a method run asynchronously.

  • Put @Async on a method to make that method asynchronous;
  • Put @Async on a class to make all methods of the class asynchronous;
  • The class that uses the annotation must be a Spring-managed bean;
  • @EnableAsync must be added to the startup class or a configuration class for @Async to take effect.

If you use @Async without naming a thread pool, that is, without a custom thread pool, a default executor is used. In plain Spring this is SimpleAsyncTaskExecutor, which starts a new thread for every task; Spring Boot auto-configures a ThreadPoolTaskExecutor instead.

The defaults of the Spring Boot auto-configured pool are as follows:

  • Core threads: 8;
  • Maximum threads: Integer.MAX_VALUE;
  • Queue: LinkedBlockingQueue;
  • Queue capacity: Integer.MAX_VALUE;
  • Idle thread keep-alive time: 60s;
  • Rejection policy: AbortPolicy.

With SimpleAsyncTaskExecutor, threads are created without limit under concurrency. With the Spring Boot defaults, the queue is effectively unbounded, so it is the queued tasks that pile up without limit.

It can also be reconfigured via yml:

spring:
  task:
    execution:
      pool:
        max-size: 10
        core-size: 5
        keep-alive: 3s
        queue-capacity: 1000
        thread-name-prefix: my-executor           

You can also define your own thread pool. A simple custom thread pool configuration for @Async follows.

@EnableAsync // Enable asynchronous execution
@Configuration
public class AsyncTaskConfig {
    /**
     * Thread pool built with the ThreadFactoryBuilder from com.google.guava
     * @return
     */
    @Bean("my-executor")
    public Executor firstExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
        // Twice the number of processors available to the JVM
        int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
                200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
        // allowsCoreThreadTimeOut() only queries the flag; this call lets idle core threads time out
        threadPool.allowCoreThreadTimeOut(true);
        return threadPool;
    }
    /**
     * Spring thread pool
     * @return
     */
    @Bean("async-executor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
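        // Number of core threads
        taskExecutor.setCorePoolSize(24);
        // Maximum number of threads; threads beyond the core size are only created once the queue is full
        taskExecutor.setMaxPoolSize(200);
        // Queue capacity
        taskExecutor.setQueueCapacity(50);
        // Keep-alive time: threads beyond the core size are destroyed after being idle this long
        taskExecutor.setKeepAliveSeconds(200);
        // Name prefix of the threads that run the asynchronous methods
        taskExecutor.setThreadNamePrefix("async-executor-");
        /**
         * When the task queue is full and the number of threads has reached maximumPoolSize, newly arriving tasks are handled by the rejection policy.
         * There are usually four policies:
         * ThreadPoolExecutor.AbortPolicy: discards the task and throws RejectedExecutionException.
         * ThreadPoolExecutor.DiscardPolicy: also discards the task, but throws no exception.
         * ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then tries to submit the new task again (repeating as needed).
         * ThreadPoolExecutor.CallerRunsPolicy: runs the rejected task directly in the thread that called execute().
         */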
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}           
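The comment on the maximum pool size says that threads beyond the core size are only created once the queue is full. The following sketch checks that behaviour directly on java.util.concurrent.ThreadPoolExecutor, which ThreadPoolTaskExecutor wraps. PoolGrowthDemo and the numbers used are made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGrowthDemo {
    // Submits `tasks` blocked tasks and returns the largest number of threads the pool created.
    // Keep tasks <= max + queueCapacity so that nothing is rejected.
    static int largestPoolSize(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so later tasks must queue or spawn threads
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: the pool never grows past its core size
        System.out.println(largestPoolSize(2, 10, Integer.MAX_VALUE, 20)); // 2
        // Queue of 5: 2 core threads, 5 queued tasks, then 5 extra threads
        System.out.println(largestPoolSize(2, 10, 5, 12)); // 7
    }
}
```

This is why a pool with an unbounded LinkedBlockingQueue, such as the first bean above, never reaches its maximum size of 100, while the second bean with a queue capacity of 50 can grow beyond its 24 core threads.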

5. Causes of asynchronous failure

  • The @Async method is not public;
  • The return type of the @Async method is something other than void or Future;
  • The @Async method is static;
  • The @EnableAsync annotation is missing;
  • The caller and the @Async method are in the same class, so the call does not go through the Spring proxy;
  • An @Async method does not take part in the caller's transaction, because it runs on another thread; @Transactional on a method called by the @Async method does take effect.
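The same-class case is the least obvious of these causes. Spring applies @Async through a proxy, and a call made on this never passes through that proxy. The sketch below imitates the effect with a plain JDK dynamic proxy instead of Spring; ImportService, ImportServiceImpl and the "intercepted:" tag are hypothetical and only stand in for the asynchronous dispatch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class SelfInvocationDemo {
    interface ImportService {
        String readFile();     // ordinary method
        String insertBatch();  // imagine this one is annotated with @Async
    }

    static final class ImportServiceImpl implements ImportService {
        @Override
        public String readFile() {
            return insertBatch(); // call on `this`: the proxy is bypassed
        }

        @Override
        public String insertBatch() {
            return "inserted";
        }
    }

    // Wraps the target in a JDK proxy that tags every intercepted insertBatch() call.
    static ImportService proxyOf(ImportService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(target, args);
            return method.getName().equals("insertBatch") ? "intercepted:" + result : result;
        };
        return (ImportService) Proxy.newProxyInstance(
                ImportService.class.getClassLoader(),
                new Class<?>[] {ImportService.class},
                handler);
    }

    public static void main(String[] args) {
        ImportService service = proxyOf(new ImportServiceImpl());
        System.out.println(service.insertBatch()); // intercepted:inserted
        System.out.println(service.readFile());    // inserted
    }
}
```

Calling insertBatch() from outside goes through the proxy, while the call made inside readFile() does not. This matches the article's own code, where readXls and readXlsCacheAsyncMybatis live in two different services.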

3. Setting the number of core threads in the thread pool

There is one question I had never had time to explore: which values of the core thread count CorePoolSize and the maximum thread count MaxPoolSize are the most appropriate and the most efficient?

I took this opportunity to test it.

1. I remember a saying about the number of CPU processors

Is it most efficient to set CorePoolSize to the number of CPU processors?

// Twice the number of processors available to the JVM
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;           

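Runtime.getRuntime().availableProcessors() returns the number of logical processors (hardware threads) available to the JVM, that is, the available computing resources.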

  • CPU-intensive: set the thread pool size to N, the number of processors, which avoids context switching between threads as far as possible. In practice it is usually set to N+1, so that if one thread blocks unexpectedly, the extra thread keeps the CPU busy.
  • I/O-intensive: set the thread pool size to 2N. This value comes from stress-testing business services and is a reasonable starting point when no specific business is involved.

In practice, tune the pool size through stress testing and according to the current state of the machine.

If the thread pool is too large, the CPU keeps switching between threads, and overall performance does not improve much; the system may even slow down.
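The two rules of thumb above can be written down as a tiny helper. PoolSizing and its method names are hypothetical, and the results are only starting points for stress testing, not final values.

```java
final class PoolSizing {
    // CPU-intensive work: N + 1 threads for N processors
    static int cpuBound(int processors) {
        return processors + 1;
    }

    // I/O-intensive work: 2N threads for N processors
    static int ioBound(int processors) {
        return processors * 2;
    }

    public static void main(String[] args) {
        int n = Runtime.getRuntime().availableProcessors();
        System.out.println("processors=" + n
                + ", cpuBound=" + cpuBound(n)
                + ", ioBound=" + ioBound(n));
    }
}
```

For a machine with 24 processors this gives 25 threads for CPU-intensive work and 48 for I/O-intensive work.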

My computer's CPU has 24 processors.

So how many rows should be read per batch?

The test Excel file contains 100,000 rows, and 100,000 / 24 ≈ 4,166, so I set the batch size to 4,200. Is that the most efficient?

In testing, it really did turn out that way.
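The batch-size arithmetic can be sketched as follows, assuming the goal is one batch per thread and a batch size rounded up to a convenient multiple. BatchSizing, rowsPerBatch and the roundTo parameter are hypothetical names, not part of the article's code.

```java
final class BatchSizing {
    // Rows per batch so that totalRows fit into at most `threads` batches,
    // rounded up to a multiple of roundTo.
    static int rowsPerBatch(int totalRows, int threads, int roundTo) {
        int perThread = (totalRows + threads - 1) / threads;   // ceiling division
        return ((perThread + roundTo - 1) / roundTo) * roundTo; // round up
    }

    // Number of batches needed for a given batch size.
    static int batchCount(int totalRows, int rowsPerBatch) {
        return (totalRows + rowsPerBatch - 1) / rowsPerBatch;
    }

    public static void main(String[] args) {
        int step = rowsPerBatch(100_000, 24, 100);
        System.out.println(step);                      // 4200
        System.out.println(batchCount(100_000, step)); // 24
    }
}
```

With 100,000 rows and 24 threads the batch size comes out at 4,200, and those rows then fit into exactly 24 batches, one per core thread.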

2. I remember that people habitually set CorePoolSize and MaxPoolSize to the same value, and like to set both to 200

Is that number chosen casually, or is it based on experience?

Testing showed that with both CorePoolSize and MaxPoolSize set to 200, 150 threads were started at once on the first run.

Why is this?


3. After dozens of tests

I found that the number of core threads does not seem to make much difference.

The number of rows read and inserted per batch is the key. It cannot be too large, because each insert then becomes slow;

nor can it be too small: with too few rows per batch there are more than 150 threads, which causes threads to block and also slows things down.


4. Read and insert into the database with EasyExcel

For the EasyExcel approach I will not write out the double-async optimization; remember not to fall into the trap of low-level diligence.

1. ReadEasyExcelController

@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {
    try {
        String path = "G:\\测试\\data\\";
        String[] xlsxArr = new File(path).list();
        for (int i = 0; i < xlsxArr.length; i++) {
            String filePath = path + xlsxArr[i];
            File fileTemp = new File(path + xlsxArr[i]);
            String fileName = fileTemp.getName().replace(".xlsx", "");
            List<UserInfo> list = new ArrayList<>();
            EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();
        }
    }catch (Exception e){
        logger.error("readEasyExcel exception:",e);
        return "error";
    }
    return "success";
}           

2. ReadEasyExeclAsyncListener

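// The class declaration is missing from the original listing; AnalysisEventListener is assumed here as the base class
public class ReadEasyExeclAsyncListener extends AnalysisEventListener<UserInfo> {
    public ReadEasyExeclService readEasyExeclService;
    // Table name
    public String TABLE_NAME;
    // Batch insert threshold
    private int BATCH_COUNT;
    // Data buffer
    private List<UserInfo> LIST;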


    public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {
        this.readEasyExeclService = readEasyExeclService;
        this.TABLE_NAME = tableName;
        this.BATCH_COUNT = batchCount;
        this.LIST = list;
    }


    @Override
    public void invoke(UserInfo data, AnalysisContext analysisContext) {
        data.setUuid(uuid());
        data.setTableName(TABLE_NAME);
        LIST.add(data);
        if(LIST.size() >= BATCH_COUNT){
            // Insert the full batch
            readEasyExeclService.saveDataBatch(LIST);
        }
    }


    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        if(LIST.size() > 0){
            // Insert the last, partial batch
            readEasyExeclService.saveDataBatch(LIST);
        }
    }


    public static String uuid() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}           
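The listener follows a common buffer-and-flush pattern: collect rows until a threshold is reached, hand the batch over, and flush whatever is left at the end. A minimal sketch of that pattern without EasyExcel is shown below; BatchBuffer, add and finish are hypothetical names that correspond roughly to the listener, invoke and doAfterAllAnalysed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchBuffer<T> {
    private final int batchCount;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchCount, Consumer<List<T>> sink) {
        this.batchCount = batchCount;
        this.sink = sink;
    }

    // Called once per row, like invoke(): flushes when the threshold is reached.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchCount) {
            flush();
        }
    }

    // Called once at the end, like doAfterAllAnalysed(): flushes the remainder.
    void finish() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new ArrayList<>(buffer)); // pass a copy so the sink may keep it
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchBuffer<Integer> rows = new BatchBuffer<>(4, batch -> System.out.println(batch.size()));
        for (int i = 0; i < 10; i++) {
            rows.add(i);
        }
        rows.finish(); // prints 4, 4 and 2 in total
    }
}
```

In the article's version the service clears the shared list after saving, which has the same effect as long as the insert is synchronous; passing a copy would matter if the save itself were made asynchronous.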

3. ReadEasyExeclServiceImpl

@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {


    @Resource
    private ReadEasyExeclMapper readEasyExeclMapper;


    @Override
    public void saveDataBatch(List<UserInfo> list) {
        // Insert via MyBatis
        readEasyExeclMapper.saveDataBatch(list);
        // Insert via JDBC
        // insertByJdbc(list);
        list.clear();
    }
    
    private void insertByJdbc(List<UserInfo> list){
        List<String> sqlList = new ArrayList<>();
        for (UserInfo u : list){
            StringBuilder sqlBuilder = new StringBuilder();
            sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");
            sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',")
                            .append("'").append(u.getId()).append("',")
                            .append("'").append(u.getName()).append("',")
                            .append("'").append(u.getAge()).append("',")
                            .append("'").append(u.getAddress()).append("',")
                            .append("'").append(u.getPhone()).append("',")
                            .append("sysdate )");
            sqlList.add(sqlBuilder.toString());
        }


        JdbcUtil.executeDML(sqlList);
    }
}           

4. UserInfo

@Data
public class UserInfo {


    private String tableName;


    private String uuid;


    @ExcelProperty(value = "ID")
    private String id;


    @ExcelProperty(value = "NAME")
    private String name;


    @ExcelProperty(value = "AGE")
    private String age;


    @ExcelProperty(value = "ADDRESS")
    private String address;


    @ExcelProperty(value = "PHONE")
    private String phone;
}           

Author丨Nezha (哪吒)

Source丨Official Account: Nezha Programming (ID: gh_61b183bcf690)
