天天看點

opentsdb探索之路——部分設計與實作

  • opentsdb 概覽(overview)
  • opentsdb 存儲細節(Writing)
    • rowkey的設計
    • rowkey的具體實作
    • 壓縮(compaction)
    • 追加模式(appends)
  • opentsdb UID的配置設定(UID Assignment)
  • opentsdb 查詢細節(Reading)
  • rowkey中加salt的情況(Salting)
  • 其他配置(Configuration)
  • http接口(HTTP API)
  • opentsdb連接配接Kerberos認證的HBase(非重點,僅順手記錄于此)
    • 具體操作
  • 寫在後面

基于opentsdb-2.4.0版本,本篇開啟opentsdb探索之路(主要涉及

讀寫特性

以及一些

其他細節

),下一篇将開啟opentsdb優化之路——

性能優化思路與建議

(總結目前痛點問題、優化思路和解決方案,同時也歡迎朋友提出更好的思路與方案)。

注意:閱讀本篇文章應該要對

HBase

有最基本的認識,比如

rowkey

region

store

ColumnFamily

ColumnQualifier

等概念以及

HBase

邏輯結構、實體存儲結構有大緻的認知。

opentsdb 概覽(overview)

opentsdb探索之路——部分設計與實作

上圖取自官方

http://opentsdb.net/overview.html

。其中的

TSD

(對應實際程序名是

TSDMain

)就是

opentsdb

元件。每個執行個體

TSD

都是獨立的。沒有

master

,沒有共享狀态(

shared state

),是以實際生産部署可能會通過

nginx

+

Consul

運作多個

TSD

執行個體以實作

負載均衡

Each TSD uses the open source database HBase or hosted Google Bigtable service to store and retrieve time-series data

我們大多應該還是用

HBase

作為資料存儲。

安裝部署一文中提到過在HBase中建立表結構,這裡先簡單介紹一下這4張表(

table

),随着探究的深入會對

tsdb

tsdb-uid

這兩張表有更深刻的認識,至于

tsdb-meta

tsdb-tree

兩張表不是這裡讨論的重點,簡單了解一下即可。相關文檔:

http://opentsdb.net/docs/build/html/user_guide/backends/index.html

  • tsdb:

    opentsdb

    全部的時序資料都存在這張表中,該表隻有一個名為"t"的列族(

    ColumnFamily

    )。是以這張表的資料非常大,大多情況下讀寫性能瓶頸也就與這張表密切相關,進而優化也可能與它相關。

    rowkey的設計為

    an optional salt, the metric UID, a base timestamp and the UID for tagk/v pairs

    ,即[可選的salt位+metric的UID+小時級别的時間戳+依次有序的tagk、tagv組成的UID鍵值對],如下:
[salt]<metric_uid><timestamp><tagk1><tagv1>[...<tagkN><tagvN>]
           

暫不考慮

salt

位,關于加

salt

下面有章節單獨拿出來看它的設計與實作。來看一個不加

salt

且含有兩個

tag

的時序資料的

rowkey

組成:

00000150E22700000001000001000002000004
'----''------''----''----''----''----'
metric  time   tagk  tagv  tagk  tagv
           

至于

rowkey

為什麼要這樣設計以及具體實作,後面詳細介紹,這裡先有個基本認知。

  • tsdb-uid: 為了減少

    rowkey

    的長度,

    opentsdb

    會将

    metric

    tagk

    tagv

    都映射成

    UID

    ,映射是雙向的,比如說既可以根據

    tagk

    找到對應的

    UID

    ,也可以根據

    UID

    直接找到相應的

    tagk

    。而這些

    映射關系

    就記錄在

    tsdb-uid

    表中。該表有兩個

    ColumnFamily

    ,分别是

    name

    id

    ,另外這兩個

    ColumnFamily

    下都有三列,分别是

    metric

    tagk

    tagv

    。如下圖所示:
RowKey id:metric id:tagk id:tagv name:metric name:tagk name:tagv
metric01 0x01
metric02 0x02
tagk01 0x01
tagv01 0x01
tagv02 0x02
0x01 metric01
0x01 tagk01
0x01 tagv01
0x02 metric02
0x02 tagv02

從上面可以看出,

metric

tagk

tagv

三種類型的

UID

映射互不幹擾,這也就使得

0x01

這個

UID

在不同類型中有着不同的含義。後面會從源碼角度講一下uid大緻的配置設定。

  • tsdb-meta: 在完成時序資料的寫入之後,會根據目前

    opentsdb

    執行個體的配置決定是否為相關時序記錄中繼資料資訊。看一下

    opentsdb.conf

    配置檔案中

    tsd.core.meta.enable_tsuid_tracking

    配置項即可。

    tsd.core.meta.enable_tsuid_tracking

    (預設

    false

    ): 如果開啟該選項,每次寫入一個

    DataPoint

    (時序資料)的同時還會向

    tsdb-meta

    表中寫入

    rowkey

    為該時序資料的

    tsuid

    (下面會講到它,即完整的

    rowkey

    除去

    salt

    timestamp

    後的資料),

    value

    為1的記錄。這樣,每個點就對應兩次

    HBase

    的寫入,一定程度上加大了HBase叢集的壓力。相關代碼見

    TSDB#storeIntoDB()#WriteCB#call()

// if the meta cache plugin is instantiated then tracking goes through it
if (meta_cache != null) {
  meta_cache.increment(tsuid);
} else {
// tsd.core.meta.enable_tsuid_tracking
  if (config.enable_tsuid_tracking()) {
  // tsd.core.meta.enable_realtime_ts
    if (config.enable_realtime_ts()) {
    // tsd.core.meta.enable_tsuid_incrementing
      if (config.enable_tsuid_incrementing()) {
        TSMeta.incrementAndGetCounter(TSDB.this, tsuid);
      } else {
        TSMeta.storeIfNecessary(TSDB.this, tsuid);
      }
    } else {
    // 寫入rowkey為tsuid,value為1的記錄
      final PutRequest tracking = new PutRequest(meta_table, tsuid,
          TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1));
      client.put(tracking);
    }
  }
}
           
  • tsdb-tree: 作用,可按照樹形層次結構組織時序,就像浏覽檔案系統一樣浏覽時序。相關介紹

    http://opentsdb.net/docs/build/html/user_guide/trees.html

    。這裡就不細說了,有興趣的話看下上面連結中官方介紹的

    Examples

    ,就能秒懂是幹嘛的。

opentsdb 存儲細節(Writing)

相關文檔:

http://opentsdb.net/docs/build/html/user_guide/writing/index.html

rowkey的設計

隻有一個名為"t"的列族

  • 時序資料的

    metric

    tagk

    tagv

    三部分字元串都會被轉成

    UID

    ,這樣再長的字元串在

    rowkey

    中也會由

    UID

    代替,大大縮短了

    rowkey

    的長度
  • rowkey

    中的時序資料的

    timestamp

    并非實際的時序資料時間,是格式化成以

    小時

    為機關的時間戳(所謂的

    base_time

    ),也就是說該

    rowkey

    中的

    base_time

    表示的是該時序資料發生在哪個整點(小時)。每個資料寫入的時候,會用該時序資料實際時間戳相對

    base_time

    的偏移量(

    offset

    )作為

    ColumnQualifier

    寫入。

    結合下面的圖以及之後的代碼,就一目了然。

rowkey t: +1 t: +2 t: +3 t: ... t: +3600
salt+metric_uid+base_time+tagk1+tagv1+...+tagkN+tagvN 10 9 12 ... 8

rowkey的具體實作

在沒有啟用

salt

的情況下,我整理出來生成

rowkey

的代碼如下(注意一下:源碼中并沒有這段代碼哦):

public byte[] generateRowKey(String metricName, long timestamp, Map<String, String> tags) {
        // 擷取metricUid
        byte[] metricUid = tsdb.getUID(UniqueId.UniqueIdType.METRIC, metricName);

        // 将時間戳轉為秒
        if ((timestamp & Const.SECOND_MASK) != 0L) {
            timestamp /= 1000L;
        }

        final long timestamp_offset = timestamp % Const.MAX_TIMESPAN;//3600
        // 提取出時間戳所在的整點(小時)時間
        final long basetime = timestamp - timestamp_offset;

        // 用TreeMap存儲<tagkUid,tagvUid>, 排序用的是memcmp()方法,下面會有介紹
        Map<byte[], byte[]> tagsUidMap = new org.hbase.async.Bytes.ByteMap<>();

        tags.forEach((k, v) -> tagsUidMap.put(
                tsdb.getUID(UniqueId.UniqueIdType.TAGK, k),
                tsdb.getUID(UniqueId.UniqueIdType.TAGV, v)));

        // 不加salt的rowkey,metricUid+整點時間戳+所有的tagK、tagV
        byte[] rowkey = new byte[metricUid.length + Const.TIMESTAMP_BYTES +
                tags.size() * (TSDB.tagk_width() + TSDB.tagv_width())];

        // 下面拷貝相應的資料到rowkey位元組數組中的相應位置
        System.arraycopy(metricUid, 0, rowkey, 0, metricUid.length);
        Bytes.setInt(rowkey, (int) basetime, metricUid.length);

        int startOffset = metricUid.length + Const.TIMESTAMP_BYTES;
        for (Map.Entry<byte[], byte[]> entry : tagsUidMap.entrySet()) {
            System.arraycopy(entry.getKey(), 0, rowkey, startOffset, TSDB.tagk_width());
            startOffset += TSDB.tagk_width();

            System.arraycopy(entry.getValue(), 0, rowkey, startOffset, TSDB.tagv_width());
            startOffset += TSDB.tagv_width();
        }

        return rowkey;
    }
           

