天天看點

flume 自定義 hbase sink 類

參考(向原作者致敬)

  • http://ydt619.blog.51cto.com/316163/1230586
  • https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase

flume 1.5 的配置檔案示例

#Name the  components on this agent
a1.sources  = r1
a1.sinks =  k1
a1.channels  = c1

#  Describe/configure the source
a1.sources.r1.type  = spooldir
a1.sources.r1.spoolDir  = /home/scut/Downloads/testFlume

# Describe  the sink
# 注意:properties 檔案不支援行尾註解,寫在值後面的 # 及其後的文字會被當成屬性值的一部分,是以註解必須獨占一行。
a1.sinks.k1.type  = org.apache.flume.sink.hbase.AsyncHBaseSink
# 設定hbase的表名
a1.sinks.k1.table = Router
# 設定hbase中的columnFamily
a1.sinks.k1.columnFamily = log
# 設定hbase的column
a1.sinks.k1.serializer.payloadColumn=serviceTime,browerOS,clientTime,screenHeight,screenWidth,url,userAgent,mobileDevice,gwId,mac
# 設定serializer的處理類
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.BaimiAsyncHbaseEventSerializer

# Use a  channel which buffers events in memory
a1.channels.c1.type  = memory
a1.channels.c1.capacity  = 1000
a1.channels.c1.transactionCapacity  = 100

# Bind the  source and sink to the channel
a1.sources.r1.channels  = c1
a1.sinks.k1.channel  = c1
           

重點說明幾個屬性

  • a1.sinks.k1.serializer.payloadColumn 中列出了所有的列名。
  • a1.sinks.k1.serializer設定了flume serializer的處理類。BaimiAsyncHbaseEventSerializer類中會擷取payloadColumn的内容,将它以逗号分隔,進而得出所有的列名。

BaimiAsyncHbaseEventSerializer類

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.sink.hbase;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;

import com.google.common.base.Charsets;

/**
 * Asynchronous HBase event serializer that splits every Flume event body into
 * fields and writes each field into its own column of a single row.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code payloadColumn} - comma-separated column names, one per field of
 *       the event body (spaces are removed, names are lower-cased);</li>
 *   <li>{@code rowPrefixCol} - name of the column whose value is handed to the
 *       row-key generator (default {@code mac});</li>
 *   <li>{@code suffix} - row-key flavour: {@code timestamp}, {@code random},
 *       {@code nano}, anything else selects the UUID key;</li>
 *   <li>{@code incrementColumn} / {@code incrementRow} - counter cell used by
 *       {@link #getIncrements()}.</li>
 * </ul>
 *
 * <p>An event whose number of fields differs from the number of configured
 * columns produces no put requests.
 */
