
How to resolve the MySQL jar conflict when importing data into MySQL with Spark SQL
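For context, the scenario in the title is an ordinary Spark SQL JDBC write into MySQL. Below is a minimal sketch of such a job (not from the original post; the object name, database URL, table t_user_cost and credentials are all placeholders). It needs a MySQL connector jar providing com.mysql.jdbc.Driver at runtime on both the driver and the executors, and it is exactly this jar that causes the trouble described below.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkSQLWriteMySQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("spark://CentOS:7077")
      .appName("SparkSQLWriteMySQL")
      .getOrCreate()
    import spark.implicits._

    // a tiny hypothetical dataset; any DataFrame is written the same way
    val df = List(("001", "zhangsan", 9.0)).toDF("user_id", "username", "cost")

    df.write
      .mode(SaveMode.Append)
      .format("jdbc")
      .option("url", "jdbc:mysql://CentOS:3306/test") // placeholder database
      .option("driver", "com.mysql.jdbc.Driver")      // needs the MySQL connector jar at runtime
      .option("dbtable", "t_user_cost")               // placeholder table
      .option("user", "root")
      .option("password", "root")
      .save()

    spark.stop()
  }
}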

A test case for the jar conflict

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._

object SparkStreamWordCounts {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://CentOS:7077").setAppName("KafkaStreamWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("hdfs:///checkpoints01")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "CentOS:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // static user data on HDFS, e.g. "001 zhangsan" -> (userId, username)
    val cacheRDD = ssc.sparkContext.textFile("hdfs:///userdata")
      .map(item => (item.split("\\s+")(0), item.split("\\s+")(1)))
      .distinct()
      .cache()

    // Kafka records, e.g. "001 apple 4.5 2" -> (userId, price * quantity)
    KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      Subscribe[String, String](Array("topic01"), kafkaParams))
      .map(record => record.value())
      .map(value => {
        val tokens = value.split("\\s+")
        (tokens(0), tokens(2).toDouble * tokens(3).toInt) // (userId, cost)
      })
      .transform(rdd => rdd.join(cacheRDD)) // (userId, (cost, username))
      .mapWithState(StateSpec.function((k: String, v: Option[(Double, String)], state: State[(String, Double)]) => {
        val username = v.get._2
        var historyCost: (String, Double) = ("", 0.0)
        if (state.exists()) {
          historyCost = state.getOption().getOrElse((username, 0.0))
        }
        // accumulate the running total per user
        val currentValue = v.get
        state.update((currentValue._2, currentValue._1 + historyCost._2))
        (k, username + ":" + (currentValue._1 + historyCost._2))
      }))
      .foreachRDD(rdd => {
        rdd.foreachPartition(list => {
          // write the per-user totals to Redis with one pipelined HMSET per partition
          val jedis = new Jedis("CentOS", 6379)
          val jMap = list.toMap.asJava
          val pipeline = jedis.pipelined()
          pipeline.hmset("usercost", jMap)
          pipeline.sync()
          jedis.close()
        })
      })

    ssc.sparkContext.setLogLevel("FATAL") // disable log output
    ssc.start()
    ssc.awaitTermination()
  }
}
           

Even building a fat jar with the shade plugin in the pom does not solve this problem:

<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>4.0.1</version>
    <executions>
        <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
                <goal>add-source</goal>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>
           

The second approach, using --packages to resolve the jar dependencies (which requires network access), turns out not to work either:

[root@CentOS spark-2.4.3]# ./bin/spark-submit --master spark://CentOS:7077 --deploy-mode client --class com.baizhi.demo10.SparkStreamWordCounts --total-executor-cores 4 --packages 'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,redis.clients:jedis:2.9.0' /root/original-sparkstream-1.0-SNAPSHOT.jar
           

But! Using spark.executor.extraClassPath and spark.driver.extraClassPath does solve the MySQL dependency problem. These options prepend the given jar to the classpath of the driver and executor JVMs, so the MySQL driver is found when the JDBC connection is opened:

[root@CentOS spark-2.4.3]# ./bin/spark-submit --master spark://CentOS:7077 --deploy-mode client --class com.baizhi.demo10.SparkStreamWordCounts --total-executor-cores 4 --packages 'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,redis.clients:jedis:2.9.0' --conf spark.executor.extraClassPath=/root/mysql-xxx.jar --conf spark.driver.extraClassPath=/root/mysql-xxx.jar /root/original-sparkstream-1.0-SNAPSHOT.jar
           

If you find this cumbersome, you can instead set these parameters once in spark-defaults.conf; the wildcard below also picks up every jar that --packages downloads into /root/.ivy2/jars:

spark.executor.extraClassPath=/root/.ivy2/jars/*
spark.driver.extraClassPath=/root/.ivy2/jars/*
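If you would rather have the streaming job above write its totals to MySQL instead of Redis, a rough sketch of such a sink is shown below (again not from the original post; the URL, table t_user_cost and credentials are placeholders). The DriverManager lookup runs on the executors, which is why the connector jar must be on spark.executor.extraClassPath; inside the job it would be called as .foreachRDD(rdd => MySQLSink.save(rdd)).

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

object MySQLSink {
  // writes one batch of (userId, "username:totalCost") pairs, one connection per partition
  def save(rdd: RDD[(String, String)]): Unit = {
    rdd.foreachPartition(partition => {
      // DriverManager can only find com.mysql.jdbc.Driver if the connector jar
      // is on the executor classpath (spark.executor.extraClassPath)
      val conn = DriverManager.getConnection("jdbc:mysql://CentOS:3306/test", "root", "root")
      val stmt = conn.prepareStatement("REPLACE INTO t_user_cost(user_id, cost_info) VALUES(?, ?)")
      partition.foreach { case (userId, costInfo) =>
        stmt.setString(1, userId)
        stmt.setString(2, costInfo)
        stmt.executeUpdate()
      }
      stmt.close()
      conn.close()
    })
  }
}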
           
