NIFI 使用
1. Kettle與NIFI差異
Kettle 介紹
- Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上運作,綠色無需安裝,資料抽取高效穩定。
- Kettle 中文名稱叫水壺,該項目的主程式員MATT 希望把各種資料放到一個壺裡,然後以一種指定的格式流出。
- Kettle這個ETL工具集,它允許你管理來自不同資料庫的資料,通過提供一個圖形化的使用者環境來描述你想做什麼,而不是你想怎麼做。
- Kettle中有兩種腳本檔案,transformation和job,transformation完成針對資料的基礎轉換,job則完成整個工作流的控制。
- Kettle有三個主要元件:Spoon、Kitchen、Pan
- Spoon:是一個圖形化的界面,可以讓我們用圖形化的方式開發轉換和作業。windows選擇Spoon.bat;Linux選擇Spoon.sh
- Kitchen:利用Kitchen可以使用指令行調用Job
- Pan:利用Pan可以用指令行的形式調用Trans
- Carte:Carte是一個輕量級的Web容器,用于建立專用、遠端的ETL Server。
NIFI 介紹
- Apache NiFi 是一個易于使用、功能強大而且可靠的資料拉取、資料處理和分發系統,用于自動化管理系統間的資料流。
- 它支援高度可配置的訓示圖的資料路由、轉換和系統中介邏輯,支援從多種資料源動态拉取資料。
- NiFi原來是NSA(National Security Agency [美國國家安全局])的一個項目,目前已經代碼開源,是Apache基金會的頂級項目之一
- NiFi基于Web方式工作,背景在伺服器上進行排程。
- 使用者可以為資料處理定義為一個流程,然後進行處理,背景具有資料處理引擎、任務排程等元件。
NiFi 核心概念
- Nifi 的設計理念接近于基于流的程式設計 Flow Based Programming。
- FlowFile:表示通過系統移動的每個對象,包含資料流的基本屬性
- FlowFile Processor(處理器):負責實際對資料流執行工作
- Connection(連接配接線):負責不同處理器之間的連接配接,是資料的有界緩沖區
- Flow Controller(流量控制器):管理程序使用的線程及其配置設定
- Process Group(過程組):程序組是一組特定的程序及其連接配接,允許組合其他元件建立新元件
NIFI 特性
-
可視化指令與控制
設計,控制,回報和監測之間的無縫體驗
-
高度可配置
損失容忍vs保證傳遞
低延遲vs高吞吐量
動态優先
流可以在運作時修改
資料回壓
-
資料溯源
從頭到尾跟蹤資料流
-
為可擴充而設計
建立自己的處理器和更多
快速開發和有效的測試
-
安全
SSL,SSH,HTTPS,加密内容等
多租戶授權和内部授權/政策管理
nifi是将資料轉換成一種流的形式在各種處理器之間進行處理轉換的etl工具,它通過可視化可操作的使用者界面來編輯資料,更加直覺有效。
kettle 是C/S 架構 ,NiFi是基于WEB的B/S架構,友善內建。
2. NIFI的優點
- 可視化的UI界面,各個子產品元件之間高度可配置,且每個流程都有監控,可以通過界面直覺的看到各個資料處理子產品之間的資料流轉情況,分析出程式性能瓶頸。
- 資料流可以在UI界面自由拖拽和拓展,各子產品之間互相獨立,互不影響。
- 可以在處理耗時的地方建立多個處理子產品并行執行,提升處理速度。類似于代碼中加入了多線程,但相對于修改代碼,界面配置操作十分簡單。
- 修改友善,任意子產品都可以在資料流轉過程中随時啟停,任意處理子產品都可以實作熱插拔。資料流流向随時可變。
- NiFi的對處理子產品有對應的retry機制和錯誤分發機制,且可配置性強。
- NiFi基于元件的熱插拔部署,友善內建自定義元件
- NiFi支援緩沖所有排隊的資料,以及在這些隊列達到指定限制時提供背壓的能力,或者在資料達到指定年齡(其值已經消失)時使資料老化
- 具有多種現有元件可以提供資料抽取轉換流程
- NiFi 可以進行叢集部署,橫向擴充,提高系統吞吐量
3. NIFI的缺點
- 各個步驟中間結果落地導緻磁盤IO成為Nifi的瓶頸,這個缺點在資料備援量越大的時候表現的越明顯。
- 在實作特定業務場景現有元件不能滿足或實作複雜,需自定義開發元件
4. 單機部署 NIFI
- 上傳Apache NIFI包到Linux上,解壓安裝包;或者将你的本地作為伺服器,直接解壓zip包。
- 在解壓的目錄下,找到conf目錄,編輯bootstrap.conf檔案,修改NIFI的記憶體配置,預設的值比較小,比如這裡我改成啟動2g,最大10g
java.arg.2=-Xms2g java.arg.3=-Xmx10g
- 在解壓的目錄下,找到bin目錄,可以看到裡面有一些腳本
dump-nifi.bat nifi-env.bat nifi-env.sh nifi.sh run-nifi.bat status-nifi.bat
- 在解壓的目錄下,找到conf目錄,編輯nifi.properties檔案,修改端口号,預設為8080
nifi.web.http.port=8080
Linux或者Mac,使用nifi.sh start啟動NIFI,nifi.sh stop停止NIFI,nifi.sh restart重新開機NIFI。
Windows下,直接輕按兩下run-nifi.bat即可,退出的時候關閉運作視窗就可以了。
5. 叢集部署 NIFI
NiFi采用Zero-Master Clustering範例。叢集中的每個節點對資料執行相同的任務,但每個節點都在不同的資料集上運作。其中一個節點自動選擇(通過Apache ZooKeeper)作為叢集協調器。然後,群集中的所有節點都會向此節點發送心跳/狀态資訊,并且此節點負責斷開在一段時間内未報告任何心跳狀态的節點。此外,當新節點選擇加入群集時,新節點必須首先連接配接到目前標明的群集協調器,以擷取最新流。如果群集協調器确定允許該節點加入(基于其配置的防火牆檔案),則将目前流提供給該節點,并且該節點能夠加入群集,假設節點的流副本與群集協調器提供的副本比對。如果節點的流配置版本與群集協調器的版本不同,則該節點将不會加入群集。
zookeeper:NIFI内置zookeeper
- 編輯執行個體中,conf/nifi.properties檔案,不同節點改成對應内容,内容如下:
nifi.state.management.configuration.file=./conf/state-management.xml nifi.state.management.provider.local=local-provider nifi.state.management.provider.cluster=zk-provider # 指定此NiFi執行個體是否應運作嵌入式ZooKeeper伺服器,預設是false nifi.state.management.embedded.zookeeper.start=true nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties # 3個節點分别是8081 8082 8083 nifi.web.http.port=8081 # 如果執行個體是群集中的節點,請将此設定為true。預設值為false nifi.cluster.is.node=true # 3個節點分别是9081 9082 9083 nifi.cluster.node.protocol.port=9081 # 3個節點分别是6341 6342 6343 nifi.cluster.load.balance.port=6341 # 連接配接到Apache ZooKeeper所需的連接配接字元串。這是一個以逗号分隔的hostname:port對清單 nifi.zookeeper.connect.string=localhost:2181,localhost:2182,localhost:2183
- 修改zookeeper.properties
# 3個節點都一樣 server.1=localhost:2111:3111;2181 server.2=localhost:2222:3222;2182 server.3=localhost:2333:3333;2183
- 修改state-management.xml(3個節點都一樣)
<cluster-provider> <id>zk-provider</id> <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class> <property name="Connect String">localhost:2181,localhost:2182,localhost:2183</property> <property name="Root Node">/nifi</property> <property name="Session Timeout">10 seconds</property> <property name="Access Control">Open</property> </cluster-provider>
- 在3個節點的NIFI目錄下(bin目錄同級),建立
,zookeeper檔案夾裡建立檔案state/zookeeper
,3個節點分别寫入myid
,1
,2
3
#3個節點分别寫入 1 2 3 echo 1 > myid
- 分别啟動所有節點
6. 資料同步(表字段相同)
整體流程如下:
GenerateTableFetch --> ExecuteSQLRecord --> PutDatabaseRecord --> LogAttribute

