這個項目是通過pywinauto控制windows(win10)上的微信PC用戶端來實作公衆号文章的抓取。代碼分成server和client兩部分。server接收client抓取的微信公衆号文章,并且儲存到資料庫。另外server支援簡單的搜尋和導出功能。client通過pywinauto實作微信公衆号文章的抓取。
一、項目位址
https://github.com/fancyerii/wechat-gongzhonghao-crawler
二、pywinauto簡介
pywinauto是一個python的工具,可以用于控制Windows的GUI程式。詳細的文檔可以參考這裡。
三、WechatAutomator類
自動化微信的代碼封裝在了類WechatAutomator裡,完整的代碼可以參考這裡。這裡簡要的介紹一下其中的主要方法:
3.1init_window
這個方法完成類的初始化,它的代碼為:
def init_window(self, exe_path=r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe",
turn_page_interval=3,
click_url_interval=1,
win_width=1000,
win_height=600):
app = Application(backend="uia").connect(path=exe_path)
self.main_win = app.window(title=u"微信", class_name="WeChatMainWndForPC")
self.main_win.set_focus()
self.app = app
self.visible_top = 70
self.turn_page_interval = turn_page_interval
self.click_url_interval = click_url_interval
self.browser = None
self.win_width = win_width
self.win_height = win_height
# 為了讓移動視窗,同時使用非uia的backend,這是pywinauto的uia的一個bug
self.app2 = Application().connect(path=exe_path)
self.move_window()
我們首先來看函數的參數:
- exe_path
- 微信程式的位址
- turn_page_interval
- 抓取翻頁時的時間間隔,預設3s
- click_url_interval
- 在抓取一頁的url時的間隔,預設1s
- win_width
- 設定視窗的寬度
- win_height
- 設定視窗的高度,如果顯示器的分辨率較大,可以設定的更加高一些,進而一頁包含的文章數更多一些,進而翻頁少一點。注意:一定要保證視窗完全可見,也就是說win_height不能大于實際分辨率的高度!
這個函數的主要功能是建構Application對象進而通過pywinauto實作控制,這裡使用的是uia的backend,然後設定視窗的大小并且把視窗移到最左上角。因為根據so文章,pywinauto 0.6.8存在bug,隻能通過win32的backend來移到視窗,是以構造了self.app2然後調用move_window()函數把視窗移到最左上角。
3.2crawl_gongzhonghao
這個函數實作了某個公衆号的文章抓取。它的基本控制邏輯如下:
- 首先通過搜尋框根據名字搜尋公衆号并且點選它。
- 對于目前頁點選所有的連結并且下載下傳其内容。
- 使用PAGE_DOWN鍵往下翻頁
- 需要判斷是否繼續抓取
第一個是通過locate_user函數實作,後面會介紹。第二個是通過process_page函數實作,後面也會介紹。判斷是否繼續抓取的邏輯為:
- 如果翻頁超過max_pages,則停止抓取
- 如果碰到某個url曾經抓取過,那說明之前的文章都已經抓取過了,則停止抓取
- 如果lastest_date不是None并且一篇文章的釋出日期早于它,則停止抓取
是以我們通常會在第一次抓取的時候把max_pages設定的很大(比如100),然後通過latest_date來抓到指定的日期。而之後的抓取則設定max_pages為較小的值(比如預設的6),這樣隻要爬蟲在兩次抓取之間公衆号的更新不超過6頁,那麼就不會漏掉文章。具體的邏輯可以參考main.py,它會把抓取的文章通過http請求發給Server,并且每次抓取的時候從Server查詢抓取過的文章存放到states這個list裡states[i][“url”]就存儲了第i篇文章的url。
def crawl_gongzhonghao(self, account_name, articles, states, detail,
max_pages=6, latest_date=None, no_item_retry=3):
logger.debug(account_name)
if not self.locate_user(account_name):
return False
last_visited_titles = set()
visited_urls = set()
self.turn_page_up(min(20, max_pages * 2))
pagedown_retry = 0
last_visited_titles = []
for page in range(0, max_pages):
items = []
last_visited_titles = self.process_page(account_name, items, last_visited_titles, states, visited_urls, detail)
articles.extend(items)
if len(items) == 0:
pagedown_retry += 1
if pagedown_retry >= no_item_retry:
s = "break because of retry {}".format(pagedown_retry)
logger.debug(s)
WechatAutomator.add_to_detail(s, detail)
break
else:
pagedown_retry = 0
if len(items) > 0 and latest_date is not None:
html = items[-1][-1]
pub_date = WechatAutomator.get_pubdate(html)
if pub_date and pub_date < latest_date:
s = "stop because {} < {}".format(pub_date, latest_date)
logger.debug(s)
WechatAutomator.add_to_detail(s, detail)
break
url_exist = False
for item in items:
if WechatAutomator.url_in_states(item[0], states):
s = "stop because url exist {}".format(item[0])
logger.debug(s)
WechatAutomator.add_to_detail(s, detail)
url_exist = True
break
if url_exist:
break
self.click_right()
self.main_win.type_keys("{PGDN}")
time.sleep(self.turn_page_interval)
self.turn_page_up(page * 2)
return True
3.3locate_user
locate_user函數的控制流程為:
- 找到左上角的搜尋框并且點選它獲得焦點
- 使用ctrl+a選中可能有的文字(之前的bug?)并且使用後退鍵删除它們
- 輸入公衆号名稱
- 在彈出的list裡點選這個公衆号名稱進而進入公衆号
def locate_user(self, user, retry=5):
if not self.main_win:
raise RuntimeError("you should call init_window first")
search_btn = self.main_win.child_window(title="搜尋", control_type="Edit")
self.click_center(search_btn)
self.main_win.type_keys("^a")
self.main_win.type_keys("{BACKSPACE}")
self.main_win.type_keys(user)
for i in range(retry):
time.sleep(1)
try:
search_list = self.main_win.child_window(title="搜尋結果")
match_result = search_list.child_window(title=user, control_type="ListItem")
self.click_center(match_result)
return True
except:
pass
return False
這裡主要就是通過child_window函數進行定位,關于它的用法這裡不介紹。關于怎麼定位元素的方法可以使用Inspect.exe或者print_control_identifiers函數,具體參考這裡。
3.4process_page
這個函數是最主要的抓取代碼,它處理目前一頁的内容,它的控制流程如下:
- 建構目前頁的tree
- 使用recursive_get函數周遊這顆樹并且找到每篇文章對應的element
- 周遊每一篇文章
- 如果文章的名字在上一頁出現過,則跳過
- 獲得這篇文章的坐标資訊
- 如果文章不可見(rect.top >= win_rect.bottom or rect.bottom <= self.visible_top)則跳過
- 計算點選的坐标
- 點選文章打開新的視窗
- 在新的視窗中點選【複制連結】按鈕
- 從剪貼闆複制連結url
- 通過url下載下傳文章内容并且parse釋出日期
邏輯比較簡單,但是有一些很trick的地方:
- 微信翻頁的實作
- 微信用戶端的翻頁和浏覽器不同,它的内容是累加的,比如第一頁3篇文章,往下翻一頁可能變成6篇文章,再翻可能變成9篇。這個時候這9篇文章都是在tree中的,隻不過最後3篇的坐标(top和bottom)是空間的。
- 能否點選 一篇文章對應的框(圖)可能是部分可見的,甚至它的top非常接近螢幕的最下方,這個時候可能點不了。如下圖所示:

與此類似的是右上角的黑色頭部(不能滾到并且會遮擋)也有一定空間,如下圖所示:
- 點選的位置
因為這個框可能很窄(bottom-top很小)并且可能在很靠上或者靠下的位置。是以有如下代碼:
# 計算可見的高度
visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top)
# 太窄的不點選,希望下次翻頁後能顯示更多像素進而可以點選,
# 但是如果微信的某個文章的框的高度小于10個像素,那麼這篇文章就無法被點選
# 不過作者目前為發現這麼窄的文章
if visible_height < 10:
continue
# 如果某個文章的框太大,則抛出異常,目前為止為發現這樣的問題。
if rect.bottom - rect.top >= win_rect.bottom - self.visible_top:
raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top,
win_rect.bottom, self.visible_top))
# 如果下部部分可見,那麼點選上方是比較”安全“的
if rect.bottom >= win_rect.bottom:
click_up = True
# 如果下部完全可見,則點選下方是”安全“的
else:
click_up = False
def process_page(self, account_name, items, lastpage_clicked_titles, states, visited_urls, detail):
clicked_titles = set()
text = self.main_win.child_window(title=account_name, control_type="Text", found_index=0)
parent = text
while parent:
parent = parent.parent()
if '會話清單' == parent.element_info.name:
break
paths = [0, 2, 0, 0, 0, 1, 0]
for idx in paths:
parent = parent.children()[idx]
elems = []
self.recursive_get(parent, elems)
win_rect = self.main_win.rectangle()
for elem in elems:
rect = elem.rectangle()
if elem.element_info.name in lastpage_clicked_titles:
continue
if rect.top >= win_rect.bottom or rect.bottom <= self.visible_top:
continue
visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top)
if visible_height < 10:
continue
if rect.bottom - rect.top >= win_rect.bottom - self.visible_top:
raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top,
win_rect.bottom, self.visible_top))
if rect.bottom >= win_rect.bottom:
click_up = True
else:
click_up = False
if self.is_bad_elem(elem):
s = "not good elem {}".format(elem.element_info.name[0:10])
logger.debug(s)
WechatAutomator.add_to_detail(s, detail)
continue
try:
self.click_url(rect, win_rect, click_up)
copy_btn = self.browser.child_window(title="複制連結位址")
self.click_center(copy_btn, click_main=False)
url = clipboard.GetData()
if elem.element_info.name != '圖檔':
clicked_titles.add(elem.element_info.name)
if url and not url in visited_urls:
visited_urls.add(url)
html = None
try:
html = requests.get(url).text
except:
s = "fail get {}".format(url)
logger.debug(s)
WechatAutomator.add_to_detail(s, detail)
items.append((url, rect, elem.element_info.name, html))
except:
traceback.print_exc()
pass
finally:
if self.browser:
try:
self.browser.close()
except:
pass
self.browser = None
time.sleep(self.click_url_interval)
return clicked_titles