天天看点

NiFi学习-json文件数据入库流程图处理器

NiFi学习-json文件数据入库

  • 流程图
  • 处理器
    • GetFile处理器
    • PutFile
    • SplitJson
    • JoltTransformJSON
    • ExecuteGroovyScript

流程图

NiFi学习-json文件数据入库流程图处理器
NiFi学习-json文件数据入库流程图处理器

处理器

GetFile处理器

读取D:\nifi-1.11.4-bin\test路径下以UP开头的文件

NiFi学习-json文件数据入库流程图处理器

PutFile

备份文件到testBackUp目录下

NiFi学习-json文件数据入库流程图处理器

SplitJson

分割net_test节点

NiFi学习-json文件数据入库流程图处理器

JoltTransformJSON

json字段转换

NiFi学习-json文件数据入库流程图处理器

左下角ADVANCED处可以进行转换测试:

NiFi学习-json文件数据入库流程图处理器

ExecuteGroovyScript

执行Groovy脚本进行数据入库(MySql)

NiFi学习-json文件数据入库流程图处理器
import groovy.json.JsonSlurper
import groovy.sql.Sql
import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if (!flowFile) return;

flowFile.read { inputStream ->
    def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    def msgContent = new JsonSlurper().parseText(text);
    def qrySql = "select 1 from sys_test where id = '${msgContent.ID}'";
    List list = SQL.db.rows(qrySql);
    if (list.size() > 0) {
        //存在记录
        def upSql = "update sys_test set \
                name ='${msgContent.NAME}' \
                where ID = '${msgContent.ID}'";
        log.debug(upSql);
        SQL.db.executeUpdate(upSql);
    } else {
        def insertSql = "insert into sys_test(ID,NAME,AGE)values( \
           '${msgContent.ID}', \
           '${msgContent.NAME}',\
           '${msgContent.AGE}')";
        log.debug(insertSql);
        SQL.db.executeUpdate(insertSql);
    }
}

REL_SUCCESS << flowFile
           

继续阅读