其中的

ByteMap

就是

TreeMap

,見

org.hbase.async.Bytes.ByteMap

/** A convenient map keyed with a byte array.  */
public static final class ByteMap<V> extends TreeMap<byte[], V>
  implements Iterable<Map.Entry<byte[], V>> {

  public ByteMap() {
    super(MEMCMP);
  }
}
           

多個

tag

的排序規則是對

tag_id

bytes

進行排序,調用的是

org.hbase.async.Bytes#memcmp(final byte[] a, final byte[] b)

方法,如下

/**
   * {@code memcmp} in Java, hooray.
   * @param a First non-{@code null} byte array to compare.
   * @param b Second non-{@code null} byte array to compare.
   * @return 0 if the two arrays are identical, otherwise the difference
   * between the first two different bytes, otherwise the different between
   * their lengths.
   */
public static int memcmp(final byte[] a, final byte[] b) {
  final int length = Math.min(a.length, b.length);
  if (a == b) {  // Do this after accessing a.length and b.length
    return 0;    // in order to NPE if either a or b is null.
  }
  for (int i = 0; i < length; i++) {
    if (a[i] != b[i]) {
      return (a[i] & 0xFF) - (b[i] & 0xFF);  // "promote" to unsigned.
    }
  }
  return a.length - b.length;
}
           

壓縮(compaction)

相關文檔:

http://opentsdb.net/docs/build/html/user_guide/definitions.html#compaction

An OpenTSDB compaction takes multiple columns in an HBase row and merges them into a single column to reduce disk space. This is not to be confused with HBase compactions where multiple edits to a region are merged into one. OpenTSDB compactions can occur periodically for a TSD after data has been written, or during a query.

tsd.storage.enable_compaction

:是否開啟壓縮(預設為true,開啟壓縮)

為了減少存儲空間(講道理對查詢也有好處),

opentsdb

在寫入時序資料的同時會把

rowkey

放到

ConcurrentSkipListMap

中,一個

daemon

線程不斷檢查

System.currentTimeMillis()/1000-3600-1

之前的資料能否被壓縮,滿足壓縮條件則會把一小時内的時序資料(它們的

rowkey

是相同的)查出來在記憶體壓縮(

compact

)成一列回寫(

write

)到

HBase

中,然後

delete

之前的原始資料。或者是查詢(

query

)操作可能也會觸發

compaction

操作。代碼見

CompactionQueue

final class CompactionQueue extends ConcurrentSkipListMap<byte[], Boolean> {

  public CompactionQueue(final TSDB tsdb) {
      super(new Cmp(tsdb));
      // tsd.storage.enable_appends
      if (tsdb.config.enable_compactions()) {
        // 啟用了壓縮則會啟一個daemon的線程
        startCompactionThread();
      }
  }

  /**
     * Helper to sort the byte arrays in the compaction queue.
     * <p>
     * This comparator sorts things by timestamp first, this way we can find
     * all rows of the same age at once.
     */
    private static final class Cmp implements Comparator<byte[]> {

      /** The position with which the timestamp of metric starts.  */
      private final short timestamp_pos;

      public Cmp(final TSDB tsdb) {
        timestamp_pos = (short) (Const.SALT_WIDTH() + tsdb.metrics.width());
      }

      @Override
      public int compare(final byte[] a, final byte[] b) {
      // 取rowkey中的base_time進行排序
        final int c = Bytes.memcmp(a, b, timestamp_pos, Const.TIMESTAMP_BYTES);
        // If the timestamps are equal, sort according to the entire row key.
        return c != 0 ? c : Bytes.memcmp(a, b);
      }
  }
}
           

看看上面啟動的

daemon

線程在做啥

CompactionQueue#Thrd

/**
   * Background thread to trigger periodic compactions.
   */
  final class Thrd extends Thread {
    public Thrd() {
      super("CompactionThread");
    }

    @Override
    public void run() {
      while (true) {
        final int size = size();
        // 達到最小壓縮門檻值則觸發flush()
          if (size > min_flush_threshold) {
            final int maxflushes = Math.max(min_flush_threshold,
              size * flush_interval * flush_speed / Const.MAX_TIMESPAN);
            final long now = System.currentTimeMillis();
            // 檢查上個整點的資料能否被壓縮
            flush(now / 1000 - Const.MAX_TIMESPAN - 1, maxflushes);
          }
      }
    }
}
           

再看

CompactionQueue#flush(final long cut_off, int maxflushes)

