有時候需要連接第三方的各種資料源,每次都得編寫不同的程式碼,於是將 MaxCompute、Hive、Oracle、MySQL 等 JDBC 連接封裝起來,只需要傳入不同的參數即可建立對應類型的連接池。
連接配接參數基礎類封裝
封裝了 JDBC 基礎的連接參數;如果還需要其他屬性,可以繼承該類並增加新的屬性。
/**
 * Connection parameters shared by every JDBC data source in this file.
 *
 * <p>Data sources that need additional settings extend this class and add their own fields.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class BaseJdbcConnParam implements Serializable {

    /** Name of the JDBC driver class. */
    private String driverName;

    /** IP address (or host name) of the database server. */
    private String ip;

    /** Port the database server listens on. */
    private Integer port;

    /** Name of the database to connect to. */
    private String dbName;

    /** User name used to open the connection. */
    private String username;

    /** Password used to open the connection. */
    private String password;
}
抽象連接配接工具類封裝
功能如下:
- 1、構造函數:根據不同的連接參數構建不同的連接對象
- 2、構建具體的連接:由子類實現 buildConnection()
- 3、獲取連接:構建完成後直接調用 getConnection() 獲取
/**
 * Base class of the per-database connection helpers.
 *
 * <p>The constructor stores the connection parameters and immediately asks the subclass to
 * open the connection through {@link #buildConnection()}; the result is then available from
 * {@link #getConnection()}.
 *
 * <p><b>Note for subclasses:</b> {@link #buildConnection()} runs inside this constructor,
 * i.e. before the subclass' own instance fields have been initialised. Implementations may
 * only rely on {@link #connParam} and on static state.
 *
 * @param <P> concrete type of the connection parameters
 * @author itdl
 */
public abstract class AbstractConnUtil<P extends BaseJdbcConnParam> {

    /** Parameters the connection is built from; never {@code null}. */
    protected final P connParam;

    /** The JDBC connection produced by {@link #buildConnection()}. */
    protected final Connection connection;

    /**
     * Stores the parameters and opens the connection.
     *
     * @param connParam connection parameters, must not be {@code null}
     * @throws NullPointerException if {@code connParam} is {@code null}
     */
    public AbstractConnUtil(P connParam) {
        // Fail fast with a clear message instead of an obscure NPE inside buildConnection().
        this.connParam = java.util.Objects.requireNonNull(connParam, "connParam must not be null");
        this.connection = buildConnection();
    }

    /**
     * Opens the JDBC connection described by {@link #connParam}.
     *
     * @return the opened connection
     */
    protected abstract Connection buildConnection();

    /**
     * Returns the connection that was opened when this object was constructed.
     *
     * @return the JDBC connection
     */
    public Connection getConnection() {
        return connection;
    }
}
連接配接池管理
功能如下:
- 1、根據不同的連接參數和最大連接數,建立對應類型的連接池。
- 2、獲取連接方法:如果連接池中沒有可用連接,則等待其他線程釋放(最多等待十分鐘)。
- 3、釋放連接方法:將連接放回連接池,然後喚醒等待的線程。
- 4、關閉連接池中所有的連接。
/**
 * A small fixed-size JDBC connection pool.
 *
 * <p>All connections are opened eagerly in the constructor. Callers borrow a connection with
 * {@link #getConnection()} and must hand it back with {@link #freeConnection(Connection)}.
 * Borrowing blocks for at most ten minutes when the pool is empty. All public methods are
 * {@code synchronized} on the pool instance.
 *
 * @param <T> type of the connection parameters; it selects the database the pool connects to
 * @author itdl
 */
@Slf4j
public class DbConnPool<T extends BaseJdbcConnParam> {

    /** Longest time {@link #getConnection()} waits for a free connection, in milliseconds. */
    private static final long MAX_WAIT_MILLIS = 10 * 60 * 1000L;

    /** Idle connections, handed out from the head and returned to the tail. */
    private final LinkedList<Connection> connPool = new LinkedList<Connection>();

    /** Number of connections opened when the pool is created. */
    private final Integer maxPoolSize;

    /** Parameters every pooled connection is created from. */
    private final T connParam;

    /**
     * Creates the pool and eagerly opens {@code maxPoolSize} connections.
     *
     * <p>If opening one of the connections fails, the connections opened so far are closed
     * before the failure is rethrown, so a failed construction does not leak connections.
     *
     * @param connParam   connection parameters
     * @param maxPoolSize number of connections to open
     */
    public DbConnPool(T connParam, Integer maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
        this.connParam = connParam;
        try {
            for (int i = 0; i < maxPoolSize; i++) {
                connPool.addLast(this.createConnection());
            }
        } catch (RuntimeException e) {
            close();
            throw e;
        }
    }

