本文記錄筆者在嘗試知乎專欄中的一個小項目時所遇到的問題。
知乎連結:https://zhuanlan.zhihu.com/p/21967333
先貼出完整源代碼。
代碼功能:使用二維碼登入網頁版微信,下載好友頭像,並將好友資料存入csv檔案,待進一步處理。
筆者使用的環境為python3.5。
import os
import requests
import re
import time
import xml.dom.minidom
import json
import csv
import codecs
import sys
import math
import subprocess
import ssl
import threading
import urllib.request, urllib.parse, urllib.error, urllib.request, urllib.error, urllib.parse
# --- Tunables -------------------------------------------------------------
DEBUG = False # Set to True while testing to write the raw JSON responses to disk
MAX_GROUP_NUM = 2 # Number of people per group
INTERFACE_CALLING_INTERVAL = 5 # Interval between API calls; too short an interval tends to trigger "operation too frequent" and a block of roughly half an hour
MAX_PROGRESS_LEN = 50
QRImagePath = os.path.join(os.getcwd(), 'qrcode.jpg') # Where the login QR code image is saved; no need to change
# --- Login/session state, filled in by the functions below ------------------
# `tip` is sent as the `tip` query parameter when polling the login endpoint;
# it is set to 1 once the QR image has been fetched and back to 0 after a scan.
tip = 0
uuid = ''
base_uri = ''
redirect_uri = ''
push_uri = ''
# Credentials parsed from the login XML response.
skey = ''
wxsid = ''
wxuin = ''
pass_ticket = ''
deviceId = 'e000000000000000'
# Request envelope (Uin/Sid/Skey/DeviceID) built by login().
BaseRequest = {}
# Values copied from the webwxinit response.
ContactList = []
My = []
SyncKey = []
# Python 2 compatibility shim: use xrange as range when it exists.
try:
    xrange
    range = xrange
except NameError:
    # Python 3: xrange is not defined, so keep the built-in range.
    pass
def responseState(func, BaseResponse):
    """Tell whether an API call succeeded, based on its ``BaseResponse``.

    Args:
        func: Name of the API call, used only in the diagnostic line.
        BaseResponse: Mapping with the keys ``'ErrMsg'`` and ``'Ret'``.

    Returns:
        True when ``Ret`` equals 0, False otherwise. A diagnostic line is
        printed on failure, and always when ``DEBUG`` is on.
    """
    err_msg = BaseResponse['ErrMsg']
    ret_code = BaseResponse['Ret']
    failed = ret_code != 0
    if DEBUG or failed:
        print('func: %s, Ret: %d, ErrMsg: %s' % (func, ret_code, err_msg))
    return not failed
def getUUID():
    """Request a login uuid and store it in the module-level ``uuid``.

    Returns:
        True when the response carries code ``200``; False when the code is
        anything else or the response does not have the expected shape.
    """
    global uuid
    url = 'https://login.weixin.qq.com/jslogin'
    params = {
        'appid': 'wx782c26e4c19acffb',
        'fun': 'new',
        'lang': 'zh_CN',
        '_': int(time.time()),
    }
    r = myRequests.get(url=url, params=params)
    r.encoding = 'utf-8'
    data = r.text
    # Expected response shape:
    # window.QRLogin.code = 200; window.QRLogin.uuid = "oZwt_bFfRg==";
    regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
    pm = re.search(regx, data)
    if pm is None:
        # Unexpected payload: report failure instead of crashing on .group().
        return False
    code = pm.group(1)
    uuid = pm.group(2)
    return code == '200'
def showQRImage():
    """Download the login QR code to ``QRImagePath`` and open it for scanning.

    Sets the module-level ``tip`` to 1 after the image has been fetched.
    The image is opened with ``open`` on macOS, ``xdg-open`` on Linux and
    ``os.startfile`` where that function exists; elsewhere only the path of
    the saved image is printed.
    """
    global tip
    url = 'https://login.weixin.qq.com/qrcode/' + uuid
    params = {
        't': 'webwx',
        '_': int(time.time()),
    }
    r = myRequests.get(url=url, params=params)
    tip = 1
    # The context manager closes the file even if the write fails.
    with open(QRImagePath, 'wb') as f:
        f.write(r.content)
    time.sleep(1)
    if 'darwin' in sys.platform:
        subprocess.call(['open', QRImagePath])
    elif 'linux' in sys.platform:
        subprocess.call(['xdg-open', QRImagePath])
    elif hasattr(os, 'startfile'):
        # os.startfile only exists on Windows.
        os.startfile(QRImagePath)
    else:
        # No known viewer for this platform: tell the user where the image is.
        print('QR code image saved to: %s' % QRImagePath)
    print('請使用微信掃描二維碼以登入')
