本文執行個體講述了Flask架構使用DBUtils子產品連接配接資料庫的操作方法。分享給大家供大家參考,具體如下:
Flask連接配接資料庫
資料庫連接配接池:
Django使用:django ORM(pymysql/MySqldb)
Flask/其他使用:
-原生SQL
-pymysql(支援python2/3)
-MySqldb(支援python2)
-SQLAchemy(ORM)
原生SQL
需要解決的問題:
-不能為每個使用者建立一個連接配接
-建立一定數量的連接配接池,如果有人來
使用DBUtils子產品
兩種使用模式:
1 為每個線程建立一個連接配接,連接配接不可控,需要控制線程數
2 建立指定數量的連接配接在連接配接池,當線程通路的時候去取,如果不夠了線程排隊,直到有人釋放。平時建議使用這種!!!
模式一:
import pymysql
from DBUtils.PersistentDB import PersistentDB
POOL = PersistentDB(
creator=pymysql, # 使用連結資料庫的子產品
maxusage=None, # 一個連結最多被重複使用的次數,None表示無限制
setsession=[], # 開始會話前執行的指令清單。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 建議為False,如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉連結。如果為True時, conn.close()則關閉連結,那麼再次調用pool.connection時就會報錯,因為已經真的關閉了連接配接(pool.steady_connection()可以擷取一個新的連結)
threadlocal=None, # 本線程獨享值得對象,用于儲存連結對象,如果連結對象被重置
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()
func()
模式二(推薦):
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用連結資料庫的子產品
maxconnections=6, # 連接配接池允許的最大連接配接數,0和None表示不限制連接配接數
mincached=2, # 初始化時,連結池中至少建立的空閑的連結,0表示不建立
maxcached=5, # 連結池中最多閑置的連結,0和None不限制
maxshared=3, # 連結池中最多共享的連結數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等子產品的 threadsafety都為1,所有值無論設定為多少,_maxcached永遠為0,是以永遠是所有連結都共享。
blocking=True, # 連接配接池中如果沒有可用連接配接後,是否阻塞等待。True,等待;False,不等待然後報錯
maxusage=None, # 一個連結最多被重複使用的次數,None表示無限制
setsession=[], # 開始會話前執行的指令清單。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
# 檢測目前正在運作連接配接數的是否小于最大連結數,如果不小于則:等待或報raise TooManyConnections異常
# 否則
# 則優先去初始化時建立的連結中擷取連結 SteadyDBConnection。
# 然後将SteadyDBConnection對象封裝到PooledDedicatedDBConnection中并傳回。
# 如果最開始建立的連結沒有連結,則去建立一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中并傳回。
# 一旦關閉連結後,連接配接就傳回到連接配接池讓後續線程繼續使用。
conn = POOL.connection()
# print(th, '連結被拿走了', conn1._con)
# print(th, '池子裡目前有', pool._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
func()
具體寫法:
通過導入的方式
app.py
from flask import Flask
from db_helper import SQLHelper
app = Flask(__name__)
@app.route("/")
def hello():
result = SQLHelper.fetch_one('select * from xxx',[])
print(result)
return "Hello World"
if __name__ == '__main__':
app.run()
DBUTILs
以下為兩種寫法:
第一種是用靜态方法裝飾器,通過直接執行類的方法來連接配接使用資料庫
第二種是通過執行個體化對象,通過對象來調用方法執行語句
建議使用第一種,更友善,第一種還可以在修改優化為,将一些公共語句在摘出來使用。
import time
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用連結資料庫的子產品
maxconnections=6, # 連接配接池允許的最大連接配接數,0和None表示不限制連接配接數
mincached=2, # 初始化時,連結池中至少建立的空閑的連結,0表示不建立
maxcached=5, # 連結池中最多閑置的連結,0和None不限制
maxshared=3, # 連結池中最多共享的連結數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等子產品的 threadsafety都為1,所有值無論設定為多少,_maxcached永遠為0,是以永遠是所有連結都共享。
blocking=True, # 連接配接池中如果沒有可用連接配接後,是否阻塞等待。True,等待;False,不等待然後報錯
maxusage=None, # 一個連結最多被重複使用的次數,None表示無限制
setsession=[], # 開始會話前執行的指令清單。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
"""
class SQLHelper(object):
@staticmethod
def fetch_one(sql,args):
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
conn.close()
return result
@staticmethod
def fetch_all(self,sql,args):
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
conn.close()
return result
# 調用方式:
result = SQLHelper.fetch_one('select * from xxx',[])
print(result)
"""
"""
#第二種:
class SQLHelper(object):
def __init__(self):
self.conn = POOL.connection()
self.cursor = self.conn.cursor()
def close(self):
self.cursor.close()
self.conn.close()
def fetch_one(self,sql, args):
self.cursor.execute(sql, args)
result = self.cursor.fetchone()
self.close()
return result
def fetch_all(self, sql, args):
self.cursor.execute(sql, args)
result = self.cursor.fetchall()
self.close()
return result
obj = SQLHelper()
obj.fetch_one()
"""
希望本文所述對大家基于Flask架構的Python程式設計有所幫助。