天天看點

Python 用戶端連接池方式操作 ClickHouse 資料庫

import itertools
import logging
import threading
import time

import pandas as pd
from clickhouse_driver import Client

# Module-wide logger. getLogger() without a name returns the root logger, so
# records follow whatever handlers/levels the application sets on the root.
logger = logging.getLogger()

# ClickHouse client class.
class CKClient(Client):
    """``clickhouse_driver.Client`` extended with a per-instance lock.

    The lock is taken by :meth:`query` and :meth:`query_stream` and is exposed
    through :attr:`blocked` so that a pool can tell whether the client is in
    the middle of one of those calls.
    """

    def __init__(self, *args, **kwargs):
        """Create the underlying client; all arguments go to ``Client``."""
        super().__init__(*args, **kwargs)
        # Guards query()/query_stream() on this particular client.
        self._thread_lock = threading.Lock()

    @property
    def connected(self):
        """Value of ``connected`` on this client's ``connection`` object."""
        conn = self.connection
        return conn.connected

    @property
    def blocked(self):
        """True while this client's lock is currently held."""
        return self._thread_lock.locked()

    def query(self, *args, **kwargs):
        """Run ``Client.execute`` with the given arguments under the lock."""
        with self._thread_lock:
            result = super().execute(*args, **kwargs)
        return result

    def query_stream(self, *args, **kwargs):
        """Call ``Client.execute_iter`` with the given arguments under the lock.

        NOTE(review): the lock is only held for the ``execute_iter`` call
        itself; it is released before the caller iterates over the returned
        object. If ``execute_iter`` yields rows lazily, consumption is not
        protected by the lock - confirm against the driver.
        """
        with self._thread_lock:
            rows = super().execute_iter(*args, **kwargs)
        return rows

