天天看點

第十五章 并發版爬蟲第二版 -- 完結

并發版爬蟲, 在上一篇單機版爬蟲的基礎上演變而來

這裡隻有并發引擎的代碼, 基本的解析器代碼參考: https://www.cnblogs.com/ITPower/articles/12450374.html

一. 單節點版爬蟲的問題

拉取資料的速度太慢, 慢有兩部分. 一部分是網絡請求, 根據url拉取zhenai網的資料. 另一部分是: 解析. 兩部分相比較, 第一部分更慢

既然慢, 我們就要想辦法解決慢的問題. 

其實拉取資料和資料解析, 可以看成是一個部分. 他們都是具體的工作者. 是以, 我們把他們化為工作者模型

第十五章 并發版爬蟲第二版 -- 完結

 那麼, 一個工作者工作, 速度很慢. 很多個工作者同時工作, 速度肯定比一個工作者要快很多. 

這裡想到java的多線程同時工作. go裡面應該就是多協程一起工作了. 将工作者抽象出來, 然後建立多個協程, 比如, 5個, 10個, 20個

在單機版的爬蟲裡面. 會将資料延綿不斷的放入隊列中. 那麼, 在并發版的應該也有一個類似于隊列的東西, 來儲存request. 這裡定義為一個Scheduler

第十五章 并發版爬蟲第二版 -- 完結

 這是并發版的架構模型. 有一個engine, 一個scheduler, 多個worker. 這裡的一個,多個映射到代碼裡表示的是協程. 

sheduler是一個協程排程者, 他把收到的request分發給worker. worker拿到request進行處理. 處理結果輸出為Requests, Items, 交給engine執行引擎, 執行引擎再把新的Request放入到Scheduler排程者中, 然後循環往複

這裡每條線都代表一個chan. 協程和協程之間的通信, 使用的是chan. 

 二. 并發版爬蟲第一版---所有的worker公用一個輸入

第十五章 并發版爬蟲第二版 -- 完結

什麼意思呢?

在單機版是一個隊列, 單線程執行, 去隊列裡去url, 然後get網頁内容, 在進行解析.

在并發版, 我們讓多個工作者同時工作, 去隊列裡取request, 然後同時工作, get網頁内容, 解析網頁.

在go裡多個工作者同時工作, 我們可以考慮開多個協程. 同時去隊列去資料. 這第一版, 把隊列改成了任務排程器Scheduler.  任務排程器是單獨的一個goroutine. 

拿到的request放到Scheduler裡面, 然後多個worker去Scheduler裡拿request進行處理. 

增加一個并發的引擎--concurrectEngine

package engine

import "fmt"

// 定義了一個
type ConcurrectEngine struct {
    Scheduler Scheduler
    WorkerCount int
}

type Scheduler interface{
    Submit(Request)
    ConfigureMasterWorkerChan(chan Request)
}

func (c *ConcurrectEngine) Run(seeds ...Request) {
    // 第一步: 做初始化工作
    in := make(chan Request)
    out := make(chan ParseResult)
    c.Scheduler.ConfigureMasterWorkerChan(in)

    // 建立工作者, 從in輸入管道中取出request進行處理, 處理後放入到ParseResult的管道中
    for i := 0; i<c.WorkerCount; i++  {
        // 建立WorkerCount個工作者
        c.createWorker(in, out)
    }

    // 第二步: 把種子放到任務排程器中
    for _, seed := range seeds {
        c.Scheduler.Submit(seed)
    }

    // 第三步: 處理工作者線程傳回的ParseResult, 将items列印, requests再次添加到任務排程器中
    for {
        pr := <- out
        for _, item := range pr.Items  {
            fmt.Printf("内容項: %v \n", item)
        }

        for _, p := range pr.Req{
            c.Scheduler.Submit(p)
        }
    }
}

func (c *ConcurrectEngine) createWorker(in chan Request, out chan ParseResult) {
    go func() {
        // 從in中取出request請求
        req := <- in
        parseResult, e := worker(req)
        if e != nil {
            return
        }
        out <- parseResult
    }()
}      

