import itertools
import logging
import threading
import time

import pandas as pd
from clickhouse_driver import Client
# Module-level logger; logging.getLogger() called without a name returns the root logger.
logger = logging.getLogger()
# ClickHouse client class
class CKClient(Client):
    """``Client`` subclass whose ``query``/``query_stream`` run under a per-client lock.

    ``blocked`` reports whether that lock is currently held, which lets a
    caller tell a busy client from an idle one.
    """

    def __init__(self, *args, **kwargs):
        """Create the client; all arguments are forwarded to ``Client.__init__``."""
        super().__init__(*args, **kwargs)
        # Held for the duration of query() / query_stream(); `blocked` reads it.
        self._thread_lock = threading.Lock()

    @property
    def connected(self):
        """Return the ``connected`` flag of the underlying connection object."""
        return self.connection.connected

    @property
    def blocked(self):
        """Return True while a ``query``/``query_stream`` call holds the client lock."""
        return self._thread_lock.locked()

    def query(self, *args, **kwargs):
        """Run ``Client.execute`` with the given arguments while holding the lock.

        :return: whatever ``Client.execute`` returns.
        """
        with self._thread_lock:
            return super().execute(*args, **kwargs)

    def query_stream(self, *args, **kwargs):
        """Yield the items of ``Client.execute_iter`` while holding the lock.

        This is a generator: the lock is taken when iteration starts and is
        released only when the stream is exhausted, fails, or the generator
        is closed.  (Returning the iterator from inside ``with`` would
        release the lock before a single row had been read.)

        NOTE: the lock is not re-entrant, so the thread consuming the stream
        must not call ``query``/``query_stream`` on the same client until
        the stream is finished or closed.
        """
        with self._thread_lock:
            yield from super().execute_iter(*args, **kwargs)
