golang筆記16--go語言并發版爬蟲
- 1 介紹
- 2 并發版爬蟲
-
- 2.1 并發版爬蟲架構
- 2.2 簡單排程器
- 2.3 并發排程器
- 2.4 隊列實作排程器
- 2.5 重構和總結
- 2.6 更多城市
- 2.7 更多使用者與去重
- 3 注意事項
- 4 說明
1 介紹
本文繼上文 golang筆記15-go 語言單任務版爬蟲, 進一步了解 go 語言并發版爬蟲項目,以及相應注意事項。
具體包括: 并發版爬蟲架構、簡單排程器、并發排程器、隊列實作排程器、重構和總結、更多城市、更多使用者與去重 等内容。
2 并發版爬蟲
2.1 并發版爬蟲架構
在 golang筆記15-go 語言單任務版爬蟲 中已經通過iftop 初步得出了單任務爬蟲性能低的結論,是以本節将在此基礎上優化為并發版本的爬蟲。
優化後的爬蟲架構如下圖所示:
在單任務中,很明顯可以發現Fetcher 是影響效率的主要因素,是以可以考慮将Fetcher改動為并發子產品;實際中Fetcher 擷取資料後被Praser 消費了,是以可以考慮把Fetcher 和 Parser 封裝為一個Worker 子產品,将Worker 作為一個并發的子產品。
為了後續優化,需要抽象出一個Scheduler,且Engine、Scheduler和Worker 直接通過chan 來實作資料傳輸。如下圖所示:
Scheduler 的一個簡單實作是:Engine 擷取的Request 都送到Scheduler, 然後Worker 完成任務後從Scheduler 的chan 中搶資料,如此循環的進行排程;其流程如下圖所示:
2.2 簡單排程器
将第一版的Engine 更改為simple.go, 并新加一個concurrent.go 專門用來來實作并發版本Engine,同時抽象出 Worker.go 專用與接收請求并解析相關資料。
具體實作如下:
learngo/crawler$ tree -L 2
.
├── engine
│ ├── concurrent.go
│ ├── simple.go
│ ├── types.go
│ └── worker.go
├── fetcher
│ └── fetcher.go
├── mian.go
├── scheduler
│ └── simple.go
└── zhenai
└── parser
5 directories, 7 files
vim engine/concurrent.go
package engine
import "fmt"
type ConcurrentEngine struct {
Scheduler Scheduler
WorkerCount int
}
type Scheduler interface {
Submit(Request)
ConfigureMasterWorkerChan(chan Request)
}
func (e *ConcurrentEngine) Run(seeds ...Request) {
in := make(chan Request)
out := make(chan ParseResult)
e.Scheduler.ConfigureMasterWorkerChan(in)
for i := 0; i < e.WorkerCount; i++ {
createWorker(in, out)
}
for _, r := range seeds {
e.Scheduler.Submit(r)
}
for {
result := <-out
for _, item := range result.Items {
fmt.Printf("Got item: %v\n", item)
}
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(in chan Request, out chan ParseResult) {
go func() {
for {
request := <-in
result, err := Worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
vim engine/worker.go
package engine
import (
"learngo/crawler/fetcher"
"log"
)
func Worker(r Request) (ParseResult, error) {
log.Printf("Fetching %s", r.Url)
body, err := fetcher.Fetch(r.Url)
if err != nil {
log.Printf("Fetcher: error,fetching url %s: %v", r.Url, err)
return ParseResult{}, err
}
return r.ParserFunc(body), nil
}
vim scheduler/simple.go
package scheduler
import "learngo/crawler/engine"
type SimpleScheduler struct {
workerChan chan engine.Request
}
func (s *SimpleScheduler) ConfigureMasterWorkerChan(c chan engine.Request) {
s.workerChan = c
}
func (s *SimpleScheduler) Submit(r engine.Request) {
s.workerChan <- r
}
vim main.go
package main
import (
"learngo/crawler/engine"
"learngo/crawler/scheduler"
"learngo/crawler/zhenai/parser"
)
func main() {
url := "http://www.zhenai.com/zhenghun"
//engine.SimpleEngine{}.Run(engine.Request{Url: url, ParserFunc: parser.ParseCityList})
e := engine.ConcurrentEngine{Scheduler: &scheduler.SimpleScheduler{}, WorkerCount: 10}
e.Run(engine.Request{Url: url, ParserFunc: parser.ParseCityList})
}
輸出:
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun
Got item: City 阿壩
......
Got item: City 資陽
Got item: City 遵義
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anshun
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/akesu
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/aba
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/ali
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/alashanmeng
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/ankang
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/aletai
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anhui
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anqing
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anshan
此時剛好擷取10個城市後就會處于等待,其原因為Scheduler 通過 Scheduler.Submit 來送出資料給worker,前提是有空閑的worker 來接收資料,若worker 都是忙的裝填就會導緻卡死。
2.3 并發排程器
由于2.2 小節中worker 都在工作狀态存在卡死的問題解,是以可以通過goroutine 來防止卡死;決卡死後,使用10個worker 發現網速能達到220KB作用,基本提高了10倍的樣子。
解決卡死後,其通路效率大大提高,但是有可能觸發網站的放爬蟲機制,是以可以添加 rateLimiter 防止爬取太快;具體實施方法為:去掉原有city.go 中的sleep, 更改為 time.Tick 的方式。
第二版的排程結構如下圖所示,其為每個request 建立一個 goroutine,然後将請求轉發給 Worker處理,進而能規避卡死問題。
具體代碼改動如下:
1) 去掉city.go 中的 sleep
time.Sleep(time.Duration(time.Millisecond * 100))
2) vim fetcher/fetcher.go
var rateLimiter = time.Tick(100 * time.Millisecond)
func Fetch(url string) ([]byte, error) {
<-rateLimiter
request, err := http.NewRequest(http.MethodGet, url, nil)
......
}
3) vim scheduler/simple.go
func (s *SimpleScheduler) Submit(r engine.Request) {
// goroutine 來submit,防止卡死
go func() {
s.workerChan <- r
}()
}
執行main.go 輸出:
......
Got item 9782: 1306029314,等待那份愛,49,男士,未婚,四川資陽,172,null,3001-5000元,http://album.zhenai.com/u/1306029314, https://photo.zastatic.com/images/photo/326508/13048442696655.jpg
Got item 9783: 1068899110,良子,25,男士,未婚,四川資陽,172,null,8001-12000元,http://album.zhenai.com/u/1068899110, https://photo.zastatic.com/images/photo/267225/10688991897189073.jpg
Got item 9784: 1934996202,藍色初戀,33,女士,未婚,四川資陽,160,中專,null,http://album.zhenai.com/u/1934996202, https://photo.zastatic.com/images/photo/483750/1934996202/487.jpg
......
此時可以正常輸出,直到所有Request 解析完後才會卡住
具體代碼見:具體代碼見:crawler-v1
2.4 隊列實作排程器
上一節中通過 goroutine 來防止Scheduler卡死,本節像對其進一步優化,通過隊列的方式實作資源排程。
具體排程器結構如下所示,Scheduler 将所有request都放在request隊列中,然後從 request 中拿請求給 worker 隊列中的 worker 處理。
改動代碼如下:
learngo/crawler$ tree -L 2
.
├── engine
│ ├── concurrent.go
│ ├── simple.go
│ ├── types.go
│ └── worker.go
├── fetcher
│ └── fetcher.go
├── mian.go
├── scheduler
│ ├── queued.go
│ └── simple.go
└── zhenai
└── parser
5 directories, 8 files
vim engine/concurrent.go
package engine
import "fmt"
type ConcurrentEngine struct {
Scheduler Scheduler
WorkerCount int
}
type Scheduler interface {
Submit(Request)
ConfigureMasterWorkerChan(chan Request)
WorkerReady(chan Request)
Run()
}
func (e *ConcurrentEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
for i := 0; i < e.WorkerCount; i++ {
createWorker(out, e.Scheduler)
}
for _, r := range seeds {
e.Scheduler.Submit(r)
}
itemCount := 0
for {
result := <-out
for _, item := range result.Items {
fmt.Printf("Got item %d: %v\n", itemCount, item)
itemCount++
}
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(out chan ParseResult, s Scheduler) {
in := make(chan Request)
go func() {
for {
// tell scheduler worker ready
s.WorkerReady(in)
request := <-in
result, err := Worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
vim scheduler/queued.go
package scheduler
import "learngo/crawler/engine"
type QueueScheduler struct {
requestChan chan engine.Request
workerChan chan chan engine.Request
}
func (s *QueueScheduler) ConfigureMasterWorkerChan(c chan engine.Request) {
s.requestChan = c
}
func (s *QueueScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueueScheduler) Submit(r engine.Request) {
s.requestChan <- r
}
func (s *QueueScheduler) Run() {
s.workerChan = make(chan chan engine.Request)
s.requestChan = make(chan engine.Request)
go func() {
var requestQ []engine.Request
var workerQ []chan engine.Request
for {
var activeRequest engine.Request
var activeWorker chan engine.Request
if len(requestQ) > 0 && len(workerQ) > 0 {
activeWorker = workerQ[0]
activeRequest = requestQ[0]
}
select {
case r := <-s.requestChan:
requestQ = append(requestQ, r)
case w := <-s.workerChan:
workerQ = append(workerQ, w)
case activeWorker <- activeRequest:
workerQ = workerQ[1:]
requestQ = requestQ[1:]
}
}
}()
}
vim mian.go
package main
import (
"learngo/crawler/engine"
"learngo/crawler/scheduler"
"learngo/crawler/zhenai/parser"
)
func main() {
url := "http://www.zhenai.com/zhenghun"
e := engine.ConcurrentEngine{Scheduler: &scheduler.QueueScheduler{}, WorkerCount: 10}
e.Run(engine.Request{Url: url, ParserFunc: parser.ParseCityList})
}
執行結果同上一小節
具體代碼見:crawler-v2
2.5 重構和總結
重構說明:
由于Scheduler 知道worker 是共用 chan 還是每個都配置設定一個 chan,是以可以考慮調整代碼使其同時相容 simple.go 和queued.go。
總結:
本章節中主要使用三種方式實作并發爬蟲,首先使用最普通的方式送出任務給worker(該版本存在卡死的問題) ,其次通過goroutine + 多worker 的方式實作排程,最後通過 request+worker 雙隊列的方式實作排程。
除此之外,進一步優化代碼,使之能夠同時相容兩種模式下的排程器;通過下一頁的功能來擷取更多request 資源,期間通過記錄 url 對重複的請求進行了适當過濾。
具體代碼見:crawler-v3
2.6 更多城市
上述并發爬蟲隻爬取了每個城市額第一頁使用者,但是實際中還存在下一頁,是以可以通過下一頁擷取繼續資料,直到沒有下一頁為止。
具體方式為:在city.go 中通過正則擷取下一頁對應的連結,并設定傳回request的 ParserFunc 為 ParseCity, 即:繼續通過ParseCity 來解析下一頁的内容。
vim city.go
const (
cityRe = `<a href="(http://album\.zhenai\.com/u/[0-9]+)" target="_blank" rel="external nofollow" [^>]*>([^<]+)</a>`
......
imageRe = `<img src="(.+)?.+" alt=".*">`
nextPageRe = `<a href="(http://www.zhenai.com/zhenghun/[a-z]+/[0-6]+)" target="_blank" rel="external nofollow" >下一頁</a>`
)
unc ParseCity(contents []byte) engine.ParseResult {
// 先從網頁中取出所有使用者相關的html資訊,然後通過特有字段list-item切割,最後再解析每一個使用者的資訊
re := regexp.MustCompile(`<div class="(list-item"><div class="photo"><a .* <div class="item-btn">打招呼</div></div>)`)
matches := re.FindAllSubmatch(contents, -1)
var listStr string
for _, m := range matches {
listStr = string(m[0])
}
users := strings.Split(listStr, "list-item")
result := engine.ParseResult{}
for i, user := range users {
if i == 0 {
} else {
userInfo := getUserInfo(user)
result.Items = append(result.Items, userInfo)
}
}
// 每解析一個頁面都會确認是否存在下一頁,存在則加入到其Request.Url 中可以作為新的爬取的種子
nextPageUrl := extractString(string(contents), 1, regexp.MustCompile(nextPageRe))
if nextPageUrl == "null" {
} else {
result.Requests = append(result.Requests, engine.Request{
Url: nextPageUrl,
ParserFunc: ParseCity,
})
}
// time.Sleep(time.Duration(time.Millisecond * 100))
return result
}
執行main函數輸出:
......
2021/02/27 15:11:08 Fetching http://www.zhenai.com/zhenghun/xiangxi/2
Got item 16104: 1435923150,一分鐘的情緒,39,女士,離異,重慶,158,大專,null,http://album.zhenai.com/u/1435923150, https://photo.zastatic.com/images/photo/358981/1435923150/555.jpg
Got item 16105: 1636254473,喜歡運動,32,男士,離異,重慶,170,null,5001-8000元,http://album.zhenai.com/u/1636254473, https://photo.zastatic.com/images/photo/409064/16362544931991331
......
Got item 16108: 1888774315,未知的未來,32,女士,未婚,重慶,167,大專,null,http://album.zhenai.com/u/1888774315, https://photo.zastatic.com/images/photo/472194/1888774315/3274.png
.....
正常情況下 470個城市每個前 20個使用者,約 9400條資訊,此時已經明顯超過第一頁總使用者資料量了。
2.7 更多使用者與去重
若通過網頁上更多網頁來形成目标輸入資料,那麼可能存在城市重複和使用者重複的資訊,此時有必要對一些重複的資料進行去重操作。
由于筆者隻通過城市資訊擷取使用者資料,而不是通過使用者首頁擷取每個的資料,是以實際上隻對城市資訊進行過濾。
常見 URL 去重方法:
- 哈希表;
- 計算 MD5 等哈希,再存哈希表;
- 使用 bloom filter 多重哈希結構;
- 使用 Redis 的 key-value存儲系統實時分布式去重;
本案例中主要實作并發爬蟲,降低其它子產品的複雜度,直接通過最簡單的哈希表去重。具體代碼實作如下所示:
vim engine/concurrent.go
......
func (e *ConcurrentEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
for i := 0; i < e.WorkerCount; i++ {
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}
for _, r := range seeds {
if isDuplicate(r.Url) {
fmt.Printf("Duplicate request: %s\n", r.Url)
continue
}
e.Scheduler.Submit(r)
}
itemCount := 0
for {
result := <-out
for _, item := range result.Items {
fmt.Printf("Got item %d: %v\n", itemCount, item)
itemCount++
}
for _, request := range result.Requests {
if isDuplicate(request.Url) {
fmt.Printf("Duplicate request: %s\n", request.Url)
continue
}
e.Scheduler.Submit(request)
}
}
}
var visitedUrls = make(map[string]bool)
func isDuplicate(url string) bool {
if visitedUrls[url] {
return true
}
visitedUrls[url] = true
return false
}
注:此處隻填寫改動部分
具體代碼見:crawler-v4
3 注意事項
- 本章節中的案例已經上傳到 csdn go語言單并發版爬蟲–crawler-v1-v4, 歡迎有需要的讀者下載下傳。
- 本案例中通過三種方式實作并發版本的爬蟲,但是最終版本暫未實作儲存現有爬取内容的功能,是以重新開機後需要全部重新從頭爬取,該部分内容會在下一章節進行完善。
4 說明
-
軟體環境
go版本:go1.15.8
作業系統:Ubuntu 20.04 Desktop
Idea:2020.01.04
-
參考文檔
由淺入深掌握Go語言 --慕課網