拆解分析

// 定義了一個并發的引擎結構, 目的是, 差別于之前寫的單機版的Engine.
type ConcurrectEngine struct {
    Scheduler Scheduler
    WorkerCount int
}      

排程器接口

這裡是定義了一個并發引擎. 每個引擎engine都有自己的Run方法.

// 定義了排程器的接口. 既然是排程器, 就要有一個接受請求的方法.
type Scheduler interface{
    // 接收請求的方法
    Submit(Request)
    // 這裡比較巧妙的地方是, 其實排程器的chan類型和工作者輸入的chan類型是同一個.
    // 排程器輸出的request, 就是工作者的輸入request, 這裡讓他們指向同一個位址.
    ConfigureMasterWorkerChan(chan Request)
}      

定義了排程器的接口. 

在任務開始前, 我們需要将要處理的request放入到排程器中. 是以, 排程器需要有一個方法submit(request)

排程器的輸出request,其實就是工作者的輸入request. 這裡讓他們指向同一個位址, 一個變就都變了. 不用再進行拷貝了

引擎執行的代碼

在并發的引擎結構中, 第一個方法是排程器的接口. 再具體使用的時候, 傳什麼類型的排程器, 就執行其具體的排程規則

func (c *ConcurrectEngine) Run(seeds ...Request) {
    // 第一步: 做初始化工作
    in := make(chan Request)
    out := make(chan ParseResult)
    c.Scheduler.ConfigureMasterWorkerChan(in)

    // 建立工作者, 從in輸入管道中取出request進行處理, 處理後放入到ParseResult的管道中
    for i := 0; i<c.WorkerCount; i++  {
        // 建立WorkerCount個工作者
        c.createWorker(in, out)
    }

    // 第二步: 把種子放到任務排程器中
    for _, seed := range seeds {
        c.Scheduler.Submit(seed)
    }

    // 第三步: 處理工作者線程傳回的ParseResult, 将items列印, requests再次添加到任務排程器中
    for {
        pr := <- out
        for _, item := range pr.Items  {
            fmt.Printf("内容項: %v \n", item)
        }

        for _, p := range pr.Req{
            c.Scheduler.Submit(p)
        }
    }
}

      

開始工作. 

首先, 我們再來思考這個模型

第十五章 并發版爬蟲第二版 -- 完結

剛開始, 種子請求過來了, 我們将其放入到排程器中. 排程器是單獨工作的. 隻需要有一個排程器. 

然後從排程器中取出一個request. 通過管道in chan request, 将請求發送給worker. worker處理請求, 處理完成以後, 将處理結果放入out chan ParseResut類型的管道中. 通過管道進行通訊

然後引擎從管道中取出ParseResult, 将其中的items部分列印出來, 将新的requests添加到Scheduler排程器中.

循環反複執行

看看engine的Run代碼

第一步: 做了初始化操作. 輸入管道, 輸出管道. 以及Scheduler排程器中的管道就是in輸入管道.

第二步: 将初始的種子請求, 放入到任務排程器中. 

第三步: 從排程器中取出一個請求, 進行任務處理. 

第四步: 處理傳回的處理結果.

具體排程器的代碼

package sechduler

import "aaa/crawler/zhenai/engine"

type SimpleScheduler struct {
    workerChan chan engine.Request

}

// 這裡需要是一個位址拷貝, 而不是值拷貝. 也就是workerChan和in使用的是同一個位址
func (s *SimpleScheduler) ConfigureMasterWorkerChan(in chan engine.Request) {
    s.workerChan = in
}

// 将請求添加到任務排程器
func (s *SimpleScheduler) Submit(r engine.Request) {
    go func() {
        s.workerChan <- r
    }()
}      

這裡, 排程器裡的submit定義為一個go routine, 原因是, 如果不定義為goroutine, 會出現循環等待. 然後卡死, 為什麼會卡死呢?

根本原因還是管道的特性. 必須要有人從管道中取走資料, 且同時有人向管道中放資料, 這樣才可以, 沒有人取資料或沒有人發資料, 管道就會一直等待. 

