天天看點

基于 Pulsar Functions 的事件處理設計模式

原作者:David Kjerrumgaard

翻譯:StreamNative——Sijia

本文将介紹一些常見的實時流式傳輸模式及其實作。

模式 1:動态路由

首先回顧一下如何使用 Apache Pulsar Functions 實作基于内容的路由。基于内容的路由是一種內建模式。該模式已經存在多年,通常用于事件中心和消息架構中。基本思路是檢查每條消息的内容,根據消息内容将消息路由到不同目的地。

基于 Pulsar Functions 的事件處理設計模式

下面的例子使用了Apache Pulsar SDK,SDK 允許使用者配置三個不同的值:

  • 用于在消息中查找比對的正規表達式
  • 消息比對表達式模式時被發送到的 topic
  • 消息不比對表達式模式時被發送到的 topic

這個例子證明了 Pulsar Functions 功能強大,可以基于功能邏輯動态決定将事件發送到哪裡。

import java.util.regex.*;
  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public ContentBasedRoutingFunction implements Function<String, String> {

     String process(String input, Context context) throws Exception {
         String regex = context
             .getUserConfigValue(“regex”).toString();
         String matchedTopic = context
             .getUserConfigValue(“matched-topic”).toString();
         String unmatchedTopic = context
             .getUserConfigValue(“unmatched-topic”).toString();

         Pattern p = Pattern.compile(regex);
         Matcher m = p.matcher(input);
         if (m.matches()) {
           context.publish(matchedTopic, input);
         } else {
           context.publish(unmatchedTopic, input);
         }
     }
  }
           

模式 2:過濾

如果想通過僅保留滿足給定條件的事件來排除 topic 上的大多數事件時,應用選擇過濾模式。過濾模式對僅查找感興趣的事件特别有效,如信用卡付款超過了一定金額;日志檔案中的 ERROR 消息;傳感器讀數超過特定門檻值等等(見模式 4)。

假如使用者在監視信用卡交易的事件流,并嘗試檢測欺詐或可疑行為。由于交易量很大,選擇“同意/不同意”的時間有限,使用者必須先過濾掉有“風險”特征的交易,如預付現金、大額付款等。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;

public class FraudFilter implements Function<Purchase, Purchase> {

    Purchase process(Purchase p, Context context) throws Exception {
         if (p.getTransactionType() == ‘CASH ADVANCE’) ||
             p.getAmount > 500.00) {
            return p;
         }
        return null;
    }
}
           

可以使用過濾器來過濾有“風險”特征的交易。過濾器可以識别這些“風險”特征,并隻将這些交易路由到一個單獨的 topic 上以進行進一步評估。經過過濾器過濾後,所有信用卡支付都可以被路由到一個“潛在欺詐行為”的 topic 上進行進一步評估,而其他事件則會被過濾掉,過濾器也不會對過濾掉的事件執行任何操作。

基于 Pulsar Functions 的事件處理設計模式

圖 2 是基于三個獨立支付對象的 FraudFilter function。第一次支付符合給定标準,被路由到“潛在欺詐行為”topic 上進行進一步評估;而第二次和第三次支付不符合欺詐标準,直接被過濾掉(沒有被路由到“潛在欺詐行為”過濾器上)。

模式 3:轉換

轉換模式用于将事件從一種類型轉換為另一種類型,或用于添加、删除或修改輸入事件的值。

投影

投影模式類似于關系代數中的投影算子,選擇輸入事件的屬性子集,并建立僅包含這些屬性的輸出事件。投影模式可用于删除事件中的敏感字段,或者隻保留事件中的必要屬性。圖 3 為投影模式的一種應用,在将記錄釋出到下遊 topic 前,“屏蔽”傳入的社安全号碼。

基于 Pulsar Functions 的事件處理設計模式

富集模式

富集模式用于将資料添加到輸入屬性中不存在的輸出事件中。典型的富集模式包含基于輸入事件中的某個鍵值對引用資料進行某種查找。以下示例展示了如何根據輸入事件中包含的 IP 位址将地理位置添加到輸出事件。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
import com.company.services.GeoService;

