天天看點

hbase熱點問題(資料傾斜)解決方案---rowkey散列和預分區設計

     Hbase的表會被劃分為1....n個Region,被托管在RegionServer中。Region二個重要的屬性:Startkey與EndKey表示這個Region維護的rowkey的範圍,當我們要讀寫資料時,如果rowkey落在某個start-end key範圍内,那麼就會定位到目标region并且讀寫到相關的資料。

    預設情況下,當我們通過hbaseAdmin指定TableDescriptor來建立一張表時,隻有一個region正處于混沌時期,start-end key無邊界,可謂海納百川。所有的rowkey都寫入到這個region裡,然後資料越來越多,region的size越來越大時,大到一定的閥值,hbase就會将region一分為二,成為2個region,這個過程稱為分裂(region-split)。

    如果我們就這樣預設建表,表裡不斷的put資料,更嚴重的是我們的rowkey還是順序增大的,是比較可怕的。存在的缺點比較明顯:首先是熱點寫,我們總是向最大的start key所在的region寫資料,因為我們的rowkey總是會比之前的大,并且hbase的是按升序方式排序的。是以寫操作總是被定位到無上界的那個region中;其次,由于熱點,我們總是往最大的start key的region寫記錄,之前分裂出來的region不會被寫資料,有點打入冷宮的感覺,他們都處于半滿狀态,這樣的分布也是不利的。

    如果在寫比較頻繁的場景下,資料增長太快,split的次數也會增多,由于split是比較耗費資源的,是以我們并不希望這種事情經常發生。

    在叢集中為了得到更好的并行性,我們希望有好的load blance,讓每個節點提供的請求都是均衡的,我們也不希望,region不要經常split,因為split會使server有一段時間的停頓,如何能做到呢?

    随機散列與預分區二者結合起來,是比較完美的。預分區一開始就預建好了一部分region,這些region都維護着自己的start-end keys,在配合上随機散列,寫資料能均衡的命中這些預建的region,就能解決上面的那些缺點,大大提供性能。

一、解決思路

    提供兩種思路:hash與partition。

1、hash方案

    hash就是rowkey前面由一串随機字元串組成,随機字元串生成方式可以由SHA或者MD5方式生成,隻要region所管理的start-end keys範圍比較随機,那麼就可以解決寫熱點問題。例如:

long currentId = 1L;
byte [] rowkey = Bytes.add(MD5Hash.getMD5AsHex(Bytes.toBytes(currentId))
                    .substring(0, 8).getBytes(),Bytes.toBytes(currentId));
           

     假如rowkey原本是自增長的long型,可以将rowkey轉為hash再轉為bytes,加上本身id轉為bytes,這樣就生成随便的rowkey。那麼對于這種方式的rowkey設計,如何去進行預分區呢?

  1. 取樣,先随機生成一定數量的rowkey,将取樣資料按升序排序放到一個集合裡。
  2. 根據預分區的region個數,對整個集合平均分割,即是相關的splitkeys。
  3. HBaseAdmin.createTable(HTableDescriptor tableDescriptor,byte[][] splitkeys)可以指定預分區的splitkey,即指定region間的rowkey臨界值。

    建立split電腦,用于從抽樣資料生成一個比較合适的splitkeys

public class HashChoreWoker implements SplitKeysCalculator{
    //随機取機數目
    private int baseRecord;
    //rowkey生成器
    private RowKeyGenerator rkGen;
    //取樣時,由取樣數目及region數相除所得的數量.
    private int splitKeysBase;
    //splitkeys個數
    private int splitKeysNumber;
    //由抽樣計算出來的splitkeys結果
    private byte[][] splitKeys;

    public HashChoreWoker(int baseRecord, int prepareRegions) {
        this.baseRecord = baseRecord;
        //執行個體化rowkey生成器
        rkGen = new HashRowKeyGenerator();
        splitKeysNumber = prepareRegions - 1;
        splitKeysBase = baseRecord / prepareRegions;
    }

    public byte[][] calcSplitKeys() {
        splitKeys = new byte[splitKeysNumber][];
        //使用treeset儲存抽樣資料,已排序過
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < baseRecord; i++) {
            rows.add(rkGen.nextId());
        }
        int pointer = 0;
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int index = 0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            if ((pointer != 0) && (pointer % splitKeysBase == 0)) {
                if (index < splitKeysNumber) {
                    splitKeys[index] = tempRow;
                    index ++;
                }
            }
            pointer ++;
        }
        rows.clear();
        rows = null;
        return splitKeys;
    }
}
           

     KeyGenerator及實作

//interface
public interface RowKeyGenerator {
    byte [] nextId();
}
//implements
public class HashRowKeyGenerator implements RowKeyGenerator {
    private long currentId = 1;
    private long currentTime = System.currentTimeMillis();
    private Random random = new Random();
    public byte[] nextId() {
        try {
            currentTime += random.nextInt(1000);
            byte[] lowT = Bytes.copy(Bytes.toBytes(currentTime), 4, 4);
            byte[] lowU = Bytes.copy(Bytes.toBytes(currentId), 4, 4);
            return Bytes.add(MD5Hash.getMD5AsHex(Bytes.add(lowU, lowT)).substring(0, 8).getBytes(),
                    Bytes.toBytes(currentId));
        } finally {
            currentId++;
        }
    }
}
           

     unit test case測試