func (c *ConcurrectEngine) createWorker(in chan Request, out chan ParseResult) {
    go func() {
        for  {
            // 從in中取出request請求
            req := <- in
            parseResult, e := worker(req)
            if e != nil {
                return
            }
            out <- parseResult
        }
    }()
}      

看這個工作者, 從in中取出request, 然後處理後發送給out. 在run中在取出out的ParseResult, 發送給任務排程器. 任務排程器的submit方法裡, 将request添加到workerChan. 也就是隻有workerChan添加成功了, 反過來這一些列的流程才能繼續執行. 但是workerChan是否能夠添加成功呢? 這又取決于, 是否有woker取走workerChan中的request. 現在有10個goroutine, 10個goroutine都工作起來的, 都開始等待新的worker取走request. 但是有沒有新的worker執行工作了, 是以, 就進入了循環等待. 出現卡住的現象

要解決這個問題, 其實也很簡單. 保證workerChan不會循環等待. 給workerChan開一個單獨的goroutine. 這樣, 在這裡就不會循環等待了. 執行到submit, 就開了一個goroutine. 然後這個動作就執行完了, 那麼worker就有功夫去workerChan中取資料了, 這樣submit中的request也可以添加到workerChan了, 整個鍊路有活起來了.

 其實最終的架構變成了這樣

第十五章 并發版爬蟲第二版 -- 完結

 為每一個request建立了一個goroutine. 然後等待去發送到worker. 發送完成以後, 就關閉了.

執行結果: 

第十五章 并發版爬蟲第二版 -- 完結

三. 并發爬蟲第二版---将scheduler和worker都變成一個隊列

為什麼要将Scheduler和worker變成隊列呢?

在上面, 我們剛開始會出現循環等待, 程式卡死, 為了解決這個問題, 我們給scheduler的submit添加了一個單獨的goroutine. 讓request放入workerChan的過程中迅速執行完畢, 不要等待. 

但這樣有個缺點, 我開了一個goroutine, 這個goroutine執行的怎麼樣, 執行了麼?我們是不知道的. 沒有辦法跟蹤

對于worker來說, 有10個worker. 每次都是這10個worker去workerChan裡面搶request, 如果我想要把某個request分發給指定的worker去執行, 這樣是不可以的. 

這樣如果我們想做負載均衡, 就會很困難了. 是以. 我們将scheduler和worker都變成一個隊列. 然後可以受我們的控制. 我們可以主動分發

 感受go使用channel進行通信

這個demo寫完了以後, 最大的感受就是go使用channel進行通信. 在scheduler和request使用的是channel進行通信, scheduler和worker之間也是使用的channel進行通信.

結構

第十五章 并發版爬蟲第二版 -- 完結

 這個結構的重點, 在于Scheduler, 在Scheduler裡面, 管理了兩個隊列. 一個是Request隊列, 一個是worker隊列. 過來的請求, 發送給空閑的worker去工作. 

也就是說, worker還是原來的worker . request還是原來的request, 變化的是排程器, 排程器變成了一個隊列排程器

第十五章 并發版爬蟲第二版 -- 完結

來看看具體的代碼實作

package sechduler

import "aaa/crawler/zhenai/engine"

type QueuedScheduler struct {
    RequestChan chan engine.Request
    // 工作者channel, 他的類型是request類型的chan. 工作者需要接受request類型的chan, 然後進行工作
    // 多個工作者之間, 也是一個chan
    WorkerChan chan chan engine.Request
}

func (q *QueuedScheduler) Submit(r engine.Request) {
    q.RequestChan <- r
}

func (q *QueuedScheduler) ConfigureMasterWorkerChan(chan engine.Request) {
    panic("implement me")
}

/**
 * 告訴我哪一個worker已經準備好, 可以工作了
 */
func (q *QueuedScheduler) WorkerReady(worker chan engine.Request) {
    q.WorkerChan <- worker
}