public class IPLookup implements Function<Purchase, Purchase> {
    Purchase process(Purchase p) throws Exception {
      Geo g = GeoService.getByIp(p.getIPAddress());
      // By default, these fields are blank, so we just modify the object
      p.setLongitude(g.getLon());
      p.setLatitiude(g.getLat());
      return p;
    }
}
           

分離模式

在分離模式下,事件處理器接收單個輸入事件,并将其分為多個輸出事件。當輸入事件是一個包含多個單獨事件(如日志檔案中的 entry)的批處理,并且想要單獨處理每個事件時,分離模式十分适用。下圖展示了分離模式的處理過程:先根據換行符分隔輸入,再逐行釋出到配置的輸出 topic。

基于 Pulsar Functions 的事件處理設計模式

此 function 的實作過程如下:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class Splitter implements Function<String, String> {

    String process(String s, Context context) throws Exception {
       Arrays.asLists(s.split(“\\R”).forEach(line ->
            context.publish(context.getOutputTopic(), line));
       return null;
    }
}
           

模式 4:警報和門檻值

警報和門檻值模式可進行檢測,并根據檢測條件生成警報(如高溫警報)。可以基于簡單的值,也可以基于較複雜的條件(如增長率、數量的持續變化等)生成警報。

下面的示例為基于使用者配置的門檻值參數(如 100.00,38.7 等)和接收警報通知的郵箱位址生成警報。當此 function 接收到超過配置門檻值的傳感器事件時,将發送電子郵件。

import javax.mail.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public SimpleAlertFunction implements Function<Sensor, Void> {
   Void process(Sensor sensor, Context context) throws Exception {
       Double threshold = context
           .getUserConfigValue(“threshold”).toString();
       String alertEmail = context
           .getUserConfigValue(“alert-email”).toString();

      if (sensor.getReading() >= threshold) {
         Session s = Session.getDefaultInstance();
         MimeMessage msg = new MineMessage(s);
         msg.setText(“Alert for Sensor:” + sensor.getId());
         Transport.send(msg);
      }
      return null;
  }
}
           

下面是一個有狀态 function 示例,該 function 根據特定傳感器讀數的增長率生成警報。在決定是否生成警報時,需要通路以前的傳感器讀數。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public ComplexAlertFunction implements Function<Sensor, Void> {

  Void process(Sensor sensor, Context context) throws Exception {
      Double threshold = context
           .getUserConfigValue(“threshold”).toString();
      String alertTopic = context
           .getUserConfigValue(“alert-topic”).toString();

      // Get previous & current metric values
      Float previous = context.getState(sensor.getId() + “-metric”);
      Long previous_time = context.getState(sensor.getId() + “-metric-time”);
      Float current = sensor.getMetric();
      Long current_time = sensor.getMetricTime();

      // Calculate Rate of change & compare to threshold.
      Double rateOfChange = (current-previous) /
                           (current_time-previous_time);
      if (abs(rateOfChange) >= threshold) {
         // Publish the sensor ID to the alert topic for handling
         context.publish(alertTopic, sensor.getId());
      }

      // Update metric values
      context.putState(sensor.getId() + “-metric”, current);
      context.putState(sensor.getId() + “-metric-time”, current_time);
  }
}
           

通過 Apache Pulsar Functions 狀态管理特性僅保留先前的度量讀數和時間,并将傳感器 ID 添加到這些值中(因為将會處理來自多個傳感器的度量,是以需要傳感器 ID)。為了簡單起見,假設事件以正确的順序到達,即始終是最新讀數,沒有亂序讀數。

另外,這一次我們将傳感器 ID 轉發到一個專門的警報 topic 以進行進一步處理,而不是僅發送電子郵件。通過這種方式,我們可以對事件進行額外的富集處理(通過 Pulsar Functions)。例如,查找擷取傳感器的地理位置,然後通知相關人員。

基于 Pulsar Functions 的事件處理設計模式

模式 5:簡單計數和視窗計數

簡單計數和視窗計數模式使用了聚合函數,聚合函數将事件的集合作為輸入,并通過對輸入事件應用一個 function 生成一個所需的輸出事件。聚合函數包括:求和、平均值、最大值、最小值、百分位數等。

基于 Pulsar Functions 的事件處理設計模式

圖 6 字數統計示例

以下為使用 Pulsar Functions 實作“字數統計”的示例,計算每個單詞在給定 topic 中出現次數的總和。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public WordCountFunction implements Function<String, Void> {

   Void process(String s, Context context) throws Exception {
      Arrays.asLists(s.split(“\\.”).forEach(word -> context.incrCounter(word, 1));
      return null;
   }
}
           

考慮到流資料 source 無休止的特性,無限期聚合用處不大,因為通常是在資料視窗上進行這些計算(如前一小時内的故障次數)。

基于 Pulsar Functions 的事件處理設計模式

資料視窗代表事件流的有限子集,如圖 7 所示。但是,應該如何定義資料視窗的邊界?有兩個用于定義視窗的常用屬性:

  • 觸發政策:控制執行或觸發 function 代碼的時間。Apache Pulsar Function 架構通過這些規則來通知代碼處理視窗中收集的全部資料。
  • 清除政策:控制保留在視窗中的資料量。這些規則用于決定是否從視窗中清除資料元素。

這兩個政策都是由時間或視窗中的資料量驅動的。二者之間的差別是什麼?又是怎樣協同工作的?在多種視窗技術中,最常用的是滾動視窗和滑動視窗。

滾動視窗

視窗已滿是滾動視窗清除政策的唯一條件,是以,隻需要指定想要使用觸發政策(基于計數或基于時間)即可。基于計數的滾動視窗是怎樣工作的?

在圖 8 的第一個示例中,觸發政策設定為 2,也就是說,在視窗中有兩個項目時,觸發器将會觸發,開始執行 Pulsar Function 代碼。這一系列行為與時間無關,視窗計數達到 2 用了 5 秒還是 5 個小時并不重要,重要的是視窗計數達到 2。

基于 Pulsar Functions 的事件處理設計模式

将上述基于計數的滾動視窗與基于時間的滾動視窗(時間設定為 10 秒)進行對比。經過 10 秒的間隔後,無論視窗中有多少事件,function 代碼都會被觸發。在下圖中,第一個視窗中有 7 個事件,而第二個視窗中隻有 3 個事件。

基于 Pulsar Functions 的事件處理設計模式

滑動視窗

滑動視窗計數定義了視窗的長度,視窗長度設定了清除政策以限制保留待處理的資料量;滑動間隔定義了觸發政策。滾動視窗政策和滑動視窗政策都可以根據時間(時間段)或長度(資料元素的數量)來定義。

在下圖中,視窗長度為 2 秒,也就是說,2 秒以前的資料會被清除,并且不會用于計算。滑動間隔為 1 秒,即每 1 秒鐘執行一次 Pulsar function 代碼。這樣,可以在整個視窗長度内處理資料。

基于 Pulsar Functions 的事件處理設計模式

前面的示例都是基于時間來定義清除政策和觸發政策,也可以根據長度來定義清除政策或觸發政策,或者同時定義這兩種政策。

在 Pulsar Functions 中實作這兩種類型的視窗 function 都很容易,隻需要指定一個 java.util.Collection 作為輸入類型,如下所示,并在建立 function 時在 -userConfig 标志中指定适當的視窗配置屬性。

用于實作前面提到的時間視窗四種情形的配置參數如下:

  • “–windowLengthCount”:每個視窗的消息數量
  • “–windowLengthDurationMs”:視窗時間(以毫秒為機關)
  • “–slidingIntervalCount”:視窗滑動後的消息數量
  • “–slidingIntervalDurationMs”:視窗滑動後的時間
時間,滑動視窗

-windowLengthDurationMs = XXXX

-slidingIntervalDurationMs = XXXX

時間,Batch Window(即滾動視窗) -windowLengthDurationMs = XXXX
長度,滑動視窗

-windowLengthCount = XXXX

-slidingIntervalCount = XXXX

長度,Batch Window(即滾動視窗) -windowLengthCount = XXXX

總結