
Blink Streaming Compute: Kafka Ingestion Demo

-- Define the UDTF that parses the Kafka message

CREATE FUNCTION myParse AS 'com.xxxxxx.MyKafkaUDTF';

CREATE FUNCTION myUdf AS 'com.xxxxxxx.MyWaterMarkUDTF';

-- Note: the field list of the Kafka source table DDL must match the example below

create table my_input (
  messageKey VARBINARY,
  `message` VARBINARY,
  topic varchar,
  `partition` int,
  `offset` bigint,
  ctTime AS TO_TIMESTAMP (myUdf (`message`)),
  -- Note: the computed column must be of type TIMESTAMP to be usable for the watermark.
  WATERMARK wk FOR ctTime AS WITHOFFSET (ctTime, 2000) -- define the watermark for the rowtime
) WITH (
  type = 'KAFKA08',
  topic = 'myTopic',
  `group.id` = 'mGroup',
  extraConfig = 'bootstrap.servers=127.0.0.1:9092',
  `zookeeper.connect` = '127.0.0.1:2181',
  startupMode = 'EARLIEST'
);
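Both myParse and myUdf assume the Kafka message body is a UTF-8 JSON string. A hypothetical example payload, with field names taken from the UDF code further below and values chosen purely for illustration:

{"id": 1, "prodId": 100, "prodName": "apple", "createdAt": 1546300800000, "updatedAt": 1546300800000}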

-- Sliding (HOP) window aggregation grouped by prodId: with a 30-second slide and a 2-minute window size, each record is counted in four overlapping windows.

CREATE VIEW input_view01 (
  windowStart,
  windowEnd,
  prodId,
  prodName,
  prodNumber
) AS
SELECT
  HOP_START (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),
  HOP_END (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),
  T.prodId as prodId,
  T.prodName as prodName,
  count (*) as prodNumber
from
  my_input as S,
  LATERAL TABLE (myParse (`message`)) as T (
    id,
    prodId,
    prodName,
    createdAt,
    updatedAt
  )
GROUP BY HOP (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE), T.prodId, T.prodName;

-- Passthrough view exposing the parsed fields (no windowing)
CREATE VIEW input_view60 AS
SELECT
  T.id,
  T.prodId,
  T.prodName,
  T.createdAt,
  T.updatedAt
FROM
  my_input as S,
  LATERAL TABLE (myParse (`message`)) as T (id, prodId, prodName, createdAt, updatedAt);

-- Print the results

create table outprint01 (
  prodId bigint,
  prodName varchar,
  prodNumber bigint
) with (
  type = 'print'
);

insert into outprint01
select prodId, prodName, prodNumber
from input_view01;

-- Write the computed results to Kafka

create table result_kafka (
  messageKey VARBINARY,
  `message` VARBINARY,
  PRIMARY KEY (messageKey)
) with (
  type = 'KAFKA08',
  topic = 'myResultTopic',
  extraConfig = 'bootstrap.servers=127.0.0.1:9092',
  `zookeeper.connect` = '127.0.0.1:2181',
  startupMode = 'EARLIEST'
);

-- For the output here, consider assembling the result fields into a single string separated by '|' and letting the consumer parse it (see the sketch after this statement).

INSERT INTO result_kafka
SELECT
  cast(prodId as VARBINARY) as messageKey,
  cast(prodName as VARBINARY) as `message`
FROM
  input_view01;
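A minimal sketch of the '|'-separated output suggested above, assuming the built-in CONCAT_WS string function is available; the chosen fields and their order are illustrative only:

-- Hypothetical alternative sink statement: pack the result fields into one '|'-separated string
INSERT INTO result_kafka
SELECT
  cast(prodId as VARBINARY) as messageKey,
  cast(CONCAT_WS('|', cast(prodId as varchar), prodName, cast(prodNumber as varchar)) as VARBINARY) as `message`
FROM
  input_view01;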

MyKafkaUDTF implementation:

package com.xxxxxxxx;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;

import java.sql.Timestamp;

public class MyKafkaUDTF extends TableFunction<Row> {

    public void eval(byte[] message) {
        String msg = null;
        try {
            msg = new String(message, "UTF-8");
            System.out.println("received message: " + msg);
            JSONObject jsonObject = JSONObject.parseObject(msg);
            if (jsonObject != null) {
                Long id = jsonObject.getLong("id");
                Long prodId = jsonObject.getLong("prodId");
                String prodName = jsonObject.getString("prodName");
                Long createdAt = jsonObject.getLong("createdAt");
                Long updatedAt = jsonObject.getLong("updatedAt");

                // Convert the epoch-millisecond fields to Timestamp
                Timestamp createdAtTimestamp = new Timestamp(createdAt);
                Timestamp updatedAtTimestamp = new Timestamp(updatedAt);

                // One output row per message: id, prodId, prodName, createdAt, updatedAt
                Row row = new Row(5);
                row.setField(0, id);
                row.setField(1, prodId);
                row.setField(2, prodName);
                row.setField(3, createdAtTimestamp);
                row.setField(4, updatedAtTimestamp);
                System.out.println("message row ==> " + row.toString());
                collect(row);
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("error. Input data " + msg + " is not a JSON string");
        }
    }

    // When the return value is Row, this method must be overridden to tell
    // the system the field types of the returned row explicitly.
    @Override
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        return DataTypes.createRowType(
                DataTypes.LONG,
                DataTypes.LONG,
                DataTypes.STRING,
                DataTypes.TIMESTAMP,
                DataTypes.TIMESTAMP);
    }
}

MyWaterMarkUDTF implementation:

package xxxxxxx;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.ScalarFunction;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MyWaterMarkUDTF extends ScalarFunction {

    public String eval(byte[] message) {
        try {
            String msg = new String(message, "UTF-8");
            JSONObject data = JSONObject.parseObject(msg);
            System.out.println("time: " + data.getString("createdAt"));
            Long createdAtLong = data.getLong("createdAt");
            // Format the epoch-millisecond createdAt into the string TO_TIMESTAMP expects
            SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return parser.format(new Date(createdAtLong));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    // Optional: close() may be omitted.
    @Override
    public void close() {
    }
}
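To see what the watermark UDF produces for a single record, the small check below feeds it a hand-built payload. This is a sketch, not part of the job; the class name and all field values are hypothetical and only mirror the fields read by the two UDFs above.

package xxxxxxx;

import java.nio.charset.StandardCharsets;

// Minimal local check: prints the 'yyyy-MM-dd HH:mm:ss' string that TO_TIMESTAMP
// parses in the ctTime computed column of the source DDL.
public class MyWaterMarkUDTFQuickCheck {
    public static void main(String[] args) {
        // Hypothetical sample payload; values are illustrative only.
        String sample = "{\"id\":1,\"prodId\":100,\"prodName\":\"apple\","
                + "\"createdAt\":1546300800000,\"updatedAt\":1546300800000}";
        MyWaterMarkUDTF udf = new MyWaterMarkUDTF();
        // For createdAt = 1546300800000 this prints 2019-01-01 00:00:00 in UTC
        // (the exact string depends on the JVM's default timezone).
        System.out.println(udf.eval(sample.getBytes(StandardCharsets.UTF_8)));
    }
}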