天天看点

SpringBoot2.0集成Redis的sub/pub(订阅/发布)功能实现获取Redis实时数据

实现场景如下:项目在SparkStreaming中对数据进行实时处理 处理结果会存储到redis中,其中一部分数据还会通过redis发布功能 发布给Redis的订阅客户端 ,达到实时数据处理结果的获取,(客户端订阅该频道,会实时的收到推送信息,然后将信息通过WebScoket在推送到前台,WebScoket的推送在另偏博客有写)。

1.SpringBoot项目中的pom依赖 需要引入redis相关依赖

<!-- Spring Boot Redis依赖 -->
        <!-- 注意:1.5版本的依赖和2.0的依赖不一样,注意看哦 1.5我记得名字里面应该没有“data”, 2.0必须是“spring-boot-starter-data-redis” 这个才行-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <!-- 1.5的版本默认采用的连接池技术是jedis  2.0以上版本默认连接池是lettuce, 在这里采用jedis,所以需要排除lettuce的jar -->
            <exclusions>
                <exclusion>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 添加jedis客户端 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <!--spring2.0集成redis所需common-pool2-->
        <!-- 必须加上,jedis依赖此  -->
        <!-- spring boot 2.0 的操作手册有标注 大家可以去看看 地址是:https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/htmlsingle/-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>

        <!-- 将作为Redis对象序列化器 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
           

2.SparkingStreaming的依赖也需要添加redis的客户端 才能实现数据的publish(发布)--没有使用流处理的可以跳过这个

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>
           

3.SparkStreaming中实现数据处理结果存储到redis中,其中一部分数据通过redis发布功能 发布给Redis的订阅客户端,没用到流处理的这一步也不用看了~

package com.dyf

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import java.util.regex.Pattern

import redis.clients.jedis.JedisPool
import scala.collection.JavaConverters._


/*
*created by 丁郁非
*date 2019/7/16
*/ object SparkStreaming_UserLog_UserRegisterCountsByMonthAndCity {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserApplication_Log").setMaster("local[4]")
    var checkPointPath = "file:///D:/checkpoints"
    val ssc = StreamingContext.getOrCreate(checkPointPath, () => {

      val ssc = new StreamingContext(conf, Seconds(2))
      ssc.checkpoint(checkPointPath) //设置检查点 方便下次从该检查的获取此次的ssc

      val stream = ssc.socketTextStream("192.168.79.130", 9999)
        .filter(line => {
          //2019-07-10 10:07:32.206 INFO  c.b.controller.UserRestController:95- REGISTER {"city":"广东省","point":[113.25,23.1167]}
          val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val matcher = pattern.matcher(line)
          print(matcher.matches() + "-------------------")
          matcher.matches()
        })
        .map(line => {
          //val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val pattern = Pattern.compile("^(.*)INFO\\s.*UserRestController.*\\sREGISTER\\s(\\{\"city\":\"([\\u4E00-\\u9FA5]+)\".*)$")
          val matcher = pattern.matcher(line)
          matcher.matches()

          val time = matcher.group(1)
          val city = matcher.group(3)
          val date = time.substring(0, 10)

          (city, date)
        })



      //Month统计
      stream.mapWithState(StateSpec.function((k: String, v: Option[String], state: State[Int]) => {
        var total = 0
        if (state.exists()) {
          total = state.getOption().getOrElse(0)
        }
        total += 1
        state.update(total) //更新状态
        (v.getOrElse("default"), total)
      }))
        //将流转换为RDD进行操作
        .foreachRDD(rdd => {
        rdd.foreachPartition(items => {
          val scalaMap = items.map(t => (t._1, t._2 + "")).toMap
          val map = scalaMap.asJava
          val jedisPoll = new JedisPool("192.168.79.130", 6379)
          val jedis = jedisPoll.getResource
          val pipeline = jedis.pipelined()
          pipeline.hmset("UserRegisterCountsByMonth", map) //存入redis

          /*注意对iterator遍历 会清空iterator  所以先转成scalaMap  然后推送到订阅userlocation的客户端*/
          for (elem <- scalaMap) {
            val strings = elem._1.split("-")
            pipeline.publish("userlocation", strings(1) + ":" + elem._2)
          }

          println("map:" + map.size() + " :" + items)
          pipeline.sync()
          jedis.close()
        })
      })

      ssc
    })

    ssc.sparkContext.setLogLevel("FATAL")
    ssc.start()
    ssc.awaitTermination()


  }

}
           

4.项目中集成redis及配置不在赘述~现在准备订阅/发布的相关配置RedisReceiver与RedisSubListenerConfig 建议配置类放在conf包下 方便管理~

package com.****.config;

import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class RedisReceiver {
    /*redis订阅推送 会回调该方法  message:推送的信息*/
    public void receiveMessage(String message) throws IOException {

//这里拿到message就可以为所欲为了  我这里是通过webscoket向前台所有建立回话的session推送该message
        System.out.println("message:"+message);
        WebSocketServer.sendtoAll(message);
    }
}
           
package com.*****.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/*
 * 配置redis sub订阅监听*/
@Configuration
public class RedisSubListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("userlocation"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
        /*方法名与RedisReceiver中用于回调的方法名保持一致 */
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }
}
           

5.最后启动就行了   receiveMessage这个方法会接收到message 有了message就可以做推送 做处理了