func (q *QueuedScheduler) Run() {
    q.RequestChan = make(chan engine.Request)
    q.WorkerChan = make(chan chan engine.Request)
    go func() {
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var requestActive engine.Request
            var workerActive chan engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                requestActive = requestQ[0]
                workerActive = workerQ[0]
            }

            select {
            case r := <-q.RequestChan:
                // 發送給worker
                requestQ = append(requestQ, r)
            case w := <-q.WorkerChan:
                // 将下一個請求發送給worker
                workerQ = append(workerQ, w)
            case workerActive <- requestActive:
                    requestQ = requestQ[1:]
                    workerQ = workerQ[1:]
            }
        }
    }()
}      

這個結構, 充分展現了, 使用channel進行通信

1. 外部有一個請求過來了, 那麼調動submit将請求發送到request chan

2. 外部有一個worker已經準備好可以工作了, 調用workerReady,将準備好的worker放入到workerChan

3. 接下來, 如果有請求來, 我們就想請求添加到request隊列. 如果有worker準備好了, 從workerChan中取出放入到worker隊列

4. 當request隊列中有請求過來, 且worker隊列中有等待的worker的時候, 就把這個請求發送給這個worker, 讓worker開始工作, 處理request

engine做簡單修改

package engine

import "fmt"

// 定義了一個并發的引擎結構, 目的是, 差別于之前寫的單機版的Engine.
type ConcurrectEngine struct {
    Scheduler Scheduler
    WorkerCount int
    WorkChan chan interface{}
}

// 定義了排程器的接口. 既然是排程器, 就要有一個接受請求的方法.
type Scheduler interface{
    // 接收請求的方法
    Submit(Request)
    // 這裡比較巧妙的地方是, 其實排程器的chan類型和工作者輸入的chan類型是同一個.
    // 排程器輸出的request, 就是工作者的輸入request, 這裡讓他們指向同一個位址.
    ConfigureMasterWorkerChan(chan Request)
    WorkerReady(chan Request)
    Run()
}

func (c *ConcurrectEngine) Run(seeds ...Request) {
    // 第一步: 做初始化工作
    out := make(chan ParseResult)
    c.Scheduler.Run()

    // 建立工作者, 從in輸入管道中取出request進行處理, 處理後放入到ParseResult的管道中
    for i := 0; i<c.WorkerCount; i++  {
        // 建立WorkerCount個工作者
        c.createWorker(c.Scheduler, out)
    }

    // 第二步: 把種子放到任務排程器中
    for _, seed := range seeds {
        c.Scheduler.Submit(seed)
    }

    // 第三步: 處理工作者線程傳回的ParseResult, 将items列印, requests再次添加到任務排程器中
    itemCount := 0
    for {
        pr := <- out
        for _, item := range pr.Items  {
            fmt.Printf("内容項:  %d, %v \n", itemCount, item)
            itemCount ++
        }

        for _, p := range pr.Req {
            c.Scheduler.Submit(p)
        }
    }
}

func (c *ConcurrectEngine) createWorker(sche Scheduler, out chan ParseResult) {
    in := make(chan Request)
    go func() {
        for  {
            // 告訴排程器, 我已經準備好開始工作了
            sche.WorkerReady(in)
            // 從in中取出request請求
            req := <- in
            parseResult, e := worker(req)
            if e != nil {
                return
            }
            out <- parseResult
        }
    }()
}      

 這樣功能是完成了, but, 來看看simple和queued這兩個Scheduler排程器. 先來看看他們的接口

// 定義了排程器的接口. 既然是排程器, 就要有一個接受請求的方法.
type Scheduler interface{
    // 接收請求的方法
    Submit(Request)
    // 這裡比較巧妙的地方是, 其實排程器的chan類型和工作者輸入的chan類型是同一個.
    // 排程器輸出的request, 就是工作者的輸入request, 這裡讓他們指向同一個位址.
    ConfigureMasterWorkerChan(chan Request)
    WorkerReady(chan Request)
    Run()
}      

simple 實作了接口的前兩個方法, 而queued沒有實作第二個方法. 下面我們來統一一下接口的方法. 先看看第二個方法

