轉載請注明位址: https://blog.csdn.net/u011291276/article/details/107213758
1、摘要
NIFI自定義開發搭建配置請參考:https://blog.csdn.net/u011291276/article/details/107211923
此文檔目的為:在NIFI ETL中開發自定義HTTP分頁查詢,通過接口響應回的總條數和總頁數,
生成多個分頁查詢的API接口位址,然後逐一進行請求擷取響應資訊;
HTTP分頁查詢一般為POST查詢,此流程第二個元件同時也配置GET請求的分頁查詢;
2、最終結果展示
HTTP分頁查詢的流程邏輯為:第一次查詢,擷取接口響應的總條數;本地封裝分頁查詢的多個URL;多個URL逐一查詢擷取每頁資料;
核心插件及流程如下:

下面將逐個說明這三個元件,並直接貼上源代碼。
3、元件1—— LeachHttpInvoke
LeachHttpInvoke元件在運作時,僅請求一條資料,目的只為擷取接口響應的總條數或總頁數;
基本參數大家都能看懂,其中body體和header頭的格式都是對象字元串;
ReqMsg PassDown :參數有兩個值,
false:不走分頁查詢,直接傳回響應結果。此目的為不使用分頁查詢時使用,不過正常情況下,
不使用分頁查詢時可以使用NIFI自帶的元件InvokeHTTP請求;
true:開始走分頁查詢,此元件在傳回資料時,參數格式會有變化;
源碼:
package com.leach.nifi.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.HttpRequestClient;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.http.MediaType;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
/**
 * Custom HTTP processor that sends one GET or POST request, with the request body and the
 * request headers configurable as processor properties.
 *
 * <p>When "ReqMsg PassDown" is {@code true} the outgoing FlowFile contains a JSON object with
 * the keys {@code url}, {@code header}, {@code reqMsg}, {@code respMsg} and {@code method}, so
 * that a downstream paging processor can reuse the request. Otherwise the outgoing FlowFile
 * contains only the raw response text.
 *
 * @author xiaobin.wang
 * @create 2020/01/08
 */