private Deferred<ArrayList<Object>> flush(final long cut_off, int maxflushes) {
    final ArrayList<Deferred<Object>> ds =
      new ArrayList<Deferred<Object>>(Math.min(maxflushes, max_concurrent_flushes));
    int nflushes = 0;
    int seed = (int) (System.nanoTime() % 3);
    for (final byte[] row : this.keySet()) {
      final long base_time = Bytes.getUnsignedInt(row,
          Const.SALT_WIDTH() + metric_width);
      if (base_time > cut_off) {
        // base_time比較靠近目前時間,則直接跳出
        break;
      } else if (nflushes == max_concurrent_flushes) {
        break;
      }
      // 這裡會發向hbase發get請求擷取時序資料,在callback中進行壓縮操作
      ds.add(tsdb.get(row).addCallbacks(compactcb, handle_read_error));
    }
    return group;
}
           

最後看一下

compaction

具體做了啥,見

CompactionQueue#Compaction#compact()

public Deferred<Object> compact() {
  // merge the datapoints, ordered by timestamp and removing duplicates
  final ByteBufferList compacted_qual = new ByteBufferList(tot_values);
  final ByteBufferList compacted_val = new ByteBufferList(tot_values);

  mergeDatapoints(compacted_qual, compacted_val);

  // build the compacted columns
  final KeyValue compact = buildCompactedColumn(compacted_qual, compacted_val);

  final boolean write = updateDeletesCheckForWrite(compact);

  final byte[] key = compact.key();
  
  deleted_cells.addAndGet(to_delete.size());  // We're going to delete this.

  if (write) {
    // 把壓縮後的結果回寫到tsdb表
    Deferred<Object> deferred = tsdb.put(key, compact.qualifier(), compact.value(), compactedKVTimestamp);

    if (!to_delete.isEmpty()) {
      // 壓縮結果寫入成功後 delete查詢出來的cells
      deferred = deferred.addCallbacks(new DeleteCompactedCB(to_delete), handle_write_error);
    }
    return deferred;
  }  
}

// delete compacted cells的回調
private final class DeleteCompactedCB implements Callback<Object, Object> {

  /** What we're going to delete.  */
  private final byte[] key;
  private final byte[][] qualifiers;

  @Override
  public Object call(final Object arg) {
    return tsdb.delete(key, qualifiers).addErrback(handle_delete_error);
  }

  @Override
  public String toString() {
    return "delete compacted cells";
  }
}
           

追蹤整個

compaction

過程,我們不難發現其中多了不少

get

write

delete

請求,資料量非常大的情況下無形給

HBase

帶來不小壓力。留意一下,這裡可能也是我們重點優化的地方。

追加模式(appends)

相關文檔:

http://opentsdb.net/docs/build/html/user_guide/writing/index.html#appends

Also in 2.2, writing to HBase columns via appends is now supported. This can improve both read and write performance in that TSDs will no longer maintain a queue of rows to compact at the end of each hour, thus preventing a massive read and re-write operation in HBase. However due to the way appends operate in HBase, an increase in CPU utilization, store file size and HDFS traffic will occur on the region servers. Make sure to monitor your HBase servers closely.

tsd.storage.enable_appends

:預設是false

在追加模式下,

opentsdb

寫入的時候,會将

rowkey

相同的點的value值寫到一個單獨的

ColumnQualifier

(0x050000)中。是以與之前的直接寫入模式是相容的,這就意味着可以随時啟用或者禁用追加模式。

/** The prefix ID of append columns */
public static final byte APPEND_COLUMN_PREFIX = 0x05;

/** The full column qualifier for append columns */
public static final byte[] APPEND_COLUMN_QUALIFIER = new byte[]{APPEND_COLUMN_PREFIX, 0x00, 0x00};
           

顯然這就是我們想要的壓縮後的效果。少了把已經寫入

HBase

的資料拉過來在

opentsdb

記憶體壓縮,回寫資料,再删除原資料的一系列操作,當然了壓力應該是丢給了

HBase

追加模式會消耗更多的HBase叢集的資源(官方是這麼說的,究竟多大,有待研究),另外本人猜測對于大量高并發的寫入可能有鎖的同步問題,講道理單從瞬間寫入性能考慮,追加模式下的性能應該是不及之前的直接寫入。

opentsdb UID的配置設定(UID Assignment)

相關文檔:

http://opentsdb.net/docs/build/html/user_guide/uids.html#uid

相信到這裡應該已經到

UID

有一定的認識了,使用

UID

大大節省了存儲空間。

Within the storage system there is a counter that is incremented for each metric, tagk and tagv. When you create a new tsdb-uid table, this counter is set to 0 for each type.

很類似

mysql

中的自增主鍵。見

UniqueId#allocateUid()

private Deferred<Long> allocateUid() {
// randomize_id預設是false,兩種方式:一種是随機數uid,另外一種是遞增uid
// tagk和tagv目前無法配置,用的是遞增uid(metric倒是可配 tsd.core.uid.random_metrics預設false)
    if (randomize_id) {
    return Deferred.fromResult(RandomUniqueId.getRandomUID());
    } else { //實際走這裡,會去hbase的tsdb-uid表請求遞增uid
    return client.atomicIncrement(new AtomicIncrementRequest(table,
                                    MAXID_ROW, ID_FAMILY, kind));
    }
}
           

tsdb-uid

表中

rowkey

0x00

cell

中存有目前三種類型的最大

UID

opentsdb探索之路——部分設計與實作

這裡我們看到

metric

tagk

tagv

三種類型的

UID

映射是獨立的。另外,注意兩個與此相關的配置項

  • tsd.core.auto_create_metrics

    :預設為

    false

    ,是否給

    tsdb-uid

    表中不存在的

    metric

    配置設定

    UID

    false

    的情況下,寫入新的

    metric

    時序資料會抛出異常
  • tsd.core.preload_uid_cache

    :預設為

    false

    ,是否程式啟動時就從

    tsdb-uid

    表擷取

    UID

    并緩存在本地,見

    TSDB#TSDB(final HBaseClient client, final Config config)

if (config.getBoolean("tsd.core.preload_uid_cache")) {
    final ByteMap<UniqueId> uid_cache_map = new ByteMap<UniqueId>();
    uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics);
    uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names);
    uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values);
    UniqueId.preloadUidCache(this, uid_cache_map);
}
           

從這裡我們也可以看到使用這種

遞增UID

配置設定方式,先來的

tagk

必然會配置設定到數值較小的

UID

,後來的

tagk

會配置設定到數值較大的

UID

,如此一來結合上文寫入的時候

rowkey

中的

tags

會按照

tagk_uid

的byte數組進行排序,就能得出最先寫入的

tagk

是排在

rowkey

中較為靠前的位置,那麼知道了這種規則,在某些情況下對于查詢優化有沒有幫助呢?

opentsdb 查詢細節(Reading)

相關文檔:

http://opentsdb.net/docs/build/html/user_guide/query/index.html

查詢放在這個地方講是因為我們隻有弄清楚資料是怎麼存的,才會明白如何取。通過前文我們知道寫入的時候

rowkey

中的

tags

會按照

tagk_uid

的byte數組進行排序,那麼同樣從

HBase

讀資料的時候講道理也應該這樣排序是不是。來看

QueryUtil#setDataTableScanFilter()

但是,正常情況下的

scan

(除非查詢的時候設定

explicit_tags

true

),對于

tag

的過濾并不是直接拼在

rowkey

中,而是放在

scanner.setFilter(regex_filter)

final byte[] start_row = new byte[metric_salt_width + Const.TIMESTAMP_BYTES];
final byte[] end_row = new byte[metric_salt_width + Const.TIMESTAMP_BYTES];
scanner.setStartKey(start_row);
scanner.setStopKey(end_row);
 
// 關于regex_filter生成下面有簡單例子
if (!(explicit_tags && enable_fuzzy_filter)) {
    scanner.setFilter(regex_filter);
    return;
}
           

QueryUtil#getRowKeyUIDRegex()

// Generate a regexp for our tags.  Say we have 2 tags: { 0 0 1 0 0 2 }
// and { 4 5 6 9 8 7 }, the regexp will be:
// "^.{7}(?:.{6})*\\Q\000\000\001\000\000\002\\E(?:.{6})*\\Q\004\005\006\011\010\007\\E(?:.{6})*$"
           

官方對查詢設定

explicit_tags

true

的介紹:

http://opentsdb.net/docs/build/html/user_guide/query/filters.html#explicit-tags

意思我已經知道了要查詢的

metric

明确隻有這些

tags

,想查詢的時序資料不會出現其他

tag

,這樣

opentsdb

就會把使用者過濾的

tag

直接拼到

rowkey

中,一定程度上優化了查詢。見代碼

if (explicit_tags && enable_fuzzy_filter) {
    fuzzy_key = new byte[prefix_width + (row_key_literals.size() *
        (name_width + value_width))];
    fuzzy_mask = new byte[prefix_width + (row_key_literals.size() *
        (name_width + value_width))];
    System.arraycopy(scanner.getCurrentKey(), 0, fuzzy_key, 0,
        scanner.getCurrentKey().length);
 
    // 因為已經明确了隻有哪些指定的tag,這個時候才會把tags直接拼到startKey中
    scanner.setStartKey(fuzzy_key);
}
           

explicit_tags

true

的情況下,會用

FuzzyRowFilter

,看一下源碼中的描述

