【Apache NIFI Processor】ExecuteSQL
Description 描述
該處理器執行SQL語句,傳回avro格式資料。處理器使用流式處理,是以支援任意大的結果集。處理器可以使用标準排程方法将此處理器排程為在計時器或cron表達式上運作,也可以由傳入的流檔案觸發。SQL語句來源可以來自該處理器屬性SQL select query,也可以來自上一個處理器的輸出流(FlowFile的内容應采用UTF-8格式)(GenerateTableFetch,ConvertJsonToSql等等生成的流内容中的SQL語句,類似于insert into。。。value (?。。。),這個?的值是存在于流屬性中的,命名約定為sql.args.N.value sql.args.N.type ,其中N是一個正整數。 sql.args.N.type應該是訓示JDBC類型的數字。FlowFile屬性’executesql.row.count’訓示選擇了多少行。)
配置
設定(Settings)

Name:處理器名稱,預設處理器的名稱與處理器類名相同。旁邊是一個複選框,訓示處理器是否已啟用,禁用狀态用于訓示當啟動一組處理器時,例如當DFM啟動整個程序組時,應排除此(disabled)處理器。
Id & Type & Bundle:Processor的唯一辨別符 & Processor的類名 & Processor所在NAR包。這些值無法修改。
Penalty Duration:在處理一條資料(FlowFile)的正常過程中,可能發生事件,該事件訓示處理器此時不能處理資料但是資料可以在稍後進行處理。在發生這種情況時,處理器可以選擇Penalize FlowFile。這将阻止FlowFile在一段時間内被處理。例如,如果處理器要将資料推送到遠端服務,但遠端服務已經有一個與處理器指定的檔案名同名的檔案,則處理器可能會懲罰FlowFile。Penalty Duration允許DFM指定FlowFile應該受到多長時間的懲罰。預設值為30 seconds。(簡單了解為推後一段時間再處理)
Yield Duration:處理器可以确定存在某種情況,處理器沒法進行處理資料。例如,如果處理器要将資料推送到遠端服務并且該服務沒有響應。這樣的話處理器應該Yield,這将阻止處理器運作一段時間。通過設定Yield Duration來指定該時間段。預設值為1 second。
Bulletin Level:每當處理器寫日志時,處理器将生成公告。此設定訓示應在使用者界面中顯示的最低級别的公告。預設情況下,公告級别設定為WARN,這意味着它将顯示所有警告和錯誤級别公告。
Automatically Terminate Relationships:此處列出了處理器定義的每個關系及其描述。為了使處理器被視為有效且能夠運作,處理器定義的每個關系必須連接配接到下遊元件或自動終止。如果關系是自動終止的,則将從流中删除任何路由到該關系的FlowFile,并視其為處理完成。已連接配接到下遊元件的任何關系都無法自動終止。必須首先從使用它的所有Connection中删除關系。此外,對于選擇自動終止的關系,如果将關系添加到連接配接,則自動終止狀态将被清除(turned off)。
排程(Scheduling)
Scheduling Strategy:
排程政策有三種可能的選項:
Timer driven:這是預設模式。處理器将定期運作。運作處理器的時間間隔由Run Schedule選項定義(見下文)。
Event driven:選擇此模式時,将由一個事件觸發處理器運作,當FlowFiles進入連接配接此處理器的Connections時,将産生這個事件。此模式目前被認為是實驗性的,并非所有處理器都支援。選擇此模式時,Run Schedule選項不可配置。此外,隻有此模式下Concurrent Tasks選項可以設定為0。這種情況,線程數僅受管理者配置的事件驅動線程池的大小限制。
CRON驅動:當使用CRON驅動的排程模式時,處理器将定期運作,類似于定時器驅動的排程模式。CRON驅動模式提供了更大的靈活性。CRON驅動的排程值是由六個必需字段和一個可選字段的字元串組成,每個字段由空格分隔。這些字段是:
Field | Valid values |
---|---|
Seconds | 0-59 |
Minutes | 0-59 |
Hours | 0-23 |
Day of Month | 1-31 |
Month | 1-12 or JAN-DEC |
Day of Week | 1-7 or SUN-SAT |
Year (optional) | empty, 1970-2099 |
您通常通過以下方式來指定值:
- 數字:指定一個或多個有效值。您可以使用逗号分隔清單輸入多個值。
- 範圍:使用 number-number文法指定範圍。
- 增量:使用 start value/increment文法指定增量。例如,在"分鐘"字段中,0/15表示分鐘0,15,30和45。
您還應該知道幾個有效的特殊字元:
- * 表示所有值對該字段都有效。
- ? 表示未指定特定值。此特殊字元在"Day of Month"和"Day of Week"字段中有效。
- L 您可以将L附加到星期值之一,以指定該月中該日的最後一次出現。例如,1L表示該月的最後一個星期日。
例如:
- 該字元串0 0 13 * * ?表示您希望将處理器安排在每天下午1:00運作。
- 該字元串0 20 14 ? * MON-FRI表示您希望将處理器安排在每周一至周五下午2:20運作。
- 該字元串0 15 10 ? * 6L 2011-2017表示您希望将處理器安排在2011年至2017年的每個月的最後一個星期五上午10:15運作。
有關其他資訊和示例,請參閱Quartz文檔中的ChronTrigger教程。
Run Schedule:
"Run Schedule"訓示處理器運作的頻率。此字段的有效值取決于所選的排程政策(參見上文)。如果使用事件驅動的排程政策,則此字段不可用。使用定時器驅動的排程政策時,該值是由數字後跟時間機關指定的持續時間。例如,1 second或5 mins。預設值0 sec表示處理器應盡可能頻繁地運作,隻要它有要處理的資料即可。有關适用于CRON驅動的排程政策的值的說明,請參閱CRON驅動的排程政策本身的說明。
Concurrent Tasks:
控制處理器将使用的線程數。換句話說,它控制此處理器應同時處理多少個FlowFiles。增加此值通常會使處理器在相同的時間内處理更多資料。但是,它是通過使用其他處理器無法使用的系統資源來實作此目的。這基本上提供了處理器的相對權重 - 應該将多少系統資源配置設定給此處理器而不是其他處理器。該字段适用于大多數處理器。但是,某些類型的處理器隻能使用單個任務進行排程。
Execution:
執行設定用于确定處理器将被排程執行的節點。選擇"All Nodes"将導緻在叢集中的每個節點上排程此處理器。選擇"Primary Node"将導緻此處理器僅在主節點上進行排程。
屬性(Properties)
Properties頁籤提供了一種特定于Processor的行為機制的配置。每種類型的處理器必須定義哪些屬性對其用例有意義。
屬性名稱 | 預設值 | 可選值 | 描述 |
---|---|---|---|
Database Connection Pooling Service | Controller Service API: DBCPService Implementations: DBCPConnectionPoolLookup HiveConnectionPool DBCPConnectionPool | 資料庫連接配接池 | |
SQL select query | 要執行的SQL,設定了此屬性,則使用此SQL(不用流中的SQL);不設定,則使用流中的SQL; 支援表達式語言 | ||
Column Convert Strategy | toUpperCase | ||
Max Wait Time | 0 seconds | 執行SQL的最大等待時間,小于1秒則系統預設此配置等于0秒,0秒即沒有限制的意思,無限等待 | |
Normalize Table/Column Names | false | ▪ true ▪ false | 是否将表名,列名中可能存在的avro格式不相容的字元進行轉換(例如逗号冒号轉換為下劃線,當然一般表名列名也不存在這些字元,應用較少,預設false) |
Use Avro Logical Types | false | ▪true ▪ false | 是否對DECIMAL/NUMBER, DATE, TIME 和TIMESTAMP類型使用Avro Logical Types。如果選擇false,這些列則轉成字元串形式。如果選擇true,Avro Logical Types則作為其基本類型,具體來說,DECIMAL/NUMBER轉換成logical ‘decimal’:寫成帶有精度的位元組,DATE轉換為邏輯logical“date-millis”:值寫成天數(從紀元(1970-01-01)算起的整數),TIME轉換為logical“time-millis”:值寫成毫秒數(從紀元(1970-01-01)算起的整數),TIMESTAMP轉換為logical“timestamp-millis”:值寫成毫秒數(從紀元(1970-01-01)算起的整數)。如果Avro記錄的reader也知道這些Logical Types,那麼就可以根據reader的實作類結合上下文反序列化這些值。 |
Compression Format | NONE | ▪ BZIP2 ▪ DEFLATE ▪ NONE ▪ SNAPPY ▪ LZO | 壓縮類型,預設值NONE |
Default Decimal Precision | 10 | 精度;當一個DECIMAL/NUMBER類型的值被寫成“DECIMAL”Avro Logical 類型時,需要一個特定的“precision”來表示可用具體數字的數量。通常,精度由列資料類型定義或資料庫引擎預設定義。當然,某些資料庫引擎也可以傳回未定義的精度(0)。 支援表達式語言 | |
Default Decimal Scale | 當一個DECIMAL/NUMBER類型被寫成“DECIMAL”Avro Logical 類型時,需要一個特定的“scale”來表示可用的小數位數。通常,scale是由列資料類型定義或資料庫引擎預設定義的。但是,當傳回未定義的精度(0)時,一些資料庫引擎的伸縮性也可能不确定。“預設十進制”用于編寫那些未定義的數字。如果一個值的小數比指定的比例多,那麼該值将被四舍五入,例如,1.53在比例為0時變成2,在比例為1時變成1.5。 支援表達式語言 | ||
Max Rows Per Flow File | 單個流檔案中包含的最大結果行數。這意味着允許将非常大的結果集分解為多個流檔案。如果指定的值為零,則在單個流檔案中傳回所有行。 支援表達式語言 | ||
Output Batch Size | 送出程序會話之前要排隊的輸出流檔案的數量。當設定為零時,會話将在處理完所有結果集行并準備好将輸出流檔案傳輸到下遊關系時送出。對于大型結果集,這可能導緻在處理器執行結束時傳輸大量流檔案。如果設定了此屬性,那麼當指定數量的流檔案準備好傳輸時,将送出會話,進而将流檔案釋放到下遊關系。注意:片段。在設定此屬性時,不會在FlowFiles上設定count屬性。 支援表達式語言 |
注:
Max Rows Per Flow File
首先查一百條資料,Max Rows Per Flow File 設為10,結果是輸出50個流檔案,每個流檔案10條資料。
Output Batch Size
看注釋已經此處的代碼邏輯,當流檔案數達到了outputBatchSize的時候,這批流檔案會被輸出到sucess:
比如配置如下,會發現流檔案輸出不再是一個一個的輸出,而是2個為機關的輸出:
示例說明
TODO:待更新。
參考:
https://nifichina.github.io/processors/ExecuteSQL.html#%E5%B1%9E%E6%80%A7