天天看點

python資料庫連接配接池_Python實作資料庫連接配接池

1.初始化

def __init__(self, **kwargs):

self.size = kwargs.get('size', 10)

self.kwargs = kwargs

self.conn_queue = queue.Queue(maxsize=self.size)

for i in range(self.size):

self.conn_queue.put(self._create_new_conn())

size:連接配接池支援的連接配接數,這裡定義為10

conn_queue:定義了一個隊列,隊列存放的是資料庫的連接配接

for循環:建立好十個與資料庫的連接配接,把這些連接配接放到隊列中

2.私有方法:連接配接資料庫

def _create_new_conn(self):

return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),

user=self.kwargs.get('user'),

passwd=self.kwargs.get('password'),

port=self.kwargs.get('port', 3306),

connect_timeout=5)

connect_timeout:在擷取連接配接階段起作用

擷取MySQL連接配接是多次握手的結果,除了使用者名和密碼的比對校驗外,還有IP->HOST->DNS->IP驗證,任何一步都可能因為網絡問題導緻線程阻塞。為了防止線程浪費在不必要的校驗等待上,超過connect_timeout的連接配接請求将會被拒絕。

官方描述:connect_timeout(The number of seconds that the mysqld server waits for a connect packet before responding with Bad handshake. The default value is 10 seconds)

這裡設定了5秒。

3.私有方法:put

def _put_conn(self, conn):

self.conn_queue.put(conn)

使用了queue的put方法,将連接配接放到隊列中,put預設是阻塞調用,非阻塞版本為put_nowait()方法,相當于put(conn, False)

阻塞調用是指:調用傳回結果之前,目前線程會被挂起,線程進入非可執行狀态,在這個狀态下CPU不會給線程配置設定時間片,線程暫停運作。函數隻有在得到結果之後才會傳回。

4.私有方法:擷取連接配接

def _get_conn(self):

conn = self.conn_queue.get()

if conn is None:

self._create_new_conn()

return conn

當要連接配接資料庫時,不再建立新的連接配接,而是之間從隊列中擷取連接配接,用完之後再把連接配接放回隊列中,如果擷取的連接配接為空,再建立連接配接。

get():如果隊列為空,get會等待,直到隊列裡有資料以後再取值,get取值會在隊列中移除一個資料,是以當取完連接配接用完之後,要再使用put方法把連接配接放回連接配接池。(get預設為阻塞調用,非阻塞調用方法為get_nowait())

get_nowait():取值的時候不等待,如果取不到值,程式直接崩潰,是以在擷取隊列的資料的時候要統一使用get,代碼才不會有問題

使用get_nowait()和put_nowait()的時候要做捕獲異常處理。

5.執行SQL語句函數

def exec_sql(self, sql):

conn = self._get_conn()

try:

with conn as cur:

cur.execute(sql)

return cur.fetchall()

except MySQLdb.ProgrammingError as e:

LOG.error("execute sql ({0}) error {1}".format(sql, e))

raise e

except MySQLdb.OperationalError as e:

conn = self._create_new_conn()

raise e

finally:

self._put_conn(conn)

傳遞進來的參數是SQL語句

擷取連接配接後,一般要建立遊标,然後使用遊标再進行對資料的增删改查,這裡使用了with語句,就不用再建立遊标了。cur執行SQL語句,然後使用fetchall傳回所有比對的每個元素,每個元素作為一個元組組成一個大元組,最後傳回的是這一個大元組。

然後捕獲了兩個異常,一個是文法錯誤異常,另一個是編碼問題的異常。

最後無論是正常建立連接配接執行了SQL語句,還是發生了異常,都要把連接配接放回到連接配接隊列中去。

6.删除連接配接

def __del__(self):

try:

while True:

conn = self.conn_queue.get_nowait()

if conn:

conn.close()

except queue.Empty:

pass

擷取到連接配接之後用close關閉連接配接,如果取到的隊列中的連接配接已經為空了,直接pass

其中queue.Empty的作用是,如果隊列為空,傳回True,如果不為空,傳回False。

7.完整代碼

# -*- coding:UTF-8 -*-

import queue

import MySQLdb

class ConnectionPool(object):

def __init__(self, **kwargs):

self.size = kwargs.get('size', 10)

self.kwargs = kwargs

self.conn_queue = queue.Queue(maxsize=self.size)

for i in range(self.size):

self.conn_queue.put(self._create_new_conn())

def _create_new_conn(self):

return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),

user=self.kwargs.get('user'),

passwd=self.kwargs.get('password'),

port=self.kwargs.get('port', 3306),

connect_timeout=5)

def _put_conn(self, conn):

self.conn_queue.put(conn)

def _get_conn(self):

conn = self.conn_queue.get()

if conn is None:

self._create_new_conn()

return conn

def exec_sql(self, sql):

conn = self._get_conn()

try:

with conn as cur:

cur.execute(sql)

return cur.fetchall()

except MySQLdb.ProgrammingError as e:

# 可以加一行将異常記錄到日志中

raise e

except MySQLdb.OperationalError as e:

conn = self._create_new_conn()

raise e

finally:

self._put_conn(conn)

def __del__(self):

try:

while True:

conn = self.conn_queue.get_nowait()

if conn:

conn.close()

except queue.Empty:

pass