天天看點

Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流

HBase使用者福利

新使用者9.9元即可使用6個月雲資料庫HBase,更有低至1元包年的入門規格供廣大HBase愛好者學習研究,更多内容

請參考連結

官方實作

Apache Phoenix從4.6版本開始,提供了ROW_TIMESTAMP标簽,來映射HBase的原生時間戳。但使用起來有以下限制:

  • 隻有主鍵中的TIME, DATE, TIMESTAMP, BIGINT, UNSIGNED_LONG類型的字段才能設定成ROW_TIMESTAMP
  • 隻能有一個主鍵列能被設定成ROW_TIMESTAMP
  • ROW_TIMESTAMP标志的字段不能為null值
  • 隻有在建表的時候,某一列才能被設定成ROW_TIMESTAMP
  • ROW_TIMESTAMP标志的列不能為負數

除了上面使用上的限制,還有應用場景的限制。根據上面的描述,ROW_TIMESTAMP字段有以下幾種形式。

  • 業務主鍵在前
    Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流
  • ROW_TIMESTAMP字段在前
    Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流
  • 隻有ROW_TIMESTAMP字段
    Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流

我們來看下各個形式的優劣

  • 業務主鍵在前。無論ROW_TIMESTAMP字段如何取值,都可以通過業務主鍵1進行單點查詢,即在知道業務主鍵1的情況下是可以通過字首精确快速的查詢的。
  • ROW_TIMESTAMP字段在前。如果不知道某條資料對應的ROW_TIMESTAMP字段值,則無法通過主鍵查詢;如果通過業務主鍵可以映射ROW_TIMESTAMP字段值,雖然可以通過主鍵查詢,但該字段将無法修改。因為修改就意味着目前記錄删除,重新插入。
  • 隻有ROW_TIMESTAMP字段。在一些時序資料比較常見,也就是沒有業務主鍵,不會也不便通過主鍵查詢,一般都是範圍掃描。

其實官方提供的ROW_TIMESTAMP字段實作,最大的問題就是原有記錄不能更新,隻能删除、然後插入,這就極大的限制了它的應用場景。

我們的實作

背景

我們用Phoenix存儲了所有需要實時查詢的表,寫Phoenix-Sql查詢目前最新的資料。基本架構如下:

Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流

問題

正常情況下,實時抽取MySQL的binlog,寫入Phoenix;每天會有Hive批量抽取MySQL資料,對Phoenix進行校驗、補數。

實時寫入時,需要考慮binlog更新的順序,至少要做到MySQL原資料每行更新的順序;離線補數時,需要考慮是否會覆寫實時寫入的資料。

實時寫入

實時寫入的順序,大都由CDC(canal、debezium等)控制。針對每一條資料的更新,CDC都會對“表名+主鍵”進行Hash,路由到Kafka對應的分區。其實針對某個表某條記錄的更新,消費時是有嚴格的順序的。但如果後期更改kafka分區個數,就會稍微麻煩點。如果不停服更新,就意味着同一條記錄的不同更新,分布在不同的分區,也就不能保證嚴格的順序,插入Phoenix表就會出現覆寫的問題。如果停服更新,就需要先停掉CDC,等消費者把資料消費完,然後再調整分區,啟動消費者,這樣才能避免互相覆寫的問題。

實時寫入還有一個潛在的問題,那就是資料丢失。不管是網絡抖動,還是元件的健壯性,都會造成資料丢失。一旦發生資料丢失,就需要校驗、補數的邏輯。

離線補數

離線補數就是為了防止出現實時資料丢失的問題。離線補數包含校驗和補數兩個步驟。

  • 校驗。拿目前全量或增量資料,與Phoenix表中相同主鍵的資料進行比對,确定Phoenix是否丢數或丢失更新。
    • 丢數就是Phoenix應該有的資料卻沒有
    • 丢失更新就是Phoenix的資料不是最新的
  • 補數。根據上一步驟計算的丢失的資料或更新,寫入Phoenix

離線補數看似完美,但最大的問題就是,校驗和補數是兩個步驟,也就是說不在一個事務裡面。有可能某條資料在校驗階段,的确是丢失的,但在校驗之後、補數之前,該條資料又被寫到Phoenix表了,那麼在補數之後,該資料又被更新成舊資料了。

解決方案

細心的讀者會發現,使用官方提供的ROW_TIMESTAMP是無法很好的解決資料亂序覆寫的問題的。

那麼,究竟該怎麼辦呢?有沒有一種方案能完美的解決上面的問題呢?下面是我解決這個問題的思路和具體實作。

思路

熟悉HBase的讀者一定知道,HBase插入或更新資料的時候是可以指定時間戳(版本号)的,而且HBase查詢時預設顯示時間戳最大的資料。那如果Phoenix在根據主鍵寫入資料時,能把該條資料的更新時間寫入HBase的時間戳字段,是不是就能解決互相覆寫的問題了呢?

