天天看點

InfluxDB 2.0 原理與應用實踐

點選

上方藍字

關注我們~

什麼是時序資料庫

時序資料庫,全稱時間序列資料庫(Time Series Database,TSDB),用于存儲大量基于時間的資料,時序資料(Time Series Data)指的是一系列基于時間的資料,例如CPU使用率,北京的房價變化趨勢,某一地區的溫度變化等。

時序資料庫支援時序資料的快速寫入、持久化,多元度查詢、聚合等操作,同時可以記錄所有的曆史資料,查詢時将時間作為資料的過濾條件。

時序資料的使用場景廣泛,包括DevOps監控,應用程式名額,IoT傳感器資料,實時動态資料分析等場景。

1

初識InfluxDB

InfluxDB是時序資料庫中應用比較廣泛的一種,在DB-Engines TSDB rank中位居首位,可見InfluxDB在網際網路的受歡迎程度是非常高的。下圖是截止到2021年8月時序資料庫的排名情況。

InfluxDB 2.0 原理與應用實踐

它是go語言開發的資料庫,InfluxDB自釋出至今,已經有兩個版本,InfluxDB1.x系列提供一種類似SQL的查詢語言InfluxQL,用于資料互動。2019年1月新推出的influxDB2.0 alpha版本,主推全新的查詢語言Flux,支援TICK架構。在 2020 年底推出InfluxDB 2.0 正式版本,該版本又分為InfluxDB Cloud 和 InfluxDB OSS兩個系列。

時序資料庫與我們熟悉的關系型資料庫有所不同,首先需要了解一下InfluxDB中字段的含義,如下圖所示:

InfluxDB 2.0 原理與應用實踐

2

TICK架構分析與各元件功能介紹

TICK架構 是 InfluxData 平台的元件的集合首字母縮寫,該集合包括Telegraf、InfluxDB、Chronograf和 Kapacitor。TICK架構以及各元件分工情況如圖所示:

InfluxDB 2.0 原理與應用實踐

除了上圖可視化管理工具Chronograf外,還有一種可視化工具Grafana,它也是用于大規模名額資料的可視化展示,提供包括折線圖,餅圖,儀表盤等多種監控資料可視化UI,若應用過程中考慮到擴充性問題,也會使用Grafana代替Chronograf。

3

InfluxDB的特點

● 資料寫入:

①. 高并發高吞吐,可持續的資料寫入。

②. 寫多讀少,時序資料95%以上都是寫操作,例如在監控系統資料的時候,監控資料特别多,但是通常隻會關注幾個關鍵名額。

③. 資料實時寫入,不支援資料更新,但是可以人為更新修改。

● 資料分析與查詢:

①. 資料查詢是按照時間段讀取,例如1小時,1分鐘,給出具體時間範圍。

②. 最近的資料讀取率高,越舊的資料讀取率越低。

③. 多種精度查詢和多種次元分析。

● 資料存儲:

①. 存儲資料規模大的資料,監控資料的資料大多數情況下都是TB或者PB級。

②. 資料存放具有時效性,InfluxDB提供了儲存政策,可以認為是資料的保存期限,超過周期範圍,就可以認為資料失效,需要回收。節約存儲成本,清理低價值的資料。

4

InfluxDB存儲原理

InfluxDB的存儲結構樹是時間結構合并樹(Time-Structured Merge Tree,TSM),它是由日志結構化合并樹(Log-Structured Merge Tree,LSM),根據實際需求變化而來的。

**①. LSM樹 **

LSM樹包含三部分:Memtable,Immutable和SSTable。MemTable是記憶體中的資料結構,用于儲存最近産生的資料,并按照Key有序地組織資料。記憶體并不是可靠存儲,若斷電就會丢失資料,是以通常會使用預寫式日志(Write-ahead logging,WAL)的方式來保證資料的可靠性。

InfluxDB 2.0 原理與應用實踐

②. TSM存儲引擎

