天天看點

nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料

 目錄

​​一個示例​​

​​執行個體代碼模闆​​

​​截圖​​

​​script​​

​​scriptboty ​​

​​結果​​

​​介紹​​

​​控件基礎​​

​​基礎手法​​

​​設定新屬性​​

​​删除屬性​​

​​寫内容​​

​​得到内容​​

​​将流檔案傳輸到成功關系​​

​​使用dbcp ​​

​​處理處理器啟動和停止​​

​​根據主表的資料,去查詢子表,并且将主子表的資料進行 組裝 ​​

​​腳本sql實作1.删除2查詢3.添加​​

​​1.界面​​

​​2.配置倆個資料庫​​

​​3.具體代碼體​​

一個示例

這裡順便介紹一下 nifi 的配置資料庫連結的時候要采用 左斜杠的方式寫路徑如(f:/jdbc-xx.jar)

執行個體代碼模闆

截圖

nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料
  • putfile 指定一個目錄即可

script

nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料

scriptboty 

def flowFile = session.create()
 

flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")

session.transfer(flowFile, REL_SUCCESS)      

結果

nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料

介紹

控件基礎

  變量 類型 描述
session org.apache.nifi.processor.ProcessSession 用于擷取,更改和傳輸輸入檔案的會話
context org.apache.nifi.processor.ProcessContext 背景(幾乎沒有用)
log org.apache.nifi.logging.ComponentLog 此處理器執行個體的記錄器
REL_SUCCESS org.apache.nifi.processor.Relationship 成功的關系
REL_FAILURE org.apache.nifi.processor.Relationship 失敗的關系
CTL java.util.HashMap <String, ControllerService>

使用`CTL。*`處理器屬性定義的控制器服務填充的映射。 

`CTL。字首屬性可以連結到控制器服務,并提供從腳本通路此服務而無需其他代碼。

SQL java.util.HashMap <String,groovy.sql.Sql >

用`groovy.sql.Sql`對象填充的地圖連接配接到用`SQL。*`處理器屬性定義的相應資料庫。 

`SQL。字首屬性隻能連結到DBCPSercice。

Dynamic processor properties org.apache.nifi.components.PropertyDescriptor 所有未以`CTL。或`SQL`開頭

基礎手法

設定新屬性

flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE 
  flowFile.'mime.type' = 'text/xml' 
  flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
  //the same as
  flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE)      

删除屬性

flowFile.ATTRIBUTE_NAME = null
  //equals to
  flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME")      

寫内容

flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")
  flowFile.write("UTF-8"){writer-> 
    do something with java.io.Writer...
  }
  flowFile.write{outStream-> 
    do something with output stream...
  }
  flowFile.write{inStream, outStream-> 
    do something with input and output streams... 
  }      

得到内容

InputStream i = flowFile.read()
  def json = new groovy.json.JsonSlurper().parse( flowFile.read() )
  String text = flowFile.read().getText("UTF-8")      

将流檔案傳輸到成功關系

REL_SUCCESS << flowFile 
  flowFile.transfer(REL_SUCCESS)
  //the same as:
  session.transfer(flowFile, REL_SUCCESS)      

使用dbcp 

  • 添加一個屬性
nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料
  • 添加腳本bo'd'y
nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料
  • 腳本寫法dbcp
import groovy.sql.Sql

  //define property named `SQL.db` connected to a DBCPConnectionPool controller service
  //for this case it's an H2 database example
  
  //read value from the database with prepared statement 
  //and assign into flowfile attribute `db.yesterday`
  def daysAdd = -1
  def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
  flowFile.'db.yesterday' = row.DB_DATE

  //to work with BLOBs and CLOBs in the database 
  //use parameter casting using groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader)
  
  //write content of the flow file into database blob
  flowFile.read{ rawIn->
    def parms = [
      p_id   : flowFile.ID as Long, //get flow file attribute named `ID`
      p_data : Sql.BLOB( rawIn ),   //use input stream as BLOB sql parameter
    ]
    SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
  }      

處理處理器啟動和停止

import org.apache.nifi.processor.ProcessContext
  import java.util.concurrent.atomic.AtomicLong

  class Const{
    static Date startTime = null;
    static AtomicLong triggerCount = null;
  }

  static onStart(ProcessContext context){
    Const.startTime = new Date()
    Const.triggerCount = new AtomicLong(0)
    println "onStart $context ${Const.startTime}"
  }

  static onStop(ProcessContext context){
    def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
    println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
  }


  flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
  REL_SUCCESS << flowFile      

根據主表的資料,去查詢子表,并且将主子表的資料進行 組裝 

//By [email protected]
import groovy.json.JsonSlurper
import groovy.sql.Sql
import groovy.json.JsonOutput
//get the flowfile 
def ff = session.get()
if(!ff)return
String text = ff.read().getText("UTF-8")
def jsonSlurper = new JsonSlurper()
//get Map
def map = jsonSlurper.parseText(text)
// get attributes of this flowfile
def join_value = ff.getAttribute('join_value')
def tablename= ff.getAttribute('slaveTableName')
def tsLt= ff.getAttribute('tsLt')
def slaveJoinColumn= ff.getAttribute('slaveJoinColumn')
def slaveJsonField= ff.getAttribute('slaveJsonField')
// build the sql which select from slave table
def sql = "select distinct id from ${tablename} where ${slaveJoinColumn}= '${join_value}' and dr !='2' and ts < '${tsLt}'"
//SQL.mydb references http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html object
List list = SQL.mydb.rows(sql .toString()) //important to cast to String
List result = []
//查詢子表一定時間範圍(<tsLt)的最新狀态
for(int i=0;i<list.size();i++){
    sql = "select  * from test.fi_note_cusl_ysbxd_b where id = '${ list[i].id}' and dr !='2' order by ts DESC limit 1"
    result.add( SQL.mydb.firstRow(sql .toString()))
}
map.put(slaveJsonField.toString(),result)//主子表資料json組合
def output = JsonOutput.toJson(map)
session.remove(ff)
def newff = session.create()
newff.putAttribute("data", output )
//transfer flow file to success
REL_SUCCESS << newff      

腳本sql實作1.删除2查詢3.添加

1.界面

nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料

2.配置倆個資料庫

nifi ExecuteGroovyScript 操作手法 直接用sql 腳本 執行任務 操作資料

3.具體代碼體

//sql doc :http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html
 
//清空更新庫
SQL.mydb2.execute("delete from cd") 

//查詢資料
def sql = "select a,b from ab  " 
List list = SQL.mydb.rows(sql .toString())  
 

//查詢子表一定時間範圍(<tsLt)的最新狀态
for(int i=0;i<list.size();i++){ 
    //添加資料
     SQL.mydb2.executeInsert( "insert into cd (c,d) values('${list[i].a}','${list[i].b}')")
    
}

//向後邊的服務回報下成功了 ok
def output = "ok";
def newff = session.create()
newff.putAttribute("data", output ) 
REL_SUCCESS << newff



      

ok

這個直接執行sql 實在友善了,開心gogogo 各種玩起來

^_^ ^_^

ok