天天看點

Nifi之ExecuteScript使用方法 (三)

本文是一系列文章中的第三篇,描述了如何使用ExecuteScript完成某些任務的各種方式.

文章

本文介紹了如何使用NiFi處理器ExecuteScript完成某些任務的各種方法,以及Groovy,Jython,Javascript(Nashorn)和JRuby中給出的示例。這是本系列的第3部分,我将讨論進階功能,如動态屬性,子產品,狀态管理以及通路/使用Controller Services。

第1部分- NiFi API和FlowFiles簡介

  • 從傳入隊列擷取流檔案
  • 建立新的流檔案
  • 使用流檔案屬性
  • 傳輸流檔案
  • 記錄

第2部分- FlowFile I / O和錯誤處理

  • 從流檔案中讀取
  • 寫入流檔案
  • 讀取和寫入流檔案
  • 錯誤處理

第3部分 - 進階功能

  • 使用動态屬性
  • 添加子產品
  • 國家管理
  • 通路控制器服務

進階功能

本系列的前兩篇文章介紹了流檔案操作的基礎知識,例如讀/寫屬性和内容,以及使用“session”變量(ProcessSession對象)檢索和傳輸流檔案。ExecuteScript還有許多其他功能; 會在這裡談談其中的一些。

動态屬性

其中一個功能是動态屬性的概念,也稱為使用者定義屬性。這些是處理器的屬性,使用者可以為其設定屬性名稱和值。并非所有處理器都支援/使用動态屬性,但ExecuteScript會将動态屬性作為變量傳遞,這些變量引用與屬性值對應的PropertyValue對象。這裡有兩件重要的事情需要注意:

  1. 由于屬性名稱按原樣綁定到變量名稱,是以必須為指定的程式設計語言支援動态屬性的命名約定。例如,Groovy不支援句點(.)作為有效的變量字元,是以動态屬性(如“my.value”)将導緻處理器失敗。在這種情況下,有效的替代方案是“myValue”。
  2. 使用PropertyValue對象(而不是值的String表示形式)以允許腳本在将屬性值評估為String之前對屬性的值執行各種操作。如果已知屬性包含文字值,則可以對變量調用getValue()方法以擷取其String表示形式。相反,如果值可能包含表達式語言,或者您希望将值轉換為除String之外的值(例如對布爾對象的值為'true'),則還有這些操作的方法。這些示例在下面的方法中說明,假設我們有兩個屬性'myProperty1'和'myProperty2'定義如下:
Nifi之ExecuteScript使用方法 (三)

應用場景1 : 擷取動态屬性的值,使用者輸入了動态屬性以供腳本使用(例如,配置參數).

方法 :  使用變量的PropertyValue對象中的getValue()方法。此方法傳回動态屬性值的String表示形式。請注意,如果值中存在表達式語言,則getValue()将不對其進行求值(請參閱一下内容)。

Groovy

def myValue1 = myProperty1.value
           

Jython

myValue1 = myProperty1.getValue()
           

JavaScript

var myValue1 = myProperty1.getValue()
           

JRuby

myValue1 = myProperty1.getValue()
           

應用場景2 : 擷取動态屬性的值(在評估表達式語言結構後,比如上面提到的Groovy不支援點(.)),使用者已輸入動态屬性以在腳本中使用(配置參數),并且它引用傳入流檔案中的屬性。

方法 : 使用變量的PropertyValue對象中的evaluateAttributeExpressions(flowFile)方法。此方法後跟getValue(),在評估任何表達式語言結構後,傳回動态屬性值的String表示形式。如果流檔案不可用,但已在環境或變量系統資料庫中定義了變量,則可以使用不帶參數的evaluateAttributeExpressions()

Groovy

def myValue1 = myProperty1.value
def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value
           

 Jython

myValue1 = myProperty1.getValue()myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
           

JavaScript

var myValue1 = myProperty1.getValue()var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
           

JRuby

myValue1 = myProperty1.getValue()myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
           

添加子產品

