天天看点

使用Golang和Thrift2库操作HBase技术总结

作者:大数据与人工智能分享

一、前言

最近连续发布了几篇Golang开发数据库应用的技术文章。有读者询问项目背景,这里简单介绍一下。笔者现在主导设计开发一个大数据项目,涉及到ElasticSearch、HBase、MongoDB、MySql、ClickHouse等系统。

之所以用到这么多数据库系统,是因为整个流程涉及了数据开发、清洗、统计、分析、查询和展示等环节,业务逻辑复杂,数据量庞大。为了最大化满足功能需求同时兼顾性能,需要考虑到每个系统在数据处理方面的优点并将其发挥到极致。

本文是Golang开发HBase应用方面第三篇技术心得,主要介绍如何通过Apache Thrift2这个协议库进行HBase相关应用的开发,特别介绍了其中容易导致问题的一些技术陷阱。

笔者在调研过程中发现,关于Thrift协议和开发库的介绍资料比较多,使用Go语言结合Thrift开发的资料也能找到一些,但是中文社区里,系统性的从头到尾梳理Go语言结合Thrift进行HBase开发,并对技术细节加以分析的内容还是略显匮乏。

也许本文没有所谓技术创新点,但是结合可操作的实例的介绍开发流程,同时指出潜在的技术陷阱并给出解决方案,应该算是具有实践指导意义的经验总结,不算重新发明轮子。笔者一个小小的愿望就是读者通过阅读这篇文章,直接就能上手Go语言HBase开发,解决实际问题。

接下来部分按如下组织:第二部分介绍准备工作,包括如何下载、编译和安装Thrift;第三部分结合实例说明如何操作HBase数据库;第四部分是高阶话题,介绍如何如何通过连接池方式,达到Thrift对并发操作的支持;最后第五部分是全文总结。

本文预设背景是读者已经熟悉Golang程序开发工具,并能够进行基本程序开发。另外,本文所有实例都在Ubuntu-20.04.1Linux系统环境下运行测试通过,如无特别说明,文中会将Thrift/Thrift2混用,这里仅指Thrift2版本。

二、准备工作

2.1 Thrift2 下载,编译和安装

Thirft库的源代码需要从网上下载,在Linux环境下,运行:

wget https://dlcdn.apache.org/thrift/0.16.0/thrift-0.16.0.tar.gz           

即可下载Thrift源码包,当前最新版本是0.16.0。

下载完成后,运行tar xzvf thrift-0.16.0.tar.gz 解压缩,然后开始编译。在编译前,确保环境所有工具包都已经齐备,如果不确定,在Ubuntu Linux环境下,运行以下命令把所需要组件一股脑装上:

./configuremake && make install           

完成编译和安装。如果不运行make install,编译好的程序会在thrift-0.16.0/compiler/cpp 路径下,名称为thrift。

2.2 启动Thrift2服务

这一步要保证使用的HBase系统开启Thrift服务,具体方法是在HBase的主节点,运行以下启动命令:

下载https://raw.githubusercontent.com/apache/hbase/2.4.11RC0/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift2/hbase.thrift 文件,不知为何,这里无法使用wget下载。运行命令 thrift --out ./hbase --gen go Hbase.thrift 运行成功后,当前路径下生成hbase/ 目录,就是对应的Go开发包           
## 运行命令后,在目录下将新生成一名为 hbase 的目录,里面就包含了所需的go文件了## hbase的目录结构如下:hbase   └── GoUnusedProtection__.go       ├── Hbase-consts.go       ├── Hbase.go       ├── t_h_base_service-remote         ## 该文件夹是demo文件夹,可以删掉           └── t_h_base_service-remote.go  ## 客户端的测试代码,直接可拿来用           

将生成hbase目录拷贝到你的Go开发环境目录下,至此就完成了Thrift库准备工作。

生成的hbase开发包,比较重要的一个文件是hbase/Hbase.go,大概有2万余行,相当于访问Thrift远程过程调用(RPC) 的包裹器(Wrapper),将Go语言对HBase的操作请求转成RPC请求,并处理返回结果。

