# 天天看點

# 造資料：使用 pandas 與 SQLAlchemy 批次寫入 MySQL 測試資料

import pandas as pd
from sqlalchemy import create_engine
from urllib import parse
import uuid
import numpy as  np
import  time

def write_data_mysql(db_conf, mode='r', table_name=None, dbname='finance_task_center_0', times=6, batch_size=500000):
    """Print a sample row from a MySQL table, or bulk-append generated rows to it.

    :param db_conf: mapping with the connection settings; the keys ``user``,
        ``pwd``, ``host`` and ``port`` are read.
    :param mode: ``'r'`` prints the column names and the first row of the
        table; ``'w'`` appends generated rows to it.
    :param table_name: name of the (sharded) table to read or write. Required.
    :param dbname: database name, used in the connection URL and as the
        ``schema`` for writes.
    :param times: number of write batches (``'w'`` mode only).
    :param batch_size: number of rows generated and written per batch
        (``'w'`` mode only).
    :return: None. With the defaults, ``'w'`` mode writes
        6 x 500000 = 3,000,000 rows in total.
    :raises ValueError: if ``mode`` is neither ``'r'`` nor ``'w'``, or if
        ``table_name`` is empty.
    """
    # Fail fast on bad arguments instead of silently doing nothing (unknown
    # mode) or failing later inside the database driver (missing table name).
    if mode not in ('r', 'w'):
        raise ValueError(f"unsupported mode {mode!r}; expected 'r' or 'w'")
    if not table_name:
        raise ValueError("table_name is required")

    user = db_conf["user"]
    # URL-quote the password: it is embedded in the connection URL and may
    # contain reserved characters such as '@'.
    pwd = parse.quote_plus(db_conf["pwd"])
    host = db_conf["host"]
    port = db_conf["port"]
    cols = ['task_id', 'id', 'company_id', 'zid', 'uid', 'type_id', 'sid', 'seller_id', 'run_type', 'origin_type',
            'execution_time', 'expiration_time', 'finish_time', 'market_id', 'market_code', 'repeat', 'message_id',
            'min_version', 'version', 'priority', 'status', 'data', 'result', 'gmt_create', 'gmt_modified']

    connect_info = f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{dbname}"
    engine = create_engine(connect_info)
    try:
        if mode == 'r':
            # The table name is interpolated directly into the SQL text, so
            # callers must only pass trusted names.
            df = pd.read_sql_query('select * from {} limit 1'.format(table_name), engine)
            print('===========讀取資料==============:')
            print(df.columns.tolist())
            rows = df.values.tolist()
            # An empty table yields no rows; print an empty list rather than
            # raising IndexError.
            print(rows[0] if rows else [])
        else:
            for t in range(times):
                st = time.time()
                df = pd.DataFrame(gen_data(batch_size), columns=cols)
                print(df)
                df.to_sql(name=table_name, con=engine, if_exists='append', schema=dbname, index=False)
                # Stop the clock only after the insert so the reported cost
                # covers generating the batch and writing it.
                end = time.time()
                print(f"=第{t+1}次寫入行數{batch_size}==========finish=============== table_name : {table_name},cost time {end-st} 秒")
            print(f"\n@@@@@@@@=========={table_name}寫入{(times*batch_size)} 行完成=================@@@@@@@@@@@@@@")
    finally:
        # Release pooled connections even when a read or write fails.
        engine.dispose()

def gen_data(nums):
    """Build ``nums`` synthetic task rows for bulk insertion.

    Each row is a fresh 25-element list. Only two fields vary per row:

    * element 0 (task id): a non-negative integer derived from a time-based
      UUID. It goes through ``hash()`` of a string, which Python salts per
      process, so the values are not reproducible across runs.
    * element 5 (type id): drawn from 1..5 with probabilities
      0.4 / 0.3 / 0.1 / 0.1 / 0.1.

    All other fields are fixed sample values.

    :param nums: number of rows to generate; zero or a negative value yields
        an empty list.
    :return: list of row lists.
    """
    if nums <= 0:
        return []

    # Draw every type id in a single call; calling np.random.choice once per
    # row dominates the run time for large batches.
    type_ids = np.random.choice([1, 2, 3, 4, 5], size=nums, replace=True, p=[0.4, 0.3, 0.1, 0.1, 0.1])

    payload = (
        '{"report_date_type":"custom", "report_date_month":"2022-04", '
        '"report_custom_date_end":"2022-04-02", "report_custom_date_start":"2022-04-02"}'
    )

    return [
        [
            abs(hash(uuid.uuid1().hex)),  # task id
            4,  # id
            9012499355034237,  # company id
            100026,  # zid
            0,  # uid
            type_id,  # type id
            117,  # sid
            'A303OEQ77COZA8',  # seller id
            1,  # run type
            1,  # origin type
            '2022-05-28 19:26:00',  # execution time
            '2022-05-28 20:00:00',  # expiration time
            '0000-00-00 00:00:00',  # finish time
            1,  # market id
            'US',  # market code
            0,  # repeat
            '',  # message id
            '*',  # min version
            '',  # version
            12,  # priority
            0,  # status
            payload,  # data
            '',  # result
            '2022-05-31 17:37:58',  # gmt_create
            '2022-05-31 17:37:58',  # gmt_modified
        ]
        for type_id in type_ids
    ]
if __name__ == '__main__':
    # Script entry point: append a tiny sample (2 batches of 1 row) to one
    # sharded task table.
    # NOTE(review): connection credentials are hard-coded here; consider
    # loading them from the environment or a config file instead.
    db_conf2 = {
        "user": "test",
        "pwd": "test@vW9JMvL",
        "host": "192.188.119.1",
        "port": "3306",
        "db_name": "finance_task_center_0"
    }

    table_name = 'client_tasks_33'  # 'runoob_tbl'
    # Pass the configured database name explicitly; otherwise the "db_name"
    # entry above is ignored and the function's built-in default is used.
    write_data_mysql(
        db_conf2,
        table_name=table_name,
        mode='w',
        dbname=db_conf2["db_name"],
        times=2,
        batch_size=1,
    )