@SupportsBatching
@Tags({"leach","http", "https", "rest", "client"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("HTTP RESTFulApi請求。支援分頁查詢操作(連接配接LeachHttpPagingHandle-LeachHttpPagingRequest)")
public class LeachHttpInvoke extends AbstractProcessor {

    public static final String DEFAULT_CONTENT_TYPE = "application/json";

    /** Charset used for every String/byte conversion done by this processor. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    // properties
    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .description("HTTP request method (GET, POST)")
            .required(true)
            .defaultValue("GET")
            .allowableValues("POST","GET")
            .build();

    public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
            .name("Remote URL")
            .description("Remote URL which will be connected to, including scheme, host, port, path.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build();

    public static final PropertyDescriptor REQUEST_BODY = new PropertyDescriptor.Builder()
            .name("Request Body")
            .description("請求體參數 JSON字元串格式對象 {\"key\":\"value\"}")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
            .name("Content Type")
            .description("Content Type(GET請求時無效)")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("raw","x-www-form-urlencoded","form-data")
            .defaultValue("raw")
            .build();

    public static final PropertyDescriptor REQUEST_HEADER = new PropertyDescriptor.Builder()
            .name("Request Header")
            .description("請求頭 JSON字元串格式對象 {\"key\":\"value\"}。此處不需要配置content-type。")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor PROP_PASS_DOWN = new PropertyDescriptor.Builder()
            .name("ReqMsg PassDown")
            .description("是否允許請求資訊向下傳遞。分頁查詢需向下傳遞({\"url\":\"\",\"header\":{},\"body\":{})")
            .required(false)
            .defaultValue("false")
            .allowableValues("true", "false")
            .build();

    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            PROP_METHOD,
            PROP_URL,
            REQUEST_BODY,
            REQUEST_HEADER,
            CONTENT_TYPE,
            PROP_PASS_DOWN));

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("執行異常或失敗,傳回此标志")
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("RESTFul Api執行成功,傳回此URL響應的内容資訊。" +
                    "當ReqMsg PassDown選擇true時,将傳回帶有請求參數的封裝體{\"reqMsg\":Object,\"respMsg\":Object}。" +
                    "當值為false時,直接傳回url請求結果Object")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            FAILURE,SUCCESS)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Sends the configured HTTP request and routes the result.
     *
     * <p>The request body is taken from the "Request Body" property when it is set, otherwise
     * from the content of the incoming FlowFile. On success the incoming FlowFile is dropped and
     * a new FlowFile carrying the response (or the request/response envelope in pass-down mode)
     * is routed to {@link #SUCCESS}. On any exception every FlowFile still owned by the session
     * is removed and a FlowFile containing the error message is routed to {@link #FAILURE}.
     *
     * @param context provides the processor property values
     * @param session session used to read, create, remove and transfer FlowFiles
     * @throws ProcessException declared by the framework contract; failures are routed to
     *         {@link #FAILURE} instead of being thrown
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile requestFlowFile = session.get();
        if (requestFlowFile == null) {
            // Nothing queued upstream: create an empty carrier so the request is still sent.
            requestFlowFile = session.create();
        }
        // Tracked outside the try block so the catch block can clean it up.
        FlowFile responseFlowFile = null;
        try {
            // The descriptors declare expression-language support, so the values must be evaluated.
            final String requestUrl = context.getProperty(PROP_URL)
                    .evaluateAttributeExpressions(requestFlowFile).getValue();
            final String requestMethod = context.getProperty(PROP_METHOD).getValue();
            final String contentType = context.getProperty(CONTENT_TYPE).getValue();
            final boolean passDown = Boolean.TRUE.equals(context.getProperty(PROP_PASS_DOWN).asBoolean());
            final String requestHeader = context.getProperty(REQUEST_HEADER)
                    .evaluateAttributeExpressions().getValue();

            // Request body: explicit property wins, otherwise use the incoming FlowFile content.
            final String reqMsg;
            if (context.getProperty(REQUEST_BODY).isSet()) {
                reqMsg = context.getProperty(REQUEST_BODY).evaluateAttributeExpressions().getValue();
            } else {
                reqMsg = readContent(session, requestFlowFile);
            }

            // Send the HTTP request.
            String respMsg = null;
            if ("POST".equals(requestMethod)) {
                respMsg = HttpRequestClient.post(requestUrl, reqMsg, contentType, requestHeader);
            } else if ("GET".equals(requestMethod)) {
                respMsg = HttpRequestClient.get(requestUrl);
            }
            logger.debug("URL:"+requestUrl+"\n響應資訊:"+respMsg);

            // The upstream data is no longer needed once the request has been sent.
            session.remove(requestFlowFile);
            requestFlowFile = null;

            final byte[] payload;
            if (passDown) {
                // Wrap URL, header, body, response and method so downstream paging can reuse them.
                final Map<String, String> request = new HashMap<>();
                request.put("url", requestUrl);
                request.put("header", requestHeader);
                request.put("reqMsg", reqMsg);
                request.put("respMsg", respMsg);
                request.put("method", requestMethod);
                payload = JSONObject.toJSONString(request).getBytes(UTF_8);
            } else {
                // A missing response is emitted as empty content instead of failing with an NPE.
                payload = (respMsg == null ? "" : respMsg).getBytes(UTF_8);
            }

            responseFlowFile = session.create();
            if (payload.length > 0) {
                responseFlowFile = session.write(responseFlowFile, out -> out.write(payload));
            }
            session.getProvenanceReporter().fetch(responseFlowFile, requestUrl);
            responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            session.transfer(responseFlowFile, SUCCESS);
        } catch (Exception e) {
            logger.error("LeachHttpInvoke 執行異常:" + e.getMessage(), e);
            // Every FlowFile still owned by the session must be removed or transferred,
            // otherwise the commit performed by the framework fails.
            removeQuietly(session, responseFlowFile, logger);
            removeQuietly(session, requestFlowFile, logger);
            FlowFile failureFlowFile = session.create();
            final String errMsg = "LeachHttpInvoke 執行異常:" + e.getMessage();
            failureFlowFile = session.write(failureFlowFile, out -> out.write(errMsg.getBytes(UTF_8)));
            session.transfer(failureFlowFile, FAILURE);
        }
    }

    /** Returns the whole content of {@code flowFile} decoded as UTF-8, leaving the content untouched. */
    private static String readContent(ProcessSession session, FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, UTF_8));
        return holder[0];
    }

    /** Removes {@code flowFile} from the session if it is non-null, logging (not propagating) any failure. */
    private static void removeQuietly(ProcessSession session, FlowFile flowFile, ComponentLog logger) {
        if (flowFile == null) {
            return;
        }
        try {
            session.remove(flowFile);
        } catch (final Exception cleanupError) {
            logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{cleanupError}, cleanupError);
        }
    }
}
4、元件2——LeachHttpPagingHandle
LeachHttpPagingHandle 此元件 為分頁查詢的核心,即根據收到的總條數、總頁數和自定義配置的分頁參數,
進行組裝分頁URL,然後傳回多個URL,每個URL單獨傳回,在隊列中也是有多個queued資料;
考慮到分頁參數可能存在于嵌套JSON中,是以擷取分頁參數屬性名可以使用“:”來進行提取;
Request Paging Size AttrName :請求中每頁資料量的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。
例:key:key1:key2 ;
Request Paging Index AttrName : 請求中分頁頁碼的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。
例:key:key1:key2 ;
Response DataSize AttrName : 響應内容中總條數的屬性名,若提取嵌套中的屬性名,則使用英文“:”号分割。
例:key:key1:key2 ;
Total Page Size : 總分頁頁碼數。若不填"Response DataSize AttrName",則将此屬性值設定的頁碼數作為總頁數進行分頁查詢;
Get Paging URL : GET請求需循環分頁時使用,頁碼大小實際參數使用"${size}"替換,分頁頁碼實際參數使用"${index}"替換。
處理時會根據響應計算出的總頁數依次替換這兩個變量,傳回多個分頁後的URL。
例:http://127.0.0.1:8080/get?attr1=參數1&pageSize=${size}&pageIndex=${index} 。
此參數主要為GET請求分頁使用。
源碼:
package com.leach.nifi.processor;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.LeachCommon;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
/**
 * Expands one HTTP request description into one request description per page.
 *
 * <p>The incoming FlowFile content is a JSON object with the keys {@code url}, {@code method},
 * {@code header}, {@code reqMsg} and {@code respMsg}. The total number of pages is either the
 * "Total Page Size" property or is computed from a total row count extracted from
 * {@code respMsg}. One FlowFile per page is routed to {@link #SUCCESS}; {@code respMsg} is not
 * forwarded.
 *
 * @author xiaobin.wang
 * @create 2020/01/02
 */