ExecuteScript的另一個功能是能夠向類路徑添加外部“子產品”,這允許您利用各種第三方庫,腳本等。但是每個腳本引擎都以不同方式處理子產品的概念,是以我将讨論它們分别。一般來說,有兩種類型的子產品,Java庫(JAR)和腳本(用與ExecuteScript中相同的語言編寫。以下是各種腳本引擎如何處理這些: 

因為部落客本人在使用ExecuteScript Processor時使用的是驅動器為python,對其他語言未驗證,是以咱們隻看Jython部分.若需要檢視其他Groovy,JavaScript和JRuby的部分,請參考文章末尾的原文連接配接(如果文中連結通路不通,不解釋).

Jython

Jython腳本引擎(至少在ExecuteScript中使用)目前僅支援導入純Python子產品,而不支援本機編譯的子產品(例如CPython),例如numpy或scipy。它目前也不支援JAR(筆者寫本篇部落格時nifi的版本為1.1.x,部落客測試1.7.x版本是支援導入jar包的),盡管這可能會在即将釋出的版本中發生變化。在底層,子產品目錄屬性中的條目在執行前添加到腳本中,對于每個指定的子產品位置,使用“import sys”後“sys.path.append('your/path')”。

如果安裝了Python,則可以通過将其site-packages檔案夾添加到Module Directory屬性來使用其所有已安裝的純Python子產品,例如

/usr/local/lib/python2.7/site-packages

Nifi之ExecuteScript使用方法 (三)

樣例代碼:

import sys
sys.path.append('/usr/lib/python2.7/site-packages/')
sys.path.append('/usr/lib/python2.7/site-packages/module_name/')
sys.path.append('/usr/lib/python2.7/site-packages/module_name/xxx.py')
           

狀态管理

NiFi為處理器和其他NiFi元件提供了持久儲存一些資訊的能力,以便在元件周圍實作某些狀态功能。例如,QueryDatabaseTable處理器跟蹤它在指定列中看到的最大值,這樣下次運作時,它隻會擷取大于原來最大值的行。

狀态管理方面的一個重要概念是範圍。NiFi元件可以選擇将其狀态存儲在叢集級别或本地級别。請注意,在獨立的NiFi執行個體中,“群集範圍”與“本地範圍”相同。範圍的選擇通常是關于在流中,每個節點上的相同處理器是否可以共享狀态資料。如果群集中的執行個體不需要共享狀态,則使用本地範圍。在Java中,這些選項作為名為Scope的枚舉提供,是以當我引用Scope.CLUSTER和Scope.LOCAL時,我分别表示叢集和本地作用域。

要在ExecuteScript中使用狀态管理功能(下面是特定于語言的示例),您可以通過調用ProcessContext的getStateManager()方法獲得對StateManager的引用(回想一下,每個引擎都獲得一個名為“context”的變量,其中包含ProcessContext執行個體)。然後,您可以在StateManager對象上調用以下方法:

void setState(Map <String,String> state,Scope scope) - 更新元件在給定範圍内的狀态值,并将其設定為給定值。請注意,該值是一個Map; “元件狀态”的概念是每個構成較低級别狀态的所有鍵/值對的映射。Map會立即更新以提供原子性。

StateMap getState(Scope scope)  - 傳回給定範圍内元件的目前狀态。此方法永遠不會傳回null; 相反,它是一個StateMap對象,如果尚未設定狀态,StateMap的版本将為-1,值的映射将為空。通常會建立一個新的Map <String,String>來存儲更新的值,然後調用setState()或replace()。

boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope) - 當且僅當值與給定的oldValue相同時,才更新元件狀态(在給定範圍内)的值到新值。如果狀态已更新為新值,則傳回true; 否則,如果狀态的值不等于oldValue,則傳回false。

void clear(Scope scope) - 清除給定範圍内元件狀态的所有鍵和值。

應用場景1 : 擷取鍵/值對的目前映射,腳本需要從狀态管理器擷取目前鍵/值對以供腳本使用(例如更新)。

方法:使用ProcessContext中的getStateManager()方法,然後使用StateManager中的getStateMap(),然後使用toMap()轉換為鍵/值對的Map <String,String>。請注意,StateMap還有一個簡單檢索值的get(key)方法,但這種方法并不常用,因為Map通常會更新.

Groovy

import org.apache.nifi.components.state.Scope
def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
           

Jython

from org.apache.nifi.components.state import Scope
oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
           

JavaScript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
           

JRuby

java_import org.apache.nifi.components.state.Scope
oldMap = context.stateManager.getState(Scope::LOCAL).toMap()
           

注意:在腳本中隻顯式引用了Scope類,是以它是唯一導入的類。如果您引用StateManager,StateMap等,您還需要導入這些類。

應用場景2 : 更新鍵/值對的映射,腳本希望使用新的鍵/值對映射更新狀态映射。

方法:要擷取目前StateMap對象,請再次使用ProcessContext中的getStateManager()方法,然後使用StateManager中的getStateMap()。這些示例假設一個新的Map,但使用上面的方法(使用toMap()方法),您可以使用現有值建立一個新的Map,然後隻更新所需的條目。請注意,如果沒有目前映射(即StateMap.getVersion()傳回-1),則replace()将不起作用,是以示例将相應地檢查并調用setState()或replace()。從新的ExecuteScript執行個體運作時,StateMap版本将為-1,是以在單次執行後,如果右鍵單擊ExecuteScript處理器并選擇View State,您應該看到如下内容:

Nifi之ExecuteScript使用方法 (三)

Groovy

import org.apache.nifi.components.state.Scope
def stateManager = context.stateManager
def stateMap = stateManager.getState(Scope.CLUSTER)
def newMap = ['myKey1': 'myValue1']
if (stateMap.version == -1) {
  stateManager.setState(newMap, Scope.CLUSTER);
} else {
  stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}
           

Jython

from org.apache.nifi.components.state import Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope.CLUSTER)
newMap = {'myKey1': 'myValue1'}
if stateMap.version == -1:
    stateManager.setState(newMap, Scope.CLUSTER)
else:
    stateManager.replace(stateMap, newMap, Scope.CLUSTER)
           

JavaScript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
var stateManager = context.stateManager;
var stateMap = stateManager.getState(Scope.CLUSTER);
var newMap = {'myKey1': 'myValue1'};
if (stateMap.version == -1) {
  stateManager.setState(newMap, Scope.CLUSTER);
} else {
  stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}
           

JRuby

java_import org.apache.nifi.components.state.Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope::CLUSTER)
newMap = {'myKey1'=> 'myValue1'}
if stateMap.version == -1
    stateManager.setState(newMap, Scope::CLUSTER)
else
    stateManager.replace(stateMap, newMap, Scope::CLUSTER)
end
           

清除state map

Groovy

import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope.LOCAL)
           

Jython

from org.apache.nifi.components.state import Scope
context.stateManager.clear(Scope.LOCAL)
           

JavaScript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
context.stateManager.clear(Scope.LOCAL);
           

 JRuby

java_import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope::LOCAL)
           

通路控制器服務

在NiFi ARchive(NAR)結構中,Controller Services作為接口公開,通常在API JAR中。例如,DistributedCacheClient是從ControllerService擴充的接口,它駐留在nifi-distributed-cache-client-service-api JAR中,該JAR位于nifi-standard-services-api-nar NAR中。其他希望引用接口的NAR(例如,建立新類型的用戶端實作)必須将nifi-standard-services-api-nar指定為其父NAR,然後引用處理器中提供的API JAR執行個體子子產品。 

部落客之是以了解到控制服務是在ExecuteScript Processor中需要連接配接到redis的Sentinel模式.有試過上邊提到的引用第三方包的方式安裝python-redis然後再Module Directory和代碼中引入xx.py檔案,也試過使用同樣的方法引入jar檔案(Jython的底層是java,是以部落客嘗試用java的辦法連接配接sentinel),最終都以失敗告終.然後了解到了redis的控制伺服器可以連接配接到sentinel模式以供代碼中調用.

首先,如何打開一個控制器服務

一般而言,我們的processor流程都履歷在"組"中,因為這樣可以讓界面看着更加整齊.滑鼠點住第四個圖示拖拽到畫布,自定義一個組的名字即可建立組.

Nifi之ExecuteScript使用方法 (三)

右擊組子產品,選擇configure.會彈出 "組名Configuration"

點選右側的加号,在filter中輸入redis,你将看到下圖

Nifi之ExecuteScript使用方法 (三)

本文中我們選擇第一個service.輕按兩下即可選中.service的最初狀态都為invalid.我們需要填寫必要的連接配接資訊才可以啟用.點選最右側的設定按鈕填寫資訊.除了key值加粗的必填項以外,下圖圈起來的部分也要填寫.填寫的所有值都來自redis的設定.redis.conf或sentinel.conf

Nifi之ExecuteScript使用方法 (三)

填寫完畢後,在左側的SETTINGS頁籤中,複制service的ID,如下圖

Nifi之ExecuteScript使用方法 (三)

以上,配置就完成了.下面是如何在代碼中獲得redis連接配接.還是需要用到content對象.

service = context.getControllerServiceLookup().getControllerService("剛剛複制的id粘這裡")
redis = service.getConnection()
           

此時就獲得了redis的connecting對象,此時,你可以對redis做你想做的事情.

值得一提的是,獲得的redis的連接配接對象本身為org.springframework.data.redis.connection.jedis.JedisConnection的實體(nifi1.7.1中),是以如果要調用任何方法,請參考這個類.您可以将類名複制到百度中找到它的代碼.

