Industry background
- Current state of the industry:
- Online education is a new mode of education that uses modern information technologies such as the Internet and artificial intelligence for teaching and learning, and it is an important part of education services. Developing online education helps build a networked, digital, personalized, lifelong education system, and helps build a learning society in which "everyone can learn, anywhere, at any time".
- The role of big data in this industry:
- More accurate profiles of prospective customers, so marketing can match them with better services and raise conversion rates (a higher ROI is not guaranteed, since that also depends on external competition);
- More comprehensive evaluation of every participant in online education: teachers, students, institutions, and the industry as a whole;
- Big data helps the online education industry grow faster.
Business scenario
A company has built an online-education app in which training institutions publish live courses, recorded courses, exercises, study articles, and other content. Users can learn new material online, consolidate what they have learned offline, and do post-class exercises/tests on covered material.
Building the business involves several parts:
- App: the application, the users' entry point
- Back-end systems:
- Teachers: analyze students' classroom participation to tailor teaching plans to each student.
- O&M staff: monitor the network quality of live online classes in real time via O&M metrics.
- Operations staff: run the platform based on statistics such as student registrations, learning quality, and order volume:
- handling student registration and course add/drop operations;
- reviewing students' learning quality;
- viewing platform metrics, such as daily order counts.
Technical architecture

Architecture walkthrough:
Data ingestion: in this scenario, the warehouse has two data sources: app tracking events sent to the Kafka message queue, and incremental change logs from business databases such as HBase. Note that a real-time warehouse is usually operated alongside an offline warehouse, sharing one set of governance systems: permissions, metadata management, scheduling, and so on.
Real-time warehouse architecture: in this scenario, the entire ETL and BI portion of the real-time warehouse is built with Flink + Kafka. The raw log app_log_origin is collected directly from the client; the data is then processed and enriched with dimensions, and finally delivered to the business systems.
Business metrics
- Real-time data middle layer
- Student operation log ETL (cleansing the online signalling logs of student actions)
- capture student picture-move actions
- capture student picture-hover actions
- capture student line-draw actions
- audio play
- audio pause
- incorrect picture-text match
- correct picture-text match
- Student registration test-level log ETL
- Student behavior analysis
- classroom-performance statistics for live (streamed) courses
- study-time statistics for offline (recorded) courses
- O&M / network monitoring
- network monitoring for live courses (audio)
- network monitoring for live courses (video)
- Operations analysis
- hourly registration counts by student level
- daily course-consultant follow-up statistics
Note: this case covers only the scenarios and metrics above; real deployments also track other metrics such as daily UV/PV, top-N popular teachers, and reviews of teaching quality and volume.
Business code
Scenario 1: real-time cleansing of raw logs
During live classes, students do in-class exercises/quizzes; page clicks and similar actions produce raw tracking logs. To quickly gauge each student's learning (classroom) performance, the business assigns scores to the different actions. So that downstream jobs can process the data efficiently, the raw records (multi-level JSON) are cleansed into flat (single-level) JSON per action type and written to Kafka.
- Sample tracking data
-- Input
{
"createTime":"",
"data":{
"userid":"",
"roomid":"",
"timestamp":"",
"role":"",
"msgid":"",
"msg":{
"msgtype":"",
"msg_data":{
"target_id":"",
"target_type":"",
"action":"",
"sub_action":"",
"page_index":""
}
}
}
}
-- Output
{
"messageCreateTime":"",
"timeStamp":"",
"messageTimeStamp":"",
"userId":"",
"roomId":"",
"role":"",
"msgId":"",
"msgType":"",
"targetId":"",
"targetType":"",
"action":"",
"subAction":"",
"pageIndex":"",
"event":""
}
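The flattening step above can be sketched outside SQL as well. A minimal Python sketch (field names are taken from the sample payloads; the `event` value is supplied by whichever filter rule matched, and `flatten_event` is a hypothetical helper, not part of the job):

```python
import json

# Flatten one multi-level tracking record into the single-level output schema.
def flatten_event(raw: str, event: str) -> str:
    d = json.loads(raw)
    data = d["data"]
    msg_data = data["msg"]["msg_data"]
    flat = {
        "messageCreateTime": d["createTime"],
        "messageTimeStamp": data["timestamp"],
        "userId": data["userid"],
        "roomId": data["roomid"],
        "role": data["role"],
        "msgId": data["msgid"],
        "msgType": data["msg"]["msgtype"],
        "targetId": msg_data["target_id"],
        "targetType": msg_data["target_type"],
        "action": msg_data["action"],
        "subAction": msg_data["sub_action"],
        "pageIndex": msg_data["page_index"],
        "event": event,  # set by the matching cleansing rule
    }
    return json.dumps(flat)
```

This mirrors what the chained JSON_VALUE calls in the DDL below do: each nested object is unwrapped one level at a time until the leaf fields are exposed as flat columns.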
Input table
create table timeline_analysis_student_stream (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- event time
`createTime` as cast(JSON_VALUE(`message`, '$.createTime')as VARCHAR),
-- user ID
`userid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.userid') as BIGINT),
-- classroom ID
`roomid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.roomid') as BIGINT),
-- action time
`time_stamp` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.timestamp') as BIGINT),
-- role
`role` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.role') as TINYINT),
-- message ID
`msgid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msgid') as BIGINT),
-- message type
`msg_msgType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msgtype') as VARCHAR),
-- message target ID
`msg_msgData_targetId` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_id') as VARCHAR),
-- message target type
`msg_msgData_targetType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_type') as VARCHAR),
-- student action
`msg_msgData_action` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.action') as VARCHAR),
-- student sub-action
`msg_msgData_subAction` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.sub_action') as VARCHAR),
-- PPT page number
`msg_msgData_pageIndex` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.page_index') as BIGINT)
) with (
type = 'kafka011',
topic = 'timeline_client_topic',
`group.id` = 'timeline_analysis_student_consumer',
...
);
Output table
create table signal_student_classroom_internation (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka011',
topic = 'timeline_analysis_student',
...
);
- Capture student picture-move actions
- When learning parts of speech (adjectives/adverbs), an in-class exercise asks students to sort the word pictures shown on screen; the student drags each picture into the right category bucket.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MOVE_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetType = 'shape' AND
msg_msgData_action = 'move';
- Capture student picture-hover actions
- When learning vocabulary, students need to learn pronunciation; hovering the mouse over a picture plays its pronunciation.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_HOVER_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetType = 'shape' AND
msg_msgData_action = 'mouse' AND
msg_msgData_subAction = 'over';
- Capture student line-draw actions
- Students draw lines to complete in-class picture-text matching exercises.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_LINE_DRAW"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetType = 'shape' AND
msg_msgData_action = 'add';
- Capture student audio-play actions
- The student plays audio in the courseware.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PLAY"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'audio' AND
msg_msgData_subAction = 'start';
- Capture student audio-pause actions
- The student pauses audio in the courseware.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PAUSE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'audio' AND
msg_msgData_subAction = 'pause';
- Capture incorrect picture-text match actions
- After the student draws a connecting line, the match result is returned to them; it affects the classroom-performance score.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_WRONG"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetId = 'match' AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'match' AND
msg_msgData_subAction = 'drop:wrong';
- Capture correct picture-text match actions
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_CORRECT"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
msgid = '305' AND
msg_msgType = '116' AND
role = '2' AND
msg_msgData_targetId = 'match' AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'match' AND
msg_msgData_subAction = 'drop:correct';
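The seven INSERT statements above differ only in their WHERE filters and output event names, so the routing logic boils down to one rule table. A minimal Python sketch (condition values are copied from the SQL filters; `classify` is a hypothetical helper, not part of the job):

```python
# (condition, event) pairs, one per INSERT statement above.
RULES = [
    ({"target_type": "shape", "action": "move"}, "STUDENT_MOVE_PICTURE"),
    ({"target_type": "shape", "action": "mouse", "sub_action": "over"}, "STUDENT_HOVER_PICTURE"),
    ({"target_type": "shape", "action": "add"}, "STUDENT_LINE_DRAW"),
    ({"target_type": "template", "action": "audio", "sub_action": "start"}, "STUDENT_AUDIO_PLAY"),
    ({"target_type": "template", "action": "audio", "sub_action": "pause"}, "STUDENT_AUDIO_PAUSE"),
    ({"target_id": "match", "target_type": "template", "action": "match",
      "sub_action": "drop:wrong"}, "STUDENT_MATCH_WRONG"),
    ({"target_id": "match", "target_type": "template", "action": "match",
      "sub_action": "drop:correct"}, "STUDENT_MATCH_CORRECT"),
]

def classify(msg_data, msgid=305, msgtype="116", role=2):
    # All seven rules share the same gate: msgid 305, msgtype '116', role 2 (student).
    if not (msgid == 305 and msgtype == "116" and role == 2):
        return None
    for cond, event in RULES:
        if all(msg_data.get(k) == v for k, v in cond.items()):
            return event
    return None
```

Records that match no rule fall through and are dropped, which is what the per-event INSERT statements achieve implicitly.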
When students register on the web/app they take a placement test to determine their level, so they can later study courses at the matching level. Flink cleanses the tracking logs from Kafka and writes the results to HBase.
{
"id":"",
"chinese_name":"",
"english_name":"",
"level":"",
"pid":"",
"create_time":"",
"update_time":"",
"dept_id":""
}
create table blink_stg_activity__channel_name_dictionary_da (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- ID
id as JSON_VALUE(`message`,'$.id'),
-- Chinese name
chinese_name as JSON_VALUE(`message`,'$.chinese_name'),
-- English name
english_name as JSON_VALUE(`message`,'$.english_name'),
-- test level
level as JSON_VALUE(`message`,'$.level'),
-- unique identifier
pid as JSON_VALUE(`message`,'$.pid'),
-- creation time
create_time as JSON_VALUE(`message`,'$.create_time'),
-- update time
update_time as JSON_VALUE(`message`,'$.update_time'),
-- department ID
dept_id as JSON_VALUE(`message`,'$.dept_id')
) with (
type = 'kafka010',
topic = 'blink_stg_activity__channel_name_dictionary_da',
`group.id` = 'blink_stg_activity__channel_name_dictionary_da',
...
);
create table blink_stg_activity__channel_name_dictionary_da_sinkhbase (
rowkey varchar,
id varchar,
chinese_name varchar,
english_name varchar,
level varchar,
pid varchar,
create_time varchar,
update_time varchar,
dept_id varchar,
primary key (rowkey)
) with (
type = 'cloudhbase',
tableName = 'channel_name_dictionary',
...
);
insert into
blink_stg_activity__channel_name_dictionary_da_sinkhbase
SELECT
MD5(id) as rowkey,
id ,
chinese_name ,
english_name ,
level ,
pid ,
create_time ,
update_time ,
dept_id
from
blink_stg_activity__channel_name_dictionary_da;
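The sink uses MD5(id) as the HBase rowkey so that writes spread evenly across regions instead of hotspotting on sequential ids. The equivalent key derivation in Python (a sketch; `rowkey` is a hypothetical helper):

```python
import hashlib

# Derive the HBase rowkey exactly as MD5(id) does in the SQL above:
# hashing the id randomizes key order and avoids region hotspotting.
def rowkey(record_id: str) -> str:
    return hashlib.md5(record_id.encode("utf-8")).hexdigest()
```

The trade-off of hashed rowkeys is that range scans by id are no longer possible; here the table is only used for point lookups, so that is acceptable.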
Scenario 2: student behavior analysis
Scenario 1 cleansed the student operation logs; this scenario consumes that cleansed data, groups it by user ID, web server ID, role, and action event, opens a 1-minute window, and scores via a count(event) aggregate, yielding each student's per-minute classroom performance in live (streamed) courses.
- This metric builds on the output of the student operation log ETL
{
"userId":"",
"roomId":"",
"role":"",
"event":"",
"timeStamp":""
}
create table timeline_analysis_student_mashup_stream (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- user ID
`userId` as cast(JSON_VALUE (`message`, '$.userId') as BIGINT),
-- web server ID
`webserverId` as cast(JSON_VALUE (`message`, '$.roomId') as BIGINT),
-- role
`role` as cast(JSON_VALUE (`message`, '$.role') as TINYINT),
-- action event
`event` as cast(JSON_VALUE (`message`, '$.event') as VARCHAR),
-- event time
time_stamp as TO_TIMESTAMP(cast(JSON_VALUE (`message`, '$.timeStamp') as BIGINT)),
WATERMARK wk FOR time_stamp AS WITHOFFSET (time_stamp, 0)-- define the watermark on the rowtime
) with (
type = 'kafka011',
topic = 'timeline_analysis_student',
`group.id` = 'timeline-analysis-student-mashup-consumer',
...
);
create table timeline_signal_analysis_mysql (
start_time TIMESTAMP,
end_time TIMESTAMP,
webserver_id BIGINT,
user_id BIGINT,
role TINYINT,
event VARCHAR,
event_count BIGINT,
create_time TIMESTAMP
) with (
type='RDS',
tableName='timeline_signal_analysis',
...
);
- Classroom performance breakdown
- Actions such as raising a hand to answer questions earn points, which measure the student's classroom performance.
insert into timeline_signal_analysis_mysql
select
TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) as start_time,
TUMBLE_END(time_stamp,INTERVAL '1' MINUTE) as end_time,
webserverId as webserver_id,
userId as user_id,
role as role,
event as event,
COUNT(event) as event_count,
CURRENT_TIMESTAMP as create_time
FROM timeline_analysis_student_mashup_stream
GROUP BY TUMBLE (time_stamp,INTERVAL '1' MINUTE),
userId,
webserverId,
role,
event;
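The 1-minute tumbling count can be sketched in plain Python to make the grouping explicit. A toy in-memory version (ignoring watermarks and late data; timestamps are epoch milliseconds, as in the cleansed stream):

```python
from collections import Counter

# Bucket events by (window start, userId, roomId, role, event) and count,
# mirroring GROUP BY TUMBLE(time_stamp, INTERVAL '1' MINUTE), userId, ...
def window_counts(events):
    counts = Counter()
    for e in events:
        minute = e["timeStamp"] // 60000 * 60000  # 1-min window start, ms
        counts[(minute, e["userId"], e["roomId"], e["role"], e["event"])] += 1
    return counts
```

Downstream, these per-minute counts are weighted per event type to produce the classroom-performance score.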
Events with subEvent = 'PPT_SUCCESS' mark course-loading completion; they are extracted and then joined back against the source table (a self-join) to widen the stream, computing the time difference between the 'PPT_SUCCESS' event and the initial PPT playback.
{
"classroom_id":"",
"user_type":"",
"user_id":"",
"event_time":"",
"sub_event":"",
"extra":{
"data_time":"",
"msg":{
"pptIndex":""
}
}
}
create table qos_log_kafka (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- (recorded-course) classroom ID
`classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id')as VARCHAR),
-- user type
`userType` as cast(JSON_VALUE(`message`, '$.user_type')as VARCHAR),
-- user ID
`userId` as cast(JSON_VALUE(`message`, '$.user_id')as BIGINT),
-- event time
`eventTime` as cast(JSON_VALUE(`message`, '$.event_time')as BIGINT),
-- sub-event
`subEvent` as cast(JSON_VALUE(`message`, '$.sub_event')as VARCHAR),
-- data time
`extraDataTime` as cast(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra')as VARCHAR), '$.data_time')as VARCHAR)as BIGINT),
-- PPT page number
`extraMsgIndex` as cast(JSON_VALUE(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra')as VARCHAR), '$.msg')as VARCHAR), '$.pptIndex')as BIGINT)
) with (
type = 'kafka011',
topic = 'qos_log',
...
);
create table user_enter_classroom_take_time_mysql (
user_id BIGINT,
classroom_id VARCHAR,
user_type VARCHAR,
spend_time BIGINT,
event_time TIMESTAMP,
create_time TIMESTAMP
) with (
type='rds',
tableName='user_enter_classroom_take_time',
...
);
- Time for a student to enter the classroom
- For offline recorded courses, the PPT playback times are used to compute how long it takes a student to enter the classroom.
CREATE VIEW qos_log_kafka_view AS
SELECT
`userId`,
`classroomId`,
`userType`,
`eventTime`,
subEvent,
`extraDataTime`
FROM qos_log_kafka
WHERE subEvent = 'PPT_SUCCESS';
insert into user_enter_classroom_take_time_mysql
SELECT
a.userId,
a.classroomId,
a.userType,
b.extraDataTime-a.extraDataTime,-- in milliseconds
TO_TIMESTAMP(a.eventTime),
CURRENT_TIMESTAMP
FROM qos_log_kafka a
JOIN qos_log_kafka_view b ON a.userId=b.userId AND a.classroomId=b.classroomId
WHERE a.extraDataTime<b.extraDataTime;
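The self-join can be sketched in Python: for each user/classroom, pair the PPT_SUCCESS record with every earlier record and emit the millisecond gap, mirroring the `a.extraDataTime < b.extraDataTime` filter. A toy batch version of the streaming join (`entry_durations` is a hypothetical helper):

```python
# records carry userId, classroomId, subEvent, extraDataTime (ms).
def entry_durations(records):
    by_key = {}
    for r in records:
        key = (r["userId"], r["classroomId"])
        by_key.setdefault(key, []).append(r)
    out = []
    for key, rs in by_key.items():
        # success timestamps play the role of the filtered view (side b)
        succ = [r["extraDataTime"] for r in rs if r["subEvent"] == "PPT_SUCCESS"]
        for s in succ:
            for r in rs:  # side a: every earlier record for the same key
                if r["extraDataTime"] < s:
                    out.append((key, s - r["extraDataTime"]))
    return out
```

Note that, like the SQL, this emits one row per earlier event, not just the gap from the first event; downstream consumers take the maximum gap per user/classroom if a single duration is needed.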
Scenario 3: O&M / network monitoring
Video/audio O&M tracking events from live courses are grouped by userId, agoraChannelId, classroomId, userType, event, and agoraAudioStateUid/agoraVideoStateUid, with a 30-second tumbling window, to compute the video/audio quality of the last 30 seconds of the live class (average and total packet loss / faults). Downstream O&M staff monitor these numbers and tune audio/video quality in real time to give users the best learning experience.
{
"classroom_id":"",
"user_type":"",
"user_id":"",
"agora_channel_id":"",
"event":"",
"agora_videoState":{
"fr":"",
"uid":""
},
"agora_audioState":{
"lost":"",
"uid":""
},
"messageCreateTime":""
}
create table qos_agora_record_kafka (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- live classroom ID
`classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id')as VARCHAR),
-- user type
`userType` as cast(JSON_VALUE(`message`, '$.user_type')as VARCHAR),
-- user ID
`userId` as cast(JSON_VALUE(`message`, '$.user_id')as BIGINT),
-- channel ID
`agoraChannelId` as cast(JSON_VALUE(`message`, '$.agora_channel_id')as BIGINT),
-- event
`event` as cast(JSON_VALUE(`message`, '$.event')as VARCHAR),
-- video fault record
`agoraVideoStateFr` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState')as VARCHAR), '$.fr')as BIGINT),
-- video fault unique identifier
`agoraVideoStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState')as VARCHAR), '$.uid')as BIGINT),
-- audio packet-loss record
`agoraAudioStateLost` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState')as VARCHAR), '$.lost')as BIGINT),
-- audio packet-loss unique identifier
`agoraAudioStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState')as VARCHAR), '$.uid')as BIGINT),
-- event time
`messageCreateTime` as cast(JSON_VALUE(`message`, '$.messageCreateTime')as BIGINT),
WATERMARK wk FOR messageCreateTime AS WITHOFFSET (messageCreateTime, 60000)-- define the watermark on the rowtime
) with (
type = 'kafka011',
topic = 'agora_record',
...
);
create table user_av_mysql (
-- window start time
start_time TIMESTAMP,
-- window end time
end_time TIMESTAMP,
-- user ID
user_id BIGINT,
web_server_id BIGINT,
-- live classroom ID
classroom_id VARCHAR,
-- user type
user_type VARCHAR,
extra_uid BIGINT,
event VARCHAR,
-- anomaly total
event_sum BIGINT,
-- anomaly average
event_avg DOUBLE,
-- anomaly count
event_count BIGINT,
create_time TIMESTAMP
) with (
type='rds',
tableName='user_av_record',
...
);
insert into user_av_mysql
select
TUMBLE_START(messageCreateTime, INTERVAL '30' SECOND) as start_time,
TUMBLE_END(messageCreateTime, INTERVAL '30' SECOND) as end_time,
CASE WHEN `userId` is NULL THEN -1 else userId END as user_id,
CASE WHEN `agoraChannelId` is NULL THEN -1 else agoraChannelId END as web_server_id,
CASE WHEN `classroomId` is NULL THEN -1 else classroomId END as classroom_id,
userType as user_type,
agoraAudioStateUid as extra_uid,
CONCAT(event,'_AUDIO_STATE') as event,
SUM(agoraAudioStateLost) as event_sum,
AVG(agoraAudioStateLost) as event_avg,
COUNT(event) as event_count,
CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraAudioStateLost >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
userId,
agoraChannelId,
classroomId,
userType,
event,
agoraAudioStateUid;
insert into user_av_mysql
select
TUMBLE_START(messageCreateTime,INTERVAL '30' SECOND) as start_time,
TUMBLE_END(messageCreateTime,INTERVAL '30' SECOND) as end_time,
CASE WHEN `userId` is NULL THEN -1 else userId END as user_id,
CASE WHEN `agoraChannelId` is NULL THEN -1 else agoraChannelId END as web_server_id,
CASE WHEN `classroomId` is NULL THEN -1 else classroomId END as classroom_id,
userType as user_type,
agoraVideoStateUid as extra_uid,
CONCAT(event,'_VIDEO_STATE') as event,
SUM(agoraVideoStateFr) as event_sum,
AVG(agoraVideoStateFr) as event_avg,
COUNT(event) as event_count,
CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraVideoStateFr >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
userId,
agoraChannelId,
classroomId,
userType,
event,
agoraVideoStateUid;
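Within one group and one 30-second window, the three aggregates reduce the loss/fault samples as follows. A minimal Python sketch of the SUM/AVG/COUNT reduction (`qos_aggregate` is a hypothetical helper):

```python
# Reduce one window's packet-loss (or frame-fault) samples for one group
# to the event_sum / event_avg / event_count columns written to MySQL.
def qos_aggregate(lost_samples):
    n = len(lost_samples)
    total = sum(lost_samples)
    return {
        "event_sum": total,
        "event_avg": total / n if n else 0.0,
        "event_count": n,
    }
```

O&M dashboards then alert on the average (sustained degradation) and the count (burstiness) separately.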
Scenario 4: operations analysis
Students register through different channels (web ads, in-app ads, and so on). This scenario reads the registration logs and joins them with the placement-test level table (levels A/B/C/D) recorded at registration, showing operations staff the hourly registration counts per level & channel so they can adjust promotion strategy in real time.
-- student table
{
"id":"",
"channel_id":"",
"update_time":""
}
-- user registration data
{
"id":"",
"name":"",
"register_date_time":"",
"status":""
}
-- student test-level table: the output of the Scenario 1 "student registration test-level log ETL"
create table student_da_src (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
`id` as JSON_VALUE (`message`, '$.id'),-- user ID
`channel_id` as JSON_VALUE (`message`, '$.channel_id'),-- channel ID
`update_time` as JSON_VALUE (`message`, '$.update_time')-- update time
) with (
type = 'kafka010',
topic = 'uc_account-student',
...
);
create table user_da_in (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
`id` as JSON_VALUE (`message`, '$.id'),-- user ID
`name` as JSON_VALUE (`message`, '$.name'),-- user name
`register_date_time` as JSON_VALUE (`message`, '$.register_date_time'),-- registration time
`status` as JSON_VALUE (`message`, '$.status')-- status
) with (
type = 'kafka010',
topic = 'uc_account-user',
`group.id` = 'uc_account-user',
...
);
create table channel_da (
rowkey varchar,
id VARCHAR,
`level` VARCHAR,
primary key (rowkey),
PERIOD FOR SYSTEM_TIME
) with (
type = 'cloudhbase',
tableName = 'databus:activity.channel',
...
);
create table sink_table (
uk varchar,
reg_date bigint,
level varchar,
leads bigint,
primary key (uk)
) with (
type = 'elasticsearch',
index = 'vk_app_es_sign_csh',
typeName = 'vk_app_es_sign_csh',
...
);
create view student_da_src_view as
SELECT
last_value(id) as id,
last_value(update_time) as update_time,
last_value(channel_id) as channel_id
from student_da_src
group by id;
create view user_da_in_view as
SELECT
last_value(id) as id,
last_value(name) as name,
last_value(register_date_time) as register_date_time,
last_value(status) as status
from user_da_in
group by id;
insert into
sink_table
SELECT
concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end) as uk -- hour + level, matching the third GROUP BY key; level alone would collide across hours
,cast(date_format(register_date_time,'yyyyMMddHH') as bigint) as reg_date
,case when level in ('A','B','C','D') then level else 'other' end as levels
,COUNT(distinct t.id) AS leads
FROM
student_da_src_view t
LEFT JOIN user_da_in_view u ON u.id = t.id
LEFT JOIN channel_da FOR SYSTEM_TIME AS OF PROCTIME() ch ON ch.rowkey = MD5(t.channel_id)
where u.name not LIKE '%測試%'
and u.name not LIKE 'DM\\_%'
and u.name not LIKE '%test%'
and u.status='NORMAL'
group by date_format(register_date_time,'yyyyMMddHH')
,case when level in ('A','B','C','D') then level else 'other' end
,concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end)
;
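The level bucketing and hourly keying above can be sketched in Python. A toy version that counts rows rather than distinct ids, and assumes `register_date_time` is formatted 'yyyy-MM-dd HH:mm:ss' (`hourly_level_counts` is a hypothetical helper):

```python
from collections import Counter
from datetime import datetime

# Collapse levels outside A-D into 'other' (the CASE WHEN above) and
# count registrations per (hour, level) bucket.
def hourly_level_counts(rows):
    counts = Counter()
    for r in rows:
        level = r["level"] if r["level"] in ("A", "B", "C", "D") else "other"
        hour = datetime.strptime(
            r["register_date_time"], "%Y-%m-%d %H:%M:%S"
        ).strftime("%Y%m%d%H")  # same key as date_format(..., 'yyyyMMddHH')
        counts[(hour, level)] += 1
    return counts
```

The 'other' bucket keeps the sink schema fixed even when the dimension table returns an unexpected or missing level.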
First, group by ID to keep only the latest message per ID (deduplication); then, on top of the deduplicated records, run a global group aggregation by event date (day) and course-consultant ID to count, per day, how many times each course consultant confirmed "learning progress / lesson booking" with students.
{
"id":"",
"leads_flow_event_id":"",
"group_id":"",
"cc_id":"",
"student_id":"",
"order_id":"",
"leads_id":"",
"confirm_date_time":"",
"create_time":"",
"update_time":"",
"order_create_time":"",
"canceled_date_time":"",
"apply_refund_date":"",
"status":""
}
create table cc_data_pack_order_info_src (
`messageKey` VARBINARY,
`message` VARBINARY,
`topic` VARCHAR,
`partition` INT,
`offset` BIGINT,
-- ID
`id` as JSON_VALUE (`message`, '$.id'),
-- course consultant (CC) ID
`cc_id` as JSON_VALUE (`message`, '$.cc_id'),
-- student ID
`student_id` as JSON_VALUE (`message`, '$.student_id'),
-- confirmation time
`confirm_date_time` as JSON_VALUE (`message`, '$.confirm_date_time'),
-- creation time
`create_time` as JSON_VALUE (`message`, '$.create_time'),
-- update time
`update_time` as JSON_VALUE (`message`, '$.update_time'),
-- order creation time
`order_create_time` as JSON_VALUE (`message`, '$.order_create_time'),
-- order cancellation time
`canceled_date_time` as JSON_VALUE (`message`, '$.canceled_date_time'),
-- refund application date
`apply_refund_date` as JSON_VALUE (`message`, '$.apply_refund_date'),
-- status
`status` as JSON_VALUE (`message`, '$.status')
) with (
type = 'kafka010',
topic = 'data_pack_order_info',
`group.id` = 'data_pack_order_info',
...
);
CREATE TABLE index_sink (
`cc_id` BIGINT NOT NULL,
`cc_index` BIGINT NOT NULL,
`type` INT NOT NULL,
`attribution_time` VARCHAR NOT NULL,
`update_time` TIMESTAMP NOT NULL,
PRIMARY KEY (`cc_id`, `type`, `attribution_time`)
) WITH (
type='rds',
tableName='staff_index',
...
);
CREATE VIEW cc_data_pack_order_info_view as
select
last_value (cc_id) as cc_id,
last_value (confirm_date_time) as confirm_date_time,
last_value (`status`) as `status`
from
cc_data_pack_order_info_src
group by
id;
insert into index_sink
select
cast(cc_id as bigint) as cc_id,
count(*) as cc_index,
cast(1 as int) as type,
date_format(confirm_date_time,'yyyy-MM-dd') as attribution_time,
current_timestamp as update_time
from
cc_data_pack_order_info_view
where
confirm_date_time is not null
and `status` is not null
and `status` = 3
group by
date_format(confirm_date_time,'yyyy-MM-dd'), cc_id;
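The dedupe-then-count pattern (last_value per id, then a daily count per consultant) can be sketched in Python. A toy in-memory version (status 3 is assumed to mean "confirmed", per the SQL filter; `cc_daily_confirms` is a hypothetical helper):

```python
from collections import Counter

# Keep only the latest record per order id (last_value semantics: records
# arrive in event order, so later ones overwrite), then count confirmed
# records per (day, cc_id).
def cc_daily_confirms(records):
    latest = {}
    for r in records:
        latest[r["id"]] = r
    counts = Counter()
    for r in latest.values():
        if r.get("status") == 3 and r.get("confirm_date_time"):
            day = r["confirm_date_time"][:10]  # 'yyyy-MM-dd' prefix
            counts[(day, r["cc_id"])] += 1
    return counts
```

Deduplicating before aggregating is what prevents an order whose status changes several times from being counted more than once per consultant.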
Realtime Compute for Apache Flink product discussion group
Alibaba Cloud Realtime Compute for Apache Flink - solutions: https://developer.aliyun.com/article/765097
Alibaba Cloud Realtime Compute for Apache Flink - case studies: https://ververica.cn/corporate-practice
Alibaba Cloud Realtime Compute for Apache Flink - product page: https://www.aliyun.com/product/bigdata/product/sc