天天看點

Spark on Phoenix 4.x Connector:如何在Spark側設定Phoenix參數前言案例描述詳細步驟小結

前言

X-Pack Spark可以使用Spark on Phoenix 4.x Connector直接對接Phoenix資料庫,讀取Phoenix資料表資料。有時在讀取Phoenix時需要設定Phoenix的一些參數,例如Phoenix為了保障資料庫的穩定性,預設開了索引包含,即查詢Phoebe表必須要帶上索引或者主鍵字段作為過濾條件。此時Spark作為查詢Phoenix資料庫的用戶端需要有傳遞參數的能力。本文就列舉了Spark側傳遞Phoenix參數的方法。

注意:本文的案例以X-Pack Spark和HBase SQL(Phoenix) 4.x作為背景。

案例描述

在Spark側設定Phoenix的參數常見的有如下:

  1. phoenix.force.index,查詢Phoenix的SQL語句中的過濾字段是否必須建立索引。本文以這個參數為例,具體使用方法見詳細步驟。
  2. phoenix.no.index,是否不走索引。預設值為false,即查詢語句會掃描索引表,如果過濾字段在索引表中;如果設定為true,則查詢語句不會掃描索引表,即使過濾字段在索引表中。

詳細步驟

提前在Phoenix中建立一張表,表的建立指令如下:

#建立語句
CREATE TABLE IF NOT EXISTS us_population (
   state CHAR(2) NOT NULL,
   city VARCHAR NOT NULL,
   population BIGINT
   CONSTRAINT my_pk PRIMARY KEY (state, city));
#插入資料語句  
UPSERT INTO us_population VALUES('NY','New York',8143197);
UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
UPSERT INTO us_population VALUES('IL','Chicago',2842518);
UPSERT INTO us_population VALUES('TX','Houston',2016582);
UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
UPSERT INTO us_population VALUES('CA','San Diego',1255540);
UPSERT INTO us_population VALUES('TX','Dallas',1213825);
UPSERT INTO us_population VALUES('CA','San Jose',912332);           

Phoenix表建立完畢後在Phoenix用戶端運作如下查詢SQL

select * from us_population where population = 912332           

運作上面的SQL會報錯,大緻内容如下:

org.apache.phoenix.optimize.ForceIndexException: Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least.           

意思是過濾條件必須包含至少一個索引字段,可以通過設定phoenix.force.index=false來關閉這個限制。

在Spark側通過Spark on Phoenix 4.x Connectors讀取Phoenix資料表也會有這個限制。下面介紹下在Spark側如何設定Phoenix的參數。

1、通過Spark ThriftServer 執行SQL語句。

通過SQL語句的方式首先要在Spark側建立一個Phoenix表的映射。建表語句如下:

CREATE TABLE spark_on_phoenix01 USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' '${ZK連結位址}',
'table' 'us_population'
);           

建立完畢後直接在Spark運作查詢語句“select * from spark_on_phoenix01 where population = 912332”也會報相同的錯誤,此時可以通過set phoenix.force.index=false方法設定,如下:

select * from spark_on_phoenix01 where population=912332;
Error: java.lang.RuntimeException: org.apache.phoenix.optimize.ForceIndexException: ERROR 599 (42913): Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least. tableName=US_POPULATION (state=,code=0)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> set phoenix.force.index=false;
+----------------------+--------+--+
|         key          | value  |
+----------------------+--------+--+
| phoenix.force.index  | false  |
+----------------------+--------+--+
1 row selected (0.021 seconds)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> select * from spark_on_phoenix01 where population=912332;
+--------+-----------+-------------+--+
| STATE  |   CITY    | POPULATION  |
+--------+-----------+-------------+--+
| CA     | San Jose  | 912332      |
+--------+-----------+-------------+--+           

注意:上述設定方法隻在目前session有效。

2、通過寫代碼SparkSession調用SQL,代碼如下:

//建立SparkSession
val sparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .appName("spark on phoenix4x")
      .getOrCreate()
//方法1:通過在sql中執行set phoenix.force.index=false 設定Phoenix參數
sparkSession.sql("set phoenix.force.index=false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()

//方法2:通過在sparkSession的配置中設定 phoenix.force.index=false 設定Phoenix參數
sparkSession.sqlContext.setConf("phoenix.force.index", "false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()           

3、通過SparkSession.read擷取Phoenix表資料,設定參數方法如下:

//初始化sparkSession時加入設定
val sparkSession = SparkSession
    .builder()
    .config("phoenix.force.index", false)
    .enableHiveSupport()
    .appName("spark on phoenix4x")
    .getOrCreate()

sparkSession
    .read
    .format("org.apache.phoenix.spark")
    .option("table", phoenixTableName)
    .option("zkUrl", zkAddress)
    .load()
    .show()           

4、通過X-Pack Spark 控制台--conf傳遞參數。

通過控制傳遞參數的方法如下:

--conf spark.hadoop.phoenix.force.index=false           

如下圖:

Spark on Phoenix 4.x Connector:如何在Spark側設定Phoenix參數前言案例描述詳細步驟小結

注意:因為Spark擷取--conf傳遞參數的機制會過濾“spark.hadoop”開頭的參數,是以通過--conf設定的參數需要在參數前面加上“spark.hadoop.”,即完成的參數為spark.hadoop.phoenix.force.index=false。

小結

Spark 對接Phoenix 4.x 方法可以參考:

Spark對接Phoenix4.x快速入門

X-Pack Spark詳細介紹可參考:

Spark 基本介紹

Phoenix 介紹可參考:

HBase SQL(Phoenix) 4.x 使用說明