# ClickHouse connection pool class.
class CKConnectPool(object):
    """Pool of ``CKClient`` connections that can be shared between threads.

    ``_connect_pool`` is the list of idle clients. ``_used_connect`` maps
    ``id(client)`` to every client the pool has created and not yet discarded
    (idle *and* borrowed); its size is what ``max_connections`` limits.
    Both containers are protected by ``_thread_lock``.
    """

    # Upper bound (seconds) on a single wait for a free client. Waiters are
    # normally woken by notify(); the timeout is only a safety net for state
    # changes made without going through the pool's own methods.
    _WAIT_INTERVAL = 1.0

    def __init__(self, host, tcp_port=9000, min_count=10, max_count=30, **kwargs):
        """Store the connection settings and pre-create ``min_count`` clients.

        :param host: server address passed to the client as ``host``.
        :param tcp_port: server port passed to the client as ``port``.
        :param min_count: number of clients created up front.
        :param max_count: maximum number of clients alive at the same time.
        :param kwargs: extra keyword arguments forwarded to ``CKClient``.
        """
        # Keyword arguments used for every client the pool creates.
        self.connect_args = {"host": host, "port": tcp_port, **kwargs}
        # Maximum number of tracked clients (default: 30).
        self.max_connections = max_count
        # Set to True once close()/disconnect() has closed the whole pool.
        self.is_closed = False
        # Idle clients ready to be handed out.
        self._connect_pool = list()
        # All live clients, keyed by id(client).
        self._used_connect = dict()
        # Lock protecting the two containers above.
        self._thread_lock = threading.Lock()
        # Condition on the same lock; signalled whenever a client or a slot
        # becomes available so get_connect() can sleep instead of spinning.
        self._pool_changed = threading.Condition(self._thread_lock)
        # Pre-create the initial clients.
        self.init_connect_pool(min_count)

    def init_connect_pool(self, min_connections=10):
        """Create up to ``min_connections`` new idle clients.

        Clients are created directly rather than through get_connect(), which
        would just pop the client appended on the previous iteration and leave
        the pool with a single entry. Creation stops at ``max_connections``.
        """
        with self._pool_changed:
            for _ in range(min_connections):
                if len(self._used_connect) >= self.max_connections:
                    break
                self._connect_pool.append(self._create_client())
            self._pool_changed.notify_all()

    def add_pool(self, client):
        """Return a borrowed client to the idle list.

        Only connected, non-blocked clients are pooled. A disconnected client
        is dropped from the bookkeeping so that it stops occupying one of the
        ``max_connections`` slots; a blocked client is left untouched because
        its lock shows it is still being used.
        """
        with self._pool_changed:
            if client.blocked:
                return
            if not client.connected:
                self._used_connect.pop(id(client), None)
                self._pool_changed.notify()
                return
            # Never store the same client twice, otherwise two callers could
            # later be handed the same connection.
            if not any(idle is client for idle in self._connect_pool):
                self._connect_pool.append(client)
            self._pool_changed.notify()

    def query(self, sql, with_column_types=True, *args, **kwargs):
        """Execute ``sql`` on a pooled client and return the driver's result.

        On success the client goes back to the pool. On any exception the
        error is logged, the client is closed and discarded, and ``[]`` is
        returned.
        """
        # Take an idle client, or create one if the limit allows.
        client = self.get_connect()
        try:
            res = client.execute(sql, with_column_types=with_column_types, *args, **kwargs)
        except Exception as e:
            logger.exception("CK資料庫正常方式執行查詢異常: %s", e)
            self._delete_connect(client)
            return []
        else:
            self.add_pool(client)
            return res

    def query_stream(self, sql, with_column_types=True, *args, **kwargs):
        """Generator yielding the items produced by ``client.execute_iter``.

        The client is returned to the pool once the stream is exhausted. If
        the query fails, the error is logged, the client is discarded and a
        final ``[]`` is yielded. If the consumer closes the generator early,
        the client is discarded as well, since the stream was not read to the
        end and the client would otherwise stay counted as in use forever.
        """
        client = self.get_connect()
        try:
            for chunk in client.execute_iter(sql, with_column_types=with_column_types, *args, **kwargs):
                yield chunk
        except GeneratorExit:
            self._delete_connect(client)
            raise
        except Exception as e:
            logger.exception("CK資料庫二進制流方式執行查詢異常: %s", e)
            self._delete_connect(client)
            yield []
        else:
            self.add_pool(client)

    def get_connect(self):
        """Borrow a client, blocking while the pool is exhausted.

        An idle client is preferred; otherwise a new one is created as long as
        fewer than ``max_connections`` clients exist. When neither is possible
        the caller sleeps on the pool condition until a client is returned or
        discarded.
        """
        while True:
            with self._pool_changed:
                while (not self._connect_pool
                       and len(self._used_connect) >= self.max_connections):
                    self._pool_changed.wait(self._WAIT_INTERVAL)
                if not self._connect_pool:
                    return self._create_client()
                client = self._connect_pool.pop()
            # Checked outside the lock: _is_usable() may reconnect and may
            # call _delete_connect(), which takes the (non-reentrant) lock.
            if self._is_usable(client):
                return client
            # Stop tracking the unusable client so its slot is not lost, then
            # look for another one.
            with self._pool_changed:
                self._used_connect.pop(id(client), None)
                self._pool_changed.notify()

    def disconnect(self):
        """Close every client of the pool (same as :meth:`close`)."""
        self._colse_connect()

    def close(self):
        """Close every client of the pool."""
        self._colse_connect()

    def _is_usable(self, client):
        """Return True if ``client`` can be handed out.

        A blocked client is not usable. A disconnected client is reconnected
        through ``client.connection.force_connect()``; if that fails the
        client is closed and discarded and False is returned.
        """
        if client.blocked:
            return False
        if not client.connected:
            try:
                client.connection.force_connect()
            except Exception:
                # Pass the client itself (not its id) - _delete_connect()
                # needs the object to disconnect it and derives the key.
                self._delete_connect(client)
                return False
        return True

    def _colse_connect(self, clients=None):
        """Disconnect ``clients``, or the whole pool when none are given.

        Closing the whole pool empties both containers and sets ``is_closed``.
        Closing specific clients leaves ``is_closed`` untouched, so a later
        close() still closes the remaining clients. A failure to disconnect
        one client is logged and does not stop the others from being closed.
        """
        with self._pool_changed:
            if clients:
                targets = list(clients)
            else:
                # Idle clients are normally tracked too; de-duplicate by id so
                # each one is disconnected once.
                unique = {id(idle): idle for idle in self._connect_pool}
                unique.update(self._used_connect)
                targets = list(unique.values())
                self._connect_pool.clear()
                self._used_connect.clear()
                self.is_closed = True
                self._pool_changed.notify_all()
        # Network I/O happens after the lock has been released.
        for client in targets:
            try:
                client.disconnect()
            except Exception as e:
                logger.warning("關閉CK資料庫連結異常: %s", e)

    def _delete_connect(self, client):
        """Disconnect ``client`` and remove it from the pool's bookkeeping.

        Safe to call for a client that is not (or no longer) tracked.
        """
        self._colse_connect([client])
        with self._pool_changed:
            self._used_connect.pop(id(client), None)
            self._connect_pool[:] = [
                idle for idle in self._connect_pool if idle is not client
            ]
            # A slot was freed; wake one waiter in get_connect().
            self._pool_changed.notify()

    def _add_connect(self, client):
        """Start tracking ``client``. Does not take the lock itself."""
        self._used_connect[id(client)] = client

    def _create_client(self):
        """Create a new ``CKClient`` from ``connect_args`` and track it."""
        client = CKClient(**self.connect_args)
        self._add_connect(client)
        return client

    def __del__(self):
        """Close all clients when the pool object is destroyed."""
        # Skip instances whose __init__ never got as far as creating the lock.
        if hasattr(self, "_pool_changed"):
            self._colse_connect()

