Big Data Intelligent Processing System: Progress Report

Core technologies:

  • Flask framework
  • Pandas
  • File upload
  • Data dictionary viewing

Progress:

The user login, file upload, and data dictionary viewing features have been implemented so far.
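
The route handlers in the next section assume some shared setup: the Flask app object, the project's connectsql and dictionary modules, and the upload directories referenced by import_data(). A minimal sketch of that setup (an assumption, not the project's actual entry point) could look like this:

# Assumed shared setup for the snippets below (a sketch; not the project's actual entry point)
import os
from flask import Flask, request, jsonify

import connectsql    # project module that imports uploaded files into MySQL
import dictionary    # project module that reads table metadata for the data dictionary

app = Flask(__name__)

# upload directories referenced by import_data()
for folder in ("score_table", "excel_example", "word_data", "test_data"):
    os.makedirs(folder, exist_ok=True)

if __name__ == '__main__':
    app.run(debug=True)

The database-side snippets further down additionally rely on import pandas as pd, import traceback, and a MySQL driver inside connectsql.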

Core code:

  • File import
# File import
@app.route('/import_data', methods=['POST', 'GET'])
def import_data():
    flag = 0
    the_file = request.files.get("file")         # get the uploaded file object sent by the frontend
    file_type = the_file.filename.split(".")[1]  # derive the file type from the file name
    print(file_type)                             # log the file type

    # save the file and, for supported types, import it into the database
    if file_type == "csv" or file_type == "txt":
        the_file.save("score_table/" + the_file.filename)    # save under score_table/
        flag = connectsql.read_csv(the_file.filename)        # import the file into the database
    elif file_type == "xlsx" or file_type == "xls":
        the_file.save("excel_example/" + the_file.filename)  # save under excel_example/
        flag = connectsql.read_example(the_file.filename)
    elif file_type == "docx":
        the_file.save("word_data/" + the_file.filename)      # save under word_data/
    else:
        the_file.save("test_data/" + the_file.filename)      # save under test_data/

    if flag == 1:
        return jsonify({"code": 0, "msg": "", "data": ""})   # code: status, msg: description, data: payload
    else:
        return jsonify({"code": -1, "msg": "", "data": ""})
  • Query imported files
@app.route('/get_table_list')
def get_table_list():
    data = dictionary.get_table_data()
    data_re = []
    for table_name, database_name, rows, data_time in data:
        # strftime() formats the datetime as a readable string, e.g. "2021-11-05 10:24:28"
        data_time_str = data_time.strftime("%Y-%m-%d %H:%M:%S")
        data_re.append({"table_name": table_name, "database_name": database_name,
                        "rows_num": rows, "create_time": data_time_str})
    count = len(data)
    print(data)
    return jsonify({"code": 0, "msg": "", "count": count, "data": data_re})
  • View the data dictionary
@app.route('/get_look_dictionary')
def get_look_dictionary():
    table_name=request.values.get("table_name")
    database_name=request.values.get("database_name")
    table_data,table_unit=dictionary.get_dictionary(table_name,database_name)
    data_re=[]
    count=len(table_data)
    for index in range(len(table_data)):
        print(table_data[index][4],table_unit[index])
        data_re.append({"key_english":table_data[index][0],"key_china":table_data[index][1],"key_type":table_data[index][2],
                        "key_long":table_data[index][3],"key_null":table_data[index][4],"key_unit":table_unit[index]})
    return jsonify({"code": 0, "msg": "", "count": count, "data": data_re})      
  • Read the sample sheet and generate the data dictionary
def read_example(path):
    flag = 1
    conn, cursor = get_conn_mysql()     # connect to the database
    data = pd.read_excel('excel_example/' + path, 'Sheet1')    # read the Excel sheet with pandas
    data.fillna('', inplace=True)       # replace missing values in place
    print(data)
    # table name = file name without the extension
    table_name = path.split(".")[0]
    # build the CREATE TABLE statement (every column is stored as a string)
    sql = "CREATE TABLE IF NOT EXISTS " + table_name + " ("
    # append one column definition per field
    keys = ""
    key_china = data.keys()             # header row: Chinese column names, used as column comments
    j = 0
    for i in data.values.tolist()[1]:   # second data row: English names, used as MySQL column names
        sql = sql + i + " VARCHAR(45) NOT NULL DEFAULT '#' comment '" + key_china[j] + "',"
        j = j + 1
        keys = keys + i + ","
    keys = keys[0:-1]
    creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;"
    print(creat_sql)
    # build the %s placeholder list for the INSERT statement
    s = ','.join(['%s' for _ in range(len(data.columns))])
    # collect the values: the unit row first, then the data rows
    values = []
    values.append(data.values.tolist()[0])      # first data row: units
    for i in data.values.tolist()[2:]:          # remaining rows: actual data
        values.append(i)
    print(values)
    # assemble the INSERT statement
    insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s)
    print(insert_sql)
    # create the table
    try:
        cursor.execute(creat_sql)
    except:
        traceback.print_exc()
        flag = 0
        print("Failed to create table")
    # insert the data
    try:
        for i in values:
            cursor.execute(insert_sql, i)
            print(insert_sql)
            print(i)
        conn.commit()
    except:
        traceback.print_exc()
        flag = 0
        print("Failed to write data")
    close_conn_mysql(cursor, conn)
    return flag
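
read_example() (and read_csv() below) assume a particular sheet layout: the header row holds the Chinese column names, the first data row holds the units, the second data row holds the English names used as MySQL column names, and everything from the third data row onward is actual data. A small illustration with hypothetical data:

# Illustration of the expected layout (hypothetical data, read from an in-memory CSV)
import io
import pandas as pd

sample = io.StringIO(
    "姓名,成績\n"      # header row: Chinese names, used as column comments
    "-,分\n"           # first data row: units
    "name,score\n"     # second data row: English names, used as MySQL column names
    "張三,90\n"        # remaining rows: actual data
    "李四,85\n"
)
data = pd.read_csv(sample)
print(data.keys().tolist())      # ['姓名', '成績']
print(data.values.tolist()[0])   # ['-', '分']        -> the unit row
print(data.values.tolist()[1])   # ['name', 'score']  -> the MySQL column names
print(data.values.tolist()[2:])  # [['張三', '90'], ['李四', '85']] -> the inserted data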
  • Read an Excel file
def read_excel(path):
    conn, cursor = get_conn_mysql()     # connect to the database
    data = pd.read_excel('excel_data/' + path, 'Sheet1')    # read the Excel sheet with pandas
    # table name = file name without the extension
    table_name = path.split(".")[0]
    # build the CREATE TABLE statement (every column is stored as a string)
    sql = "CREATE TABLE " + table_name + " ("
    # append one column definition per field, using the header row directly as column names
    keys = ""
    for i in data.keys():
        sql = sql + i + " VARCHAR(45) NOT NULL,"
        keys = keys + i + ","
    keys = keys[0:-1]
    creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;"
    # build the %s placeholder list for the INSERT statement
    s = ','.join(['%s' for _ in range(len(data.columns))])
    # collect the values
    values = data.values.tolist()
    print(values)
    # assemble the INSERT statement (this variant only prints the statements for now)
    insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s)
    print(insert_sql)
    print(creat_sql)
    print(keys)
    print(values)

    close_conn_mysql(cursor, conn)
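
As an aside, pandas can also write a DataFrame into MySQL directly through SQLAlchemy instead of hand-building CREATE TABLE and INSERT statements. This is not the approach used in the project code above, only a sketch with placeholder credentials:

# Alternative sketch: load a DataFrame into MySQL with pandas + SQLAlchemy (not the project's approach)
import pandas as pd
from sqlalchemy import create_engine

# placeholder connection settings; the real ones live in get_conn_mysql()
engine = create_engine("mysql+pymysql://user:password@localhost:3306/bigdata?charset=utf8")

def load_excel_to_mysql(path, table_name):
    data = pd.read_excel(path, 'Sheet1')
    data.fillna('', inplace=True)
    # creates the table if it does not exist and appends the rows; column types are inferred
    data.to_sql(table_name, con=engine, if_exists='append', index=False)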
  • Read a CSV file
def read_csv(path):
    conn, cursor = get_conn_mysql()     # connect to the database
    flag = 1
    data = pd.read_csv("score_table/" + path)
    data.fillna('', inplace=True)       # replace missing values in place
    # table name = file name without the extension
    table_name = path.split(".")[0]
    # build the CREATE TABLE statement (every column is stored as a string)
    sql = "CREATE TABLE IF NOT EXISTS " + table_name + " ("
    # append one column definition per field
    keys = ""
    key_china = data.keys()             # header row: Chinese column names, used as column comments
    j = 0
    for i in data.values.tolist()[1]:   # second data row: English names, used as MySQL column names
        sql = sql + i + " VARCHAR(45) NOT NULL DEFAULT '#' comment '" + key_china[j] + "',"
        j = j + 1
        keys = keys + i + ","
    keys = keys[0:-1]
    creat_sql = sql[0:-1] + ") ENGINE = InnoDB DEFAULT CHARACTER SET = utf8 COLLATE = utf8_bin;"
    print(creat_sql)
    # build the %s placeholder list for the INSERT statement
    s = ','.join(['%s' for _ in range(len(data.columns))])
    # collect the values: the unit row first, then the data rows
    values = []
    values.append(data.values.tolist()[0])      # first data row: units
    for i in data.values.tolist()[2:]:          # remaining rows: actual data
        values.append(i)
    print(values)
    # assemble the INSERT statement
    insert_sql = 'insert into {}({}) values({})'.format(table_name, keys, s)
    print(insert_sql)
    # create the table
    try:
        cursor.execute(creat_sql)
    except:
        traceback.print_exc()
        flag = 0
        print("Failed to create table")
    # insert the data
    try:
        for i in values:
            cursor.execute(insert_sql, i)
            print(insert_sql)
            print(i)
        conn.commit()
    except:
        traceback.print_exc()
        flag = 0
        print("Failed to write data")
    close_conn_mysql(cursor, conn)
    return flag
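
get_conn_mysql() and close_conn_mysql() are not shown in this report; a plausible shape for them, based on pymysql and with placeholder credentials, would be:

# Assumed shape of the connection helpers used above (a sketch based on pymysql; credentials are placeholders)
import pymysql

def get_conn_mysql():
    conn = pymysql.connect(host="localhost", user="root", password="password",
                           database="bigdata", charset="utf8")
    cursor = conn.cursor()
    return conn, cursor

def close_conn_mysql(cursor, conn):
    if cursor:
        cursor.close()
    if conn:
        conn.close()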
  • Get a table's data dictionary
def get_dictionary(name_table, database_name):
    # field metadata (name, comment, type, length, default) from information_schema
    sql = "select column_name,column_comment,data_type,CHARACTER_MAXIMUM_LENGTH,COLUMN_DEFAULT " \
          "from information_schema.columns " \
          "where table_name='" + name_table + "' and table_schema='" + database_name + "'"
    res = query_mysql(sql)
    # the first stored row of each table holds the units, so fetch it as well
    sql = "select * from " + name_table + " limit 1"
    res2 = query_mysql(sql)
    print(res)
    print(res2)
    return res, res2[0]
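
Because table_name and database_name come straight from the request, the concatenated query above is open to SQL injection. The information_schema lookup could instead use parameter placeholders; a hedged variant:

# Hedged variant of the information_schema lookup using parameter placeholders
def get_dictionary_safe(name_table, database_name):
    conn, cursor = get_conn_mysql()
    sql = ("select column_name,column_comment,data_type,CHARACTER_MAXIMUM_LENGTH,COLUMN_DEFAULT "
           "from information_schema.columns "
           "where table_name=%s and table_schema=%s")
    cursor.execute(sql, (name_table, database_name))
    res = cursor.fetchall()
    close_conn_mysql(cursor, conn)
    return res

The second query (select * from <table> limit 1) interpolates an identifier rather than a value, so it cannot use a placeholder and would still need a whitelist check against the known table names.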
  • Get table information
def get_table_data():
    # name, schema, row count, and creation time of every table in the bigdata schema
    sql = "SELECT TABLE_NAME,TABLE_SCHEMA,TABLE_ROWS,CREATE_TIME " \
          "FROM information_schema.TABLES " \
          "where TABLE_SCHEMA='bigdata';"
    res = query_mysql(sql)
    print(res)
    return res
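
query_mysql() is likewise assumed from the project's database module; a minimal sketch consistent with how it is used above:

# Assumed shape of query_mysql() (a sketch; reuses the connection helpers sketched earlier)
def query_mysql(sql):
    conn, cursor = get_conn_mysql()
    cursor.execute(sql)
    res = cursor.fetchall()     # a tuple of rows, matching the iteration in get_table_list()
    close_conn_mysql(cursor, conn)
    return res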

Results:

[Result screenshots omitted]