天天看點

python調用libs.dbutil_Flask架構使用DBUtils子產品連接配接資料庫操作示例

本文執行個體講述了Flask架構使用DBUtils子產品連接配接資料庫的操作方法。分享給大家供大家參考,具體如下:

Flask連接配接資料庫

資料庫連接配接池:

Django使用:django ORM(pymysql/MySqldb)

Flask/其他使用:

-原生SQL

-pymysql(支援python2/3)

-MySqldb(支援python2)

-SQLAchemy(ORM)

原生SQL

需要解決的問題:

-不能為每個使用者建立一個連接配接

-建立一定數量的連接配接池,如果有人來

使用DBUtils子產品

兩種使用模式:

1 為每個線程建立一個連接配接,連接配接不可控,需要控制線程數

2 建立指定數量的連接配接在連接配接池,當線程通路的時候去取,如果不夠了線程排隊,直到有人釋放。平時建議使用這種!!!

模式一:

import pymysql

from DBUtils.PersistentDB import PersistentDB

POOL = PersistentDB(

creator=pymysql, # 使用連結資料庫的子產品

maxusage=None, # 一個連結最多被重複使用的次數,None表示無限制

setsession=[], # 開始會話前執行的指令清單。如:["set datestyle to ...", "set time zone ..."]

ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

closeable=False,

# 建議為False,如果為False時, conn.close() 實際上被忽略,供下次使用,再線程關閉時,才會自動關閉連結。如果為True時, conn.close()則關閉連結,那麼再次調用pool.connection時就會報錯,因為已經真的關閉了連接配接(pool.steady_connection()可以擷取一個新的連結)

threadlocal=None, # 本線程獨享值得對象,用于儲存連結對象,如果連結對象被重置

host='127.0.0.1',

port=3306,

user='root',

password='123',

database='pooldb',

charset='utf8'

)

def func():

conn = POOL.connection(shareable=False)

cursor = conn.cursor()

cursor.execute('select * from tb1')

result = cursor.fetchall()

cursor.close()

conn.close()

func()

模式二(推薦):

import time

import pymysql

import threading

from DBUtils.PooledDB import PooledDB, SharedDBConnection

POOL = PooledDB(

creator=pymysql, # 使用連結資料庫的子產品

maxconnections=6, # 連接配接池允許的最大連接配接數,0和None表示不限制連接配接數

mincached=2, # 初始化時,連結池中至少建立的空閑的連結,0表示不建立

maxcached=5, # 連結池中最多閑置的連結,0和None不限制

maxshared=3, # 連結池中最多共享的連結數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等子產品的 threadsafety都為1,所有值無論設定為多少,_maxcached永遠為0,是以永遠是所有連結都共享。

blocking=True, # 連接配接池中如果沒有可用連接配接後,是否阻塞等待。True,等待;False,不等待然後報錯

maxusage=None, # 一個連結最多被重複使用的次數,None表示無限制

setsession=[], # 開始會話前執行的指令清單。如:["set datestyle to ...", "set time zone ..."]

ping=0,

# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='123',

database='pooldb',

charset='utf8'

)

def func():

# 檢測目前正在運作連接配接數的是否小于最大連結數,如果不小于則:等待或報raise TooManyConnections異常

# 否則

# 則優先去初始化時建立的連結中擷取連結 SteadyDBConnection。

# 然後将SteadyDBConnection對象封裝到PooledDedicatedDBConnection中并傳回。

# 如果最開始建立的連結沒有連結,則去建立一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中并傳回。

# 一旦關閉連結後,連接配接就傳回到連接配接池讓後續線程繼續使用。

conn = POOL.connection()

# print(th, '連結被拿走了', conn1._con)

# print(th, '池子裡目前有', pool._idle_cache, '\r\n')

cursor = conn.cursor()

cursor.execute('select * from tb1')

result = cursor.fetchall()

conn.close()

func()

具體寫法:

通過導入的方式

app.py

from flask import Flask

from db_helper import SQLHelper

app = Flask(__name__)

@app.route("/")

def hello():

result = SQLHelper.fetch_one('select * from xxx',[])

print(result)

return "Hello World"

if __name__ == '__main__':

app.run()

DBUTILs

以下為兩種寫法:

第一種是用靜态方法裝飾器,通過直接執行類的方法來連接配接使用資料庫

第二種是通過執行個體化對象,通過對象來調用方法執行語句

建議使用第一種,更友善,第一種還可以在修改優化為,将一些公共語句在摘出來使用。

import time

import pymysql

from DBUtils.PooledDB import PooledDB

POOL = PooledDB(

creator=pymysql, # 使用連結資料庫的子產品

maxconnections=6, # 連接配接池允許的最大連接配接數,0和None表示不限制連接配接數

mincached=2, # 初始化時,連結池中至少建立的空閑的連結,0表示不建立

maxcached=5, # 連結池中最多閑置的連結,0和None不限制

maxshared=3, # 連結池中最多共享的連結數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等子產品的 threadsafety都為1,所有值無論設定為多少,_maxcached永遠為0,是以永遠是所有連結都共享。

blocking=True, # 連接配接池中如果沒有可用連接配接後,是否阻塞等待。True,等待;False,不等待然後報錯

maxusage=None, # 一個連結最多被重複使用的次數,None表示無限制

setsession=[], # 開始會話前執行的指令清單。如:["set datestyle to ...", "set time zone ..."]

ping=0,

# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

host='127.0.0.1',

port=3306,

user='root',

password='123',

database='pooldb',

charset='utf8'

)

"""

class SQLHelper(object):

@staticmethod

def fetch_one(sql,args):

conn = POOL.connection()

cursor = conn.cursor()

cursor.execute(sql, args)

result = cursor.fetchone()

conn.close()

return result

@staticmethod

def fetch_all(self,sql,args):

conn = POOL.connection()

cursor = conn.cursor()

cursor.execute(sql, args)

result = cursor.fetchone()

conn.close()

return result

# 調用方式:

result = SQLHelper.fetch_one('select * from xxx',[])

print(result)

"""

"""

#第二種:

class SQLHelper(object):

def __init__(self):

self.conn = POOL.connection()

self.cursor = self.conn.cursor()

def close(self):

self.cursor.close()

self.conn.close()

def fetch_one(self,sql, args):

self.cursor.execute(sql, args)

result = self.cursor.fetchone()

self.close()

return result

def fetch_all(self, sql, args):

self.cursor.execute(sql, args)

result = self.cursor.fetchall()

self.close()

return result

obj = SQLHelper()

obj.fetch_one()

"""

希望本文所述對大家基于Flask架構的Python程式設計有所幫助。