天天看點

Spark Streaming消費Kafka并手動使用Redis管理Kafka Offset1 Spark Streaming讀取Kafka的兩種模式2 使用Docker安裝實驗環境3 消費語義4 使用Redis管理Kafka Offset(支援多Topiic)5 參考文獻

1 Spark Streaming讀取Kafka的兩種模式

Spark Streaming消費Kafka的資料有兩種模式:Receiver和Direct模式,學習時候重點關注下Direct即可,因為在最新讀取方式中已經不支援Receiver。

1.1 Receiver模式

在Spark 1.3之前,Spark Streaming消費Kafka中的資料采用基于Kafka進階消費API實作的Receiver模式,如圖1所示。首先是Receiver從Kafka中消費資料并存儲到Spark Executor中,然後由Spark Streaming啟動的Job将處理資料。為了保證高可用,可以啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL),将接收到的Kafka資料同步寫到外部存儲中(如HDFS)。當系統發生故障适合,可以重新将資料加載到系統中,進而避免出現資料丢失的問題。

Spark Streaming消費Kafka并手動使用Redis管理Kafka Offset1 Spark Streaming讀取Kafka的兩種模式2 使用Docker安裝實驗環境3 消費語義4 使用Redis管理Kafka Offset(支援多Topiic)5 參考文獻

圖 1(來自百度圖檔)

1.2 Direct模式

在Spark 1.3及以後版本中,Spark推薦無Receiver的Direct模式,如圖2所示。首先Spark周期性地查詢Kafka中自己訂閱的Topic下每個分區的最新offset,通過對比上一批次的offset,進而擷取本批次offset的範圍。然後啟動處理資料的Job, 最後使用Kafka低級消費API去拉取本批次的資料。

Spark Streaming消費Kafka并手動使用Redis管理Kafka Offset1 Spark Streaming讀取Kafka的兩種模式2 使用Docker安裝實驗環境3 消費語義4 使用Redis管理Kafka Offset(支援多Topiic)5 參考文獻

圖 2(來自百度圖檔)

1.3 兩種接收方式的對比

在最新的API中,Receiver模式已經被廢棄。Spark的官網上是推薦Direct模式,理由如下:

1) 簡化并行度

Receiver模式中,Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以在KafkaUtils.createStream()中提高partition的數量,隻會增加一個Receiver中讀取partition的線程數,不會增加Spark處理資料的并行度

Direct模式中,将建立與要使用的Kafka分區一樣多的RDD分區,所有這些分區都将從Kafka并行讀取資料。是以,Kafka和RDD分區之間存在一對一的映射,這更易于了解和調整。

2) 高效率

Receiver模式中為了保證高可用,需要啟用WAL機制,在KafkaUtils.createStream()中設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER,将資料複制一份到外存中。在處理大資料情況下,很容易成為系統瓶頸。

Direct模式中,為了保證不丢失資料,不需要使用WAL這種重量級的儲存機制,隻需要将上一批次的Offset儲存即可。如果系統挂掉,Spark會重新讀取上一次儲存的Offset,進而保證資料的零丢失。

3) 占用資源低

Receiver不處理資料,隻是持續不斷的接受資料,與其他Exector是異步的,但卻會占用系統資源。在配置設定相同cup和記憶體資源的前提下,Receiver會占用Cpu時間,影響Spark的處理速度,如果資料量大,Receiver會占用更大記憶體。

2 使用Docker安裝實驗環境

以前部落客一般是用虛拟機來從頭到尾來安裝各種大資料元件,每次學半天都是在安裝,但是公司中大資料平台都是運維已經安裝好的,我們每天要做的更多是在平台上開發業務,是以學習時候使用Docker快速安裝就顯得很有意義。

2.1 安裝Reids

docker pull redislabs/rebloom
docker run -d -p 6379:6379 redislabs/rebloom
           

2.2 安裝Zookeeper

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
           

2.3 安裝Kafka

docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.150:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.150:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
           

這裡2個ip分别是zk和kafka所在裝置的ip。

2.4 檢視運作的容器

Spark Streaming消費Kafka并手動使用Redis管理Kafka Offset1 Spark Streaming讀取Kafka的兩種模式2 使用Docker安裝實驗環境3 消費語義4 使用Redis管理Kafka Offset(支援多Topiic)5 參考文獻

2.5 注意事項

使用docker最好将鏡像倉庫配成阿裡雲,否詞下載下傳會很慢,配置方法網上很多,本文不再累述。

3 消費語義

在上Offset管理代碼之前,有個重要概念需要交代下,就是消費語義。Spark Streaming從Kafka broker中拉取資料時候,有三種消費語意:

3.1 至少消費一次

從字面上看就是會有重複資料。Spark從Kafka拉取資料進行處理後,如果offset還沒有儲存而系統挂掉時候,就會重新從上一次的Offset處開始消費,會造成至少消費一次。

3.2 恰好消費一次

Kafka broker存儲的資料隻會被消費一次。實作這種模式主要有2種方法

a 在至少消費一次的基礎上,下遊系統自己做幂等。比如有2條uuid一樣的資料被Spark處理且存在資料庫,下遊系統讀取的時候要根據uuid去重。

b 使用資料庫的事務。保證資料處理入庫和送出offset這2個操作處于同一個事務之中。

3.3 最多消費一次

簡單點說就是資料可能發生丢失。比如使用自動送出offset時,因為是定時送出offset,有可能剛拉去到一批資料,還沒進行處理,系統就自動幫你送出了offset,而你在處理過程中挂了,這就造成了資料丢失。

4 使用Redis管理Kafka Offset(支援多Topiic)

4.1 向Kafka中寫入資料

1)建立topic

import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 建立主題
 */
public class CreatTopicService {
    public static final String brokerList = "192.168.2.50:9092";
    public static final String topic = "first";

    public static void main(String[] args) {
        Properties properties =  new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        AdminClient client = AdminClient.create(properties);

        NewTopic newTopic = new NewTopic(topic, 2, (short) 1);
        CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
        try {
            result.all().get();
        } catch (InterruptedException | ExecutionException e){
            e.printStackTrace();
        }finally{
            if(client != null){
                client.close() ;
            }
        }

    }
}
           

2)向Kafka中寫入資料

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class ProducerService {

    public static final String brokerList = "localhost:9092";
    public static final String topic = "first";

    public static void main(String[] args) {
        Properties properties =  new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);

        //自己直生産者用戶端參數并建立KafkaProducer執行個體
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        //建構所需妥發送的消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello Spark!");
        try {
            //發送消息3種方法

            //1)發後即忘
//            producer.send(record);

            //2)同步
//            Future<RecordMetadata> future = producer.send(record);
//            RecordMetadata metadata = future.get();
//            System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());

            //3)異步
            for (int i = 0; i < 200; i++){
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null){
                            exception.printStackTrace();
                        }else{
                            System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
                        }
                    }
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //關閉生産着用戶端執行個體
            if(producer != null){
                producer.close();
            }
        }
    }
}
           

4.2 基于Jedis的Redis讀寫工具

package com.dhhy.redis

import redis.clients.jedis.{JedisPool, JedisPoolConfig,Jedis}

/**
  * Redis連接配接工具
  */
object RedisUtil {

  @volatile private var jedisPool:JedisPool = null


  /**
    * 擷取Jedis執行個體
    */
  def getResource()={
    if(jedisPool == null){
      synchronized(
        if(jedisPool == null){
          val config = new JedisPoolConfig
          config.setMaxTotal(5)
          config.setMaxIdle(2)
          config.setMaxWaitMillis(3000)
          jedisPool = new JedisPool(config,"127.0.0.1",6379);
        }
      )
    }

    jedisPool.getResource

  }


  /**
    * 釋放jedis連接配接資源
    * @param jedis
    */
  def returnResource(jedis: Jedis) = {
    if (jedis != null) jedis.close()
  }


  /**
    * 原子計數器
    * @param key
    * @param step
    * @return
    */
  def incrBy(key: String ,step: Long): Long ={
    val jedis = getResource()
    val count = jedis.incrBy(key, step)
    returnResource(jedis)
    count
  }
           

4.3 Spark消費Kafka的代碼

import com.dhhy.redis.RedisUtil
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory


/**
  * 使用redis管理kafka Offset
  */
object KafkaOffseManageDemo2 {
  val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo.getClass)


  /**
    * 從Redis擷取上一次儲存的偏移量
    * @param topics
    * @return
    */
  def getOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
    val fromOffsets = collection.mutable.Map.empty[TopicPartition, Long]
    import scala.collection.JavaConversions._
    val jedis = RedisUtil.getResource()
    topics.foreach(topic => {
      jedis.hgetAll(topic).foreach(kv =>{
        fromOffsets += (new TopicPartition(topic, kv._1.toInt) -> kv._2.toLong)
      })
    })
    RedisUtil.returnResource(jedis)
    fromOffsets.toMap
  }



  def main(args: Array[String]): Unit = {
    val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo2.getClass)

    //設定日志級别
    Logger.getLogger("org").setLevel(Level.INFO)

    //配置項
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOffseManageDemo")

    //建立流式上下文,設定批處理間隔
    val ssc = new StreamingContext(conf, Seconds(5))


    //kafka配置參數
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", //kafka叢集位址
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],  //序列化類型,此處為字元類型
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],  //序列化類型,此處為字元類型
      ConsumerConfig.GROUP_ID_CONFIG -> "KafkaOffseManageDemo1",//Kafka消費組l
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",  //讀取最新offset
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)// 是否向定時向zookeeper寫入每個分區的offse
    )

//消費的topic
//如果消費多個topic,則val topics = Array(topic1,topic2,…)
    val topics = Array("first")
    val offsets = getOffsets(topics)

    // 初始化輸入流
    val stream  = if(offsets.isEmpty){
      KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
    }else{
      KafkaUtils.createDirectStream(ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams, offsets))
    }


    stream.foreachRDD{rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { iter =>
        val jedis = RedisUtil.getResource()
        val pipeline = jedis.pipelined()
        iter.foreach { msg =>
          msg.value().split(" ").foreach (f =>
            if (f == "Hello"){
              pipeline.incr("Hello")
            }
          )
        }

        val o = offsetRanges(TaskContext.get.partitionId)
        //列印偏移量資訊
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        //儲存Kafka的偏移量
        pipeline.hset(o.topic, o.partition + "", o.untilOffset + "")
        pipeline.sync()
        RedisUtil.returnResource(jedis)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
           

4.4 測試結果

登陸redis的用戶端,檢視key為Hello的值,可以發現kafka 生産者一共寫入了200個”Hello”

Spark Streaming消費Kafka并手動使用Redis管理Kafka Offset1 Spark Streaming讀取Kafka的兩種模式2 使用Docker安裝實驗環境3 消費語義4 使用Redis管理Kafka Offset(支援多Topiic)5 參考文獻

5 參考文獻

1)Spark-Streaming擷取kafka資料的兩種方式-Receiver與Direct的方式

https://blog.csdn.net/kwu_ganymede/article/details/50314901

2) Apache spark官方文檔

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

3) 耿嘉安,Spark核心設計的藝術:架構設計與實作 第1版. 2018, 機械工業出版社.

繼續閱讀