# ClickHouse connection pool class
class CKConnectPool(object):
    """Pool of reusable ClickHouse clients.

    Every client created by the pool is registered in ``_used_connect``
    (keyed by ``id(client)``) until it is deleted; clients that are idle are
    additionally kept in ``_connect_pool``.  ``max_connections`` bounds the
    number of registered clients, so ``get_connect`` waits once that many
    exist and none of them is idle.
    """

    # Longest single wait (seconds) for an idle client before the pool state
    # is re-checked; a safety net in case a wake-up is missed.
    _WAIT_INTERVAL = 0.1

    def __init__(self, host, tcp_port=9000, min_count=10, max_count=30, **kwargs):
        """
        :param host: server address passed to every client.
        :param tcp_port: server port passed to every client as ``port``.
        :param min_count: number of clients created up front.
        :param max_count: maximum number of clients registered at once.
        :param kwargs: extra keyword arguments passed to every client.
        """
        # Keyword arguments used to construct each client.
        self.connect_args = {"host": host, "port": tcp_port, **kwargs}
        self.max_connections = max_count
        # Set once close()/disconnect() has closed every client.
        self.is_closed = False
        # Idle clients ready to be handed out.
        self._connect_pool = list()
        # Every live client created by this pool: id(client) -> client.
        self._used_connect = dict()
        # Guards _connect_pool, _used_connect and is_closed.
        self._thread_lock = threading.Lock()
        # Signalled whenever a client becomes idle or a slot is freed.
        self._pool_changed = threading.Condition(self._thread_lock)
        self.init_connect_pool(min_count)

    def init_connect_pool(self, min_connections=10):
        """Create up to ``min_connections`` new idle clients.

        Clients are created directly rather than through ``get_connect``:
        going through ``get_connect`` would pop and re-append the same idle
        client on every pass and leave the pool with a single client.
        Creation stops early once ``max_connections`` clients are registered.
        """
        with self._thread_lock:
            for _ in range(min_connections):
                if len(self._used_connect) >= self.max_connections:
                    break
                self._connect_pool.append(self._create_client())
            self._pool_changed.notify_all()

    def add_pool(self, client):
        """Hand ``client`` back to the pool.

        A client whose own lock is held (``blocked``) is left untouched.  A
        connected client becomes idle again.  A client that is no longer
        connected is not pooled and is also unregistered, so it does not
        keep occupying one of the ``max_connections`` slots.
        """
        with self._thread_lock:
            if client.blocked:
                return
            if client.connected:
                self._connect_pool.append(client)
            else:
                self._used_connect.pop(id(client), None)
            self._pool_changed.notify_all()

    def query(self, sql, with_column_types=True, *args, **kwargs):
        """Run ``sql`` through ``client.execute`` on a pooled client.

        :param sql: statement to execute.
        :param with_column_types: forwarded to ``execute``.
        :param args: extra positional arguments forwarded to ``execute``.
        :param kwargs: extra keyword arguments forwarded to ``execute``.
        :return: the ``execute`` result, or ``[]`` if anything raised (the
            error is logged and the client is closed and unregistered).
        """
        client = self.get_connect()
        try:
            res = client.execute(sql, *args, with_column_types=with_column_types, **kwargs)
            self.add_pool(client)
            return res
        except Exception as e:
            logger.exception("CK資料庫正常方式執行查詢異常: %s", e)
            self._delete_connect(client)
            return []

    def query_stream(self, sql, with_column_types=True, *args, **kwargs):
        """Yield the items of ``client.execute_iter`` for ``sql``.

        The client goes back to the pool when the stream is exhausted.  On
        an error the error is logged, the client is closed and unregistered,
        and a single ``[]`` is yielded.  If the consumer stops iterating
        early (the generator is closed), the client is closed and
        unregistered as well instead of being leaked.
        """
        client = self.get_connect()
        released = False
        try:
            for item in client.execute_iter(sql, *args, with_column_types=with_column_types, **kwargs):
                yield item
            self.add_pool(client)
            released = True
        except Exception as e:
            logger.exception("CK資料庫二進制流方式執行查詢異常: %s", e)
            self._delete_connect(client)
            released = True
            yield []
        finally:
            # Reached with released == False only when the generator was
            # closed mid-stream; the client is then dropped, not reused.
            if not released:
                self._delete_connect(client)

    def get_connect(self):
        """Return a client, waiting for one if the pool is exhausted.

        Below ``max_connections`` an idle client is returned as-is, or a new
        one is created when none is idle.  At ``max_connections`` the call
        waits for an idle client and only returns it if ``_is_usable``
        accepts it.
        """
        while True:
            with self._thread_lock:
                at_capacity = len(self._used_connect) >= self.max_connections
                if self._connect_pool:
                    client = self._connect_pool.pop()
                elif not at_capacity:
                    return self._create_client()
                else:
                    # Nothing idle and no room to grow: sleep until add_pool
                    # or _delete_connect signals, then look again.
                    self._pool_changed.wait(self._WAIT_INTERVAL)
                    continue
            # _is_usable may call _delete_connect, which takes the lock
            # itself, so it has to run with the lock released.
            if not at_capacity or self._is_usable(client):
                return client
            with self._thread_lock:
                if id(client) in self._used_connect:
                    # Still registered, so it was rejected for being busy
                    # rather than broken: keep it (at the far end of the
                    # idle list) instead of losing its slot.
                    self._connect_pool.insert(0, client)
                    if len(self._connect_pool) == 1:
                        self._pool_changed.wait(self._WAIT_INTERVAL)

    def disconnect(self):
        """Close every client of the pool (same as ``close``)."""
        self._colse_connect()

    def close(self):
        """Close every client of the pool (same as ``disconnect``)."""
        self._colse_connect()

    def _is_usable(self, client):
        """Return True if ``client`` can be handed out.

        A ``blocked`` client is rejected.  A client that is not connected is
        reconnected via ``connection.force_connect()``; if that raises, the
        client is closed and unregistered (when it is registered) and
        rejected.
        """
        if client.blocked:
            return False
        if not client.connected:
            try:
                client.connection.force_connect()
            except Exception:
                if id(client) in self._used_connect:
                    # _delete_connect expects the client object, not its id.
                    self._delete_connect(client)
                return False
        return True

    def _colse_connect(self, clients=None):
        """Disconnect ``clients``, or every client of the pool when none are given.

        Only a close-everything call consults and sets ``is_closed``;
        closing individual clients leaves the flag alone so a later
        ``close()`` still runs.  A failing ``disconnect`` is logged and the
        remaining clients are still closed; ``is_closed`` is set only when
        every disconnect succeeded.

        (The misspelt method name is kept for compatibility.)
        """
        with self._thread_lock:
            close_all = not clients
            if close_all:
                if self.is_closed:
                    return
                # Idle clients are registered too; de-duplicate by identity
                # so each one is disconnected once.
                everyone = self._connect_pool + list(self._used_connect.values())
                clients = list({id(c): c for c in everyone}.values())
            all_closed = True
            for client in clients:
                try:
                    client.disconnect()
                except Exception as e:
                    all_closed = False
                    logger.warning("關閉CK資料庫連結異常: %s", e)
            if close_all and all_closed:
                self.is_closed = True

    def _delete_connect(self, client):
        """Disconnect ``client`` and unregister it; safe to call more than once."""
        self._colse_connect([client])
        with self._thread_lock:
            self._used_connect.pop(id(client), None)
            # A slot was freed: wake anyone waiting in get_connect.
            self._pool_changed.notify_all()

    def _add_connect(self, client):
        """Register ``client`` under ``id(client)``."""
        self._used_connect[id(client)] = client

    def _create_client(self):
        """Create a ``CKClient`` from ``connect_args``, register it and return it."""
        client = CKClient(**self.connect_args)
        self._add_connect(client)
        return client

    def __del__(self):
        """Close all clients when the pool object is finalised."""
        try:
            self._colse_connect()
        except AttributeError:
            # __init__ did not finish, so there is nothing to close.
            pass