def waitForLogin():
    """Poll the login endpoint once and react to the reported state.

    On code ``201`` (QR code scanned) ``tip`` is reset to 0. On code ``200``
    (logged in) the module-level ``redirect_uri``, ``base_uri`` and
    ``push_uri`` are filled in and, on macOS, Preview is asked to quit.

    Returns:
        The status code as a string (e.g. ``'200'``, ``'201'``, ``'408'``),
        or ``''`` when the response cannot be parsed, so that a caller
        comparing against ``'200'`` simply polls again.
    """
    global tip, base_uri, redirect_uri, push_uri
    url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' % (
        tip, uuid, int(time.time()))
    r = myRequests.get(url=url)
    r.encoding = 'utf-8'
    data = r.text
    # Expected response shape: window.code=500;
    regx = r'window.code=(\d+);'
    pm = re.search(regx, data)
    if pm is None:
        return ''
    code = pm.group(1)
    if code == '201':  # scanned
        print('成功掃描,請在手機上點選确認以登入')
        tip = 0
    elif code == '200':  # logged in
        print('正在登入...')
        regx = r'window.redirect_uri="(\S+?)";'
        pm = re.search(regx, data)
        if pm is None:
            # A "logged in" answer without a redirect target is unusable.
            return ''
        redirect_uri = pm.group(1) + '&fun=new'
        base_uri = redirect_uri[:redirect_uri.rfind('/')]
        # Mapping from base_uri host to push host. Order matters: the first
        # substring match wins, so specific hosts come before generic ones.
        services = [
            ('wx2.qq.com', 'webpush2.weixin.qq.com'),
            ('qq.com', 'webpush.weixin.qq.com'),
            ('web1.wechat.com', 'webpush1.wechat.com'),
            ('web2.wechat.com', 'webpush2.wechat.com'),
            ('wechat.com', 'webpush.wechat.com'),
            ('web1.wechatapp.com', 'webpush1.wechatapp.com'),
        ]
        # Fall back to base_uri when no known host matches.
        push_uri = base_uri
        for (searchUrl, pushUrl) in services:
            if searchUrl in base_uri:
                push_uri = 'https://%s/cgi-bin/mmwebwx-bin' % pushUrl
                break
        # Close the QR image viewer (macOS with Preview only).
        if 'darwin' in sys.platform:
            os.system("osascript -e 'quit app \"Preview\"'")
    elif code == '408':  # timed out
        pass
    # Other codes (e.g. '400', '500') are returned to the caller unhandled.
    return code
def login():
    """Fetch ``redirect_uri`` and extract the session credentials from its XML.

    Reads the ``skey``, ``wxsid``, ``wxuin`` and ``pass_ticket`` child
    elements of the document root into the module-level variables of the
    same name, then builds ``BaseRequest`` from them.

    Returns:
        True when all four values are non-empty, False otherwise.
    """
    global skey, wxsid, wxuin, pass_ticket, BaseRequest
    r = myRequests.get(url=redirect_uri)
    r.encoding = 'utf-8'
    data = r.text
    doc = xml.dom.minidom.parseString(data)
    root = doc.documentElement
    wanted = ('skey', 'wxsid', 'wxuin', 'pass_ticket')
    found = {}
    for node in root.childNodes:
        # An empty element has no child node to read; skip it so the
        # all() check below reports the failure instead of an IndexError.
        if node.nodeName in wanted and node.childNodes:
            found[node.nodeName] = node.childNodes[0].data
    # Elements missing from the response leave the previous value untouched.
    skey = found.get('skey', skey)
    wxsid = found.get('wxsid', wxsid)
    wxuin = found.get('wxuin', wxuin)
    pass_ticket = found.get('pass_ticket', pass_ticket)
    if not all((skey, wxsid, wxuin, pass_ticket)):
        return False
    BaseRequest = {
        'Uin': int(wxuin),
        'Sid': wxsid,
        'Skey': skey,
        'DeviceID': deviceId,
    }
    return True
def webwxinit():
    """Call ``webwxinit`` and cache the contact list, own profile and sync key.

    Stores ``ContactList``, ``User`` (as ``My``) and ``SyncKey`` from the JSON
    response in the module-level variables. When ``DEBUG`` is on, the raw
    response body is also written to ``webwxinit.json`` in the working
    directory.

    Returns:
        The result of ``responseState`` for the response's ``BaseResponse``.
    """
    global ContactList, My, SyncKey
    url = (base_uri +
           '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
               pass_ticket, skey, int(time.time())))
    params = {'BaseRequest': BaseRequest}
    headers = {'content-type': 'application/json; charset=UTF-8'}
    r = myRequests.post(url=url, data=json.dumps(params), headers=headers)
    r.encoding = 'utf-8'
    data = r.json()
    if DEBUG:
        # The context manager closes the dump file even if the write fails.
        with open(os.path.join(os.getcwd(), 'webwxinit.json'), 'wb') as f:
            f.write(r.content)
    ContactList = data['ContactList']
    My = data['User']
    SyncKey = data['SyncKey']
    return responseState('webwxinit', data['BaseResponse'])
def webwxgetcontact():
    """Fetch the contact list and return only ordinary friends.

    Entries are dropped when they are official/service accounts
    (``VerifyFlag`` has bit 8 set), one of the built-in special accounts,
    group chats (``UserName`` contains ``'@@'``), or the logged-in user.
    When ``DEBUG`` is on, the raw response body is also written to
    ``webwxgetcontact.json`` in the working directory.

    Returns:
        A list of the remaining member dicts, in their original order.
    """
    url = (base_uri +
           '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (
               pass_ticket, skey, int(time.time())))
    headers = {'content-type': 'application/json; charset=UTF-8'}
    r = myRequests.post(url=url, headers=headers)
    r.encoding = 'utf-8'
    data = r.json()
    if DEBUG:
        with open(os.path.join(os.getcwd(), 'webwxgetcontact.json'), 'wb') as f:
            f.write(r.content)
    # A set gives constant-time membership tests.
    SpecialUsers = {
        "newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage",
        "qmessage", "qqsync", "floatbottle", "lbsapp", "shakeapp",
        "medianote", "qqfriend", "readerapp", "blogapp", "facebookapp",
        "masssendapp", "meishiapp", "feedsapp", "voip", "blogappweixin",
        "weixin", "brandsessionholder", "weixinreminder",
        "wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts",
        "notification_messages", "wxitil", "userexperience_alarm",
    }
    # Single-pass filter instead of removing from the list while walking it.
    # The conditions keep the order of the original checks.
    return [
        Member for Member in data['MemberList']
        if Member['VerifyFlag'] & 8 == 0               # official/service account
        and Member['UserName'] not in SpecialUsers     # special account
        and '@@' not in Member['UserName']             # group chat
        and Member['UserName'] != My['UserName']       # the user themselves
    ]
def syncKey():
    """Serialize the module-level ``SyncKey`` for use as a query parameter.

    Returns:
        The ``Key_Val`` pairs of ``SyncKey['List']`` joined with ``'|'``.
    """
    return '|'.join(
        '%s_%s' % (entry['Key'], entry['Val']) for entry in SyncKey['List'])
def syncCheck():
    """Ask the push server whether there is anything new to sync.

    Returns:
        The ``selector`` value from the response as a string. ``'0'`` is
        returned when the response cannot be parsed, so that a caller which
        syncs on any non-``'0'`` selector skips that round.
    """
    url = push_uri + '/synccheck?'
    params = {
        'skey': BaseRequest['Skey'],
        'sid': BaseRequest['Sid'],
        'uin': BaseRequest['Uin'],
        'deviceId': BaseRequest['DeviceID'],
        'synckey': syncKey(),
        'r': int(time.time()),
    }
    r = myRequests.get(url=url, params=params)
    r.encoding = 'utf-8'
    data = r.text
    # Expected response shape: window.synccheck={retcode:"0",selector:"2"}
    regx = r'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}'
    pm = re.search(regx, data)
    if pm is None:
        return '0'
    # Group 1 is the retcode, which this function does not use.
    return pm.group(2)
def webwxsync():
    """Call ``webwxsync`` and replace the module-level ``SyncKey``.

    Returns:
        The result of ``responseState`` for the response's ``BaseResponse``.
    """
    global SyncKey
    # NOTE(review): the query string was reconstructed. The source had an
    # unterminated literal here ('/webwxsync? % (') followed by three format
    # arguments. Confirm the parameter names against the web WeChat API.
    url = base_uri + '/webwxsync?lang=zh_CN&skey=%s&sid=%s&pass_ticket=%s' % (
        BaseRequest['Skey'], BaseRequest['Sid'], urllib.parse.quote_plus(pass_ticket))
    params = {
        'BaseRequest': BaseRequest,
        'SyncKey': SyncKey,
        'rr': ~int(time.time()),
    }
    headers = {'content-type': 'application/json; charset=UTF-8'}
    # Send the JSON content type with the JSON body, as webwxinit does.
    r = myRequests.post(url=url, data=json.dumps(params), headers=headers)
    r.encoding = 'utf-8'
    data = r.json()
    SyncKey = data['SyncKey']
    return responseState('webwxsync', data['BaseResponse'])
def heartBeatLoop():
    """Loop forever: check for updates once a second and sync when needed.

    This function never returns.
    """
    while True:
        # Any selector other than '0' triggers a sync.
        if syncCheck() != '0':
            webwxsync()
        time.sleep(1)
def _gbkSafe(text):
    """Return ``text`` with every character GBK cannot encode removed."""
    return text.encode('gbk', 'ignore').decode('gbk')


def main():
    """Log in to web WeChat by QR code, then export the friend list.

    Steps: fetch a uuid, show the QR code, poll until the login is confirmed,
    initialise the session, fetch the contacts, start the heartbeat thread,
    then download each friend's avatar and write one row per friend to
    ``friend2.csv`` in the working directory.
    """
    global myRequests
    # NOTE(review): this disables certificate verification for the standard
    # library's default HTTPS context. Confirm it is still needed.
    if hasattr(ssl, '_create_unverified_context'):
        ssl._create_default_https_context = ssl._create_unverified_context
    headers = {
        'User-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36'}
    myRequests = requests.Session()
    myRequests.headers.update(headers)
    if not getUUID():
        print('擷取uuid失敗')
        return
    print('正在擷取二維碼圖檔...')
    showQRImage()
    while waitForLogin() != '200':
        pass
    os.remove(QRImagePath)
    if not login():
        print('登入失敗')
        return
    if not webwxinit():
        print('初始化失敗')
        return
    MemberList = webwxgetcontact()
    # The thread object must be started to run at all. It is a daemon thread
    # so the endless heartbeat loop does not keep the process alive once the
    # export has finished.
    heartbeat = threading.Thread(target=heartBeatLoop)
    heartbeat.daemon = True
    heartbeat.start()
    MemberCount = len(MemberList)
    print('通訊錄共%s位好友' % MemberCount)
    print(MemberList)
    # Avatars are saved as <prefix><index>.jpg. Create the target directory
    # first so the open() below does not fail on a missing folder.
    imagePrefix = 'D:\\Python\\Demo\\image\\image'
    imageDir = os.path.dirname(imagePrefix)
    if imageDir:
        os.makedirs(imageDir, exist_ok=True)
    # Write the csv. The context manager closes the file on errors too.
    with open('friend2.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['name', 'city', 'male', 'star', 'signature', 'remark', 'alias', 'nick'])
        for imageIndex, Member in enumerate(MemberList, 1):
            imagePath = imagePrefix + str(imageIndex) + '.jpg'
            # NOTE(review): the host is hard-coded rather than derived from
            # base_uri. Verify this works for accounts served from other hosts.
            imageUrl = 'https://wx.qq.com' + Member['HeadImgUrl']
            r = myRequests.get(url=imageUrl, headers=headers)
            with open(imagePath, 'wb') as fileImage:
                fileImage.write(r.content)
            print('正在下載下傳第:' + str(imageIndex) + '位好友頭像')
            # Replace empty fields with explicit placeholders.
            city = Member['City']
            city = 'nocity' if city == '' else city
            name = Member['NickName']
            name = 'noname' if name == '' else name
            sign = Member['Signature']
            sign = 'nosign' if sign == '' else sign
            remark = Member['RemarkName']
            remark = 'noremark' if remark == '' else remark
            alias = Member['Alias']
            alias = 'noalias' if alias == '' else alias
            nick = Member['NickName']
            nick = 'nonick' if nick == '' else nick
            print(name, ' ^+*+^ ', city, ' ^+*+^ ', Member['Sex'], ' ^+*+^ ', Member['StarFriend'], ' ^+*+^ ', sign,
                  ' ^+*+^ ', remark, ' ^+*+^ ', alias, ' ^+*+^ ', nick)
            # Text fields are filtered through GBK before being written.
            writer.writerow([_gbkSafe(name), _gbkSafe(city),
                             Member['Sex'],
                             Member['StarFriend'], _gbkSafe(sign),
                             _gbkSafe(remark),
                             _gbkSafe(alias),
                             _gbkSafe(nick)])
# Script entry point: run the export, then report that the program finished.
if __name__ == '__main__':
    main()
    print('程式已安全退出...')
簡單講幾點:
一、寫入csv檔案編碼問題
在上一篇文章中已經解釋了。
二、登入出現問題
程式沒有識別到目前的作業系統,相關判斷代碼在120行左右:
if sys.platform.find('darwin') >= 0: # 'darwin'
subprocess.call(['open', QRImagePath])
elif sys.platform.find('linux') >= 0:
subprocess.call(['xdg-open', QRImagePath])
else:
os.startfile(QRImagePath)
最後的else分句不可少。
若有其他問題歡迎留言。