天天看點

NIFI使用者指南NIFI 使用

NIFI 使用

1. Kettle與NIFI差異

Kettle 介紹

  • Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、Linux、Unix上運作,綠色無需安裝,資料抽取高效穩定。
  • Kettle 中文名稱叫水壺,該項目的主程式員MATT 希望把各種資料放到一個壺裡,然後以一種指定的格式流出。
  • Kettle這個ETL工具集,它允許你管理來自不同資料庫的資料,通過提供一個圖形化的使用者環境來描述你想做什麼,而不是你想怎麼做。
  • Kettle中有兩種腳本檔案,transformation和job,transformation完成針對資料的基礎轉換,job則完成整個工作流的控制。
  • Kettle有三個主要元件:Spoon、Kitchen、Pan
    • Spoon:是一個圖形化的界面,可以讓我們用圖形化的方式開發轉換和作業。windows選擇Spoon.bat;Linux選擇Spoon.sh
    • Kitchen:利用Kitchen可以使用指令行調用Job
    • Pan:利用Pan可以用指令行的形式調用Trans
    • Carte:Carte是一個輕量級的Web容器,用于建立專用、遠端的ETL Server。

NIFI 介紹

  • Apache NiFi 是一個易于使用、功能強大而且可靠的資料拉取、資料處理和分發系統,用于自動化管理系統間的資料流。
  • 它支援高度可配置的訓示圖的資料路由、轉換和系統中介邏輯,支援從多種資料源動态拉取資料。
  • NiFi原來是NSA(National Security Agency [美國國家安全局])的一個項目,目前已經代碼開源,是Apache基金會的頂級項目之一
  • NiFi基于Web方式工作,背景在伺服器上進行排程。
  • 使用者可以為資料處理定義為一個流程,然後進行處理,背景具有資料處理引擎、任務排程等元件。

NiFi 核心概念

  • Nifi 的設計理念接近于基于流的程式設計 Flow Based Programming。
  • FlowFile:表示通過系統移動的每個對象,包含資料流的基本屬性
  • FlowFile Processor(處理器):負責實際對資料流執行工作
  • Connection(連接配接線):負責不同處理器之間的連接配接,是資料的有界緩沖區
  • Flow Controller(流量控制器):管理程序使用的線程及其配置設定
  • Process Group(過程組):程序組是一組特定的程序及其連接配接,允許組合其他元件建立新元件

NIFI 特性

  1. 可視化指令與控制

    設計,控制,回報和監測之間的無縫體驗

  2. 高度可配置

    損失容忍vs保證傳遞

    低延遲vs高吞吐量

    動态優先

    流可以在運作時修改

    資料回壓

  3. 資料溯源

    從頭到尾跟蹤資料流

  4. 為可擴充而設計

    建立自己的處理器和更多

    快速開發和有效的測試

  5. 安全

    SSL,SSH,HTTPS,加密内容等

    多租戶授權和内部授權/政策管理

nifi是将資料轉換成一種流的形式在各種處理器之間進行處理轉換的etl工具,它通過可視化可操作的使用者界面來編輯資料,更加直覺有效。

kettle 是C/S 架構 ,NiFi是基于WEB的B/S架構,友善內建。

2. NIFI的優點

  1. 可視化的UI界面,各個子產品元件之間高度可配置,且每個流程都有監控,可以通過界面直覺的看到各個資料處理子產品之間的資料流轉情況,分析出程式性能瓶頸。
  2. 資料流可以在UI界面自由拖拽和拓展,各子產品之間互相獨立,互不影響。
  3. 可以在處理耗時的地方建立多個處理子產品并行執行,提升處理速度。類似于代碼中加入了多線程,但相對于修改代碼,界面配置操作十分簡單。
  4. 修改友善,任意子產品都可以在資料流轉過程中随時啟停,任意處理子產品都可以實作熱插拔。資料流流向随時可變。
  5. NiFi的對處理子產品有對應的retry機制和錯誤分發機制,且可配置性強。
  6. NiFi基于元件的熱插拔部署,友善內建自定義元件
  7. NiFi支援緩沖所有排隊的資料,以及在這些隊列達到指定限制時提供背壓的能力,或者在資料達到指定年齡(其值已經消失)時使資料老化
  8. 具有多種現有元件可以提供資料抽取轉換流程
  9. NiFi 可以進行叢集部署,橫向擴充,提高系統吞吐量

3. NIFI的缺點

  1. 各個步驟中間結果落地導緻磁盤IO成為Nifi的瓶頸,這個缺點在資料備援量越大的時候表現的越明顯。
  2. 在實作特定業務場景現有元件不能滿足或實作複雜,需自定義開發元件

4. 單機部署 NIFI

  1. 上傳Apache NIFI包到Linux上,解壓安裝包;或者将你的本地作為伺服器,直接解壓zip包。
  2. 在解壓的目錄下,找到conf目錄,編輯bootstrap.conf檔案,修改NIFI的記憶體配置,預設的值比較小,比如這裡我改成啟動2g,最大10g
    java.arg.2=-Xms2g
    java.arg.3=-Xmx10g
               
  3. 在解壓的目錄下,找到bin目錄,可以看到裡面有一些腳本
    dump-nifi.bat  
    nifi-env.bat  
    nifi-env.sh  
    nifi.sh         
    run-nifi.bat  
    status-nifi.bat
               
  4. 在解壓的目錄下,找到conf目錄,編輯nifi.properties檔案,修改端口号,預設為8080
    nifi.web.http.port=8080
               

Linux或者Mac,使用nifi.sh start啟動NIFI,nifi.sh stop停止NIFI,nifi.sh restart重新開機NIFI。

Windows下,直接輕按兩下run-nifi.bat即可,退出的時候關閉運作視窗就可以了。

5. 叢集部署 NIFI

NiFi采用Zero-Master Clustering範例。叢集中的每個節點對資料執行相同的任務,但每個節點都在不同的資料集上運作。其中一個節點自動選擇(通過Apache ZooKeeper)作為叢集協調器。然後,群集中的所有節點都會向此節點發送心跳/狀态資訊,并且此節點負責斷開在一段時間内未報告任何心跳狀态的節點。此外,當新節點選擇加入群集時,新節點必須首先連接配接到目前標明的群集協調器,以擷取最新流。如果群集協調器确定允許該節點加入(基于其配置的防火牆檔案),則将目前流提供給該節點,并且該節點能夠加入群集,假設節點的流副本與群集協調器提供的副本比對。如果節點的流配置版本與群集協調器的版本不同,則該節點将不會加入群集。

zookeeper:NIFI内置zookeeper

  1. 編輯執行個體中,conf/nifi.properties檔案,不同節點改成對應内容,内容如下:
    nifi.state.management.configuration.file=./conf/state-management.xml                   
    nifi.state.management.provider.local=local-provider  
    nifi.state.management.provider.cluster=zk-provider
    #  指定此NiFi執行個體是否應運作嵌入式ZooKeeper伺服器,預設是false                          
    nifi.state.management.embedded.zookeeper.start=true                                                                
    nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties 
    
    # 3個節點分别是8081 8082 8083
    nifi.web.http.port=8081
    
    # 如果執行個體是群集中的節點,請将此設定為true。預設值為false
    nifi.cluster.is.node=true
    # 3個節點分别是9081 9082 9083
    nifi.cluster.node.protocol.port=9081
    
    # 3個節點分别是6341 6342 6343
    nifi.cluster.load.balance.port=6341
    
    # 連接配接到Apache ZooKeeper所需的連接配接字元串。這是一個以逗号分隔的hostname:port對清單
    nifi.zookeeper.connect.string=localhost:2181,localhost:2182,localhost:2183
               
  2. 修改zookeeper.properties
    # 3個節點都一樣
    server.1=localhost:2111:3111;2181
    server.2=localhost:2222:3222;2182
    server.3=localhost:2333:3333;2183
               
  3. 修改state-management.xml(3個節點都一樣)
    <cluster-provider>
            <id>zk-provider</id>
            <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
            <property name="Connect String">localhost:2181,localhost:2182,localhost:2183</property>
            <property name="Root Node">/nifi</property>
            <property name="Session Timeout">10 seconds</property>
            <property name="Access Control">Open</property>
    </cluster-provider>
               
  4. 在3個節點的NIFI目錄下(bin目錄同級),建立

    state/zookeeper

    ,zookeeper檔案夾裡建立檔案

    myid

    ,3個節點分别寫入

    1

    ,

    2

    ,

    3

    #3個節點分别寫入 1 2 3
    echo 1 > myid
               
  5. 分别啟動所有節點

6. 資料同步(表字段相同)

整體流程如下:

GenerateTableFetch --> ExecuteSQLRecord --> PutDatabaseRecord --> LogAttribute

NIFI使用者指南NIFI 使用

GenerateTableFetch元件:從源表中生成擷取行的“頁”的SQL select查詢。分區大小屬性以及表的行數決定頁面和生成的流檔案的大小和數量。此外,可以通過設定最大值列來實作增量擷取,這将導緻處理器跟蹤列的最大值,進而隻擷取那些列的值超過觀察到的最大值的行

