天天看点

基于DBUtils.PooledDB 实现Python中多线程写入读取数据库 提高执行速度

 python多线程并发操作数据库,会存在链接数据库超时、数据库连接丢失、数据库操作超时等问题

解决方法:使用数据库连接池,并且每次操作都从数据库连接池获取数据库操作句柄,操作完关闭连接返回数据库连接池

如果不使用数据库连接池, 直接使用多线程去操作数据库, 会遇到资源竞争, 争夺cursor游标, Thread对象的Lock和Rlock可以实现简单的线程同步,对cursor进行加锁,但是这个行为,反其道而行。 经测试加锁还不如不用多线程,而且执行速度会比正常慢

用于创建数据库连接池, 具体可以参考一下代码

import pymysql
from DBUtils.PooledDB import PooledDB
import traceback
from threading import Thread


# 基础类
class Base:
    """
    用于连接和关闭
    """

    def __init__(self):
        self.pool = self.create_pool()

    def create_pool(self):
        """
        创建数据库连接池
        :return: 连接池
        """
        pool = PooledDB(creator=pymysql,
                        maxconnections=0,  # 连接池允许的最大连接数,0和None表示不限制连接数
                        mincached=4,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
                        maxcached=0,  # 链接池中最多闲置的链接,0和None不限制
                        maxusage=1,  # 一个链接最多被重复使用的次数,None表示无限制
                        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                        host='127.0.0.1',  # 此处必须是是127.0.0.1
                        port=3306,
                        user='root',
                        passwd='123456',
                        db='localhost',
                        use_unicode=True,
                        charset='utf8')
        return pool

    def save_mysql(self, sql, args):
        """
        保存数据库
        :param sql: 执行sql语句
        :param args: 添加的sql语句的参数 list[tuple]
        """
        try:
            db = self.pool.connection()  # 连接数据池
            cursor = db.cursor()  # 获取游标
            cursor.executemany(sql, args)
            db.commit()
            cursor.close()
            db.close()
        except:
            traceback.print_exc()

    # 插入数据
    def insertdata(self, insert, data):
        args = [(id, name) for id, name in data if id and name]
        self.save_mysql(insert, args)      
  def save_mysql(self, sql, args): 关键就在于这个方法,创建出连接池, 此时多线程每次写入需要连接池子,要从连接池中拿出连接,获取游标进行操作
@util.fn_timer
def find_all_done(self):
    # 运行时间 0:00:06.695223  运行时间 0:00:10.402396
    insert1 = "INSERT INTO XXX(_id, title) VALUES (%s, %s)"
    # 执行的插入方法
    self.insertdata(insert=insert1, data=result_all_chinese)
    insert2 = "INSERT INTO XXX(_id, title) VALUES (%s, %s)"
    # 执行的插入方法
    self.insertdata(insert=insert2, data=result_non_chinese)
    
    # 执行多线程  运行时间 0:00:01.593220  运行时间 0:00:03.004854
    try:
        insert1 = "INSERT INTO xxx(_id, title) VALUES (%s, %s)"
        insert2 = "INSERT INTO xxx(_id, title) VALUES (%s, %s)"
        
        # target参数是线程运行方法名
        t1 = Thread(target=self.insertdata, args=(insert1, result_all_chinese))
        t2 = Thread(target=self.insertdata, args=(insert2, result_non_chinese))
        t1.start()
        t2.start()
    except:
        traceback.print_exc()