/**
 * FuzzyRowFilter is a server-side fast-forward filter that allows skipping
 * whole range of rows when scanning. The feature is available in HBase
 * 0.94.5 and above.
 * <p>
 * It takes two byte array to match a rowkey, one to hold the fixed value
 * and one to hold a mask indicating which bytes of the rowkey must match the
 * fixed value. The two arrays must have the same length.
 * <p>
 * Bytes in the mask can take two values, 0 meaning that the corresponding byte
 * in the rowkey must match the corresponding fixed byte and 1 meaning that the
 * corresponding byte in the rowkey can take any value.
 * <p>
 * One can combine several {@link FuzzyFilterPair} to match multiple patterns at
 * once.
 * <p>
 * Example :
 * You store logs with this rowkey design :
 *   group(3bytes)timestamp(4bytes)severity(1byte)
 *
 * You want to get all FATAL("5") logs :
 *   * Build a FuzzyFilterPair with
 *     - rowkey     : "????????5"
 *     - fuzzy mask : "111111110"
 * And CRITICAL("4") logs only for web servers :
 *   * Add another FuzzyFilterPair with
 *     - rowkey     : "web????4"
 *     - fuzzy mask : "00011110"
 *
 * @since 1.7
 */
public final class FuzzyRowFilter extends ScanFilter {
// ...
}
           

總結一下就是,如果你明确你要查的資料有哪幾個

tag

,建議查詢的時候指定

explicit_tags

true

,有助于查詢優化。

# Example 1: 
http://host:4242/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=web01}

# Example 2:
http://host:4242/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=*}{dc=*}

# Example 3:
{
    "start":1584408560754,
    "end":1584409460754,
    "msResolution":false,
    "queries":[
        {
            "aggregator":"avg",
            "metric":"metric.test",
            "downsample":"5m-avg",
            "explicitTags":true,
            "filters":[
                {
                    "type":"literal_or",
                    "tagk":"instance",
                    "filter":"total",
                    "groupBy":true
                },
                {
                    "type":"literal_or",
                    "tagk":"ip",
                    "filter":"192.168.1.1",
                    "groupBy":true
                }
            ]
        }
    ]
}
           

關于

tsd.storage.use_otsdb_timestamp

這個配置與

HBase

特性有關。下篇寫優化的時候再講,這裡提出來放在這裡。

TsdbQuery#getScanner(final int salt_bucket)

// tsd.storage.use_otsdb_timestamp
if (tsdb.getConfig().use_otsdb_timestamp()) {
      long stTime = (getScanStartTimeSeconds() * 1000);
      long endTime = end_time == UNSET ? -1 : (getScanEndTimeSeconds() * 1000);
      if (tsdb.getConfig().get_date_tiered_compaction_start() <= stTime &&
          rollup_query == null) {
        // TODO - we could set this for rollups but we also need to write
        // the rollup columns at the proper time.
        scanner.setTimeRange(stTime, endTime);
      }
}
           

rowkey中加salt的情況(Salting)

相關文檔:

http://opentsdb.net/docs/build/html/user_guide/writing/index.html#salting

時序資料的寫入,

寫熱點

是一個不可規避的問題,當某個

metric

下資料點很多時,則該

metric

很容易造成寫入熱點,即往一個

region server

寫,甚至同一個

region

,如果這樣,對這部分資料的讀寫都會落到

HBase

叢集中的一台機器上,無法發揮叢集的處理能力,甚至直接将某個

region server

壓垮。加

salt

就是為了将時序資料的

rowkey

打散,進而配置設定到不同的

region

中,以均衡負載。

When enabled, a configured number of bytes are prepended to each row key. Each metric and combination of tags is then hashed into one "bucket", the ID of which is written to the salt bytes

從2.2開始,

OpenTSDB

采取了允許将

metric

salt

,加

salt

後的變化就是在

rowkey

前會拼上一個桶編号(

bucket index

)。
To

enable salting

you must modify the config file parameter

tsd.storage.salt.width

and optionally

tsd.storage.salt.buckets

. We recommend setting the

salt width

to

1

and determine the number of

buckets

based on a factor of the number of

region servers

in your cluster. Note that at

query

time, the TSD will fire

tsd.storage.salt.buckets

number of

scanners

to fetch data. The proper number of salt buckets must be determined through experimentation as at some point

query performance

may suffer due to having too many scanners open and collating the results. In the future the salt width and buckets may be configurable but we didn't want folks changing settings on accident and losing data.

對上面的描述解釋一下:

tsd.storage.salt.width

rowkey

加多少個byte字首(預設0(即不開啟),如果啟用的話 建議1)

tsd.storage.salt.buckets

:分桶數(預設20,建議根據region servers數确定)

  • 寫入的時候如果啟用了

    salt

    ,則根據

    metric_uid

    +所有

    [tagK+tagV]uid

    組成的byte數組,計算

    hashcode

    值,對

    分桶數

    取模,得出

    salt