GenerateTableFetch元件:從源表中生成擷取行的“頁”的SQL select查詢。分區大小屬性以及表的行數決定頁面和生成的流檔案的大小和數量。此外,可以通過設定最大值列來實作增量擷取,這将導緻處理器跟蹤列的最大值,進而隻擷取那些列的值超過觀察到的最大值的行
ExecuteSQLRecord元件:執行提供的SQL選擇查詢。查詢結果将轉換為所指定格式輸出。使用流,是以支援任意大的結果集。
PutDatabaseRecord元件:使用指定的記錄器從傳入流檔案輸入(可能是多個)記錄。這些記錄被轉換為SQL語句,并作為單個批處理執行
連接配接池配置DBCPConnectionPool
7. 資料同步(表字段不相同)
整體流程如下:
QueryDatabaseTable --> ConvertAvroToJSON --> SplitJson --> EvaluateJsonPath --> ReplaceText --> PutSQL
QueryDatabaseTable元件:生成一個SQL select查詢,或使用提供的語句,并執行它來擷取指定最大值列中值大于之前看到的最大值的所有行。查詢結果将轉換為Avro格式。
ConvertAvroToJSON元件:将二進制Avro記錄轉換為JSON對象。這個處理器提供了Avro字段到JSON字段的直接映射,這樣得到的JSON将具有與Avro文檔相同的層次結構
SplitJson元件:對于由JsonPath表達式指定的數組元素,将一個JSON檔案拆分為多個單獨的流檔案。每個生成的FlowFile由指定數組的一個元素組成,并傳輸到關系“split”,同時将原始檔案傳輸到關系“original”。如果沒有找到指定的JsonPath,或者沒有對數組元素求值,原始檔案将被路由到“failure”,并且不生成任何檔案。
EvaluateJsonPath元件:根據FlowFile的内容評估一個或多個JsonPath表達式。這些表達式的結果将配置設定給FlowFile屬性,或者寫入FlowFile本身的内容,具體取決于處理器的配置。
ReplaceText元件:通過對正規表達式(regex)求值并将與正規表達式比對的内容部分替換為其他值,更新流檔案的内容。通過替換成目标表字段的sql語句,資料可以從EvaluateJsonPath元件存放到的attribute屬性中擷取,擷取方式${key},将替換後的sql語句傳遞到下遊PutSql元件中。
PutSQL元件:執行SQL UPDATE或INSERT指令。傳入流檔案的内容應該是要執行的SQL指令。
以上兩種資料同步都是基于mysql 到 mysql ,oracle隻需要更換資料庫連接配接池配置。
注意:oracle資料同步使用EvaluateJsonPath元件擷取屬性值時字段名稱需要大寫
NIFI 元件之間資料傳遞時通過隊列的方式控制,是以不能控制事務,但如果有一個元件初始化失敗時,上有傳遞下來的隊列中的資料時不會被消費,當元件異常修複之後會繼續執行隊列中的内容。
8. binlog日志采集資料同步
為了不影響業務,可以通過binlog日志對資料庫表資料進行同步
整體流程:
CaptureChangeMySQL --> RouteOnAttribute --> JoltTransformJSON --> EvaluateJsonPath --> ReplaceText --> PutSQL --> LogAttribute
CaptureChangeMySQL元件:從MySQL資料庫中檢索更改資料捕獲(CDC)事件。CDC事件包括插入、更新、删除操作。事件輸出為按操作發生時的順序排列的單個流檔案。讀取binlog日志路由下遊處理
redis存儲用戶端配置server服務端
此時你會發現多了一個
RedisConnectionPoolService
繼續配置
RedisConnectionPoolService
最後啟動redis服務端和用戶端
RouteOnAttribute 元件:根據binlog中含有的類型參數,把binlog記錄的日志操作根據類型進行路由處理,提供給不同的下遊分支操作
Routing Strategy:路由政策用預設的Route toProperty name,根據屬性名進行路由.添加的自定義屬性可以根據業務分發給不同的下遊處理器。
JoltTransformJSON元件:對flowfile JSON有效負載應用一系列的Jolt規範。使用轉換後的内容建立一個新的FlowFile,并将其路由到“success”關系。如果JSON轉換失敗,原始的流檔案将被路由到“failure”關系。
點選進階設定會打開如下圖Jolt測試界面
上面有紅叉子的那個區域Jolt Specification是填寫我們的Jolt語句的;
左下方區域JSON Input是輸入要被處理前的Json資料.
右下方區域JSON Output是輸出Input被jolt語句處理後的結果.
Jolt Specification區域輸入以下内容
[{
"operation": "shift",
"spec": {
"columns": {
"*": {
"@(value)": "@(1,name)"
}
}
}
}]
“operation”: “shift”:實作整理出key,value格式
“operation”: “modify-default-beta”:實作拼接了一個帶字首字元串的新字段apid,以及value是字元串ap_拼接id的value值.
JSON Input輸入以下内容
{
"type" : "delete",
"timestamp" : 1592386594000,
"binlog_filename" : "mysql-bin.000001",
"binlog_position" : 229,
"database" : "ipaas",
"table_name" : "target",
"table_id" : 33,
"columns" : [ {
"id" : 1,
"name" : "id",
"column_type" : -5,
"value" : 50
}, {
"id" : 2,
"name" : "username",
"column_type" : 12,
"value" : "徐朝"
}, {
"id" : 3,
"name" : "userage",
"column_type" : 4,
"value" : 20
}, {
"id" : 4,
"name" : "time",
"column_type" : 12,
"value" : "2020-06-17 10:31:45"
} ]
}
最後點選TRANSFORM按鈕檢視效果
測試沒問題,可以複制我們調試好的Jolt Specification内容,傳回剛才Jolt Specification這裡,貼進去儲存配置
EvaluateJsonPath元件:根據FlowFile的内容評估一個或多個JsonPath表達式。這些表達式的結果将配置設定給FlowFile屬性,或者寫入FlowFile本身的内容,具體取決于處理器的配置。
ReplaceText元件:通過對正規表達式(regex)求值并将與正規表達式比對的内容部分替換為其他值,更新流檔案的内容。通過替換成目标表字段的sql語句,資料可以從EvaluateJsonPath元件存放到的attribute屬性中擷取,擷取方式${key},将替換後的sql語句傳遞到下遊PutSql元件中。
PutSQL元件:執行上遊傳遞下來的sql語句
LogAttribute元件:記錄執行日志
輸出結果:
9. 多表分别查詢彙總入庫(表字段不相同)
完整流程:
同**資料同步(表字段不同)**分别有多條處理流程将資料查詢出來,然後使用funnel元件進行資料合并後統一入庫
10. 根據規則字段映射
完整流程:
從源資料表中查詢出所有資料轉換為json,然後通過SplitJson切分成多個json對象,在通過EvaluateJsonPath元件将值存放到屬性清單中,再通過ExecuteSQL元件根據字段映射條件查詢規則表并轉換為json,再通過EvaluateJsonPath元件将規則表資料也添加到源表資料的屬性清單中,再根據RouteOnAttribute元件判斷條件路由需要的資料到下遊;然後通過ReplaceText元件從屬性清單中擷取值拼接sql交由下遊處理器PutSQL執行。
ExecuteSQL元件配置如下:
RouteOnAttribute元件配置如下:
自定義添加過濾條件
11. 自定義元件Nifi Processor
1. 建立Maven工程
父工程my-processor,子工程nifi-my-processor-nar和nifi-my-processor-processors,這裡使用的版本時1.11.4
my-processor pom檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>1.11.4</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>my-processor</artifactId>
<version>1.11.4</version>
<name>my-processor</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<modules>
<module>nifi-my-processor-nar</module>
<module>nifi-my-processor-processors</module>
</modules>
</project>
nifi-my-processor-nar pom檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>my-processor</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.11.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-my-processor-nar</artifactId>
<packaging>nar</packaging>
<name>nifi-my-processor-nar</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.11.4</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-my-processor-processors</artifactId>
<version>1.11.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
nifi-my-processor-processors pom檔案:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>my-processor</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.11.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-my-processor-processors</artifactId>
<packaging>jar</packaging>
<name>nifi-my-processor-processors</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.11.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.11.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.11.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2. 修改項目編寫代碼
- 删除nifi-my-processor-processors子項目中,src/test中的測試檔案(打包可能出現錯誤)
- Nifi的要求是在/src/main/resources/META-INF/services/目錄下建立一個檔案org.apache.nifi.processor.Processor,這個類似于配置檔案,指向該Processor所在的目錄,比如我的配置檔案内容就是
org.apache.nifi.processor.MyProcessor
代碼編寫,建立MyProcessor類。其中有設定狀态,屬性,及處理方法(onTrigger)等
package org.apache.nifi.processor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
/**
* @Classname MyProcessor
* @Description
* @Author xuzhaoa
* @Date 2020/7/2 9:49
*/
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder().name("MY_PROPERTY")
.displayName("My property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship MY_RELATIONSHIP_SUCCESS = new Relationship.Builder()
.name("sucess")
.description("Example relationship Success")
.build();
public static final Relationship MY_RELATIONSHIP_FAILURE = new Relationship.Builder()
.name("failure")
.description("Example relationship Failure")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(MY_PROPERTY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(MY_RELATIONSHIP_SUCCESS);
relationships.add(MY_RELATIONSHIP_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// TODO implement
final AtomicReference<String> value = new AtomicReference<>();
session.read(flowFile, in -> {
try {
StringWriter sw = new StringWriter();
InputStreamReader inr = new InputStreamReader(in);
char[] buffer = new char[1024];
int n = 0;
while (-1 != (n = inr.read(buffer))) {
sw.write(buffer, 0, n);
}
String str = sw.toString();
String result = "處理了:" + str + context.getProperty("MY_PROPERTY").getValue();
value.set(result);
} catch (Exception ex) {
ex.printStackTrace();
getLogger().error("Failed to read json string.");
}
});
String results = value.get();
if (results != null && !results.isEmpty()) {
flowFile = session.putAttribute(flowFile, "match", results);
}
flowFile = session.write(flowFile, out -> out.write(value.get().getBytes()));
session.transfer(flowFile, MY_RELATIONSHIP_SUCCESS);
}
}
我們使其extends AbstractProcessor這個抽象類,@Tag标簽是為了在web GUI中,能夠使用搜尋的方式快速找到我們自己定義的這個Processor。CapabilityDescription内的值會暫時在Processor選擇的那個頁面中,相當于一個備注。
一般來說隻需要繼承AbstractProcessor就可以了,但是某些複雜的任務可能需要去繼承更底層的AbstractSessionFactoryProcessor這個抽象類。
我們通過
PropertyDescriptor
以及
Relationship
中的模闆方法定義了兩個新的關系和屬性描述值,這些值會出現在webUI中
該元件隻是簡單的測試将流中資料替換,功能實作主要通過該類自行實作
整個Processor的核心部分 -> onTrigger 部分, onTrigger方法會在一個flow file被傳入處理器時調用。為了讀取以及改變傳遞來的FlowFile,Nifi提供了三個callback接口方法
-
InputStreamCallback:
該接口繼承細節如下: 将流中的資料讀取處理進行替換
NIFI使用者指南NIFI 使用
OutputStreamCallback :将内容寫入值中
最後使用
transfer()
功能傳遞回這個flowFile以及成功辨別。
3. 打包部署
項目打包後将nifi-my-processor-nar工程target目錄中的 nifi-my-processor-nar-1.0-SNAPSHOT.nar 檔案,拷貝到 nifi\lib 目錄中
建立流程使用自定義元件
GenerateFlowFile --> MyProcessor --> PutFile
GenerateFlowFile 元件配置生成内容
MyProcessor 元件替換内容
檢視結果: