天天看點

golang筆記16--go語言并發版爬蟲1 介紹2 并發版爬蟲3 注意事項4 說明

golang筆記16--go語言并發版爬蟲

  • 1 介紹
  • 2 并發版爬蟲
    • 2.1 并發版爬蟲架構
    • 2.2 簡單排程器
    • 2.3 并發排程器
    • 2.4 隊列實作排程器
    • 2.5 重構和總結
    • 2.6 更多城市
    • 2.7 更多使用者與去重
  • 3 注意事項
  • 4 說明

1 介紹

本文繼上文 golang筆記15-go 語言單任務版爬蟲, 進一步了解 go 語言并發版爬蟲項目,以及相應注意事項。

具體包括: 并發版爬蟲架構、簡單排程器、并發排程器、隊列實作排程器、重構和總結、更多城市、更多使用者與去重 等内容。

2 并發版爬蟲

2.1 并發版爬蟲架構

  在 golang筆記15-go 語言單任務版爬蟲 中已經通過iftop 初步得出了單任務爬蟲性能低的結論,是以本節将在此基礎上優化為并發版本的爬蟲。

優化後的爬蟲架構如下圖所示:

  在單任務中,很明顯可以發現Fetcher 是影響效率的主要因素,是以可以考慮将Fetcher改動為并發子產品;實際中Fetcher 擷取資料後被Praser 消費了,是以可以考慮把Fetcher 和 Parser 封裝為一個Worker 子產品,将Worker 作為一個并發的子產品。

golang筆記16--go語言并發版爬蟲1 介紹2 并發版爬蟲3 注意事項4 說明

為了後續優化,需要抽象出一個Scheduler,且Engine、Scheduler和Worker 直接通過chan 來實作資料傳輸。如下圖所示:

golang筆記16--go語言并發版爬蟲1 介紹2 并發版爬蟲3 注意事項4 說明

Scheduler 的一個簡單實作是:Engine 擷取的Request 都送到Scheduler, 然後Worker 完成任務後從Scheduler 的chan 中搶資料,如此循環的進行排程;其流程如下圖所示:

golang筆記16--go語言并發版爬蟲1 介紹2 并發版爬蟲3 注意事項4 說明

2.2 簡單排程器

  将第一版的Engine 更改為simple.go, 并新加一個concurrent.go 專門用來來實作并發版本Engine,同時抽象出 Worker.go 專用與接收請求并解析相關資料。

具體實作如下:

learngo/crawler$ tree -L 2
.
├── engine
│   ├── concurrent.go
│   ├── simple.go
│   ├── types.go
│   └── worker.go
├── fetcher
│   └── fetcher.go
├── mian.go
├── scheduler
│   └── simple.go
└── zhenai
    └── parser
5 directories, 7 files

vim engine/concurrent.go
package engine

import "fmt"

type ConcurrentEngine struct {
	Scheduler   Scheduler
	WorkerCount int
}
type Scheduler interface {
	Submit(Request)
	ConfigureMasterWorkerChan(chan Request)
}

func (e *ConcurrentEngine) Run(seeds ...Request) {
	in := make(chan Request)
	out := make(chan ParseResult)
	e.Scheduler.ConfigureMasterWorkerChan(in)

	for i := 0; i < e.WorkerCount; i++ {
		createWorker(in, out)
	}
	for _, r := range seeds {
		e.Scheduler.Submit(r)
	}
	for {
		result := <-out
		for _, item := range result.Items {
			fmt.Printf("Got item: %v\n", item)
		}
		for _, request := range result.Requests {
			e.Scheduler.Submit(request)
		}
	}
}

func createWorker(in chan Request, out chan ParseResult) {
	go func() {
		for {
			request := <-in
			result, err := Worker(request)
			if err != nil {
				continue
			}
			out <- result
		}
	}()
}

vim engine/worker.go
package engine

import (
	"learngo/crawler/fetcher"
	"log"
)

func Worker(r Request) (ParseResult, error) {
	log.Printf("Fetching %s", r.Url)
	body, err := fetcher.Fetch(r.Url)
	if err != nil {
		log.Printf("Fetcher: error,fetching url %s: %v", r.Url, err)
		return ParseResult{}, err
	}
	return r.ParserFunc(body), nil
}

vim scheduler/simple.go
package scheduler

import "learngo/crawler/engine"

type SimpleScheduler struct {
	workerChan chan engine.Request
}

func (s *SimpleScheduler) ConfigureMasterWorkerChan(c chan engine.Request) {
	s.workerChan = c
}

