埋點日志采集
埋點在本項目中有三大類
App端行為日志
PC web端行為日志
微信小程式端行為日志
需求
日志生成在N台伺服器中,現在需要使用flume采集到HDFS
- 3類日志采集後要分别存儲到不同的hdfs路徑
- 日志中的手機号,賬号需要脫敏處理(加密)
- 不同日期的資料要寫到不同的檔案夾,且配置設定應以事件時間為依據
- 因為日志伺服器所在子網跟HDFS叢集不在同一個網段,需要中轉傳輸
大資料項目1
上遊
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1 g2
a1.sources.r1.filegroups.g1 = /opt/data/logdata/app/event.*
a1.sources.r1.filegroups.g2 = /opt/data/logdata/wx/event.*
a1.sources.r1.headers.g1.datatype = app
a1.sources.r1.headers.g2.datatype = wx
a1.sources.r1.batchSize = 100
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn._51doit.flnm.demo02.FieldEncryptInterceptor$FieldEncryptInterceptorBuilder
a1.sources.r1.interceptors.i1.headerName = timestamp
a1.sources.r1.interceptors.i1.timestamp_field = timeStamp
a1.sources.r1.interceptors.i1.to_encrypt_field = account
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100
# 定義sink組及其配套的sink處理器
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
啟動指令
bin/flume-ng agent -c conf/ -f agentsconf/shangyou -n a1 -Dflume.root.logger=INFO,console
下遊
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.batchSize = 100
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/logdata/%{datatype}/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = DoitEduData
a1.sinks.k1.hdfs.fileSuffix = .log.gz
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.codeC = gzip
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false
攔截器jia包代碼
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author 濤哥
* @nick_name "deep as the sea"
* @contact qq:657270652 wx:doit_edu
* @site www.doitedu.cn
* @date 2021-01-11
* @desc 項目字段加密及時間戳提取攔截器
*/
public class FieldEncryptInterceptor implements Interceptor {
String timestamp_field;
String to_encrypt_field;
String headerName;
public FieldEncryptInterceptor(String timestamp_field, String to_encrypt_field, String headerName) {
this.timestamp_field = timestamp_field;
this.to_encrypt_field = to_encrypt_field;
this.headerName = headerName;
}
public void initialize() {
}
public Event intercept(Event event) {
// 根據要加密的字段,從event中提取原值(用json解析)
try {
String line = new String(event.getBody());
JSONObject jsonObject = JSON.parseObject(line);
String toEncryptField = jsonObject.getString(to_encrypt_field);
String timeStampField = jsonObject.getString(timestamp_field);
// 加密
if (StringUtils.isNotBlank(toEncryptField)) {
String encrypted = DigestUtils.md5Hex(toEncryptField);
// 将加密後的值替換掉原值
jsonObject.put(to_encrypt_field, encrypted);
// 轉回json,并放回event
String res = jsonObject.toJSONString();
event.setBody(res.getBytes("UTF-8"));
}
// 放入時間戳到header中
event.getHeaders().put(headerName, timeStampField);
} catch (Exception e) {
event.getHeaders().put("datatype", "malformed");
e.printStackTrace();
}
return event;
}
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
public void close() {
}
public static class FieldEncryptInterceptorBuilder implements Interceptor.Builder {
String timestamp_field;
String to_encrypt_field;
String headerName;
public Interceptor build() {
return new FieldEncryptInterceptor(timestamp_field, to_encrypt_field, headerName);
}
public void configure(Context context) {
timestamp_field = context.getString("timestamp_field");
headerName = context.getString("headerName");
to_encrypt_field = context.getString("to_encrypt_field");
}
}
}
項目背景(為什麼要有大資料)
某些APP上線後,由于業務模式新尹,市場需求大,經過一段時間的精心營運後,逐漸積累起了上千萬,使用者,以及三四百萬的日活量,app的業務能和産品.數量也急速膨脹,随着規模的增長,逐漸凸顯出大量問題.
- 營銷分析斷層:市場銷售成本居高不下,投放拉新的效果追蹤出現斷層,無法追各管道實際化率,難以準确分析ROI
- 使用者營運不精準:"千人一面"的全量使用者營銷,投入産量難以把控,不精準的粗犷方式難以真正提升存量使用者的長期活躍度
-
全局營運名額監控不實時:有營運的BI系統,但營運名額監控不及時,未形成核心的名額預警機制,決策滞後
公司急需告别這種粗放的,嚴重依賴人力的營運情況,繼續建設一套強大的資料營運平台,用于驅動營銷管道效果評估,用于精細化營運改進,産品功能及使用者體驗優化,老闆看闆輔助管理決策,産品個性化推薦改造,使用者标簽體系建構等應用市場
從各方面為公司的進一步發展提供有力的資料支撐
需求總覽:行為事件域分析
基礎統計分析
- 整體概況:從産品的整體的使用情況出發,對産品的整體使用情況有基礎了解
- 使用者擷取:從獲客管道和版本的方向出發,根據不同管道,不同的版本生成一些可以了解管道優略的名額,可以清晰的觀察每個管道的流量,轉化等情況
- 活躍與留存:從使用者的通路和粘性出發,可以觀察出産品在使用者通路,回訪等方面的趨勢變化,清楚的了解使用者對産品的粘性和沉浸程度
- 事件轉化;根據選擇的事件和屬性,生成該事件的發生次數,人數,分布等資料名額,可以了解整體的使用者轉化以及收益相關的資料情況
- 使用者特征:根據地理位置.性别作業系統當一些基礎屬性,将進行分組,友善了解使用者的分布占比情況
基礎統計分析名額概覽
整體概況
産品整體的使用情況,包括使用者量、通路情況、留存等幫助對産品整體名額有一個大緻的了解
累計使用者量:産品上線至今的累計使用者量
每日新增使用者量
每日的全部通路人數、次數
每日的全部通路的人均次數/時長/深度
新老使用者通路占比
每日新老使用者的分布情況
新使用者/全部使用者的7日留存:起始和後續事件都為使用者進行頁面通路
各頁面的通路次數分布:基于pageview事件中的頁面标題屬性進行分組
通路終端(app /pc web /微信小程式 /H5)分布:按照通路的作業系統分組
使用者擷取
管道通路
每個管道的使用者的使用情況,包括管道中新使用者的占比、留存等,了解産品在獲客層面上的優勢與不足;其中App的管道資料,會根據iOS,Android進行細分
新增使用者量:全部新使用者數量,包括自然流量和管道流量
管道新增使用者量:僅計算管道流量新增使用者數
各管道新使用者人均通路時長
異常流量:App 異常流量,定義為打開5秒内即進行關閉操作的通路行為
版本資料
App 每個版本的使用情況,幫助了解在産品更新的過程中,是否在活躍和留存方面有所改善
版本通路流量
人均通路時長
各版本留存:各版本的使用者7日留存
活躍與留存
通路流量
産品的每日通路資料,名額集中在新老使用者的通路行為上,提供通路次數、時長、次數分布、通路時段高峰等名額,幫助了解新老使用者在使用産品時的一些行為特征
通路使用者數
新老使用者通路占比
新老使用者人均使用時長
新老使用者啟動/通路次數
每日/每周啟動時段
使用者每日通路産品的時段分布
使用者每周通路産品的星期分布
使用者留存
提供使用者7日,次日,次周,次月留存的資料,幫助了解新老使用者的使用粘性
7日留存/流失
次日留存/流失
次周留存/流失
次月留存/流失
事件轉化
事件轉化
各類關鍵事件(如收藏,分享,轉發,加購等),發生次數、人數以及分布情況
新老使用者事件發生次數/人數/人均次數
事件次數的分布
收益類事件轉化
使用者自定義收益類事件,神策會自動生成該事件的發生次數、人數以及分布情況,會根據您選擇的數值類型屬性,計算該數值的總值、人均值以及次均值
新老使用者收益事件發生次數/人數/人均次數
新老使用者收益事件
使用者特征
通路省份分布
通路城市分布
通路性别分布
通路作業系統分布
新老使用者占比
技術選型
資料采集:flume
存儲平台:hdfs
基礎設施:hive
運算引擎:mapreduce/spark
資源排程:yarn
任務排程:azkaban/oozie
中繼資料管理:atlas(或自研系統)
OLAP引擎:kylin/presto (或clickhouse)
前端界面:superset(或自研javaweb系統)
分層設計
分層原因
資料倉庫中的資料表,往往是分層管理、分層計算的;
所謂分層,具體來說,就是将大量的資料表按照一定規則和定義來進行邏輯劃分;
ADS層: 應用服務層
DWS層:數倉服務(service/summary)層(輕度聚合)
DWD層:數倉明細層
ODS層:操作資料(最原始的資料)層 – 貼源層
DIM層:存儲維表
ODS層:對應着外部資料源ETL到數倉體系之後的表!
DWD層:數倉明細層;一般是對ODS層的表按主題進行加工和劃分;本層中表記錄的還是明細資料;
DWS層:數倉服務層;
ADS層: 應用層,主要是一些結果報表!
分層的意義:資料管理更明晰!運算複用度更高!需求開發更快捷!便于解耦底層業務(資料)變化!
分層詳解
ODS層
資料内容:存放flume采集過來的原始日志
存儲格式:以json格式文本檔案存儲
存儲周期:3個月
DWD層
資料内容:對ODS層資料做ETL處理後的扁平化明細資料
存儲格式:以orc / parquet檔案格式存儲
存儲周期:6個月
DWS層
資料内容:根據主題分析需求,從DWD中輕度聚合後的資料
存儲格式:以ORC/PARQUET檔案格式存儲
存儲周期:1年
ADS層
資料内容:根據業務人員需求,從DWS計算出來的報表
存儲格式:以ORC/PARQUET檔案格式存儲
存儲周期:3年
DIM層
存儲各種維表
模型設計
ODS層
與原始日志資料保持完全一緻
我們有APP端日志,PC端日志,微信小程式端日志,分别對應ODS的三個表
ODS.ACTION_APP_LOG
ODS.ACTION_WEB_LOG
ODS.ACTION_WXAPP_LOG
建表時,一般采用外部表;
表的資料檔案格式:跟原始日志檔案一緻
分區:按天分區(視資料量和計算頻度而定,如資料量大且需每小時計算一次,則可按小時粒度分區)
DWD層
模組化思想
通常是對ODS層資料進行精細化加工處理
不完全星型模型
事實表中,不是所有次元都按次元主鍵資訊存儲(次元退化)
地域次元資訊:年月日周等時間次元資訊,這些次元資訊,基本不會發生任何改變,并且在大部分主題分析場景中,都需要使用,直接在事實表中存儲次元值
頁面資訊:頁面類别資訊,頻道資訊,業務活動資訊,會員等級資訊等,可能發生緩慢變化的次元資訊,事實表中遵循經典理論存儲次元主鍵,具體次元值則在主題分析計算時臨時關聯
事實表
app_event_detail: APP-Event事件明細表
web_event_detail: WEB-Event事件明細表
wxapp_event_detail: 小程式-Event事件明細表
次元表
coupon_info
ad_info
campain_info
lanmu_info
page_info
page_type
pindao_info
promotion_location
huodong_info
miaosha_info
product
product_detail
product_type
shop_info
tuangou_info
user_info
DWS層
模組化思想
主題模組化
次元模組化
最主要思路:按照分析主題,"彙總"各類資料成大寬表
也有一些做法是,将DWS層的表設計成“輕度聚合表”
主要表模型
流量會話聚合天/月表
日新日活次元聚合表
事件會話聚合天/月表
訪客連續活躍區間表
新使用者留存次元聚合表
營運位次元聚合表
管道拉新次元聚合表
訪客分布次元聚合表
使用者事件鍊聚合表(支撐轉化分析,進階留存分析等)
……更多
ODS層詳細設計ODS層功能
ODS:操作資料層
主要作用:直接映射操作資料(原始資料),資料備份;
模組化方法:與原始資料結構保持完全一緻
存儲周期:相對來說,存儲周期較短;視資料規模,增長速度,以及業務的需求而定;對于埋點日志資料ODS層存儲,通常可以選擇3個月或者半年;存1年的是土豪公司(或者确有需要,當然,也有可能是資料量很小)
ODS層開發手冊
3類日志資料
資料類型 輸入路徑(HDFS目錄) 目标位置(HIVE表)
app端埋點日志 /logdata/app/2020-07-26/ ods.event_app_log 分區:2020/07/26
web端埋點日志 /logdata/web/2020-07-26/ ods.event_web_log 分區:2020/07/26
wx小程式埋點日志 /logdata/wxapp/2020-07-26/ ods.event_wxapp_log 分區:2020/07/26
H5頁面埋點日志 /
表明命名規範:層級.主題_資料域_功能_粒度
比如:app事件明細表: dwd.event_app_detail_d
要求:
原始日志格式
普通文本檔案,JSON資料格式,導入hive表後,要求可以很友善地select各個字段
分區表
外部表
建立外部表
由于原始資料是普通文本檔案,而檔案内容是json格式的一條一條記錄
在建立hive表結構進行映射時,有兩種選擇:
1.将資料視為無結構的string
2.将資料按json格式進行映射(這需要外部工具包JsonSerde 的支援)
本項目采用方案2來進行建表映射
hive完整建表語句和解析
--指定表名
create table t3(
id int,
name string
)
--指定分區
PARTITIONED BY (dt string)
--指定分隔符
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'=',',
'line.delim'='\n'
)
--指定輸入輸出格式
STORED AS
INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
--表屬性
TBLPROPERTIES(
'EXTERNAL'='TRUE',
'comment'='this is a ods table',
'orc.compress'='snappy'
)
;
-- ODS層,app埋點日志對應表模型建立
--先删除表防止有這個表明
DROP TABLE IF EXISTS `ods.event_app_log`;
--指定表明和外部表的屬性
CREATE EXTERNAL TABLE `ods.event_app_log`(
`account` string ,
`appid` string ,
`appversion` string ,
`carrier` string ,
`deviceid` string ,
`devicetype` string ,
`eventid` string ,
`ip` string ,
`latitude` double ,
`longitude` double ,
`nettype` string ,
`osname` string ,
`osversion` string ,
`properties` map<string,string> ,
`releasechannel` string ,
`resolution` string ,
`sessionid` string ,
`timestamp` bigint
)
--指定分區
PARTITIONED BY (`dt` string)
--指定分隔符這邊是以json格式切割
ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe'
--指定輸入輸出格式
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
--指定輸出路徑
LOCATION
'hdfs://linux01:8020/user/hive/warehouse/ods.db/event_app_log'
--表屬性版本号之類的沒啥用
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1610337798'
);
接下來就是具體操作
流程是我們會從别的資料庫通過flime導到自己的hdfs中,然後經過自己的hive處理成結構化資料再放回到自己的hdfs上,原來的資料就沒了
flime就是上面的上遊下遊,hive建表就是上面的埋點資料模組化然後load一下
-- 資料入庫
load data inpath '/logdata/app/2021-01-10' into table ods.event_app_log partition(dt='2021-01-10');
-- 如何删除一個表中已存在分區
alter table ods.event_app_log drop partition(dt='2020-01-10');
-- 不适用load,如何添加一個分區到已存在的表中
alter table ods.event_app_log add partition(dt='2020-01-11') location '/abc/ddd/'
-- 入庫腳本開發
#!/bin/bash
######################################
#
# @author :
# @date : 2021-01-11
# @desc : app端埋點日志入庫
# @other
######################################
export JAVA_HOME=/opt/apps/jdk1.8.0_141/
export HIVE_HOME=/opt/apps/hive-3.1.2/
DT=$(date -d'-1 day' +%Y-%m-%d)
${HIVE_HOME}/bin/hive -e "
load data inpath '/logdata/app/${DT}' into table ods.event_app_log partition(dt='${DT}')
"
if [ $? -eq 0 ]
then
echo "${DT}app success"
else
echo "wx failed"
fi