第二種使用service的辦法,以DistributedMapCacheClient為例:

在ExecuteScript配置中,建立一個名為“clientServiceId”的動态屬性,并将值設定為service的ID:

Nifi之ExecuteScript使用方法 (三)

然後我們可以使用clientServiceId.asControllerService(DistributedMapCacheClient),其中該參數是對DistributedMapCacheClient的Class對象的引用。對于這些示例,我已使用字元串鍵'a'預先填充緩存,該鍵設定為字元串值'hello'。

一旦我們有了一個DistributedMapCacheClient執行個體,那麼為了檢索一個值,我們可以調用它的get(key,serializer,deserializer)方法。在我們的例子中,由于鍵和值是字元串,我們隻需要一個Serializer <String>和Deserializer <String>的執行個體傳遞給get()方法。所有語言的方法類似于本系列文章第2部分中描述的StreamCallback執行個體的建立。這些示例将從預先填充的伺服器擷取密鑰“a”的值并記錄結果(“Result = hello”)

擷取存儲在DistributedMapCacheServer中的屬性的值

應用場景 : 當使用者已将值填充到DistributedMapCacheServer(例如配置資料)中,并且腳本需要通路它們。

方法:使用上述方法,建立StringSerializer和StringDeserializer對象,然後按ID擷取DistributedMapCacheClientService執行個體,然後在服務上調用get()。為友善起見,此處包括結果的記錄。

Groovy

import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer
import java.nio.charset.StandardCharsets
 
def StringSerializer = {value, out -> out.write(value.getBytes(StandardCharsets.UTF_8))} as Serializer<String>
def StringDeserializer = { bytes -> new String(bytes) } as Deserializer<String>
 
def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
def result = myDistClient.get('a', StringSerializer, StringDeserializer)
log.info("Result = $result")
           

Jython

from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer
 
# Define a subclass of Serializer for use in the client's get() method
class StringSerializer(Serializer):
  def __init__(self):
        pass
  def serialize(self, value, out):
       out.write(value)
 
# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer(Deserializer):
  def __init__(self):
        pass
  def deserialize(self,  bytes):
        return StringUtil.fromBytes(bytes)
 
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
result = myDistClient.get('a', StringSerializer(), StringDeserializer())
log.info('Result = ' + str(result))
           

JavaScript

var DistributedMapCacheClient = Java.type('org.apache.nifi.distributed.cache.client.DistributedMapCacheClient');
var Serializer = Java.type('org.apache.nifi.distributed.cache.client.Serializer');
var Deserializer = Java.type('org.apache.nifi.distributed.cache.client.Deserializer');
var StandardCharsets = Java.type('java.nio.charset.StandardCharsets');
 
var StringSerializer = new Serializer(function(value, out) {
  out.write(value.getBytes(StandardCharsets.UTF_8));
})
var StringDeserializer = new Deserializer(function(arr) {
  // For some reason I had to build a string from the character codes in the "arr" array
  var s = "";
  for(var i = 0; i < arr.length; i++) {
    s = s + String.fromCharCode(arr[i]);
  }
  return s;
})
 
var myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.class);
var result = myDistClient.get('a', StringSerializer, StringDeserializer);
log.info("Result = "+ result);
           

JRuby

java_import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
java_import org.apache.nifi.distributed.cache.client.Serializer
java_import org.apache.nifi.distributed.cache.client.Deserializer
java_import java.nio.charset.StandardCharsets
 
# Define a subclass of Serializer for use in the client's get() method
class StringSerializer
  include Serializer
  def serialize(value, out)
       out.write(value.to_java.getBytes(StandardCharsets::UTF_8))
  end
end
 
# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer
  include Deserializer
  def deserialize(bytes)
       bytes.to_s
  end
end
 
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.java_class)
result = myDistClient.get('a', StringSerializer.new, StringDeserializer.new)
log.info('Result = ' + result)
           

Note:

1-腳本運作時顯示的各種error,在nifi-app.log中.如下圖

Nifi之ExecuteScript使用方法 (三)

2-使用log.info('xxxxx')寫入log日志時, 在nifi-app.log中

3-在ExecuteScript Processor中直接print的資訊,在nifi-bootstrap.log中.直接print的辦法調試代碼也很友善.

本文包含有關如何使用各種支援的語言與NiFi API進行互動的更複雜的示例。我可能會在本系列中添加其他部分,或者肯定會有關于腳本處理器的其他文章,因為已經進行了改進并添加了其他功能(例如ScriptedReportingTask即将推出!)。

原文連接配接:https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html