public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  /**
   * Regular expression that separates the fields of an event body. As written
   * it matches the two literal characters {@code ^A}.
   * NOTE(review): if the producer emits the Ctrl-A control character (U+0001)
   * instead, this must become "\u0001" - confirm against the real input data.
   */
  private static final String PAYLOAD_COLUMN_SPLIT = "\\^A";

  /** Marker meaning "no configured column supplies the row-key value". */
  private static final int NO_ROW_KEY_COLUMN = -1;

  private byte[] table;
  private byte[] cf;
  /** Field values of the current event, or {@code null} if the event was rejected. */
  private byte[][] payload;
  /** Lower-cased column qualifiers, index-aligned with {@link #payload}. */
  private byte[][] payloadColumn;
  private byte[] incrementColumn;
  /** Value of the row-key column in the current event; passed to the key generator. */
  private String rowSuffix;
  /** Lower-cased name of the column that supplies {@link #rowSuffix}. */
  private String rowSuffixCol;
  /** Index of {@link #rowSuffixCol} inside {@link #payloadColumn}, or {@link #NO_ROW_KEY_COLUMN}. */
  private int rowSuffixIndex = NO_ROW_KEY_COLUMN;
  private byte[] incrementRow;
  private KeyType keyType;

  /**
   * Stores the target table and column family used for every request.
   *
   * @param table HBase table name
   * @param cf column family name
   */
  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  /**
   * Builds one {@link PutRequest} per configured column for the current event.
   *
   * @return the put requests; empty when no payload columns are configured or
   *         the current event was rejected by {@link #setEvent(Event)}
   * @throws FlumeException if the row key or a put request cannot be created
   */
  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    // Nothing to write: either no columns are configured or the last event did
    // not match the column list (payload is reset to null in setEvent).
    if (payloadColumn == null || payload == null) {
      return actions;
    }
    try {
      byte[] rowKey;
      switch (keyType) {
        case TS:
          rowKey = SimpleRowKeyGenerator.getTimestampKey(rowSuffix);
          break;
        case TSNANO:
          rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);
          break;
        case RANDOM:
          rowKey = SimpleRowKeyGenerator.getRandomKey(rowSuffix);
          break;
        default:
          rowKey = SimpleRowKeyGenerator.getUUIDKey(rowSuffix);
          break;
      }

      // All columns of one event share the same row key.
      for (int i = 0; i < payload.length; i++) {
        actions.add(new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]));
      }
    } catch (Exception e) {
      throw new FlumeException("Could not get row key!", e);
    }
    return actions;
  }

  /**
   * Builds the counter increment issued alongside the puts.
   *
   * @return a single increment request when an increment column is configured,
   *         otherwise an empty list
   */
  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      actions.add(new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn));
    }
    return actions;
  }

  /** No resources are held, so there is nothing to release. */
  @Override
  public void cleanUp() {
    // Intentionally empty.
  }

  /**
   * Reads the column list, row-key settings and increment settings from the
   * serializer context.
   *
   * @param context serializer sub-configuration of the sink
   */
  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    // Column names are lower-cased below, so the row-key column name must be
    // normalised the same way or a mixed-case setting could never match.
    rowSuffixCol = normalizeColumnName(context.getString("rowPrefixCol", "mac"));
    String suffix = context.getString("suffix", "uuid");

    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = KeyType.TSNANO;
      } else {
        keyType = KeyType.UUID;
      }

      // Parse the comma-separated column names from the configuration.
      String[] names = pCol.replace(" ", "").split(",");
      byte[][] columns = new byte[names.length][];
      int keyIndex = NO_ROW_KEY_COLUMN;
      for (int i = 0; i < names.length; i++) {
        String name = normalizeColumnName(names[i]);
        columns[i] = name.getBytes(Charsets.UTF_8);
        // Resolve the row-key column once here instead of per event; the last
        // matching column wins when a name is listed more than once.
        if (name.equals(rowSuffixCol)) {
          keyIndex = i;
        }
      }
      payloadColumn = columns;
      rowSuffixIndex = keyIndex;
    }

    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow =
        context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  /**
   * Splits the event body into fields and keeps them for {@link #getActions()}.
   * The event is rejected (no puts will be produced) when its field count
   * differs from the number of configured columns.
   *
   * @param event the event to serialize
   */
  @Override
  public void setEvent(Event event) {
    // Drop the previous event's state first so that a rejected event can never
    // cause the previous event's values to be written a second time.
    payload = null;
    rowSuffix = null;
    if (payloadColumn == null) {
      return;
    }

    // Decode explicitly as UTF-8 to mirror the UTF-8 encoding used below.
    String body = new String(event.getBody(), Charsets.UTF_8);
    // Limit -1 keeps trailing empty fields, so an empty last column still counts.
    String[] fields = body.split(PAYLOAD_COLUMN_SPLIT, -1);
    if (fields.length != payloadColumn.length) {
      return;
    }

    byte[][] values = new byte[fields.length][];
    for (int i = 0; i < fields.length; i++) {
      values[i] = fields[i].getBytes(Charsets.UTF_8);
    }
    payload = values;

    // The row key is derived from the value of one column (the MAC address
    // column by default).
    if (rowSuffixIndex != NO_ROW_KEY_COLUMN) {
      rowSuffix = fields[rowSuffixIndex];
    }
  }

  /** Component-level configuration is not used by this serializer. */
  @Override
  public void configure(ComponentConfiguration conf) {
    // Intentionally empty.
  }

  /** Removes spaces and lower-cases a column name independently of the JVM locale. */
  private static String normalizeColumnName(String name) {
    return name.replace(" ", "").toLowerCase(java.util.Locale.ROOT);
  }
}
           

重點可以檢視setEvent,configure,getActions函數。

  • configure函數:讀取flume配置檔案内容,包括列名,rowkey字尾等資訊
  • setEvent函數:擷取flume event 内容,将其儲存到payload數組中。
  • getActions函數:建立PutRequest執行個體,将rowkey,columnfamily,column,value等資訊寫入putrequest執行個體中。

源碼編譯和執行

     編寫好自定義的BaimiAsyncHbaseEventSerializer類後,接下來需要編譯源碼,生成flume-ng-hbase-sink.*.jar包,替換flume中原來的flume-ng-hbase-sink.*.jar包。

  • 下載flume 1.5 源碼,解壓後進入目錄flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/
  • 複制上面的BaimiAsyncHbaseEventSerializer類到上面的目錄中。
  • 進入flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/,運作mvn編譯指令【mvn install -Dmaven.test.skip=true】
  • mvn編譯後會在flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/target目錄下生成flume-ng-hbase-sink-1.5.0.jar,将這個jar包替換$FLUME_HOME/lib下的jar包
  • 運作flume執行指令【flume-ng agent -c . -f conf/spoolDir.conf -n a1  -Dflume.root.logger=INFO,console】