天天看點

RocketMQ Streams 1.1.0: 輕量級流處理再出發

本文作者:倪澤,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams是一款基于RocketMQ為基礎的輕量級流計算引擎,具有資源消耗少、部署簡單、功能全面的特點,目前已經在社群開源。RocketMQ Streams在阿裡雲内部被使用在對資源比較敏感,同時又強烈需要流計算的場景,比如在自建機房的雲安全場景下。

自RocketMQ Streams開源以來,吸引了大量使用者調研和試用。但是也存在一些問題,在RocketMQ Streams 1.1.0中,主要針對以下問題做出了改進和優化。

1、面向使用者API不夠友好,不能使用泛型,不支援自定義序列化/反序列化;

2、代碼備援,在RocketMQ Streams中存在将流處理拓撲序列化反序列化子產品,RocketMQ Streams作為輕量級流處理SDK,建構好流處理節點之後應該可以直接處理資料,不存在将流處理拓撲圖本地儲存或者網絡傳輸需求。

3、流處理過程不容易了解,含有大量緩存、重新整理邏輯;

4、存在大量支援SQL的代碼,這部分和SDK方式運作流處理任務的邏輯無關;

在RocketMQ Streams 1.1.0中,對上述問題做出了改進,期望能帶來更好的使用體驗。同時,重新設計了流處理拓撲建構過程、去掉備援代碼,使得代碼更容易被了解。

從今天起,将推出系列文章介紹RocketMQ Streams 1.1.0版本,本次文章主要介紹RocketMQ Streams 1.1.0的API如何使用,如何利用RocketMQ Streams快速建構流處理應用。

02 典型使用示例

本地運作下列示例的步驟:

1、部署RocketMQ 5.0;

2、使用mqAdmin建立topic;

3、建構示例工程,添加依賴,啟動示例。RocketMQ Streams 坐标:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>           

4、向topic中寫入相應資料,并觀察結果。

更詳細文檔請參考:https://github.com/apache/rocketmq-streams

WordCount

public class WordCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("wordCount");


        builder.source("sourceTopic", total -> {
                    String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                })
                .flatMap((ValueMapperAction<String, List<String>>) value -> {
                    String[] splits = value.toLowerCase().split("\\W+");
                    return Arrays.asList(splits);
                })
                .keyBy(value -> value)
                .count()
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        final CountDownLatch latch = new CountDownLatch(1);


        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            @Override
            public void run() {
                rocketMQStream.stop();
                latch.countDown();
            }
        });


        try {
            rocketMQStream.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}           

WordCount示例要點:

1、JobId wordCount唯一辨別流處理任務;

2、自定義的反序列化;

3、一對多轉化;

4、lambda形式從資料中指定Key;

5、支援有狀态計算;

視窗聚合

public class WindowCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("windowCountUser");


        AggregateAction<String, User, Num> aggregateAction = (key, value, accumulator) -> new Num(value.getName(), 100);


        builder.source("user", source -> {
                    User user1 = JSON.parseObject(source, User.class);
                    return new Pair<>(null, user1);
                })
                .selectTimestamp(User::getTimestamp)
                .filter(value -> value.getAge() > 0)
                .keyBy(value -> "key")
                .window(WindowBuilder.tumblingWindow(Time.seconds(15)))
                .aggregate(aggregateAction)
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
        properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME);
        properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000);


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        rocketMQStream.start();
    }
}           

視窗聚合示例要點:

1、支援指定時間字段;

2、支援滑動、滾動、會話多種類型window;

3、支援自定義UDAF類型聚合;

4、支援自定義時間類型和資料最大遲到時間;

雙流JOIN

public class JoinWindow {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("joinWindow");


        //左流
        RStream<User> user = builder.source("user", total -> {
            User user1 = JSON.parseObject(total, User.class);
            return new Pair<>(null, user1);
        });


        //右流
        RStream<Num> num = builder.source("num", source -> {
            Num user12 = JSON.parseObject(source, Num.class);
            return new Pair<>(null, user12);
        });


        //自定義join後的運算
        ValueJoinAction<User, Num, Union> action = new ValueJoinAction<User, Num, Union>() {
            @Override
            public Union apply(User value1, Num value2) {
                ...
            }
        };


        user.join(num)
                .where(User::getName)
                .equalTo(Num::getName)
                .window(WindowBuilder.tumblingWindow(Time.seconds(30)))
                .apply(action)
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        rocketMQStream.start();
    }
}           

雙流聚合示例要點:

1、支援window join和非window join,對于非window join,隻需要在上述及連表達式中去掉window即可;

2、支援多種視窗類型的window join;

3、支援對join後資料自定義操作;

03 參與貢獻

RocketMQ Streams是Apache RocketMQ的子項目,已經在社群開源,參與RocketMQ Streams相關工作,請參考以下資源:

1、試用RocketMQ Streams,并閱讀相關文檔以了解更多資訊;

maven倉庫坐标:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>           

RocketMQ Streams文檔:

https://rocketmq.apache.org/zh/docs/streams/30RocketMQ%20Streams%20Overview

2、參與貢獻:如果你有任何功能請求或錯誤報告,請随時送出 Pull Request 來分享你的回報和想法;

社群倉庫:

https://github.com/apache/rocketmq-streams

3、聯系我們:可以在 GitHub上建立 Issue,向 RocketMQ 郵件清單發送電子郵件,或在 RocketMQ Streams SIG 交流群與專家共同探讨,RocketMQ Streams SIG加入方式:添加“小火箭”微信,回複RocketMQ Streams。

郵件清單:

https://lists.apache.org/[email protected]

繼續閱讀