天天看點

Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...

Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...
前情回顧

上一篇文章已經編寫了異步并發API請求灌資料,那麼本章節我們來繼續編寫異步并發加鎖,保證資料安全

實戰任務

本次因為服務架構重構,表優化、重構,帶來的任務就是需要從原來的mysql資料庫中,讀取原表資料(部分存在多張關聯查詢)然後通過調用API的服務方式灌入新的資料庫表中(包含mysql、mongodb)。

執行流程如下
Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...

那麼根據流程所需要的功能,需要以下的執行個體進行支撐:

1.并發執行個體

2.查詢資料執行個體

3.執行post請求執行個體

目标:循環查詢處理并發資料,并且加鎖保證資料安全

給查詢資料表添加​

​is_import​

​​字段,在mysql表中添加查詢辨別,插入成功則為​

​1​

​​,無插入則為​

​0​

Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...

然後初始化 ​

​is_import = 0​

​ 即可,下面來給我們之前的model方法的查詢中添加條件查詢。

編寫model類中selectTable方法,增加條件查詢
Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...
# 根據設定的舊表字段,查詢舊庫的資料庫資料
    def selectTable(self,DB_NAME,TABLE_NAME,fields,order,cond_dict=''):
        # 選擇資料庫
        self.mydb.selectDataBase(DB_NAME)
        # 資料查詢
        result = self.mydb.select(TABLE_NAME, fields=fields,order=order,cond_dict=cond_dict)
        # 關閉連接配接
        self.mydb.close()
        # 傳回查詢的資料
        return result      
增加條件查詢cond_dict字典,測試使用。
Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...
Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...

測試成功之後,就要在model方法中增加一個更新​

​is_import​

​為1的方法了。

在model類中增加更新​

​is_import​

​為1的方法

有些時候,因為傳入的可能字段名不是​

​is_import​

​​,可能是​

​is_import_xxx​

​。那麼就要根據傳入的字典擷取字段名稱了。

# 更新is_import字段為1的方法
    def updateIsImport(self, TABLE_NAME, attrs_dict, cond_dict):
        """更新資料

                    args:
                        tablename  :表名字
                        attrs_dict  :更新屬性鍵值對字典
                        cond_dict  :更新條件字典

                    example:
                        params = {"name" : "caixinglong", "age" : "38"}
                        cond_dict = {"name" : "liuqiao", "age" : "18"}
                        mydb.update(table, params, cond_dict)

        """
        # 選擇資料庫
        result = self.mydb.update(TABLE_NAME, attrs_dict=attrs_dict, cond_dict=cond_dict)
        return result      

寫好了,更新字段的方法之後,下面我們在API請求成功之後進行使用。

在消費者方法中引用更新方法
Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...

此時消費者已經在上一個篇章中寫了異步并發的方法,但是這樣調用的話,會導緻mysql更新的時候報錯。

為了保證資料安全,我隻能降低效率,增加鎖了。

首先先看一個線程加鎖的僞代碼
#-* coding: utf-8 -*
import threading
import time
import os

def func1(k):
    global lock
    while True:
        lock.acquire()  # 開始鎖程序
         .... 執行任務 ...
        lock.release()  # 釋放程序鎖

if __name__=='__main__':
    
    # 初始化程序鎖
    lock = threading.Lock()
    
    # 使用4個CPU開啟程序并發
    for k in range(4):
        new_thread = threading.Thread(target=func1,args=(k,))  # 開啟一個程序調用func1,并且傳入參數k
        new_thread.start()      

從示例代碼可以看出,程序鎖的基本使用方法。下面我們來使用一下程序鎖來保證資料安全。

使用程序鎖
result_row = []
    lock = threading.Lock()  # 初始化程序鎖
    for row in select_result:
        lock.acquire()  # 開啟程序鎖
        consume(row, url, model,lock=lock) # 消費請求API      
Python采用并發查詢mysql以及調用API灌資料 (八)- 異步并發加鎖,保證資料安全...

初步代碼基本就寫到這裡了。後面肯定有很多需要優化的地方。

例如:

1、使用查詢分頁再開啟線程并發處理。

2、拆分生産者與消費者,加入rabbitmq等中間件來對付異常處理