闇€瑕佹簮鐮佸彲浠ョ洿鎺ヨ仈绯?
鏂規涓€锛?/h1>
鐩存帴濂楃敤鑴氭湰锛岄渶鍙互鐪嬫噦涓€浜涜剼鏈€昏緫
杩欎釜绫誨疄鐜頒簡鍚屾椂杩炴帴澶氫釜 SQL Server 鏁版嵁搴擄紝骞舵彁渚涗簡鎵ц鏌ヨ鍜岄潪鏌ヨ鎿嶄綔鐨勮兘鍔?
import pymssql
class SqlServerConnector:
def __init__(self, servers):
self.connections = {} # 瀛樺偍鏁版嵁搴撹繛鎺ョ殑瀛楀吀
for server in servers:
conn = pymssql.connect(
server=server['host'],
port=server['port'],
user=server['username'],
password=server['password'],
database=server['database']
)
self.connections[server['name']] = conn
def execute_query(self, connection_name, query, params=None):
with self.connections[connection_name].cursor() as cursor: # 鑾峰彇娓告爣(cursor)
cursor.execute(query, params) # 鎵ц鏌ヨ璇彞
result = cursor.fetchall() # 鑾峰彇缁撴灉闆? return result
def execute_non_query(self, connection_name, query, params=None):
with self.connections[connection_name].cursor() as cursor: # 鑾峰彇娓告爣(cursor)
cursor.execute(query, params) # 鎵ц澧炲垹鏀矽鍙? self.connections[connection_name].commit() # 鎻愪氦浜嬪姟
return cursor.rowcount # 杩斿洖鍙楀獎鍝嶇殑琛屾暟
def execute_query_all(self, servers, query, params=None):
results = {} # 瀛樺偍鎵€鏈夋煡璇㈢粨鏋滅殑瀛楀吀
for server in servers:
connection_name = server['name']
results[connection_name] = self.execute_query(connection_name, query, params)
return results
def execute_non_query_all(self, servers, query, params=None):
results = {} # 瀛樺偍鎵€鏈夋搷浣滅粨鏋滅殑瀛楀吀
for server in servers:
connection_name = server['name']
try:
res = self.execute_non_query(connection_name, query, params)
results[connection_name] = res
except Exception as e:
print(f"Error occurred when running query on {connection_name}: {e}")
self.connections[connection_name].rollback() # 鍑洪敊鏃跺洖婊? return results
浠ヤ笅鏄唬鐮佷腑鐨勮В閲? import pymssql锛氬鍏?pymssql 搴擄紝鐢ㄤ簬涓?SQL Server 鏁版嵁搴撹繘琛岃繛鎺ュ拰浜や簰銆?
class SqlServerConnector:锛氬畾涔変竴涓悕涓?SqlServerConnector 鐨勭被銆?
def __init__(self, servers):锛氱被鐨勫垵濮嬪寲鏂規硶锛屾帴鍙椾竴涓寘鍚涓暟鎹簱淇℃伅鐨勫垪琛ㄤ綔涓哄弬鏁般€?
self.connections = {}锛氬垵濮嬪寲涓€涓瓧鍏革紝鐢ㄤ簬瀛樺偍鏁版嵁搴撹繛鎺ャ€?
for server in servers:锛氶亶鍘嗘暟鎹簱鍒楄〃涓殑姣忎釜鏁版嵁搴撲俊鎭€?
conn = pymssql.connect(...)锛氭牴鎹暟鎹簱淇℃伅鍒涘緩鏁版嵁搴撹繛鎺ュ璞°€俿erver['host']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑涓繪満鍦闆潃銆俿erver['port']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑绔彛鍙楓€俿erver['username']锛氳繛鎺ユ暟鎹簱鐨勭敤鎴峰悕銆俿erver['password']锛氳繛鎺ユ暟鎹簱鐨勫瘑鐮併€俿erver['database']锛氳杩炴帴鐨勬暟鎹簱鍚嶇О銆?
self.connections[server['name']] = conn锛氬皢鏁版嵁搴撹繛鎺ュ璞℃坊鍔犲埌瀛楀吀涓紝浠ユ暟鎹簱鍚嶇О浣滀負閿€?
def execute_query(self, connection_name, query, params=None):锛氭墽琛屾煡璇㈣鍙ョ殑鏂規硶銆俢onnection_name锛氭暟鎹簱鍚嶇О銆俼uery锛氳鎵ц鐨勬煡璇㈣鍙ャ€俻arams=None锛氬彲閫夌殑鏌ヨ鍙傛暟銆?def __init__(self, servers):锛氱被鐨勫垵濮嬪寲鏂規硶锛屾帴鍙椾竴涓寘鍚涓暟鎹簱淇℃伅鐨勫垪琛ㄤ綔涓哄弬鏁般€?
self.connections = {}锛氬垵濮嬪寲涓€涓瓧鍏革紝鐢ㄤ簬瀛樺偍鏁版嵁搴撹繛鎺ャ€?
for server in servers:锛氶亶鍘嗘暟鎹簱鍒楄〃涓殑姣忎釜鏁版嵁搴撲俊鎭€?
conn = pymssql.connect(...)锛氭牴鎹暟鎹簱淇℃伅鍒涘緩鏁版嵁搴撹繛鎺ュ璞°€俿erver['host']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑涓繪満鍦闆潃銆俿erver['port']锛氭暟鎹簱鏈嶅姟鍣ㄧ殑绔彛鍙楓€俿erver['username']锛氳繛鎺ユ暟鎹簱鐨勭敤鎴峰悕銆俿erver['password']锛氳繛鎺ユ暟鎹簱鐨勫瘑鐮併€俿erver['database']锛氳杩炴帴鐨勬暟鎹簱鍚嶇О銆?
self.connections[server['name']] = conn锛氬皢鏁版嵁搴撹繛鎺ュ璞℃坊鍔犲埌瀛楀吀涓紝浠ユ暟鎹簱鍚嶇О浣滀負閿€?
def execute_query(self, connection_name, query, params=None):锛氭墽琛屾煡璇㈣鍙ョ殑鏂規硶銆俢onnection_name锛氭暟鎹簱鍚嶇О銆俼uery锛氳鎵ц鐨勬煡璇㈣鍙ャ€俻arams=None锛氬彲閫夌殑鏌ヨ鍙傛暟銆?/code>
鏂規浜岋細鐩存帴璋冪敤灏佽鑴氭湰锛堝啓鐢ㄤ緥锛屾墽琛岃剼鏈嵆鍙級
鑴氭湰瀹炵幇灏佽鍚庯紝鍙渶瑕佸湪SqlServer.yaml鏂囦歡涓啓鐢ㄤ緥鍗沖彲锛屾鍚庢墽琛孲qlServer.py鑴氭湰鍗沖疄鐜版暟鎹簱鐨勫鍒犳敼鏌ユ搷浣?
鐩綍浠嬬粛锛?
SqlServer.yaml 缂栧啓澶氫釜鏁版嵁搴撹繛鎺ヤ俊鎭?Sqlserver_jzjy_uat:
-
serverid: 0
host: 127.0.0.1
user: sa
password: pwd1
db: run
charset: utf-8
-
serverid: 1
host: 127.0.0.2
user: sa
password: pwd2
db: run
charset: utf-8
-
serverid: 2
host: 127.0.0.3
user: sa
password: pwd2
db: run
charset: utf-8
PublicConfig.py鑴氭湰锛?閰嶇疆璇誨彇淇℃伅锛屾柟渚胯皟鐢? import os
from Public_Utils.util_yaml import YamlReader
class YamlPath:
def __init__(self):
current = os.path.abspath(__file__)
self.base_dir = os.path.dirname(os.path.dirname(current))
self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"
def get_sqlserver_file(self):
self._config_file = self._config_path + os.sep + "SqlServer.yaml"
return self._config_file
class ConfigYaml:
def __init__(self): #鍒濆yaml璇誨彇閰嶇疆鏂囦歡
self.sqlserver_config = YamlReader(YamlPath().get_sqlserver_file()).yaml_data()
def get_sqlserver_yaml(self):
return self.sqlserver_config['Sqlserver_jzjy_uat']
def get_serveridlist_yaml(self):
return self.sqlserver_config['uat_serverlist']
if __name__ == '__main__':
pass
SqlServer.py 鎵ц鑴氭湰 import pymssql
from Public_Config.PublicConfig import ConfigYaml
class SQLServer:
def __init__(self):
# 杩炴帴鏁版嵁搴? self.connections = {}
for server in ConfigYaml().get_sqlserver_yaml():
conn = pymssql.connect(host =server['host'],
user=server['user'],
password=server['password'],
database=server['db'],
charset='cp936', as_dict=True)
self.connections[server['serverid']] = conn
def execute_query(self, connection_name, query, params=None):
with self.connections[connection_name].cursor() as cursor: #鑾峰彇娓告爣锛坈ursor锛? cursor.execute(query,params) #鎵ц鏌ヨ璇彞
result = cursor.fetchall() #鑾峰彇缁撴灉闆嗗悎
return result
def execute_query_all(self, query, params=None): #鍏ㄩ儴鏁版嵁搴撴墽琛屾煡璇紙select锛? results = {}
for serverid in ConfigYaml().get_sqlserver_yaml():
connection_name = serverid['serverid']
results[connection_name] = self.execute_query(connection_name,query,params)
return results
def execute_query_special(self, query, params=None): #鎸囧畾鐨勫嚑涓暟鎹簱涓€璧鋒墽琛岃鍙?锛坰elect锛? results = {}
serveridlist = ConfigYaml().get_serveridlist_yaml()['serveridlist']
for serverid in ConfigYaml().get_sqlserver_yaml():
if str(serverid['serverid']) in serveridlist.split(','):
connection_name = serverid['serverid']
results[connection_name] = self.execute_query(connection_name, query, params)
return results
def execute_non_query(self,connection_name,query,params=None):
with self.connections[connection_name].cursor() as cursor:
cursor.execute(query,params) #鎵ц澧炲垹鏌ョ瓑鎿嶄綔
self.connections[connection_name].commit() #鎻愪氦浜嬪姟
return cursor.rowcount #杩斿洖鍙楀獎鍝嶇殑琛屾暟
def execute_non_query_all(self, query, params=None): #瀹炵幇閰嶇疆琛ㄤ腑鐨勬墍鏈夊鍒犳敼绛夋搷浣渋nsert,delete,update锛? results = {} #鍌ㄥ瓨鎵€鏈夋煡璇㈢粨鏋滅殑瀛楀吀
for serverid in ConfigYaml().get_sqlserver_yaml():
connection_name = serverid['serverid']
try:
res = self.execute_non_query(connection_name,query,params)
results[connection_name] = res
except Exception as e:
print(f"Error occurred when running query on {connection_name} :{e}")
self.connections[connection_name].rollback() #鍑洪敊鏃跺洖婊? return results
def execute_non_query_special(self, query, params=None): #瀹炵幇閰嶇疆琛ㄤ腑鎸囧畾鏁版嵁搴撶殑澧炲垹鏀圭瓑鎿嶄綔insert,delete,update锛? results = {} #鍌ㄥ瓨鎵€鏈夋煡璇㈢粨鏋滅殑瀛楀吀
for serverid in ConfigYaml().get_sqlserver_yaml():
serveridlist = ConfigYaml().get_serveridlist_yaml()['serveridlist']
if str(serverid['serverid']) in serveridlist.split(','):
connection_name = serverid['serverid']
try:
res = self.execute_non_query(connection_name,query,params)
results[connection_name] = res
except Exception as e:
print(f"Error occurred when running query on {connection_name} :{e}")
self.connections[connection_name].rollback() #鍑洪敊鏃跺洖婊? return results
if __name__ == '__main__':
pass
util_yaml.py import os
import yaml
class YamlReader:
#鍒濆鍖栵紝鍒ゆ柇鏂囦歡鏄惁瀛樺湪
def __init__(self,yaml_file):
if os.path.exists(yaml_file):
self.yaml_file = yaml_file
else:
raise FileNotFoundError("yaml鏂囦歡涓嶅瓨鍦?)
self._data = None
self._data_all = None
def yaml_data(self): #yaml鏂囦歡璇誨彇 --鍗曚釜鏂囨。璇誨彇
#绗竴娆¤皟鐢╠ata,璇誨彇yaml鏂囨。锛屽鏋滀笉鏄紝鐩存帴杩斿洖涔嬪墠淇濆瓨鐨勬暟鎹? if not self._data:
with open(self.yaml_file,'rb') as f:
self._data = yaml.safe_load(f)
return self._data
def yaml_data_all(self): #澶氫釜鏂囨。鐨勮鍙? if not self._data_all:
with open(self.yaml_file,'rb') as f:
self._data_all = yaml.safe_load_all(f)
return self._data_all