# Build the ClickHouse connection pool from configuration.
def create_ck_dbpool(host='ip', port=9000, user='使用者名', password='密碼',
                     dbname='資料庫名稱', min_count=30, max_count=100,
                     settings=None):
    """Create a ``CKConnectPool`` for the given server and credentials.

    Every argument is optional and defaults to the placeholder value that was
    previously hard-coded here, so ``create_ck_dbpool()`` behaves as before;
    real deployments can pass values loaded from a configuration file.

    :param host: database server address.
    :param port: server port; client mode uses the TCP protocol port, 9000 by
        default.
    :param user: user name.
    :param password: password.
    :param dbname: database name.
    :param min_count: number of connections passed to the pool as its minimum.
    :param max_count: number of connections passed to the pool as its maximum.
    :param settings: driver settings dict; ``{'use_numpy': True}`` when omitted.
    :return: the new ``CKConnectPool`` instance.
    """
    db_config = {
        'database': dbname,
        'user': user,
        'password': password,
        'settings': {'use_numpy': True} if settings is None else settings,
    }
    return CKConnectPool(host, port, min_count, max_count, **db_config)

# Module-level connection pool instance (client mode).
# NOTE: this call runs when the module is imported.
CKDBClientPool = create_ck_dbpool()

# ClickHouse query helper.
class CKDBClientHelper(object):
    """Static helpers moving pandas DataFrames in and out of ClickHouse.

    Both public helpers borrow a client from the module-level
    ``CKDBClientPool`` and always give it back (or discard it after a
    failure), so repeated calls do not use up the pool's connection limit.
    """

    @staticmethod
    def _release(connect, reusable):
        """Return a borrowed client to the pool, or discard it after an error.

        :param connect: the borrowed client, or ``None`` if none was obtained.
        :param reusable: True to put the client back, False to close it.
        """
        if connect is None:
            return
        try:
            if reusable:
                CKDBClientPool.add_pool(connect)
            else:
                CKDBClientPool._delete_connect(connect)
        except Exception as e:
            # Releasing must never turn a handled failure into a raised one.
            logger.warning(f"[Client] release connection error: {e}")

    @staticmethod
    def save_dataframe(table_name, data_df):
        """
        Insert a DataFrame into a ClickHouse table.

        :param table_name: save data table
        :param data_df: pandas DataFrame. Its columns are assumed to match the
            table's column names.
        :desc faster than batch_save; not heavily tested
        :return: ``True`` when there is nothing to insert, otherwise the value
            returned by the client's ``insert_dataframe`` call, or ``None`` if
            the insert failed (the error is logged, not raised).
        """
        if data_df is None or data_df.empty:
            return True
        rv = None
        connect = None
        succeeded = False
        try:
            logger.info(f"[Client] batch insert dataframe to {table_name}... total: {len(data_df)}.")
            # Cast to object and replace missing values with None.
            # astype() already returns a new frame, so no extra copy is needed.
            target_df = data_df.astype(object).where(data_df.notnull(), None)
            # str() so that non-string column labels do not break the join.
            cols = ", ".join(map(str, target_df.columns))
            insert_sql = f" INSERT INTO {table_name} ({cols}) VALUES "
            connect = CKDBClientPool.get_connect()
            rv = connect.insert_dataframe(insert_sql, target_df)
            succeeded = True
            logger.info(f"[Client] 資料插入資料庫表: {table_name} 完成,插入總資料量為: {rv}.")
            return rv
        except Exception as e:
            logger.error(f"[Client] batch insert dataframe to {table_name} error: {e}", exc_info=True)
            return rv
        finally:
            CKDBClientHelper._release(connect, succeeded)

    @staticmethod
    def query_dataframe(sql, params=None, coerce_float=False):
        """
        Run a query and return the result as a DataFrame.

        :param sql: query sql
        :param params: Defaults to ``None``
            substitution parameters.
        :param coerce_float bool, default False
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        :return: pandas DataFrame; an empty DataFrame if the query failed (the
            error is logged, not raised).
        """
        res_df = pd.DataFrame()
        connect = None
        succeeded = False
        try:
            connect = CKDBClientPool.get_connect()
            started = time.perf_counter()
            if coerce_float is False:
                res_df = connect.query_dataframe(sql, params=params, settings={'use_numpy': True})
            else:
                data, columns = connect.execute(sql, params=params, with_column_types=True, settings={'use_numpy': True})
                # with_column_types=True yields (name, type) pairs; only the
                # names are column labels.
                names = [col[0] for col in columns]
                res_df = pd.DataFrame.from_records(data, columns=names, coerce_float=coerce_float)
            succeeded = True
            logger.info(f"[Client] query runtime: {time.perf_counter() - started:.3f}s.")
            return res_df
        except Exception as e:
            logger.error(f"[Client] query data for dataframe error: {e}", exc_info=True)
            return res_df
        finally:
            CKDBClientHelper._release(connect, succeeded)

if __name__ == '__main__':
    # Demo: insert into and query ClickHouse through the client connection pool.

    # Name of the table to insert into.
    table_name = ''

    # Data to insert.
    data_df = pd.DataFrame()

    CKDBClientHelper.save_dataframe(table_name, data_df)

    # Query statement.
    query_sql = 'select 字段 from 表名稱'

    # Query parameters.
    params = dict()

    CKDBClientHelper.query_dataframe(query_sql, params)