    /**
     * Opens one connection using the helper that matches the runtime type of the parameters.
     *
     * @return a newly opened connection
     * @throws BizException with {@code CONN_TYPE_NOT_SUPPORT} for an unknown parameter type
     */
    private Connection createConnection() {
        if (connParam instanceof OracleJdbcConnParam) {
            return new OracleConnUtil((OracleJdbcConnParam) connParam).getConnection();
        }
        if (connParam instanceof HiveJdbcConnParam) {
            return new HiveConnUtil((HiveJdbcConnParam) connParam).getConnection();
        }
        if (connParam instanceof MysqlJdbcConnParam) {
            return new MysqlConnUtil((MysqlJdbcConnParam) connParam).getConnection();
        }
        if (connParam instanceof MaxComputeJdbcConnParam) {
            return new MaxComputeJdbcUtil((MaxComputeJdbcConnParam) connParam).getConnection();
        }
        throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
    }

    /**
     * Borrows a connection, waiting up to ten minutes for one to be released.
     *
     * @return a pooled connection
     * @throws BizException with {@code CONN_POOL_EMPTY_ERR} if no connection became available
     *                      in time or the waiting thread was interrupted
     */
    public synchronized Connection getConnection() {
        final long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
        // Loop: wait() may return spuriously, and another waiter may take the connection first.
        while (connPool.isEmpty()) {
            final long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                log.info("==========連接配接池已經空了, 等待了10分鐘還沒有釋放,抛出異常==========");
                throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
            }
            try {
                log.info("==========連接配接池已經空了, 請等待其他線程釋放==========");
                wait(remaining);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for a free connection", e);
                throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
            }
        }
        return connPool.removeFirst();
    }

    /**
     * Returns a borrowed connection to the pool and wakes up waiting threads.
     *
     * @param conn the connection to release; {@code null} is ignored
     */
    public synchronized void freeConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        this.connPool.addLast(conn);
        notifyAll();
    }

    /**
     * Closes every idle connection and removes it from the pool, so that a closed connection
     * is never handed out afterwards. Connections that are currently borrowed are not touched.
     */
    public synchronized void close() {
        for (Connection connection : connPool) {
            SqlUtil.close(connection);
        }
        connPool.clear();
    }
}
SQL操作工具類
根據連接對象 Connection 和資料庫方言,封裝不同的 SQL 執行邏輯,是執行 SQL 的核心功能封裝。
/**
* @Description SQL操作工具類
* @Author itdl
* @Date 2022/08/10 17:13
*/
@Slf4j
public class SqlUtil {
/**查詢mysql表注釋sql*/
public static final String SELECT_TABLES_MYSQL = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
/**查詢MaxCompute表注釋sql*/
public static final String SELECT_TABLES_MAX_COMPUTE = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
/**查詢oracle表注釋sql*/
public static final String SELECT_TABLES_ORACLE = "SELECT t2.TABLE_NAME as table_name, t2.COMMENTS as table_comment FROM user_tables t1 inner join user_tab_comments t2 on t1.TABLE_NAME = t2.TABLE_NAME";
/**查詢hive表注釋sql, 先查詢表名,根據表名擷取建表語句,正則提取表注釋*/
public static final String SELECT_TABLES_HIVE = "show tables";
public static final String SELECT_TABLES_2_HIVE = "describe extended %s";
/**分頁數量統計Mysql*/
private static final String SELECT_COUNT_MYSQL = "select count(1) from (%s) z";
/**分頁數量統計MaxCompute*/
private static final String SELECT_COUNT_MAX_COMPUTE = "select count(1) from (%s) z;";
/**分頁數量統計Hive*/
private static final String SELECT_COUNT_ORACLE = "select count(1) from (%s) z";
/**分頁數量統計Oracle*/
private static final String SELECT_COUNT_HIVE = "select count(1) from (%s) z";
/**maxCompute開啟全表掃描sql*/
private static final String FULL_SCAN_MAX_COMPUTE = "set odps.sql.allow.fullscan=true;";
/**分頁查詢sql-Mysql*/
private static final String SELECT_PAGE_MYSQL = "select z.* from (%s) z limit %s, %s";
/**分頁查詢sql-MaxCompute*/
private static final String SELECT_PAGE_MAX_COMPUTE = "select z.* from (%s) z limit %s, %s;";
/**分頁查詢sql-Hive*/
private static final String SELECT_PAGE_HIVE = "select * from (select row_number() over () as row_num_01,u.* from (%s) u) mm where mm.row_num_01 between %s and %s";
/**分頁查詢sql-Oracle*/
private static final String SELECT_PAGE_ORACLE = "select * from (SELECT ROWNUM as row_num_01,z.* from (%s) z) h where h.row_num_01 > %s and h.row_num_01 <= %s";
/**資料庫連接配接*/
private final Connection connection;
/**資料庫方言*/
private final Integer dbDialect;
/**支援的方言清單*/
private static final List<Integer> supportDbTypes =
Arrays.asList(DbDialectEnum.ORACLE.getCode(), DbDialectEnum.HIVE.getCode(), DbDialectEnum.MYSQL.getCode(), DbDialectEnum.MAX_COMPUTE.getCode());
public SqlUtil(Connection connection, Integer dbDialect) {
if (!supportDbTypes.contains(dbDialect)){
throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
}
this.connection = connection;
this.dbDialect = dbDialect;
}
/**
* 根據connection擷取所有的表和對應的注釋
*/
public List<TableMetaInfo> getTables(String schemaName){
List<TableMetaInfo> result = new ArrayList<>();
String sql = "";
switch (this.dbDialect){
case 1:
sql = SELECT_TABLES_ORACLE;
break;
case 2:
sql = SELECT_TABLES_HIVE;
break;
case 3:
if (StringUtils.isBlank(schemaName)){
throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
}
sql = String.format(SELECT_TABLES_MYSQL, schemaName);
break;
case 4:
if (StringUtils.isBlank(schemaName)){
throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
}
sql = String.format(SELECT_TABLES_MAX_COMPUTE, schemaName);
default:
break;
}
if (StringUtils.isBlank(sql)){
throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
}
// 執行SQL語句
final List<LinkedHashMap<String, Object>> resultMaps = querySql(sql);
if (ObjectUtils.isEmpty(resultMaps)){
return Lists.newArrayList();
}
// hive單獨處理
List<TableMetaInfo> result1 = getHiveTableMetaInfos(result, resultMaps);
if (result1 != null) return result1;
// 轉換結果
return resultMaps.stream().map(
m->{
final TableMetaInfo info = new TableMetaInfo();
Object tableNameObj = m.get("table_name");
String tableName = tableNameObj == null ? m.get("TABLE_NAME") == null ? "" : String.valueOf(m.get("TABLE_NAME")) : String.valueOf(tableNameObj);
Object tableCommentObj = m.get("table_comment");
String tableComment = tableCommentObj == null ? m.get("TABLE_COMMENT") == null ? "" : String.valueOf(m.get("TABLE_COMMENT")) : String.valueOf(tableCommentObj);
info.setTableName(tableName);
info.setComment(tableComment);
return info;
}
).collect(Collectors.toList());
}
/**
* 根據schemeName,表名擷取字段清單
* @param tableName 一般是資料庫 oracle是使用者名
*/
public List<TableColumnMetaInfo> getColumnsByTableName(String tableName){
try {
List<TableColumnMetaInfo> list = new ArrayList<>();
final DatabaseMetaData metaData = connection.getMetaData();
final ResultSet columns = metaData.getColumns(null, null, tableName, null);
while (columns.next()){
String columnName = columns.getString("COLUMN_NAME");
String remarks = columns.getString("REMARKS");
remarks = StringUtils.isBlank(remarks) ? "" : remarks;
final TableColumnMetaInfo metaInfo = new TableColumnMetaInfo(tableName, columnName, remarks);
list.add(metaInfo);
}
return list;
} catch (SQLException e) {
e.printStackTrace();
return Lists.newArrayList();
}
}
/**
* 執行sql查詢
* @param querySql 查詢sql
* @return List<Map<String, Object>> 通過LinkedHashMap接受,序列化時可保證順序一緻
*/
public List<LinkedHashMap<String, Object>> queryData(String querySql, boolean... fullScan){
Statement statement = null;
ResultSet resultSet = null;
try {
// 建立statement
statement = this.connection.createStatement();
// 執行全表掃描sql
for (boolean b : fullScan) {
if (b){
statement.execute(FULL_SCAN_MAX_COMPUTE);
break;
}
}
// 執行查詢語句
resultSet = statement.executeQuery(querySql);
// 建構結果傳回
return buildListMap(resultSet);
} catch (SQLException e) {
e.printStackTrace();
throw new BizException(ResultCode.SQL_EXEC_ERR);
} finally {
// 關閉resultSet, statement
close(resultSet, statement);
}
}
/**
* 執行sql查詢
* @param querySql 查詢sql
* @return List<Map<String, Object>>
*/
public List<LinkedHashMap<String, Object>> queryData(String querySql, Integer page, Integer size){
Statement statement = null;
ResultSet resultSet = null;
try {
// 1、替換分号
querySql = querySql.replaceAll(";", "");
// 建立statement
statement = this.connection.createStatement();
// 2、格式化SQL
int offset = (page - 1 ) * size;
String execSql = "";
switch (this.dbDialect){
case 1:
// oracle
execSql = String.format(SELECT_PAGE_ORACLE, querySql, offset, size);
break;
case 2:
// hive
execSql = String.format(SELECT_PAGE_HIVE, querySql, offset, size);
break;
case 3:
// mysql
execSql = String.format(SELECT_PAGE_MYSQL, querySql, offset, size);
break;
case 4:
// maxCompute
execSql = String.format(SELECT_PAGE_MAX_COMPUTE, querySql, offset, size);
break;
default:
break;
}
// maxCompute開啟全表掃描
if (DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)){
statement.execute(FULL_SCAN_MAX_COMPUTE);
}
log.info("=======>>>執行分頁sql為:{}", execSql);
// 執行查詢語句
resultSet = statement.executeQuery(execSql);
// 建構結果傳回
return buildListMap(resultSet);
} catch (SQLException e) {
e.printStackTrace();
throw new BizException(ResultCode.SQL_EXEC_ERR);
} finally {
// 關閉resultSet, statement
close(resultSet, statement);
}
}
/**
* 執行分頁查詢
* @param querySql 分頁查詢sql
* @param page 頁碼 從1開始 第n頁傳n
* @param size 每頁記錄數
* @return 分頁查詢結果
*/
public PageResult<LinkedHashMap<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){
// 1、替換分号
querySql = querySql.replaceAll(";", "");
String countSql = "";
switch (this.dbDialect){
case 1:
// oracle
countSql = String.format(SELECT_COUNT_ORACLE, querySql);
break;
case 2:
// hive
countSql = String.format(SELECT_COUNT_HIVE, querySql);
break;
case 3:
// mysql
countSql = String.format(SELECT_COUNT_MYSQL, querySql);
break;
case 4:
// maxCompute
countSql = String.format(SELECT_COUNT_MAX_COMPUTE, querySql);
break;
default:
break;
}
log.info("=======>>>執行分頁統計總數sql為:{}", countSql);
// 查詢總數
final List<LinkedHashMap<String, Object>> countMap = queryData(countSql, DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect));
if (CollectionUtils.isEmpty(countMap)){
return new PageResult<>(0L, new ArrayList<>());
}
long count = 0L;
for (Object value : countMap.get(0).values()) {
count = Long.parseLong(String.valueOf(value));
}
if (count == 0){
return new PageResult<>(0L, new ArrayList<>());
}
// 執行分頁查詢 開啟全表掃描
final List<LinkedHashMap<String, Object>> resultList = queryData(querySql, page, size);
return new PageResult<>(count, resultList);
}
/**
* 執行分頁查詢
* @param querySql 分頁查詢sql
* @param page 頁碼 從1開始 第n頁傳n
* @param size 每頁記錄數
* @return 分頁查詢結果
*/
public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){
final PageResult<LinkedHashMap<String, Object>> result = pageQueryMap(querySql, page, size);
List<T> rows = new ArrayList<>();
for (LinkedHashMap<String, Object> row : result.getRows()) {
final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
rows.add(t);
}
return new PageResult<>(result.getTotal(), rows);
}
/**
* 擷取hive的表注釋
* @param result 結果
* @param resultMaps show tables結果
* @return List<TableMetaInfo>
*/
private List<TableMetaInfo> getHiveTableMetaInfos(List<TableMetaInfo> result, List<LinkedHashMap<String, Object>> resultMaps) {
if (dbDialect.equals(DbDialectEnum.HIVE.getCode())){
for (LinkedHashMap<String, Object> resultMap : resultMaps) {
final String tabName = String.valueOf(resultMap.get("tab_name"));
final String descTableCommentSql = String.format(SELECT_TABLES_2_HIVE, tabName);
List<LinkedHashMap<String, Object>> resultMapsComments = querySql(descTableCommentSql);
// col_name -> Detailed Table Information
String comments = resultMapsComments.stream()
.filter(m -> "Detailed Table Information".equals(m.get("col_name")))
.map(m -> String.valueOf(m.get("data_type"))).findFirst()
.orElse("");
comments = ReUtil.get("parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}", comments,1);
if (StringUtils.isBlank(comments)) {
comments = "";
}
if (comments.contains(",")){
comments = comments.substring(0, comments.lastIndexOf(","));
}
result.add(new TableMetaInfo(tabName, comments));
log.info("===========>>>擷取表{}的注釋成功:{}", tabName, comments);
resultMapsComments.clear();
}
return result;
}
return null;
}
/**
* 執行SQL查詢
* @param sql sql語句
* @return 資料清單,使用LinkedHashMap是為了防止HashMap序列化後導緻順序亂序
*/
public List<LinkedHashMap<String, Object>> querySql(String sql){
// 執行sql
Statement statement = null;
ResultSet resultSet = null;
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
return buildListMap(resultSet);
} catch (SQLException e) {
e.printStackTrace();
throw new BizException(ResultCode.SQL_EXEC_ERR);
}finally {
// 關閉
close(resultSet, statement);
}
}
/**
* 關閉對象 傳入多個時注意順序, 需要先關閉哪個就傳在參數前面
* @param objs 對象動态數組
*/
public static void close(Object ...objs){
if (objs == null || objs.length == 0){
return;
}
for (Object obj : objs) {
if (obj instanceof Statement){
try {
((Statement) obj).close();
}catch (Exception e){
e.printStackTrace();
}
}
if (obj instanceof ResultSet){
try {
((ResultSet) obj).close();
}catch (Exception e){
e.printStackTrace();
}
}
if (obj instanceof Connection){
try {
((Connection) obj).close();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
/**
* @Description 功能描述:将resultSet構造為List<Map>
* @Author itdl
* @Date 2022/4/18 21:13
* @Param {@link ResultSet} resultSet
* @Return {@link List < Map <String,Object>>}
**/
private List<LinkedHashMap<String, Object>> buildListMap(ResultSet resultSet) throws SQLException {
if (resultSet == null) {
return Lists.newArrayList();
}
List<LinkedHashMap<String, Object>> resultList = new ArrayList<>();
// 擷取中繼資料
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
// 擷取列數
int columnCount = metaData.getColumnCount();
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
for (int i = 0; i < columnCount; i++) {
String columnName = metaData.getColumnName(i + 1);
// 過濾掉查詢的結果包含序号的
if("mm.row_num_01".equalsIgnoreCase(columnName)
|| "row_num_01".equalsIgnoreCase(columnName)){
continue;
}
// 去除hive查詢結果的mm.别名字首
if (columnName.startsWith("mm.")){
columnName = columnName.substring(columnName.indexOf(".") + 1);
}
Object object = resultSet.getObject(columnName);
// maxCompute裡面的空傳回的是使用\n
if ("\\N".equalsIgnoreCase(String.valueOf(object))) {
map.put(columnName, "");
} else {
map.put(columnName, object);
}
}
resultList.add(map);
}
return resultList;
}
}
MaxCompute JDBC連接配接池封裝
MaxCompute 已經提供了 JDBC 連接方式,也就是 odps-jdbc,最終能夠獲取一個 Connection。官方文檔:https://help.aliyun.com/document_detail/161246.html
封裝MaxCompute JDBC連接配接參數
/**
 * Connection parameters for MaxCompute over JDBC.
 *
 * @author itdl
 */
@Data
public class MaxComputeJdbcConnParam extends BaseJdbcConnParam {

    /** Aliyun access id; plays the role of the user name. */
    private String aliyunAccessId;

    /** Aliyun access key; plays the role of the password. */
    private String aliyunAccessKey;

    /** MaxCompute endpoint. */
    private String endpoint;

    /** Name of the MaxCompute project. */
    private String projectName;
}
封裝MaxCompute JDBC連接配接實作類
就是繼承父類 AbstractConnUtil,並實現抽象方法 buildConnection
/**
* @Description maxCompute JDBC連接配接實作
* @Author itdl
* @Date 2022/08/08 14:26
*/
@Slf4j
public class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam>{
/**JDBC 驅動名稱*/
private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";
/**
* 構造函數, 構造工具類對象
*
* @param connParam 連接配接參數
*/
public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
super(connParam);
}
@Override
protected Connection buildConnection() {
return buildConn();
}
/**
* 建立連接配接
* @return 資料庫連接配接
*/
private Connection buildConn() {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR);
}
try {
Properties dbProperties = new Properties();
dbProperties.put("user", connParam.getAliyunAccessId());
dbProperties.put("password", connParam.getAliyunAccessKey());
dbProperties.put("remarks", "true");
// JDBCURL連接配接模闆
String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
// 使用驅動管理器連接配接擷取連接配接
return DriverManager.getConnection(
String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), dbProperties);
} catch (SQLException e) {
e.printStackTrace();
throw new BizException(ResultCode.CONN_USER_PWD_ERR);
}
}
}
連接測試代碼統一放在結尾:將會開啟多個線程獲取連接,然後去獲取表名、表注釋、字段名、字段注釋;傳入 page、size 和普通 SQL 就可以使用封裝好的分頁查詢方法。
Hive JDBC連接配接池封裝
Hive JDBC連接配接參數
Hive 連接參數封裝:除了基礎的 JDBC 所需字段,還需要 Kerberos 相關字段,因為 Hive 開啟 Kerberos 認證後,需要使用 keytab 密鑰檔案和 krb5.conf 配置檔案進行認證。這將在參數和測試代碼中得到充分的體現。
/**
 * Connection parameters for Hive over JDBC, including the optional Kerberos settings.
 *
 * @author itdl
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class HiveJdbcConnParam extends BaseJdbcConnParam {

    /** Whether Kerberos authentication is enabled. */
    private boolean enableKerberos;

    /** Kerberos principal. */
    private String principal;

    /** Path of the krb5 configuration file on disk. */
    private String kbr5FilePath;

    /** Path of the keytab file on disk. */
    private String keytabFilePath;
}
Hive JDBC擷取連接配接實作
Hive 獲取 JDBC 連接之後,本來可以從 Connection 的中繼資料中獲取表的注釋,但是獲取到的中文注釋卻是亂碼,而在 Hue 上檢視表注釋又是正常的。暫時沒有找到解決辦法,只好退而求其次:先通過表名執行 describe extended 獲取表的詳細資訊,再用正規表達式從中提取表的注釋。
/**
* @Description hive connection util
* @Author itdl
* @Date 2022/08/10 16:52
*/
@Slf4j
public class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam>{
public HiveConnUtil(HiveJdbcConnParam connParam) {
super(connParam);
}
/**
* 擷取連接配接
* @return 連接配接
*/
public Connection getConnection() {
return connection;
}
@Override
protected Connection buildConnection(){
try {
// Class.forName("org.apache.hive.jdbc.HiveDriver");
Class.forName(connParam.getDriverName());
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR);
}
// 開啟kerberos後需要私鑰
// 拼接jdbcUrl
String jdbcUrl = "jdbc:hive2://%s:%s/%s";
String ip = connParam.getIp();
String port = connParam.getPort() + "";
String dbName = connParam.getDbName();
final String username = connParam.getUsername();
final String password = connParam.getPassword();
// is enable kerberos authentication
final boolean enableKerberos = connParam.isEnableKerberos();
// 格式化
Connection connection;
// 擷取連接配接
try {
Properties dbProperties = new Properties();
dbProperties.put("user", username);
dbProperties.put("password", password);
// 加上remark後, 能夠擷取到标注釋 但是會出現中文亂碼
dbProperties.put("remarks", "true");
if (!enableKerberos) {
jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
connection = DriverManager.getConnection(jdbcUrl, dbProperties);
} else {
final String principal = connParam.getPrincipal();
final String kbr5FilePath = connParam.getKbr5FilePath();
final String secretFilePath = connParam.getKeytabFilePath();
String format = "jdbc:hive2://%s:%s/%s;principal=%s";
jdbcUrl = String.format(format, ip, port, dbName, principal);
// 使用hadoop安全認證
System.setProperty("java.security.krb5.conf", kbr5FilePath);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
// 解決windows中執行可能出現找不到HADOOP_HOME或hadoop.home.dir問題
// Kerberos認證
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
conf.set("keytab.file", secretFilePath);
conf.set("kerberos.principal", principal);
UserGroupInformation.setConfiguration(conf);
try {
UserGroupInformation.loginUserFromKeytab(username, secretFilePath);
} catch (IOException e) {
e.printStackTrace();
throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR);
}
try {
connection = DriverManager.getConnection(jdbcUrl, dbProperties);
} catch (SQLException e) {
e.printStackTrace();
throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR);
}
}
log.info("=====>>>擷取hive連接配接成功:username:{},jdbcUrl: {}", username, jdbcUrl);
return connection;
} catch (SQLException e) {
e.printStackTrace();
throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR);
} catch (BizException e){
throw e;
}
catch (Exception e) {
e.printStackTrace();
throw new BizException(ResultCode.HIVE_CONN_ERR);
}
}
}
Oracle JDBC連接配接參數封裝
隻需要繼承父類即可
/**
 * JDBC connection parameters for Oracle. Adds nothing to {@link BaseJdbcConnParam}; the
 * distinct type only selects the Oracle connection helper.
 *
 * @author itdl
 */
public class OracleJdbcConnParam extends BaseJdbcConnParam {
}
Oracle JDBC連接配接實作類
包括了普通使用者的認證和dba使用者的認證
/**
* @Description Oracle擷取jdbc連接配接工具類
* @Author itdl
* @Date 2022/08/15 09:52
*/
@Slf4j
public class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> {
/**
* 構造函數, 構造工具類對象
*
* @param connParam 連接配接參數
*/
public OracleConnUtil(OracleJdbcConnParam connParam) {
super(connParam);
}
@Override
protected Connection buildConnection() {
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR);
}
// 拼接jdbcUrl
String jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s";
final String ip = connParam.getIp();
final String port = connParam.getPort() + "";
final String dbName = connParam.getDbName();
final String username = connParam.getUsername();
final String password = connParam.getPassword();
// 格式化
jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
// 擷取連接配接
Connection connection;
try {
Properties dbProperties = new Properties();
// 使用者名 如果是dba,則後面跟了as sysdba
String dba = "as sysdba";
dbProperties.put("password", password);
dbProperties.put("remarks", "true");
if (username.trim().endsWith(dba)) {
dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1));
dbProperties.put("defaultRowPrefetch", "15");
dbProperties.put("internal_logon", "sysdba");
connection = DriverManager.getConnection(jdbcUrl, dbProperties);
} else {
dbProperties.put("user", username);
connection = DriverManager.getConnection(jdbcUrl, dbProperties);
}
log.info("=====>>>擷取oracle連接配接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);
return connection;
} catch (SQLException e) {
e.printStackTrace();
if (e.getMessage().contains("TNS:listener")) {
throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR);
}
if (e.getMessage().contains("ORA-01017")) {
throw new BizException(ResultCode.CONN_USER_PWD_ERR);
}
if (e.getMessage().contains("IO 錯誤: Got minus one from a read call")) {
throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR);
}
throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
} catch (Exception e) {
throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
}
}
}
Mysql JDBC連接配接池封裝
Mysql JDBC連接配接參數封裝
隻需要繼承父類即可
/**
 * JDBC connection parameters for MySQL. Adds nothing to {@link BaseJdbcConnParam}; the
 * distinct type only selects the MySQL connection helper.
 *
 * @author itdl
 */
