
前情回顧
上一篇文章已經編寫了異步并發API請求灌資料,那麼本章節我們來繼續編寫異步并發加鎖,保證資料安全
實戰任務
本次因為服務架構重構,表優化、重構,帶來的任務就是需要從原來的mysql資料庫中,讀取原表資料(部分存在多張關聯查詢)然後通過調用API的服務方式灌入新的資料庫表中(包含mysql、mongodb)。
執行流程如下
那麼根據流程所需要的功能,需要以下的執行個體進行支撐:
1.并發執行個體
2.查詢資料執行個體
3.執行post請求執行個體
目标:循環查詢處理并發資料,并且加鎖保證資料安全
給查詢資料表添加字段,在mysql表中添加查詢辨別,插入成功則為
is_import
,無插入則為
1
0
然後初始化
is_import = 0
即可,下面來給我們之前的model方法的查詢中添加條件查詢。
編寫model類中selectTable方法,增加條件查詢
# 根據設定的舊表字段,查詢舊庫的資料庫資料
def selectTable(self,DB_NAME,TABLE_NAME,fields,order,cond_dict=''):
# 選擇資料庫
self.mydb.selectDataBase(DB_NAME)
# 資料查詢
result = self.mydb.select(TABLE_NAME, fields=fields,order=order,cond_dict=cond_dict)
# 關閉連接配接
self.mydb.close()
# 傳回查詢的資料
return result
增加條件查詢cond_dict字典,測試使用。
測試成功之後,就要在model方法中增加一個更新
is_import
為1的方法了。
在model類中增加更新 is_import
為1的方法
有些時候,因為傳入的可能字段名不是
is_import
,可能是
is_import_xxx
。那麼就要根據傳入的字典擷取字段名稱了。
# 更新is_import字段為1的方法
def updateIsImport(self, TABLE_NAME, attrs_dict, cond_dict):
"""更新資料
args:
tablename :表名字
attrs_dict :更新屬性鍵值對字典
cond_dict :更新條件字典
example:
params = {"name" : "caixinglong", "age" : "38"}
cond_dict = {"name" : "liuqiao", "age" : "18"}
mydb.update(table, params, cond_dict)
"""
# 選擇資料庫
result = self.mydb.update(TABLE_NAME, attrs_dict=attrs_dict, cond_dict=cond_dict)
return result
寫好了,更新字段的方法之後,下面我們在API請求成功之後進行使用。
在消費者方法中引用更新方法
此時消費者已經在上一個篇章中寫了異步并發的方法,但是這樣調用的話,會導緻mysql更新的時候報錯。
為了保證資料安全,我隻能降低效率,增加鎖了。
首先先看一個線程加鎖的僞代碼
#-* coding: utf-8 -*
import threading
import time
import os
def func1(k):
global lock
while True:
lock.acquire() # 開始鎖程序
.... 執行任務 ...
lock.release() # 釋放程序鎖
if __name__=='__main__':
# 初始化程序鎖
lock = threading.Lock()
# 使用4個CPU開啟程序并發
for k in range(4):
new_thread = threading.Thread(target=func1,args=(k,)) # 開啟一個程序調用func1,并且傳入參數k
new_thread.start()
從示例代碼可以看出,程序鎖的基本使用方法。下面我們來使用一下程序鎖來保證資料安全。
使用程序鎖
result_row = []
lock = threading.Lock() # 初始化程序鎖
for row in select_result:
lock.acquire() # 開啟程序鎖
consume(row, url, model,lock=lock) # 消費請求API
初步代碼基本就寫到這裡了。後面肯定有很多需要優化的地方。
例如:
1、使用查詢分頁再開啟線程并發處理。
2、拆分生産者與消費者,加入rabbitmq等中間件來對付異常處理