1. Background
Lindorm's Phoenix compatibility is exposed through the Phoenix 5.x thin client, while most of the Phoenix integration examples on the Spark website target the Phoenix 4.x thick client. This article therefore walks through connecting Spark to the Phoenix 5.x thin client, for reference.
2. Connecting Spark to the Phoenix 5.x thin client
2.1 Download the Spark distribution from the Spark website
Download a Spark distribution from the Spark website; pick whichever version you need. This article uses Spark 2.4.3 as an example. Extract the downloaded archive; the directory is assumed to be spark-2.4.3.
2.2 Download the Phoenix 5.x thin client from the Alibaba Cloud repository
Download the Phoenix 5.x thin client ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar from the Alibaba Cloud repository and place it in the jars directory under spark-2.4.3.
2.3 Generate log4j.properties
cd spark-2.4.3/conf
cp log4j.properties.template log4j.properties
2.4 Start spark-shell
./bin/spark-shell
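Optionally, before pasting any code, you can confirm that the thin-client jar from section 2.2 is on the classpath by loading the driver class in the REPL (a quick sanity check, not required):
Class.forName("org.apache.phoenix.queryserver.client.Driver")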
2.5 Paste and run the code
2.5.1 Access via Phoenix Statement
- In spark-shell, type :paste to enter multi-line input
:paste
- Change url, user, and password in the code below to your own instance/cluster information, then paste the whole block into spark-shell
import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx") //表示使用者名是root
info.put("password", "xxxx") //表示密碼是hadoop
try {
Class.forName(driver)
} catch {
case e: ClassNotFoundException => e.printStackTrace
}
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
stmt.execute("drop table if exists test")
stmt.execute("create table test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
val rs = stmt.executeQuery("select * from test limit 1000")
while (rs.next()) {
println(rs.getString(1) + " | " +
rs.getString(2) )
}
stmt.execute("drop table if exists test")
} catch {
case e: SQLException => e.printStackTrace()
} finally {
if (null != stmt) {
stmt.close()
}
if (null != conn) {
conn.close()
}
}
- Press Ctrl+D to finish the input; the code runs and prints output similar to the following:
// Exiting paste mode, now interpreting.
1 | 1
2 | 2
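Standard JDBC PreparedStatement calls with bind parameters should also work through the thin client. A minimal sketch reusing the url and info defined above, assuming the TEST table exists (recreate it if the previous step dropped it):
val conn2 = DriverManager.getConnection(url, info)
// parameterized upsert via PreparedStatement
val ps = conn2.prepareStatement("upsert into test(c1,c2) values(?,?)")
ps.setInt(1, 3)
ps.setInt(2, 3)
ps.executeUpdate()
ps.close()
conn2.close()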
2.5.2 Access via DataFrame
The DataFrame approach only reads and writes data; creating and dropping tables must still be done with the Phoenix Statement approach. Also note that the Statement example above drops the TEST table at the end, so recreate and populate it before running the DataFrame examples (see the sketch below).
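A minimal sketch (paste via :paste) to recreate and populate TEST, reusing the url and info from 2.5.1:
val connPrep = DriverManager.getConnection(url, info)
val stmtPrep = connPrep.createStatement
stmtPrep.execute("create table if not exists test(c1 integer primary key, c2 integer)")
stmtPrep.execute("upsert into test(c1,c2) values(1,1)")
stmtPrep.execute("upsert into test(c1,c2) values(2,2)")
stmtPrep.close()
connPrep.close()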
2.5.2.1 Reading with a DataFrame
Type :paste, paste the following text, then press Ctrl+D to run it. Remember to update the url, user, and password.
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()
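For larger tables, Spark's generic JDBC source can also split the read into parallel tasks with its standard partitioning options (partitionColumn, lowerBound, upperBound, numPartitions). A sketch under the assumption that the integer primary key c1 is a reasonable partition column; the url, user, and password placeholders are the same as above:
val partitionedDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF",
  "driver" -> "org.apache.phoenix.queryserver.client.Driver",
  "dbtable" -> "TEST",
  "user" -> "xxxx",
  "password" -> "xxxx",
  "partitionColumn" -> "C1",  // numeric column used to split the scan
  "lowerBound" -> "0",
  "upperBound" -> "1000",
  "numPartitions" -> "4")).load()
partitionedDF.show()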
2.5.2.2 Writing with a DataFrame
import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
//create the schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
//create Row objects; each Row is one row of rowRDD
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
//bind the Row objects to the schema, i.e. associate the data with the schema
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//create a prop variable to hold the JDBC connection properties
val prop = new Properties()
prop.put("user", "xxxx") //表示使用者名是root
prop.put("password", "xxxx") //表示密碼是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver")
//下面就可以連接配接資料庫,采用append模式,表示追加記錄到資料庫spark的student表中
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)
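To confirm the write landed, you can read the table back with the same JDBC options as in 2.5.2.1; a quick check:
val checkDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF",
  "driver" -> "org.apache.phoenix.queryserver.client.Driver",
  "dbtable" -> "TEST",
  "user" -> "xxxx",
  "password" -> "xxxx")).load()
checkDF.show()  // should now include the rows (3,3) and (4,4)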
3. Maven project example
The following uses a Maven project to demonstrate the basic operations for connecting Spark to the Phoenix thin client.
3.1 Create the Maven project
Create a Maven project named demo with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.4.3</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-queryserver-client</artifactId>
<version>5.2.1-HBase-2.x</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3.2 Scala file example
PhoenixTest1.scala; remember to update the url, user, and password.
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import java.sql.{DriverManager, SQLException}
import java.util.Properties
object PhoenixTest1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("spark on phoenix")
val sparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate()
val sc = sparkSession.sparkContext
print("======= start ==========")
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx")
info.put("password", "xxxx")
try {
Class.forName(driver)
} catch {
case e: ClassNotFoundException => e.printStackTrace
}
//Statement-based access: supports all Phoenix DDL and DML operations
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
stmt.execute("drop table if exists test")
stmt.execute("create table test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
val rs = stmt.executeQuery("select * from test limit 1000")
while (rs.next()) {
println(rs.getString(1) + " | " +
rs.getString(2) )
}
} catch {
case e: SQLException => e.printStackTrace()
} finally {
if (null != stmt) {
stmt.close()
}
if (null != conn) {
conn.close()
}
}
//DataFrame write
//generate the records
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
testDataFrame.show()
// write the records
testDataFrame
.write
.mode("append")
.jdbc("jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", info)
//DataFrame read, showing two ways to pass options
val df1 = sqlContext
.read.
format("jdbc")
.options(Map(
"url" -> url,
"driver" -> driver,
"dbtable" -> "TEST",
"fetchsize" -> "10000",
"user" -> "root",
"password" -> "root"))
.load()
df1.show()
val jdbcDF2 = sqlContext.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("dbtable", "test")
.option("fetchsize", "10000")
.option("user", "xxxx")
.option("password", "xxxx")
.load()
jdbcDF2.show()
// write the table out to a parquet file
df1.select("*").write.format("parquet").save("file:///Volumes/wukong/data/work/spark/data/test.parquet")
// load the table back from the parquet file
val df2 = sqlContext.read.load("file:///Volumes/wukong/data/work/spark/data/test.parquet")
df2.show()
}
}
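The example above uses SQLContext to match the spark-shell snippets; since Spark 2.x the same read can also be expressed directly on the SparkSession. A minimal sketch that could be placed inside main() next to the existing code, reusing the same url, driver, and credential placeholders:
val df3 = sparkSession.read
  .format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("dbtable", "TEST")
  .option("fetchsize", "10000")
  .option("user", "xxxx")
  .option("password", "xxxx")
  .load()
df3.show()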
3.3 Java file example
StatementTest.java; remember to update the url, user, and password.
import org.apache.phoenix.queryserver.client.Driver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
public class StatementTest {
public static void main(String[] args) {
Connection pconn = null;
Statement stmt = null;
try {
Class.forName(Driver.class.getName());
String url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF";
Properties props = new Properties();
props.put("user", "xxxx");
props.put("password", "xxxx");
pconn = DriverManager.getConnection(url, props);
pconn.setAutoCommit(true);
stmt = pconn.createStatement();
stmt.execute("drop table if exists test");
stmt.execute("create table test(c1 integer primary key, c2 integer)");
stmt.execute("upsert into test(c1,c2) values(1,1)");
stmt.execute("upsert into test(c1,c2) values(2,2)");
ResultSet rs = stmt.executeQuery("select * from test limit 1000");
while (rs.next()) {
System.out.println(rs.getString(1) + " | " +
rs.getString(2));
}
stmt.execute("drop table if exists test");
} catch (Throwable e) {
e.printStackTrace();
} finally {
try {
if (pconn != null) {
pconn.close();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
3.4 Package
mvn clean package -DskipTests
This generates demo-1.0-SNAPSHOT.jar under target.
3.5 Submit and run locally
./bin/spark-submit --master local --verbose --class PhoenixTest1 /Volumes/wukong/data/work/spark/demo/target/demo-1.0-SNAPSHOT.jar
Note that the jar built above does not bundle its dependencies, so the Phoenix thin-client jar must be visible to Spark, for example by placing it in the jars directory as in section 2.2 or by passing it with --jars.