public class MysqlJdbcConnParam extends BaseJdbcConnParam {
}
/**
* @Description Mysql擷取jdbc連接配接工具類
* @Author itdl
* @Date 2022/08/15 09:52
*/
@Slf4j
public class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> {
/**
* 構造函數, 構造工具類對象
*
* @param connParam 連接配接參數
*/
public MysqlConnUtil(MysqlJdbcConnParam connParam) {
super(connParam);
}
@Override
protected Connection buildConnection() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR);
}
// 拼接jdbcUrl
String jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";
final String ip = connParam.getIp();
final String port = connParam.getPort() + "";
final String dbName = connParam.getDbName();
final String username = connParam.getUsername();
final String password = connParam.getPassword();
// 格式化
jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
// 擷取連接配接
try {
Properties dbProperties = new Properties();
dbProperties.put("user", username);
dbProperties.put("password", password);
dbProperties.put("remarks", "true");
// 設定可以擷取tables remarks資訊
dbProperties.setProperty("useInformationSchema", "true");
Connection connection = DriverManager.getConnection(jdbcUrl,dbProperties);
log.info("=====>>>擷取mysql連接配接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);
return connection;
} catch (SQLException e) {
e.printStackTrace();
if (e.getMessage().contains("Unknown database")){
throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR);
}
throw new BizException(ResultCode.CONN_USER_PWD_ERR);
} catch (Exception e) {
throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
}
}
}
/**
 * Manual connection tests for every supported database.
 *
 * <p>Each test builds a pool of two connections, runs {@link #handler01} once on the test
 * thread and then twice concurrently on two worker threads. The connection details are
 * placeholders and must be replaced before a test is run.
 */
@SpringBootTest(classes = DbConnectionDemoApplication.class)
@RunWith(value = SpringRunner.class)
@Slf4j
class DbConnectionDemoApplicationTests {

    /** Pool created by the running test; closed in {@link #close()}. */
    private DbConnPool<?> connPool = null;

    @Test
    public void testMysqlConn() throws InterruptedException {
        final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam();
        final String ip = "localhost";
        final Integer port = 3306;
        final String username = "root";
        final String password = "root";
        final String dbname = "test_db";
        connParam.setDriverName(Driver.class.getName());
        connParam.setIp(ip);
        connParam.setPort(port);
        connParam.setUsername(username);
        connParam.setPassword(password);
        connParam.setDbName(dbname);
        connPool = new DbConnPool<>(connParam, 2);
        // For MySQL the schema name is the database name.
        handler01(dbname, DbDialectEnum.MYSQL);
        runOnTwoThreads(dbname, DbDialectEnum.MYSQL);
    }

    @Test
    public void testOracleConn() throws InterruptedException {
        final OracleJdbcConnParam connParam = new OracleJdbcConnParam();
        final String ip = "你的Oracle的IP位址";
        final Integer port = 1521;
        // For an admin account append " as sysdba" to the user name.
        final String username = "使用者名";
        final String password = "密碼";
        final String dbname = "執行個體/服務名";
        connParam.setDriverName(Driver.class.getName());
        connParam.setIp(ip);
        connParam.setPort(port);
        connParam.setUsername(username);
        connParam.setPassword(password);
        connParam.setDbName(dbname);
        connPool = new DbConnPool<>(connParam, 2);
        final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE;
        // For Oracle the schema name is the user name.
        handler01(username, dbDialectEnum);
        runOnTwoThreads(username, dbDialectEnum);
    }