第二個方法是幹什麼用的? 

ConfigureMasterWorkerChan(chan Request)

在simple中, 我們所有的worker都有一個共同的輸入request. 通過 這個方法ConfigureMasterWorkerChan(chan Request), 我們将worker的輸入和simple中的workerChan關聯了

在Queued中, 我們的每一個worker都有一個自己的輸入 request, 和第一個的差別是第一個是所有worker公用一個chan request. 而第二種是每個worker自己一個request

我們對這件事進行一個抽象, 在engine中, 調用哪一個Scheduler, 他是不知道的. 那麼到底worker是公用一個chan request, 還是每個worker有一個chan reqeust呢, 他也是不知道的.

那麼誰知道呢? 具體的Scheduler排程器知道. 也就是, Simple Scheduler知道, Queued Scheduler也知道.

是以, 我們寫一個方法, 來問各種類型的排程器要chan request. 

上面是把獲得的item列印了, 下面我們來設計入庫的子產品

分析: 其實,我們要做是什麼呢? 将列印變成入庫操作. but, 有一個問題. 我們在fetch階段, 能直接入庫麼?和資料庫互動的速度, 肯定會影響fetch的速度.

其實這裡有兩種解決方案

1. 放到記憶體list中. 批量儲存

2. 使用chan, 間資料發送到管道裡面, 在單開一個goroutine, 去取資料, 将取出來的資料save到資料庫.

因為, 我們是在學習go, 是以,采用第二種方式

三. docker和ElasticSearh

資料庫---我們使用elasticSearch, 搭建在docker容器之上

因為都是初次接觸, 是以,一步一步. 先搭建起docker的環境,然後在docker上安裝elasticSearch.

Docker 的主要用途,目前有三大類

Docker 的主要用途,目前有三大類。

(1)提供一次性的環境。比如,本地測試他人的軟體、持續內建的時候提供單元測試和建構的環境。

(2)提供彈性的雲服務。因為 Docker 容器可以随開随關,很适合動态擴容和縮容。

(3)組建微服務架構。通過多個容器,一台機器可以跑多個服務,是以在本機就可以模拟出微服務架構。

而我們這裡應該是使用docker的第三類用途. 後面我們要做分布式版的爬蟲, 可以使用docker來模拟多台伺服器間調用.

docker的安裝和使用

第一步: 官網docker: https://docs.docker.com/

    我是mac: 下載下傳位址: https://hub.docker.com/editions/community/docker-ce-desktop-mac/

第二步: 注冊docker. 我之前注冊過,直接登入即可

第三步: 基本指令

檢視docker有哪些指令
docker      
第十五章 并發版爬蟲第二版 -- 完結
檢視docker的版本資訊
docker version      
第十五章 并發版爬蟲第二版 -- 完結
檢視docker的基本資訊
docker info      
第十五章 并發版爬蟲第二版 -- 完結

 比如docker有兩部分組成,sercer和client. 我們啟動的docker desktop就是一個server. 在控制台使用docker指令就是一個client

Server中的Registry是下載下傳鏡像的位址.

elasticSearch是什麼

參考這篇文章, 說的很明白:https://blog.csdn.net/paicmis/article/details/82535018

Lucene是單機的模式,如果你的資料量超過了一台實體機的容量,你需要擴容,将資料拆分成2份放在不同的叢集,這個就是典型的分布式計算了。需要拷貝容錯,機器當機,資料一緻性等複雜的場景,這個實作就比較複雜了。

ES解決了這些問題

1、自動維護資料的分布到多個節點的索引的建立,還有搜尋請求分布到多個節點的執行

2、自動維護資料的備援副本,保證了一旦機器當機,不會丢失資料

3、封裝了更多進階的功能,例如聚合分析的功能,基于地理位置的搜尋

ElasticSearch的功能

分布式的搜尋引擎和資料分析引擎

搜尋:網站的站内搜尋,IT系統的檢索

資料分析:電商網站,統計銷售排名前10的商家