三、开发工作

本章节给出一个例子,实现基本的HBase操作。

3.1 数据表准备

在HBase数据库中创建数据表'demo_tab',包含一个列簇(Family)'info',建表命令如下:

create 'demo_tab', 'info'           

然后输入数据,这里输入10条数据,行键(row key)分别是'row_1' ... 'row10',数据列(Qualifier)分别是info:val_1, info:val_2。

3.2 简单例子

这里假设开发环境目录为dev/,另外,为了节省篇幅,这里省去了所有错误处理。

package mainimport (    "fmt"    "github.com/apache/thrift/lib/go/thrift"  // Apache提供的go thrift工具包    "context"    "dev/hbase" // thrift工具生成hbase工具包,放在dev/ 目录下)const HOST = "hbase.master.node"  // 根据具体环境设置,hbase主节点const PORT = "9090" func main() {    table := "demo_tab"  // 测试数据表    rowKey := "row_5"    // 某一行,这里row_5行    newRowKey := "row_11" // 新写入一行    family := "info"  // 列簇名    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()    transport, _ := thrift.NewTSocket(HOST + ":" + PORT)  // 创建连接    client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory)  // 客户端与连接关联    transport.Open()  // 执行连接操作    defer transport.Close()  // 结束关闭    /*** 判断数据表是否存在 ***/    isExists, _ := client.TableExists(context.Background(), &hbase.TTableName{Ns: []byte(family), Qualifier: []byte(table)})    fmt.Printf("table{%s} Exists:%t\n", table, isExists)    /*** 判断记录是否存在 ***/    isExists, _ = client.Exists(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)})    fmt.Printf("rowkey{%s} in table{%s} Exists:%t\n", rowKey, table, isExists)    /*** Put 一条数据 ***/    cvArr := []*hbase.TColumnValue{              {                  Family: []byte(family),                  Qualifier: []byte("val_1"),                  Value: []byte("new value of column val_1"),              },          }    data2Put := hbase.TPut{Row: []byte(newRowKey), ColumnValues: cvArr}    client.Put(context.Background(), []byte(table), &data2Put)    /*** Get 写入的数据 ***/    result, _ := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(newRowKey)})    fmt.Println("Rowkey:" + string(result.Row))    for _, cv := range result.ColumnValues {      fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))     }    /*** 带有过滤器的Scan操作 ***/     startRow, stopRow := "row_1", "row_9"    qual, val := "val_1", "aaa"      col1 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_1")}    col2 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_2")}    filterStr := fmt.Sprintf("SingleColumnValueFilter('%s','%s',=,'binary:%s', false, true)", family, qual, val)    results, _ := client.GetScannerResults(context.Background(), []byte(table), &hbase.TScan{           StartRow: []byte(startRow),  // 扫描范围起始               StopRow: []byte(stopRow),   // 扫描范围结束           FilterString: []byte(filterStr),   // 设定过滤字段和值           Columns: []*hbase.TColumn{&col1, &col2},  // 要读取的字段           MaxVersions: 1,  // ** 注意版本,如果需要读取最新版本,要跟Hbase表属性设置一致,默认为0        }, 100)    fmt.Printf("GetScannerResults %d done\n", len(results))    for _, k := range results {        fmt.Println("scan Rowkey:" + string(k.Row))        for _, cv := range k.ColumnValues {           fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))        }    }}           

上述可以正常编译运行,注意HBase中对应的表要有数据,以及程序中各类环境设置要与真实情况一致。

3.3 过滤器问题

3.2节给出的例子里面,有一个带有过滤器的扫描操作,具体作用是对扫描查询的结果按照一定条件过滤。关于过滤器的说明,这里有个参考链接,将过滤器的种类、作用和格式都做了详细说明,非常具有参考价值:

https://www.cnblogs.com/ZisZ/p/9667271.html

关于扫描过滤器,有一个新手很容易出问题的地方,在3.2节程序例子中我用星号标记出来(54行)——MaxVersions属性的设置。

Go语言会对结构体自动设置默认值,如果不显式设置MaxVersions属性的值,在程序中这个值就会是0,然后用户运行时就会发现,明明数据库有符合条件的值,怎么返回结果是空?在其他设置都确认无误情况下,MaxVersions是一个容易掉入陷阱的地方。这个属性意义是返回数据库中数据版本允许的最大值——HBase对于字段可以设置版本属性,允许最多保留N个历史版本,每次更新,版本也会增加。第一次写入的数据,版本号就是1,如果只保留最新版本,版本号就停留在1。如果MaxVersions设置成0,就没有符合条件的数据被返回,这就是出现结果为空的原因。

四、高阶话题 —— 连接池问题

介绍完基本例子,本章介绍一个高级话题——Thrift连接池问题。现用一个例子说明,我们上一章节例子加以改造,利用Go协程的并发特性,对HBase开10个协程并发读取,看看结果怎样。

package mainimport (    "fmt"    "time"    "context"    "dev/hbase"     "github.com/apache/thrift/lib/go/thrift"  )const HOST = "hbase.master.node"   // 根据具体环境设置,hbase主节点const PORT = "9090"func main() {    table := "demo_tab"  // 测试数据表    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()    transport, _ := thrift.NewTSocket(HOST + ":" + PORT)  // 创建连接    client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory)  // 客户端与连接关联    transport.Open()  // 执行连接操作    defer transport.Close()  // 结束关闭    // 运行10个协程,并行读取HBase数据      for j := 0; j < 10; j++ {        go func() {            rowKey := "row_" + fmt.Sprintf("%d", j) // 构造行键            result, err := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)})            if err != nil {  // 这里增加错误处理,为了看到错误                fmt.Println(err)            }            if result != nil {                fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row))                for _, cv := range result.ColumnValues {                    fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))                }            }        }()    }      time.Sleep(time.Second * 10)  // 主进程睡眠10秒,等待协程结束}           

运行后,会出现错误提示"get: out of order sequence response"。这是因为Apache提供的Go语言版本Thrift开发包不支持并发引起的。究竟什么怎样的实现导致不能支持并发呢?我们深入代码分析一下。

这里看一下Apache Thrift官方库提供的Go语言API,文件路径:https://github.com/apache/thrift/blob/master/lib/go/thrift/client.go

// ResponseMeta represents the metadata attached to the response.type ResponseMeta struct {    // The headers in the response, if any.    // If the underlying transport/protocol is not THeader, this will always be nil.    Headers THeaderMap}type TClient interface {    Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)}type TStandardClient struct {    seqId        int32    iprot, oprot TProtocol}// TStandardClient implements TClient, and uses the standard message format for Thrift.// It is not safe for concurrent use. ***** 这里加了星号func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient {    return &TStandardClient{        iprot: inputProtocol,        oprot: outputProtocol,    }}           

由上面代码片段可见,Apache Thrift库Go版本接口官方已经注释,这个客户端不能安全的支持并发应用(上面加了五个 *部分,15行)。

我们来分析一下为什么这个实现不支持并发应用。从Thrift库的client.go文件中找到的一个API方法,名字叫Call,开发者通过调用Call方法,与远程HBase的Thrift服务交互。可以看到方法中使用了Send/Recv实现数据收发。这里面数据发送和接收的会话通过seqId来标记。在单线程情况下是没问题的。如果并发情况下,假设一个线程运行到下面加了星号(第2行)部分之前,此时seqId记作1,然后另一个线程开始运行,也运行到星号部分(第2行)并执行完,此时seqId变成2(由开头p.seqId++引发)。然后再切换到线程1继续运行,这时候线程1看到的seqId是2,后面也拿这个seqId操作,这样线程1和2都使用seqId操作,引发序列号冲突。其实在我看来,一个解决冲突的办法是对p.seqId++操作加锁,不知为什么没有这样实现。

func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {    p.seqId++  // *** 注意这里加了星号    seqId := p.seqId    if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {       return ResponseMeta{}, err    }    // method is oneway    if result == nil {        return ResponseMeta{}, nil    }    err := p.Recv(ctx, p.iprot, seqId, method, result)    var headers THeaderMap    if hp, ok := p.iprot.(*THeaderProtocol); ok {        headers = hp.transport.readHeaders    }    return ResponseMeta{        Headers: headers,    }, err}           

通过上述分析可知,我们既可以修改Thrift库代码解决这个问题,也可以在外部应用程序中解决。前两篇关于HBase的技术文章,都是通过直接修改开发包达到目的。这里因为对Go语言的Thrift库并不熟悉,贸然改动代码恐怕会引起未知问题,我们就手动实现一个连接池,将连接资源管理起来,应对并发请求。

实现一个连接池并不复杂,有很多现成的参考实例。实际上,我们这里给出的连接池也是根据网上实例,结合Thrift应用实际场景加以改造完成。下面给出连接池的例子。这里需要读者对具备Go语言的接口和通道的相关背景知识。作为Go语言初学者,个人觉得接口是Go语言很优秀的特性,灵活、高效,兼具了C++指针和类的功能,借助接口,可以实现类似多态、动态数据转换等功能。

package utilsimport (    "sync"    "time"    "errors"    "dev/hbase"  // thrift工具生成的hbase 包    "github.com/apache/thrift/lib/go/thrift") type ConnRes interface {  // 连接池成员接口    Close() error;} type Factory func() (ConnRes, error)  // 创建连接资源的工厂方法 type Conn struct {  // 连接结构体,包括一个连接资源,一个创建时间    conn ConnRes    time time.Time} type ConnPool struct {    // 连接池    mu  sync.Mutex        // 互斥量,用于并发访问控制    conns   chan *Conn    // 实际的“池”,用一个通道保存连接    factory Factory       // 创建连接的工厂方法    closed  bool           // 连接池是否关闭    connTimeout time.Duration        // 每个连接超时时限} func NewConnPool(factory Factory, capacity int, connTimeout time.Duration) (*ConnPool, error) {    if capacity < 0 { // 设置容量,不能小于0        return nil, errors.New("Invalid params: capacity cannot be less than 0.")    }    if connTimeout <= 0 {        return nil, errors.New("Invalid prams: connTimeout cannot be less than 0.")    }    cp := &ConnPool{   // 参数检查后,设置连接池属性        mu:         sync.Mutex{},        conns:      make(chan *Conn, capacity),        factory:    factory,        closed:     false,        connTimeout:    connTimeout,    }    for i := 0; i < capacity; i++ {    // 预先创建capacity个连接,放入池子中        connRes, err := cp.factory()        if err != nil {            cp.Close()            return nil, errors.New("Error in NewConnPool while calling factory")        }        cp.conns <- &Conn{conn: connRes, time: time.Now()}  // 连接放入池中    }    return cp, nil} func (cp *ConnPool) Get() (ConnRes, error) {  // 取出一个连接    if cp.closed {        return nil, errors.New("Connection pool closed.")    }    for {        select {        case connRes, ok := <-cp.conns:  // 有资源可以获取到            if !ok {                return nil, errors.New("Connection pool closed.")            }            if time.Now().Sub(connRes.time) > cp.connTimeout { // 拿到的连接已经超时,将其关闭,继续取下一个                connRes.conn.Close()                continue            }            return connRes.conn, nil  // 成功获取到一个连接        default:  // 获取不到连接,池子已空,那就新创建一个            connRes, err := cp.factory()            if err != nil {                return nil, err            }            return connRes, nil        }    }} func (cp *ConnPool) Put(conn ConnRes) error {  // 用完归还一个连接到连接池    if cp.closed {        return errors.New("Connection pool closed.")    }    select {    case cp.conns <- &Conn{conn: conn, time: time.Now()}: // 将连接放回到池子中,更新时间        return nil    default:        conn.Close()       // 连接池已满,无法放入资源,将这个连接关闭        return errors.New("Connection pool is full.")    }} func (cp *ConnPool) Close() {  // 关闭连接池    if cp.closed {        return    }    cp.mu.Lock()    cp.closed = true    close(cp.conns)  // 关闭通道,即连接“池”    for conn := range cp.conns {        conn.conn.Close()    }    cp.mu.Unlock()}           

有了连接池后,结合上面例子,我们测试一下并发访问HBase。

先设置好环境,当前开发环境目录是dev/,dev下面目录结构如下:

dev  # 开发环境根目录├── hbase      # thrift 生成的HBase程序包│   ├── GoUnusedProtection__.go│   ├── Hbase-consts.go│   ├── Hbase.go│   └── hbase-remote│       └── hbase-remote.go├── main.go   # 测试主程序,并发访问HBase└── utils  # 连接池    └── conn_pool.go           

修改后,使用了连接池功能的并发访问程序main.go内容如下:

package mainimport (    "fmt"    "time"    "errors"    "context"    "dev/utils" // 连接池所在Go程序包    "dev/hbase" // thrift工具生成hbase工具包    "github.com/apache/thrift/lib/go/thrift"  // Apache提供的go thrift工具包)const HOST = "hbase.master.node"  // 根据具体环境设置,hbase主节点const PORT = "9090"  // Hbase Thrift 服务端口const DEFAULT_TIMEOUT = time.Second * 10  // 连接默认超时时间const DEFAULT_CONN_CAPCITY = 10  // 默认连接池容量 type HbaseConn struct {    SocketConn  *thrift.TSocket    ClientConn  *hbase.THBaseServiceClient}func (p *HbaseConn) Close() error {  // 实现Close() error 方法兼容utils.ConnRes接口    if p.SocketConn != nil {        p.SocketConn.Close()        return nil    }    return errors.New("Unable to call close the socket connection.")}func main() {    table := "demo_tab"  // 测试数据表    protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()    dbConnPool, _ := utils.NewConnPool(func() (utils.ConnRes, error) {   // 重要:创建连接池                tcpTransport, err := thrift.NewTSocket(HOST + ":" + PORT)                if err != nil {                    fmt.Printf("Error when setup HBase socket connection: %s.\n", err.Error())                }                connClient := hbase.NewTHBaseServiceClientFactory(tcpTransport, protocolFactory)                return &HbaseConn{SocketConn: tcpTransport, ClientConn: connClient}, nil            },  // 重要:HbaseConn类型兼容ConnRes,因此可以作为Factory方法的返回            DEFAULT_CONN_CAPCITY,            DEFAULT_TIMEOUT)    defer dbConnPool.Close()  // 程序退出后,勿忘记关闭连接池    //  并发访问     for j := 0; j < 10; j++ {        go func() {            rowKey := "row_" + fmt.Sprintf("%d", j)  // 设定rowKey            conn, _ := dbConnPool.Get() // 从池子中获取一个连接            defer dbConnPool.Put(conn) // 用后勿忘归还连接入池            if !conn.(*HbaseConn).SocketConn.IsOpen() { // 判断取到的连接TCP是否关闭,如果关闭,重新打开                conn.(*HbaseConn).SocketConn.Open() // 这里有一个接口转换,将ConnRes转成HbaseConn,类似C++的dynamic_cast<>            }            result, err := conn.(*HbaseConn).ClientConn.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) // 获取记录            if err != nil {                fmt.Println(err)            }            if result != nil {                fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row))                for _, cv := range result.ColumnValues {                    fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value))                }            }            return        }()    }      time.Sleep(time.Second * 10) // 主程序睡眠10秒钟,等待协程结束}           

可以将上述程序在开发环境中编译运行,运行结果发现不再有“get: out of order sequence response” 这样的报错,输出结果也是正常的。

五、总结

本文介绍了使用Go语言通过Thrift程序包访问HBase的基本操作,包括环境建立、工具使用、程序开发以及一些技术陷阱的解决方法等,力求从头到尾对这个主题进行完整的总结。囿于作者水平所限,文中错误疏漏在所难免,作为一个只有个把月Go学习经验,一万来行Go项目开发经验的初级人士,给出的程序实现肯定不是最优的,存在较大的优化空间。后面的工作中,如有更有价值的心得或者更优化的解决方案,也会及时输出更新。

注:本文系原创,亦发表于作者微信公众号,转载请注明出处。