NIFI使用者指南NIFI 使用

ExecuteSQLRecord元件:執行提供的SQL選擇查詢。查詢結果将轉換為所指定格式輸出。使用流,是以支援任意大的結果集。

NIFI使用者指南NIFI 使用

PutDatabaseRecord元件:使用指定的記錄器從傳入流檔案輸入(可能是多個)記錄。這些記錄被轉換為SQL語句,并作為單個批處理執行

NIFI使用者指南NIFI 使用

連接配接池配置DBCPConnectionPool

NIFI使用者指南NIFI 使用

7. 資料同步(表字段不相同)

整體流程如下:

QueryDatabaseTable --> ConvertAvroToJSON --> SplitJson --> EvaluateJsonPath --> ReplaceText --> PutSQL

NIFI使用者指南NIFI 使用

QueryDatabaseTable元件:生成一個SQL select查詢,或使用提供的語句,并執行它來擷取指定最大值列中值大于之前看到的最大值的所有行。查詢結果将轉換為Avro格式。

NIFI使用者指南NIFI 使用

ConvertAvroToJSON元件:将二進制Avro記錄轉換為JSON對象。這個處理器提供了Avro字段到JSON字段的直接映射,這樣得到的JSON将具有與Avro文檔相同的層次結構

NIFI使用者指南NIFI 使用

SplitJson元件:對于由JsonPath表達式指定的數組元素,将一個JSON檔案拆分為多個單獨的流檔案。每個生成的FlowFile由指定數組的一個元素組成,并傳輸到關系“split”,同時将原始檔案傳輸到關系“original”。如果沒有找到指定的JsonPath,或者沒有對數組元素求值,原始檔案将被路由到“failure”,并且不生成任何檔案。

NIFI使用者指南NIFI 使用

EvaluateJsonPath元件:根據FlowFile的内容評估一個或多個JsonPath表達式。這些表達式的結果将配置設定給FlowFile屬性,或者寫入FlowFile本身的内容,具體取決于處理器的配置。

NIFI使用者指南NIFI 使用

ReplaceText元件:通過對正規表達式(regex)求值并将與正規表達式比對的内容部分替換為其他值,更新流檔案的内容。通過替換成目标表字段的sql語句,資料可以從EvaluateJsonPath元件存放到的attribute屬性中擷取,擷取方式${key},将替換後的sql語句傳遞到下遊PutSql元件中。

NIFI使用者指南NIFI 使用

PutSQL元件:執行SQL UPDATE或INSERT指令。傳入流檔案的内容應該是要執行的SQL指令。

NIFI使用者指南NIFI 使用

以上兩種資料同步都是基于mysql 到 mysql ,oracle隻需要更換資料庫連接配接池配置。

注意:oracle資料同步使用EvaluateJsonPath元件擷取屬性值時字段名稱需要大寫

NIFI使用者指南NIFI 使用

NIFI 元件之間資料傳遞時通過隊列的方式控制,是以不能控制事務,但如果有一個元件初始化失敗時,上有傳遞下來的隊列中的資料時不會被消費,當元件異常修複之後會繼續執行隊列中的内容。

8. binlog日志采集資料同步

為了不影響業務,可以通過binlog日志對資料庫表資料進行同步

整體流程:

CaptureChangeMySQL --> RouteOnAttribute --> JoltTransformJSON --> EvaluateJsonPath --> ReplaceText --> PutSQL --> LogAttribute

NIFI使用者指南NIFI 使用
NIFI使用者指南NIFI 使用

CaptureChangeMySQL元件:從MySQL資料庫中檢索更改資料捕獲(CDC)事件。CDC事件包括插入、更新、删除操作。事件輸出為按操作發生時的順序排列的單個流檔案。讀取binlog日志路由下遊處理

NIFI使用者指南NIFI 使用

redis存儲用戶端配置server服務端

NIFI使用者指南NIFI 使用

此時你會發現多了一個

RedisConnectionPoolService

NIFI使用者指南NIFI 使用

繼續配置

RedisConnectionPoolService

NIFI使用者指南NIFI 使用

​ 最後啟動redis服務端和用戶端

RouteOnAttribute 元件:根據binlog中含有的類型參數,把binlog記錄的日志操作根據類型進行路由處理,提供給不同的下遊分支操作

NIFI使用者指南NIFI 使用

Routing Strategy:路由政策用預設的Route toProperty name,根據屬性名進行路由.添加的自定義屬性可以根據業務分發給不同的下遊處理器。

JoltTransformJSON元件:對flowfile JSON有效負載應用一系列的Jolt規範。使用轉換後的内容建立一個新的FlowFile,并将其路由到“success”關系。如果JSON轉換失敗,原始的流檔案将被路由到“failure”關系。

