天天看點

Java多線程百萬資料快速入庫實戰

出自:

騰訊課堂 700多分鐘幹貨實戰Java多線程高并發高性能實戰全集 , 我學習完了之後, 我給 老師在課上說的話做了個筆記,以及視訊的内容,還有代碼敲了一遍,然後添加了一些注釋,把執行結果也整理了一下, 做了個筆記

背景

某應用程式(單台伺服器,非分布式的多台伺服器),這單台伺服器就是你的筆記本電腦了,

并發産生100萬條資料,這100w條資料是你自己産生的,假設你是架構師,如何運用多線程等基礎知識将這100萬條資料,快速同步(4分鐘以内)到MySQL資料庫?

分析百萬資料快速入庫的特點

1.百萬資料快速入庫的特點:

資料量比較大(高并發),時間很短(性能),

100萬條資料如果一條一條的插入到資料庫的話,時間是很慢的,是以我們采用批量的方式插入,每次分一兩萬, 分多個批次,并行的插入到資料庫裡面.

這就是用并發程式設計的方式去解決高并發高性能的問題

2.百萬資料如何在短時間内入庫?如何從架構角度優化性能?

應用程式怎麼優化呢? 可以采用并發程式設計的形式,比如說多線程,線程池去提升性能

在資料連接配接池這層,我們可以調優,讓它的并發量更高,提高資料庫連接配接池的整體性能.

代碼

Producer

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer {

    public static void main(String[] args) {
        Producer.createData();
    }

    public static void createData() {
        ExecutorService pool = Executors.newFixedThreadPool(100);
        final int totalPageNo = 50; //分50批次

        final int pageSize = 20000; //每頁大小是2萬條
        //共10w條資料,每頁5000條資料,20個線程
        final long start = System.currentTimeMillis();
        final AtomicInteger atomicInt = new AtomicInteger();
        for (int currentPageNo = 0; currentPageNo < totalPageNo; currentPageNo++) {
            final int finalCurrentPageNo = currentPageNo;

            Runnable run = new Runnable() {

                @Override
                public void run() {
                    List userList = new ArrayList<>();
                    for (int i = 1; i <= pageSize; i++) {
                        int id = i + finalCurrentPageNo * pageSize;
                        User user = new User();
                        user.setId(id);
                        user.setName("huanglaoxie:" + id);
                        userList.add(user);
                    }

                    atomicInt.addAndGet(UserBatchHandler.batchSave(userList, Thread.currentThread().getName()));
                    //入庫的資料達到一百萬條的時候就會有個統計.
                    if (atomicInt.get() == (totalPageNo * pageSize)) {
                        //如果有一百萬的時候.就會在這裡有個結果
                        System.out.println("同步資料到db,它已經花費 " + ((System.currentTimeMillis() - start) / 1000) + "  秒");
                    }

                }
            };
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
            pool.execute(run);
        }

    }

}      

User

import java.sql.Timestamp;

public class User {
    private int id;
    private String name;
    private Timestamp createdTime;
    private Timestamp updatedTime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Timestamp getCreatedTime() {
        return createdTime;
    }

    public void setCreatedTime(Timestamp createdTime) {
        this.createdTime = createdTime;
    }

    public Timestamp getUpdatedTime() {
        return updatedTime;
    }

    public void setUpdatedTime(Timestamp updatedTime) {
        this.updatedTime = updatedTime;
    }
}      

UserBatchHandler

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class UserBatchHandler {


    public static int batchSave(List userList, String threadName)  {
        String insertSql ="INSERT INTO user(id,name,createdTime,updatedTime) VALUES(?,?,sysdate(),sysdate())";
        //取得發送sql語句的對象
        PreparedStatement pst = null;
        User  user;
        int[] count = new int[0];
        Connection conn = null;
        try {
            conn= DataSourceUtils.getConnection();
            pst = conn.prepareStatement(insertSql);

            long start=System.currentTimeMillis();
            if(null!=userList&&userList.size()>0){
                for(int i=0;i<userList.size();i++){
                    user= (User) userList.get(i);
                    pst.setInt(1,user.getId());
                    pst.setString(2,user.getName());
                    //加入批處理
                    pst.addBatch();
                }

                count= pst.executeBatch();
                System.out.println(count.length);
                System.out.println(" threadName為"+threadName+", sync data to db, it  has spent " +(System.currentTimeMillis()-start)+"  ms");
        }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            //4. 釋放資源
            DataSourceUtils.close(conn, pst);
        }

        //擷取到資料更新的行數
        return count.length;
    }
}      

