天天看點

NIFI自定義開發API-HTTP分頁功能

轉載請注明位址: https://blog.csdn.net/u011291276/article/details/107213758

1、摘要

NIFI自定義開發搭建配置請參考:https://blog.csdn.net/u011291276/article/details/107211923

此文檔目的為:在NIFI ETL中開發自定義HTTP分頁查詢,通過接口響應回的總條數和總頁數,
生成多個分頁查詢的API接口位址,然後逐一進行請求擷取響應資訊;
HTTP分頁查詢一般為POST查詢,此流程第二個元件同時也配置GET請求的分頁查詢;
           

2、最終結果展示

HTTP分頁查詢的流程邏輯為:第一次查詢,擷取接口響應的總條數;本地封裝分頁查詢的多個URL;多個URL逐一查詢擷取每頁資料;

核心插件及流程如下:

NIFI自定義開發API-HTTP分頁功能

下面将逐個說明這三個元件,并直接貼上源代碼。

3、元件1—— LeachHttpInvoke

LeachHttpInvoke元件在運作時僅請求一條資料(請在請求體中將每頁條數設為1),目的只為擷取接口響應的總條數或總頁數;
基本參數大家都能看懂,其中body體和header頭的格式都是對象字元串;
ReqMsg PassDown :參數有兩個值,
	false:不走分頁查詢,直接傳回響應結果。此目的為不使用分頁查詢時使用,不過正常情況下,
		不使用分頁查詢時可以使用NIFI自帶的元件InvokeHTTP請求;
	true:開始走分頁查詢,此元件在傳回資料時,參數格式會有變化;
           
NIFI自定義開發API-HTTP分頁功能

源碼:

package com.leach.nifi.processor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.HttpRequestClient;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.http.MediaType;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;

/**
* @title   自定義HTTP 增加請求體輸入框
* @author   xiaobin.wang
* @create   2020/01/08
*/

@SupportsBatching
@Tags({"leach","http", "https", "rest", "client"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("HTTP RESTFulApi請求。支援分頁查詢操作(連接配接LeachHttpPagingHandle-LeachHttpPagingRequest)")
public class LeachHttpInvoke extends AbstractProcessor {

    public static final String DEFAULT_CONTENT_TYPE = "application/json";

    private static final Map<String, String> excludedHeaders = new HashMap<String, String>();

    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.SIMPLIFIED_CHINESE).withZoneUTC();

    // properties
    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .description("HTTP request method (GET, POST)")
            .required(true)
            .defaultValue("GET")
            .allowableValues("POST","GET")
            .build();

    public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
            .name("Remote URL")
            .description("Remote URL which will be connected to, including scheme, host, port, path.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build();
    public static final PropertyDescriptor REQUEST_BODY = new PropertyDescriptor.Builder()
            .name("Request Body")
            .description("請求體參數 JSON字元串格式對象 {\"key\":\"value\"}")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
            .name("Content Type")
            .description("Content Type(GET請求時無效)")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("raw","x-www-form-urlencoded","form-data")
            .defaultValue("raw")
            .build();
    public static final PropertyDescriptor REQUEST_HEADER = new PropertyDescriptor.Builder()
            .name("Request Header")
            .description("請求頭 JSON字元串格式對象 {\"key\":\"value\"}。此處不需要配置content-type。")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor PROP_PASS_DOWN = new PropertyDescriptor.Builder()
            .name("ReqMsg PassDown")
            .description("是否允許請求資訊向下傳遞。分頁查詢需向下傳遞({\"url\":\"\",\"header\":{},\"body\":{})")
            .required(false)
            .defaultValue("false")
            .allowableValues("true", "false")
            .build();

    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            PROP_METHOD,
            PROP_URL,
            REQUEST_BODY,
            REQUEST_HEADER,
            CONTENT_TYPE,
            PROP_PASS_DOWN));

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("執行異常或失敗,傳回此标志")
            .build();
    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("RESTFul Api執行成功,傳回此URL響應的内容資訊。" +
                    "當ReqMsg PassDown選擇true時,将傳回帶有請求參數的封裝體{\"reqMsg\":Object,\"respMsg\":Object}。" +
                    "當值為false時,直接傳回url請求結果Object")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            FAILURE,SUCCESS)));

    @Override
    public Set<Relationship> getRelationships() {
        return this.RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }


    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger =getLogger();
        FlowFile requestFlowFile = session.get();
        if(requestFlowFile==null){
            requestFlowFile=session.create();
        }
        final String request_url = context.getProperty(PROP_URL).getValue();
        final String request_method=context.getProperty(PROP_METHOD).getValue();
        final String content_type=context.getProperty(CONTENT_TYPE).getValue();
        final boolean isPassDown=context.getProperty(PROP_PASS_DOWN).asBoolean();
        //請求頭
        final String request_header = context.getProperty(REQUEST_HEADER).getValue();

        //請求參數body處理開始
        String reqMsg="";
        //擷取輸入參數
        if(context.getProperty(REQUEST_BODY).isSet()){
            final String request_body = context.getProperty(REQUEST_BODY).getValue();
            reqMsg=request_body;
        }else{
            final String[] req_bodys = new String[1];
            //擷取請求體
            requestFlowFile = session.write(requestFlowFile, new StreamCallback() {
                @Override
                public void process(InputStream rcdIn, OutputStream rcdOut) throws IOException {
                    req_bodys[0] = IOUtils.toString(rcdIn,"utf-8");
                }
            });
            reqMsg=req_bodys[0];
        }

        try {
            String respMsg=null;
            //發送HTTP請求
            if("POST".equals(request_method)){
                respMsg= HttpRequestClient.post(request_url,reqMsg,content_type,request_header);
            }
            if("GET".equals(request_method)){
                respMsg= HttpRequestClient.get(request_url);
            }
            logger.debug("URL:"+request_url+"\n響應資訊:"+respMsg);
            //移除上遊下來的資料
            session.remove(requestFlowFile);
            //建立響應
            FlowFile responseFlowFile = session.create();
            //向下傳遞則封裝資訊體
            if (isPassDown) {
                //将請求URL和參數、請求頭封裝進map,供後續分頁功能使用,或其需要的地方
                Map<String,String> request=new HashMap<String,String>();
                request.put("url",request_url);
                request.put("header",request_header);
                request.put("reqMsg",reqMsg);
                request.put("respMsg",respMsg);
                request.put("method",request_method);

                String requestMsg=JSONObject.toJSONString(request);
                responseFlowFile = session.write(responseFlowFile, new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream out) throws IOException {
                        out.write(requestMsg.getBytes("UTF-8"));
                    }
                });
            }else{
                //響應資訊
                final byte[] respByte =respMsg.getBytes("utf-8");
                if (respByte.length > 0) {
                    responseFlowFile = session.write(responseFlowFile, new OutputStreamCallback() {
                        @Override
                        public void process(final OutputStream out) throws IOException {
                            out.write(respByte);
                        }
                    });
                }
            }
            if (responseFlowFile != null) {
                session.getProvenanceReporter().fetch(responseFlowFile, request_url);
            }
            //接口響應資訊
            responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            //直接傳回響應結果
            session.transfer(responseFlowFile, SUCCESS);
        } catch (Exception e) {
            // penalize or yield
            logger.error("Leach HTTP 異常:",e);
            try {
                if (requestFlowFile != null) {
                    session.remove(requestFlowFile);
                }
            } catch (final Exception e1) {
                logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
            }
            logger.error("LeachHttpInvoke 執行異常:{}",e);
            FlowFile flowFile=session.create();
            final String errMsg="LeachHttpInvoke 執行異常:"+e.getMessage();
            flowFile=session.write(flowFile,out -> out.write(errMsg.getBytes("utf-8")));
            session.transfer(flowFile, FAILURE);
        }
    }
}

           

4、元件2——LeachHttpPagingHandle

LeachHttpPagingHandle 此元件 為分頁查詢的核心,即根據收到的總條數、總頁數和自定義配置的分頁參數,
	進行組裝分頁URL,然後傳回多個URL,每個URL單獨傳回,在隊列中也是有多個queued資料;
考慮到分頁參數可能位於嵌套JSON中,所以分頁參數的屬性名可以使用英文“:”分隔,逐層進行提取;

Request Paging Size AttrName :請求中每頁資料量的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。
	例:key:key1:key2 ;
Request Paging Index AttrName : 請求中分頁頁碼的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。
	例:key:key1:key2 ;
Response DataSize AttrName : 響應内容中總條數的屬性名,若提取嵌套中的屬性名,則使用英文“:”号分割。
	例:key:key1:key2 ;
Total Page Size : 總分頁頁碼數。若設定此屬性,則直接以此值作為總頁數進行分頁查詢(優先於"Response DataSize AttrName");未設定時,才根據"Response DataSize AttrName"提取的總條數計算總頁數;
Get Paging URL : GET請求需循環分頁時使用,頁碼大小處使用佔位符":_size",分頁頁碼處使用佔位符":_index"(源碼中實際替換的是這兩個佔位符)。
	處理時會根據響應計算出的總頁數依次替換這兩個佔位符,傳回多個分頁後的URL。
	例:http://127.0.0.1:8080/get?attr1=參數1&pageSize=:_size&pageIndex=:_index 。
	此參數主要為GET請求分頁使用。
           
NIFI自定義開發API-HTTP分頁功能

源碼:

package com.leach.nifi.processor;

import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.LeachCommon;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;

/**
 * Expands one "probe" response into one request per page.
 *
 * <p>Input is the JSON object emitted by LeachHttpInvoke with PassDown enabled
 * ({@code url, header, reqMsg, respMsg, method}). The total page count comes from
 * "Total Page Size" when set; otherwise it is computed as ceil(total rows / page size), where
 * total rows is extracted from {@code respMsg} using "Response DataSize AttrName".
 * For GET, the page size and index placeholders in "Get Paging URL" are substituted. For
 * POST, the size/index attributes inside {@code reqMsg} are replaced. Each page is emitted
 * as its own FlowFile on {@link #SUCCESS}.
 *
 * @author   xiaobin.wang
 * @create   2020/01/02
 */
// Output depends only on the input FlowFile and configuration.
@SideEffectFree
// Batching is supported.
@SupportsBatching
// Tags used to search for this processor.
@Tags({ "leach","add", "value", "avro" ,"static","param"})
// The processor cannot do anything without an incoming FlowFile, so input is required.
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
// Description shown for this processor.
@CapabilityDescription("HTTP  RESTFulApi請求,接收的資料格式為JSON對象:{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\",\"connectTimeout\":\"\",\"readTimeout\":\"\"}")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class LeachHttpPagingHandle extends AbstractProcessor {

    /** Page-index placeholder documented for "Get Paging URL". */
    private static final String INDEX_PLACEHOLDER = "${index}";
    /** Page-size placeholder documented for "Get Paging URL". */
    private static final String SIZE_PLACEHOLDER = "${size}";
    /** Page-index placeholder historically replaced by this processor; still supported. */
    private static final String LEGACY_INDEX_PLACEHOLDER = ":_index";
    /** Page-size placeholder historically replaced by this processor; still supported. */
    private static final String LEGACY_SIZE_PLACEHOLDER = ":_size";

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    public static final PropertyDescriptor PAGE_SIZE=new PropertyDescriptor.Builder()
            .name("Page Size")
            .required(true)
            // Must be a positive integer: 0 would divide by zero and decimals fail asInteger().
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("200")
            .description("分頁頁碼大小")
            .build();
    public static final PropertyDescriptor PAGING_SIZE_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Request Paging Size AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("請求中每頁資料量的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。例:key:key1:key2")
            .build();
    public static final PropertyDescriptor PAGING_INDEX_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Request Paging Index AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("請求中分頁頁碼的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。例:key:key1:key2")
            .build();
    public static final PropertyDescriptor DATASIZE_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Response DataSize AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("響應内容中總條數的屬性名,若提取嵌套中的屬性名,則使用英文“:”号分割。例:key:key1:key2")
            .build();

    public static final PropertyDescriptor TOTAL_PAGE_SIZE = new PropertyDescriptor.Builder()
            .name("Total Page Size")
            .required(false)
            // Read with asInteger(), so only integers are meaningful.
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .description("總分頁頁碼數。若設定此屬性,則直接以此值作為總頁數進行分頁查詢(優先於\"Response DataSize AttrName\")")
            .build();
    public static final PropertyDescriptor GET_PAGING_URL= new PropertyDescriptor.Builder()
            .name("Get Paging URL")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("GET請求需循環分頁時使用,頁碼大小實際參數使用\":_size\"(或\"${size}\")替換,分頁頁碼實際參數使用\":_index\"(或\"${index}\")替換。" +
                    "\n處理時會根據響應計算出的總頁數依次替換這兩個變量,傳回多個分頁後的URL。未包含頁碼變量時原樣傳回一次請求。" +
                    "\n例:http://127.0.0.1:8080/get?attr1=參數1&pageSize=:_size&pageIndex=:_index")
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
            .description("API分頁查詢處理成功,傳回資料格式為JSON對象:{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\"}").build();

    public static final Relationship FAILURE = new Relationship.Builder().name("failure").description(
            "API分頁查詢處理異常或失敗,傳回此标志。")
            .build();

    @Override
    public void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(PAGE_SIZE);
        properties.add(PAGING_SIZE_ATTR_NAME);
        properties.add(PAGING_INDEX_ATTR_NAME);
        properties.add(DATASIZE_ATTR_NAME);
        properties.add(TOTAL_PAGE_SIZE);
        properties.add(GET_PAGING_URL);
        // Publish immutable views so no thread can mutate them after init.
        this.properties = Collections.unmodifiableList(properties);
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /**
     * Turns the incoming request/response envelope into one FlowFile per page.
     *
     * <p>The input FlowFile is removed in every case. If any step fails, a FlowFile containing
     * the error message is routed to {@link #FAILURE} instead.
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ComponentLog logger = getLogger();
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            // Another concurrent task may have taken the FlowFile; this is not an error.
            return;
        }

        final int page_size = processContext.getProperty(PAGE_SIZE).asInteger();
        final int start_index = 1;
        final String paging_size_attr_name = processContext.getProperty(PAGING_SIZE_ATTR_NAME).getValue();
        final String paging_index_attr_name = processContext.getProperty(PAGING_INDEX_ATTR_NAME).getValue();
        final String datasize_attr_name = processContext.getProperty(DATASIZE_ATTR_NAME).getValue();
        final String get_paging_uri = processContext.getProperty(GET_PAGING_URL).getValue();

        try {
            if (page_size <= 0) {
                throw new IllegalArgumentException("分頁頁碼大小必須大於0,目前:" + page_size);
            }
            // Envelope keys: url, method, header, reqMsg, respMsg.
            @SuppressWarnings("unchecked")
            final Map<String, String> httpMap = JSONObject.parseObject(readContent(processSession, flowFile), Map.class);
            if (httpMap == null) {
                throw new IllegalArgumentException("輸入的FlowFile内容為空,無法進行分頁處理");
            }
            JSONObject reqJson = JSONObject.parseObject(httpMap.get("reqMsg"), JSONObject.class);
            final JSONObject respJson = JSONObject.parseObject(httpMap.get("respMsg"), JSONObject.class);
            final String method = httpMap.get("method");
            // The probe response is only needed to count pages; do not forward it.
            httpMap.remove("respMsg");

            final int total_page;
            if (processContext.getProperty(TOTAL_PAGE_SIZE).isSet()) {
                // An explicitly configured page count takes precedence.
                total_page = processContext.getProperty(TOTAL_PAGE_SIZE).asInteger();
            } else {
                total_page = computeTotalPages(datasize_attr_name, respJson, page_size, logger);
            }
            if (start_index > total_page) {
                logger.error("開始查詢的頁碼("+start_index+")大于總頁數("+total_page+"),不執行查詢。目前每頁:"+page_size+"條");
                throw new IllegalStateException("開始查詢的頁碼("+start_index+")大于總頁數("+total_page+"),不執行查詢。目前每頁:"+page_size+"條");
            }
            logger.info("total_page:"+total_page+",start_index:"+start_index);

            // Build every page payload first so a failure part-way does not emit a partial set.
            final List<String> pages = new ArrayList<>();
            if ("GET".equals(method)) {
                if (hasIndexPlaceholder(get_paging_uri)) {
                    for (int i = start_index; i <= total_page; i++) {
                        httpMap.put("url", fillPlaceholders(get_paging_uri, i, page_size));
                        pages.add(JSONObject.toJSONString(httpMap));
                    }
                } else {
                    // No paging URL or no index placeholder: forward the original request once.
                    pages.add(JSONObject.toJSONString(httpMap));
                }
            } else {
                if (paging_size_attr_name == null || paging_index_attr_name == null) {
                    throw new IllegalArgumentException("POST分頁查詢必須設定\"Request Paging Size AttrName\"與\"Request Paging Index AttrName\"");
                }
                // Page size is the same for every page; set it once.
                reqJson = LeachCommon.replaceAttr(toPath(paging_size_attr_name), reqJson, String.valueOf(page_size));
                for (int i = start_index; i <= total_page; i++) {
                    // Use a fresh path list each time; the original code rebuilt the list before each call.
                    reqJson = LeachCommon.replaceAttr(toPath(paging_index_attr_name), reqJson, String.valueOf(i));
                    // Only reqMsg changes per page; url, header and method are kept.
                    httpMap.put("reqMsg", JSONObject.toJSONString(reqJson));
                    pages.add(JSONObject.toJSONString(httpMap));
                }
            }
            for (final String page : pages) {
                transferPage(processSession, page);
            }
            processSession.remove(flowFile);
        } catch (final Exception pe) {
            logger.error("API分頁處理異常:{}", new Object[]{pe.getMessage()}, pe);
            try {
                processSession.remove(flowFile);
            } catch (final Exception e) {
                logger.warn("Could not remove input FlowFile: {}", new Object[]{e.getMessage()}, e);
            }
            FlowFile failureFlowFile = processSession.create();
            final String errMsg = "API分頁處理異常:" + pe.getMessage();
            failureFlowFile = processSession.write(failureFlowFile, out -> out.write(errMsg.getBytes("utf-8")));
            failureFlowFile = processSession.putAttribute(failureFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            failureFlowFile = processSession.putAttribute(failureFlowFile, "inputProcessor", this.getIdentifier());
            failureFlowFile = processSession.putAttribute(failureFlowFile, "relationship", "failure");
            failureFlowFile = processSession.putAttribute(failureFlowFile, "level", "ERROR");
            processSession.transfer(failureFlowFile, FAILURE);
        }
    }

    /**
     * Computes ceil(totalRows / pageSize), where totalRows is read from {@code respJson} at the
     * colon-separated path {@code attrPath}.
     *
     * @throws IllegalArgumentException if no path is configured
     * @throws NumberFormatException    if the extracted value is not a finite, non-negative number
     */
    private static int computeTotalPages(final String attrPath, final JSONObject respJson,
                                         final int pageSize, final ComponentLog logger) {
        if (attrPath == null) {
            throw new IllegalArgumentException("未設定\"Response DataSize AttrName\"或\"Total Page Size\",無法計算總頁數");
        }
        final Object dataSizeVal = LeachCommon.extractAttrVal(toPath(attrPath), respJson);
        final double dataSize;
        try {
            // Double, not Float: float loses precision above 2^24 rows.
            dataSize = Double.parseDouble(String.valueOf(dataSizeVal));
        } catch (NumberFormatException e) {
            logger.error("總資料量屬性提取内容應為整數數字類型,目前内容:{}", new Object[]{String.valueOf(dataSizeVal)});
            throw new NumberFormatException("總資料量屬性提取内容應為整數數字類型,目前内容:" + dataSizeVal);
        }
        if (Double.isNaN(dataSize) || Double.isInfinite(dataSize) || dataSize < 0) {
            throw new NumberFormatException("總資料量屬性提取内容應為整數數字類型,目前内容:" + dataSizeVal);
        }
        return (int) Math.ceil(dataSize / pageSize);
    }

    /** True if {@code template} contains a page-index placeholder (either supported form). */
    private static boolean hasIndexPlaceholder(final String template) {
        return template != null
                && (template.contains(INDEX_PLACEHOLDER) || template.contains(LEGACY_INDEX_PLACEHOLDER));
    }

    /** Substitutes page index and size into {@code template} (literal replacement, no regex). */
    private static String fillPlaceholders(final String template, final int index, final int size) {
        return template
                .replace(INDEX_PLACEHOLDER, String.valueOf(index))
                .replace(SIZE_PLACEHOLDER, String.valueOf(size))
                .replace(LEGACY_INDEX_PLACEHOLDER, String.valueOf(index))
                .replace(LEGACY_SIZE_PLACEHOLDER, String.valueOf(size));
    }

    /** Splits a colon-separated attribute path into a new mutable list. */
    private static List<String> toPath(final String attrPath) {
        return new ArrayList<>(Arrays.asList(attrPath.split(":")));
    }

    /** Reads the whole FlowFile content as UTF-8 without modifying the FlowFile. */
    private static String readContent(final ProcessSession session, final FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, "utf-8"));
        return holder[0];
    }

    /** Creates one page FlowFile with {@code payload} as content and routes it to SUCCESS. */
    private void transferPage(final ProcessSession session, final String payload) {
        FlowFile pageFlowFile = session.create();
        pageFlowFile = session.write(pageFlowFile, out -> out.write(payload.getBytes("utf-8")));
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        attributes.put("inputProcessor", this.getIdentifier());
        attributes.put("relationship", "success");
        attributes.put("level", "INFO");
        pageFlowFile = session.putAllAttributes(pageFlowFile, attributes);
        session.transfer(pageFlowFile, SUCCESS);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
}

           

5、元件3——LeachHttpPagingRequest

LeachHttpPagingRequest 元件的目的是對收到的每個URL發起HTTP請求並傳回響應;此元件比較簡單,配置如下;
           
NIFI自定義開發API-HTTP分頁功能

源碼:

package com.leach.nifi.processor;

import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.HttpRequestClient;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;

/**
 * Sends HTTP requests described by JSON and emits one FlowFile per response.
 *
 * <p>The request description is read from the incoming FlowFile content. If that content is
 * empty, the "HTTP Msg" property is used instead. The description may be a single JSON
 * object or a JSON array of objects, each with {@code url}, {@code header} and
 * {@code reqMsg}. {@code header} and {@code reqMsg} are passed to the POST call only.
 * Values that are JSON objects instead of strings are serialized before use.
 * All responses go to {@link #SUCCESS} together. If any request fails, none of them are
 * emitted, and a single FlowFile with the error message goes to {@link #FAILURE}.
 *
 * @author   xiaobin.wang
 * @create   2020/05/07
 */

@SupportsBatching
@Tags({"leach","http","api", "post","get"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("根據傳入或輸入的http對象,直接進行發送并傳回json資訊。" +
        "傳入http對象格式為JSON對象或JSON集合:[{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\",\"connectTimeout\":\"\",\"readTimeout\":\"\"}]")
public class LeachHttpPagingRequest extends AbstractProcessor {

    // properties
    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .description("HTTP request method (GET, POST)")
            .required(true)
            .defaultValue("GET")
            .allowableValues("POST","GET")
            .build();
    // NiFi treats a set property without a validator as invalid, so one is required here.
    public static final PropertyDescriptor HTTP_MSG = new PropertyDescriptor.Builder()
            .name("HTTP Msg")
            .description("輸入資料格式為JSON對象或JSON集合:[{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\"}]")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
            .name("Content Type")
            .description("Content Type(GET請求時無效)")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("raw","x-www-form-urlencoded","form-data")
            .defaultValue("raw")
            .build();

    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            PROP_METHOD,CONTENT_TYPE,HTTP_MSG));

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("執行異常或失敗,傳回此标志")
            .build();
    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("API查詢成功,傳回接口響應的JSON資訊")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            FAILURE,SUCCESS)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Sends every request described by the input and emits the responses.
     *
     * <p>Any incoming FlowFile is consumed exactly once, after all requests are done. If the
     * processor is triggered with no input and "HTTP Msg" is not configured, it yields and
     * does nothing.
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile requestFlowFile = session.get();
        final String content_type = context.getProperty(CONTENT_TYPE).getValue();
        final String http_method = context.getProperty(PROP_METHOD).getValue();

        String httpMsg = requestFlowFile == null ? null : readContent(session, requestFlowFile);
        // Fall back to the configured property when no (or empty) content arrived.
        if (StringUtils.isEmpty(httpMsg) && context.getProperty(HTTP_MSG).isSet()) {
            httpMsg = context.getProperty(HTTP_MSG).getValue();
        }
        if (requestFlowFile == null && StringUtils.isBlank(httpMsg)) {
            // Triggered without input and nothing configured: nothing to send.
            context.yield();
            return;
        }

        final List<FlowFile> respFlowFiles = new ArrayList<FlowFile>();
        // Response FlowFile under construction; tracked so it can be removed if a step fails.
        FlowFile pending = null;
        try {
            for (final Map<?, ?> httpMap : parseRequests(httpMsg)) {
                final String url = asString(httpMap.get("url"));
                final String header = asString(httpMap.get("header"));
                final String reqMsg = asString(httpMap.get("reqMsg"));
                logger.debug("請求URL:{}\n請求header:{}\n請求Req:{}", new Object[]{url, header, reqMsg});

                String respMsg = null;
                if ("POST".equals(http_method)) {
                    respMsg = HttpRequestClient.post(url, reqMsg, content_type, header);
                }
                if ("GET".equals(http_method)) {
                    respMsg = HttpRequestClient.get(url);
                }
                final byte[] respData = (respMsg != null ? respMsg : "").getBytes("utf-8");

                pending = session.create();
                if (respData.length > 0) {
                    pending = session.write(pending, out -> out.write(respData));
                }
                session.getProvenanceReporter().fetch(pending, url);

                final Map<String, String> attributes = new HashMap<>();
                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                attributes.put("inputProcessor", this.getIdentifier());
                attributes.put("relationship", "success");
                attributes.put("level", "INFO");
                pending = session.putAllAttributes(pending, attributes);
                respFlowFiles.add(pending);
                pending = null;
            }
            // Remove the request once, after all requests are done. Removing it inside the loop
            // broke arrays with more than one element, and an empty array never removed it.
            if (requestFlowFile != null) {
                session.remove(requestFlowFile);
                requestFlowFile = null;
            }
            session.transfer(respFlowFiles, SUCCESS);
        } catch (Exception e) {
            // Clean up every FlowFile the session still owns; leftovers would fail the commit.
            removeQuietly(session, pending, logger);
            for (final FlowFile created : respFlowFiles) {
                removeQuietly(session, created, logger);
            }
            removeQuietly(session, requestFlowFile, logger);
            logger.error("LeachHttpPagingRequest API請求異常:{}", new Object[]{e.getMessage()}, e);
            FlowFile flowFile = session.create();
            final String errMsg = "LeachHttpPagingRequest API請求異常:" + e.getMessage();
            flowFile = session.write(flowFile, out -> out.write(errMsg.getBytes("utf-8")));
            session.transfer(flowFile, FAILURE);
        }
    }

    /**
     * Parses the request description into a list of request maps.
     *
     * @throws IllegalArgumentException if the text is blank, or if an array element is not a JSON object
     */
    private static List<Map<?, ?>> parseRequests(final String httpMsg) {
        if (StringUtils.isBlank(httpMsg)) {
            throw new IllegalArgumentException("HTTP請求資訊為空");
        }
        final List<Map<?, ?>> requests = new ArrayList<Map<?, ?>>();
        if (httpMsg.trim().startsWith("[")) {
            for (final Object item : JSONObject.parseArray(httpMsg)) {
                if (!(item instanceof Map)) {
                    throw new IllegalArgumentException("HTTP請求資訊集合中的元素必須為JSON對象:" + item);
                }
                requests.add((Map<?, ?>) item);
            }
        } else {
            final Map<?, ?> single = JSONObject.parseObject(httpMsg);
            if (single == null) {
                throw new IllegalArgumentException("HTTP請求資訊不是JSON對象:" + httpMsg);
            }
            requests.add(single);
        }
        return requests;
    }

    /** Returns strings unchanged, serializes other JSON values, and maps null to null. */
    private static String asString(final Object value) {
        if (value == null) {
            return null;
        }
        return value instanceof String ? (String) value : JSONObject.toJSONString(value);
    }

    /** Reads the whole FlowFile content as UTF-8 without modifying the FlowFile. */
    private static String readContent(final ProcessSession session, final FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, "utf-8"));
        return holder[0];
    }

    /** Removes {@code flowFile} from the session if non-null, logging (not throwing) on failure. */
    private static void removeQuietly(final ProcessSession session, final FlowFile flowFile, final ComponentLog logger) {
        if (flowFile == null) {
            return;
        }
        try {
            session.remove(flowFile);
        } catch (final Exception e1) {
            logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
        }
    }
}

           

6、總結

此流程為個人開發,裡面的邏輯基本都有注釋,上下的元件格式也是經過單獨處理進行發送和接收。
各位同學在開發時也可以根據個人需求進行邏輯修改。
           

繼續閱讀