最近需要做一個通過Java送出Impala查詢的功能。翻閱了一些資料後,發現可以通過Cloudera Manager提供的API實作Impala查詢。

但這樣做多了一層CM平台,如果CM平台不穩定,很可能會導致Impala查詢出錯。如果能直接向Impala送出查詢並傳回結果,才是最好的方式。
於是參考了GitHub上一位大佬的代碼。網上並沒有直接可用的jar包或者java檔案,需要通過thrift檔案(可從Hue源碼中取得所需的thrift檔案)生成java檔案。接著添加以下依賴:libthrift-0.9.1.jar、hive-service.jar、hive-metastore.jar、slf4j-api-1.6.1.jar和commons-lang3-3.3.2.jar。後兩者分別取自slf4j.api-1.6.1.jar.zip和commons-lang3-3.3.2-bin.tar.gz壓縮包。完成後就能使用了。其中hive-service和hive-metastore的版本根據自己使用的情況而定,詳細的操作可以參考大佬代碼中的build.sh檔案。
下面是代碼示例
- submitQuery
/**
 * Submits {@code sql} to an Impala HiveServer2-compatible endpoint and returns a one-shot snapshot
 * of the outcome.
 *
 * <p>The statement is executed with {@code runAsync = true}. Its state is queried exactly once, right
 * after submission. Depending on what is observed, the returned map contains a {@code "type"} key
 * and a {@code "data"} key:
 * <ul>
 *   <li>{@code "log"}: the operation was RUNNING; {@code data} is the text returned by
 *       {@code getQueryLog}.</li>
 *   <li>{@code "result"}: the operation was FINISHED; {@code data} is the row list returned by
 *       {@code getResult}.</li>
 *   <li>{@code "error"}: the session could not be opened, no operation handle was returned, or the
 *       execute call reported {@code ERROR_STATUS}; {@code data} is the server's error message.</li>
 * </ul>
 * In any other state (for example, still pending) the map is returned empty.
 *
 * <p>The operation, the session and the socket are always closed before this method returns or
 * throws. A query that was still running is therefore closed as well (presumably cancelled on the
 * server; confirm against the Impala version in use).
 *
 * @param host     Impala HS2 host
 * @param port     Impala HS2 port
 * @param sql      statement to execute
 * @param username user name sent in the OpenSession request
 * @param password password sent in the OpenSession request
 * @return result map as described above (never {@code null})
 * @throws Exception on transport/Thrift failures or failures while reading results
 */
public static HashMap<String, Object> submitQuery(String host, int port, String sql, String username, String password) throws Exception {
    HashMap<String, Object> result = new HashMap<>();
    // Socket timeout of 99999 ms, as in the original example.
    TSocket transport = new TSocket(host, port, 99999);
    TBinaryProtocol protocol = new TBinaryProtocol(transport);
    TCLIService.Client client = new ImpalaHiveServer2Service.Client(protocol);
    transport.open();
    try {
        TOpenSessionReq openReq = new TOpenSessionReq();
        openReq.setClient_protocol(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
        openReq.setUsername(username);
        openReq.setPassword(password);
        TOpenSessionResp openResp = client.OpenSession(openReq);
        TSessionHandle sessHandle = openResp.getSessionHandle();
        if (sessHandle == null) {
            // Without a session there is nothing to execute or close; report the server's message.
            result.put("type", "error");
            result.put("data", openResp.getStatus().getErrorMessage());
            return result;
        }
        try {
            TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
            // Run asynchronously: ExecuteStatement returns before the query completes.
            execReq.setRunAsync(true);
            TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
            TStatus status = execResp.getStatus();
            TOperationHandle tOperationHandle = execResp.getOperationHandle();
            if (tOperationHandle == null) {
                // Submission failed; there is no operation to inspect or close.
                result.put("type", "error");
                result.put("data", status.getErrorMessage());
                return result;
            }
            try {
                TOperationState queryHandleStatus = getQueryHandleStatus(client, tOperationHandle);
                if (queryHandleStatus == TOperationState.RUNNING_STATE) {
                    result.put("type", "log");
                    result.put("data", getQueryLog(client, tOperationHandle));
                } else if (queryHandleStatus == TOperationState.FINISHED_STATE) {
                    result.put("type", "result");
                    result.put("data", getResult(client, tOperationHandle, sql));
                } else if (status.getStatusCode() == TStatusCode.ERROR_STATUS) {
                    result.put("type", "error");
                    result.put("data", status.getErrorMessage());
                }
            } finally {
                TCloseOperationReq closeReq = new TCloseOperationReq();
                closeReq.setOperationHandle(tOperationHandle);
                client.CloseOperation(closeReq);
            }
        } finally {
            client.CloseSession(new TCloseSessionReq(sessHandle));
        }
    } finally {
        transport.close();
    }
    return result;
}
- getResult
/**
 * Fetches a single batch of at most 100 rows of a finished operation's result set.
 *
 * <p>Equivalent to {@code getResult(client, tOperationHandle, sql, 100)}.
 *
 * @param client           connected HS2 client
 * @param tOperationHandle handle of the operation whose rows are fetched
 * @param sql              the statement text, used only in error messages
 * @return one map per row, keyed by column name in result-set column order
 * @throws TException if a Thrift call fails or FetchResults reports {@code ERROR_STATUS}
 */
public static ArrayList<HashMap<String, Object>> getResult(TCLIService.Client client, TOperationHandle tOperationHandle, String sql) throws TException {
    return getResult(client, tOperationHandle, sql, 100);
}

/**
 * Fetches a single batch of at most {@code maxRows} rows of a finished operation's result set.
 *
 * <p>Only one FetchResults call is made, so rows beyond {@code maxRows} are not returned. Assumes a
 * row-oriented {@code TRowSet} (rows rather than columns), as read by the original example. TODO
 * confirm for the negotiated protocol version.
 *
 * @param client           connected HS2 client
 * @param tOperationHandle handle of the operation whose rows are fetched
 * @param sql              the statement text, used only in error messages
 * @param maxRows          maximum number of rows to request; must be positive
 * @return one map per row, keyed by column name in result-set column order. A column whose value
 *         carries no recognised type, or is missing from the row, maps to {@code null}.
 * @throws IllegalArgumentException if {@code maxRows <= 0}
 * @throws TException if a Thrift call fails or FetchResults reports {@code ERROR_STATUS}
 */
public static ArrayList<HashMap<String, Object>> getResult(TCLIService.Client client, TOperationHandle tOperationHandle, String sql, int maxRows) throws TException {
    if (maxRows <= 0) {
        throw new IllegalArgumentException("maxRows must be positive: " + maxRows);
    }
    ArrayList<String> columns = getColumnNames(client, tOperationHandle);

    TFetchResultsReq fetchReq = new TFetchResultsReq();
    fetchReq.setOperationHandle(tOperationHandle);
    fetchReq.setMaxRows(maxRows);
    // Orientation is left at the request's default (presumably FETCH_NEXT per the TCLIService IDL).
    TFetchResultsResp resultsResp = client.FetchResults(fetchReq);
    TStatus status = resultsResp.getStatus();
    if (status.getStatusCode() == TStatusCode.ERROR_STATUS) {
        // Previously this only printed and then dereferenced a possibly-null row set.
        throw new TException("FetchResults failed for [" + sql + "]: " + status.getErrorMessage()
                + ", sqlState=" + status.getSqlState()
                + ", errorCode=" + status.getErrorCode());
    }

    ArrayList<HashMap<String, Object>> retArray = new ArrayList<>();
    TRowSet resultsSet = resultsResp.getResults();
    if (resultsSet == null || resultsSet.getRows() == null) {
        return retArray;
    }
    for (TRow resultRow : resultsSet.getRows()) {
        List<TColumnValue> row = resultRow.getColVals();
        // LinkedHashMap (a HashMap subtype) keeps the columns in result-set order.
        HashMap<String, Object> map = new java.util.LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            // Index by position so an unrecognised value cannot shift later columns.
            Object value = (row != null && i < row.size()) ? toJavaValue(row.get(i)) : null;
            map.put(columns.get(i), value);
        }
        retArray.add(map);
    }
    return retArray;
}

