天天看點

手把手教你搭建一個 Python 連接配接資料庫,快速取數工具

大家好,我是安果!

在資料生産應用部門,取數分析是一個很常見的需求,實際上業務人員需求時刻變化,最高效的方式是讓業務部門自己來取,減少不必要的重複勞動,一般情況下,業務部門資料庫表結構一般是固定的,根據實際業務将取數需求做成sql 腳本,快速完成資料擷取---授人以漁的方式,提供平台或工具

那如何實作一個自助取數查詢工具?

基于底層資料來開發不難,無非是将使用者輸入變量作為篩選條件,将參數映射到 sql 語句,并生成一個 sql 語句然後再去資料庫執行

最後再利用 QT 開發一個 GUI 界面,使用者界面的點選和篩選條件,信号觸發對應按鈕與綁定的傳參槽函數執行

具體思路:

一、資料庫連接配接類

此處利用 pandas 讀寫操作 oracle 資料庫

二、主函數子產品

1)輸入參數子產品,外部輸入條件參數,建立資料庫關鍵字段映射

--注:讀取外部 txt 檔案,将篩選字段可能需要進行鍵值對轉換

2)sql 語句集合子產品,将待執行的業務 sql 語句統一存放到這裡

3)資料處理函數工廠

4)使用多線程提取資料

一、資料庫連接配接類

cx_Oracle 是一個 Python 擴充子產品,相當于 python 的 Oracle 資料庫的驅動,通過使用所有資料庫通路子產品通用的資料庫 API 來實作 Oracle 資料庫的查詢和更新

Pandas 是基于 NumPy 開發,為了解決資料分析任務的子產品,Pandas 引入了大量庫和一些标準的資料模型,提供了高效地操作大型資料集所需的方法類和函數

pandas 調用資料庫主要有 read_sql_table,read_sql_query,read_sql 三種方式

本文主要介紹一下 Pandas 中 read_sql_query 方法的使用

1:pd.read_sql_query()
讀取自定義資料,返還DataFrame格式,通過SQL查詢腳本包括增删改查。
pd.read_sql_query(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None,chunksize=None)
sql:要執行的sql腳本,文本類型
con:資料庫連接配接
index_col:選擇傳回結果集索引的列,文本/文本清單
coerce_float:非常有用,将數字形式的字元串直接以float型讀入
parse_dates:将某一列日期型字元串轉換為datetime型資料,與pd.to_datetime函數功能類似。
params:向sql腳本中傳入的參數,官方類型有清單,元組和字典。用于傳遞參數的文法是資料庫驅動程式相關的。
chunksize:如果提供了一個整數值,那麼就會傳回一個generator,每次輸出的行數就是提供的值的大小

read_sql_query()中可以接受SQL語句,DELETE,INSERT INTO、UPDATE操作沒有傳回值(但是會在資料庫中執行),程式會抛出SourceCodeCloseError,并終止程式。SELECT會傳回結果。如果想繼續運作,可以try捕捉此異常。
 
2:pd.read_sql_table()
讀取資料庫中的表,返還DataFrame格式(通過表名)
import pandas as pd
pd.read_sql_table(table_name, con, schema=None,index_col=None, coerce_float=True, parse_dates=None, columns=None,chunksize=None)
 
3:pd.read_sql()
讀資料庫通過SQL腳本或者表名
import pandas as pd
pd.read_sql(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)

           

複制

以下建立連接配接 oracel 資料庫的連接配接類 Oracle_DB

主要提供 2 種操作資料的函數方法。

import cx_Oracle
# Pandas讀寫操作Oracle資料庫
import pandas as pd

# 避免編碼問題帶來的亂碼
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'


class Oracle_DB(object):
    def __init__(self):
        try:
            # 連接配接oracle
            # 方法1:sqlalchemy 提供的create_engine()
            # from sqlalchemy import create_engine
            # engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')
            # #方法2:cx_Oracle.connect()
            self.engine = cx_Oracle.connect('username', 'password', 'ip:1521/database')

        except cx_Oracle.Error as e:
            print("Error %d:%s" % (e.args[0], e.args[1]))
            exit()
            
    # 查詢部分資訊
    def search_one(self, sql,sparm):
        try:
            # #查詢擷取資料用sql語句
            # 代傳參數:sparm--查詢指定字段參數
            df = pd.read_sql_query(sql, self.engine,params=sparm)

            self.engine.close()

        except Exception as e:
            return "Error " + e.args[0]

        return df

    # 查詢全部資訊
    def search_all(self, sql):
        try:

            # #查詢擷取資料用sql語句

            df = pd.read_sql_query(sql, self.engine)

            self.engine.close()

        except Exception as e:
            return "Error " + e.args[0]

        return df

           

複制

二、資料提取主函數子產品

cx_Oracle 是一個 Python 擴充子產品,相當于 python 的 Oracle 資料庫的驅動,通過使用所有資料庫通路子產品通用的資料庫 API 來實作 Oracle 資料庫的查詢和更新。

1)外部輸入參數子產品

txt 文本中,就包含一列資料,第一行列名,讀取的時候忽略第一行

#建立ID——編号字典
def buildid():
    sqlid = """select * from b_build_info"""
    db = Oracle_DB()  # 執行個體化一個對象
    b_build_info = db.search_all(sqlid)
    ID_bUILDCODE = b_build_info.set_index("BUILDCODE")["ID"].to_dict()
    return ID_bUILDCODE
    
#通過文本傳入待導出資料清單
def read_task_list():
    build_code=buildid()
    tasklist=[]
    is_first_line=True
    with open("./b_lst.txt") as lst:
        for line in lst:
            if is_first_line:
                is_first_line=False
                continue
            tasklist.append(build_code.get(line.strip('\n')))  #鍵值對轉換
    return tasklist
           

複制

2)業務 sql 語句集合

注意in後面{0}不要加引号,這裡傳入為元組,params 參數傳入sparm

= {'Start_time':'2021-04-01','End_time':'2021-05-01'},此處參數可根據需要改變           

複制

def sql_d(lst):
    # 逐月資料
    sql_d_energy_item_month = """select * from d_energy_item_month
           where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
           and recorddate < to_date(:End_time, 'yyyy-MM-dd')
           and  buildid  in {0}
           order by recorddate asc""".format(lst)

    # 逐月資料
    sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
           where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
           and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')
           and d.buildid = '{0}'
           order by d.recorddate asc""".format(lst)

    # 查詢當日資料
    sql_energy_item_hour_cheak = """select * from d_energy_item_hour
            where trunc(sysdate)=trunc(recorddate)
            order by recorddate asc""".format(lst)

    sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
                      sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
                      #此處省略部分sql語句
    return sql_collection
           

複制

3)業務資料處理

業務資料處理流程,原始資料後處理,這裡不作介紹:

def db_extranction(lst,sparm,sql_type):   
    """sql_type--輸入需要操作的sql業務序号"""
    sql_=sql_d(lst)[sql_type]  #輸出sql語句
    db = Oracle_DB()  # 執行個體化一個對象
    res=db.search_one(sql_,sparm)
    # 資料處理加工
    RES=Data_item_factory(res)  #此處省略
    # res = db.search_all(sql_d_energy_item_month)
    print(RES)
    return RES           

複制

多線程提取資料部分,這裡 tasklist 清單多線程提取資料

import threading
# Pandas讀寫操作Oracle資料庫
from tools.Data_Update_oracle import Oracle_DB
import pandas as pd
from concurrent import futures  

if __name__ == '__main__':
    #外部傳入
    tasklist= read_task_list()
    print(tasklist)
    # 輸入時間查找範圍參數,可手動修改
    sparm = {'Start_time':'2021-04-01','End_time':'2021-05-01'}
    lst = tuple(list(tasklist))
    
    #業務類型序号,可手動修改
    sql_type=0
    
    #全部提取
    db_extranction(lst,sparm,sql_type)  

    #多線程按字段分批提取
    方法一:使用threading子產品的Thread類的構造器建立線程
    #threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
    # [threads[i].start() for i in range(len(threads))]
    
    方法二:使用python的concurrent庫,這是官方基于 threading 封裝,先安裝該庫
    # with futures.ThreadPoolExecutor(len(tasklist)) as executor:
    #     executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)   
           

複制

手把手教你搭建一個 Python 連接配接資料庫,快速取數工具
手把手教你搭建一個 Python 連接配接資料庫,快速取數工具

到此整個資料庫取數工具開發流程介紹完畢,就差最後一步分享給小夥伴使用了,做成 GUI 應用此處不做詳細介紹,建構獨立的 python 環境,快速釋出你的應用