转载请注明地址: https://blog.csdn.net/u011291276/article/details/107213758
1、摘要
NIFI自定义开发搭建配置请参考:https://blog.csdn.net/u011291276/article/details/107211923
此文档目的为:在NIFI ETL中开发自定义HTTP分页查询,通过接口响应回的总条数和总页数,
生成多个分页查询的API接口地址,然后逐一进行请求获取响应信息;
HTTP分页查询一般为POST请求,此流程的第二个组件同时也支持配置GET请求的分页查询;
2、最终结果展示
HTTP分页查询的流程逻辑为:第一次查询,获取接口响应的总条数;本地封装分页查询的多个URL;多个URL逐一查询获取每页数据;
核心插件及流程如下:

下面将逐个说明这三个组件,并直接贴上源代码。
3、组件1—— LeachHttpInvoke
LeachHttpInvoke组件在运行时,仅请求一条数据,目的只为获取接口响应的总条数或总页数;
基本参数大家都能看懂,其中body体和header头的格式都是对象字符串;
ReqMsg PassDown :参数有两个值,
false:不走分页查询,直接返回响应结果。此目的为不使用分页查询时使用,不过正常情况下,
不使用分页查询时可以使用NIFI自带的组件InvokeHTTP请求;
true:开始走分页查询,此组件在返回数据时,参数格式会有变化;
源码:
package com.leach.nifi.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.HttpRequestClient;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.http.MediaType;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
/**
 * Custom HTTP processor with a configurable request body.
 *
 * <p>Sends one GET or POST request to the configured URL. When "ReqMsg PassDown" is
 * {@code true} the outgoing FlowFile carries a JSON object holding the request details
 * ({@code url}, {@code header}, {@code reqMsg}, {@code method}) together with the raw
 * response ({@code respMsg}); otherwise it carries the raw response only.
 *
 * <p>The incoming FlowFile (if any) is dropped and a new FlowFile is emitted. On any
 * error a FlowFile containing the error message is routed to {@code failure}.
 *
 * @author xiaobin.wang
 * @create 2020/01/08
 */
@SupportsBatching
@Tags({"leach","http", "https", "rest", "client"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("HTTP RESTFulApi请求。支持分页查询操作(连接LeachHttpPagingHandle-LeachHttpPagingRequest)")
public class LeachHttpInvoke extends AbstractProcessor {
    public static final String DEFAULT_CONTENT_TYPE = "application/json";
    private static final Map<String, String> excludedHeaders = new HashMap<String, String>();
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.SIMPLIFIED_CHINESE).withZoneUTC();

    // properties
    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .description("HTTP request method (GET, POST)")
            .required(true)
            .defaultValue("GET")
            .allowableValues("POST","GET")
            .build();
    public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
            .name("Remote URL")
            .description("Remote URL which will be connected to, including scheme, host, port, path.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build();
    public static final PropertyDescriptor REQUEST_BODY = new PropertyDescriptor.Builder()
            .name("Request Body")
            .description("请求体参数 JSON字符串格式对象 {\"key\":\"value\"}")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
            .name("Content Type")
            .description("Content Type(GET请求时无效)")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("raw","x-www-form-urlencoded","form-data")
            .defaultValue("raw")
            .build();
    public static final PropertyDescriptor REQUEST_HEADER = new PropertyDescriptor.Builder()
            .name("Request Header")
            .description("请求头 JSON字符串格式对象 {\"key\":\"value\"}。此处不需要配置content-type。")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor PROP_PASS_DOWN = new PropertyDescriptor.Builder()
            .name("ReqMsg PassDown")
            .description("是否允许请求信息向下传递。分页查询需向下传递({\"url\":\"\",\"header\":{},\"body\":{})")
            .required(false)
            .defaultValue("false")
            .allowableValues("true", "false")
            .build();
    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            PROP_METHOD,
            PROP_URL,
            REQUEST_BODY,
            REQUEST_HEADER,
            CONTENT_TYPE,
            PROP_PASS_DOWN));

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("执行异常或失败,返回此标志")
            .build();
    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("RESTFul Api执行成功,返回此URL响应的内容信息。" +
                    "当ReqMsg PassDown选择true时,将返回带有请求参数的封装体{\"reqMsg\":Object,\"respMsg\":Object}。" +
                    "当值为false时,直接返回url请求结果Object")
            .build();
    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            FAILURE,SUCCESS)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Sends the configured request and emits one FlowFile with the result.
     *
     * @param context provides the configured property values
     * @param session used to read the incoming FlowFile and to create the outgoing one
     * @throws ProcessException if the framework fails while handling FlowFiles
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile requestFlowFile = session.get();
        if (requestFlowFile == null) {
            // No upstream data: work on an empty placeholder so the processor can act as a source.
            requestFlowFile = session.create();
        }
        // Tracked outside the try so a half-built response can be removed on failure;
        // a FlowFile that is neither transferred nor removed makes the session commit fail.
        FlowFile responseFlowFile = null;
        try {
            // These properties declare Expression Language support, so they must be
            // evaluated; a plain getValue() would hand back the unevaluated expression.
            final String requestUrl = context.getProperty(PROP_URL)
                    .evaluateAttributeExpressions(requestFlowFile).getValue();
            final String requestMethod = context.getProperty(PROP_METHOD).getValue();
            final String contentType = context.getProperty(CONTENT_TYPE).getValue();
            final boolean isPassDown = context.getProperty(PROP_PASS_DOWN).asBoolean();
            final String requestHeader = context.getProperty(REQUEST_HEADER)
                    .evaluateAttributeExpressions().getValue();
            // The configured body wins; otherwise the FlowFile content is the body.
            final String reqMsg = context.getProperty(REQUEST_BODY).isSet()
                    ? context.getProperty(REQUEST_BODY).evaluateAttributeExpressions().getValue()
                    : readContent(session, requestFlowFile);

            String respMsg = null;
            if ("POST".equals(requestMethod)) {
                respMsg = HttpRequestClient.post(requestUrl, reqMsg, contentType, requestHeader);
            } else if ("GET".equals(requestMethod)) {
                respMsg = HttpRequestClient.get(requestUrl);
            }
            logger.debug("URL:" + requestUrl + "\n响应信息:" + respMsg);

            final String payload;
            if (isPassDown) {
                // Wrap request details and response together for the downstream paging processors.
                final Map<String, String> request = new HashMap<String, String>();
                request.put("url", requestUrl);
                request.put("header", requestHeader);
                request.put("reqMsg", reqMsg);
                request.put("respMsg", respMsg);
                request.put("method", requestMethod);
                payload = JSONObject.toJSONString(request);
            } else {
                // A null response becomes an empty FlowFile instead of a NullPointerException.
                payload = respMsg == null ? "" : respMsg;
            }
            final byte[] payloadBytes = payload.getBytes("UTF-8");

            responseFlowFile = session.create();
            if (payloadBytes.length > 0) {
                responseFlowFile = session.write(responseFlowFile, out -> out.write(payloadBytes));
            }
            session.getProvenanceReporter().fetch(responseFlowFile, requestUrl);
            responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");

            // Drop the upstream FlowFile only once the response is fully built.
            session.remove(requestFlowFile);
            requestFlowFile = null;
            session.transfer(responseFlowFile, SUCCESS);
            responseFlowFile = null;
        } catch (Exception e) {
            logger.error("LeachHttpInvoke 执行异常:{}", new Object[]{e.getMessage()}, e);
            removeQuietly(session, requestFlowFile, logger);
            removeQuietly(session, responseFlowFile, logger);
            FlowFile failureFlowFile = session.create();
            final String errMsg = "LeachHttpInvoke 执行异常:" + e.getMessage();
            failureFlowFile = session.write(failureFlowFile, out -> out.write(errMsg.getBytes("utf-8")));
            session.transfer(failureFlowFile, FAILURE);
        }
    }

    /** Reads the whole FlowFile content as UTF-8 text without modifying the content. */
    private static String readContent(ProcessSession session, FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, "utf-8"));
        return holder[0];
    }

    /** Removes the FlowFile from the session if present; a cleanup failure is logged, not rethrown. */
    private static void removeQuietly(ProcessSession session, FlowFile flowFile, ComponentLog logger) {
        if (flowFile == null) {
            return;
        }
        try {
            session.remove(flowFile);
        } catch (final Exception e1) {
            logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
        }
    }
}
4、组件2——LeachHttpPagingHandle
LeachHttpPagingHandle 此组件 为分页查询的核心,即根据收到的总条数、总页数和自定义配置的分页参数,
进行组装分页URL,然后返回多个URL,每个URL单独返回,在队列中也是有多个queued数据;
考虑到分页参数可能存在于嵌套JSON中,所以获取分页参数属性名可以使用“:”来进行提取;
Request Paging Size AttrName :请求中每页数据量的属性名,若存在于嵌套中,则使用英文“:”号分割(POST请求必填)。
例:key:key1:key2 ;
Request Paging Index AttrName : 请求中分页页码的属性名,若存在于嵌套中,则使用英文“:”号分割(POST请求必填)。
例:key:key1:key2 ;
Response DataSize AttrName : 响应内容中总条数的属性名,若提取嵌套中的属性名,则使用英文“:”号分割。
例:key:key1:key2 ;
Total Page Size : 总页数。若填写了此属性,则直接将其作为总页数进行分页查询(优先于"Response DataSize AttrName");若不填,则根据"Response DataSize AttrName"提取到的总条数与Page Size计算总页数;
Get Paging URL : GET请求需循环分页时使用,页码大小实际参数使用"${size}"替换,分页页码实际参数使用"${index}"替换。
处理时会根据响应计算出的总页数依次替换这两个变量,返回多个分页后的URL。
例:http://127.0.0.1:8080/get?attr1=参数1&pageSize=${size}&pageIndex=${index} 。
此参数主要为GET请求分页使用。
源码:
package com.leach.nifi.processor;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.LeachCommon;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
/**
 * Expands one "first page" HTTP exchange into one request description per page.
 *
 * <p>The incoming FlowFile must hold a JSON object with the keys {@code url},
 * {@code method}, {@code header}, {@code reqMsg} and {@code respMsg}. The total number
 * of pages is taken from "Total Page Size" when that property is set, otherwise it is
 * computed from the total record count found in {@code respMsg}. One FlowFile per page
 * is routed to {@code success}; each holds the same JSON object without {@code respMsg}
 * and with the paging parameters filled in.
 *
 * @author xiaobin.wang
 * @create 2020/01/02
 */
// Marks the processor as free of external side effects
@SideEffectFree
// Batching is supported
@SupportsBatching
// Tags used to find this processor in the UI
@Tags({ "leach","add", "value", "avro" ,"static","param"})
// Input is required: onTrigger has nothing to do without an incoming FlowFile
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
// Description shown for this processor
@CapabilityDescription("HTTP RESTFulApi请求,接收的数据格式为JSON对象:{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\",\"connectTimeout\":\"\",\"readTimeout\":\"\"}")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class LeachHttpPagingHandle extends AbstractProcessor {
    /** Index of the first page that is generated. */
    private static final int START_INDEX = 1;

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    public static final PropertyDescriptor PAGE_SIZE=new PropertyDescriptor.Builder()
            .name("Page Size")
            .required(true)
            .addValidator(StandardValidators.NUMBER_VALIDATOR)
            .defaultValue("200")
            .description("分页页码大小")
            .build();
    public static final PropertyDescriptor PAGING_SIZE_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Request Paging Size AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("请求中每页数据量的属性名,若存在于嵌套中,则使用英文“:”号分割(POST请求必填)。例:key:key1:key2")
            .build();
    public static final PropertyDescriptor PAGING_INDEX_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Request Paging Index AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("请求中分页页码的属性名,若存在于嵌套中,则使用英文“:”号分割(POST请求必填)。例:key:key1:key2")
            .build();
    public static final PropertyDescriptor DATASIZE_ATTR_NAME = new PropertyDescriptor.Builder()
            .name("Response DataSize AttrName")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("响应内容中总条数的属性名,若提取嵌套中的属性名,则使用英文“:”号分割。例:key:key1:key2")
            .build();
    public static final PropertyDescriptor TOTAL_PAGE_SIZE = new PropertyDescriptor.Builder()
            .name("Total Page Size")
            .required(false)
            .addValidator(StandardValidators.NUMBER_VALIDATOR)
            .description("总分页页码数。若不填\"DataSize AttrName\",则将此属性值设置的页码数作为总页数进行分页查询")
            .build();
    public static final PropertyDescriptor GET_PAGING_URL= new PropertyDescriptor.Builder()
            .name("Get Paging URL")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("GET请求需循环分页时使用,页码大小实际参数使用\"${size}\"替换,分页页码实际参数使用\"${index}\"替换。" +
                    "\n处理时会根据响应计算出的总页数依次替换这两个变量,返回多个分页后的URL。" +
                    "\n例:http://127.0.0.1:8080/get?attr1=参数1&pageSize=${size}&pageIndex=${index}")
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success")
            .description("API分页查询处理成功,返回数据格式为JSON对象:{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\"}").build();
    public static final Relationship FAILURE = new Relationship.Builder().name("failure").description(
            "API分页查询处理异常或失败,返回此标志。")
            .build();

    @Override
    public void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(PAGE_SIZE);
        props.add(PAGING_SIZE_ATTR_NAME);
        props.add(PAGING_INDEX_ATTR_NAME);
        props.add(DATASIZE_ATTR_NAME);
        props.add(TOTAL_PAGE_SIZE);
        props.add(GET_PAGING_URL);
        // Published as unmodifiable so no other thread can add to it
        this.properties = Collections.unmodifiableList(props);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(SUCCESS);
        rels.add(FAILURE);
        // Published as unmodifiable so no other thread can add to it
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * Reads the request/response wrapper from the incoming FlowFile and emits one
     * FlowFile per page. On any error the input is dropped and a FlowFile with the
     * error message is routed to {@code failure}.
     *
     * @param processContext provides the configured property values
     * @param processSession used to read the input and create the page FlowFiles
     * @throws ProcessException if the framework fails while handling FlowFiles
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        try {
            final int pageSize = processContext.getProperty(PAGE_SIZE).asInteger();
            if (pageSize <= 0) {
                // A zero page size would turn the division below into Infinity and
                // therefore into Integer.MAX_VALUE pages.
                throw new IllegalArgumentException("Page Size 必须为正整数,当前值:" + pageSize);
            }

            // Expected keys: url, method, header, reqMsg, respMsg
            final JSONObject http = JSONObject.parseObject(readContent(processSession, flowFile));
            if (http == null) {
                throw new IllegalArgumentException("输入内容为空,无法解析为JSON对象");
            }
            // The response is only needed to compute the page count; it is not passed on.
            final String respMsg = http.getString("respMsg");
            http.remove("respMsg");

            final int totalPage = resolveTotalPages(processContext, respMsg, pageSize);
            if (START_INDEX > totalPage) {
                throw new IllegalStateException("开始查询的页码(" + START_INDEX + ")大于总页数(" + totalPage + "),不执行查询。当前每页:" + pageSize + "条");
            }
            logger.info("total_page:" + totalPage + ",start_index:" + START_INDEX);

            // Build every payload first so that a failure cannot leave partial output behind.
            final List<String> payloads = buildPagePayloads(processContext, http, pageSize, totalPage);
            emitPages(processSession, payloads);

            processSession.remove(flowFile);
            flowFile = null;
        } catch (final Exception pe) {
            logger.error("API分页处理异常:{}", new Object[]{pe.getMessage()}, pe);
            if (flowFile != null) {
                try {
                    processSession.remove(flowFile);
                } catch (Exception e) {
                    logger.error("Could not remove input flowfile due to exception: {}", new Object[]{e}, e);
                }
            }
            FlowFile failureFlowFile = processSession.create();
            final String errMsg = "API分页处理异常:" + pe.getMessage();
            failureFlowFile = processSession.write(failureFlowFile, out -> out.write(errMsg.getBytes("utf-8")));
            failureFlowFile = processSession.putAttribute(failureFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            failureFlowFile = processSession.putAttribute(failureFlowFile, "inputProcessor", this.getIdentifier());
            failureFlowFile = processSession.putAttribute(failureFlowFile, "relationship", "failure");
            failureFlowFile = processSession.putAttribute(failureFlowFile, "level", "ERROR");
            processSession.transfer(failureFlowFile, FAILURE);
        }
    }

    /** Reads the whole FlowFile content as UTF-8 text without modifying the content. */
    private static String readContent(ProcessSession session, FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, "utf-8"));
        return holder[0];
    }

    /** Splits a ':'-separated attribute path into a fresh mutable list for the LeachCommon helpers. */
    private static List<String> pathOf(String attrPath) {
        return new ArrayList<>(Arrays.asList(attrPath.split(":")));
    }

    /**
     * Returns the configured total page count, or computes it from the total record
     * count found in the response. Declared {@code throws Exception} because the
     * signature of the project helper it calls is not visible here.
     */
    private int resolveTotalPages(ProcessContext processContext, String respMsg, int pageSize) throws Exception {
        if (processContext.getProperty(TOTAL_PAGE_SIZE).isSet()) {
            return processContext.getProperty(TOTAL_PAGE_SIZE).asInteger();
        }
        final String dataSizeAttr = processContext.getProperty(DATASIZE_ATTR_NAME).getValue();
        if (dataSizeAttr == null) {
            throw new IllegalArgumentException("未配置 Total Page Size 时必须配置 Response DataSize AttrName");
        }
        final JSONObject respJson = JSONObject.parseObject(respMsg, JSONObject.class);
        if (respJson == null) {
            throw new IllegalArgumentException("respMsg 为空,无法提取总数据量");
        }
        final Object dataSize = LeachCommon.extractAttrVal(pathOf(dataSizeAttr), respJson);
        try {
            // double rather than float: float cannot represent every count above 2^24 exactly.
            final double total = Double.parseDouble(String.valueOf(dataSize));
            // total records / page size, rounded up
            return (int) Math.ceil(total / pageSize);
        } catch (NumberFormatException e) {
            throw new NumberFormatException("总数据量属性提取内容应为整数数字类型,当前内容:" + dataSize);
        }
    }

    /**
     * Produces the JSON payload of every page. For GET the URL template is filled in;
     * for any other method the paging attributes inside {@code reqMsg} are replaced.
     * Declared {@code throws Exception} because the signature of the project helper
     * it calls is not visible here.
     */
    private List<String> buildPagePayloads(ProcessContext processContext, JSONObject http,
                                           int pageSize, int totalPage) throws Exception {
        final List<String> payloads = new ArrayList<>();
        if ("GET".equals(http.getString("method"))) {
            final String template = processContext.getProperty(GET_PAGING_URL).getValue();
            if (template == null || !hasIndexPlaceholder(template)) {
                // Nothing to substitute: forward the original request once.
                payloads.add(JSONObject.toJSONString(http));
                return payloads;
            }
            for (int page = START_INDEX; page <= totalPage; page++) {
                http.put("url", fillPagingUrl(template, page, pageSize));
                payloads.add(JSONObject.toJSONString(http));
            }
            return payloads;
        }

        final String sizeAttr = processContext.getProperty(PAGING_SIZE_ATTR_NAME).getValue();
        final String indexAttr = processContext.getProperty(PAGING_INDEX_ATTR_NAME).getValue();
        if (sizeAttr == null || indexAttr == null) {
            throw new IllegalArgumentException("POST分页需同时配置 Request Paging Size AttrName 和 Request Paging Index AttrName");
        }
        JSONObject reqJson = JSONObject.parseObject(http.getString("reqMsg"), JSONObject.class);
        if (reqJson == null) {
            throw new IllegalArgumentException("reqMsg 为空,无法替换分页参数");
        }
        // The page size is the same for every page, so it is set once up front.
        reqJson = LeachCommon.replaceAttr(pathOf(sizeAttr), reqJson, pageSize + "");
        for (int page = START_INDEX; page <= totalPage; page++) {
            // A fresh path list per call; presumably the helper consumes it while recursing — TODO confirm.
            reqJson = LeachCommon.replaceAttr(pathOf(indexAttr), reqJson, page + "");
            // Only reqMsg changes; url, header and the other keys are kept as received.
            http.put("reqMsg", JSONObject.toJSONString(reqJson));
            payloads.add(JSONObject.toJSONString(http));
        }
        return payloads;
    }

    /** True when the template contains a page-index placeholder in the documented or the legacy form. */
    private static boolean hasIndexPlaceholder(String template) {
        return template.contains("${index}") || template.contains(":_index");
    }

    /**
     * Substitutes the page index and page size into the URL template. Both the
     * documented placeholders (${index}, ${size}) and the legacy ones (:_index, :_size)
     * are replaced, as literal text rather than as regular expressions.
     */
    private static String fillPagingUrl(String template, int pageIndex, int pageSize) {
        final String index = String.valueOf(pageIndex);
        final String size = String.valueOf(pageSize);
        return template
                .replace("${index}", index)
                .replace("${size}", size)
                .replace(":_index", index)
                .replace(":_size", size);
    }

    /** Creates one FlowFile per payload and routes them all to success; removes them again if anything fails. */
    private void emitPages(ProcessSession session, List<String> payloads) {
        final List<FlowFile> pages = new ArrayList<>(payloads.size());
        FlowFile page = null;
        try {
            for (final String payload : payloads) {
                page = session.create();
                page = session.write(page, out -> out.write(payload.getBytes("utf-8")));
                final Map<String, String> attributes = new HashMap<>();
                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                attributes.put("inputProcessor", this.getIdentifier());
                attributes.put("relationship", "success");
                attributes.put("level", "INFO");
                page = session.putAllAttributes(page, attributes);
                pages.add(page);
                page = null;
            }
            session.transfer(pages, SUCCESS);
        } catch (RuntimeException e) {
            // Leave no created-but-untransferred FlowFile behind; the commit would fail otherwise.
            if (page != null) {
                session.remove(page);
            }
            session.remove(pages);
            throw e;
        }
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
}
5、组件3——LeachHttpPagingRequest
LeachHttpPagingRequest 组件目的为将收到的URL发起http请求 ;此组件比较简单,配置如下;
源码:
package com.leach.nifi.processor;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.HttpRequestClient;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
/**
 * Sends the HTTP request(s) described by the incoming FlowFile and emits each response.
 *
 * <p>The request description is a JSON object, or a JSON array of such objects, with
 * the keys {@code url}, {@code header} and {@code reqMsg}. It is read from the FlowFile
 * content; when the content is empty the "HTTP Msg" property is used instead. One
 * FlowFile per request is routed to {@code success}. On any error the input is dropped
 * and a FlowFile with the error message is routed to {@code failure}.
 *
 * @author xiaobin.wang
 * @create 2020/05/07
 */
@SupportsBatching
@Tags({"leach","http","api", "post","get"})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("根据传入或输入的http对象,直接进行发送并返回json信息。" +
        "传入http对象格式为JSON对象或JSON集合:[{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\",\"connectTimeout\":\"\",\"readTimeout\":\"\"}]")
public class LeachHttpPagingRequest extends AbstractProcessor {
    // properties
    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .description("HTTP request method (GET, POST)")
            .required(true)
            .defaultValue("GET")
            .allowableValues("POST","GET")
            .build();
    public static final PropertyDescriptor HTTP_MSG = new PropertyDescriptor.Builder()
            .name("HTTP Msg")
            .description("输入数据格式为JSON对象或JSON集合:[{\"url\":\"String\",\"header\":\"{}\",\"reqMsg\":\"{}\"}]")
            .required(false)
            // A free-text property needs at least one validator, otherwise any value is reported invalid.
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
            .name("Content Type")
            .description("Content Type(GET请求时无效)")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("raw","x-www-form-urlencoded","form-data")
            .defaultValue("raw")
            .build();
    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            PROP_METHOD,CONTENT_TYPE,HTTP_MSG));

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("执行异常或失败,返回此标志")
            .build();
    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("API查询成功,返回接口响应的JSON信息")
            .build();
    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            FAILURE,SUCCESS)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Executes every request described by the input and emits one FlowFile per response.
     *
     * @param context provides the configured property values
     * @param session used to read the incoming FlowFile and to create the response FlowFiles
     * @throws ProcessException if the framework fails while handling FlowFiles
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile requestFlowFile = session.get();
        if (requestFlowFile == null) {
            // No upstream data: work on an empty placeholder so "HTTP Msg" can drive the request.
            requestFlowFile = session.create();
        }
        // Responses are collected and transferred together, so a failure on a later
        // request cannot leave earlier responses created but never transferred.
        final List<FlowFile> respFlowFiles = new ArrayList<FlowFile>();
        FlowFile pending = null;
        try {
            final String contentType = context.getProperty(CONTENT_TYPE).getValue();
            final String httpMethod = context.getProperty(PROP_METHOD).getValue();

            String httpMsg = readContent(session, requestFlowFile);
            // Empty FlowFile content: fall back to the configured "HTTP Msg" value.
            if (StringUtils.isEmpty(httpMsg)) {
                httpMsg = context.getProperty(HTTP_MSG).getValue();
            }
            if (StringUtils.isBlank(httpMsg)) {
                throw new IllegalArgumentException("未获取到HTTP请求信息:FlowFile内容为空且未配置HTTP Msg");
            }

            for (final JSONObject http : parseRequests(httpMsg)) {
                if (http == null) {
                    throw new IllegalArgumentException("HTTP请求信息不是合法的JSON对象");
                }
                final String url = http.getString("url");
                final String header = http.getString("header");
                final String reqMsg = http.getString("reqMsg");
                logger.debug("请求URL:{}\n请求header:{}\n请求Req:{}", new Object[]{url, header, reqMsg});

                String respMsg = null;
                if ("POST".equals(httpMethod)) {
                    respMsg = HttpRequestClient.post(url, reqMsg, contentType, header);
                } else if ("GET".equals(httpMethod)) {
                    respMsg = HttpRequestClient.get(url);
                }
                final byte[] respData = (respMsg != null ? respMsg : "").getBytes("utf-8");

                pending = session.create();
                if (respData.length > 0) {
                    pending = session.write(pending, out -> out.write(respData));
                }
                session.getProvenanceReporter().fetch(pending, url);
                final Map<String, String> attributes = new HashMap<>();
                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                attributes.put("inputProcessor", this.getIdentifier());
                attributes.put("relationship", "success");
                attributes.put("level", "INFO");
                pending = session.putAllAttributes(pending, attributes);
                respFlowFiles.add(pending);
                pending = null;
            }

            // Remove the input exactly once, after all requests succeeded. Removing it
            // inside the loop made every multi-element array fail on its second element.
            session.remove(requestFlowFile);
            requestFlowFile = null;
            session.transfer(respFlowFiles, SUCCESS);
        } catch (Exception e) {
            logger.error("LeachHttpPagingRequest API请求异常:{}", new Object[]{e.getMessage()}, e);
            removeQuietly(session, pending, logger);
            for (final FlowFile created : respFlowFiles) {
                removeQuietly(session, created, logger);
            }
            removeQuietly(session, requestFlowFile, logger);
            FlowFile failureFlowFile = session.create();
            final String errMsg = "LeachHttpPagingRequest API请求异常:" + e.getMessage();
            failureFlowFile = session.write(failureFlowFile, out -> out.write(errMsg.getBytes("utf-8")));
            session.transfer(failureFlowFile, FAILURE);
        }
    }

    /** Reads the whole FlowFile content as UTF-8 text without modifying the content. */
    private static String readContent(ProcessSession session, FlowFile flowFile) {
        final String[] holder = new String[1];
        session.read(flowFile, in -> holder[0] = IOUtils.toString(in, "utf-8"));
        return holder[0];
    }

    /** Parses a JSON object or a JSON array of objects into a list of request descriptions. */
    private static List<JSONObject> parseRequests(String httpMsg) {
        if (httpMsg.trim().startsWith("[")) {
            return JSONObject.parseArray(httpMsg, JSONObject.class);
        }
        final List<JSONObject> single = new ArrayList<JSONObject>();
        single.add(JSONObject.parseObject(httpMsg));
        return single;
    }

    /** Removes the FlowFile from the session if present; a cleanup failure is logged, not rethrown. */
    private static void removeQuietly(ProcessSession session, FlowFile flowFile, ComponentLog logger) {
        if (flowFile == null) {
            return;
        }
        try {
            session.remove(flowFile);
        } catch (final Exception e1) {
            logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
        }
    }
}
6、总结
此流程为个人开发,里面的逻辑基本都有注释,上下的组件格式也是经过单独处理进行发送和接收。
各位同学在开发时也可以根据个人需求进行逻辑修改。