Nifi 的使用
Nifi 是一個開源的資料處理工具,可以通過簡單的Processor對資料流進行處理。1.8版本内置了286個Processor,可以處理大部分的應用場景。
Nifi 的一些核心概念:
FlowFile:資訊流,每一個資料流在系統裡面流動,并包含着key/value形式的attribute,以及不同大小的content;
FlowFile Processor:資料流處理器是nifi中真正處理工作的,譬如,整合,轉換,調節系統中的流轉的資料流,資料流處理器可以接收上遊的flow的attribute,以及content。資料流處理器可以處理0至多個流,并給出相應的回報,比如送出或者復原。
Connection:有界緩沖區,不同處理器之間的連接配接紐帶,他是一個消息隊列,可以接收不同的處理器,并與之以不同的流頻率進行互動,這些隊列可以動态配置設定優先級,并且可以有負載的上限,進而實作反壓。
Flow Controller:流量控制器負責管理有多少處理器的連接配接和管理線程以及配置設定資源,他作為不同處理器之間的資料流交換代理;
Process Group:處理器組是一些連在一起的處理器的組合,他可以通過inputport得到資料,也可以通過outputport發送資料,我們可以使用不同的處理器組,構造更多的組合。
本文主要通過幾個執行個體展示Nifi的使用方法 ,涉及以下幾個方面内容:
- 1、對Mongodb資料庫進行查詢、插入、更新
- 2、對查詢結果進行轉換生成JSON
- 3、帶參數的查詢、帶參數的更新資料
- 4、一些Processor的使用方法
MongodDB 的表複制 (不修改表結構)
表複制主要涉及兩個Processor,GetMonon負責讀取源資料、PutMongo 負責對目标表插入或更新資料。
GetMongo Confingure
Settings 、Scheduling 頁簽可以設定名稱、運作計劃、自動處理的流向等
Properties 頁簽屬性如下:
屬性 | 配置值 | 說明 |
---|---|---|
Client Service | MongoDBControllerService | MongoDBControllerService是全局的或在Processor Group 範圍内有效的MongoDB資料庫連接配接服務,建立激活後其他元件可以共用。 |
Mongo Database Name | db1 | 直接輸入MongoDB的資料庫 |
Mongo Collection Name | c1 | 直接輸入複制資料源的Collection 名稱 |
Query | 支援Mongo查詢文法,預設是為空,相當于{} | |
Query Output Attribute | 把查詢條件做為一個Atttribute 寫入到FlowFile中,在以後的Processor 可以使用該屬性。 |
MongoDBControllerService 配置 屬性 | 配置值 | 說明 ---|--- |--- Mongo URI | mongodb://db:[email protected]:27017/admin | Mongo資料庫連接配接
MongoDBControllerService 在使用前必須是Enabled狀态。
PutMongo Confingure
PutMongor的配置與GetMongo基本一緻
屬性 | 配置值 | 說明 |
---|---|---|
Mode | insert | 資料更新模式:插入或更新 |
點選運作就可以看到源源不斷地将源表的資料複制到目标表裡了.
可以通過設計查詢條件,運作周期等條件控制Processor 運作頻率達到複制資料的效果。
MongodDB 的表複制 (帶查詢條件,按條件更新)
配置與上個示例類似,主要差别如下:
GetMongo
屬性 | 配置值 | 說明 |
---|---|---|
Query | {"name":"zhangsan"} | 支援Mongo查詢文法,預設是為空,相當于{},如果值是動态通過Attribute傳遞的可以使用 ${} 來擷取屬性值 ,前面的條件就可以寫成這樣: {"name":"${name}"} |
Query Output Attribute | 把查詢條件做為一個Atttribute 寫入到FlowFile中,在以後的Processor 可以使用該屬性。 |
PutMongo
屬性 | 配置值 | 說明 |
---|---|---|
Mode | update | 資料更新模式:插入或更新 |
Upsert | true | upsert操作會先在集合中進行資料查找,如果資料已經存在,則更新,否則才插入。 |
Update Query Key | 資料更新依據Key | 相當于Mongo更新語句中$set 前面的查詢條件,值是從FlowFile傳的JSON資料. |
Update Query | 自定義的更新語句 | 見下面示例:${name} 可以從Flowfile的屬性中讀屬性值 |
{
"name" : "${name}"
},
{
$set:{
"age":"${age}"
}
}
EvaluateJsonPath 元件
EvaluateJsonPath元件可以将流檔案的内容計算為一個或多個JsonPath的表達式,取得表達的内容可以寫入Flowfile的屬性或flowfile content.
寫入屬性時,在Properties 頁中點右上角的“+”增加屬性解析配置。 例如: flowfile傳的内容為:
{ "_id" : { "$oid" : "5c04a7450dc992261c228d67" }, "name" : "張三", "age" : "28" }
将JSON内容解析出放到Attribute 中
将Destination 屬性設定為flowfile-attribute,并增加下面屬性key -value
屬性名 | JsonPath |
---|---|
_id | $.$oid |
name | $.name |
age | $.age |
在後面的流程中可以看到名稱為_id、name、age的屬性及對應值。
将JSON内容解析出放到flowfile中
将Destination 屬性設定為flowfile-content,并增加下面屬性key -value
屬性名 | JsonPath |
---|---|
ignored | $.* |
在下一個節點可以看到flowfile傳的内容是jsonPath 解析出來的值。
AttributesToJSON 元件
AttributesToJSON 元件可以将flowfile 中的屬性轉換成JSON格式的flowfile 輸出
在Attributes List 用逗号分隔屬性名即可, 輸出同樣是可以選擇flowfile-content 或flowfile-attribute。
版權聲明:本文為CSDN部落客「weixin_33719619」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_33719619/article/details/92640023