天天看点

Nifi之ExecuteScript使用方法 (三)

本文是一系列文章中的第三篇,描述了如何使用ExecuteScript完成某些任务的各种方式.

文章

本文介绍了如何使用NiFi处理器ExecuteScript完成某些任务的各种方法,以及Groovy,Jython,Javascript(Nashorn)和JRuby中给出的示例。这是本系列的第3部分,我将讨论高级功能,如动态属性,模块,状态管理以及访问/使用Controller Services。

第1部分- NiFi API和FlowFiles简介

  • 从传入队列获取流文件
  • 创建新的流文件
  • 使用流文件属性
  • 传输流文件
  • 记录

第2部分- FlowFile I / O和错误处理

  • 从流文件中读取
  • 写入流文件
  • 读取和写入流文件
  • 错误处理

第3部分 - 高级功能

  • 使用动态属性
  • 添加模块
  • 国家管理
  • 访问控制器服务

高级功能

本系列的前两篇文章介绍了流文件操作的基础知识,例如读/写属性和内容,以及使用“session”变量(ProcessSession对象)检索和传输流文件。ExecuteScript还有许多其他功能; 会在这里谈谈其中的一些。

动态属性

其中一个功能是动态属性的概念,也称为用户定义属性。这些是处理器的属性,用户可以为其设置属性名称和值。并非所有处理器都支持/使用动态属性,但ExecuteScript会将动态属性作为变量传递,这些变量引用与属性值对应的PropertyValue对象。这里有两件重要的事情需要注意:

  1. 由于属性名称按原样绑定到变量名称,因此必须为指定的编程语言支持动态属性的命名约定。例如,Groovy不支持句点(.)作为有效的变量字符,因此动态属性(如“my.value”)将导致处理器失败。在这种情况下,有效的替代方案是“myValue”。
  2. 使用PropertyValue对象(而不是值的String表示形式)以允许脚本在将属性值评估为String之前对属性的值执行各种操作。如果已知属性包含文字值,则可以对变量调用getValue()方法以获取其String表示形式。相反,如果值可能包含表达式语言,或者您希望将值转换为除String之外的值(例如对布尔对象的值为'true'),则还有这些操作的方法。这些示例在下面的方法中说明,假设我们有两个属性'myProperty1'和'myProperty2'定义如下:
Nifi之ExecuteScript使用方法 (三)

应用场景1 : 获取动态属性的值,用户输入了动态属性以供脚本使用(例如,配置参数).

方法 :  使用变量的PropertyValue对象中的getValue()方法。此方法返回动态属性值的String表示形式。请注意,如果值中存在表达式语言,则getValue()将不对其进行求值(请参阅一下内容)。

Groovy

def myValue1 = myProperty1.value
           

Jython

myValue1 = myProperty1.getValue()
           

JavaScript

var myValue1 = myProperty1.getValue()
           

JRuby

myValue1 = myProperty1.getValue()
           

应用场景2 : 获取动态属性的值(在评估表达式语言结构后,比如上面提到的Groovy不支持点(.)),用户已输入动态属性以在脚本中使用(配置参数),并且它引用传入流文件中的属性。

方法 : 使用变量的PropertyValue对象中的evaluateAttributeExpressions(flowFile)方法。此方法后跟getValue(),在评估任何表达式语言结构后,返回动态属性值的String表示形式。如果流文件不可用,但已在环境或变量注册表中定义了变量,则可以使用不带参数的evaluateAttributeExpressions()

Groovy

def myValue1 = myProperty1.value
def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value
           

 Jython

myValue1 = myProperty1.getValue()myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
           

JavaScript

var myValue1 = myProperty1.getValue()var myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
           

JRuby

myValue1 = myProperty1.getValue()myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).getValue()
           

添加模块

ExecuteScript的另一个功能是能够向类路径添加外部“模块”,这允许您利用各种第三方库,脚本等。但是每个脚本引擎都以不同方式处理模块的概念,因此我将讨论它们分别。一般来说,有两种类型的模块,Java库(JAR)和脚本(用与ExecuteScript中相同的语言编写。以下是各种脚本引擎如何处理这些: 

因为博主本人在使用ExecuteScript Processor时使用的是驱动器为python,对其他语言未验证,所以咱们只看Jython部分.若需要查看其他Groovy,JavaScript和JRuby的部分,请参考文章末尾的原文连接(如果文中链接访问不通,不解释).

Jython

Jython脚本引擎(至少在ExecuteScript中使用)目前仅支持导入纯Python模块,而不支持本机编译的模块(例如CPython),例如numpy或scipy。它目前也不支持JAR(笔者写本篇博客时nifi的版本为1.1.x,博主测试1.7.x版本是支持导入jar包的),尽管这可能会在即将发布的版本中发生变化。在底层,模块目录属性中的条目在执行前添加到脚本中,对于每个指定的模块位置,使用“import sys”后“sys.path.append('your/path')”。

如果安装了Python,则可以通过将其site-packages文件夹添加到Module Directory属性来使用其所有已安装的纯Python模块,例如

/usr/local/lib/python2.7/site-packages

Nifi之ExecuteScript使用方法 (三)

样例代码:

import sys
sys.path.append('/usr/lib/python2.7/site-packages/')
sys.path.append('/usr/lib/python2.7/site-packages/module_name/')
sys.path.append('/usr/lib/python2.7/site-packages/module_name/xxx.py')
           

状态管理

NiFi为处理器和其他NiFi组件提供了持久保存一些信息的能力,以便在组件周围实现某些状态功能。例如,QueryDatabaseTable处理器跟踪它在指定列中看到的最大值,这样下次运行时,它只会获取大于原来最大值的行。

状态管理方面的一个重要概念是范围。NiFi组件可以选择将其状态存储在集群级别或本地级别。请注意,在独立的NiFi实例中,“群集范围”与“本地范围”相同。范围的选择通常是关于在流中,每个节点上的相同处理器是否可以共享状态数据。如果群集中的实例不需要共享状态,则使用本地范围。在Java中,这些选项作为名为Scope的枚举提供,因此当我引用Scope.CLUSTER和Scope.LOCAL时,我分别表示集群和本地作用域。

要在ExecuteScript中使用状态管理功能(下面是特定于语言的示例),您可以通过调用ProcessContext的getStateManager()方法获得对StateManager的引用(回想一下,每个引擎都获得一个名为“context”的变量,其中包含ProcessContext实例)。然后,您可以在StateManager对象上调用以下方法:

void setState(Map <String,String> state,Scope scope) - 更新组件在给定范围内的状态值,并将其设置为给定值。请注意,该值是一个Map; “组件状态”的概念是每个构成较低级别状态的所有键/值对的映射。Map会立即更新以提供原子性。

StateMap getState(Scope scope)  - 返回给定范围内组件的当前状态。此方法永远不会返回null; 相反,它是一个StateMap对象,如果尚未设置状态,StateMap的版本将为-1,值的映射将为空。通常会创建一个新的Map <String,String>来存储更新的值,然后调用setState()或replace()。

boolean replace(StateMap oldValue,Map <String,String> newValue,Scope scope) - 当且仅当值与给定的oldValue相同时,才更新组件状态(在给定范围内)的值到新值。如果状态已更新为新值,则返回true; 否则,如果状态的值不等于oldValue,则返回false。

void clear(Scope scope) - 清除给定范围内组件状态的所有键和值。

应用场景1 : 获取键/值对的当前映射,脚本需要从状态管理器获取当前键/值对以供脚本使用(例如更新)。

方法:使用ProcessContext中的getStateManager()方法,然后使用StateManager中的getStateMap(),然后使用toMap()转换为键/值对的Map <String,String>。请注意,StateMap还有一个简单检索值的get(key)方法,但这种方法并不常用,因为Map通常会更新.

Groovy

import org.apache.nifi.components.state.Scope
def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
           

Jython

from org.apache.nifi.components.state import Scope
oldMap = context.stateManager.getState(Scope.LOCAL).toMap()
           

JavaScript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
var oldMap = context.stateManager.getState(Scope.LOCAL).toMap();
           

JRuby

java_import org.apache.nifi.components.state.Scope
oldMap = context.stateManager.getState(Scope::LOCAL).toMap()
           

注意:在脚本中只显式引用了Scope类,因此它是唯一导入的类。如果您引用StateManager,StateMap等,您还需要导入这些类。

应用场景2 : 更新键/值对的映射,脚本希望使用新的键/值对映射更新状态映射。

方法:要获取当前StateMap对象,请再次使用ProcessContext中的getStateManager()方法,然后使用StateManager中的getStateMap()。这些示例假设一个新的Map,但使用上面的方法(使用toMap()方法),您可以使用现有值创建一个新的Map,然后只更新所需的条目。请注意,如果没有当前映射(即StateMap.getVersion()返回-1),则replace()将不起作用,因此示例将相应地检查并调用setState()或replace()。从新的ExecuteScript实例运行时,StateMap版本将为-1,因此在单次执行后,如果右键单击ExecuteScript处理器并选择View State,您应该看到如下内容:

Nifi之ExecuteScript使用方法 (三)

Groovy

import org.apache.nifi.components.state.Scope
def stateManager = context.stateManager
def stateMap = stateManager.getState(Scope.CLUSTER)
def newMap = ['myKey1': 'myValue1']
if (stateMap.version == -1) {
  stateManager.setState(newMap, Scope.CLUSTER);
} else {
  stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}
           

Jython

from org.apache.nifi.components.state import Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope.CLUSTER)
newMap = {'myKey1': 'myValue1'}
if stateMap.version == -1:
    stateManager.setState(newMap, Scope.CLUSTER)
else:
    stateManager.replace(stateMap, newMap, Scope.CLUSTER)
           

JavaScript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
var stateManager = context.stateManager;
var stateMap = stateManager.getState(Scope.CLUSTER);
var newMap = {'myKey1': 'myValue1'};
if (stateMap.version == -1) {
  stateManager.setState(newMap, Scope.CLUSTER);
} else {
  stateManager.replace(stateMap, newMap, Scope.CLUSTER);
}
           

JRuby

java_import org.apache.nifi.components.state.Scope
stateManager = context.stateManager
stateMap = stateManager.getState(Scope::CLUSTER)
newMap = {'myKey1'=> 'myValue1'}
if stateMap.version == -1
    stateManager.setState(newMap, Scope::CLUSTER)
else
    stateManager.replace(stateMap, newMap, Scope::CLUSTER)
end
           

清除state map

Groovy

import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope.LOCAL)
           

Jython

from org.apache.nifi.components.state import Scope
context.stateManager.clear(Scope.LOCAL)
           

JavaScript

var Scope = Java.type('org.apache.nifi.components.state.Scope');
context.stateManager.clear(Scope.LOCAL);
           

 JRuby

java_import org.apache.nifi.components.state.Scope
context.stateManager.clear(Scope::LOCAL)
           

访问控制器服务

在NiFi ARchive(NAR)结构中,Controller Services作为接口公开,通常在API JAR中。例如,DistributedCacheClient是从ControllerService扩展的接口,它驻留在nifi-distributed-cache-client-service-api JAR中,该JAR位于nifi-standard-services-api-nar NAR中。其他希望引用接口的NAR(例如,创建新类型的客户端实现)必须将nifi-standard-services-api-nar指定为其父NAR,然后引用处理器中提供的API JAR实例子模块。 

博主之所以了解到控制服务是在ExecuteScript Processor中需要连接到redis的Sentinel模式.有试过上边提到的引用第三方包的方式安装python-redis然后再Module Directory和代码中引入xx.py文件,也试过使用同样的方法引入jar文件(Jython的底层是java,所以博主尝试用java的办法连接sentinel),最终都以失败告终.然后了解到了redis的控制服务器可以连接到sentinel模式以供代码中调用.

首先,如何打开一个控制器服务

一般而言,我们的processor流程都简历在"组"中,因为这样可以让界面看着更加整齐.鼠标点住第四个图标拖拽到画布,自定义一个组的名字即可创建组.

Nifi之ExecuteScript使用方法 (三)

右击组模块,选择configure.会弹出 "组名Configuration"

点击右侧的加号,在filter中输入redis,你将看到下图

Nifi之ExecuteScript使用方法 (三)

本文中我们选择第一个service.双击即可选中.service的最初状态都为invalid.我们需要填写必要的连接信息才可以启用.点击最右侧的设置按钮填写信息.除了key值加粗的必填项以外,下图圈起来的部分也要填写.填写的所有值都来自redis的设置.redis.conf或sentinel.conf

Nifi之ExecuteScript使用方法 (三)

填写完毕后,在左侧的SETTINGS选项卡中,复制service的ID,如下图

Nifi之ExecuteScript使用方法 (三)

以上,配置就完成了.下面是如何在代码中获得redis连接.还是需要用到content对象.

service = context.getControllerServiceLookup().getControllerService("刚刚复制的id粘这里")
redis = service.getConnection()
           

此时就获得了redis的connecting对象,此时,你可以对redis做你想做的事情.

值得一提的是,获得的redis的连接对象本身为org.springframework.data.redis.connection.jedis.JedisConnection的实体(nifi1.7.1中),所以如果要调用任何方法,请参考这个类.您可以将类名复制到百度中找到它的代码.

第二种使用service的办法,以DistributedMapCacheClient为例:

在ExecuteScript配置中,创建一个名为“clientServiceId”的动态属性,并将值设置为service的ID:

Nifi之ExecuteScript使用方法 (三)

然后我们可以使用clientServiceId.asControllerService(DistributedMapCacheClient),其中该参数是对DistributedMapCacheClient的Class对象的引用。对于这些示例,我已使用字符串键'a'预先填充缓存,该键设置为字符串值'hello'。

一旦我们有了一个DistributedMapCacheClient实例,那么为了检索一个值,我们可以调用它的get(key,serializer,deserializer)方法。在我们的例子中,由于键和值是字符串,我们只需要一个Serializer <String>和Deserializer <String>的实例传递给get()方法。所有语言的方法类似于本系列文章第2部分中描述的StreamCallback实例的创建。这些示例将从预先填充的服务器获取密钥“a”的值并记录结果(“Result = hello”)

获取存储在DistributedMapCacheServer中的属性的值

应用场景 : 当用户已将值填充到DistributedMapCacheServer(例如配置数据)中,并且脚本需要访问它们。

方法:使用上述方法,创建StringSerializer和StringDeserializer对象,然后按ID获取DistributedMapCacheClientService实例,然后在服务上调用get()。为方便起见,此处包括结果的记录。

Groovy

import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer
import java.nio.charset.StandardCharsets
 
def StringSerializer = {value, out -> out.write(value.getBytes(StandardCharsets.UTF_8))} as Serializer<String>
def StringDeserializer = { bytes -> new String(bytes) } as Deserializer<String>
 
def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
def result = myDistClient.get('a', StringSerializer, StringDeserializer)
log.info("Result = $result")
           

Jython

from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer
 
# Define a subclass of Serializer for use in the client's get() method
class StringSerializer(Serializer):
  def __init__(self):
        pass
  def serialize(self, value, out):
       out.write(value)
 
# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer(Deserializer):
  def __init__(self):
        pass
  def deserialize(self,  bytes):
        return StringUtil.fromBytes(bytes)
 
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
result = myDistClient.get('a', StringSerializer(), StringDeserializer())
log.info('Result = ' + str(result))
           

JavaScript

var DistributedMapCacheClient = Java.type('org.apache.nifi.distributed.cache.client.DistributedMapCacheClient');
var Serializer = Java.type('org.apache.nifi.distributed.cache.client.Serializer');
var Deserializer = Java.type('org.apache.nifi.distributed.cache.client.Deserializer');
var StandardCharsets = Java.type('java.nio.charset.StandardCharsets');
 
var StringSerializer = new Serializer(function(value, out) {
  out.write(value.getBytes(StandardCharsets.UTF_8));
})
var StringDeserializer = new Deserializer(function(arr) {
  // For some reason I had to build a string from the character codes in the "arr" array
  var s = "";
  for(var i = 0; i < arr.length; i++) {
    s = s + String.fromCharCode(arr[i]);
  }
  return s;
})
 
var myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.class);
var result = myDistClient.get('a', StringSerializer, StringDeserializer);
log.info("Result = "+ result);
           

JRuby

java_import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
java_import org.apache.nifi.distributed.cache.client.Serializer
java_import org.apache.nifi.distributed.cache.client.Deserializer
java_import java.nio.charset.StandardCharsets
 
# Define a subclass of Serializer for use in the client's get() method
class StringSerializer
  include Serializer
  def serialize(value, out)
       out.write(value.to_java.getBytes(StandardCharsets::UTF_8))
  end
end
 
# Define a subclass of Deserializer for use in the client's get() method
class StringDeserializer
  include Deserializer
  def deserialize(bytes)
       bytes.to_s
  end
end
 
myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient.java_class)
result = myDistClient.get('a', StringSerializer.new, StringDeserializer.new)
log.info('Result = ' + result)
           

Note:

1-脚本运行时显示的各种error,在nifi-app.log中.如下图

Nifi之ExecuteScript使用方法 (三)

2-使用log.info('xxxxx')写入log日志时, 在nifi-app.log中

3-在ExecuteScript Processor中直接print的信息,在nifi-bootstrap.log中.直接print的办法调试代码也很方便.

本文包含有关如何使用各种支持的语言与NiFi API进行交互的更复杂的示例。我可能会在本系列中添加其他部分,或者肯定会有关于脚本处理器的其他文章,因为已经进行了改进并添加了其他功能(例如ScriptedReportingTask即将推出!)。

原文连接:https://community.hortonworks.com/articles/77739/executescript-cookbook-part-3.html