天天看點

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

作者:java油膩的程式猿

一 事物五大類

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

二 事物使用區分

1 自動復原和手動復原不能一起使用回報錯沖突除非PROPAGATION_REQUIRES_NEW新事物才不會和自動事物沖突

2 手動復原包含兩種

1》 SqlSession

  • // 擷取資料庫連接配接,擷取會話(内部自有事務)
  • SqlSession sqlSession = sqlContext.getSqlSession();
  • Connection connection = sqlSession.getConnection();

2 》TransactionManager

注意 DataSourceTransactionManager.rollback和 TransactionStatus.setRollbackOnly差別

setRollbackOnly rollback
不同點 控制範圍小,資源釋放需要配合自動事物 控制範圍大,釋放不需要自動事物管理
相同點 能復原 能復原

三 下面是測試分析

情況一手動事物死鎖

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

情況二手動事物不鎖

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

情況三自動+手動事物不鎖表(注意手動需要開啟新事物不然沖突)

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

情況四自動+手動事物鎖表

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

情況五自動+手動事物鎖表

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案

情況六自動+手動事物不鎖表

多線程事物復原、多線程造成死鎖,造成連接配接資源不夠的解決方案
檢視mysql事物鎖
//目前運作的所有事務 首先我們檢視被鎖表的程序
SELECT * FROM information_schema.INNODB_TRX;
//目前出現的鎖
SELECT * FROM information_schema.INNODB_LOCKs;
//鎖等待的對應關系
SELECT * FROM information_schema.INNODB_LOCK_waits;
解決:kill事務,清理表資料,轉移到曆史表,檢查定時任務
然後找到程序号,即 trx_mysql_thread_id
然後執行;
kill 程序号;           

四 多線程事物推薦一下兩種

注意線程不能用太多,調整mysql最大連接配接數5.7預設151改成500,根據cpu情況來

推薦最大核心數=cpu*2+1,但是還有其它線程使用sleep和await,所見建議處理大資料量的時候

線程控制在10以内

@Override
    public void testTranslationOfThreads6() throws SQLException {
        //準備資料50000條
        List<Cdr> list = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            Cdr cdr = new Cdr();
            cdr.setSrc("" + i);
            cdr.setDatetime(DateUtils.formatTime(new Date()));
            list.add(cdr);
        }
        // 擷取資料庫連接配接,擷取會話(内部自有事務)
        SqlSession sqlSession = sqlContext.getSqlSession();
        Connection connection = sqlSession.getConnection();
        try {
            // 設定手動送出
            connection.setAutoCommit(false);
            //擷取mapper
            CdrMapper employeeMapper = sqlSession.getMapper(CdrMapper.class);
            //先做删除操作
//            employeeMapper.remove()
            //擷取執行器
            ExecutorService service = ExecutorConfig.getThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            //拆分list
            List<List<Cdr>> lists = averageAssign(list, 5);
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i = 0; i < lists.size(); i++) {
                if (i == lists.size() - 1) {
                    atomicBoolean.set(false);
                }
                List<Cdr> list1 = lists.get(i);
                //使用傳回結果的callable去執行,
                Callable<Integer> callable = () -> {
                    //讓最後一個線程抛出異常
                    if (!atomicBoolean.get()) {
                        throw new JeecgBootException(001, "出現異常");
                    }
                    int insert = employeeMapper.insert(list1.get(0));

                    return insert;
                };
                callableList.add(callable);
            }
            //執行子線程
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future : futures) {
                //如果有一個執行不成功,則全部復原
                if (future.get() <= 0) {
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完畢");
        } catch (Exception e) {
            connection.rollback();
            log.info("error", e);
            throw new JeecgBootException(002, "出現異常");
        } finally {
            connection.close();
        }

    }           
@Override
    public void testTranslationOfThreads3() {
        long startTime = System.currentTimeMillis();
        //準備資料50000條
        List<Cdr> list = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            Cdr cdr = new Cdr();
            cdr.setSrc("" + i);
            cdr.setDatetime(DateUtils.formatTime(new Date()));
            list.add(cdr);
        }
        // 線程數量
        final Integer threadCount = 2;
        //每個線程處理的資料量
        final Integer dataPartionLength = (list.size() + threadCount - 1) / threadCount;
        // 建立多線程處理任務
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        AtomicBoolean isError = new AtomicBoolean(false);
        try {
            for (int i = 0; i < threadCount; i++) {
                // 每個線程處理的資料
                List<Cdr> threadDatas = list.stream()
                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                studentThreadPool.execute(() -> {
                    try {
                        try {
                            this.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                        } catch (Throwable e) {
                            e.printStackTrace();
                            isError.set(true);
                        } finally {
                            threadLatchs.countDown();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        isError.set(true);
                    }
                });
            }

            // 倒計時鎖設定逾時時間 30s
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            // 判斷是否逾時
            if (!await) {
                isError.set(true);
            }

        } catch (Exception e) {
            e.printStackTrace();
            isError.set(true);
        }
        if (!transactionStatuses.isEmpty()) {
            if (isError.get()) {
                transactionStatuses.forEach(s -> transactionManager.rollback(s));
            } else {
                transactionStatuses.forEach(s -> transactionManager.commit(s));
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("共耗時:{}", (endTime - startTime) / 1000 + "秒");
        System.out.println("主線程完成");
    }           

繼續閱讀