天天看點

Python與SQL Server資料庫完美結合,資料處理得心應手

作者:雲中随心而記

闇€瑕佹簮鐮佸彲浠ョ洿鎺ヨ仈绯?

鏂規涓€锛?/h1>

鐩存帴濂楃敤鑴氭湰锛岄渶鍙互鐪嬫噦涓€浜涜剼鏈€昏緫

杩欎釜绫誨疄鐜頒簡鍚屾椂杩炴帴澶氫釜 SQL Server 鏁版嵁搴擄紝骞舵彁渚涗簡鎵ц鏌ヨ鍜岄潪鏌ヨ鎿嶄綔鐨勮兘鍔?

import pymssql

class SqlServerConnector:
    def __init__(self, servers):
        self.connections = {}  # 瀛樺偍鏁版嵁搴撹繛鎺ョ殑瀛楀吀
        for server in servers:
            conn = pymssql.connect(
                server=server['host'],
                port=server['port'],
                user=server['username'],
                password=server['password'],
                database=server['database']
            )
            self.connections[server['name']] = conn

    def execute_query(self, connection_name, query, params=None):
        with self.connections[connection_name].cursor() as cursor:  # 鑾峰彇娓告爣(cursor)
            cursor.execute(query, params)  # 鎵ц鏌ヨ璇彞
            result = cursor.fetchall()  # 鑾峰彇缁撴灉闆?        return result
    
    def execute_non_query(self, connection_name, query, params=None):
        with self.connections[connection_name].cursor() as cursor:  # 鑾峰彇娓告爣(cursor)
            cursor.execute(query, params)  # 鎵ц澧炲垹鏀矽鍙?            self.connections[connection_name].commit()  # 鎻愪氦浜嬪姟
            return cursor.rowcount  # 杩斿洖鍙楀獎鍝嶇殑琛屾暟
        
    def execute_query_all(self, servers, query, params=None):
        results = {}  # 瀛樺偍鎵€鏈夋煡璇㈢粨鏋滅殑瀛楀吀
        for server in servers:
            connection_name = server['name']
            results[connection_name] = self.execute_query(connection_name, query, params)
        return results
    
    def execute_non_query_all(self, servers, query, params=None):
        results = {}  # 瀛樺偍鎵€鏈夋搷浣滅粨鏋滅殑瀛楀吀
        for server in servers:
            connection_name = server['name']
            try:
                res = self.execute_non_query(connection_name, query, params)
                results[connection_name] = res
            except Exception as e:
                print(f"Error occurred when running query on {connection_name}: {e}")
                self.connections[connection_name].rollback() # 鍑洪敊鏃跺洖婊?        return results           
