1.初始化
def __init__(self, **kwargs):
self.size = kwargs.get('size', 10)
self.kwargs = kwargs
self.conn_queue = queue.Queue(maxsize=self.size)
for i in range(self.size):
self.conn_queue.put(self._create_new_conn())
size:連接配接池支援的連接配接數,這裡定義為10
conn_queue:定義了一個隊列,隊列存放的是資料庫的連接配接
for循環:建立好十個與資料庫的連接配接,把這些連接配接放到隊列中
2.私有方法:連接配接資料庫
def _create_new_conn(self):
return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),
user=self.kwargs.get('user'),
passwd=self.kwargs.get('password'),
port=self.kwargs.get('port', 3306),
connect_timeout=5)
connect_timeout:在擷取連接配接階段起作用
擷取MySQL連接配接是多次握手的結果,除了使用者名和密碼的比對校驗外,還有IP->HOST->DNS->IP驗證,任何一步都可能因為網絡問題導緻線程阻塞。為了防止線程浪費在不必要的校驗等待上,超過connect_timeout的連接配接請求将會被拒絕。
官方描述:connect_timeout(The number of seconds that the mysqld server waits for a connect packet before responding with Bad handshake. The default value is 10 seconds)
這裡設定了5秒。
3.私有方法:put
def _put_conn(self, conn):
self.conn_queue.put(conn)
使用了queue的put方法,将連接配接放到隊列中,put預設是阻塞調用,非阻塞版本為put_nowait()方法,相當于put(conn, False)
阻塞調用是指:調用傳回結果之前,目前線程會被挂起,線程進入非可執行狀态,在這個狀态下CPU不會給線程配置設定時間片,線程暫停運作。函數隻有在得到結果之後才會傳回。
4.私有方法:擷取連接配接
def _get_conn(self):
conn = self.conn_queue.get()
if conn is None:
self._create_new_conn()
return conn
當要連接配接資料庫時,不再建立新的連接配接,而是之間從隊列中擷取連接配接,用完之後再把連接配接放回隊列中,如果擷取的連接配接為空,再建立連接配接。
get():如果隊列為空,get會等待,直到隊列裡有資料以後再取值,get取值會在隊列中移除一個資料,是以當取完連接配接用完之後,要再使用put方法把連接配接放回連接配接池。(get預設為阻塞調用,非阻塞調用方法為get_nowait())
get_nowait():取值的時候不等待,如果取不到值,程式直接崩潰,是以在擷取隊列的資料的時候要統一使用get,代碼才不會有問題
使用get_nowait()和put_nowait()的時候要做捕獲異常處理。
5.執行SQL語句函數
def exec_sql(self, sql):
conn = self._get_conn()
try:
with conn as cur:
cur.execute(sql)
return cur.fetchall()
except MySQLdb.ProgrammingError as e:
LOG.error("execute sql ({0}) error {1}".format(sql, e))
raise e
except MySQLdb.OperationalError as e:
conn = self._create_new_conn()
raise e
finally:
self._put_conn(conn)
傳遞進來的參數是SQL語句
擷取連接配接後,一般要建立遊标,然後使用遊标再進行對資料的增删改查,這裡使用了with語句,就不用再建立遊标了。cur執行SQL語句,然後使用fetchall傳回所有比對的每個元素,每個元素作為一個元組組成一個大元組,最後傳回的是這一個大元組。
然後捕獲了兩個異常,一個是文法錯誤異常,另一個是編碼問題的異常。
最後無論是正常建立連接配接執行了SQL語句,還是發生了異常,都要把連接配接放回到連接配接隊列中去。
6.删除連接配接
def __del__(self):
try:
while True:
conn = self.conn_queue.get_nowait()
if conn:
conn.close()
except queue.Empty:
pass
擷取到連接配接之後用close關閉連接配接,如果取到的隊列中的連接配接已經為空了,直接pass
其中queue.Empty的作用是,如果隊列為空,傳回True,如果不為空,傳回False。
7.完整代碼
# -*- coding:UTF-8 -*-
import queue
import MySQLdb
class ConnectionPool(object):
def __init__(self, **kwargs):
self.size = kwargs.get('size', 10)
self.kwargs = kwargs
self.conn_queue = queue.Queue(maxsize=self.size)
for i in range(self.size):
self.conn_queue.put(self._create_new_conn())
def _create_new_conn(self):
return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),
user=self.kwargs.get('user'),
passwd=self.kwargs.get('password'),
port=self.kwargs.get('port', 3306),
connect_timeout=5)
def _put_conn(self, conn):
self.conn_queue.put(conn)
def _get_conn(self):
conn = self.conn_queue.get()
if conn is None:
self._create_new_conn()
return conn
def exec_sql(self, sql):
conn = self._get_conn()
try:
with conn as cur:
cur.execute(sql)
return cur.fetchall()
except MySQLdb.ProgrammingError as e:
# 可以加一行将異常記錄到日志中
raise e
except MySQLdb.OperationalError as e:
conn = self._create_new_conn()
raise e
finally:
self._put_conn(conn)
def __del__(self):
try:
while True:
conn = self.conn_queue.get_nowait()
if conn:
conn.close()
except queue.Empty:
pass