/** Reads the column names of an operation's result-set schema; empty if no schema is returned. */
private static ArrayList<String> getColumnNames(TCLIService.Client client, TOperationHandle tOperationHandle) throws TException {
    TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(new TGetResultSetMetadataReq(tOperationHandle));
    TTableSchema tableSchema = metadataResp.getSchema();
    ArrayList<String> columns = new ArrayList<>();
    if (tableSchema != null && tableSchema.getColumns() != null) {
        for (TColumnDesc tColumnDesc : tableSchema.getColumns()) {
            columns.add(tColumnDesc.getColumnName());
        }
    }
    return columns;
}

/** Converts one Thrift column value to its boxed Java value, or {@code null} if no known field is set. */
private static Object toJavaValue(TColumnValue field) {
    if (field == null) {
        return null;
    }
    if (field.isSetStringVal()) {
        return field.getStringVal().getValue();
    }
    if (field.isSetDoubleVal()) {
        return field.getDoubleVal().getValue();
    }
    if (field.isSetI16Val()) {
        return field.getI16Val().getValue();
    }
    if (field.isSetI32Val()) {
        return field.getI32Val().getValue();
    }
    if (field.isSetI64Val()) {
        return field.getI64Val().getValue();
    }
    if (field.isSetBoolVal()) {
        return field.getBoolVal().isValue();
    }
    if (field.isSetByteVal()) {
        return field.getByteVal().getValue();
    }
    return null;
}
- getQueryLog
/**
 * Retrieves the server-side log text of an operation via the client's GetLog call.
 *
 * @param client           connected client issuing the GetLog request
 * @param tOperationHandle operation whose log is requested; may be {@code null}
 * @return the log text from the response, or {@code null} when no handle is given
 * @throws Exception if the Thrift call fails
 */
public static String getQueryLog(TCLIService.Client client, TOperationHandle tOperationHandle) throws Exception {
    if (tOperationHandle == null) {
        return null;
    }
    TGetLogResp response = client.GetLog(new TGetLogReq(tOperationHandle));
    return response.getLog();
}
- getQueryHandleStatus
/**
 * Asks the server for the current state of an operation via GetOperationStatus.
 *
 * @param client           connected client issuing the request
 * @param tOperationHandle operation to inspect; may be {@code null}
 * @return the reported operation state, or {@code null} when no handle is given
 * @throws Exception if the Thrift call fails
 */
public static TOperationState getQueryHandleStatus(TCLIService.Client client, TOperationHandle tOperationHandle) throws Exception {
    if (tOperationHandle == null) {
        return null;
    }
    TGetOperationStatusResp response = client.GetOperationStatus(new TGetOperationStatusReq(tOperationHandle));
    return response.getOperationState();
}