func (s *SimpleScheduler) Submit(r engine.Request) {
	s.workerChan <- r
}

vim main.go
package main

import (
	"learngo/crawler/engine"
	"learngo/crawler/scheduler"
	"learngo/crawler/zhenai/parser"
)

func main() {
	url := "http://www.zhenai.com/zhenghun"
	//engine.SimpleEngine{}.Run(engine.Request{Url: url, ParserFunc: parser.ParseCityList})
	e := engine.ConcurrentEngine{Scheduler: &scheduler.SimpleScheduler{}, WorkerCount: 10}
	e.Run(engine.Request{Url: url, ParserFunc: parser.ParseCityList})
}
輸出:
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun
Got item: City 阿壩
......
Got item: City 資陽
Got item: City 遵義
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anshun
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/akesu
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/aba
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/ali
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/alashanmeng
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/ankang
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/aletai
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anhui
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anqing
2021/02/27 11:05:25 Fetching http://www.zhenai.com/zhenghun/anshan
           

此時剛好擷取10個城市後就會處于等待,其原因為Scheduler 通過 Scheduler.Submit 來送出資料給worker,前提是有空閑的worker 來接收資料,若worker 都是忙的裝填就會導緻卡死。

2.3 并發排程器

  由于2.2 小節中worker 都在工作狀态存在卡死的問題解,是以可以通過goroutine 來防止卡死;決卡死後,使用10個worker 發現網速能達到220KB作用,基本提高了10倍的樣子。

  解決卡死後,其通路效率大大提高,但是有可能觸發網站的放爬蟲機制,是以可以添加 rateLimiter 防止爬取太快;具體實施方法為:去掉原有city.go 中的sleep, 更改為 time.Tick 的方式。

  第二版的排程結構如下圖所示,其為每個request 建立一個 goroutine,然後将請求轉發給 Worker處理,進而能規避卡死問題。

golang筆記16--go語言并發版爬蟲1 介紹2 并發版爬蟲3 注意事項4 說明

具體代碼改動如下:

1) 去掉city.go 中的 sleep
 time.Sleep(time.Duration(time.Millisecond * 100))

2) vim fetcher/fetcher.go
var rateLimiter = time.Tick(100 * time.Millisecond)

func Fetch(url string) ([]byte, error) {
	<-rateLimiter
	request, err := http.NewRequest(http.MethodGet, url, nil)
	......
}

3) vim scheduler/simple.go
func (s *SimpleScheduler) Submit(r engine.Request) {
	// goroutine 來submit,防止卡死
	go func() {
		s.workerChan <- r
	}()
}
執行main.go 輸出:
......
Got item 9782: 1306029314,等待那份愛,49,男士,未婚,四川資陽,172,null,3001-5000元,http://album.zhenai.com/u/1306029314, https://photo.zastatic.com/images/photo/326508/13048442696655.jpg
Got item 9783: 1068899110,良子,25,男士,未婚,四川資陽,172,null,8001-12000元,http://album.zhenai.com/u/1068899110, https://photo.zastatic.com/images/photo/267225/10688991897189073.jpg
Got item 9784: 1934996202,藍色初戀,33,女士,未婚,四川資陽,160,中專,null,http://album.zhenai.com/u/1934996202, https://photo.zastatic.com/images/photo/483750/1934996202/487.jpg
......
此時可以正常輸出,直到所有Request 解析完後才會卡住
           

具體代碼見:具體代碼見:crawler-v1

2.4 隊列實作排程器

  上一節中通過 goroutine 來防止Scheduler卡死,本節像對其進一步優化,通過隊列的方式實作資源排程。

  具體排程器結構如下所示,Scheduler 将所有request都放在request隊列中,然後從 request 中拿請求給 worker 隊列中的 worker 處理。

golang筆記16--go語言并發版爬蟲1 介紹2 并發版爬蟲3 注意事項4 說明

改動代碼如下:

learngo/crawler$ tree -L 2
.
├── engine
│   ├── concurrent.go
│   ├── simple.go
│   ├── types.go
│   └── worker.go
├── fetcher
│   └── fetcher.go
├── mian.go
├── scheduler
│   ├── queued.go
│   └── simple.go
└── zhenai
    └── parser
5 directories, 8 files

vim engine/concurrent.go
package engine

import "fmt"

type ConcurrentEngine struct {
	Scheduler   Scheduler
	WorkerCount int
}
type Scheduler interface {
	Submit(Request)
	ConfigureMasterWorkerChan(chan Request)
	WorkerReady(chan Request)
	Run()
}