@Test
public void testHashAndCreateTable() throws Exception{
        HashChoreWoker worker = new HashChoreWoker(1000000,10);
        byte [][] splitKeys = worker.calcSplitKeys();
        
        HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
        TableName tableName = TableName.valueOf("hash_split_table");
        
        if (admin.tableExists(tableName)) {
            try {
                admin.disableTable(tableName);
            } catch (Exception e) {
            }
            admin.deleteTable(tableName);
        }

        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(Bytes.toBytes("info"));
        columnDesc.setMaxVersions(1);
        tableDesc.addFamily(columnDesc);

        admin.createTable(tableDesc ,splitKeys);

        admin.close();
    }
           

     檢視建表結果,執行:scan 'hbase:meta'

hbase熱點問題(資料傾斜)解決方案---rowkey散列和預分區設計

    以上我們隻是顯示了部分region的資訊,可以看到region的start-end key還是比較随機散列的。同樣可以檢視hdfs的目錄結構,的确和預期的38個預分區一緻: 

hbase熱點問題(資料傾斜)解決方案---rowkey散列和預分區設計

    以上就是按照hash方式,預建好分區,以後再插入資料的時候,也是按照此rowkeyGenerator的方式生成rowkey。

2、partition的方式

    partition顧名思義就是分區式,這種分區有點類似于mapreduce中的partitioner,将區域用長整數作為分區号,每個region管理着相應的區域資料,在rowkey生成時,将ID取模後,然後拼上ID整體作為rowkey,這個比較簡單,不需要取樣,splitkeys也非常簡單,直接是分區号即可。直接上代碼:

public class PartitionRowKeyManager implements RowKeyGenerator,
        SplitKeysCalculator {

    public static final int DEFAULT_PARTITION_AMOUNT = 20;
    private long currentId = 1;
    private int partition = DEFAULT_PARTITION_AMOUNT;
    public void setPartition(int partition) {
        this.partition = partition;
    }

    public byte[] nextId() {
        try {
            long partitionId = currentId % partition;
            return Bytes.add(Bytes.toBytes(partitionId),
                    Bytes.toBytes(currentId));
        } finally {
            currentId++;
        }
    }

    public byte[][] calcSplitKeys() {
        byte[][] splitKeys = new byte[partition - 1][];
        for(int i = 1; i < partition ; i ++) {
            splitKeys[i-1] = Bytes.toBytes((long)i);
        }
        return splitKeys;
    }
}
           

    calcSplitKeys方法比較單純,splitkey就是partition的編号,測試類如下:

@Test
    public void testPartitionAndCreateTable() throws Exception{
        
        PartitionRowKeyManager rkManager = new PartitionRowKeyManager();
        //隻預建10個分區
        rkManager.setPartition(10);
        
        byte [][] splitKeys = rkManager.calcSplitKeys();
        
        HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
        TableName tableName = TableName.valueOf("partition_split_table");
        
        if (admin.tableExists(tableName)) {
            try {
                admin.disableTable(tableName);

            } catch (Exception e) {
            }
            admin.deleteTable(tableName);
        }

        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(Bytes.toBytes("info"));
        columnDesc.setMaxVersions(1);
        tableDesc.addFamily(columnDesc);

        admin.createTable(tableDesc ,splitKeys);

        admin.close();
    }
           

     同樣我們可以看看meta表和hdfs的目錄結果,其實和hash類似,region都會分好區。

     通過partition實作的loadblance寫的話,當然生成rowkey方式也要結合目前的region數目取模而求得,大家同樣也可以做些實驗,看看資料插入後的分布。

     在這裡也順提一下,如果是順序的增長型原id,可以将id儲存到一個資料庫,傳統的也好,redis的也好,每次取的時候,将數值設大1000左右,以後id可以在記憶體内增長,當記憶體數量已經超過1000的話,再去load下一個,有點類似于oracle中的sqeuence.

     随機分布加預分區也不是一勞永逸的。因為資料是不斷地增長的,随着時間不斷地推移,已經分好的區域,或許已經裝不住更多的資料,當然就要進一步進行split了,同樣也會出現性能損耗問題,是以我們還是要規劃好資料增長速率,觀察好資料定期維護,按需分析是否要進一步分行手工将分區再分好,也或者是更嚴重的是建立表,做好更大的預分區然後進行資料遷移。如果資料裝不住了,對于partition方式預分區的話,如果讓它自然分裂的話,情況分嚴重一點。因為分裂出來的分區号會是一樣的,是以計算到partitionId的話,其實還是回到了順序寫年代,會有部分熱點寫問題出現,如果使用partition方式生成主鍵的話,資料增長後就要不斷地調整分區了,比如增多預分區,或者加入子分區号的處理.(我們的分區号為long型,可以将它作為多級partition)

    以上基本已經講完了防止熱點寫使用的方法和防止頻繁split而采取的預分區。但rowkey設計,遠遠也不止這些,比如rowkey長度,然後它的長度最大可以為char的MAXVALUE,但是看過之前我寫KeyValue的分析知道,我們的資料都是以KeyValue方式存儲在MemStore或者HFile中的,每個KeyValue都會存儲rowKey的資訊,如果rowkey太大的話,比如是128個位元組,一行10個字段的表,100萬行記錄,光rowkey就占了1.2G+是以長度還是不要過長,另外設計,還是按需求來吧。

繼續閱讀