NiFi学习-json文件数据入库
- 流程图
- 处理器
-
- GetFile处理器
- PutFile
- SplitJson
- JoltTransformJSON
- ExecuteGroovyScript
流程图

处理器
GetFile处理器
读取D:\nifi-1.11.4-bin\test路径下以UP开头的文件
PutFile
备份文件到testBackUp目录下
SplitJson
分割net_test节点
JoltTransformJSON
json字段转换
左下角ADVANCED处可以进行转换测试:
ExecuteGroovyScript
执行Groovy脚本进行数据入库(MySql)
import groovy.json.JsonSlurper
import groovy.sql.Sql
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if (!flowFile) return;
flowFile.read { inputStream ->
def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
def msgContent = new JsonSlurper().parseText(text);
def qrySql = "select 1 from sys_test where id = '${msgContent.ID}'";
List list = SQL.db.rows(qrySql);
if (list.size() > 0) {
//存在记录
def upSql = "update sys_test set \
name ='${msgContent.NAME}' \
where ID = '${msgContent.ID}'";
log.debug(upSql);
SQL.db.executeUpdate(upSql);
} else {
def insertSql = "insert into sys_test(ID,NAME,AGE)values( \
'${msgContent.ID}', \
'${msgContent.NAME}',\
'${msgContent.AGE}')";
log.debug(insertSql);
SQL.db.executeUpdate(insertSql);
}
}
REL_SUCCESS << flowFile