NIFI使用者指南NIFI 使用

點選進階設定會打開如下圖Jolt測試界面

上面有紅叉子的那個區域Jolt Specification是填寫我們的Jolt語句的;

左下方區域JSON Input是輸入要被處理前的Json資料.

右下方區域JSON Output是輸出Input被jolt語句處理後的結果.

NIFI使用者指南NIFI 使用

Jolt Specification區域輸入以下内容

[{
	"operation": "shift",
	"spec": {
		"columns": {
			"*": {
				"@(value)": "@(1,name)"
			}
		}
	}
}]
           

“operation”: “shift”:實作整理出key,value格式

“operation”: “modify-default-beta”:實作拼接了一個帶字首字元串的新字段apid,以及value是字元串ap_拼接id的value值.

JSON Input輸入以下内容

{
  "type" : "delete",
  "timestamp" : 1592386594000,
  "binlog_filename" : "mysql-bin.000001",
  "binlog_position" : 229,
  "database" : "ipaas",
  "table_name" : "target",
  "table_id" : 33,
  "columns" : [ {
    "id" : 1,
    "name" : "id",
    "column_type" : -5,
    "value" : 50
  }, {
    "id" : 2,
    "name" : "username",
    "column_type" : 12,
    "value" : "徐朝"
  }, {
    "id" : 3,
    "name" : "userage",
    "column_type" : 4,
    "value" : 20
  }, {
    "id" : 4,
    "name" : "time",
    "column_type" : 12,
    "value" : "2020-06-17 10:31:45"
  } ]
}
           

最後點選TRANSFORM按鈕檢視效果

NIFI使用者指南NIFI 使用

測試沒問題,可以複制我們調試好的Jolt Specification内容,傳回剛才Jolt Specification這裡,貼進去儲存配置

EvaluateJsonPath元件:根據FlowFile的内容評估一個或多個JsonPath表達式。這些表達式的結果将配置設定給FlowFile屬性,或者寫入FlowFile本身的内容,具體取決于處理器的配置。

NIFI使用者指南NIFI 使用

ReplaceText元件:通過對正規表達式(regex)求值并将與正規表達式比對的内容部分替換為其他值,更新流檔案的内容。通過替換成目标表字段的sql語句,資料可以從EvaluateJsonPath元件存放到的attribute屬性中擷取,擷取方式${key},将替換後的sql語句傳遞到下遊PutSql元件中。

NIFI使用者指南NIFI 使用

PutSQL元件:執行上遊傳遞下來的sql語句

NIFI使用者指南NIFI 使用

LogAttribute元件:記錄執行日志

NIFI使用者指南NIFI 使用

輸出結果:

NIFI使用者指南NIFI 使用

9. 多表分别查詢彙總入庫(表字段不相同)

完整流程:

NIFI使用者指南NIFI 使用

同**資料同步(表字段不同)**分别有多條處理流程将資料查詢出來,然後使用funnel元件進行資料合并後統一入庫

10. 根據規則字段映射

完整流程:

NIFI使用者指南NIFI 使用

從源資料表中查詢出所有資料轉換為json,然後通過SplitJson切分成多個json對象,在通過EvaluateJsonPath元件将值存放到屬性清單中,再通過ExecuteSQL元件根據字段映射條件查詢規則表并轉換為json,再通過EvaluateJsonPath元件将規則表資料也添加到源表資料的屬性清單中,再根據RouteOnAttribute元件判斷條件路由需要的資料到下遊;然後通過ReplaceText元件從屬性清單中擷取值拼接sql交由下遊處理器PutSQL執行。

ExecuteSQL元件配置如下:

NIFI使用者指南NIFI 使用

RouteOnAttribute元件配置如下:

自定義添加過濾條件

NIFI使用者指南NIFI 使用

11. 自定義元件Nifi Processor

1. 建立Maven工程

父工程my-processor,子工程nifi-my-processor-nar和nifi-my-processor-processors,這裡使用的版本時1.11.4

NIFI使用者指南NIFI 使用

my-processor pom檔案:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi</artifactId>
        <version>1.11.4</version>
    </parent>

    <groupId>org.apache.nifi</groupId>
    <artifactId>my-processor</artifactId>
    <version>1.11.4</version>

    <name>my-processor</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <modules>
        <module>nifi-my-processor-nar</module>
        <module>nifi-my-processor-processors</module>
    </modules>

</project>

           