func (e *ConcurrentEngine) Run(seeds ...Request) {
	out := make(chan ParseResult)
	e.Scheduler.Run()

	for i := 0; i < e.WorkerCount; i++ {
		createWorker(out, e.Scheduler)
	}
	for _, r := range seeds {
		e.Scheduler.Submit(r)
	}
	itemCount := 0
	for {
		result := <-out
		for _, item := range result.Items {
			fmt.Printf("Got item %d: %v\n", itemCount, item)
			itemCount++
		}
		for _, request := range result.Requests {
			e.Scheduler.Submit(request)
		}
	}
}

func createWorker(out chan ParseResult, s Scheduler) {
	in := make(chan Request)
	go func() {
		for {
			// tell scheduler worker ready
			s.WorkerReady(in)
			request := <-in
			result, err := Worker(request)
			if err != nil {
				continue
			}
			out <- result
		}
	}()
}

vim scheduler/queued.go
package scheduler

import "learngo/crawler/engine"

type QueueScheduler struct {
	requestChan chan engine.Request
	workerChan  chan chan engine.Request
}

func (s *QueueScheduler) ConfigureMasterWorkerChan(c chan engine.Request) {
	s.requestChan = c
}

func (s *QueueScheduler) WorkerReady(w chan engine.Request) {
	s.workerChan <- w
}

func (s *QueueScheduler) Submit(r engine.Request) {
	s.requestChan <- r
}

func (s *QueueScheduler) Run() {
	s.workerChan = make(chan chan engine.Request)
	s.requestChan = make(chan engine.Request)
	go func() {
		var requestQ []engine.Request
		var workerQ []chan engine.Request
		for {
			var activeRequest engine.Request
			var activeWorker chan engine.Request
			if len(requestQ) > 0 && len(workerQ) > 0 {
				activeWorker = workerQ[0]
				activeRequest = requestQ[0]
			}
			select {
			case r := <-s.requestChan:
				requestQ = append(requestQ, r)
			case w := <-s.workerChan:
				workerQ = append(workerQ, w)
			case activeWorker <- activeRequest:
				workerQ = workerQ[1:]
				requestQ = requestQ[1:]
			}
		}
	}()
}

vim mian.go
package main

import (
	"learngo/crawler/engine"
	"learngo/crawler/scheduler"
	"learngo/crawler/zhenai/parser"
)

func main() {
	url := "http://www.zhenai.com/zhenghun"
	e := engine.ConcurrentEngine{Scheduler: &scheduler.QueueScheduler{}, WorkerCount: 10}
	e.Run(engine.Request{Url: url, ParserFunc: parser.ParseCityList})
}
執行結果同上一小節
           

具體代碼見:crawler-v2

2.5 重構和總結

重構說明:

  由于Scheduler 知道worker 是共用 chan 還是每個都配置設定一個 chan,是以可以考慮調整代碼使其同時相容 simple.go 和queued.go。

總結:

  本章節中主要使用三種方式實作并發爬蟲,首先使用最普通的方式送出任務給worker(該版本存在卡死的問題) ,其次通過goroutine + 多worker 的方式實作排程,最後通過 request+worker 雙隊列的方式實作排程。

  除此之外,進一步優化代碼,使之能夠同時相容兩種模式下的排程器;通過下一頁的功能來擷取更多request 資源,期間通過記錄 url 對重複的請求進行了适當過濾。

具體代碼見:crawler-v3

2.6 更多城市

  上述并發爬蟲隻爬取了每個城市額第一頁使用者,但是實際中還存在下一頁,是以可以通過下一頁擷取繼續資料,直到沒有下一頁為止。

  具體方式為:在city.go 中通過正則擷取下一頁對應的連結,并設定傳回request的 ParserFunc 為 ParseCity, 即:繼續通過ParseCity 來解析下一頁的内容。

vim city.go 
const (
	cityRe      = `<a href="(http://album\.zhenai\.com/u/[0-9]+)" target="_blank" rel="external nofollow" [^>]*>([^<]+)</a>`
    ......
	imageRe     = `<img src="(.+)?.+" alt=".*">`
	nextPageRe  = `<a href="(http://www.zhenai.com/zhenghun/[a-z]+/[0-6]+)" target="_blank" rel="external nofollow" >下一頁</a>`
)