全文檢索,結構化檢索,資料分析

全文檢索:我想搜尋商品名稱包含某個關鍵字的商品

結構化檢索:我想搜尋商品分類為日化用品的商品都有哪些

資料分析:我們分析每一個商品分類下有多少個商品

對海量資料進行近實時的處理

分布式:ES自動可以将海量資料分散到多台伺服器上去存儲和檢索

海聯資料的處理:分布式以後,就可以采用大量的伺服器去存儲和檢索資料,自然而然就可以實作海量資料的處理了

近實時:檢索資料要花費1小時(這就不要近實時,離線批處理,batch-processing);在秒級别對資料進行搜尋和分析

ElasticSearch的應用場景

維基百科

The Guardian(國外新聞網站)

Stack Overflow(國外的程式異常讨論論壇)

GitHub(開源代碼管理)

電商網站

日志資料分析

商品價格監控網站

BI系統

站内搜尋

ElasticSearch的特點

可以作為一個大型分布式叢集(數百台伺服器)技術,處理PB級資料,服務大公司;也可以運作在單機上,服務小公司

Elasticsearch不是什麼新技術,主要是将全文檢索、資料分析以及分布式技術,合并在了一起

對使用者而言,是開箱即用的,非常簡單,作為中小型的應用,直接3分鐘部署一下ES

Elasticsearch作為傳統資料庫的一個補充,比如全文檢索,同義詞處理,相關度排名,複雜資料分析,海量資料的近實時處理;

先大概了解一下elasticSearch可以幹什麼, 接下來我們在使用

ElasticSearch安裝和使用

第一步: 下載下傳并運作elasticSearch

下載下傳并運作elasticSearch
docker run -d -p9200:9200 daocloud.io/library/elasticsearch

這裡的-p 後面跟的是端口号. 
第一個9200表示映射到實體機的端口是9200
第二個9200表示elasticSearch在虛拟機中運作的端口是9200      

第二步: 查詢運作情況

第十五章 并發版爬蟲第二版 -- 完結

 表示目前已經運作起來的elasticSearch

第三步: 在浏覽器輸入localhost:9200

第十五章 并發版爬蟲第二版 -- 完結

 看到如上資訊, 表示已經啟動成功了

第四步: 簡單了解如何使用elasticSearch

打開postman

我們可以直接運作localhost:9200 ,GET請求, 擷取到目前已經連接配接的elasticSearch資料庫

接下來我們添加一條記錄

在elasticSearch中Index/Type/id ,對應于資料庫的是index--->database, Type---->table, id--->記錄的id

比如:

添加一條記錄為1的資料

請求方式: POST
請求的url: localhost:9200/pachong/user/1      

執行後的結果

{
    "_index": "pachong",
    "_type": "user",
    "_id": "1",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "created": true
}

      

 前三個字段分别是: 庫名, 表明, 記錄的id. result表示目前是created.

來查詢剛剛儲存的記錄

請求方式: GET
請求url: localhost:9200/pachong/user/1      

就擷取到了剛剛添加的記錄

{
    "_index": "pachong",
    "_type": "user",
    "_id": "1",
    "_version": 1,
    "found": true,
    "_source": {
        "name": "lxl",
        "age": 12
    }
}      

如果想查詢所有的記錄呢?

請求方式: GET
請求url: localhost:9200/pachong/user/_search
輸入參數: 空      

傳回結果:

{
    "took": 97,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 2,
        "max_score": 1,
        "hits": [
            {
                "_index": "pachong",
                "_type": "user",
                "_id": "2",
                "_score": 1,
                "_source": {
                    "name": "ykk",
                    "age": 43
                }
            },
            {
                "_index": "pachong",
                "_type": "user",
                "_id": "1",
                "_score": 1,
                "_source": {
                    "name": "lxl",
                    "age": 12
                }
            }
        ]
    }
}      

這就查詢到了資料庫中的所有記錄

根據條件查詢, 比如查詢名字是ykk的

請求方式: GET
請求url: localhost:9200/pachong/user/_search?q=ykk      

