實時統計廣告點選數:Spark StructuredStreaming + Redis Stream
業務場景介紹
某廣告公司在網頁上投遞動态圖檔廣告,廣告的展現形式是根據熱點圖檔動态生成的。為了收入的最大化,需要統計每個廣告的點選數來決定哪些廣告可以投放的更長時間,哪些需要及時更換。大部分的廣告生命周期很短,實時擷取廣告的點選數可以讓我們快速确定哪些廣告對業務是關鍵的。是以我們理想的解決方案是有流處理資料的能力,可以統計所有廣告的點選量以及統計實時的點選量。
業務資料流
來看下我們業務資料鍊路。
廣告點選資料通過手機或者電腦的網頁傳遞到“資料提取”,提取後的資料經過“資料處理”計算實時的點選數,最後存儲到資料庫,使用“資料查詢”用于統計分析,統計每個廣告的點選總數。
根據我們的資料特點,整個資料鍊路的資料輸入輸出如下:
輸入
針對每個點選事件我們使用asset id以及cost 兩個字段來表示一個廣告資訊,例如:
asset [asset id] cost [actual cost]
asset aksh1hf98qwdst9q7 cost 39
asset aksh1hf98qwdst9q8 cost 19
輸出
經過上圖中步驟2:資料處理後,我們把結果集存儲到一個資料表中,資料表可以用于上圖步驟3使用Sql查詢,例如:
select asset, count from clicks order by count desc
asset count
----------------- -----
aksh1hf98qwdst9q7 2392
aksh1hf98qwdst9q8 2010
aksh1hf98qwdst9q6 1938
解決方案
基于以上訴求選擇StructuredStreaming + Redis Stream作為解決方案。先介紹下方案中涉及到的元件。
- Spark StructuredStreaming是Spark在2.0後推出的基于Spark SQL上的一種實時處理流資料的架構。處理時延可達毫秒級别。
- Redis Stream是在Redis 5.0後引入的一種新的資料結構,可高速收集、存儲和分布式處理資料,處理時延可達亞毫秒級别。
- Spark-Redis 連接配接器提供了Spark對接Redis的橋梁。通過Spark-Redis 連接配接器, StructuredStreaming可以使用Redis Stream作為資料源,經過Spark處理後資料再寫回Redis。
資料處理流
現在讓我們看下如何使用StructuredStreaming + Redis Stream
通過上圖可以看到點選資料首先存儲到Redis Stream,然後通過StructuredStreaming消費資料、處理聚合資料,再把處理的結果入庫到Redis,最後通過Spark Sql查詢Redis進行統計分析。下面分别看下每個步驟:
資料提取:
Redis Stream是Redis内置的資料結構,具備每秒百萬級别的讀寫能力,另外存儲的資料可以根據時間自動排序。Spark-Redis連接配接器支援使用Redis Stream作為資料源,非常适用這個場景,把Redis Stream資料對接到Spark 引擎。
資料處理:
Spark的StructuredStreaming 非常适合此場景的資料處理部分,Spark-Redis連接配接器可以擷取Redis Stream的資料轉換成Spark的DataFrames。在StructuredStreaming處理流資料的過程中,可以對微批次資料或者整體資料進行查詢。資料的處理結果可以通過自定義的“writer”輸出到不同的目的地,本場景中我們直接把資料輸出到Redis的Hash資料結構。
資料查詢:
Spark-Redis連接配接器可以把Redis的資料結構映射成Spark的DataFrames,然後我們把DataFrames建立成一個臨時表,表的字段映射Redis的Hash資料結構。借助Redis的亞毫米級的延遲,使用Spark-SQL進行實時的資料查詢。
開發步驟
通過下面執行個體介紹下開發的步驟
Redis Stream存儲資料
Redis Streams 是一個append-only的資料結構。部署Redis Streams後使用redis-cli向Redis發送資料。
redis-cli使用方法可參考
redis-cli連接配接。下面的指令是Redis向Stream clicks發送資料。
XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29
資料處理
在StructuredStreaming中把資料處理步驟分成3個子步驟。
- 從Redis Stream讀取、處理資料。
- 存儲資料到Redis。
- 運作StructuredStreaming程式。
從Redis Stream讀取、處理資料
在Spark中讀取Redis Stream資料需要确定如何去連接配接Redis,以及Redis Stream的schema資訊。這裡使用Spark-Redis連接配接器,需要建立一個SparkSession并帶上Redis的連接配接資訊。
val spark = SparkSession
.builder()
.appName("StructuredStreaming on Redis")
.config("spark.redis.host", redisHost)
.config("spark.redis.port", redisPort)
.config("spark.redis.auth", redisPassword)
.getOrCreate()
在Spark中建構schema,我們給流資料命名為“clicks”,并且需要設定參數“stream.kes”的值為“clicks”。由于Redis Stream中的資料包含兩個字段:“asset”和“cost”,是以我們要建立StructType映射這兩個字段。
val clicks = spark
.readStream
.format("redis")
.option("stream.keys", redisTableName)
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
在這裡統計下每個asset的點選次數,可以建立一個DataFrames根據asset彙聚資料。
val bypass = clicks.groupBy("asset").count()
最後一個步驟啟動StructuredStreaming。
val query = bypass
.writeStream
.outputMode("update")
.foreach(clickWriter)
.start()
存儲資料到Redis
我們通過自定義的ClickForeachWriter向Redis寫資料。ClickForeachWriter繼承自FroeachWriter,使用Redis的Java用戶端Jedis連接配接到Redis。
class ClickForeachWriter(redisHost: String, redisPort: String, redisPassword: String) extends ForeachWriter[Row] {
var jedis: Jedis = _
def connect() = {
val shardInfo: JedisShardInfo = new JedisShardInfo(redisHost, redisPort.toInt)
shardInfo.setPassword(redisPassword)
jedis = new Jedis(shardInfo)
}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(value: Row): Unit = {
val asset = value.getString(0)
val count = value.getLong(1)
if (jedis == null) {
connect()
}
jedis.hset("click:" + asset, "asset", asset)
jedis.hset("click:" + asset, "count", count.toString)
jedis.expire("click:" + asset, 300)
}
override def close(errorOrNull: Throwable): Unit = {}
}
程式完成打包後,可以通過Spark控制台送出任務,運作Spark StructuredStreaming任務。
--class com.aliyun.spark.redis.StructuredStremingWithRedisStream
--jars /spark_on_redis/ali-spark-redis-2.3.1-SNAPSHOT_2.3.2-1.0-SNAPSHOT.jar,/spark_on_redis/commons-pool2-2.0.jar,/spark_on_redis/jedis-3.0.0-20181113.105826-9.jar
--driver-memory 1G
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--name spark_on_polardb
/spark_on_redis/structuredstreaming-0.0.1-SNAPSHOT.jar
xxx1 6379 xxx2 clicks
參數說明:
- xxx1: Redis的内網連接配接位址(host)。
- 6379:Redis的端口号(port)。
- xxx2: Redis的登陸密碼。
- clicks: Redis的Stream名稱
資料查詢
資料查詢使用Spark-SQL建立表讀取Redis Hash資料庫。這裡使用Spark控制台的“互動式查詢”,輸入如下語句:
CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT)
USING org.apache.spark.sql.redis
OPTIONS (
'host' 'xxx1',
'port' '6379',
'auth' 'xxx2',
'table' 'click'
)
- click: Redis的Hash 表名稱。
然後運作查詢語句:
select * from clicks;
例如下圖:
Spark-SQL通過Spark-Redis連接配接器直接查詢Redis資料,統計了廣告的點選數。
小結
本文主要介紹了Spark如何把Redis作為資料源以及Spark的StructuredStreaming與Redis Stream的結合。更多Spark介紹請參考:
X-Pack Spark 分析引擎;更多Redis介紹請參考:
雲資料庫 Redis 版。
參考清單
本文的代碼可參考
Spark樣例代碼Spark對接Redis可參考
Spark對接Redis快速入門