unc ParseCity(contents []byte) engine.ParseResult {
	// 先從網頁中取出所有使用者相關的html資訊,然後通過特有字段list-item切割,最後再解析每一個使用者的資訊
	re := regexp.MustCompile(`<div class="(list-item"><div class="photo"><a .* <div class="item-btn">打招呼</div></div>)`)
	matches := re.FindAllSubmatch(contents, -1)
	var listStr string
	for _, m := range matches {
		listStr = string(m[0])
	}
	users := strings.Split(listStr, "list-item")
	result := engine.ParseResult{}
	for i, user := range users {
		if i == 0 {
		} else {
			userInfo := getUserInfo(user)
			result.Items = append(result.Items, userInfo)
		}
	}
	// 每解析一個頁面都會确認是否存在下一頁,存在則加入到其Request.Url 中可以作為新的爬取的種子
	nextPageUrl := extractString(string(contents), 1, regexp.MustCompile(nextPageRe))
	if nextPageUrl == "null" {
	} else {
		result.Requests = append(result.Requests, engine.Request{
			Url:        nextPageUrl,
			ParserFunc: ParseCity,
		})
	}
	// time.Sleep(time.Duration(time.Millisecond * 100))
	return result
}
執行main函數輸出:
......
2021/02/27 15:11:08 Fetching http://www.zhenai.com/zhenghun/xiangxi/2
Got item 16104: 1435923150,一分鐘的情緒,39,女士,離異,重慶,158,大專,null,http://album.zhenai.com/u/1435923150, https://photo.zastatic.com/images/photo/358981/1435923150/555.jpg
Got item 16105: 1636254473,喜歡運動,32,男士,離異,重慶,170,null,5001-8000元,http://album.zhenai.com/u/1636254473, https://photo.zastatic.com/images/photo/409064/16362544931991331
......
Got item 16108: 1888774315,未知的未來,32,女士,未婚,重慶,167,大專,null,http://album.zhenai.com/u/1888774315, https://photo.zastatic.com/images/photo/472194/1888774315/3274.png
.....
正常情況下 470個城市每個前 20個使用者,約 9400條資訊,此時已經明顯超過第一頁總使用者資料量了。
           

2.7 更多使用者與去重

  若通過網頁上更多網頁來形成目标輸入資料,那麼可能存在城市重複和使用者重複的資訊,此時有必要對一些重複的資料進行去重操作。

  由于筆者隻通過城市資訊擷取使用者資料,而不是通過使用者首頁擷取每個的資料,是以實際上隻對城市資訊進行過濾。

常見 URL 去重方法:

  • 哈希表;
  • 計算 MD5 等哈希,再存哈希表;
  • 使用 bloom filter 多重哈希結構;
  • 使用 Redis 的 key-value存儲系統實時分布式去重;

  本案例中主要實作并發爬蟲,降低其它子產品的複雜度,直接通過最簡單的哈希表去重。具體代碼實作如下所示:

vim engine/concurrent.go
......
func (e *ConcurrentEngine) Run(seeds ...Request) {
	out := make(chan ParseResult)
	e.Scheduler.Run()

	for i := 0; i < e.WorkerCount; i++ {
		createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
	}
	for _, r := range seeds {
		if isDuplicate(r.Url) {
			fmt.Printf("Duplicate request: %s\n", r.Url)
			continue
		}
		e.Scheduler.Submit(r)
	}
	itemCount := 0
	for {
		result := <-out
		for _, item := range result.Items {
			fmt.Printf("Got item %d: %v\n", itemCount, item)
			itemCount++
		}
		for _, request := range result.Requests {
			if isDuplicate(request.Url) {
				fmt.Printf("Duplicate request: %s\n", request.Url)
				continue
			}
			e.Scheduler.Submit(request)
		}
	}
}

var visitedUrls = make(map[string]bool)

func isDuplicate(url string) bool {
	if visitedUrls[url] {
		return true
	}
	visitedUrls[url] = true
	return false
}
注:此處隻填寫改動部分 
           

具體代碼見:crawler-v4

3 注意事項

  1. 本章節中的案例已經上傳到 csdn go語言單并發版爬蟲–crawler-v1-v4, 歡迎有需要的讀者下載下傳。
  2. 本案例中通過三種方式實作并發版本的爬蟲,但是最終版本暫未實作儲存現有爬取内容的功能,是以重新開機後需要全部重新從頭爬取,該部分内容會在下一章節進行完善。

4 說明

  1. 軟體環境

    go版本:go1.15.8

    作業系統:Ubuntu 20.04 Desktop

    Idea:2020.01.04

  2. 參考文檔

    由淺入深掌握Go語言 --慕課網