    @Test
    public void testHiveConn() throws InterruptedException {
        final HiveJdbcConnParam connParam = new HiveJdbcConnParam();
        final String ip = "連接配接的域名";
        final Integer port = 10000;
        final String username = "賬号@域名";
        final String password = "";
        final String dbname = "資料庫名";
        final String principal = "hive/_HOST@域名";
        final String kbr5FilePath = "C:\\workspace\\krb5.conf";
        final String keytabFilePath = "C:\\workspace\\zhouyu.keytab";
        connParam.setDriverName(Driver.class.getName());
        connParam.setIp(ip);
        connParam.setPort(port);
        connParam.setUsername(username);
        connParam.setPassword(password);
        connParam.setDbName(dbname);
        connParam.setEnableKerberos(true);
        connParam.setPrincipal(principal);
        connParam.setKbr5FilePath(kbr5FilePath);
        connParam.setKeytabFilePath(keytabFilePath);
        connPool = new DbConnPool<>(connParam, 2);
        final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE;
        // The schema name is not used when listing Hive tables.
        handler01(username, dbDialectEnum);
        runOnTwoThreads(username, dbDialectEnum);
    }

    @Test
    public void testMaxComputeConn() throws InterruptedException {
        final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
        String accessId = "你的阿裡雲accessId";
        String accessKey = "你的阿裡雲accessKey";
        String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api";
        String projectName = "項目名=資料庫名";
        connParam.setDriverName(Driver.class.getName());
        connParam.setAliyunAccessId(accessId);
        connParam.setAliyunAccessKey(accessKey);
        connParam.setEndpoint(endpoint);
        connParam.setProjectName(projectName);
        connPool = new DbConnPool<>(connParam, 2);
        final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE;
        // For MaxCompute the schema name is the project name.
        handler01(projectName, dbDialectEnum);
        runOnTwoThreads(projectName, dbDialectEnum);
    }