DataSourceUtils

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class DataSourceUtils {

    public static void main(String[] args){
      Connection conn=  DataSourceUtils.getConnection();
      System.out.println("conn is  :  "+conn);
    }

    //建立一個成員變量
    private static DataSource ds;

    /**
     * 加載的代碼寫在靜态代碼塊中
     */
    static {
        try {
            Properties info = new Properties();
            //加載類路徑下,即src目錄下的druid.properties這個檔案
            info.load(DataSourceUtils.class.getResourceAsStream("/druid.properties"));

            //讀取屬性檔案建立連接配接池
            ds = DruidDataSourceFactory.createDataSource(info);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 得到資料源
     */
    public static DataSource getDataSource() {
        return ds;
    }

    /**
     * 得到連接配接對象
     */
    public static Connection getConnection() {
        try {
            return ds.getConnection();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }


    /**
     * 釋放資源
     */
    public static void close(Connection conn, Statement stmt, ResultSet rs) {
        if (rs!=null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (stmt!=null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn!=null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }


    public static void close(Connection conn, Statement stmt) {
        close(conn, stmt, null);
    }


}      

druid.properties

# 配置連接配接池的參數
initialSize=50
maxActive=200
maxWait=600000
minIdle=5



driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://zjj101:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&useSSL=false
username=root
password=root      

sql腳本

CREATE TABLE `test`.`user` (
  `id` INT NOT NULL,
  `name` VARCHAR(45) NULL,
  `createdTime` timestamp NULL,
  `updatedTime` timestamp NULL,
  PRIMARY KEY (`id`))
COMMENT = '使用者測試表';

ALTER TABLE `test`.`user`
ADD INDEX `index` (`id` ASC);


SELECT count(*) FROM test.user;
# delete  from test.user;
SELECT *  FROM test.user  order by  id desc;      

操作說明:

  1. 執行sql腳本

    2.執行Producer類即可

其它優化:

池技術為什麼能提升性能?

連接配接池:

tomcat連接配接池,資料庫連接配接池等等,通過複用連接配接來減少建立和釋放連接配接的時間來提升性能.

線程池:

線程池和連接配接池也是一樣的,通過複用連接配接來減少建立和釋放連接配接的時間來提升性能.

druid資料庫連接配接池性能調優

# 配置連接配接池的參數
 initialSize=50
 # 連接配接池的最大資料庫連接配接數。設為0表示無限制。
 maxActive=200
 # 最大建立連接配接等待時間。如果超過此時間将接到異常。設為-1表示無限制。
 maxWait=600000
 # 連接配接池中的最小空閑連接配接數,Druid會定時掃描連接配接池的連接配接,如果空閑的連接配接數大于該值,則關閉多餘的連接配接,反之則建立更多的連接配接以滿足最小連接配接數要求。
 minIdle=5      

MySQL的核心參數優化

配置 “my.cnf” 檔案裡面的innodb_thread_concurrency 的配置,這個是調整線程的并發數 ,配置完了别忘了重新開機MySQL服務

當 innodb_thread_concurrency=12

執行程式結果:

我測試一下,第一次是 79秒 第二次88秒 第三次92秒, 不知道為什麼 一次比一次多了.

當innodb_thread_concurrency=32的時候

我測試了一下第一次是67秒 ,第二次是62秒

另外,其它參數也可以修改:

innodb_buffer_pool_size 參數

max_allowed_packet 參數配置

代碼Git位址