需求:
小程式背景用的sqllite資料庫,剛開始用的時候,沒有考慮多線程,而且當時因為資料量少,沒有出現過多線程查詢報錯,現在資料量大了。多線程查詢經常報錯
ProgrammingError: Recursive use of cursors not allowed.
就是這個頭疼的錯。在網上查了大量的資料,要麼就是加lock=threading.lock(),要麼就是加sleep.終究還是解決不了問題。
剛好最近在網上看了一個小哥哥用Queue來解決這個問題。我改進了一下。目前能夠使用該方法進行增删改查。查詢出來的結果以字典的形式傳回。
話不多說,下面上代碼
代碼
# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
key_word = key_word.encode("utf-8")
key_word = key_word.replace("'", "''")
return key_word
class SelectConnect(object):
'''
隻能用來查詢
'''
def __init__(self):
# isolation_level=None為智能送出模式,不需要commit
self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
self.conn.execute('PRAGMA journal_mode = WAL')
cursor = self.conn.cursor()
cursor.execute('PRAGMA synchronous=OFF')
self.conn.text_factory = str
# 把結果用元祖的形式取出來
self.curosr = self.conn.cursor()
self.conn.row_factory = self.dict_factory
# 把結果用字典的形式取出來
self.curosr_diction = self.conn.cursor()
def commit(self):
self.conn.commit()
def dict_factory(self, cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def close_db(self):
# self.curosr.close()
self.conn.close()
class SqliteMultithread(Thread):
"""
Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
This is done by internally queueing the requests and processing them sequentially
in a separate thread (in the same order they arrived).
"""
def __init__(self, filename, autocommit, journal_mode):
super(SqliteMultithread, self).__init__()
self.filename = filename
self.autocommit = autocommit
self.journal_mode = journal_mode
self.reqs = Queue() # use request queue of unlimited size
self.setDaemon(True) # python2.5-compatible
self.running = True
self.start()
def dict_factory(self, cursor, row):
# field = [i[0] for i in cursor.description]
# value = [dict(zip(field, i)) for i in records]
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def run(self):
if self.autocommit:
conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
else:
conn = sqlite3.connect(self.filename, check_same_thread=False)
conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
conn.text_factory = str
cursor = conn.cursor()
cursor.execute('PRAGMA synchronous=OFF')
conn.row_factory = self.dict_factory
curosr_diction = conn.cursor()
curosr_diction.execute('PRAGMA synchronous=OFF')
# 把結果用字典的形式取出來
while self.running:
req, arg, res = self.reqs.get()
if req == '--close--':
break
elif req == '--commit--':
conn.commit()
else:
# print(arg)
curosr_diction.execute(req, arg)
# if res:
# for rec in cursor:
# res.put(rec)
# res.put('--no more--')
if res:
res.put(curosr_diction.fetchall())
if self.autocommit:
conn.commit()
conn.close()
def execute(self, req, arg=None, res=None):
"""
`execute` calls are non-blocking: just queue up the request and return immediately.
"""
self.reqs.put((req, arg or tuple(), res))
def executemany(self, req, items):
for item in items:
self.execute(req, item)
def select_all_dict(self, req, arg=None):
'''
直接傳回一個list
:param req:
:param arg:
:return:
'''
res = Queue() # results of the select will appear as items in this queue
self.execute(req, arg, res)
rec = res.get()
return rec
def select_one_dict(self, req, arg=None):
'''
直接傳回list裡的第一個元素,并且以字典展示
:param req:
:param arg:
:return:
'''
res = Queue() # results of the select will appear as items in this queue
self.execute(req, arg, res)
rec = res.get()
if len(rec) != 0:
rec = rec[0]
else:
rec = None
return rec
def commit(self):
self.execute('--commit--')
def close(self):
self.execute('--close--')
class Cursor(object):
'''
以元祖的形式查詢出資料
'''
def __init__(self):
old_con = SelectConnect()
self.conn = old_con.conn
self.curosr = old_con.curosr
self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
def execute(self, string, *args):
try:
if string.startswith('select'):
return self.curosr.execute(string, *args)
else:
return self.curosr2.execute(string, *args)
except Exception:
print("失敗一次")
print(string)
time.sleep(0.1)
self.execute(string, *args)
def executescript(self, string):
try:
self.curosr.executescript(string)
except Exception:
print("失敗一次")
print(string)
time.sleep(0.1)
self.executescript(string)
def fetchall(self):
return self.curosr.fetchall()
def fetchone(self):
return self.curosr.fetchone()
def rowcount(self):
return self.curosr.rowcount
def close(self):
self.curosr2.running = False
self.curosr.close()
self.conn.close()
class Curosrdiction(object):
'''
以字典的形式查詢出資料,建議全部用這種。
'''
def __init__(self):
old_con = SelectConnect()
self.conn = old_con.conn
self.curosrdiction = old_con.curosr_diction
self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
def execute(self, string, *args):
try:
if string.startswith('select'):
return self.curosrdiction.execute(string, *args)
else:
return self.curosr2.execute(string, *args)
except Exception:
print("失敗一次")
print(string)
time.sleep(0.1)
self.execute(string, *args)
def executescript(self, string):
result = True
try:
self.curosrdiction.executescript(string)
except Exception:
print("失敗一次")
# print(string)
time.sleep(0.1)
# self.executescript(string)
result = False
return result
def fetchall(self):
return self.curosrdiction.fetchall()
def fetchone(self):
return self.curosrdiction.fetchone()
def rowcount(self):
return self.curosrdiction.rowcount
def select_all_dict(self, string, *args):
return self.curosr2.select_all_dict(string, *args)
def select_one_dict(self, string, *args):
return self.curosr2.select_one_dict(string, *args)
def close(self):
self.curosr2.running = False
self.curosrdiction.close()
self.conn.close()
def commit(self):
self.conn.commit()
self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
curosr_diction.commit()
def close_db():
# curosr.close()
curosr_diction.close()