前言
X-Pack Spark可以使用Spark on Phoenix 4.x Connector直接對接Phoenix資料庫,讀取Phoenix資料表資料。有時在讀取Phoenix時需要設定Phoenix的一些參數,例如Phoenix為了保障資料庫的穩定性,預設開了索引包含,即查詢Phoebe表必須要帶上索引或者主鍵字段作為過濾條件。此時Spark作為查詢Phoenix資料庫的用戶端需要有傳遞參數的能力。本文就列舉了Spark側傳遞Phoenix參數的方法。
注意:本文的案例以X-Pack Spark和HBase SQL(Phoenix) 4.x作為背景。
案例描述
在Spark側設定Phoenix的參數常見的有如下:
- phoenix.force.index,查詢Phoenix的SQL語句中的過濾字段是否必須建立索引。本文以這個參數為例,具體使用方法見詳細步驟。
- phoenix.no.index,是否不走索引。預設值為false,即查詢語句會掃描索引表,如果過濾字段在索引表中;如果設定為true,則查詢語句不會掃描索引表,即使過濾字段在索引表中。
詳細步驟
提前在Phoenix中建立一張表,表的建立指令如下:
#建立語句
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
#插入資料語句
UPSERT INTO us_population VALUES('NY','New York',8143197);
UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
UPSERT INTO us_population VALUES('IL','Chicago',2842518);
UPSERT INTO us_population VALUES('TX','Houston',2016582);
UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
UPSERT INTO us_population VALUES('CA','San Diego',1255540);
UPSERT INTO us_population VALUES('TX','Dallas',1213825);
UPSERT INTO us_population VALUES('CA','San Jose',912332);
Phoenix表建立完畢後在Phoenix用戶端運作如下查詢SQL
select * from us_population where population = 912332
運作上面的SQL會報錯,大緻内容如下:
org.apache.phoenix.optimize.ForceIndexException: Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least.
意思是過濾條件必須包含至少一個索引字段,可以通過設定phoenix.force.index=false來關閉這個限制。
在Spark側通過Spark on Phoenix 4.x Connectors讀取Phoenix資料表也會有這個限制。下面介紹下在Spark側如何設定Phoenix的參數。
1、通過Spark ThriftServer 執行SQL語句。
通過SQL語句的方式首先要在Spark側建立一個Phoenix表的映射。建表語句如下:
CREATE TABLE spark_on_phoenix01 USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' '${ZK連結位址}',
'table' 'us_population'
);
建立完畢後直接在Spark運作查詢語句“select * from spark_on_phoenix01 where population = 912332”也會報相同的錯誤,此時可以通過set phoenix.force.index=false方法設定,如下:
select * from spark_on_phoenix01 where population=912332;
Error: java.lang.RuntimeException: org.apache.phoenix.optimize.ForceIndexException: ERROR 599 (42913): Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least. tableName=US_POPULATION (state=,code=0)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> set phoenix.force.index=false;
+----------------------+--------+--+
| key | value |
+----------------------+--------+--+
| phoenix.force.index | false |
+----------------------+--------+--+
1 row selected (0.021 seconds)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> select * from spark_on_phoenix01 where population=912332;
+--------+-----------+-------------+--+
| STATE | CITY | POPULATION |
+--------+-----------+-------------+--+
| CA | San Jose | 912332 |
+--------+-----------+-------------+--+
注意:上述設定方法隻在目前session有效。
2、通過寫代碼SparkSession調用SQL,代碼如下:
//建立SparkSession
val sparkSession = SparkSession
.builder()
.enableHiveSupport()
.appName("spark on phoenix4x")
.getOrCreate()
//方法1:通過在sql中執行set phoenix.force.index=false 設定Phoenix參數
sparkSession.sql("set phoenix.force.index=false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()
//方法2:通過在sparkSession的配置中設定 phoenix.force.index=false 設定Phoenix參數
sparkSession.sqlContext.setConf("phoenix.force.index", "false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()
3、通過SparkSession.read擷取Phoenix表資料,設定參數方法如下:
//初始化sparkSession時加入設定
val sparkSession = SparkSession
.builder()
.config("phoenix.force.index", false)
.enableHiveSupport()
.appName("spark on phoenix4x")
.getOrCreate()
sparkSession
.read
.format("org.apache.phoenix.spark")
.option("table", phoenixTableName)
.option("zkUrl", zkAddress)
.load()
.show()
4、通過X-Pack Spark 控制台--conf傳遞參數。
通過控制傳遞參數的方法如下:
--conf spark.hadoop.phoenix.force.index=false
如下圖:

注意:因為Spark擷取--conf傳遞參數的機制會過濾“spark.hadoop”開頭的參數,是以通過--conf設定的參數需要在參數前面加上“spark.hadoop.”,即完成的參數為spark.hadoop.phoenix.force.index=false。
小結
Spark 對接Phoenix 4.x 方法可以參考:
Spark對接Phoenix4.x快速入門。
X-Pack Spark詳細介紹可參考:
Spark 基本介紹Phoenix 介紹可參考:
HBase SQL(Phoenix) 4.x 使用說明