The scenario: a Spark Streaming job processes data in real time and writes the results to Redis. Part of those results is also published through Redis's pub/sub feature to subscribed Redis clients, so the processing results can be consumed as they arrive. (A client subscribed to the channel receives the pushed messages in real time and then forwards them to the front end over WebSocket; the WebSocket push is covered in another post.)
1. Add the Redis dependencies to the Spring Boot project's pom.xml.
<!-- Spring Boot Redis dependency -->
<!-- Note: the 1.5 and 2.0 artifacts differ. As far as I remember, the 1.5 artifact name has no "data" in it; for 2.0 it must be "spring-boot-starter-data-redis". -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<!-- Spring Boot 1.5 defaults to the Jedis connection pool; 2.0 and later default to Lettuce. We use Jedis here, so the Lettuce jar must be excluded. -->
<exclusions>
<exclusion>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</exclusion>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Add the Jedis client -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<!-- commons-pool2 is required when integrating Redis with Spring Boot 2.0 -->
<!-- Must be included: Jedis depends on it -->
<!-- The Spring Boot 2.0 reference manual notes this; see https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/htmlsingle/ -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.5.0</version>
</dependency>
<!-- fastjson will serve as the Redis object serializer -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
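With the dependencies above in place, the connection pool can be configured in application.properties. This is a minimal sketch; the host and port are assumptions taken from the code later in this post, and the pool sizes are illustrative defaults:

```properties
# Redis server (assumed address; replace with your own)
spring.redis.host=192.168.79.130
spring.redis.port=6379
# Jedis pool settings (backed by commons-pool2)
spring.redis.jedis.pool.max-active=8
spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
spring.redis.jedis.pool.max-wait=1000ms
```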
2. The Spark Streaming project also needs the Redis client dependency in order to publish data. (Skip this if you are not using stream processing.)
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
3. In Spark Streaming, store the processing results in Redis and publish part of them through Redis pub/sub to subscribed clients. (Skip this step too if you are not using stream processing.)
package com.dyf

import java.util.regex.Pattern

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import redis.clients.jedis.JedisPool

import scala.collection.JavaConverters._

/*
 * created by 丁郁非
 * date 2019/7/16
 */
object SparkStreaming_UserLog_UserRegisterCountsByMonthAndCity {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserApplication_Log").setMaster("local[4]")
    val checkPointPath = "file:///D:/checkpoints"
    val ssc = StreamingContext.getOrCreate(checkPointPath, () => {
      val ssc = new StreamingContext(conf, Seconds(2))
      ssc.checkpoint(checkPointPath) // set the checkpoint so the ssc can be recovered from it on restart
      val stream = ssc.socketTextStream("192.168.79.130", 9999)
        .filter(line => {
          // sample line: 2019-07-10 10:07:32.206 INFO c.b.controller.UserRestController:95- REGISTER {"city":"廣東省","point":[113.25,23.1167]}
          val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val matcher = pattern.matcher(line)
          matcher.matches()
        })
        .map(line => {
          val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val matcher = pattern.matcher(line)
          matcher.matches()
          val time = matcher.group(1)
          val city = matcher.group(3)
          val date = time.substring(0, 10)
          (city, date)
        })
      // monthly counts
      stream.mapWithState(StateSpec.function((k: String, v: Option[String], state: State[Int]) => {
        var total = 0
        if (state.exists()) {
          total = state.getOption().getOrElse(0)
        }
        total += 1
        state.update(total) // update the state
        (v.getOrElse("default"), total)
      }))
        // operate on each RDD of the stream
        .foreachRDD(rdd => {
          rdd.foreachPartition(items => {
            val scalaMap = items.map(t => (t._1, t._2 + "")).toMap
            val map = scalaMap.asJava
            val jedisPool = new JedisPool("192.168.79.130", 6379)
            val jedis = jedisPool.getResource
            val pipeline = jedis.pipelined()
            pipeline.hmset("UserRegisterCountsByMonth", map) // store in Redis
            /* Note: traversing the iterator consumes it, so convert it to a Scala map first,
               then push each entry to the clients subscribed to "userlocation". */
            for (elem <- scalaMap) {
              val strings = elem._1.split("-")
              pipeline.publish("userlocation", strings(1) + ":" + elem._2)
            }
            println("map size: " + map.size())
            pipeline.sync()
            jedis.close()
          })
        })
      ssc
    })
    ssc.sparkContext.setLogLevel("FATAL")
    ssc.start()
    ssc.awaitTermination()
  }
}
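The filter and map stages above hinge on one regular expression. As a standalone sketch of what it extracts, here is the same pattern applied in plain Java to the sample log line from the code (the class name is just for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegisterLogParser {
    // Same pattern as in the streaming job: group(1) = timestamp prefix, group(3) = city
    static final Pattern PATTERN = Pattern.compile(
            "^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$");

    public static void main(String[] args) {
        String line = "2019-07-10 10:07:32.206 INFO c.b.controller.UserRestController:95- REGISTER "
                + "{\"city\":\"廣東省\",\"point\":[113.25,23.1167]}";
        Matcher m = PATTERN.matcher(line);
        if (m.matches()) {
            String time = m.group(1);            // "2019-07-10 10:07:32.206 "
            String city = m.group(3);            // "廣東省"
            String date = time.substring(0, 10); // "2019-07-10"
            System.out.println(city + " " + date);
        }
    }
}
```

Group 1 is everything before "INFO" (the timestamp), and group 3 captures the run of CJK characters after `"city":"`, which is why the pattern only matches Chinese city names.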
4. Integrating and configuring Redis in the project is not covered again here. Now prepare the subscribe/publish pieces: RedisReceiver and RedisSubListenerConfig. I recommend keeping configuration classes together in a dedicated config package for easier maintenance.
package com.****.config;

import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class RedisReceiver {
    /* Called back when the Redis subscription delivers a message; message is the pushed payload. */
    public void receiveMessage(String message) throws IOException {
        // Once you have the message you can do whatever you like with it.
        // Here it is pushed over WebSocket to every session the front end has open.
        System.out.println("message:" + message);
        WebSocketServer.sendtoAll(message);
    }
}
package com.*****.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/* Configure the Redis sub (subscription) listener. */
@Configuration
public class RedisSubListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("userlocation"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
/* The method name must match the callback method name in RedisReceiver. */
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
}
}
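Given how the Spark job above builds its publish call, the string arriving in receiveMessage has the form "month:count" (e.g. "07:12"). A minimal sketch of pulling it apart before forwarding it; the helper class name is hypothetical:

```java
public class LocationMessageParser {
    // Splits a "month:count" payload published on the "userlocation" channel
    public static String[] parse(String message) {
        int idx = message.indexOf(':');
        String month = message.substring(0, idx);
        String count = message.substring(idx + 1);
        return new String[] { month, count };
    }

    public static void main(String[] args) {
        String[] parts = parse("07:12");
        System.out.println("month=" + parts[0] + ", count=" + parts[1]);
    }
}
```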
5. Finally, just start the application. The receiveMessage method will receive each message, and once you have the message you can push it onward or process it however you need.