天天看點

使用阿裡雲InfluxDB®和Spark Streaming實時處理時序資料

本文重點介紹怎樣利用 阿裡雲InfluxDB® 和spark structured streaming來實時計算、存儲和可視化資料。下面将介紹如何購買和初始化阿裡雲InfluxDB®,擴充spark foreach writer,以及設計阿裡雲InfluxDB®資料庫時需要注意的事項。

使用阿裡雲InfluxDB®和Spark Streaming實時處理時序資料
在大資料進行中,一個主要的趨勢是人們希望看到metric是如何随着時間變化發展。這使得管理和處理時序資料(數值随時間變化的資料)成為資料科學家非常重要的研究方向。目前,已經有非常多的時序處理資料庫産品,如OpenTSDB,TimeScaleDB,InfluxDB以及Druid等。InfluxDB因為完整的生态、類SQL的查詢語言以及簡單快捷的布署而非常受使用者喜愛,居于DBEngine時序資料排列首位。阿裡雲已經将其進行開源托管,并且完善了TIG(Telegraf/InfluxDB/Grafana)生态,即将推出托管的Kapacitor流處理報警元件。

關于時序資料的一些重要概念和如何購買阿裡雲InfluxDB®可以參考之前的文章<

阿裡雲InfluxDB®教你玩轉A股資料

>和

官方文檔

。這裡補充一下阿裡雲InfluxDB®提供的執行個體規格和管理帳号的建立。

使用阿裡雲InfluxDB®和Spark Streaming實時處理時序資料

目前,阿裡雲InfluxDB®大緻提供2C8G/4C16G/8C32G/16C64G/32C128G/64C256G等大緻6種規格,每種規格的讀寫能力參考如上圖所示。阿裡雲InfluxDB®開放了開源版的幾乎全部功能,使用者可以在控制台建立管理者帳号,該帳号可以通過用戶端和SDK進行所有的操作。

使用阿裡雲InfluxDB®和Spark Streaming實時處理時序資料

Writing Data From Spark

Spark是目前大資料處理領域中最流行、最高效的開源工具,通過spark structured streaming寫資料到InfluxDB的開源擴充卡主要有

chronicler

reactive-influx

。chronicler與reactive-influx的差別是,在寫入資料點之前,chronicler必須要将資料格式轉換成influxdb行協定,在處理大量field字段和字元串值時會變得相當棘手,相較而言reactive-influx比較友善。

在sbt項目中引入reactive-influx:

libraryDependencies ++= Seq(
"com.pygmalios" % "reactiveinflux-spark_2.11" % "1.4.0.10.0.5.1",
"com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
)           

InfluxDB entry 配置,其中阿裡雲InfluxDB®内網和公網的URL可以在控制台上找到:

reactiveinflux {
  url = "ts-xxxxx.influxdata.rds.aliyuncs.com:3242/"
  spark {
    batchSize = 1000 // No of records to be send in each batch
  }
}           

擴充spark foreach writer, enable spark stuctured streaming 向阿裡雲InfluxDB®寫資料的僞代碼如下:

import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.pygmalios.reactiveinflux.{ReactiveInfluxConfig, ReactiveInfluxDbName}
import com.pygmalios.reactiveinflux.sync.{SyncReactiveInflux, SyncReactiveInfluxDb}
import scala.concurrent.duration._
class influxDBSink(dbName: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
        
    var  db:SyncReactiveInfluxDb = _
    implicit val awaitAtMost = 1.second
        
    // Define the database connection here
    def open(partitionId: Long, version: Long): Boolean = {
    val syncReactiveInflux =
    SyncReactiveInflux(ReactiveInfluxConfig(None))
    db = syncReactiveInflux.database(dbName);
    db.create() // create the database 
    true
  }
  
    // Write the process logic, and database commit code here
    def process(value: org.apache.spark.sql.Row): Unit = {
    val point = Point(
      time = time,  // system or event time 
      measurement = "measurement1",
      tags = Map(
        "t1" -> "A", 
        "t2" -> "B"
      ),
      fields = Map(
        "f1" -> 10.3, // BigDecimal field
        "f2" -> "x",  // String field
        "f3" -> -1,   // Long field
        "f4" -> true) // Boolean field
    )
    
    db.write(point)
  }
  
  // Close connection here
  def close(errorOrNull: Throwable): Unit = {
  }
}           

引入Writer:

val influxWriter = new influxDBSink("dbName")
val influxQuery = ifIndicatorData
                                    .writeStream
                                    .foreach(influxWriter)
                                    .outputMode("append")
                                    .start()           

可視化

資料寫入InfluxDB之後,便可以利用各種工具進行資料可視化,如Grafana,Chronograf等。一個簡單的可視化展示如下:

使用阿裡雲InfluxDB®和Spark Streaming實時處理時序資料

目前阿裡雲InfluxDB®已經自帶Grafana資料可視化,使用者隻需要在控制台一鍵開通既可,具體可以參考<

5分鐘快速完成監控系統搭建之實踐篇

>

總結

目前InfluxDB已經在阿裡雲完全托管,被使用者廣泛使用。随着商業化時間的發展,我們在提高穩定性和性能的同時,功能也一步步豐富起來。目前已經提供了TIG(Telegraf/InfluxDB/Grafana)生态,下一步将完全相容TICK(Telegraf/InfluxDB/Chorograf/Kapacitor)生态。覆寫的業務場景包括DevOps監控、車聯網、智慧交通、金融和IOT傳感器資料采集,歡迎大家試用并提供意見。

阿裡雲InfluxDB®為使用者提供7*24小時服務,歡迎加入下面的釘釘群咨詢。

使用阿裡雲InfluxDB®和Spark Streaming實時處理時序資料

參考文獻

  1. Processing Time Series Data in Real-Time with InfluxDB and Structured Streaming
  2. chronicler-spark
  3. reactiveinflux

繼續閱讀