分布式消息傳遞的方式.
- REST
- RPC
- 消息隊列
都在什麼情況下使用這三種方式呢?
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcuYTM2QDM4EzN0ETL1gzMzQTM0QDM5EzMwAjMwITL2ETO3gTMx8CXzADMyAjMvwlNxkzN4ETMvwVY0VmYtk2Lc12bj5ycn9Gbi52YuAjMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
1. 用戶端和主伺服器之間, 使用的是REST請求方式
2. 主伺服器和其他子伺服器之間通信,比如接口調用, 可以使用RPC
3. 伺服器和伺服器之間消息傳遞可以是用消息隊列
對外: 使用REST
子產品内部:使用RPC
子產品之間: 使用中間件或REST
分布式架構 VS 微服務架構
看上面的三點:
1. 微服務架構: 是按照業務子產品來劃分的
2. 分布式架構: 每個業務子產品部署多個節點, 同一個子產品之間節點是如何通信的. 不同子產品之間節點是如何通信的
3. 微服務架構是基礎, 知道我們項目如何拆分, 分布式架構是實作. 二者是結合使用的.
并發版爬蟲的架構
這是上一章最後做成了并發版爬蟲的架構.
思考:為什麼需要轉成分布式架構? 并發版架構不可以麼?
這個問題, 還需要從實際出發, 我們遇到了什麼樣的問題. 針對這些問題, 我們來思考解決方案
并發版爬蟲. 我們遇到了哪些問題呢?
1. 限流: 單個節點擷取流量的速度是有限的. 珍愛網限制了我們爬取的速度, 如果想要爬取資源, 就必須限速在其以下, 否則就會被攔截
2. 存儲問題: 存儲部分的結構, 技術棧和爬蟲差别很大. 如果想要在存儲上進一步優化, 就需要特殊ElasticSearch技術背景的人. 而這部分人可能對爬蟲的技術架構不是特别關心. 為了友善分工協作, 我們把存儲子產品單獨提取出來. 這部分呢叫做固有分布式, 也就是說,通常我們都按照這個進行劃分的.
3. 去重問題: 我們要過濾抓取到的同一個使用者的多次入庫, 如果去重邏輯很複雜,這一塊也會很耗時. 是以也需要提取出來單獨處理.
分布式爬蟲的架構
每一個方框都是一個節點
worker: 處理fetch速度慢的問題, 作為一個單獨的子產品. 并且部署成多個節點, 同時去并發抓取網頁資料
存儲問題: 儲存入庫也是比較浪費時間, 浪費性能的, 葉提取出來作為一個單獨的服務
我們下面就來實作這樣一個分布式. 我們使用docker來實作.
正好可以看看docker是如何具有良好的擴充性, 如何收,如何放的.
從Channel到分布式
并發版爬蟲到分布式爬蟲轉換, 他的關鍵在哪裡呢? 關鍵在于從channel到分布式
并發版爬蟲有很多goroutine , goroutine之間通過channel進行通信. 現在我們要做的就是将使用channel進行通信的節點, 換一種機制.
RPC都有哪些
- jsonRpc
- GRPC
- Thrift
本次我們使用jsonRpc來實作
什麼是RPC?
RPC(Remote Procedure Call)是遠端過程調用,它是一種通過網絡從遠端計算機程式上請求服務,而不需要了解底層網絡技術的協定。
RPC協定假定某些傳輸協定的存在,如TCP或UDP,為通信程式之間攜帶資訊資料。在OSI網絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分布式多程式在内的應用程式更加容易。
RPC采用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機調用程序發送一個有程序參數的調用資訊到服務程序,然後等待應答資訊。
接下來我們模拟一個rpc調用
go簡單模拟RPC實作
1. 實作服務端的業務代碼
package rpc
import "errors"
// 首先模拟一個服務
type DemoService struct {
}
type Args struct {
A, B int
}
// 我們來做一個除法
// rpc調用要求有兩個參數
func (DemoService) Div(args Args, result *float64) error {
if args.B == 0 {
return errors.New("除數不能為0")
}
*result = float64(args.A) / float64(args.B)
return nil
}
- 這裡就做了一個除法
- 共rpc調用的方法, 必須要有兩個參數, 一個是入參, 一個是傳回值
2. 服務端模拟
package main
import (
rpc2 "aaa/rpc"
"github.com/kelseyhightower/confd/log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
// 下面将模拟rpc的server, 将服務端調用封裝起來
func main() {
// 第一步: 把我們寫好的DemoService注冊到rpc上
rpc.Register(rpc2.DemoService{})
// 第二步: 開服務, 服務的端口是1234
listener, e := net.Listen("tcp", ":1234")
if e != nil {
panic(e)
}
for {
// 開始等待用戶端連接配接進來
conn, e := listener.Accept()
if e != nil {
log.Info("用戶端連接配接異常: %v", e)
}
// 運作一個jsonrpc服務, 這裡來了goroutine, 這樣就不用等待處理完成了,
// 下一個連接配接來了, 就可以直接連進來
go jsonrpc.ServeConn(conn)
}
}
- 将寫好的服務功能注冊到服務端
- 定義監聽服務的端口和協定
- 和用戶端建立連接配接, 等待用戶端連接配接
3. 用戶端模拟
package main
import (
rpc2 "aaa/rpc"
"fmt"
"net"
"net/rpc/jsonrpc"
)
func main() {
// 撥号, 和端口号為1234的tcp連接配接
conn, e := net.Dial("tcp", ":1234")
if e != nil {
panic(e)
}
// 建立連接配接
client := jsonrpc.NewClient(conn)
// 發送用戶端資料
var result float64
e = client.Call("DemoService.Div", rpc2.Args{10, 5}, &result)
if e != nil {
fmt.Printf("錯誤資訊: %v \n", e)
} else {
fmt.Printf("result: %f \n", result)
}
e = client.Call("DemoService.Div", rpc2.Args{3, 0}, &result)
if e != nil {
fmt.Printf("錯誤資訊: %v \n", e)
} else {
fmt.Printf("result: %f \n", result)
}
}
- 撥号, 向端口号為1234的服務端撥号
- 建立連接配接
- 發送用戶端消息
服務端收到消息的處理結果
result: 2.000000
錯誤資訊: 除數不能為0
Process finished with exit code 0
下面将并發版爬蟲, 改為分布式爬蟲.
2. 将worker也改為使用rpc進行通信
第一部分, ItemSaver我們已經順利的使用rpc進行通信了
接下來我們對第二部分worker, 也提取成rpc
我們來看看worker的實作
入參是一個Request
傳回值是: ParseResult,error
Request的如下:
type Request struct {
Url string
ParseFun func(content []byte) ParseResult
}
type ParseResult struct {
Req []Request
Items persist.Item
}
Request結構體有兩個參數, 一個是函數, 一個是Url
Url是一個字元串, 可以在網絡上傳輸, 網絡上傳輸的都是字元串,整數, 能夠翻譯成json的封包. ParseFunc是一個函數, 函數是不能直接在網絡上傳輸的, 于是, 我們要将函數進行序列化, 然後, 在用戶端在進行反序列化
是以我們要對解析器進行序列化和反序列化
序列化: 一端将内容轉換成能夠在網絡上傳輸的封包
反序列化: 另一端收到以後, 要進行反序列化, 執行裡面的代碼