// The processor has no side effects outside of NiFi
@SideEffectFree
// Supports batching
@SupportsBatching
// Tags used to search for this processor
@Tags({ "leach","add", "value", "avro" ,"static","param"})
// Input is required: there is nothing to page without an upstream request description
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
// Description shown for this processor
@CapabilityDescription("HTTP RESTFulApi請求,接收的資料格式為JSON對象:{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\",\"connectTimeout\":\"\",\"readTimeout\":\"\"}")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class LeachHttpPagingHandle extends AbstractProcessor {

    /** Charset used for every String/byte conversion done by this processor. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Number of the first page that is generated. */
    private static final int START_INDEX = 1;

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    public static final PropertyDescriptor PAGE_SIZE=new PropertyDescriptor.Builder()
            .name("Page Size")
            .required(true)
            // Must be a positive integer: it is read with asInteger() and used as a divisor.
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("200")
            .description("分頁頁碼大小")
            .build();

    public static final PropertyDescriptor PAGING_SIZE_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Request Paging Size AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("請求中每頁資料量的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。例:key:key1:key2")
            .build();

    public static final PropertyDescriptor PAGING_INDEX_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Request Paging Index AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("請求中分頁頁碼的屬性名,若存在于嵌套中,則使用英文“:”号分割(POST請求必填)。例:key:key1:key2")
            .build();

    public static final PropertyDescriptor DATASIZE_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Response DataSize AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("響應内容中總條數的屬性名,若提取嵌套中的屬性名,則使用英文“:”号分割。例:key:key1:key2")
            .build();

    public static final PropertyDescriptor TOTAL_PAGE_SIZE = new PropertyDescriptor.Builder()
            .name("Total Page Size")
            .required(false)
            // Read with asInteger(), so decimals must be rejected at configuration time.
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .description("總分頁頁碼數。若不填\"DataSize AttrName\",則将此屬性值設定的頁碼數作為總頁數進行分頁查詢")
            .build();

    public static final PropertyDescriptor GET_PAGING_URL= new PropertyDescriptor.Builder()
            .name("Get Paging URL")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("GET請求需循環分頁時使用,頁碼大小實際參數使用\"${size}\"替換,分頁頁碼實際參數使用\"${index}\"替換。" +
                    "\n處理時會根據響應計算出的總頁數依次替換這兩個變量,傳回多個分頁後的URL。" +
                    "\n例:http://127.0.0.1:8080/get?attr1=參數1&pageSize=${size}&pageIndex=${index}")
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
            .description("API分頁查詢處理成功,傳回資料格式為JSON對象:{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\"}").build();

    public static final Relationship FAILURE = new Relationship.Builder().name("failure").description(
            "API分頁查詢處理異常或失敗,傳回此标志。")
            .build();

    /** Builds the immutable property and relationship collections returned by the getters. */
    @Override
    public void init(final ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(PAGE_SIZE);
        properties.add(PAGING_SIZE_ATTR_NAME);
        properties.add(PAGING_INDEX_ATTR_NAME);
        properties.add(DATASIZE_ATTR_NAME);
        properties.add(TOTAL_PAGE_SIZE);
        properties.add(GET_PAGING_URL);
        // Unmodifiable so the list cannot be changed after initialization
        this.properties = Collections.unmodifiableList(properties);
        Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        // Unmodifiable so the set cannot be changed after initialization
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /**
     * Reads one request description and emits one request description per page.
     *
     * <p>All page payloads are computed before any FlowFile is created, so an error while
     * computing them never leaves a partial set of pages on {@link #SUCCESS}. On any exception
     * the incoming FlowFile is removed and a FlowFile containing the error message is routed to
     * {@link #FAILURE}.
     *
     * @param processContext provides the processor property values
     * @param processSession session used to read, create, remove and transfer FlowFiles
     * @throws ProcessException declared by the framework contract; failures are routed to
     *         {@link #FAILURE} instead of being thrown
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            // An empty queue is a normal condition, not an error.
            return;
        }
        final List<FlowFile> pageFlowFiles = new ArrayList<>();
        try {
            final int pageSize = processContext.getProperty(PAGE_SIZE).asInteger();
            if (pageSize <= 0) {
                // Guards the division below; a zero page size would yield an unbounded page count.
                throw new IllegalArgumentException("\"Page Size\" must be a positive integer, current value: " + pageSize);
            }
            final String pagingSizeAttrName = processContext.getProperty(PAGING_SIZE_ATTR_NAME).getValue();
            final String pagingIndexAttrName = processContext.getProperty(PAGING_INDEX_ATTR_NAME).getValue();
            final String dataSizeAttrName = processContext.getProperty(DATASIZE_ATTR_NAME).getValue();
            final String getPagingUrl = processContext.getProperty(GET_PAGING_URL).getValue();

            // Expected keys: url, method, header, reqMsg, respMsg
            final String httpStr = readContent(processSession, flowFile);
            @SuppressWarnings("unchecked")
            final Map<String,String> httpMap = JSONObject.parseObject(httpStr, Map.class);
            if (httpMap == null) {
                throw new IllegalArgumentException("FlowFile content is empty; expected a JSON object describing the HTTP request");
            }
            JSONObject reqJson = JSONObject.parseObject(httpMap.get("reqMsg"), JSONObject.class);
            final JSONObject respJson = JSONObject.parseObject(httpMap.get("respMsg"), JSONObject.class);
            final String method = httpMap.get("method");
            // The response is only needed to compute the page count; do not forward it.
            httpMap.remove("respMsg");

            // Determine the total number of pages.
            final int totalPage;
            if (processContext.getProperty(TOTAL_PAGE_SIZE).isSet()) {
                // An explicitly configured page count takes precedence.
                totalPage = processContext.getProperty(TOTAL_PAGE_SIZE).asInteger();
            } else {
                if (dataSizeAttrName == null) {
                    throw new IllegalArgumentException(
                            "Either \"Total Page Size\" or \"Response DataSize AttrName\" must be configured");
                }
                // Otherwise derive it from the total row count found in the response.
                final List<String> path = new ArrayList<>();
                Collections.addAll(path, dataSizeAttrName.split(":"));
                final Object dataSize = LeachCommon.extractAttrVal(path, respJson);
                totalPage = toTotalPages(dataSize, pageSize);
            }
            if (START_INDEX > totalPage) {
                throw new IllegalStateException("開始查詢的頁碼("+START_INDEX+")大于總頁數("+totalPage+"),不執行查詢。目前每頁:"+pageSize+"條");
            }
            logger.info("total_page:"+totalPage+",start_index:"+START_INDEX);

            // Compute the JSON payload of every page before creating any FlowFile.
            final List<String> pagePayloads = new ArrayList<>();
            if ("GET".equals(method)) {
                if (getPagingUrl == null) {
                    throw new IllegalArgumentException("\"Get Paging URL\" must be configured to page a GET request");
                }
                if (hasIndexPlaceholder(getPagingUrl)) {
                    for (int i = START_INDEX; i <= totalPage; i++) {
                        httpMap.put("url", expandPagingUrl(getPagingUrl, i, pageSize));
                        pagePayloads.add(JSONObject.toJSONString(httpMap));
                    }
                } else {
                    // No page placeholder: forward the request once, unchanged.
                    pagePayloads.add(JSONObject.toJSONString(httpMap));
                }
            } else {
                // POST: replace the paging fields inside the request body.
                if (pagingSizeAttrName == null || pagingIndexAttrName == null) {
                    throw new IllegalArgumentException(
                            "\"Request Paging Size AttrName\" and \"Request Paging Index AttrName\" must be configured to page a POST request");
                }
                List<String> attrList = new ArrayList<String>();
                Collections.addAll(attrList, pagingSizeAttrName.split(":"));
                reqJson = LeachCommon.replaceAttr(attrList, reqJson, pageSize+"");
                final String[] indexAttrs = pagingIndexAttrName.split(":");
                for (int i = START_INDEX; i <= totalPage; i++) {
                    // The path list is rebuilt for every page, as in the original code
                    // (presumably the helper consumes it while recursing — TODO confirm).
                    attrList.clear();
                    Collections.addAll(attrList, indexAttrs);
                    reqJson = LeachCommon.replaceAttr(attrList, reqJson, i+"");
                    // Replace only the request body; url, header etc. are kept.
                    httpMap.put("reqMsg", JSONObject.toJSONString(reqJson));
                    pagePayloads.add(JSONObject.toJSONString(httpMap));
                }
            }

            for (final String payload : pagePayloads) {
                pageFlowFiles.add(createPageFlowFile(processSession, payload));
            }
            processSession.remove(flowFile);
            flowFile = null;
            processSession.transfer(pageFlowFiles, SUCCESS);
        } catch (final Exception pe) {
            logger.error("API分頁處理異常:{}",pe);
            // Remove everything still owned by the session so the commit cannot fail.
            for (final FlowFile page : pageFlowFiles) {
                removeQuietly(processSession, page, logger);
            }
            removeQuietly(processSession, flowFile, logger);
            FlowFile failureFlowFile = processSession.create();
            final String errMsg = "API分頁處理異常:"+pe.getMessage();
            failureFlowFile = processSession.write(failureFlowFile, out -> out.write(errMsg.getBytes(UTF_8)));
            failureFlowFile = processSession.putAttribute(failureFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            failureFlowFile = processSession.putAttribute(failureFlowFile, "inputProcessor", this.getIdentifier());
            failureFlowFile = processSession.putAttribute(failureFlowFile, "relationship", "failure");
            failureFlowFile = processSession.putAttribute(failureFlowFile, "level", "ERROR");
            processSession.transfer(failureFlowFile, FAILURE);
        }
    }

    /** Returns the whole content of {@code flowFile} decoded as UTF-8, leaving the content untouched. */
    private static String readContent(ProcessSession session, FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, UTF_8));
        return holder[0];
    }

    /**
     * Converts the extracted total row count into a page count, rounding up.
     *
     * @throws NumberFormatException if {@code rawDataSize} is not numeric
     */
    private static int toTotalPages(Object rawDataSize, int pageSize) {
        final String text = String.valueOf(rawDataSize);
        try {
            // double, not float: float cannot represent every row count above 2^24 exactly.
            return (int) Math.ceil(Double.parseDouble(text) / pageSize);
        } catch (NumberFormatException e) {
            throw new NumberFormatException("總資料量屬性提取内容應為整數數字類型,目前内容:" + text);
        }
    }

    /** Tells whether the GET URL template contains a page-index placeholder in either supported syntax. */
    private static boolean hasIndexPlaceholder(String urlTemplate) {
        return urlTemplate.contains("${index}") || urlTemplate.contains(":_index");
    }

    /**
     * Fills the page index and page size into the GET URL template. Both the documented
     * {@code ${index}}/{@code ${size}} placeholders and the legacy {@code :_index}/{@code :_size}
     * placeholders are replaced, using literal (non-regex) replacement.
     */
    private static String expandPagingUrl(String urlTemplate, int pageIndex, int pageSize) {
        final String index = String.valueOf(pageIndex);
        final String size = String.valueOf(pageSize);
        return urlTemplate
                .replace("${index}", index)
                .replace("${size}", size)
                .replace(":_index", index)
                .replace(":_size", size);
    }

    /** Creates a FlowFile holding one page's request description plus the standard attributes. */
    private FlowFile createPageFlowFile(ProcessSession session, String payload) {
        FlowFile page = session.create();
        page = session.write(page, out -> out.write(payload.getBytes(UTF_8)));
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.MIME_TYPE.key(),"application/json");
        attributes.put("inputProcessor", this.getIdentifier());
        attributes.put("relationship","success");
        attributes.put("level", "INFO");
        return session.putAllAttributes(page, attributes);
    }

    /** Removes {@code flowFile} from the session if it is non-null, logging (not propagating) any failure. */
    private static void removeQuietly(ProcessSession session, FlowFile flowFile, ComponentLog logger) {
        if (flowFile == null) {
            return;
        }
        try {
            session.remove(flowFile);
        } catch (final Exception cleanupError) {
            logger.error("Could not cleanup flowfile due to exception: {}", new Object[]{cleanupError}, cleanupError);
        }
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
}
5、元件3——LeachHttpPagingRequest
LeachHttpPagingRequest 元件目的為将收到的URL發起http請求 ;此元件比較簡單,配置如下;
源碼:
package com.leach.nifi.processor;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.HttpRequestClient;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
/**
 * Sends the HTTP request(s) described by a JSON object or a JSON array and emits one FlowFile
 * per response.
 *
 * <p>The request description is taken from the incoming FlowFile content; when that content is
 * empty, the "HTTP Msg" property is used instead. Each description provides the keys
 * {@code url}, {@code header} and {@code reqMsg}.
 *
 * @author xiaobin.wang
 * @create 2020/05/07
 */
@SupportsBatching
@Tags({"leach","http","api", "post","get"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("根據傳入或輸入的http對象,直接進行發送并傳回json資訊。" +
        "傳入http對象格式為JSON對象或JSON集合:[{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\",\"connectTimeout\":\"\",\"readTimeout\":\"\"}]")
public class LeachHttpPagingRequest extends AbstractProcessor {

    /** Charset used for every String/byte conversion done by this processor. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    // properties
    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .description("HTTP request method (GET, POST)")
            .required(true)
            .defaultValue("GET")
            .allowableValues("POST","GET")
            .build();

    public static final PropertyDescriptor HTTP_MSG = new PropertyDescriptor.Builder()
            .name("HTTP Msg")
            .description("輸入資料格式為JSON對象或JSON集合:[{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\"}]")
            .required(false)
            // A property with neither validator nor allowable values is reported invalid once set.
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
            .name("Content Type")
            .description("Content Type(GET請求時無效)")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("raw","x-www-form-urlencoded","form-data")
            .defaultValue("raw")
            .build();

    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            PROP_METHOD,CONTENT_TYPE,HTTP_MSG));

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("執行異常或失敗,傳回此标志")
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("API查詢成功,傳回接口響應的JSON資訊")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            FAILURE,SUCCESS)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Sends every described request and routes one FlowFile per response to {@link #SUCCESS}.
     *
     * <p>The incoming FlowFile is removed once all requests have completed. If any step fails,
     * the response FlowFiles created so far and the incoming FlowFile are removed and a FlowFile
     * containing the error message is routed to {@link #FAILURE}.
     *
     * @param context provides the processor property values
     * @param session session used to read, create, remove and transfer FlowFiles
     * @throws ProcessException declared by the framework contract; failures are routed to
     *         {@link #FAILURE} instead of being thrown
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile requestFlowFile = session.get();
        if (requestFlowFile == null) {
            // Nothing queued upstream: create an empty carrier and rely on the "HTTP Msg" property.
            requestFlowFile = session.create();
        }
        // Responses created so far; kept outside the try block so the catch block can clean up.
        final List<FlowFile> respFlowFiles = new ArrayList<FlowFile>();
        try {
            final String contentType = context.getProperty(CONTENT_TYPE).getValue();
            final String httpMethod = context.getProperty(PROP_METHOD).getValue();

            String httpMsg = readContent(session, requestFlowFile);
            // Fall back to the configured property when the FlowFile carries no content.
            if (StringUtils.isEmpty(httpMsg) && context.getProperty(HTTP_MSG).isSet()) {
                httpMsg = context.getProperty(HTTP_MSG).getValue();
            }
            if (StringUtils.isBlank(httpMsg)) {
                throw new IllegalArgumentException(
                        "No HTTP request description found in the FlowFile content or the \"HTTP Msg\" property");
            }

            for (final Map<String, Object> httpMap : parseRequests(httpMsg)) {
                final String url = asText(httpMap.get("url"));
                final String header = asText(httpMap.get("header"));
                final String reqMsg = asText(httpMap.get("reqMsg"));
                logger.debug("請求URL:{}\n請求header:{}\n請求Req:{}",new Object[]{url,header,reqMsg});

                // Send the HTTP request.
                String respMsg = null;
                if ("POST".equals(httpMethod)) {
                    respMsg = HttpRequestClient.post(url, reqMsg, contentType, header);
                } else if ("GET".equals(httpMethod)) {
                    respMsg = HttpRequestClient.get(url);
                }
                final byte[] respData = (respMsg != null ? respMsg : "").getBytes(UTF_8);

                // Build the response FlowFile.
                FlowFile responseFlowFile = session.create();
                if (respData.length > 0) {
                    responseFlowFile = session.write(responseFlowFile, out -> out.write(respData));
                }
                session.getProvenanceReporter().fetch(responseFlowFile, url);
                final Map<String, String> attributes = new HashMap<>();
                attributes.put(CoreAttributes.MIME_TYPE.key(),"application/json");
                attributes.put("inputProcessor", this.getIdentifier());
                attributes.put("relationship","success");
                attributes.put("level", "INFO");
                responseFlowFile = session.putAllAttributes(responseFlowFile, attributes);
                respFlowFiles.add(responseFlowFile);
            }

            // Remove the request exactly once, after every described request has been sent.
            session.remove(requestFlowFile);
            requestFlowFile = null;
            session.transfer(respFlowFiles, SUCCESS);
        } catch (Exception e) {
            logger.error("LeachHttpPagingRequest API請求異常:{}",e);
            // Remove everything still owned by the session so the commit cannot fail.
            for (final FlowFile response : respFlowFiles) {
                removeQuietly(session, response, logger);
            }
            removeQuietly(session, requestFlowFile, logger);
            FlowFile failureFlowFile = session.create();
            final String errMsg = "LeachHttpPagingRequest API請求異常:"+e.getMessage();
            failureFlowFile = session.write(failureFlowFile, out -> out.write(errMsg.getBytes(UTF_8)));
            session.transfer(failureFlowFile, FAILURE);
        }
    }

    /** Returns the whole content of {@code flowFile} decoded as UTF-8, leaving the content untouched. */
    private static String readContent(ProcessSession session, FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, UTF_8));
        return holder[0];
    }

    /**
     * Parses the request description, which is either one JSON object or a JSON array of objects.
     *
     * @throws IllegalArgumentException if the text parses to {@code null}
     */
    @SuppressWarnings("unchecked")
    private static List<Map<String, Object>> parseRequests(String httpMsg) {
        final List<Map<String, Object>> requests = new ArrayList<>();
        if (httpMsg.trim().startsWith("[")) {
            requests.addAll(JSONObject.parseObject(httpMsg, List.class));
        } else {
            final Map<String, Object> single = JSONObject.parseObject(httpMsg, Map.class);
            if (single == null) {
                throw new IllegalArgumentException("HTTP request description is not a JSON object: " + httpMsg);
            }
            requests.add(single);
        }
        return requests;
    }

    /**
     * Returns a JSON field as text: {@code null} stays {@code null}, a String is returned as is,
     * and any other value (for example a nested header object) is serialized back to JSON.
     */
    private static String asText(Object value) {
        if (value == null) {
            return null;
        }
        return value instanceof String ? (String) value : JSONObject.toJSONString(value);
    }

    /** Removes {@code flowFile} from the session if it is non-null, logging (not propagating) any failure. */
    private static void removeQuietly(ProcessSession session, FlowFile flowFile, ComponentLog logger) {
        if (flowFile == null) {
            return;
        }
        try {
            session.remove(flowFile);
        } catch (final Exception cleanupError) {
            logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{cleanupError}, cleanupError);
        }
    }
}
6、總結
此流程為個人開發,裡面的邏輯基本都有注釋,上下的元件格式也是經過單獨處理進行發送和接收。
各位同學在開發時也可以根據個人需求進行邏輯修改。