RowKey#prefixKeyWithSalt

(注意:取的是關鍵代碼,去除了幹擾資訊)

public static void prefixKeyWithSalt(final byte[] row_key) {
    // tsd.storage.salt.width
    if (Const.SALT_WIDTH() > 0) {
      final int tags_start = Const.SALT_WIDTH() + TSDB.metrics_width() + 
          Const.TIMESTAMP_BYTES;
      
      // we want the metric and tags, not the timestamp
      final byte[] salt_base = 
          new byte[row_key.length - Const.SALT_WIDTH() - Const.TIMESTAMP_BYTES];
      System.arraycopy(row_key, Const.SALT_WIDTH(), salt_base, 0, TSDB.metrics_width());
      System.arraycopy(row_key, tags_start,salt_base, TSDB.metrics_width(), 
          row_key.length - tags_start);
      // 這裡通過對salt_buckets取模得出salt位的數值
      int modulo = Arrays.hashCode(salt_base) % Const.SALT_BUCKETS();// tsd.storage.salt.buckets
    
      final byte[] salt = getSaltBytes(modulo);
      // 填充salt位的byte
      System.arraycopy(salt, 0, row_key, 0, Const.SALT_WIDTH());
    }
}
           
  • 這個時候大多數人就會疑惑了,在

    rowkey

    前加了

    salt

    位,那麼查詢的時候怎麼搞?

    用戶端查詢

    OpenTSDB

    一條資料,

    OpenTSDB

    将這個請求拆成

    分桶數

    個查詢到

    HBase

    ,然後傳回桶數個結果集到

    OpenTSDB

    層做合并。對

    HBase

    并發請求相應的也會桶數倍的擴大。見

    TsdbQuery#findSpans()

if (Const.SALT_WIDTH() > 0) {
    final List<Scanner> scanners = new ArrayList<Scanner>(Const.SALT_BUCKETS());
    for (int i = 0; i < Const.SALT_BUCKETS(); i++) {
      // 建構出等于分桶數大小個scanner
      scanners.add(getScanner(i));
    }
    scan_start_time = DateTime.nanoTime();
    return new SaltScanner(tsdb, metric, scanners, spans, scanner_filters,
        delete, rollup_query, query_stats, query_index, null, 
        max_bytes, max_data_points).scan();
}
           

在每一個

scanner

rowkey

前面填充

bucket index

作為

salt

位,這樣才能去

hbase

scan

到完整的結果,見

QueryUtil#getMetricScanner()

public static Scanner getMetricScanner(final TSDB tsdb, final int salt_bucket, 
      final byte[] metric, final int start, final int stop, 
      final byte[] table, final byte[] family) {
    
    if (Const.SALT_WIDTH() > 0) {
      final byte[] salt = RowKey.getSaltBytes(salt_bucket);
      // 這裡把salt_bucket填充到rowkey中
      System.arraycopy(salt, 0, start_row, 0, Const.SALT_WIDTH());
      System.arraycopy(salt, 0, end_row, 0, Const.SALT_WIDTH());
    }
    return scanner;
}
           

其他配置(Configuration)

相關文檔:

  • opentsdb

    的配置:

    http://opentsdb.net/docs/build/html/user_guide/configuration.html

  • AsyncHBase client

    的配置:

    http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html

opentsdb

使用的

hbase client

http://opentsdb.github.io/asynchbase/

public TSDB(final HBaseClient client, final Config config) {
    this.config = config;
    if (client == null) {
      final org.hbase.async.Config async_config;
      if (config.configLocation() != null && !config.configLocation().isEmpty()) {
        try {
          // AsyncHBase client讀取和opentsdb一樣的檔案
          // 是以 有一些需要設定AsyncHBase client的地方直接寫在opentsdb的配置檔案就能生效
          async_config = new org.hbase.async.Config(config.configLocation());
        } catch (final IOException e) {
          throw new RuntimeException("Failed to read the config file: " +
              config.configLocation(), e);
        }
      } else {
        async_config = new org.hbase.async.Config();
      }
      async_config.overrideConfig("hbase.zookeeper.znode.parent",
          config.getString("tsd.storage.hbase.zk_basedir"));
      async_config.overrideConfig("hbase.zookeeper.quorum",
          config.getString("tsd.storage.hbase.zk_quorum"));
      this.client = new HBaseClient(async_config);
    } else {
      this.client = client;
    }
}
           

性能優化的一方面可能與參數調優有關,有些與啟動參數,作業系統設定等有關,有些參數就是寫在配置檔案的(比如說最大連接配接數、逾時時間等等)

這裡提一下前面沒有講到的與

opentsdb

相關的兩個配置。

  • tsd.query.skip_unresolved_tagvs

    :預設為false,查詢的時候遇到不存在的tagv時候是否跳過,true則跳過,false則抛出異常,個人感覺這個預設false極不友好。

    TagVFilter#resolveTags()#TagVErrback

/**
  * Allows the filter to avoid killing the entire query when we can't resolve
  * a tag value to a UID.
  */
class TagVErrback implements Callback<byte[], Exception> {
  @Override
  public byte[] call(final Exception e) throws Exception {
    if (config.getBoolean("tsd.query.skip_unresolved_tagvs")) {
      LOG.warn("Query tag value not found: " + e.getMessage());
      return null;
    } else {
      // 預設情況下直接抛出異常
      throw e;
    }
  }
}
           
  • AsyncHBase Configuration

    中的

    hbase.rpc.timeout

    :How long, in milliseconds, to wait for a response to an RPC from a region server before failing the RPC with a RpcTimedOutException. This value can be overridden on a per-RPC basis. A value of 0 will not allow RPCs to timeout

http接口(HTTP API)

相關文檔:

http://opentsdb.net/docs/build/html/api_http/index.html

常用:

  • put:

    http://opentsdb.net/docs/build/html/api_http/put.html

  • query:

    http://opentsdb.net/docs/build/html/api_http/query/index.html

  • uid:

    http://opentsdb.net/docs/build/html/api_http/uid/index.html

  • stats:

    http://opentsdb.net/docs/build/html/api_http/stats/index.html

同時我們注意到:

OpenTSDB3.0

相關的工作正在進行中(

work-in-progress

),詳情:

http://opentsdb.net/docs/3x/build/html/index.html

opentsdb連接配接Kerberos認證的HBase(非重點,僅順手記錄于此)

相關文檔:

http://opentsdb.github.io/asynchbase/docs/build/html/authentication.html

http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html

(搜

kerberos

關鍵字)

相關問題讨論:

https://github.com/OpenTSDB/opentsdb/issues/491

參考帶有

Kerberos

認證hbase docker鏡像

Dockerfile

項目:

https://github.com/Knappek/docker-phoenix-secure

該項目中

bootstrap-phoenix.sh

docker-compose.yml

以及

config_files

下的配置檔案很有參考價值

具體操作

  1. 根據實際情況在

    /etc/opentsdb/opentsdb.conf

    配置 末尾添加:
hbase.security.auth.enable=true
hbase.security.authentication=kerberos
hbase.sasl.clientconfig=Client
hbase.kerberos.regionserver.principal=hbase/[email protected]
           
  1. 根據實際情況建立

    hbase-client.jaas

    檔案,檔案内容基本如下樣子
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    keyTab="/etc/security/keytabs/hbase.keytab"
    principal="hbase/phoenix.docker.com";
};
           
  1. 修改

    /usr/share/opentsdb/etc/init.d/opentsdb

    檔案,修改啟動參數
# start command的位置(約第78行處)加上 -Djava.security.auth.login.config=hbase-client.jaas檔案路徑
# 注意:如果Zookeeper沒有加Kerberos認證,再加一個參數 -Dzookeeper.sasl.client=false
JVMARGS="-Djava.security.auth.login.config=/.../jaas.conf"
           

重新開機

opentsdb

,如果成功,則能看到如下示例日志:

13:31:55.045 INFO [

ZooKeeperSaslClient

.run] - Client will use

GSSAPI as SASL mechanism

.

13:31:55.062 INFO [Login.getRefreshTime] - TGT valid starting at: Fri Apr 03 13:31:54 CST 2020

13:31:55.062 INFO [Login.getRefreshTime] - TGT expires: Sat Apr 04 13:31:54 CST 2020

13:31:55.255 INFO [

KerberosClientAuthProvider

.run] - Client will use

GSSAPI as SASL mechanism

.

13:31:55.269 INFO [RegionClient.channelConnected] - Initialized security helper: org.hbase.async.SecureRpcHelper96@6471f1e for region client: RegionClient@63709091(chan=null, #pending_rpcs=2, #batched=0, #rpcs_inflight=0)

13:31:55.276 INFO [SecureRpcHelper96.handleResponse] -

SASL client context established

. Negotiated QoP: auth on for: RegionClient@63709091(chan=null, #pending_rpcs=2, #batched=0, #rpcs_inflight=0)

寫在後面

閱讀、探索的過程很累,遇到不太了解的地方又會很困惑,但柳暗花明又一村,淩絕頂一覽衆山小的喜悅卻難以言表。另外,整理的過程也挺煩人,既然花時間整理了,我盡量讓感興趣的讀者能從中有一絲收獲。當然了,整理的過程也鍛煉了我學習知識、解決問題的思路與能力。由于本人能力之有限、了解之不透徹,文中如有錯誤的了解、不恰當的描述,衷心希望朋友提出一起讨論!