    /**
     * Runs {@link #handler01} on two worker threads and waits for both to finish, so the
     * pool is not closed by {@link #close()} while a worker still uses a connection.
     */
    private void runOnTwoThreads(String schemaName, DbDialectEnum dbDialectEnum) throws InterruptedException {
        final Thread first = new Thread(() -> handler01(schemaName, dbDialectEnum));
        final Thread second = new Thread(() -> handler01(schemaName, dbDialectEnum));
        first.start();
        second.start();
        first.join();
        second.join();
    }

    /**
     * Borrows a connection, lists tables, lists the columns of the first table and runs a
     * paged query against it. The connection is always returned to the pool.
     */
    private void handler01(String schemaName, DbDialectEnum dbDialectEnum) {
        final Connection connection = connPool.getConnection();
        try {
            final SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode());
            final List<TableMetaInfo> tables = sqlUtil.getTables(schemaName);
            log.info("===============擷取所有表和注釋開始===================");
            log.info(tables.toString());
            log.info("===============擷取所有表和注釋結束===================");
            if (tables.isEmpty()) {
                // Nothing to inspect; the remaining steps need at least one table.
                return;
            }
            final String tableName = tables.get(0).getTableName();
            final List<TableColumnMetaInfo> columns = sqlUtil.getColumnsByTableName(tableName);
            log.info("===============擷取第一個表的字段和注釋開始===================");
            log.info(columns.toString());
            log.info("===============擷取第一個表的字段和注釋結束===================");
            final PageResult<LinkedHashMap<String, Object>> pageResult =
                    sqlUtil.pageQueryMap("select * from " + tableName, 1, 10);
            log.info("===============SQL分頁查詢開始===================");
            log.info("總數:{}", pageResult.getTotal());
            log.info("記錄數:{}", JSONObject.toJSONString(pageResult.getRows()));
            log.info("===============SQL分頁查詢結束===================");
        } finally {
            // Release even when a step above throws, otherwise the pool runs dry.
            connPool.freeConnection(connection);
        }
    }

    /** Closes the pool created by the test, if any. */
    @After
    public void close() {
        if (connPool != null) {
            connPool.close();
            log.info("==================連接配接池成功關閉================");
        }
    }
}