一、前言
最近連續釋出了幾篇Golang開發資料庫應用的技術文章。有讀者詢問項目背景,這裡簡單介紹一下。筆者現在主導設計開發一個大資料項目,涉及到ElasticSearch、HBase、MongoDB、MySql、ClickHouse等系統。
之是以用到這麼多資料庫系統,是因為整個流程涉及了資料開發、清洗、統計、分析、查詢和展示等環節,業務邏輯複雜,資料量龐大。為了最大化滿足功能需求同時兼顧性能,需要考慮到每個系統在資料處理方面的優點并将其發揮到極緻。
本文是Golang開發HBase應用方面第三篇技術心得,主要介紹如何通過Apache Thrift2這個協定庫進行HBase相關應用的開發,特别介紹了其中容易導緻問題的一些技術陷阱。
筆者在調研過程中發現,關于Thrift協定和開發庫的介紹資料比較多,使用Go語言結合Thrift開發的資料也能找到一些,但是中文社群裡,系統性的從頭到尾梳理Go語言結合Thrift進行HBase開發,并對技術細節加以分析的内容還是略顯匮乏。
也許本文沒有所謂技術創新點,但是結合可操作的執行個體的介紹開發流程,同時指出潛在的技術陷阱并給出解決方案,應該算是具有實踐指導意義的經驗總結,不算重新發明輪子。筆者一個小小的願望就是讀者通過閱讀這篇文章,直接就能上手Go語言HBase開發,解決實際問題。
接下來部分按如下組織:第二部分介紹準備工作,包括如何下載下傳、編譯和安裝Thrift;第三部分結合執行個體說明如何操作HBase資料庫;第四部分是高階話題,介紹如何如何通過連接配接池方式,達到Thrift對并發操作的支援;最後第五部分是全文總結。
本文預設背景是讀者已經熟悉Golang程式開發工具,并能夠進行基本程式開發。另外,本文所有執行個體都在Ubuntu-20.04.1Linux系統環境下運作測試通過,如無特别說明,文中會将Thrift/Thrift2混用,這裡僅指Thrift2版本。
二、準備工作
2.1 Thrift2 下載下傳,編譯和安裝
Thirft庫的源代碼需要從網上下載下傳,在Linux環境下,運作:
wget https://dlcdn.apache.org/thrift/0.16.0/thrift-0.16.0.tar.gz
即可下載下傳Thrift源碼包,目前最新版本是0.16.0。
下載下傳完成後,運作tar xzvf thrift-0.16.0.tar.gz 解壓縮,然後開始編譯。在編譯前,確定環境所有工具包都已經齊備,如果不确定,在Ubuntu Linux環境下,運作以下指令把所需要元件一股腦裝上:
./configuremake && make install
完成編譯和安裝。如果不運作make install,編譯好的程式會在thrift-0.16.0/compiler/cpp 路徑下,名稱為thrift。
2.2 啟動Thrift2服務
這一步要保證使用的HBase系統開啟Thrift服務,具體方法是在HBase的主節點,運作以下啟動指令:
下載下傳https://raw.githubusercontent.com/apache/hbase/2.4.11RC0/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift2/hbase.thrift 檔案,不知為何,這裡無法使用wget下載下傳。運作指令 thrift --out ./hbase --gen go Hbase.thrift 運作成功後,目前路徑下生成hbase/ 目錄,就是對應的Go開發包
## 運作指令後,在目錄下将新生成一名為 hbase 的目錄,裡面就包含了所需的go檔案了## hbase的目錄結構如下:hbase └── GoUnusedProtection__.go ├── Hbase-consts.go ├── Hbase.go ├── t_h_base_service-remote ## 該檔案夾是demo檔案夾,可以删掉 └── t_h_base_service-remote.go ## 用戶端的測試代碼,直接可拿來用
将生成hbase目錄拷貝到你的Go開發環境目錄下,至此就完成了Thrift庫準備工作。
生成的hbase開發包,比較重要的一個檔案是hbase/Hbase.go,大概有2萬餘行,相當于通路Thrift遠端過程調用(RPC) 的包裹器(Wrapper),将Go語言對HBase的操作請求轉成RPC請求,并處理傳回結果。
三、開發工作
本章節給出一個例子,實作基本的HBase操作。
3.1 資料表準備
在HBase資料庫中建立資料表'demo_tab',包含一個列簇(Family)'info',建表指令如下:
create 'demo_tab', 'info'
然後輸入資料,這裡輸入10條資料,行鍵(row key)分别是'row_1' ... 'row10',資料列(Qualifier)分别是info:val_1, info:val_2。
3.2 簡單例子
這裡假設開發環境目錄為dev/,另外,為了節省篇幅,這裡省去了所有錯誤處理。
package mainimport ( "fmt" "github.com/apache/thrift/lib/go/thrift" // Apache提供的go thrift工具包 "context" "dev/hbase" // thrift工具生成hbase工具包,放在dev/ 目錄下)const HOST = "hbase.master.node" // 根據具體環境設定,hbase主節點const PORT = "9090" func main() { table := "demo_tab" // 測試資料表 rowKey := "row_5" // 某一行,這裡row_5行 newRowKey := "row_11" // 新寫入一行 family := "info" // 列簇名 protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transport, _ := thrift.NewTSocket(HOST + ":" + PORT) // 建立連接配接 client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory) // 用戶端與連接配接關聯 transport.Open() // 執行連接配接操作 defer transport.Close() // 結束關閉 /*** 判斷資料表是否存在 ***/ isExists, _ := client.TableExists(context.Background(), &hbase.TTableName{Ns: []byte(family), Qualifier: []byte(table)}) fmt.Printf("table{%s} Exists:%t\n", table, isExists) /*** 判斷記錄是否存在 ***/ isExists, _ = client.Exists(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) fmt.Printf("rowkey{%s} in table{%s} Exists:%t\n", rowKey, table, isExists) /*** Put 一條資料 ***/ cvArr := []*hbase.TColumnValue{ { Family: []byte(family), Qualifier: []byte("val_1"), Value: []byte("new value of column val_1"), }, } data2Put := hbase.TPut{Row: []byte(newRowKey), ColumnValues: cvArr} client.Put(context.Background(), []byte(table), &data2Put) /*** Get 寫入的資料 ***/ result, _ := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(newRowKey)}) fmt.Println("Rowkey:" + string(result.Row)) for _, cv := range result.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } /*** 帶有過濾器的Scan操作 ***/ startRow, stopRow := "row_1", "row_9" qual, val := "val_1", "aaa" col1 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_1")} col2 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_2")} filterStr := fmt.Sprintf("SingleColumnValueFilter('%s','%s',=,'binary:%s', false, true)", family, qual, val) results, _ := client.GetScannerResults(context.Background(), []byte(table), &hbase.TScan{ StartRow: []byte(startRow), // 掃描範圍起始 StopRow: []byte(stopRow), // 掃描範圍結束 FilterString: []byte(filterStr), // 設定過濾字段和值 Columns: []*hbase.TColumn{&col1, &col2}, // 要讀取的字段 MaxVersions: 1, // ** 注意版本,如果需要讀取最新版本,要跟Hbase表屬性設定一緻,預設為0 }, 100) fmt.Printf("GetScannerResults %d done\n", len(results)) for _, k := range results { fmt.Println("scan Rowkey:" + string(k.Row)) for _, cv := range k.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } }}
上述可以正常編譯運作,注意HBase中對應的表要有資料,以及程式中各類環境設定要與真實情況一緻。
3.3 過濾器問題
3.2節給出的例子裡面,有一個帶有過濾器的掃描操作,具體作用是對掃描查詢的結果按照一定條件過濾。關于過濾器的說明,這裡有個參考連結,将過濾器的種類、作用和格式都做了詳細說明,非常具有參考價值:
https://www.cnblogs.com/ZisZ/p/9667271.html
關于掃描過濾器,有一個新手很容易出問題的地方,在3.2節程式例子中我用星号标記出來(54行)——MaxVersions屬性的設定。
Go語言會對結構體自動設定預設值,如果不顯式設定MaxVersions屬性的值,在程式中這個值就會是0,然後使用者運作時就會發現,明明資料庫有符合條件的值,怎麼傳回結果是空?在其他設定都确認無誤情況下,MaxVersions是一個容易掉入陷阱的地方。這個屬性意義是傳回資料庫中資料版本允許的最大值——HBase對于字段可以設定版本屬性,允許最多保留N個曆史版本,每次更新,版本也會增加。第一次寫入的資料,版本号就是1,如果隻保留最新版本,版本号就停留在1。如果MaxVersions設定成0,就沒有符合條件的資料被傳回,這就是出現結果為空的原因。
四、高階話題 —— 連接配接池問題
介紹完基本例子,本章介紹一個進階話題——Thrift連接配接池問題。現用一個例子說明,我們上一章節例子加以改造,利用Go協程的并發特性,對HBase開10個協程并發讀取,看看結果怎樣。
package mainimport ( "fmt" "time" "context" "dev/hbase" "github.com/apache/thrift/lib/go/thrift" )const HOST = "hbase.master.node" // 根據具體環境設定,hbase主節點const PORT = "9090"func main() { table := "demo_tab" // 測試資料表 protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transport, _ := thrift.NewTSocket(HOST + ":" + PORT) // 建立連接配接 client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory) // 用戶端與連接配接關聯 transport.Open() // 執行連接配接操作 defer transport.Close() // 結束關閉 // 運作10個協程,并行讀取HBase資料 for j := 0; j < 10; j++ { go func() { rowKey := "row_" + fmt.Sprintf("%d", j) // 構造行鍵 result, err := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) if err != nil { // 這裡增加錯誤處理,為了看到錯誤 fmt.Println(err) } if result != nil { fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row)) for _, cv := range result.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } } }() } time.Sleep(time.Second * 10) // 主程序睡眠10秒,等待協程結束}
運作後,會出現錯誤提示"get: out of order sequence response"。這是因為Apache提供的Go語言版本Thrift開發包不支援并發引起的。究竟什麼怎樣的實作導緻不能支援并發呢?我們深入代碼分析一下。
這裡看一下Apache Thrift官方庫提供的Go語言API,檔案路徑:https://github.com/apache/thrift/blob/master/lib/go/thrift/client.go
// ResponseMeta represents the metadata attached to the response.type ResponseMeta struct { // The headers in the response, if any. // If the underlying transport/protocol is not THeader, this will always be nil. Headers THeaderMap}type TClient interface { Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)}type TStandardClient struct { seqId int32 iprot, oprot TProtocol}// TStandardClient implements TClient, and uses the standard message format for Thrift.// It is not safe for concurrent use. ***** 這裡加了星号func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient { return &TStandardClient{ iprot: inputProtocol, oprot: outputProtocol, }}
由上面代碼片段可見,Apache Thrift庫Go版本接口官方已經注釋,這個用戶端不能安全的支援并發應用(上面加了五個 *部分,15行)。
我們來分析一下為什麼這個實作不支援并發應用。從Thrift庫的client.go檔案中找到的一個API方法,名字叫Call,開發者通過調用Call方法,與遠端HBase的Thrift服務互動。可以看到方法中使用了Send/Recv實作資料收發。這裡面資料發送和接收的會話通過seqId來标記。在單線程情況下是沒問題的。如果并發情況下,假設一個線程運作到下面加了星号(第2行)部分之前,此時seqId記作1,然後另一個線程開始運作,也運作到星号部分(第2行)并執行完,此時seqId變成2(由開頭p.seqId++引發)。然後再切換到線程1繼續運作,這時候線程1看到的seqId是2,後面也拿這個seqId操作,這樣線程1和2都使用seqId操作,引發序列号沖突。其實在我看來,一個解決沖突的辦法是對p.seqId++操作加鎖,不知為什麼沒有這樣實作。
func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) { p.seqId++ // *** 注意這裡加了星号 seqId := p.seqId if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil { return ResponseMeta{}, err } // method is oneway if result == nil { return ResponseMeta{}, nil } err := p.Recv(ctx, p.iprot, seqId, method, result) var headers THeaderMap if hp, ok := p.iprot.(*THeaderProtocol); ok { headers = hp.transport.readHeaders } return ResponseMeta{ Headers: headers, }, err}
通過上述分析可知,我們既可以修改Thrift庫代碼解決這個問題,也可以在外部應用程式中解決。前兩篇關于HBase的技術文章,都是通過直接修改開發包達到目的。這裡因為對Go語言的Thrift庫并不熟悉,貿然改動代碼恐怕會引起未知問題,我們就手動實作一個連接配接池,将連接配接資源管理起來,應對并發請求。
實作一個連接配接池并不複雜,有很多現成的參考執行個體。實際上,我們這裡給出的連接配接池也是根據網上執行個體,結合Thrift應用實際場景加以改造完成。下面給出連接配接池的例子。這裡需要讀者對具備Go語言的接口和通道的相關背景知識。作為Go語言初學者,個人覺得接口是Go語言很優秀的特性,靈活、高效,兼具了C++指針和類的功能,借助接口,可以實作類似多态、動态資料轉換等功能。
package utilsimport ( "sync" "time" "errors" "dev/hbase" // thrift工具生成的hbase 包 "github.com/apache/thrift/lib/go/thrift") type ConnRes interface { // 連接配接池成員接口 Close() error;} type Factory func() (ConnRes, error) // 建立連接配接資源的工廠方法 type Conn struct { // 連接配接結構體,包括一個連接配接資源,一個建立時間 conn ConnRes time time.Time} type ConnPool struct { // 連接配接池 mu sync.Mutex // 互斥量,用于并發通路控制 conns chan *Conn // 實際的“池”,用一個通道儲存連接配接 factory Factory // 建立連接配接的工廠方法 closed bool // 連接配接池是否關閉 connTimeout time.Duration // 每個連接配接逾時時限} func NewConnPool(factory Factory, capacity int, connTimeout time.Duration) (*ConnPool, error) { if capacity < 0 { // 設定容量,不能小于0 return nil, errors.New("Invalid params: capacity cannot be less than 0.") } if connTimeout <= 0 { return nil, errors.New("Invalid prams: connTimeout cannot be less than 0.") } cp := &ConnPool{ // 參數檢查後,設定連接配接池屬性 mu: sync.Mutex{}, conns: make(chan *Conn, capacity), factory: factory, closed: false, connTimeout: connTimeout, } for i := 0; i < capacity; i++ { // 預先建立capacity個連接配接,放入池子中 connRes, err := cp.factory() if err != nil { cp.Close() return nil, errors.New("Error in NewConnPool while calling factory") } cp.conns <- &Conn{conn: connRes, time: time.Now()} // 連接配接放入池中 } return cp, nil} func (cp *ConnPool) Get() (ConnRes, error) { // 取出一個連接配接 if cp.closed { return nil, errors.New("Connection pool closed.") } for { select { case connRes, ok := <-cp.conns: // 有資源可以擷取到 if !ok { return nil, errors.New("Connection pool closed.") } if time.Now().Sub(connRes.time) > cp.connTimeout { // 拿到的連接配接已經逾時,将其關閉,繼續取下一個 connRes.conn.Close() continue } return connRes.conn, nil // 成功擷取到一個連接配接 default: // 擷取不到連接配接,池子已空,那就新建立一個 connRes, err := cp.factory() if err != nil { return nil, err } return connRes, nil } }} func (cp *ConnPool) Put(conn ConnRes) error { // 用完歸還一個連接配接到連接配接池 if cp.closed { return errors.New("Connection pool closed.") } select { case cp.conns <- &Conn{conn: conn, time: time.Now()}: // 将連接配接放回到池子中,更新時間 return nil default: conn.Close() // 連接配接池已滿,無法放入資源,将這個連接配接關閉 return errors.New("Connection pool is full.") }} func (cp *ConnPool) Close() { // 關閉連接配接池 if cp.closed { return } cp.mu.Lock() cp.closed = true close(cp.conns) // 關閉通道,即連接配接“池” for conn := range cp.conns { conn.conn.Close() } cp.mu.Unlock()}
有了連接配接池後,結合上面例子,我們測試一下并發通路HBase。
先設定好環境,目前開發環境目錄是dev/,dev下面目錄結構如下:
dev # 開發環境根目錄├── hbase # thrift 生成的HBase程式包│ ├── GoUnusedProtection__.go│ ├── Hbase-consts.go│ ├── Hbase.go│ └── hbase-remote│ └── hbase-remote.go├── main.go # 測試主程式,并發通路HBase└── utils # 連接配接池 └── conn_pool.go
修改後,使用了連接配接池功能的并發通路程式main.go内容如下:
package mainimport ( "fmt" "time" "errors" "context" "dev/utils" // 連接配接池所在Go程式包 "dev/hbase" // thrift工具生成hbase工具包 "github.com/apache/thrift/lib/go/thrift" // Apache提供的go thrift工具包)const HOST = "hbase.master.node" // 根據具體環境設定,hbase主節點const PORT = "9090" // Hbase Thrift 服務端口const DEFAULT_TIMEOUT = time.Second * 10 // 連接配接預設逾時時間const DEFAULT_CONN_CAPCITY = 10 // 預設連接配接池容量 type HbaseConn struct { SocketConn *thrift.TSocket ClientConn *hbase.THBaseServiceClient}func (p *HbaseConn) Close() error { // 實作Close() error 方法相容utils.ConnRes接口 if p.SocketConn != nil { p.SocketConn.Close() return nil } return errors.New("Unable to call close the socket connection.")}func main() { table := "demo_tab" // 測試資料表 protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() dbConnPool, _ := utils.NewConnPool(func() (utils.ConnRes, error) { // 重要:建立連接配接池 tcpTransport, err := thrift.NewTSocket(HOST + ":" + PORT) if err != nil { fmt.Printf("Error when setup HBase socket connection: %s.\n", err.Error()) } connClient := hbase.NewTHBaseServiceClientFactory(tcpTransport, protocolFactory) return &HbaseConn{SocketConn: tcpTransport, ClientConn: connClient}, nil }, // 重要:HbaseConn類型相容ConnRes,是以可以作為Factory方法的傳回 DEFAULT_CONN_CAPCITY, DEFAULT_TIMEOUT) defer dbConnPool.Close() // 程式退出後,勿忘記關閉連接配接池 // 并發通路 for j := 0; j < 10; j++ { go func() { rowKey := "row_" + fmt.Sprintf("%d", j) // 設定rowKey conn, _ := dbConnPool.Get() // 從池子中擷取一個連接配接 defer dbConnPool.Put(conn) // 用後勿忘歸還連接配接入池 if !conn.(*HbaseConn).SocketConn.IsOpen() { // 判斷取到的連接配接TCP是否關閉,如果關閉,重新打開 conn.(*HbaseConn).SocketConn.Open() // 這裡有一個接口轉換,将ConnRes轉成HbaseConn,類似C++的dynamic_cast<> } result, err := conn.(*HbaseConn).ClientConn.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) // 擷取記錄 if err != nil { fmt.Println(err) } if result != nil { fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row)) for _, cv := range result.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } } return }() } time.Sleep(time.Second * 10) // 主程式睡眠10秒鐘,等待協程結束}
可以将上述程式在開發環境中編譯運作,運作結果發現不再有“get: out of order sequence response” 這樣的報錯,輸出結果也是正常的。
五、總結
本文介紹了使用Go語言通過Thrift程式包通路HBase的基本操作,包括環境建立、工具使用、程式開發以及一些技術陷阱的解決方法等,力求從頭到尾對這個主題進行完整的總結。囿于作者水準所限,文中錯誤疏漏在所難免,作為一個隻有個把月Go學習經驗,一萬來行Go項目開發經驗的初級人士,給出的程式實作肯定不是最優的,存在較大的優化空間。後面的工作中,如有更有價值的心得或者更優化的解決方案,也會及時輸出更新。
注:本文系原創,亦發表于作者微信公衆号,轉載請注明出處。