TSM存儲引擎主要包括四部分:Cache,WAL,TSM File,Compactor。下圖中shard與TSM引擎主要部分放在一起,但其實shard在是TSM存儲引擎之上的一個概念。在 InfluxDB 中按照資料産生的時間範圍,會建立不同的shard分組,每個 shard 都有本身的 cache、wal、tsm file 以及 compactor。

InfluxDB 2.0 原理與應用實踐

InfluxDB 2.0應用實踐

準備工作

安裝虛拟機,建立Ubuntu環境,安裝Jmeter5.4版本。

1. 安裝influxdb2.0

wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.7-amd64.deb      
sudo dpkg -i influxdb2-2.0.7-amd64.deb      

③. 啟動influxdb服務

sudo service influxdb start      

④. 關閉influxdb服務

sudo service influxdb stop      
sudo service influxdb status      

⑥. 解除安裝influxdb

sudo dbkg –r influxdb2
      

2. 安裝telegraf

wget https://dl.influxdata.com/telegraf/releases/telegraf_1.19.0~rc1-1_amd64.deb      

②. 安裝telegraf

sudo dpkg -i telegraf_1.19.0~rc1-1_amd64.deb      

③. 啟動telegraf

systemctl start      
systemctl status      

應用實踐

1. Telegraf+InfluxDB2.x系統可視化監控

Influxdb2.0 已經內建了圖形界面,簡單的可視化監控可以無需使用 Grafana,本文例子暫時沒有使用Grafana。

①. 通路influxdb2

首先在終端鍵入啟動influxdb服務的指令,然後在虛拟機的火狐浏覽器位址欄輸入 ​​http://localhost:8086,點選Enter鍵進入influxDB浏覽器通路首頁。首次登入會設定使用者名,密碼,點選sign​​ in完成登入。

InfluxDB 2.0 原理與應用實踐

②. 建立bucket,存儲資料

依次點選 Data → Bucket → Create Bucket,在彈框中輸入bucket名字,然後點選建立,完成bucket的建立,此處建立了一個name為demo_bucket的 bucket。

InfluxDB 2.0 原理與應用實踐

③. 建立一個telegraf收集資料

依次點選Data → Telegraf → Create Configuration → continue。在Bucket選擇剛才建立的 “demo_bucket“ ,監控目标選擇系統System,然後點選continue。

InfluxDB 2.0 原理與應用實踐

點選 continue 之後,可以知道要系統資料包括哪些,Telegraf Configuration name需要填充,此處填寫為system_data,然後點選create and verity,完成這一操作。

InfluxDB 2.0 原理與應用實踐

④. 使用Telegraf搜集資料

點選 create and verity 之後,并不能馬上實作資料搜集,需要在目前頁面擷取API Token和Telegraf的啟動指令,依次複制,并在終端執行指令。

InfluxDB 2.0 原理與應用實踐
InfluxDB 2.0 原理與應用實踐

⑤. 系統資料可視化呈現與儲存

點選Export,在From這裡選擇剛才建立的bucket:demo_bucket,在第一個filter那裡會在選擇bucket之後,預設出現_measurement,選擇cpu,相當于關系型資料庫中的表,在第2個Filter處選擇可視化的字段。例子選擇的是usage_system(系統用量百分比),相關的Filter標明之後,點選submit,就會呈現出可視化的結果,這裡預設的是Graph,出現如圖所示的曲線圖。在圖中黃色線框部分可以選擇動态頁面重新整理時間,在綠色線框部分可以選擇資料開始的時間。

InfluxDB 2.0 原理與應用實踐

點選Save會儲存剛才的圖,在選擇target dashboard,可以選擇已經存在的system ,這裡将可視化圖命名為usage_system_graph。

InfluxDB 2.0 原理與應用實踐
InfluxDB 2.0 原理與應用實踐

2. Jmeter+InfluxDB2.0搭建性能監控環境

