天天看點

使用Golang和Thrift2庫操作HBase技術總結

作者:大資料與人工智能分享

一、前言

最近連續釋出了幾篇Golang開發資料庫應用的技術文章。有讀者詢問項目背景,這裡簡單介紹一下。筆者現在主導設計開發一個大資料項目,涉及到ElasticSearch、HBase、MongoDB、MySql、ClickHouse等系統。

之是以用到這麼多資料庫系統,是因為整個流程涉及了資料開發、清洗、統計、分析、查詢和展示等環節,業務邏輯複雜,資料量龐大。為了最大化滿足功能需求同時兼顧性能,需要考慮到每個系統在資料處理方面的優點并将其發揮到極緻。

本文是Golang開發HBase應用方面第三篇技術心得,主要介紹如何通過Apache Thrift2這個協定庫進行HBase相關應用的開發,特别介紹了其中容易導緻問題的一些技術陷阱。

筆者在調研過程中發現,關于Thrift協定和開發庫的介紹資料比較多,使用Go語言結合Thrift開發的資料也能找到一些,但是中文社群裡,系統性的從頭到尾梳理Go語言結合Thrift進行HBase開發,并對技術細節加以分析的内容還是略顯匮乏。

也許本文沒有所謂技術創新點,但是結合可操作的執行個體的介紹開發流程,同時指出潛在的技術陷阱并給出解決方案,應該算是具有實踐指導意義的經驗總結,不算重新發明輪子。筆者一個小小的願望就是讀者通過閱讀這篇文章,直接就能上手Go語言HBase開發,解決實際問題。

接下來部分按如下組織:第二部分介紹準備工作,包括如何下載下傳、編譯和安裝Thrift;第三部分結合執行個體說明如何操作HBase資料庫;第四部分是高階話題,介紹如何如何通過連接配接池方式,達到Thrift對并發操作的支援;最後第五部分是全文總結。

本文預設背景是讀者已經熟悉Golang程式開發工具,并能夠進行基本程式開發。另外,本文所有執行個體都在Ubuntu-20.04.1Linux系統環境下運作測試通過,如無特别說明,文中會将Thrift/Thrift2混用,這裡僅指Thrift2版本。

二、準備工作

2.1 Thrift2 下載下傳,編譯和安裝

Thirft庫的源代碼需要從網上下載下傳,在Linux環境下,運作:

wget https://dlcdn.apache.org/thrift/0.16.0/thrift-0.16.0.tar.gz           

即可下載下傳Thrift源碼包,目前最新版本是0.16.0。

下載下傳完成後,運作tar xzvf thrift-0.16.0.tar.gz 解壓縮,然後開始編譯。在編譯前,確定環境所有工具包都已經齊備,如果不确定,在Ubuntu Linux環境下,運作以下指令把所需要元件一股腦裝上:

./configuremake && make install           

完成編譯和安裝。如果不運作make install,編譯好的程式會在thrift-0.16.0/compiler/cpp 路徑下,名稱為thrift。

2.2 啟動Thrift2服務

這一步要保證使用的HBase系統開啟Thrift服務,具體方法是在HBase的主節點,運作以下啟動指令:

下載下傳https://raw.githubusercontent.com/apache/hbase/2.4.11RC0/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift2/hbase.thrift 檔案,不知為何,這裡無法使用wget下載下傳。運作指令 thrift --out ./hbase --gen go Hbase.thrift 運作成功後,目前路徑下生成hbase/ 目錄,就是對應的Go開發包           
## 運作指令後,在目錄下将新生成一名為 hbase 的目錄,裡面就包含了所需的go檔案了## hbase的目錄結構如下:hbase   └── GoUnusedProtection__.go       ├── Hbase-consts.go       ├── Hbase.go       ├── t_h_base_service-remote         ## 該檔案夾是demo檔案夾,可以删掉           └── t_h_base_service-remote.go  ## 用戶端的測試代碼,直接可拿來用           

将生成hbase目錄拷貝到你的Go開發環境目錄下,至此就完成了Thrift庫準備工作。

生成的hbase開發包,比較重要的一個檔案是hbase/Hbase.go,大概有2萬餘行,相當于通路Thrift遠端過程調用(RPC) 的包裹器(Wrapper),将Go語言對HBase的操作請求轉成RPC請求,并處理傳回結果。

三、開發工作

本章節給出一個例子,實作基本的HBase操作。

3.1 資料表準備

在HBase資料庫中建立資料表'demo_tab',包含一個列簇(Family)'info',建表指令如下:

create 'demo_tab', 'info'           

然後輸入資料,這裡輸入10條資料,行鍵(row key)分别是'row_1' ... 'row10',資料列(Qualifier)分别是info:val_1, info:val_2。

3.2 簡單例子

這裡假設開發環境目錄為dev/,另外,為了節省篇幅,這裡省去了所有錯誤處理。

package mainimport (    "fmt"    "github.com/apache/thrift/lib/go/thrift"  // Apache提供的go thrift工具包    "context"    "dev/hbase" // thrift工具生成hbase工具包,放在dev/ 目錄下)const HOST = "hbase.master.node"  // 根據具體環境設定,hbase主節點const PORT = "9090" func main() {    table := "demo_tab"  // 測試資料表    rowKey := "row_5"    // 某一行,這裡row_5行    newRowKey := "row_11" // 新寫入一行    family := "info"  // 列簇名    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()    transport, _ := thrift.NewTSocket(HOST + ":" + PORT)  // 建立連接配接    client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory)  // 用戶端與連接配接關聯    transport.Open()  // 執行連接配接操作    defer transport.Close()  // 結束關閉    /*** 判斷資料表是否存在 ***/    isExists, _ := client.TableExists(context.Background(), &hbase.TTableName{Ns: []byte(family), Qualifier: []byte(table)})    fmt.Printf("table{%s} Exists:%t\n", table, isExists)    /*** 判斷記錄是否存在 ***/    isExists, _ = client.Exists(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)})    fmt.Printf("rowkey{%s} in table{%s} Exists:%t\n", rowKey, table, isExists)    /*** Put 一條資料 ***/    cvArr := []*hbase.TColumnValue{              {                  Family: []byte(family),                  Qualifier: []byte("val_1"),                  Value: []byte("new value of column val_1"),              },          }    data2Put := hbase.TPut{Row: []byte(newRowKey), ColumnValues: cvArr}    client.Put(context.Background(), []byte(table), &data2Put)    /*** Get 寫入的資料 ***/    result, _ := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(newRowKey)})    fmt.Println("Rowkey:" + string(result.Row))    for _, cv := range result.ColumnValues {      fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))     }    /*** 帶有過濾器的Scan操作 ***/     startRow, stopRow := "row_1", "row_9"    qual, val := "val_1", "aaa"      col1 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_1")}    col2 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_2")}    filterStr := fmt.Sprintf("SingleColumnValueFilter('%s','%s',=,'binary:%s', false, true)", family, qual, val)    results, _ := client.GetScannerResults(context.Background(), []byte(table), &hbase.TScan{           StartRow: []byte(startRow),  // 掃描範圍起始               StopRow: []byte(stopRow),   // 掃描範圍結束           FilterString: []byte(filterStr),   // 設定過濾字段和值           Columns: []*hbase.TColumn{&col1, &col2},  // 要讀取的字段           MaxVersions: 1,  // ** 注意版本,如果需要讀取最新版本,要跟Hbase表屬性設定一緻,預設為0        }, 100)    fmt.Printf("GetScannerResults %d done\n", len(results))    for _, k := range results {        fmt.Println("scan Rowkey:" + string(k.Row))        for _, cv := range k.ColumnValues {           fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))        }    }}           

上述可以正常編譯運作,注意HBase中對應的表要有資料,以及程式中各類環境設定要與真實情況一緻。

3.3 過濾器問題

3.2節給出的例子裡面,有一個帶有過濾器的掃描操作,具體作用是對掃描查詢的結果按照一定條件過濾。關于過濾器的說明,這裡有個參考連結,将過濾器的種類、作用和格式都做了詳細說明,非常具有參考價值:

https://www.cnblogs.com/ZisZ/p/9667271.html

關于掃描過濾器,有一個新手很容易出問題的地方,在3.2節程式例子中我用星号标記出來(54行)——MaxVersions屬性的設定。

Go語言會對結構體自動設定預設值,如果不顯式設定MaxVersions屬性的值,在程式中這個值就會是0,然後使用者運作時就會發現,明明資料庫有符合條件的值,怎麼傳回結果是空?在其他設定都确認無誤情況下,MaxVersions是一個容易掉入陷阱的地方。這個屬性意義是傳回資料庫中資料版本允許的最大值——HBase對于字段可以設定版本屬性,允許最多保留N個曆史版本,每次更新,版本也會增加。第一次寫入的資料,版本号就是1,如果隻保留最新版本,版本号就停留在1。如果MaxVersions設定成0,就沒有符合條件的資料被傳回,這就是出現結果為空的原因。

四、高階話題 —— 連接配接池問題

介紹完基本例子,本章介紹一個進階話題——Thrift連接配接池問題。現用一個例子說明,我們上一章節例子加以改造,利用Go協程的并發特性,對HBase開10個協程并發讀取,看看結果怎樣。

package mainimport (    "fmt"    "time"    "context"    "dev/hbase"     "github.com/apache/thrift/lib/go/thrift"  )const HOST = "hbase.master.node"   // 根據具體環境設定,hbase主節點const PORT = "9090"func main() {    table := "demo_tab"  // 測試資料表    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()    transport, _ := thrift.NewTSocket(HOST + ":" + PORT)  // 建立連接配接    client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory)  // 用戶端與連接配接關聯    transport.Open()  // 執行連接配接操作    defer transport.Close()  // 結束關閉    // 運作10個協程,并行讀取HBase資料      for j := 0; j < 10; j++ {        go func() {            rowKey := "row_" + fmt.Sprintf("%d", j) // 構造行鍵            result, err := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)})            if err != nil {  // 這裡增加錯誤處理,為了看到錯誤                fmt.Println(err)            }            if result != nil {                fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row))                for _, cv := range result.ColumnValues {                    fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))                }            }        }()    }      time.Sleep(time.Second * 10)  // 主程序睡眠10秒,等待協程結束}           

運作後,會出現錯誤提示"get: out of order sequence response"。這是因為Apache提供的Go語言版本Thrift開發包不支援并發引起的。究竟什麼怎樣的實作導緻不能支援并發呢?我們深入代碼分析一下。

這裡看一下Apache Thrift官方庫提供的Go語言API,檔案路徑:https://github.com/apache/thrift/blob/master/lib/go/thrift/client.go

// ResponseMeta represents the metadata attached to the response.type ResponseMeta struct {    // The headers in the response, if any.    // If the underlying transport/protocol is not THeader, this will always be nil.    Headers THeaderMap}type TClient interface {    Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)}type TStandardClient struct {    seqId        int32    iprot, oprot TProtocol}// TStandardClient implements TClient, and uses the standard message format for Thrift.// It is not safe for concurrent use. ***** 這裡加了星号func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient {    return &TStandardClient{        iprot: inputProtocol,        oprot: outputProtocol,    }}           

由上面代碼片段可見,Apache Thrift庫Go版本接口官方已經注釋,這個用戶端不能安全的支援并發應用(上面加了五個 *部分,15行)。

我們來分析一下為什麼這個實作不支援并發應用。從Thrift庫的client.go檔案中找到的一個API方法,名字叫Call,開發者通過調用Call方法,與遠端HBase的Thrift服務互動。可以看到方法中使用了Send/Recv實作資料收發。這裡面資料發送和接收的會話通過seqId來标記。在單線程情況下是沒問題的。如果并發情況下,假設一個線程運作到下面加了星号(第2行)部分之前,此時seqId記作1,然後另一個線程開始運作,也運作到星号部分(第2行)并執行完,此時seqId變成2(由開頭p.seqId++引發)。然後再切換到線程1繼續運作,這時候線程1看到的seqId是2,後面也拿這個seqId操作,這樣線程1和2都使用seqId操作,引發序列号沖突。其實在我看來,一個解決沖突的辦法是對p.seqId++操作加鎖,不知為什麼沒有這樣實作。

func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {    p.seqId++  // *** 注意這裡加了星号    seqId := p.seqId    if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {       return ResponseMeta{}, err    }    // method is oneway    if result == nil {        return ResponseMeta{}, nil    }    err := p.Recv(ctx, p.iprot, seqId, method, result)    var headers THeaderMap    if hp, ok := p.iprot.(*THeaderProtocol); ok {        headers = hp.transport.readHeaders    }    return ResponseMeta{        Headers: headers,    }, err}           

通過上述分析可知,我們既可以修改Thrift庫代碼解決這個問題,也可以在外部應用程式中解決。前兩篇關于HBase的技術文章,都是通過直接修改開發包達到目的。這裡因為對Go語言的Thrift庫并不熟悉,貿然改動代碼恐怕會引起未知問題,我們就手動實作一個連接配接池,将連接配接資源管理起來,應對并發請求。

實作一個連接配接池并不複雜,有很多現成的參考執行個體。實際上,我們這裡給出的連接配接池也是根據網上執行個體,結合Thrift應用實際場景加以改造完成。下面給出連接配接池的例子。這裡需要讀者對具備Go語言的接口和通道的相關背景知識。作為Go語言初學者,個人覺得接口是Go語言很優秀的特性,靈活、高效,兼具了C++指針和類的功能,借助接口,可以實作類似多态、動态資料轉換等功能。

package utilsimport (    "sync"    "time"    "errors"    "dev/hbase"  // thrift工具生成的hbase 包    "github.com/apache/thrift/lib/go/thrift") type ConnRes interface {  // 連接配接池成員接口    Close() error;} type Factory func() (ConnRes, error)  // 建立連接配接資源的工廠方法 type Conn struct {  // 連接配接結構體,包括一個連接配接資源,一個建立時間    conn ConnRes    time time.Time} type ConnPool struct {    // 連接配接池    mu  sync.Mutex        // 互斥量,用于并發通路控制    conns   chan *Conn    // 實際的“池”,用一個通道儲存連接配接    factory Factory       // 建立連接配接的工廠方法    closed  bool           // 連接配接池是否關閉    connTimeout time.Duration        // 每個連接配接逾時時限} func NewConnPool(factory Factory, capacity int, connTimeout time.Duration) (*ConnPool, error) {    if capacity < 0 { // 設定容量,不能小于0        return nil, errors.New("Invalid params: capacity cannot be less than 0.")    }    if connTimeout <= 0 {        return nil, errors.New("Invalid prams: connTimeout cannot be less than 0.")    }    cp := &ConnPool{   // 參數檢查後,設定連接配接池屬性        mu:         sync.Mutex{},        conns:      make(chan *Conn, capacity),        factory:    factory,        closed:     false,        connTimeout:    connTimeout,    }    for i := 0; i < capacity; i++ {    // 預先建立capacity個連接配接,放入池子中        connRes, err := cp.factory()        if err != nil {            cp.Close()            return nil, errors.New("Error in NewConnPool while calling factory")        }        cp.conns <- &Conn{conn: connRes, time: time.Now()}  // 連接配接放入池中    }    return cp, nil} func (cp *ConnPool) Get() (ConnRes, error) {  // 取出一個連接配接    if cp.closed {        return nil, errors.New("Connection pool closed.")    }    for {        select {        case connRes, ok := <-cp.conns:  // 有資源可以擷取到            if !ok {                return nil, errors.New("Connection pool closed.")            }            if time.Now().Sub(connRes.time) > cp.connTimeout { // 拿到的連接配接已經逾時,将其關閉,繼續取下一個                connRes.conn.Close()                continue            }            return connRes.conn, nil  // 成功擷取到一個連接配接        default:  // 擷取不到連接配接,池子已空,那就新建立一個            connRes, err := cp.factory()            if err != nil {                return nil, err            }            return connRes, nil        }    }} func (cp *ConnPool) Put(conn ConnRes) error {  // 用完歸還一個連接配接到連接配接池    if cp.closed {        return errors.New("Connection pool closed.")    }    select {    case cp.conns <- &Conn{conn: conn, time: time.Now()}: // 将連接配接放回到池子中,更新時間        return nil    default:        conn.Close()       // 連接配接池已滿,無法放入資源,将這個連接配接關閉        return errors.New("Connection pool is full.")    }} func (cp *ConnPool) Close() {  // 關閉連接配接池    if cp.closed {        return    }    cp.mu.Lock()    cp.closed = true    close(cp.conns)  // 關閉通道,即連接配接“池”    for conn := range cp.conns {        conn.conn.Close()    }    cp.mu.Unlock()}           

有了連接配接池後,結合上面例子,我們測試一下并發通路HBase。

先設定好環境,目前開發環境目錄是dev/,dev下面目錄結構如下:

dev  # 開發環境根目錄├── hbase      # thrift 生成的HBase程式包│   ├── GoUnusedProtection__.go│   ├── Hbase-consts.go│   ├── Hbase.go│   └── hbase-remote│       └── hbase-remote.go├── main.go   # 測試主程式,并發通路HBase└── utils  # 連接配接池    └── conn_pool.go           

修改後,使用了連接配接池功能的并發通路程式main.go内容如下:

package mainimport (    "fmt"    "time"    "errors"    "context"    "dev/utils" // 連接配接池所在Go程式包    "dev/hbase" // thrift工具生成hbase工具包    "github.com/apache/thrift/lib/go/thrift"  // Apache提供的go thrift工具包)const HOST = "hbase.master.node"  // 根據具體環境設定,hbase主節點const PORT = "9090"  // Hbase Thrift 服務端口const DEFAULT_TIMEOUT = time.Second * 10  // 連接配接預設逾時時間const DEFAULT_CONN_CAPCITY = 10  // 預設連接配接池容量 type HbaseConn struct {    SocketConn  *thrift.TSocket    ClientConn  *hbase.THBaseServiceClient}func (p *HbaseConn) Close() error {  // 實作Close() error 方法相容utils.ConnRes接口    if p.SocketConn != nil {        p.SocketConn.Close()        return nil    }    return errors.New("Unable to call close the socket connection.")}func main() {    table := "demo_tab"  // 測試資料表    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()    dbConnPool, _ := utils.NewConnPool(func() (utils.ConnRes, error) {   // 重要:建立連接配接池                tcpTransport, err := thrift.NewTSocket(HOST + ":" + PORT)                if err != nil {                    fmt.Printf("Error when setup HBase socket connection: %s.\n", err.Error())                }                connClient := hbase.NewTHBaseServiceClientFactory(tcpTransport, protocolFactory)                return &HbaseConn{SocketConn: tcpTransport, ClientConn: connClient}, nil            },  // 重要:HbaseConn類型相容ConnRes,是以可以作為Factory方法的傳回            DEFAULT_CONN_CAPCITY,            DEFAULT_TIMEOUT)    defer dbConnPool.Close()  // 程式退出後,勿忘記關閉連接配接池    //  并發通路     for j := 0; j < 10; j++ {        go func() {            rowKey := "row_" + fmt.Sprintf("%d", j)  // 設定rowKey            conn, _ := dbConnPool.Get() // 從池子中擷取一個連接配接            defer dbConnPool.Put(conn) // 用後勿忘歸還連接配接入池            if !conn.(*HbaseConn).SocketConn.IsOpen() { // 判斷取到的連接配接TCP是否關閉,如果關閉,重新打開                conn.(*HbaseConn).SocketConn.Open() // 這裡有一個接口轉換,将ConnRes轉成HbaseConn,類似C++的dynamic_cast<>            }            result, err := conn.(*HbaseConn).ClientConn.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) // 擷取記錄            if err != nil {                fmt.Println(err)            }            if result != nil {                fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row))                for _, cv := range result.ColumnValues {                    fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))                }            }            return        }()    }      time.Sleep(time.Second * 10) // 主程式睡眠10秒鐘,等待協程結束}           

可以将上述程式在開發環境中編譯運作,運作結果發現不再有“get: out of order sequence response” 這樣的報錯,輸出結果也是正常的。

五、總結

本文介紹了使用Go語言通過Thrift程式包通路HBase的基本操作,包括環境建立、工具使用、程式開發以及一些技術陷阱的解決方法等,力求從頭到尾對這個主題進行完整的總結。囿于作者水準所限,文中錯誤疏漏在所難免,作為一個隻有個把月Go學習經驗,一萬來行Go項目開發經驗的初級人士,給出的程式實作肯定不是最優的,存在較大的優化空間。後面的工作中,如有更有價值的心得或者更優化的解決方案,也會及時輸出更新。

注:本文系原創,亦發表于作者微信公衆号,轉載請注明出處。