nifi-my-processor-nar pom檔案:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>my-processor</artifactId>
        <groupId>org.apache.nifi</groupId>
        <version>1.11.4</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>nifi-my-processor-nar</artifactId>
    <packaging>nar</packaging>

    <name>nifi-my-processor-nar</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <source.skip>true</source.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-services-api-nar</artifactId>
            <version>1.11.4</version>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-my-processor-processors</artifactId>
            <version>1.11.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

           

nifi-my-processor-processors pom檔案:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>my-processor</artifactId>
        <groupId>org.apache.nifi</groupId>
        <version>1.11.4</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>nifi-my-processor-processors</artifactId>
    <packaging>jar</packaging>

    <name>nifi-my-processor-processors</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>


    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>1.11.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
            <version>1.11.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <version>1.11.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

           

2. 修改項目編寫代碼

  1. 删除nifi-my-processor-processors子項目中,src/test中的測試檔案(打包可能出現錯誤)
  2. Nifi的要求是在/src/main/resources/META-INF/services/目錄下建立一個檔案org.apache.nifi.processor.Processor,這個類似于配置檔案,指向該Processor所在的目錄,比如我的配置檔案内容就是
    org.apache.nifi.processor.MyProcessor
NIFI使用者指南NIFI 使用

代碼編寫,建立MyProcessor類。其中有設定狀态,屬性,及處理方法(onTrigger)等

NIFI使用者指南NIFI 使用
package org.apache.nifi.processor;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.InputStreamReader;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @Classname MyProcessor
 * @Description
 * @Author xuzhaoa
 * @Date 2020/7/2 9:49
 */
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {


    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("MY_PROPERTY")
            .displayName("My property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP_SUCCESS = new Relationship.Builder()
            .name("sucess")
            .description("Example relationship Success")
            .build();

    public static final Relationship MY_RELATIONSHIP_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Example relationship Failure")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(MY_RELATIONSHIP_SUCCESS);
        relationships.add(MY_RELATIONSHIP_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // TODO implement
        final AtomicReference<String> value = new AtomicReference<>();
        session.read(flowFile, in -> {
            try {
                StringWriter sw = new StringWriter();
                InputStreamReader inr = new InputStreamReader(in);
                char[] buffer = new char[1024];
                int n = 0;
                while (-1 != (n = inr.read(buffer))) {
                    sw.write(buffer, 0, n);
                }
                String str = sw.toString();

                String result = "處理了:" + str + context.getProperty("MY_PROPERTY").getValue();
                value.set(result);
            } catch (Exception ex) {
                ex.printStackTrace();
                getLogger().error("Failed to read json string.");
            }
        });

        String results = value.get();
        if (results != null && !results.isEmpty()) {
            flowFile = session.putAttribute(flowFile, "match", results);
        }

        flowFile = session.write(flowFile, out -> out.write(value.get().getBytes()));

        session.transfer(flowFile, MY_RELATIONSHIP_SUCCESS);

    }
}
           

我們使其extends AbstractProcessor這個抽象類,@Tag标簽是為了在web GUI中,能夠使用搜尋的方式快速找到我們自己定義的這個Processor。CapabilityDescription内的值會暫時在Processor選擇的那個頁面中,相當于一個備注。

一般來說隻需要繼承AbstractProcessor就可以了,但是某些複雜的任務可能需要去繼承更底層的AbstractSessionFactoryProcessor這個抽象類。

我們通過

PropertyDescriptor

以及

Relationship

中的模闆方法定義了兩個新的關系和屬性描述值,這些值會出現在webUI中

該元件隻是簡單的測試将流中資料替換,功能實作主要通過該類自行實作

整個Processor的核心部分 -> onTrigger 部分, onTrigger方法會在一個flow file被傳入處理器時調用。為了讀取以及改變傳遞來的FlowFile,Nifi提供了三個callback接口方法

  • InputStreamCallback:

    該接口繼承細節如下: 将流中的資料讀取處理進行替換

    NIFI使用者指南NIFI 使用

OutputStreamCallback :将内容寫入值中

NIFI使用者指南NIFI 使用

最後使用

transfer()

功能傳遞回這個flowFile以及成功辨別。

3. 打包部署

項目打包後将nifi-my-processor-nar工程target目錄中的 nifi-my-processor-nar-1.0-SNAPSHOT.nar 檔案,拷貝到 nifi\lib 目錄中

建立流程使用自定義元件

GenerateFlowFile --> MyProcessor --> PutFile

NIFI使用者指南NIFI 使用

GenerateFlowFile 元件配置生成内容

NIFI使用者指南NIFI 使用

MyProcessor 元件替換内容

NIFI使用者指南NIFI 使用

檢視結果:

NIFI使用者指南NIFI 使用