天天看點

ksqlDB基本使用

基本概念

ksqlDB Server

ksqlDB是事件流資料庫,是一種特殊的資料庫,基于Kafka的實時資料流處理引擎,提供了強大且易用的SQL互動方式來對Kafka資料流進行處理,而無需編寫代碼。KSQL具備高擴充、高彈性、容錯式等優良特性,并且它提供了大範圍的流式處理操作,比如資料過濾、轉化、聚合、連接配接join、視窗化和 Sessionization (即捕獲單一會話期間的所有的流事件)等。

ksqlDB基本使用

ksqlDB CLI

KSQL指令行界面(CLI)以互動方式編寫KSQL查詢。 KSQL CLI充當KSQL Server的用戶端。

事件(Event)

ksqlDB旨在通過使用較低級别的流處理器來提高抽象度。通常,一個事件稱為“行”,就像它是關系資料庫中的一行一樣。

流(Stream)

流代表是一系列曆史資料的分區的,不可變的,僅可以追加的集合。 一旦将一行插入流中,就無法更改。可以在流的末尾添加新行,但是永遠不能更新或者删除現有的行。 每一行資料存儲在特定的分區中,每行隐式或顯式地擁有一個代表其身份的鍵,具有相同鍵的所有行都位于同一分區中。

表(Table)

表是可變的、分區的集合,它的内容會随時間而變化。 流表示事件的曆史序列,與之相反,表表示目前的真實情況。表通過利用每一行的鍵來工作。如果一個行序列共享一個鍵,那麼給定鍵的最後一行表示該鍵辨別的最新資訊,背景程序定期運作并删除除最新行以外的所有行。

舉例說明
ksqlDB基本使用
假設使用者Alice和Bob剛開始分别有200美元和100美元,經過了以下一系列交易:

  • Alice轉給Bob 100美元。
  • Bob轉給Alice 50美元。
  • Bob轉給Alice 100美元。

在例子中Stream表示資金從一個賬号轉移到另一個賬号的曆史記錄,Table反映了每個使用者賬号的最新狀态。是以我們得出結論:Table将具有賬戶的目前狀态,而Stream将捕獲交易記錄。

Stream可以看作是Table的變更日志,因為随着時間的推移更新Stream的聚合會産生一個表。 可以将某個Table在某個時間點視為Stream中每個鍵的最新值的快照(流的資料記錄是鍵值對),觀察Table随時間的變化會産生一個Stream。

Docker部署ksqlDB

建立docker-compose.yaml檔案,包含ksqlDB Server和ksqlDB Cli:

---
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.15.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: 192.168.1.87:9092  #要連接配接的kafka叢集的位址
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.15.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true      

通過

docker-compose up -d

指令啟動,然後用下面指令連接配接ksql:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.15.0, Server v0.15.0 located at http://ksqldb-server:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>      

Producer代碼

package tuling.kafkaDemo;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MsgProducer {
    private final static String TOPIC_NAME = "cr7-topic";
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092");
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.8.36.125:9092,11.8.38.116:9092,11.8.38.120:9092");
         /*
         發出消息持久化機制參數
        (1)acks=0: 表示producer不需要等待任何broker确認收到消息的回複,就可以繼續發送下一條消息。性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已經成功将資料寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續發送下一
             條消息。這種情況下,如果follower沒有成功備份資料,而此時leader又挂掉,則消息會丢失。
        (3)acks=-1或all: 需要等待 min.insync.replicas(預設為1,推薦配置大于等于2) 這個參數配置的副本個數都成功寫入日志,這種政策
            會保證隻要有一個備份存活就不會丢失資料。這是最強的資料保證。一般除非是金融級别,或跟錢打交道的場景才會使用這種配置。
         */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
         /*
        發送失敗會重試,預設重試間隔100ms,重試能保證消息發送的可靠性,但是也可能造成消息重複發送,比如網絡抖動,是以需要在
        接收者那邊做好消息接收的幂等性處理
        */
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //重試間隔設定
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        //設定發送消息的本地緩沖區,如果設定了該緩沖區,消息會先發送到本地緩沖區,可以提高消息發送性能,預設值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        /*
        kafka本地線程會從緩沖區取資料,批量發送到broker,
        設定批量發送消息的大小,預設值是16384,即16kb,就是說一個batch滿了16kb就發送出去
        */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        /*
        預設值是0,意思就是消息必須立即被發送,但這樣會影響性能
        一般設定10毫秒左右,就是說這個消息發送完後會進入本地的一個batch,如果10毫秒内,這個batch滿了16kb就會随batch一起被發送出去
        如果10毫秒内,batch沒滿,那麼也必須把消息發送出去,不能讓消息的發送延遲時間太長
        */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        //把發送消息的key從字元串序列化為位元組數組
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把發送消息value從字元串序列化為位元組數組
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //建立Kafka消費者執行個體
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        int msgNum = 50;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);
            //指定發送分區
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
            //未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , order.getOrderId().toString(), JSON.toJSONString(order));
            //等待消息發送成功的同步阻塞方法
//            RecordMetadata metadata = producer.send(producerRecord).get();
//            System.out.println("同步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"
//                    + metadata.partition() + "|offset-" + metadata.offset());
            //異步回調方式發送消息
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("發送消息失敗:" + exception.getStackTrace());
                    }
                    if (metadata != null) {
                        System.out.println("異步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    // CountDownLatch能夠使一個線程在等待另外一些線程完成各自工作之後,再繼續執行。使用一個計數器進行實作。計數器初始值為線程的數量。
                    // 當每一個線程完成自己任務後,計數器的值就會減一。
                    countDownLatch.countDown();
                }
            });
            //異步應用場景送積分TODO
        }
        countDownLatch.await(5, TimeUnit.SECONDS); //當計數器的值為0時,表示所有的線程都已經完成一些任務,然後在CountDownLatch上等待的線程就可以恢複執行接下來的任務。
        producer.close();  //所有生産者線程完成任務後,主線程關閉和kafka broker的連接配接
    }
}      

Producer會以如下Json格式向Kafka Broker發送資料:

生産者會以如下Json格式
{"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}      

列印Topic資料

ksql> PRINT 'cr7-topic' FROM BEGINNING limit 5;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/27 16:11:46.239 Z, key: 2, value: {"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.239 Z, key: 3, value: {"orderAmount":1000,"orderId":3,"productId":103,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.240 Z, key: 9, value: {"orderAmount":1000,"orderId":9,"productId":109,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.241 Z, key: 16, value: {"orderAmount":1000,"orderId":16,"productId":116,"productNum":1}, partition: 2
rowtime: 2021/02/27 16:11:46.241 Z, key: 29, value: {"orderAmount":1000,"orderId":29,"productId":129,"productNum":1}, partition: 2      

建立Stream

基于名為cr7-topic的topic建立一個Stream,注意Stream的名字不能有

-

ksql> CREATE STREAM  cr7_topic_stream ( orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER)
WITH (kafka_topic='cr7-topic',value_format='json');
 Message
----------------
 Stream created
----------------      

列出所有Stream

ksql> list streams;
 Stream Name         | Kafka Topic                 | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
 CR7_TOPIC_STREAM    | cr7-topic                   | KAFKA      | JSON         | false      

查詢Stream資料

運作Producer程式,可以看到會持續輸出資料:

ksql> select * from  CR7_TOPIC_STREAM EMIT CHANGES;
+---------------------------+---------------------------+---------------------------+---------------------------+
|ORDERAMOUNT                |ORDERID                    |PRODUCTID                  |PRODUCTNUM                 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|1000                       |4                          |104                        |1                          |
|1000                       |6                          |106                        |1                          |
|1000                       |10                         |110                        |1                          |
|1000                       |12                         |112                        |1                          |
|1000                       |13                         |113                        |1                          |
|1000                       |14                         |114                        |1                          |
|1000                       |18                         |118                        |1                          |
|1000                       |19                         |119                        |1                          |
|1000                       |20                         |120                        |1                          |
|1000                       |24                         |124                        |1                          |
|1000                       |26                         |126                        |1                          |
|1000                       |31                         |131                        |1                          |
|1000                       |35                         |135                        |1                          |
|1000                       |38                         |138                        |1                          |
|1000                       |39                         |139                        |1                          |
|1000                       |42                         |142                        |1                          |
|1000                       |46                         |146                        |1                          |
|1000                       |1                          |101                        |1                          |
|1000                       |5                          |105                        |1                          |
|1000                       |7                          |107                        |1                          |
|1000                       |8                          |108                        |1                          |
|1000                       |11                         |111                        |1                          |
|1000                       |15                         |115                        |1                          |
|1000                       |17                         |117                        |1                          |
|1000                       |21                         |121                        |1                          |
|1000                       |22                         |122                        |1                          |
|1000                       |23                         |123                        |1                          |
|1000                       |25                         |125                        |1                          |
|1000                       |2                          |102                        |1                          |
|1000                       |3                          |103                        |1                          |      

通過Stream建立另一個Stream

将Stream cr7_topic_stream中orderid為單數的資料寫入新的Stream s3中:

ksql> CREATE STREAM s3 AS SELECT * FROM cr7_topic_stream
WHERE (orderid%2) != 0 EMIT CHANGES;      

檢視Stream s3,可以看到隻有orderid為單數的資料:

ksql> select * from s3 emit changes;
+---------------------------+---------------------------+---------------------------+---------------------------+
|ORDERAMOUNT                |ORDERID                    |PRODUCTID                  |PRODUCTNUM                 |
+---------------------------+---------------------------+---------------------------+---------------------------+
|1000                       |1                          |101                        |1                          |
|1000                       |5                          |105                        |1                          |
|1000                       |7                          |107                        |1                          |
|1000                       |11                         |111                        |1                          |
|1000                       |15                         |115                        |1                          |
|1000                       |17                         |117                        |1                          |
|1000                       |21                         |121                        |1                          |
|1000                       |23                         |123                        |1                          |
|1000                       |25                         |125                        |1                          |
|1000                       |27                         |127                        |1                          |
|1000                       |33                         |133                        |1                          |
|1000                       |37                         |137                        |1                          |
|1000                       |43                         |143                        |1                          |
|1000                       |45                         |145                        |1                          |
|1000                       |47                         |147                        |1                          |
|1000                       |13                         |113                        |1                          |
|1000                       |19                         |119                        |1                          |
|1000                       |31                         |131                        |1                          |
|1000                       |35                         |135                        |1                          |
|1000                       |39                         |139                        |1                          |
|1000                       |3                          |103                        |1                          |      

Stream資料聚合查詢

查詢Stream cr7_topic_stream中的條目總數和orderamount的總和,并以productnum作為分組:

ksql> SELECT COUNT(*),SUM(orderamount) from cr7_topic_stream GROUP BY  productnum EMIT CHANGES;
+---------------------------------------------------------+---------------------------------------------------------+
|KSQL_COL_0                                               |KSQL_COL_1                                               |
+---------------------------------------------------------+---------------------------------------------------------+
|50                                                       |50000      

手動往Stream中插入資料

ksql> INSERT INTO cr7_topic_stream 
(orderId,productNum)
values (777,7777);      

檢視Stream資料結構

ksql> describe  cr7_topic_stream;
Name                 : CR7_TOPIC_STREAM
 Field       | Type
-----------------------
 ORDERAMOUNT | INTEGER
 ORDERID     | INTEGER
 PRODUCTID   | INTEGER
 PRODUCTNUM  | INTEGER
-----------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;      

将上

EXTENDED

參數檢視詳細資訊:

ksql> describe extended cr7_topic_stream;
Name                 : CR7_TOPIC_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : cr7-topic (partitions: 3, replication: 3)
Statement            : CREATE STREAM CR7_TOPIC_STREAM (ORDERAMOUNT INTEGER, ORDERID INTEGER, PRODUCTID INTEGER, PRODUCTNUM INTEGER) WITH (KAFKA_TOPIC='cr7-topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
 Field       | Type
-----------------------
 ORDERAMOUNT | INTEGER
 ORDERID     | INTEGER
 PRODUCTID   | INTEGER
 PRODUCTNUM  | INTEGER
-----------------------
Sources that have a DROP constraint on this source
--------------------------------------------------
S3
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic cr7-topic)      

删除Stream

DROP STREAM cr7_topic_stream;      

建立Table

必須要含有主鍵,主鍵是Kafka生産者生産消息時指定的key。

ksql> CREATE TABLE  cr7_topic_table (
orderAmount INTEGER, orderId INTEGER, productId INTEGER, productNum INTEGER, kafkaProducerKey VARCHAR PRIMARY KEY
)
WITH (kafka_topic='cr7-topic',value_format='json');      

kafka腳本生産消息指定key的方法:

#以逗号作為key和value的分隔符。
kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=,
>mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}      

檢視Table資訊

ksql> describe cr7_topic_table;
Name                 : CR7_TOPIC_TABLE
 Field       | Type
----------------------------------------------
 ORDERAMOUNT | INTEGER
 ORDERID     | INTEGER          (primary key)
 PRODUCTID   | INTEGER
 PRODUCTNUM  | INTEGER
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>      

查詢Table

ksql> select * from cr7_topic_table emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+
|KAFKAPRODUCERKEY     |ORDERAMOUNT          |ORDERID              |PRODUCTID            |PRODUCTNUM           |
+---------------------+---------------------+---------------------+---------------------+---------------------+
|1                    |1000                 |1                    |101                  |1                    |
|2                    |1000                 |2                    |102                  |2                    |
|3                    |1000                 |3                    |103                  |3                    |
......
#當生産者重新生産資料,把Java代碼中 
#Order order = new Order(i, 100 + i, 1, 1000.00);
修改為
#Order order = new Order(i, 100 + i, 1, 2000.00);
#在key值一樣的情況下,查cr7_topic_table會是最新的值
|2                    |2000                 |2                    |102                  |2                    |
|3                    |2000                 |3                    |103                  |3                    |
|1                    |2000                 |1                    |101                  |1                    |
......