一、前言
最近连续发布了几篇Golang开发数据库应用的技术文章。有读者询问项目背景,这里简单介绍一下。笔者现在主导设计开发一个大数据项目,涉及到ElasticSearch、HBase、MongoDB、MySql、ClickHouse等系统。
之所以用到这么多数据库系统,是因为整个流程涉及了数据开发、清洗、统计、分析、查询和展示等环节,业务逻辑复杂,数据量庞大。为了最大化满足功能需求同时兼顾性能,需要考虑到每个系统在数据处理方面的优点并将其发挥到极致。
本文是Golang开发HBase应用方面第三篇技术心得,主要介绍如何通过Apache Thrift2这个协议库进行HBase相关应用的开发,特别介绍了其中容易导致问题的一些技术陷阱。
笔者在调研过程中发现,关于Thrift协议和开发库的介绍资料比较多,使用Go语言结合Thrift开发的资料也能找到一些,但是中文社区里,系统性的从头到尾梳理Go语言结合Thrift进行HBase开发,并对技术细节加以分析的内容还是略显匮乏。
也许本文没有所谓技术创新点,但是结合可操作的实例的介绍开发流程,同时指出潜在的技术陷阱并给出解决方案,应该算是具有实践指导意义的经验总结,不算重新发明轮子。笔者一个小小的愿望就是读者通过阅读这篇文章,直接就能上手Go语言HBase开发,解决实际问题。
接下来部分按如下组织:第二部分介绍准备工作,包括如何下载、编译和安装Thrift;第三部分结合实例说明如何操作HBase数据库;第四部分是高阶话题,介绍如何如何通过连接池方式,达到Thrift对并发操作的支持;最后第五部分是全文总结。
本文预设背景是读者已经熟悉Golang程序开发工具,并能够进行基本程序开发。另外,本文所有实例都在Ubuntu-20.04.1Linux系统环境下运行测试通过,如无特别说明,文中会将Thrift/Thrift2混用,这里仅指Thrift2版本。
二、准备工作
2.1 Thrift2 下载,编译和安装
Thirft库的源代码需要从网上下载,在Linux环境下,运行:
wget https://dlcdn.apache.org/thrift/0.16.0/thrift-0.16.0.tar.gz
即可下载Thrift源码包,当前最新版本是0.16.0。
下载完成后,运行tar xzvf thrift-0.16.0.tar.gz 解压缩,然后开始编译。在编译前,确保环境所有工具包都已经齐备,如果不确定,在Ubuntu Linux环境下,运行以下命令把所需要组件一股脑装上:
./configuremake && make install
完成编译和安装。如果不运行make install,编译好的程序会在thrift-0.16.0/compiler/cpp 路径下,名称为thrift。
2.2 启动Thrift2服务
这一步要保证使用的HBase系统开启Thrift服务,具体方法是在HBase的主节点,运行以下启动命令:
下载https://raw.githubusercontent.com/apache/hbase/2.4.11RC0/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift2/hbase.thrift 文件,不知为何,这里无法使用wget下载。运行命令 thrift --out ./hbase --gen go Hbase.thrift 运行成功后,当前路径下生成hbase/ 目录,就是对应的Go开发包
## 运行命令后,在目录下将新生成一名为 hbase 的目录,里面就包含了所需的go文件了## hbase的目录结构如下:hbase └── GoUnusedProtection__.go ├── Hbase-consts.go ├── Hbase.go ├── t_h_base_service-remote ## 该文件夹是demo文件夹,可以删掉 └── t_h_base_service-remote.go ## 客户端的测试代码,直接可拿来用
将生成hbase目录拷贝到你的Go开发环境目录下,至此就完成了Thrift库准备工作。
生成的hbase开发包,比较重要的一个文件是hbase/Hbase.go,大概有2万余行,相当于访问Thrift远程过程调用(RPC) 的包裹器(Wrapper),将Go语言对HBase的操作请求转成RPC请求,并处理返回结果。
三、开发工作
本章节给出一个例子,实现基本的HBase操作。
3.1 数据表准备
在HBase数据库中创建数据表'demo_tab',包含一个列簇(Family)'info',建表命令如下:
create 'demo_tab', 'info'
然后输入数据,这里输入10条数据,行键(row key)分别是'row_1' ... 'row10',数据列(Qualifier)分别是info:val_1, info:val_2。
3.2 简单例子
这里假设开发环境目录为dev/,另外,为了节省篇幅,这里省去了所有错误处理。
package mainimport ( "fmt" "github.com/apache/thrift/lib/go/thrift" // Apache提供的go thrift工具包 "context" "dev/hbase" // thrift工具生成hbase工具包,放在dev/ 目录下)const HOST = "hbase.master.node" // 根据具体环境设置,hbase主节点const PORT = "9090" func main() { table := "demo_tab" // 测试数据表 rowKey := "row_5" // 某一行,这里row_5行 newRowKey := "row_11" // 新写入一行 family := "info" // 列簇名 protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transport, _ := thrift.NewTSocket(HOST + ":" + PORT) // 创建连接 client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory) // 客户端与连接关联 transport.Open() // 执行连接操作 defer transport.Close() // 结束关闭 /*** 判断数据表是否存在 ***/ isExists, _ := client.TableExists(context.Background(), &hbase.TTableName{Ns: []byte(family), Qualifier: []byte(table)}) fmt.Printf("table{%s} Exists:%t\n", table, isExists) /*** 判断记录是否存在 ***/ isExists, _ = client.Exists(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) fmt.Printf("rowkey{%s} in table{%s} Exists:%t\n", rowKey, table, isExists) /*** Put 一条数据 ***/ cvArr := []*hbase.TColumnValue{ { Family: []byte(family), Qualifier: []byte("val_1"), Value: []byte("new value of column val_1"), }, } data2Put := hbase.TPut{Row: []byte(newRowKey), ColumnValues: cvArr} client.Put(context.Background(), []byte(table), &data2Put) /*** Get 写入的数据 ***/ result, _ := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(newRowKey)}) fmt.Println("Rowkey:" + string(result.Row)) for _, cv := range result.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } /*** 带有过滤器的Scan操作 ***/ startRow, stopRow := "row_1", "row_9" qual, val := "val_1", "aaa" col1 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_1")} col2 := hbase.TColumn{Family: []byte(family), Qualifier: []byte("val_2")} filterStr := fmt.Sprintf("SingleColumnValueFilter('%s','%s',=,'binary:%s', false, true)", family, qual, val) results, _ := client.GetScannerResults(context.Background(), []byte(table), &hbase.TScan{ StartRow: []byte(startRow), // 扫描范围起始 StopRow: []byte(stopRow), // 扫描范围结束 FilterString: []byte(filterStr), // 设定过滤字段和值 Columns: []*hbase.TColumn{&col1, &col2}, // 要读取的字段 MaxVersions: 1, // ** 注意版本,如果需要读取最新版本,要跟Hbase表属性设置一致,默认为0 }, 100) fmt.Printf("GetScannerResults %d done\n", len(results)) for _, k := range results { fmt.Println("scan Rowkey:" + string(k.Row)) for _, cv := range k.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } }}
上述可以正常编译运行,注意HBase中对应的表要有数据,以及程序中各类环境设置要与真实情况一致。
3.3 过滤器问题
3.2节给出的例子里面,有一个带有过滤器的扫描操作,具体作用是对扫描查询的结果按照一定条件过滤。关于过滤器的说明,这里有个参考链接,将过滤器的种类、作用和格式都做了详细说明,非常具有参考价值:
https://www.cnblogs.com/ZisZ/p/9667271.html
关于扫描过滤器,有一个新手很容易出问题的地方,在3.2节程序例子中我用星号标记出来(54行)——MaxVersions属性的设置。
Go语言会对结构体自动设置默认值,如果不显式设置MaxVersions属性的值,在程序中这个值就会是0,然后用户运行时就会发现,明明数据库有符合条件的值,怎么返回结果是空?在其他设置都确认无误情况下,MaxVersions是一个容易掉入陷阱的地方。这个属性意义是返回数据库中数据版本允许的最大值——HBase对于字段可以设置版本属性,允许最多保留N个历史版本,每次更新,版本也会增加。第一次写入的数据,版本号就是1,如果只保留最新版本,版本号就停留在1。如果MaxVersions设置成0,就没有符合条件的数据被返回,这就是出现结果为空的原因。
四、高阶话题 —— 连接池问题
介绍完基本例子,本章介绍一个高级话题——Thrift连接池问题。现用一个例子说明,我们上一章节例子加以改造,利用Go协程的并发特性,对HBase开10个协程并发读取,看看结果怎样。
package mainimport ( "fmt" "time" "context" "dev/hbase" "github.com/apache/thrift/lib/go/thrift" )const HOST = "hbase.master.node" // 根据具体环境设置,hbase主节点const PORT = "9090"func main() { table := "demo_tab" // 测试数据表 protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() transport, _ := thrift.NewTSocket(HOST + ":" + PORT) // 创建连接 client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory) // 客户端与连接关联 transport.Open() // 执行连接操作 defer transport.Close() // 结束关闭 // 运行10个协程,并行读取HBase数据 for j := 0; j < 10; j++ { go func() { rowKey := "row_" + fmt.Sprintf("%d", j) // 构造行键 result, err := client.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) if err != nil { // 这里增加错误处理,为了看到错误 fmt.Println(err) } if result != nil { fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row)) for _, cv := range result.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } } }() } time.Sleep(time.Second * 10) // 主进程睡眠10秒,等待协程结束}
运行后,会出现错误提示"get: out of order sequence response"。这是因为Apache提供的Go语言版本Thrift开发包不支持并发引起的。究竟什么怎样的实现导致不能支持并发呢?我们深入代码分析一下。
这里看一下Apache Thrift官方库提供的Go语言API,文件路径:https://github.com/apache/thrift/blob/master/lib/go/thrift/client.go
// ResponseMeta represents the metadata attached to the response.type ResponseMeta struct { // The headers in the response, if any. // If the underlying transport/protocol is not THeader, this will always be nil. Headers THeaderMap}type TClient interface { Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)}type TStandardClient struct { seqId int32 iprot, oprot TProtocol}// TStandardClient implements TClient, and uses the standard message format for Thrift.// It is not safe for concurrent use. ***** 这里加了星号func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient { return &TStandardClient{ iprot: inputProtocol, oprot: outputProtocol, }}
由上面代码片段可见,Apache Thrift库Go版本接口官方已经注释,这个客户端不能安全的支持并发应用(上面加了五个 *部分,15行)。
我们来分析一下为什么这个实现不支持并发应用。从Thrift库的client.go文件中找到的一个API方法,名字叫Call,开发者通过调用Call方法,与远程HBase的Thrift服务交互。可以看到方法中使用了Send/Recv实现数据收发。这里面数据发送和接收的会话通过seqId来标记。在单线程情况下是没问题的。如果并发情况下,假设一个线程运行到下面加了星号(第2行)部分之前,此时seqId记作1,然后另一个线程开始运行,也运行到星号部分(第2行)并执行完,此时seqId变成2(由开头p.seqId++引发)。然后再切换到线程1继续运行,这时候线程1看到的seqId是2,后面也拿这个seqId操作,这样线程1和2都使用seqId操作,引发序列号冲突。其实在我看来,一个解决冲突的办法是对p.seqId++操作加锁,不知为什么没有这样实现。
func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) { p.seqId++ // *** 注意这里加了星号 seqId := p.seqId if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil { return ResponseMeta{}, err } // method is oneway if result == nil { return ResponseMeta{}, nil } err := p.Recv(ctx, p.iprot, seqId, method, result) var headers THeaderMap if hp, ok := p.iprot.(*THeaderProtocol); ok { headers = hp.transport.readHeaders } return ResponseMeta{ Headers: headers, }, err}
通过上述分析可知,我们既可以修改Thrift库代码解决这个问题,也可以在外部应用程序中解决。前两篇关于HBase的技术文章,都是通过直接修改开发包达到目的。这里因为对Go语言的Thrift库并不熟悉,贸然改动代码恐怕会引起未知问题,我们就手动实现一个连接池,将连接资源管理起来,应对并发请求。
实现一个连接池并不复杂,有很多现成的参考实例。实际上,我们这里给出的连接池也是根据网上实例,结合Thrift应用实际场景加以改造完成。下面给出连接池的例子。这里需要读者对具备Go语言的接口和通道的相关背景知识。作为Go语言初学者,个人觉得接口是Go语言很优秀的特性,灵活、高效,兼具了C++指针和类的功能,借助接口,可以实现类似多态、动态数据转换等功能。
package utilsimport ( "sync" "time" "errors" "dev/hbase" // thrift工具生成的hbase 包 "github.com/apache/thrift/lib/go/thrift") type ConnRes interface { // 连接池成员接口 Close() error;} type Factory func() (ConnRes, error) // 创建连接资源的工厂方法 type Conn struct { // 连接结构体,包括一个连接资源,一个创建时间 conn ConnRes time time.Time} type ConnPool struct { // 连接池 mu sync.Mutex // 互斥量,用于并发访问控制 conns chan *Conn // 实际的“池”,用一个通道保存连接 factory Factory // 创建连接的工厂方法 closed bool // 连接池是否关闭 connTimeout time.Duration // 每个连接超时时限} func NewConnPool(factory Factory, capacity int, connTimeout time.Duration) (*ConnPool, error) { if capacity < 0 { // 设置容量,不能小于0 return nil, errors.New("Invalid params: capacity cannot be less than 0.") } if connTimeout <= 0 { return nil, errors.New("Invalid prams: connTimeout cannot be less than 0.") } cp := &ConnPool{ // 参数检查后,设置连接池属性 mu: sync.Mutex{}, conns: make(chan *Conn, capacity), factory: factory, closed: false, connTimeout: connTimeout, } for i := 0; i < capacity; i++ { // 预先创建capacity个连接,放入池子中 connRes, err := cp.factory() if err != nil { cp.Close() return nil, errors.New("Error in NewConnPool while calling factory") } cp.conns <- &Conn{conn: connRes, time: time.Now()} // 连接放入池中 } return cp, nil} func (cp *ConnPool) Get() (ConnRes, error) { // 取出一个连接 if cp.closed { return nil, errors.New("Connection pool closed.") } for { select { case connRes, ok := <-cp.conns: // 有资源可以获取到 if !ok { return nil, errors.New("Connection pool closed.") } if time.Now().Sub(connRes.time) > cp.connTimeout { // 拿到的连接已经超时,将其关闭,继续取下一个 connRes.conn.Close() continue } return connRes.conn, nil // 成功获取到一个连接 default: // 获取不到连接,池子已空,那就新创建一个 connRes, err := cp.factory() if err != nil { return nil, err } return connRes, nil } }} func (cp *ConnPool) Put(conn ConnRes) error { // 用完归还一个连接到连接池 if cp.closed { return errors.New("Connection pool closed.") } select { case cp.conns <- &Conn{conn: conn, time: time.Now()}: // 将连接放回到池子中,更新时间 return nil default: conn.Close() // 连接池已满,无法放入资源,将这个连接关闭 return errors.New("Connection pool is full.") }} func (cp *ConnPool) Close() { // 关闭连接池 if cp.closed { return } cp.mu.Lock() cp.closed = true close(cp.conns) // 关闭通道,即连接“池” for conn := range cp.conns { conn.conn.Close() } cp.mu.Unlock()}
有了连接池后,结合上面例子,我们测试一下并发访问HBase。
先设置好环境,当前开发环境目录是dev/,dev下面目录结构如下:
dev # 开发环境根目录├── hbase # thrift 生成的HBase程序包│ ├── GoUnusedProtection__.go│ ├── Hbase-consts.go│ ├── Hbase.go│ └── hbase-remote│ └── hbase-remote.go├── main.go # 测试主程序,并发访问HBase└── utils # 连接池 └── conn_pool.go
修改后,使用了连接池功能的并发访问程序main.go内容如下:
package mainimport ( "fmt" "time" "errors" "context" "dev/utils" // 连接池所在Go程序包 "dev/hbase" // thrift工具生成hbase工具包 "github.com/apache/thrift/lib/go/thrift" // Apache提供的go thrift工具包)const HOST = "hbase.master.node" // 根据具体环境设置,hbase主节点const PORT = "9090" // Hbase Thrift 服务端口const DEFAULT_TIMEOUT = time.Second * 10 // 连接默认超时时间const DEFAULT_CONN_CAPCITY = 10 // 默认连接池容量 type HbaseConn struct { SocketConn *thrift.TSocket ClientConn *hbase.THBaseServiceClient}func (p *HbaseConn) Close() error { // 实现Close() error 方法兼容utils.ConnRes接口 if p.SocketConn != nil { p.SocketConn.Close() return nil } return errors.New("Unable to call close the socket connection.")}func main() { table := "demo_tab" // 测试数据表 protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() dbConnPool, _ := utils.NewConnPool(func() (utils.ConnRes, error) { // 重要:创建连接池 tcpTransport, err := thrift.NewTSocket(HOST + ":" + PORT) if err != nil { fmt.Printf("Error when setup HBase socket connection: %s.\n", err.Error()) } connClient := hbase.NewTHBaseServiceClientFactory(tcpTransport, protocolFactory) return &HbaseConn{SocketConn: tcpTransport, ClientConn: connClient}, nil }, // 重要:HbaseConn类型兼容ConnRes,因此可以作为Factory方法的返回 DEFAULT_CONN_CAPCITY, DEFAULT_TIMEOUT) defer dbConnPool.Close() // 程序退出后,勿忘记关闭连接池 // 并发访问 for j := 0; j < 10; j++ { go func() { rowKey := "row_" + fmt.Sprintf("%d", j) // 设定rowKey conn, _ := dbConnPool.Get() // 从池子中获取一个连接 defer dbConnPool.Put(conn) // 用后勿忘归还连接入池 if !conn.(*HbaseConn).SocketConn.IsOpen() { // 判断取到的连接TCP是否关闭,如果关闭,重新打开 conn.(*HbaseConn).SocketConn.Open() // 这里有一个接口转换,将ConnRes转成HbaseConn,类似C++的dynamic_cast<> } result, err := conn.(*HbaseConn).ClientConn.Get(context.Background(), []byte(table), &hbase.TGet{Row: []byte(rowKey)}) // 获取记录 if err != nil { fmt.Println(err) } if result != nil { fmt.Println("Routine: ", j, "Rowkey:" + string(result.Row)) for _, cv := range result.ColumnValues { fmt.Println("Family:", string(cv.Family), "Qualifer:", string(cv.Qualifier), "Value:", string(cv.Value)) } } return }() } time.Sleep(time.Second * 10) // 主程序睡眠10秒钟,等待协程结束}
可以将上述程序在开发环境中编译运行,运行结果发现不再有“get: out of order sequence response” 这样的报错,输出结果也是正常的。
五、总结
本文介绍了使用Go语言通过Thrift程序包访问HBase的基本操作,包括环境建立、工具使用、程序开发以及一些技术陷阱的解决方法等,力求从头到尾对这个主题进行完整的总结。囿于作者水平所限,文中错误疏漏在所难免,作为一个只有个把月Go学习经验,一万来行Go项目开发经验的初级人士,给出的程序实现肯定不是最优的,存在较大的优化空间。后面的工作中,如有更有价值的心得或者更优化的解决方案,也会及时输出更新。
注:本文系原创,亦发表于作者微信公众号,转载请注明出处。