# Build the ClickHouse connection pool from configuration
def create_ck_dbpool(host='ip', port=9000, user='使用者名', password='密碼',
                     dbname='資料庫名稱', min_count=30, max_count=100):
    """Build a ``CKConnectPool`` from the given connection settings.

    The defaults are the placeholder values this function used to hard-code,
    so calling it with no arguments behaves as before; real values (for
    example loaded from a configuration file) can now be passed in.

    :param host: database server address.
    :param port: server port (9000 by default).
    :param user: user name.
    :param password: password.
    :param dbname: database name.
    :param min_count: ``min_count`` passed to the pool.
    :param max_count: ``max_count`` passed to the pool.
    :return: the new ``CKConnectPool``.
    """
    db_config = {
        'database': dbname,
        'user': user,
        'password': password,
        'settings': {'use_numpy': True},
    }
    return CKConnectPool(host, port, min_count, max_count, **db_config)
# Module-level connection pool instance; it is created when this module is imported.
CKDBClientPool = create_ck_dbpool()
# ClickHouse query helper
class CKDBClientHelper(object):
    """DataFrame-oriented helpers that run on clients from ``CKDBClientPool``."""

    @staticmethod
    def save_dataframe(table_name, data_df):
        """
        Insert a DataFrame into a table.

        The DataFrame's column names are used as the table's column names.
        ``table_name`` and the column names are interpolated into the SQL
        text as-is, so they must come from a trusted source.

        :param table_name: target table.
        :param data_df: pandas DataFrame; ``None`` or an empty frame is a no-op.
        :return: ``True`` when there was nothing to insert, otherwise the value
            returned by ``insert_dataframe``; ``None`` if the insert failed
            (the error is logged).
        """
        if data_df is None or data_df.empty:
            return True
        rv = None
        try:
            logger.info(f"[Client] batch insert dataframe to {table_name}... total: {len(data_df)}.")
            # Cast to object dtype and turn missing values into None.
            target_df = data_df.astype(object).where(data_df.notnull(), None)
            cols = ", ".join(map(str, target_df.columns))
            insert_sql = f" INSERT INTO {table_name} ({cols}) VALUES "
            connect = CKDBClientPool.get_connect()
            try:
                rv = connect.insert_dataframe(insert_sql, target_df)
            finally:
                # Always hand the client back; otherwise each call would
                # permanently take one client out of the pool.  add_pool
                # itself skips clients that are no longer connected.
                CKDBClientPool.add_pool(connect)
            logger.info(f"[Client] 資料插入資料庫表: {table_name} 完成,插入總資料量為: {rv}.")
            return rv
        except Exception as e:
            logger.error(f"[Client] batch insert dataframe to {table_name} error: {e}", exc_info=True)
            return rv

    @staticmethod
    def query_dataframe(sql, params=None, coerce_float=False):
        """
        Run a query and return the result as a DataFrame.

        :param sql: query text.
        :param params: substitution parameters, defaults to ``None``.
        :param coerce_float: when exactly ``False`` the client's own
            ``query_dataframe`` is used; otherwise the rows are fetched with
            ``execute`` and built with ``DataFrame.from_records``, passing
            this flag on so non-string, non-numeric values (such as
            ``decimal.Decimal``) are converted to floating point.
        :return: pandas DataFrame; an empty one if the query failed (the
            error is logged).
        """
        res_df = pd.DataFrame()
        try:
            started = time.perf_counter()
            connect = CKDBClientPool.get_connect()
            try:
                if coerce_float is False:
                    res_df = connect.query_dataframe(sql, params=params, settings={'use_numpy': True})
                else:
                    data, columns = connect.execute(sql, params=params, with_column_types=True,
                                                    settings={'use_numpy': True})
                    # with_column_types gives (name, type) pairs; only the
                    # names are wanted as column labels.
                    names = [col[0] for col in columns]
                    res_df = pd.DataFrame.from_records(data, columns=names, coerce_float=coerce_float)
            finally:
                # Always hand the client back; otherwise each call would
                # permanently take one client out of the pool.
                CKDBClientPool.add_pool(connect)
            logger.info(f"[Client] query runtime: {time.perf_counter() - started:.3f}s.")
            return res_df
        except Exception as e:
            logger.error(f"[Client] query data for dataframe error: {e}", exc_info=True)
            return res_df
if __name__ == '__main__':
    # Demo: insert into and query ClickHouse through the client connection pool.
    # Name of the table to insert into.
    table_name = ''
    # Data to insert.
    data_df = pd.DataFrame()
    CKDBClientHelper.save_dataframe(table_name, data_df)
    # Query statement.
    query_sql = 'select 字段 from 表名稱'
    # Query parameters.
    params = dict()
    CKDBClientHelper.query_dataframe(query_sql, params)