浠ヤ笅鏄唬鐮佷腑鐨勮В閲?
import pymssql锛氬鍏?pymssql 搴擄紝鐢ㄤ簬涓?SQL Server 鏁版嵁搴撹繘琛岃繛鎺ュ拰浜や簰銆?
class SqlServerConnector:锛氬畾涔変竴涓悕涓?SqlServerConnector 鐨勭被銆?
def __init__(self, servers):锛氱被鐨勫垵濮嬪寲鏂規硶锛屾帴鍙椾竴涓寘鍚涓暟鎹簱淇℃伅鐨勫垪琛ㄤ綔涓哄弬鏁般€?
self.connections = {}锛氬垵濮嬪寲涓€涓瓧鍏革紝鐢ㄤ簬瀛樺偍鏁版嵁搴撹繛鎺ャ€?
for server in servers:锛氶亶鍘嗘暟鎹簱鍒楄〃涓殑姣忎釜鏁版嵁搴撲俊鎭€?
conn = pymssql.connect(...)锛氭牴鎹暟鎹簱淇℃伅鍒涘緩鏁版嵁搴撹繛鎺ュ璞°€俿erver['host']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑涓繪満鍦闆潃銆俿erver['port']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑绔彛鍙楓€俿erver['username']锛氳繛鎺ユ暟鎹簱鐨勭敤鎴峰悕銆俿erver['password']锛氳繛鎺ユ暟鎹簱鐨勫瘑鐮併€俿erver['database']锛氳杩炴帴鐨勬暟鎹簱鍚嶇О銆?
self.connections[server['name']] = conn锛氬皢鏁版嵁搴撹繛鎺ュ璞℃坊鍔犲埌瀛楀吀涓紝浠ユ暟鎹簱鍚嶇О浣滀負閿€?
def execute_query(self, connection_name, query, params=None):锛氭墽琛屾煡璇㈣鍙ョ殑鏂規硶銆俢onnection_name锛氭暟鎹簱鍚嶇О銆俼uery锛氳鎵ц鐨勬煡璇㈣鍙ャ€俻arams=None锛氬彲閫夌殑鏌ヨ鍙傛暟銆?def __init__(self, servers):锛氱被鐨勫垵濮嬪寲鏂規硶锛屾帴鍙椾竴涓寘鍚涓暟鎹簱淇℃伅鐨勫垪琛ㄤ綔涓哄弬鏁般€?
self.connections = {}锛氬垵濮嬪寲涓€涓瓧鍏革紝鐢ㄤ簬瀛樺偍鏁版嵁搴撹繛鎺ャ€?
for server in servers:锛氶亶鍘嗘暟鎹簱鍒楄〃涓殑姣忎釜鏁版嵁搴撲俊鎭€?
conn = pymssql.connect(...)锛氭牴鎹暟鎹簱淇℃伅鍒涘緩鏁版嵁搴撹繛鎺ュ璞°€俿erver['host']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑涓繪満鍦闆潃銆俿erver['port']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑绔彛鍙楓€俿erver['username']锛氳繛鎺ユ暟鎹簱鐨勭敤鎴峰悕銆俿erver['password']锛氳繛鎺ユ暟鎹簱鐨勫瘑鐮併€俿erver['database']锛氳杩炴帴鐨勬暟鎹簱鍚嶇О銆?
self.connections[server['name']] = conn锛氬皢鏁版嵁搴撹繛鎺ュ璞℃坊鍔犲埌瀛楀吀涓紝浠ユ暟鎹簱鍚嶇О浣滀負閿€?
def execute_query(self, connection_name, query, params=None):锛氭墽琛屾煡璇㈣鍙ョ殑鏂規硶銆俢onnection_name锛氭暟鎹簱鍚嶇О銆俼uery锛氳鎵ц鐨勬煡璇㈣鍙ャ€俻arams=None锛氬彲閫夌殑鏌ヨ鍙傛暟銆?/code>           

鏂規浜岋細鐩存帴璋冪敤灏佽鑴氭湰锛堝啓鐢ㄤ緥锛屾墽琛岃剼鏈嵆鍙級

鑴氭湰瀹炵幇灏佽鍚庯紝鍙渶瑕佸湪SqlServer.yaml鏂囦歡涓啓鐢ㄤ緥鍗沖彲锛屾鍚庢墽琛孲qlServer.py鑴氭湰鍗沖疄鐜版暟鎹簱鐨勫鍒犳敼鏌ユ搷浣?

鐩綍浠嬬粛锛?

Python與SQL Server資料庫完美結合,資料處理得心應手
SqlServer.yaml 缂栧啓澶氫釜鏁版嵁搴撹繛鎺ヤ俊鎭?
Sqlserver_jzjy_uat:
  -
    serverid: 0
    host: 127.0.0.1
    user: sa
    password: pwd1
    db: run
    charset: utf-8

  -
    serverid: 1
    host: 127.0.0.2
    user: sa
    password: pwd2
    db: run
    charset: utf-8

  -
    serverid: 2
    host: 127.0.0.3
    user: sa
    password: pwd2
    db: run
    charset: utf-8           
PublicConfig.py鑴氭湰锛?閰嶇疆璇誨彇淇℃伅锛屾柟渚胯皟鐢?
import os
from Public_Utils.util_yaml import YamlReader

class YamlPath:
    def __init__(self):
        current = os.path.abspath(__file__)
        self.base_dir = os.path.dirname(os.path.dirname(current))
        self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"

    def get_sqlserver_file(self):
        self._config_file = self._config_path + os.sep + "SqlServer.yaml"
        return self._config_file

class ConfigYaml:
    def __init__(self):   #鍒濆yaml璇誨彇閰嶇疆鏂囦歡
        self.sqlserver_config = YamlReader(YamlPath().get_sqlserver_file()).yaml_data()

    def get_sqlserver_yaml(self):
        return self.sqlserver_config['Sqlserver_jzjy_uat']

    def get_serveridlist_yaml(self):
        return self.sqlserver_config['uat_serverlist']


if __name__ == '__main__':
    pass           
SqlServer.py 鎵ц鑴氭湰
import pymssql
from Public_Config.PublicConfig import ConfigYaml

