
Spark Integration with the Lindorm Phoenix 5.x Thin Client

Contents: 1. Background · 2. Connecting Spark to the Phoenix 5.x Thin Client · 3. Maven Project Example

1. Background

Lindorm's Phoenix compatibility is exposed through the Phoenix 5.x thin client, while most of the Phoenix integration examples on the Spark official website use the Phoenix 4.x thick client. This article therefore provides examples of connecting Spark to the Phoenix 5.x thin client for reference.

2. Connecting Spark to the Phoenix 5.x Thin Client

2.1 Download the Spark package from the Spark official website

Spark official website

Download a Spark package; any version works, but this article uses Spark 2.4.3 as the example. After downloading, extract it; the extracted directory is assumed to be spark-2.4.3.

2.2 Download the Phoenix 5.x thin client from the Alibaba Cloud repository

Alibaba Cloud repository

Download the Phoenix 5.x thin client ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar and place it in the jars directory under spark-2.4.3.

2.3 Generate log4j.properties

cd spark-2.4.3/conf
cp log4j.properties.template log4j.properties           

2.4 Start spark-shell

./bin/spark-shell            
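
As a quick sanity check that the thin client jar from step 2.2 is on the classpath, you can load the JDBC driver class inside spark-shell; it throws ClassNotFoundException if the jar was not picked up:

// run inside spark-shell: loads the Phoenix thin-client JDBC driver class
Class.forName("org.apache.phoenix.queryserver.client.Driver")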

2.5 Paste and run the code

2.5.1 Accessing via Phoenix Statement

  1. Type :paste in spark-shell to enter multi-line input:
:paste           
  2. Replace the url, user, and password in the code below with your own instance information, then paste the entire block into spark-shell:
    import java.sql.{DriverManager, SQLException}
    import java.util.Properties
    val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") // your username
    info.put("password", "xxxx") // your password
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
      stmt.execute("drop table if exists test")
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }           
  3. Press Ctrl+D to end the input. The code runs and prints output similar to the following:
// Exiting paste mode, now interpreting.

1 | 1
2 | 2           
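
Because the thin client is a standard JDBC driver, parameterized upserts also work. Below is a minimal PreparedStatement sketch, assuming the url and info values from the pasted block are still in scope and the test table has not yet been dropped; auto-commit is enabled explicitly, as the Java example in section 3.3 does:

// open a new connection with the url/info defined above
val conn2 = DriverManager.getConnection(url, info)
conn2.setAutoCommit(true) // same as the Java example in section 3.3
val ps = conn2.prepareStatement("upsert into test(c1,c2) values(?,?)")
ps.setInt(1, 3) // c1
ps.setInt(2, 3) // c2
ps.executeUpdate()
ps.close()
conn2.close()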

2.5.2 Accessing via DataFrame

The DataFrame approach only supports reading and writing data; creating and dropping tables must be done through the Phoenix Statement approach (PhoenixTest1.scala in section 3.2 combines both).

2.5.2.1 Reading with a DataFrame

Type :paste, paste the following code, then press Ctrl+D to run it. Remember to change the url, user, and password.

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()           
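
For larger tables, Spark's generic JDBC source can also split the read across several tasks. Below is a minimal sketch of a partitioned read, assuming the integer primary key (stored by Phoenix as C1) is used as the partition column; the bounds and partition count are illustrative values, and url, user, and password use the same placeholders as above:

val partitionedDF = sqlContext.read.format("jdbc")
  .option("url", "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF")
  .option("driver", "org.apache.phoenix.queryserver.client.Driver")
  .option("dbtable", "TEST")
  .option("user", "xxxx")
  .option("password", "xxxx")
  .option("partitionColumn", "C1") // assumed partition column (the integer primary key)
  .option("lowerBound", "1")
  .option("upperBound", "1000")    // illustrative bounds
  .option("numPartitions", "4")    // each partition becomes one range query
  .load()
partitionedDF.show()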

2.5.2.2 Writing with a DataFrame

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
// build the schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
// create a Row object for each line of testRDD
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
// bind the rows to the schema, i.e. associate the data with the schema
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// create a Properties object to hold the JDBC connection parameters
val prop = new Properties()
prop.put("user", "xxxx") //表示使用者名是root
prop.put("password", "xxxx") //表示密碼是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver") 
// connect to the database and write in append mode, appending the records to the test table
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)           

3. Maven Project Example

The following uses a Maven project as an example to walk through the basic operations of connecting Spark to the Phoenix thin client.

3.1 Create the Maven project

Create a Maven project named demo with the following pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.3</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-queryserver-client</artifactId>
            <version>5.2.1-HBase-2.x</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>           

3.2 Scala example

PhoenixTest1.scala; remember to change the url, user, and password.

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}

import java.sql.{DriverManager, SQLException}
import java.util.Properties

object PhoenixTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("spark on phoenix")
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = sparkSession.sparkContext

    print("======= start ==========")
    val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") 
    info.put("password", "xxxx") 
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }

    // Statement access: all Phoenix DDL and DML operations can be done this way
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }

    // DataFrame write
    // generate the records
    val sqlContext = new SQLContext(sc)
    val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
    val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
    val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
    val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    testDataFrame.show()
    
    // write the records
    testDataFrame
      .write
      .mode("append")
      .jdbc("jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", info)

    // DataFrame read: two ways of passing the options
    val df1 = sqlContext
      .read.
      format("jdbc")
      .options(Map(
        "url" -> url,
        "driver" -> driver,
        "dbtable" -> "TEST",
        "fetchsize" -> "10000",
        "user" -> "root",
        "password" -> "root"))
      .load()
    df1.show()

    val jdbcDF2 = sqlContext.read.format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", "test")
      .option("fetchsize", "10000")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .load()
    jdbcDF2.show()

    // write the table out to a parquet file
    df1.select("*").write.format("parquet").save("file:///Volumes/wukong/data/work/spark/data/test.parquet")

    // load the table back from the parquet file
    val df2 = sqlContext.read.load("file:///Volumes/wukong/data/work/spark/data/test.parquet")
    df2.show()
  }
}           

3.3 Java example

StatementTest.java; remember to change the url, user, and password.

import org.apache.phoenix.queryserver.client.Driver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class StatementTest {
  public static void main(String[] args) {
    Connection pconn = null;
    Statement stmt = null;
    try {
      Class.forName(Driver.class.getName());
      String url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF";
      Properties props = new Properties();
      props.put("user", "xxxx");
      props.put("password", "xxxx");
      pconn = DriverManager.getConnection(url, props);
      pconn.setAutoCommit(true);
      stmt = pconn.createStatement();
      stmt.execute("drop table if exists test");
      stmt.execute("create table test(c1 integer primary key, c2 integer)");
      stmt.execute("upsert into test(c1,c2) values(1,1)");
      stmt.execute("upsert into test(c1,c2) values(2,2)");
      ResultSet rs = stmt.executeQuery("select * from test limit 1000");
      while (rs.next()) {
        System.out.println(rs.getString(1) + " | " +
            rs.getString(2));
      }
      stmt.execute("drop table if exists test");
    } catch (Throwable e) {
      e.printStackTrace();
    } finally {
      try {
        if (pconn != null) {
          pconn.close();
        }
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }
}           

3.4 Package the project

mvn clean package -DskipTests           

This generates demo-1.0-SNAPSHOT.jar under target.

3.5 Submit and run locally

./bin/spark-submit  --master local --verbose --class PhoenixTest1 /Volumes/wukong/data/work/spark/demo/target/demo-1.0-SNAPSHOT.jar