的确能。

其實每一條更新都是資料的一個版本。如果寫入時能指定時間戳,就意味着指定了資料的版本,無論每個更新到達的順序是怎樣的,Phoenix讀取時都會讀取最新的資料。

如果能實作,那麼Kafka重新設定分區個數和離線補數将不再需要考慮覆寫的問題。

但Phoenix目前并沒有實作上面邏輯的機制,我們需要對其進行簡單的更新。

實作方案

其實在Phoenix官方實作中,有一個CurrentSCN屬性,它可以控制每一次DDL、DML、QUERY的時間戳的,也就是在插入或更新時,會根據CurrentSCN的值設定目前資料對應的HBase的時間戳。但很不幸,它隻能控制每一次commit的資料,也就是無法精确控制每一條資料的時間戳。當然了,如果每一條資料Upsert時都設定CurrentSCN,然後commit也是可以解決問題的,但這就無法進行批量送出,會一定程度的影響性能。

其實我在實作時也是參考了CurrentSCN屬性的原理。

經過分析,我找到了MutationState類的generateMutations方法的下面一段代碼。

PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);           

上面的代碼其實是建立了一條資料,後續的upset的資料就是由此而來。根據timestampToUse命名可以猜想,它就是該條資料的時間戳。

/**
     * Creates a new row at the specified timestamp using the key
     * for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])}
     * and the optional key values specified using values.
     * @param ts the timestamp that the key value will have when committed
     * @param key the row key of the key value
     * @param hasOnDupKey true if row has an ON DUPLICATE KEY clause and false otherwise.
     * @param values the optional key values
     * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
     * generate the Row to send to the HBase server.
     * @throws ConstraintViolationException if row data violates schema
     * constraint
     */
    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, boolean hasOnDupKey, byte[]... values);           

由newRow的描述可以确定我們的猜想,timestampToUse就是目前資料的時間戳。

根據調用鍊,我們找到了timestampToUse指派最近的地方:UpsertCompiler.setValues方法,裡面有一個RowTimestampColInfo類型的rowTsColInfo字段。其實還是找到timestampToUse最初的地方,也就是擷取CurrentSCN的代碼段,但考慮到不對原有的CurrentSCN功能過多幹涉,我們選擇優化UpsertCompiler.setValues方法。下面是改造後的代碼片段:

for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
            byte[] value = values[j];
            PColumn column = table.getColumns().get(columnIndexes[i]);
            if (SchemaUtil.isPKColumn(column)) {
                pkValues[pkSlotIndex[i]] = value;
                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
                    if (!useServerTimestamp) {
                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
                        if (rowTimestamp < 0) {
                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
                        }
                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
                    } 
                }
            } else {
                columnValues.put(column, value);
                columnValueSize += (column.getEstimatedSize() + value.length);
            }
            if(column.getDataType().getSqlTypeName().equals(PRowts.INSTANCE.getSqlTypeName()) && rowTimestamp == null){
                rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, column.getSortOrder());
                if (rowTimestamp < 0) {
                    throw new IllegalDataException("Value of a column designated as ROW_TS cannot be less than zero");
                }
                rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
            }
        }           

在處理每行資料每個字段值的時候,判斷目前字段類型是否為PRowts類型,如果是,則根據該值建立RowTimestampColInfo。這樣就達到了根據資料改變HBase時間戳的目的。

考慮到快速、簡單的實作PRowts類型,我們選擇将PRowts設定為Long類型的别名,其實就是根據PLong類建立PRowts,二者的邏輯完全一緻。隻不過個别參數名稱不同。

下面是PRowts的預設構造函數。

private PRowts() {
        super("ROW_TS", 21, Long.class, new PLong.LongCodec(), 48);
    }
           

至此,我們就實作了将資料的時間戳映射到HBase的時間戳的功能。簡單來說分為兩步:

  1. 新增PRowts類型。建立表時,指定某個字段為PRowts,該字段原始類型必須是long;或者修改字段的類型為PRowts。
  2. 根據資料構造HBase的Put指令時,将PRowts的值寫入row timestamp

實作過程看似簡單,但作者還是花了很大的精力閱讀、梳理Phoenix的源碼的,隻有在了解的基礎上才能進行改造、更新。

當然,限于篇幅還是有很多細節沒有解釋的,而且也不一定選擇改造UpsertCompiler.setValues,讀者可以根據實際情況自行實作。另外也可以擴充PRowts,使其支援其他時間類型的資料,比如TIME、DATE、TIMESTAMP、BIGINT。

HBase技術交流

歡迎加群進一步交流溝通:

Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流
Phoenix映射HBase時間戳的一種實作HBase使用者福利HBase技術交流