class SQLServer:
    def __init__(self):
        # 杩炴帴鏁版嵁搴?        self.connections = {}
        for server in ConfigYaml().get_sqlserver_yaml():
                conn = pymssql.connect(host =server['host'],
                                       user=server['user'],
                                       password=server['password'],
                                       database=server['db'],
                                       charset='cp936', as_dict=True)
                self.connections[server['serverid']] = conn

    def execute_query(self, connection_name, query, params=None):
        with self.connections[connection_name].cursor() as cursor:     #鑾峰彇娓告爣锛坈ursor锛?            cursor.execute(query,params)   #鎵ц鏌ヨ璇彞
            result = cursor.fetchall()     #鑾峰彇缁撴灉闆嗗悎
        return result

    def execute_query_all(self, query, params=None):  #鍏ㄩ儴鏁版嵁搴撴墽琛屾煡璇紙select锛?        results = {}
        for serverid in ConfigYaml().get_sqlserver_yaml():
            connection_name = serverid['serverid']
            results[connection_name] = self.execute_query(connection_name,query,params)
        return results

    def execute_query_special(self, query, params=None):  #鎸囧畾鐨勫嚑涓暟鎹簱涓€璧鋒墽琛岃鍙?锛坰elect锛?        results = {}
        serveridlist = ConfigYaml().get_serveridlist_yaml()['serveridlist']
        for serverid in ConfigYaml().get_sqlserver_yaml():
            if str(serverid['serverid']) in serveridlist.split(','):
                connection_name = serverid['serverid']
                results[connection_name] = self.execute_query(connection_name, query, params)
        return results

    def execute_non_query(self,connection_name,query,params=None):
        with self.connections[connection_name].cursor() as cursor:
            cursor.execute(query,params)    #鎵ц澧炲垹鏌ョ瓑鎿嶄綔
            self.connections[connection_name].commit()   #鎻愪氦浜嬪姟
        return cursor.rowcount      #杩斿洖鍙楀獎鍝嶇殑琛屾暟

    def execute_non_query_all(self, query, params=None):  #瀹炵幇閰嶇疆琛ㄤ腑鐨勬墍鏈夊鍒犳敼绛夋搷浣渋nsert,delete,update锛?        results = {}   #鍌ㄥ瓨鎵€鏈夋煡璇㈢粨鏋滅殑瀛楀吀
        for serverid in ConfigYaml().get_sqlserver_yaml():
            connection_name = serverid['serverid']
            try:
                res = self.execute_non_query(connection_name,query,params)
                results[connection_name] = res
            except Exception as e:
                print(f"Error occurred when running query on {connection_name} :{e}")
                self.connections[connection_name].rollback()   #鍑洪敊鏃跺洖婊?        return results

    def execute_non_query_special(self, query, params=None):  #瀹炵幇閰嶇疆琛ㄤ腑鎸囧畾鏁版嵁搴撶殑澧炲垹鏀圭瓑鎿嶄綔insert,delete,update锛?        results = {}   #鍌ㄥ瓨鎵€鏈夋煡璇㈢粨鏋滅殑瀛楀吀
        for serverid in ConfigYaml().get_sqlserver_yaml():
            serveridlist = ConfigYaml().get_serveridlist_yaml()['serveridlist']
            if str(serverid['serverid']) in serveridlist.split(','):
                connection_name = serverid['serverid']
                try:
                    res = self.execute_non_query(connection_name,query,params)
                    results[connection_name] = res
                except Exception as e:
                    print(f"Error occurred when running query on {connection_name} :{e}")
                    self.connections[connection_name].rollback()   #鍑洪敊鏃跺洖婊?        return results

if __name__ == '__main__':
    pass           
util_yaml.py
import os
import yaml

class YamlReader:
    #鍒濆鍖栵紝鍒ゆ柇鏂囦歡鏄惁瀛樺湪
    def __init__(self,yaml_file):
        if os.path.exists(yaml_file):
            self.yaml_file = yaml_file
        else:
            raise FileNotFoundError("yaml鏂囦歡涓嶅瓨鍦?)
        self._data = None
        self._data_all = None

    def yaml_data(self):  #yaml鏂囦歡璇誨彇 --鍗曚釜鏂囨。璇誨彇
        #绗竴娆¤皟鐢╠ata,璇誨彇yaml鏂囨。锛屽鏋滀笉鏄紝鐩存帴杩斿洖涔嬪墠淇濆瓨鐨勬暟鎹?        if not self._data:
            with open(self.yaml_file,'rb') as f:
                self._data = yaml.safe_load(f)
        return self._data

    def yaml_data_all(self):  #澶氫釜鏂囨。鐨勮鍙?        if not self._data_all:
            with open(self.yaml_file,'rb') as f:
                self._data_all = yaml.safe_load_all(f)
        return self._data_all