import pandas as pd
from sqlalchemy import create_engine
from urllib import parse
import uuid
import numpy as np
import time
def write_data_mysql(db_conf, mode='r', table_name=None, dbname='finance_task_center_0', times=6, batch_size=500000):
    """Read a sample row from, or bulk-append generated rows to, a MySQL table.

    Args:
        db_conf: Mapping with the connection settings; the keys ``user``,
            ``pwd``, ``host`` and ``port`` are read.
        mode: ``'r'`` prints the column names and the first row of
            ``table_name``; ``'w'`` appends generated rows to it. Any other
            value reads or writes nothing.
        table_name: Name of the (sharded) table to read or write.
        dbname: Name of the database to connect to; also passed as the
            schema when writing.
        times: Number of write batches in ``'w'`` mode.
        batch_size: Number of rows generated and written per batch.

    Returns:
        None. With the defaults, ``'w'`` mode writes
        6 * 500000 = 3,000,000 rows.

    Raises:
        ValueError: If ``mode`` is ``'r'`` or ``'w'`` and ``table_name`` is
            empty.
    """
    user = db_conf["user"]
    # The password may contain URL-reserved characters such as '@'.
    pwd = parse.quote_plus(db_conf["pwd"])
    host = db_conf["host"]
    port = db_conf["port"]

    # Fail early with a clear message instead of sending "from None" to MySQL.
    if mode in ('r', 'w') and not table_name:
        raise ValueError("table_name is required when mode is 'r' or 'w'")

    cols = ['task_id', 'id', 'company_id', 'zid', 'uid', 'type_id', 'sid', 'seller_id', 'run_type', 'origin_type',
            'execution_time', 'expiration_time', 'finish_time', 'market_id', 'market_code', 'repeat', 'message_id',
            'min_version', 'version', 'priority', 'status', 'data', 'result', 'gmt_create', 'gmt_modified']
    connect_info = f"mysql+pymysql://{user}:{pwd}@{host}:{port}/" + f"{dbname}"
    engine = create_engine(connect_info)
    try:
        if mode == 'r':
            # NOTE: the table name is interpolated into the SQL text (identifiers
            # cannot be bound as parameters); only pass trusted table names.
            df = pd.read_sql_query('select * from {} limit 1'.format(table_name), engine)
            print('===========讀取資料==============:')
            print(df.columns.tolist())
            # An empty table has no first row to show.
            if not df.empty:
                print(df.values.tolist()[0])
        elif mode == 'w':
            for batch_no in range(times):
                started = time.time()
                frame = pd.DataFrame(gen_data(batch_size), columns=cols)
                print(frame)
                frame.to_sql(name=table_name, con=engine, if_exists='append', schema=dbname, index=False)
                # Stop the clock only after the insert so the reported cost
                # covers the database write, not just row generation.
                elapsed = time.time() - started
                print(f"=第{batch_no+1}次寫入行數{batch_size}==========finish=============== table_name : {table_name},cost time {elapsed} 秒")
            print(f"\n@@@@@@@@=========={table_name}寫入{(times*batch_size)} 行完成=================@@@@@@@@@@@@@@")
    finally:
        # Release pooled connections even when a read or write fails.
        engine.dispose()
def gen_data(nums):
    """Build ``nums`` synthetic task rows.

    Each row is a list of 25 values. Only two of them vary between rows:
    the task id (derived from a fresh ``uuid1``) and the type id (drawn from
    1..5 with weights 0.4/0.3/0.1/0.1/0.1); every other value is a fixed
    constant.

    :param nums: number of rows to generate
    :return: list of row lists
    """
    type_ids = [1, 2, 3, 4, 5]
    type_weights = [0.4, 0.3, 0.1, 0.1, 0.1]

    # Constant values that sit between the task id and the type id.
    leading = (
        4,                 # id
        9012499355034237,  # company_id
        100026,            # zid
        0,                 # uid
    )
    # Constant values that follow the type id.
    trailing = (
        117,                    # sid
        'A303OEQ77COZA8',       # seller_id
        1,                      # run_type
        1,                      # origin_type
        '2022-05-28 19:26:00',  # execution_time
        '2022-05-28 20:00:00',  # expiration_time
        '0000-00-00 00:00:00',  # finish_time
        1,                      # market_id
        'US',                   # market_code
        0,                      # repeat
        '',                     # message_id
        '*',                    # min_version
        '',                     # version
        12,                     # priority
        0,                      # status
        '{"report_date_type":"custom", "report_date_month":"2022-04", '
        '"report_custom_date_end":"2022-04-02", "report_custom_date_start":"2022-04-02"}',  # data
        '',                     # result
        '2022-05-31 17:37:58',  # gmt_create
        '2022-05-31 17:37:58',  # gmt_modified
    )

    def _one_row():
        # Task id first, then the weighted type draw, once per row.
        task_id = abs(hash(uuid.uuid1().hex))
        picked = np.random.choice(type_ids, 1, True, type_weights)[0]
        return [task_id, *leading, picked, *trailing]

    return [_one_row() for _ in range(nums)]
if __name__ == '__main__':
    # Manual smoke run: append two single-row batches to one sharded table.
    # NOTE(review): connection credentials are hard-coded below; consider
    # loading them from the environment or an untracked config file.
    db_conf2 = {
        "user": "test",
        "pwd": "test@vW9JMvL",
        "host": "192.188.119.1",
        "port": "3306",
        "db_name": "finance_task_center_0"
    }
    table_name = 'client_tasks_33'  # 'runoob_tbl'
    # Pass the configured database name explicitly so the "db_name" entry is
    # honoured instead of relying on the function's default. The function
    # returns None, so its result is not kept.
    write_data_mysql(
        db_conf2,
        table_name=table_name,
        mode='w',
        dbname=db_conf2["db_name"],
        times=2,
        batch_size=1,
    )