查詢結果

{
    "took": 4,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 1,
        "max_score": 0.25811607,
        "hits": [
            {
                "_index": "pachong",
                "_type": "user",
                "_id": "2",
                "_score": 0.25811607,
                "_source": {
                    "name": "ykk",
                    "age": 43
                }
            }
        ]
    }
}      

四. 将資料儲存到elasticSearch

通過上面的demo, 我們知道elasticSearch使用的是restful的風格增删改查資料的. 那麼我們可以直接使用http.Get, http.Post就可以實作

處理這種方式, 市面上還有對應的elasticSearch用戶端, 我們使用用戶端會更友善

百度 elasticSearch client--->找到對應的官網, https://www.elastic.co/guide/en/elasticsearch/client/index.html

第十五章 并發版爬蟲第二版 -- 完結

 點選最後一個社群版用戶端. 進去之後檢視go的elasticSearch client

第十五章 并發版爬蟲第二版 -- 完結

 這裡我們使用第二類Google go, 點選進去是插件的源碼, 看後面的README. 

我們使用的elasticSearch版本是5.12, 是以下載下傳對應的5版本的client.

在go中執行下載下傳client.

第十五章 并發版爬蟲第二版 -- 完結

下載下傳完成就可以使用了, 使用文檔: https://godoc.org/gopkg.in/olivere/elastic.v5

func Save(item interface{}) (string, error) {
    // 第一步: 建立一個elasticSearch client
    // 文檔: https://godoc.org/gopkg.in/olivere/elastic.v5
    client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"),
        // sniff: 是用來維護用戶端叢集的狀态的. 但是我們的叢集不跑在本機上,而是跑在docker上.
        // docker隻有一個内網, 内網我們看不見.是以沒有辦法維護狀态, 設定為false
        elastic.SetSniff(false))
    if err != nil {
        return "", err
    }

    // 将item資料儲存到的elasticSearch中
    // elastic search 儲存資料使用的Index.
    // elasticSearch 資料中的三部分分别是 /index/type/id  . 對應資料庫的/database/table/id
    response, err := client.Index().
        Index("dataing_profile").
        Type("zhenai").
        BodyJson(item).
        Do(context.Background())
    if err != nil {
        return "", err
    }

    // +v 表示列印結構體帶上結構體的名稱
    return response.Id, nil

}      

第一步: 建立elasticSearch 連接配接

第二步: 儲存資料. elasticSearch儲存資料的方法是Index

完畢, 是不是很簡單....

接下來寫一個單元測試, 測試我們是否添加資料成功了

func TestSave(t *testing.T) {
    tests := []struct {
        name string
        item model.Profile
    }{
        {"1",
        model.Profile{
                Name: "冰靓晴雪",
                Marry: "已婚",
                Age:23,
                Xingzuo: "白羊座",
                Height: 154,
                Weight: 49,
                WorkerPlace: "北京",
                Salary: "10000-20000",
                Occuption: "銷售總監",
                Education: "大學大學",
                Jiguan: "四川",
                Hobby: "理财",
                House:"有房",
                Car:"有車",
                IsChild:"不要小孩",
             },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            id, e := Save(tt.item)
            if e != nil {
                panic(e)
            }

            client, e := elastic.NewClient(elastic.SetSniff(false))
            if e != nil {
                panic(e)
            }

            resp, e := client.Get().
                Index("dataing_profile").
                Type("zhenai").
                Id(id).
                Do(context.Background())
            if e != nil {
                panic(e)
            }

            fmt.Printf("%s", *resp.Source)

            user := model.Profile{}
            e = json.Unmarshal(*resp.Source, &user)
            if e != nil {
                panic(e)
            }

            if user != tt.item {
                t.Errorf("error")
            }
        })
    }
}      

第一步: 寫測試的cases

第二步: 循環周遊cases, 調用save方法儲存

第三步: 取出save中儲存的内容

第四步: 和初始值對比, 是否一緻

第十五章 并發版爬蟲第二版 -- 完結

繼續閱讀