
Installing and configuring CarbonData in Spark on YARN mode

This section describes how to install and configure CarbonData on a cluster running Spark in YARN mode. For how to build CarbonData 1.5.1, see the previous post.

Versions: Spark 2.3.1, CarbonData 1.5.1

Prerequisites

  • Hadoop HDFS and YARN must be installed and running.
  • Spark must be installed and running on all the cluster nodes.
  • The CarbonData user must have permission to access HDFS.

The following steps apply only to the node where the Driver program runs. (The Driver node is the node on which the SparkContext is started.)

  1. Build the CarbonData project and take the assembly jar from ./assembly/target/scala-2.1x/carbondata_xxx.jar. Then copy this jar into the $SPARK_HOME/carbonlib folder.

     Note: if the carbonlib folder does not exist under $SPARK_HOME, create it first.
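
     For example, a minimal shell sketch of this step (carbondata_xxx.jar stands for whatever assembly jar name your build actually produced):

mkdir -p $SPARK_HOME/carbonlib
cp ./assembly/target/scala-2.1x/carbondata_xxx.jar $SPARK_HOME/carbonlib/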
  2. Copy the ./conf/carbon.properties.template file from the CarbonData repository into the $SPARK_HOME/conf/ folder and rename it to carbon.properties.
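
     A one-line equivalent, assuming it is run from the root of the CarbonData source tree:

cp ./conf/carbon.properties.template $SPARK_HOME/conf/carbon.properties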

  3. Compress the contents of the carbonlib folder into a tar.gz file and move the archive into the carbonlib folder:

cd $SPARK_HOME
tar -zcvf carbondata.tar.gz carbonlib/
mv carbondata.tar.gz carbonlib/
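
     A quick sanity check: carbonlib should now contain both the assembly jar and carbondata.tar.gz. The archive is what spark.yarn.dist.archives ships to the YARN containers in the next step, which is why the executor classpath there points at carbondata.tar.gz/carbonlib/*.

ls $SPARK_HOME/carbonlib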

4. Configure the properties listed below in the $SPARK_HOME/conf/spark-defaults.conf file. Note that spark.yarn.dist.files ships carbon.properties and spark.yarn.dist.archives ships carbondata.tar.gz into each YARN container, which is why the executor-side paths are relative while the driver-side paths are absolute local paths.

spark.master yarn-client
spark.yarn.dist.files /home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/conf/carbon.properties
spark.yarn.dist.archives /home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/carbonlib/carbondata.tar.gz
spark.executor.extraJavaOptions -Dcarbon.properties.filepath=carbon.properties -XX:+OmitStackTraceInFastThrow -XX:+UseGCOverheadLimit
spark.executor.extraClassPath carbondata.tar.gz/carbonlib/*
spark.driver.extraClassPath /home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/carbonlib/*
spark.driver.extraJavaOptions -Dcarbon.properties.filepath=/home/jason/bigdata/spark2.3/spark-2.3.1-bin-hadoop2.7/conf/carbon.properties -Dhdp.version=current
spark.yarn.executor.memoryOverhead 1024
spark.yarn.driver.memoryOverhead 1024
spark.yarn.am.extraJavaOptions -Dhdp.version=current
spark.yarn.scheduler.heartbeat.interval-ms 120000
spark.executor.heartbeatInterval 120000
spark.network.timeout 720000

5. Add the following configuration to the $SPARK_HOME/conf/carbon.properties file:

carbon.storelocation=hdfs://master:9000/Carbon/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://master:9000/Carbon/data
#Path where the bad records are stored
carbon.badRecords.location=hdfs://master:9000/Carbon/badrecords           
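
Optionally, pre-create the HDFS directories referenced above and make sure the CarbonData user can write to them (adjust the namenode address to match your cluster):

hdfs dfs -mkdir -p /Carbon/CarbonStore /Carbon/data /Carbon/badrecords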

6. Copy hive-site.xml into Spark's conf directory. (This step is mandatory.)
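
A minimal sketch, assuming Hive's configuration lives under $HIVE_HOME/conf (adjust the source path to your installation):

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/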

7. Test the setup. First start a spark-shell on YARN to verify that the Spark configuration and the CarbonData jars are picked up:

spark-shell --master yarn-client --driver-memory 1g --executor-cores 2 --executor-memory 2G
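
The streaming example below is a complete standalone program rather than shell input. If you package it into a jar yourself, a spark-submit invocation along these lines launches it (my-carbondata-test.jar is a placeholder name for your own build artifact):

spark-submit --class carbondata.carbondataSpark --driver-memory 1g --executor-cores 2 --executor-memory 2G my-carbondata-test.jar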
package carbondata

import java.io.File
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

object carbondataSpark {
  def main(args: Array[String]): Unit = {
    /* //hive store location
     val warehouse = new File("hdfs://master:9000/jason/carbondata_warehouse").getCanonicalPath
     //metastore location
     */
    //val metastore = new File("hdfs://master:9000/hive").getCanonicalPath

    // CarbonData store location on HDFS, used below when creating the CarbonSession
    // (java.io.File.getCanonicalPath would mangle an hdfs:// URI, so keep it as a plain string)
    val storeLocation = "hdfs://master:9000/jason/carbondata"

    val spark = SparkSession
      .builder()
      .appName("carbondata streaming")
      //.config("spark.driver.host","master")
      .getOrCreateCarbonSession(storeLocation)

    spark.sql(s"DROP TABLE IF EXISTS carbon_table")
    spark.sql(
      s"""
         | CREATE TABLE carbon_table (
         | col1 STRING,
         | col2 STRING
         | )
         | STORED BY 'carbondata'
         | TBLPROPERTIES('streaming'='true')""".stripMargin)

    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
    val tablePath = carbonTable.getTablePath

    // streaming source: read text lines from a socket
    var qry: StreamingQuery = null
    val readSocketDF = spark.readStream
      .format("socket")
      .option("host", "192.168.17.142")
      .option("port", 9999)
      .load()

    // Write data from socket stream to carbondata file
    qry = readSocketDF.writeStream
      .format("carbondata")
      .trigger(ProcessingTime("1 seconds"))
      .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
      .option("dbName", "default")
      .option("tableName", "carbon_table")
      .start()

    // start new thread to show data
    new Thread() {
      override def run(): Unit = {
        do {
          spark.sql("select * from carbon_table").show(false)
          Thread.sleep(10000)
        } while (true)
      }
    }.start()
    qry.awaitTermination()
  }
}
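
To feed the socket source, start a plain text server on the host and port configured above (192.168.17.142:9999 in this example) before the query starts, and type input lines that match the table schema, e.g. two comma-separated values for col1 and col2. With netcat:

nc -lk 9999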