天天看点

通过Impala thrift API接口进行Impala查询

最近需要做一个通过Java提交Impala查询的功能。翻阅了一些资料后发现,可以通过Cloudera Manager提供的API实现Impala查询。

通过Impala thrift API接口进行Impala查询

但是多了一层CM平台,如果CM平台不稳定,很可能会导致Impala查询出错。如果能直接让Impala查询并返回结果是最好的方式了。

参考GitHub上一位大佬的代码。网上并没有直接可用的jar包或者Java文件,需要先通过thrift文件(可从hue源码中获取所需的thrift文件)生成Java文件,再添加libthrift-0.9.1.jar、hive-service.jar、hive-metastore.jar、slf4j.api-1.6.1.jar.zip和commons-lang3-3.3.2-bin.tar.gz依赖,然后就能使用了。其中hive-service和hive-metastore的版本根据自己的使用情况而定,详细的操作可以看大佬代码中的build.sh文件。

下面是代码示例

  • submitQuery
/**
 * Opens a Thrift connection, submits {@code sql} asynchronously, and reports
 * what the operation looks like right after submission.
 *
 * <p>The returned map contains:
 * <ul>
 *   <li>{@code "type"} = {@code "log"} and {@code "data"} = the query log, when the
 *       operation is still running;</li>
 *   <li>{@code "type"} = {@code "result"} and {@code "data"} = the rows from
 *       {@code getResult}, when the operation has finished;</li>
 *   <li>{@code "type"} = {@code "error"} and {@code "data"} = the error message, when
 *       the ExecuteStatement call reported an error status. If no operation handle
 *       was returned, {@code "status"} = {@code "error"} is also set (kept for
 *       callers that relied on the original key).</li>
 * </ul>
 * For any other operation state the map is returned empty.
 *
 * <p>The operation, the session and the socket are released in {@code finally}
 * blocks so they are not leaked when a call in between throws.
 *
 * @param host     server host name
 * @param port     server Thrift port
 * @param sql      statement to execute
 * @param username user name sent in the open-session request
 * @param password password sent in the open-session request
 * @return a map describing the outcome, as listed above
 * @throws Exception if the transport cannot be opened or any Thrift call fails
 */
public static HashMap<String, Object> submitQuery(String host, int port, String sql, String username, String password) throws Exception {

        // Third argument is the socket timeout passed to TSocket.
        TSocket transport = new TSocket(host, port, 99999);
        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        TCLIService.Client client = new ImpalaHiveServer2Service.Client(protocol);
        HashMap<String, Object> result = new HashMap<>();

        transport.open();
        try {
            TOpenSessionReq openReq = new TOpenSessionReq();
            openReq.setClient_protocol(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
            openReq.setUsername(username);
            openReq.setPassword(password);

            TOpenSessionResp openResp = client.OpenSession(openReq);
            TSessionHandle sessHandle = openResp.getSessionHandle();
            try {
                TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
                // Run asynchronously: the call returns before the query completes.
                execReq.setRunAsync(true);

                TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
                TStatus status = execResp.getStatus();
                TOperationHandle tOperationHandle = execResp.getOperationHandle();

                if (tOperationHandle == null) {
                    // No operation was created: report the error and stop here instead of
                    // falling through to status polling and CloseOperation with a null handle.
                    result.put("status", "error");
                    result.put("type", "error");
                    result.put("data", status.getErrorMessage());
                    return result;
                }

                try {
                    TOperationState queryHandleStatus = getQueryHandleStatus(client, tOperationHandle);
                    if (queryHandleStatus == TOperationState.RUNNING_STATE) {
                        result.put("type", "log");
                        result.put("data", getQueryLog(client, tOperationHandle));
                    } else if (queryHandleStatus == TOperationState.FINISHED_STATE) {
                        result.put("type", "result");
                        result.put("data", getResult(client, tOperationHandle, sql));
                    } else if (status.getStatusCode() == TStatusCode.ERROR_STATUS) {
                        result.put("type", "error");
                        result.put("data", status.getErrorMessage());
                    }
                } finally {
                    TCloseOperationReq closeReq = new TCloseOperationReq();
                    closeReq.setOperationHandle(tOperationHandle);
                    client.CloseOperation(closeReq);
                }
            } finally {
                // Only close a session that was actually opened.
                if (sessHandle != null) {
                    client.CloseSession(new TCloseSessionReq(sessHandle));
                }
            }
        } finally {
            transport.close();
        }

        return result;
    }
           
  • getResult
/**
 * Fetches up to 100 rows of an operation's result set and returns them as a
 * list of maps keyed by column name.
 *
 * <p>Column names come from the result-set metadata; if no schema is returned
 * the maps are empty. If the fetch reports an error status, the error is
 * printed to standard output; when the response then carries no row set, an
 * empty list is returned instead of failing with a NullPointerException.
 * SQL NULLs (a typed value wrapper whose {@code value} is unset) and values of
 * an unrecognised type are stored as {@code null} so that values stay aligned
 * with their columns.
 *
 * @param client           connected Thrift client
 * @param tOperationHandle handle of the operation whose rows are fetched
 * @param sql              the statement text, used only in the diagnostic output
 * @return one map per fetched row, column name to value; never {@code null}
 * @throws TException if a Thrift call fails
 */
public static ArrayList<HashMap<String, Object>> getResult(TCLIService.Client client, TOperationHandle tOperationHandle, String sql) throws TException {
        // Column names from the result-set metadata.
        TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(tOperationHandle);
        TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(metadataReq);
        TTableSchema tableSchema = metadataResp.getSchema();
        ArrayList<String> columns = new ArrayList<>();
        if (tableSchema != null && tableSchema.getColumns() != null) {
            for (TColumnDesc tColumnDesc : tableSchema.getColumns()) {
                columns.add(tColumnDesc.getColumnName());
            }
        }

        // Row data.
        TFetchResultsReq fetchReq = new TFetchResultsReq();
        fetchReq.setOperationHandle(tOperationHandle);
        fetchReq.setMaxRows(100);
        //org.apache.hive.service.cli.thrift.TFetchOrientation.FETCH_NEXT
        TFetchResultsResp resultsResp = client.FetchResults(fetchReq);
        TStatus status = resultsResp.getStatus();
        if (status.getStatusCode() == TStatusCode.ERROR_STATUS) {
            String msg = status.getErrorMessage();
            System.out.println(msg + "," + status.getSqlState() + "," + Integer.toString(status.getErrorCode()) + "," + status.isSetInfoMessages());
            System.out.println("After FetchResults: " + sql);
        }

        ArrayList<HashMap<String, Object>> retArray = new ArrayList<>();
        TRowSet resultsSet = resultsResp.getResults();
        if (resultsSet == null || resultsSet.getRows() == null) {
            // Nothing to convert (e.g. the fetch failed).
            return retArray;
        }

        for (TRow resultRow : resultsSet.getRows()) {
            List<Object> values = new ArrayList<>();
            List<TColumnValue> row = resultRow.getColVals();
            if (row != null) {
                for (TColumnValue field : row) {
                    // Always add exactly one entry per column so indexes stay aligned.
                    values.add(extractColumnValue(field));
                }
            }
            HashMap<String, Object> map = new HashMap<>();
            // Guard against a row that is shorter than the column list.
            int width = Math.min(columns.size(), values.size());
            for (int i = 0; i < width; i++) {
                map.put(columns.get(i), values.get(i));
            }
            retArray.add(map);
        }
        return retArray;
    }

    /**
     * Unwraps the single value held by a {@code TColumnValue}; returns {@code null}
     * when the wrapped value is unset or no known member is set.
     */
    private static Object extractColumnValue(TColumnValue field) {
        if (field.isSetStringVal()) {
            return field.getStringVal().getValue();
        }
        if (field.isSetDoubleVal()) {
            return field.getDoubleVal().isSetValue() ? Double.valueOf(field.getDoubleVal().getValue()) : null;
        }
        if (field.isSetI16Val()) {
            return field.getI16Val().isSetValue() ? Short.valueOf(field.getI16Val().getValue()) : null;
        }
        if (field.isSetI32Val()) {
            return field.getI32Val().isSetValue() ? Integer.valueOf(field.getI32Val().getValue()) : null;
        }
        if (field.isSetI64Val()) {
            return field.getI64Val().isSetValue() ? Long.valueOf(field.getI64Val().getValue()) : null;
        }
        if (field.isSetBoolVal()) {
            return field.getBoolVal().isSetValue() ? Boolean.valueOf(field.getBoolVal().isValue()) : null;
        }
        if (field.isSetByteVal()) {
            return field.getByteVal().isSetValue() ? Byte.valueOf(field.getByteVal().getValue()) : null;
        }
        return null;
    }
           
  • getQueryLog
/**
 * Retrieves the log text associated with an operation.
 *
 * @param client           connected Thrift client used to issue the GetLog call
 * @param tOperationHandle handle of the operation; may be {@code null}
 * @return the log returned by the server, or {@code null} when no handle is given
 * @throws Exception if the GetLog call fails
 */
public static String getQueryLog(TCLIService.Client client, TOperationHandle tOperationHandle) throws Exception {
        // Without a handle there is nothing to ask the server about.
        if (tOperationHandle == null) {
            return null;
        }
        TGetLogResp response = client.GetLog(new TGetLogReq(tOperationHandle));
        return response.getLog();
    }
           
  • getQueryHandleStatus
/**
 * Asks the server for the current state of an operation.
 *
 * @param client           connected Thrift client used to issue the GetOperationStatus call
 * @param tOperationHandle handle of the operation; may be {@code null}
 * @return the operation state reported by the server, or {@code null} when no handle is given
 * @throws Exception if the GetOperationStatus call fails
 */
public static TOperationState getQueryHandleStatus(TCLIService.Client client, TOperationHandle tOperationHandle) throws Exception {
        // Without a handle there is no operation to query.
        if (tOperationHandle == null) {
            return null;
        }
        TGetOperationStatusResp response = client.GetOperationStatus(new TGetOperationStatusReq(tOperationHandle));
        return response.getOperationState();
    }