天天看點

SpringBoot2.0內建Redis的sub/pub(訂閱/釋出)功能實作擷取Redis實時資料

實作場景如下:項目在SparkStreaming中對資料進行實時處理 處理結果會存儲到redis中,其中一部分資料還會通過redis釋出功能 釋出給Redis的訂閱用戶端 ,達到實時資料處理結果的擷取,(用戶端訂閱該頻道,會實時的收到推送資訊,然後将資訊通過WebScoket在推送到前台,WebScoket的推送在另偏部落格有寫)。

1.SpringBoot項目中的pom依賴 需要引入redis相關依賴

<!-- Spring Boot Redis依賴 -->
        <!-- 注意:1.5版本的依賴和2.0的依賴不一樣,注意看哦 1.5我記得名字裡面應該沒有“data”, 2.0必須是“spring-boot-starter-data-redis” 這個才行-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <!-- 1.5的版本預設采用的連接配接池技術是jedis  2.0以上版本預設連接配接池是lettuce, 在這裡采用jedis,是以需要排除lettuce的jar -->
            <exclusions>
                <exclusion>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 添加jedis用戶端 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <!--spring2.0內建redis所需common-pool2-->
        <!-- 必須加上,jedis依賴此  -->
        <!-- spring boot 2.0 的操作手冊有标注 大家可以去看看 位址是:https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/htmlsingle/-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>

        <!-- 将作為Redis對象序列化器 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
           

2.SparkingStreaming的依賴也需要添加redis的用戶端 才能實作資料的publish(釋出)--沒有使用流處理的可以跳過這個

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>
           

3.SparkStreaming中實作資料處理結果存儲到redis中,其中一部分資料通過redis釋出功能 釋出給Redis的訂閱用戶端,沒用到流處理的這一步也不用看了~

package com.dyf

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import java.util.regex.Pattern

import redis.clients.jedis.JedisPool
import scala.collection.JavaConverters._


/*
*created by 丁郁非
*date 2019/7/16
*/ object SparkStreaming_UserLog_UserRegisterCountsByMonthAndCity {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserApplication_Log").setMaster("local[4]")
    var checkPointPath = "file:///D:/checkpoints"
    val ssc = StreamingContext.getOrCreate(checkPointPath, () => {

      val ssc = new StreamingContext(conf, Seconds(2))
      ssc.checkpoint(checkPointPath) //設定檢查點 友善下次從該檢查的擷取此次的ssc

      val stream = ssc.socketTextStream("192.168.79.130", 9999)
        .filter(line => {
          //2019-07-10 10:07:32.206 INFO  c.b.controller.UserRestController:95- REGISTER {"city":"廣東省","point":[113.25,23.1167]}
          val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val matcher = pattern.matcher(line)
          print(matcher.matches() + "-------------------")
          matcher.matches()
        })
        .map(line => {
          //val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val matcher = pattern.matcher(line)
          matcher.matches()

          val time = matcher.group(1)
          val city = matcher.group(3)
          val date = time.substring(0, 10)

          (city, date)
        })



      //Month統計
      stream.mapWithState(StateSpec.function((k: String, v: Option[String], state: State[Int]) => {
        var total = 0
        if (state.exists()) {
          total = state.getOption().getOrElse(0)
        }
        total += 1
        state.update(total) //更新狀态
        (v.getOrElse("default"), total)
      }))
        //将流轉換為RDD進行操作
        .foreachRDD(rdd => {
        rdd.foreachPartition(items => {
          val scalaMap = items.map(t => (t._1, t._2 + "")).toMap
          val map = scalaMap.asJava
          val jedisPoll = new JedisPool("192.168.79.130", 6379)
          val jedis = jedisPoll.getResource
          val pipeline = jedis.pipelined()
          pipeline.hmset("UserRegisterCountsByMonth", map) //存入redis

          /*注意對iterator周遊 會清空iterator  是以先轉成scalaMap  然後推送到訂閱userlocation的用戶端*/
          for (elem <- scalaMap) {
            val strings = elem._1.split("-")
            pipeline.publish("userlocation", strings(1) + ":" + elem._2)
          }

          println("map:" + map.size() + " :" + items)
          pipeline.sync()
          jedis.close()
        })
      })

      ssc
    })

    ssc.sparkContext.setLogLevel("FATAL")
    ssc.start()
    ssc.awaitTermination()


  }

}
           

4.項目中內建redis及配置不在贅述~現在準備訂閱/釋出的相關配置RedisReceiver與RedisSubListenerConfig 建議配置類放在conf包下 友善管理~

package com.****.config;

import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class RedisReceiver {
    /*redis訂閱推送 會回調該方法  message:推送的資訊*/
    public void receiveMessage(String message) throws IOException {

//這裡拿到message就可以為所欲為了  我這裡是通過webscoket向前台所有建立回話的session推送該message
        System.out.println("message:"+message);
        WebSocketServer.sendtoAll(message);
    }
}
           
package com.*****.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/*
 * 配置redis sub訂閱監聽*/
@Configuration
public class RedisSubListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("userlocation"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
        /*方法名與RedisReceiver中用于回調的方法名保持一緻 */
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }
}
           

5.最後啟動就行了   receiveMessage這個方法會接收到message 有了message就可以做推送 做處理了