天天看点

Nifi 的简单使用Nifi 的使用

Nifi 的使用

Nifi 是一个开源的数据处理工具,可以通过简单的Processor对数据流进行处理。1.8版本内置了286个Processor,可以处理大部分的应用场景。

Nifi 的一些核心概念:

FlowFile:信息流,每一个数据流在系统里面流动,并包含着key/value形式的attribute,以及不同大小的content;

FlowFile Processor:数据流处理器是nifi中真正处理工作的,譬如,整合,转换,调节系统中的流转的数据流,数据流处理器可以接收上游的flow的attribute,以及content。数据流处理器可以处理0至多个流,并给出相应的反馈,比如提交或者回滚。

Connection:有界缓冲区,不同处理器之间的连接纽带,他是一个消息队列,可以接收不同的处理器,并与之以不同的流频率进行交互,这些队列可以动态分配优先级,并且可以有负载的上限,从而实现反压。

Flow Controller:流量控制器负责管理有多少处理器的连接和管理线程以及分配资源,他作为不同处理器之间的数据流交换代理;

Process Group:处理器组是一些连在一起的处理器的组合,他可以通过inputport得到数据,也可以通过outputport发送数据,我们可以使用不同的处理器组,构造更多的组合。

本文主要通过几个实例展示Nifi的使用方法 ,涉及以下几个方面内容:

  • 1、对Mongodb数据库进行查询、插入、更新
  • 2、对查询结果进行转换生成JSON
  • 3、带参数的查询、带参数的更新数据
  • 4、一些Processor的使用方法

MongodDB 的表复制 (不修改表结构)

表复制主要涉及两个Processor,GetMonon负责读取源数据、PutMongo 负责对目标表插入或更新数据。

GetMongo Confingure

Settings 、Scheduling 页签可以设置名称、运行计划、自动处理的流向等

Properties 页签属性如下:

属性 配置值 说明
Client Service MongoDBControllerService MongoDBControllerService是全局的或在Processor Group 范围内有效的MongoDB数据库连接服务,创建激活后其他组件可以共用。
Mongo Database Name db1 直接输入MongoDB的数据库
Mongo Collection Name c1 直接输入复制数据源的Collection 名称
Query 支持Mongo查询语法,默认是为空,相当于{}
Query Output Attribute 把查询条件做为一个Atttribute 写入到FlowFile中,在以后的Processor 可以使用该属性。

MongoDBControllerService 配置 属性 | 配置值 | 说明 ---|--- |--- Mongo URI | mongodb://db:[email protected]:27017/admin | Mongo数据库连接

MongoDBControllerService 在使用前必须是Enabled状态。

PutMongo Confingure

PutMongor的配置与GetMongo基本一致

属性 配置值 说明
Mode insert 数据更新模式:插入或更新

点击运行就可以看到源源不断地将源表的数据复制到目标表里了.

可以通过设计查询条件,运行周期等条件控制Processor 运行频率达到复制数据的效果。

MongodDB 的表复制 (带查询条件,按条件更新)

配置与上个示例类似,主要差别如下:

GetMongo

属性 配置值 说明
Query {"name":"zhangsan"} 支持Mongo查询语法,默认是为空,相当于{},如果值是动态通过Attribute传递的可以使用 ${} 来获取属性值 ,前面的条件就可以写成这样: {"name":"${name}"}
Query Output Attribute 把查询条件做为一个Atttribute 写入到FlowFile中,在以后的Processor 可以使用该属性。

PutMongo

属性 配置值 说明
Mode update 数据更新模式:插入或更新
Upsert true upsert操作会先在集合中进行数据查找,如果数据已经存在,则更新,否则才插入。
Update Query Key 数据更新依据Key 相当于Mongo更新语句中$set 前面的查询条件,值是从FlowFile传的JSON数据.
Update Query 自定义的更新语句 见下面示例:${name} 可以从Flowfile的属性中读属性值
{
    "name" : "${name}"
},
{
    $set:{
       "age":"${age}"
    }
}
           

EvaluateJsonPath 组件

EvaluateJsonPath组件可以将流文件的内容计算为一个或多个JsonPath的表达式,取得表达的内容可以写入Flowfile的属性或flowfile content.

写入属性时,在Properties 页中点右上角的“+”增加属性解析配置。 例如: flowfile传的内容为:

{ "_id" : { "$oid" : "5c04a7450dc992261c228d67" }, "name" : "张三", "age" : "28" }
           

将JSON内容解析出放到Attribute 中

将Destination 属性设置为flowfile-attribute,并增加下面属性key -value

属性名 JsonPath
_id $.$oid
name $.name
age $.age

在后面的流程中可以看到名称为_id、name、age的属性及对应值。

将JSON内容解析出放到flowfile中

将Destination 属性设置为flowfile-content,并增加下面属性key -value

属性名 JsonPath
ignored $.*

在下一个节点可以看到flowfile传的内容是jsonPath 解析出来的值。

AttributesToJSON 组件

AttributesToJSON 组件可以将flowfile 中的属性转换成JSON格式的flowfile 输出

在Attributes List 用逗号分隔属性名即可, 输出同样是可以选择flowfile-content 或flowfile-attribute。

版权声明:本文为CSDN博主「weixin_33719619」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/weixin_33719619/article/details/92640023