天天看點

NiFi學習-json檔案資料入庫流程圖處理器

NiFi學習-json檔案資料入庫

  • 流程圖
  • 處理器
    • GetFile處理器
    • PutFile
    • SplitJson
    • JoltTransformJSON
    • ExecuteGroovyScript

流程圖

NiFi學習-json檔案資料入庫流程圖處理器
NiFi學習-json檔案資料入庫流程圖處理器

處理器

GetFile處理器

讀取D:\nifi-1.11.4-bin\test路徑下以UP開頭的檔案

NiFi學習-json檔案資料入庫流程圖處理器

PutFile

備份檔案到testBackUp目錄下

NiFi學習-json檔案資料入庫流程圖處理器

SplitJson

分割net_test節點

NiFi學習-json檔案資料入庫流程圖處理器

JoltTransformJSON

json字段轉換

NiFi學習-json檔案資料入庫流程圖處理器

左下角ADVANCED處可以進行轉換測試:

NiFi學習-json檔案資料入庫流程圖處理器

ExecuteGroovyScript

執行Groovy腳本進行資料入庫(MySql)

NiFi學習-json檔案資料入庫流程圖處理器
import groovy.json.JsonSlurper
import groovy.sql.Sql
import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if (!flowFile) return;

flowFile.read { inputStream ->
    def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    def msgContent = new JsonSlurper().parseText(text);
    def qrySql = "select 1 from sys_test where id = '${msgContent.ID}'";
    List list = SQL.db.rows(qrySql);
    if (list.size() > 0) {
        //存在記錄
        def upSql = "update sys_test set \
                name ='${msgContent.NAME}' \
                where ID = '${msgContent.ID}'";
        log.debug(upSql);
        SQL.db.executeUpdate(upSql);
    } else {
        def insertSql = "insert into sys_test(ID,NAME,AGE)values( \
           '${msgContent.ID}', \
           '${msgContent.NAME}',\
           '${msgContent.AGE}')";
        log.debug(insertSql);
        SQL.db.executeUpdate(insertSql);
    }
}

REL_SUCCESS << flowFile
           

繼續閱讀