Jmeter相對于InfluxDB來說,屬于外部系統,是以需要在InfluxDB中生成一個token用于外部時序資料寫入資料庫。

在InfluxDB中選擇 Data → Tokens, 點選 Generate → Read/Write Token ,完成token建立。

InfluxDB 2.0 原理與應用實踐

通過輕按兩下jmeter.bat進入Jmeter工作界面,建立一個線程組(Thread Group),線上程組中點選Loop Count Infinite,然後依次添加Java請求(Java Request),檢視結果樹(View Results Tree),彙總報告(Summary Report),後端監聽器(BackendListener)。主要是在後端監聽器填寫相關參數,influxdbUrl要修改host,填寫建立的組織org和bucket,influxdbToken就是上面建立的Jmeter_Token複制進去。若可以在influxdb中看到Java請求的資料結果,說明Jmeter與InfluxDB連接配接成功。

InfluxDB 2.0 原理與應用實踐
InfluxDB 2.0 原理與應用實踐

3. Flux在python3中的查詢應用

Flux查詢語言是InfluxDB2.0主推的查詢語言,提供FluxTable,CSV,DataFrame和Raw Data四種查詢API,但是在時間和日期上,較InfluxQL相對局限,僅支援RFC3339格式,預設值是當天零點。

下圖中紅色框圖則是曲線圖的Flux查詢語句,from表示資料源所在的bucket,|>表示管道連接配接符,range表示所查詢資料所在的時間範圍,其中 v.timeRangeStart 和 v.timeRangeStop 代表時間區間下拉框選中的時間段,filter是對range範圍内的資料進行過濾,filter中的參數fn,是基于列和屬性過濾資料邏輯的匿名函數,yield隻在同一個Flux中出現多查詢的時候才會出現,yield函數将過濾後的表作為Flux查詢結果輸出。

InfluxDB 2.0 原理與應用實踐

①. Flux語言實作系統資料的查詢

首先通過InfluxDBClient連接配接InfluxDB資料庫,InfluxDBClient中需要提供url,token的作用是保證外部系統可以通路資料,不同的bucket有不同的token,org是資料存儲所在的組織,在首次登入的時候完成建立。

import pandas as pd
from influxdb_client import InfluxDBClient
# 設定
my_token = "PePwz1xFzM_edpm6NB0DyR2B04XWqDNQEFPmp9i8hxVW8DmlTTSzywrTyh_p5uv_k1h0Qdxy3U99J2S7TV9X7A=="
client = InfluxDBClient(url='http://192.168.79.147:8086', token=my_token, org='org_demo')
query_api = client.query_api()
# Flux查詢語句
mem_query = '''
from(bucket: "demo_bucket")
  |> range(start: -5w, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "mem")
  |> filter(fn: (r) => r["_field"] == "available")
  |> filter(fn: (r) => r["host"] == "ubuntu")
  |> yield(name: "mean")
'''
table = query_api.query_data_frame(mem_query, "org_demo")
# 提取查詢結果的部分字段值
mem_example = pd.DataFrame(table, columns=['_start', '_value', '_field', 'host'])
print(mem_example.head(5))
client.close()      
InfluxDB 2.0 原理與應用實踐

②. Flux語言實作資料插入

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

my_token="ENL3dUfGzTBFGcHzJ8iCIfbKF0fF7C7-P5PDkGpDWLzvvHuP2v9tKVgeZAFqV3y8sLXJt8alK0e-jicHVDgOEg=="
client = InfluxDBClient(url='http://192.168.79.147:8086', token=my_token, org='org_demo')

write_api = client.write_api(write_options=SYNCHRONOUS)
"""
資料準備
"""
_point1 = Point("_measurement").tag("location", "Beijing").field("temperature", 36.0)
_point2 = Point("_measurement").tag("location", "Shanghai").field("temperature", 32.0)
 write_api.write(bucket="python_bucket", record=[_point1, _point2])      

結果: