天天看點

大資料項目1

埋點日志采集

埋點在本項目中有三大類

App端行為日志

PC web端行為日志

微信小程式端行為日志

需求

日志生成在N台伺服器中,現在需要使用flume采集到HDFS

  • 3類日志采集後要分别存儲到不同的hdfs路徑
  • 日志中的手機号,賬号需要脫敏處理(加密)
  • 不同日期的資料要寫到不同的檔案夾,且配置設定應以事件時間為依據
  • 因為日志伺服器所在子網跟HDFS叢集不在同一個網段,需要中轉傳輸
    大資料項目1

上遊

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2


a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1 g2
a1.sources.r1.filegroups.g1 = /opt/data/logdata/app/event.*
a1.sources.r1.filegroups.g2 = /opt/data/logdata/wx/event.*
a1.sources.r1.headers.g1.datatype = app
a1.sources.r1.headers.g2.datatype = wx
a1.sources.r1.batchSize = 100

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn._51doit.flnm.demo02.FieldEncryptInterceptor$FieldEncryptInterceptorBuilder
a1.sources.r1.interceptors.i1.headerName = timestamp
a1.sources.r1.interceptors.i1.timestamp_field = timeStamp
a1.sources.r1.interceptors.i1.to_encrypt_field = account


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 100


a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux03
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 100

# 定義sink組及其配套的sink處理器
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
啟動指令
bin/flume-ng agent -c conf/ -f agentsconf/shangyou -n a1 -Dflume.root.logger=INFO,console 
           

下遊

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.batchSize = 100

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flumedata/file-channel/checkpoint
a1.channels.c1.dataDirs = /opt/data/flumedata/file-channel/data

a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://linux01:8020/logdata/%{datatype}/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = DoitEduData
a1.sinks.k1.hdfs.fileSuffix = .log.gz
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 268435456
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.codeC = gzip
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.useLocalTimeStamp = false
           

攔截器jia包代碼

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author 濤哥
 * @nick_name "deep as the sea"
 * @contact qq:657270652 wx:doit_edu
 * @site www.doitedu.cn
 * @date 2021-01-11
 * @desc 項目字段加密及時間戳提取攔截器
 */
public class FieldEncryptInterceptor implements Interceptor {

    String timestamp_field;
    String to_encrypt_field;
    String headerName;

    public FieldEncryptInterceptor(String timestamp_field, String to_encrypt_field, String headerName) {

        this.timestamp_field = timestamp_field;
        this.to_encrypt_field = to_encrypt_field;
        this.headerName = headerName;

    }

    public void initialize() {

    }

    public Event intercept(Event event) {

        // 根據要加密的字段,從event中提取原值(用json解析)
        try {
            String line = new String(event.getBody());
            JSONObject jsonObject = JSON.parseObject(line);

            String toEncryptField = jsonObject.getString(to_encrypt_field);
            String timeStampField = jsonObject.getString(timestamp_field);

            // 加密
            if (StringUtils.isNotBlank(toEncryptField)) {
                String encrypted = DigestUtils.md5Hex(toEncryptField);

                // 将加密後的值替換掉原值
                jsonObject.put(to_encrypt_field, encrypted);

                // 轉回json,并放回event
                String res = jsonObject.toJSONString();
                event.setBody(res.getBytes("UTF-8"));
            }

            // 放入時間戳到header中
            event.getHeaders().put(headerName, timeStampField);

        } catch (Exception e) {

            event.getHeaders().put("datatype", "malformed");

            e.printStackTrace();
        }


        return event;
    }

    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }

        return list;
    }

    public void close() {

    }


    public static class FieldEncryptInterceptorBuilder implements Interceptor.Builder {
        String timestamp_field;
        String to_encrypt_field;
        String headerName;

        public Interceptor build() {

            return new FieldEncryptInterceptor(timestamp_field, to_encrypt_field, headerName);
        }

        public void configure(Context context) {
            timestamp_field = context.getString("timestamp_field");
            headerName = context.getString("headerName");
            to_encrypt_field = context.getString("to_encrypt_field");


        }
    }
}

           

項目背景(為什麼要有大資料)

某些APP上線後,由于業務模式新尹,市場需求大,經過一段時間的精心營運後,逐漸積累起了上千萬,使用者,以及三四百萬的日活量,app的業務能和産品.數量也急速膨脹,随着規模的增長,逐漸凸顯出大量問題.

  • 營銷分析斷層:市場銷售成本居高不下,投放拉新的效果追蹤出現斷層,無法追各管道實際化率,難以準确分析ROI
  • 使用者營運不精準:"千人一面"的全量使用者營銷,投入産量難以把控,不精準的粗犷方式難以真正提升存量使用者的長期活躍度
  • 全局營運名額監控不實時:有營運的BI系統,但營運名額監控不及時,未形成核心的名額預警機制,決策滞後

    公司急需告别這種粗放的,嚴重依賴人力的營運情況,繼續建設一套強大的資料營運平台,用于驅動營銷管道效果評估,用于精細化營運改進,産品功能及使用者體驗優化,老闆看闆輔助管理決策,産品個性化推薦改造,使用者标簽體系建構等應用市場

    從各方面為公司的進一步發展提供有力的資料支撐

需求總覽:行為事件域分析

基礎統計分析

  • 整體概況:從産品的整體的使用情況出發,對産品的整體使用情況有基礎了解
  • 使用者擷取:從獲客管道和版本的方向出發,根據不同管道,不同的版本生成一些可以了解管道優略的名額,可以清晰的觀察每個管道的流量,轉化等情況
  • 活躍與留存:從使用者的通路和粘性出發,可以觀察出産品在使用者通路,回訪等方面的趨勢變化,清楚的了解使用者對産品的粘性和沉浸程度
  • 事件轉化;根據選擇的事件和屬性,生成該事件的發生次數,人數,分布等資料名額,可以了解整體的使用者轉化以及收益相關的資料情況
  • 使用者特征:根據地理位置.性别作業系統當一些基礎屬性,将進行分組,友善了解使用者的分布占比情況

基礎統計分析名額概覽

整體概況

産品整體的使用情況,包括使用者量、通路情況、留存等幫助對産品整體名額有一個大緻的了解

累計使用者量:産品上線至今的累計使用者量

每日新增使用者量

每日的全部通路人數、次數

每日的全部通路的人均次數/時長/深度

新老使用者通路占比

每日新老使用者的分布情況

新使用者/全部使用者的7日留存:起始和後續事件都為使用者進行頁面通路

各頁面的通路次數分布:基于pageview事件中的頁面标題屬性進行分組

通路終端(app /pc web /微信小程式 /H5)分布:按照通路的作業系統分組

使用者擷取

管道通路

每個管道的使用者的使用情況,包括管道中新使用者的占比、留存等,了解産品在獲客層面上的優勢與不足;其中App的管道資料,會根據iOS,Android進行細分

新增使用者量:全部新使用者數量,包括自然流量和管道流量

管道新增使用者量:僅計算管道流量新增使用者數

各管道新使用者人均通路時長

異常流量:App 異常流量,定義為打開5秒内即進行關閉操作的通路行為

版本資料

App 每個版本的使用情況,幫助了解在産品更新的過程中,是否在活躍和留存方面有所改善

版本通路流量

人均通路時長

各版本留存:各版本的使用者7日留存

活躍與留存

通路流量

産品的每日通路資料,名額集中在新老使用者的通路行為上,提供通路次數、時長、次數分布、通路時段高峰等名額,幫助了解新老使用者在使用産品時的一些行為特征

通路使用者數

新老使用者通路占比

新老使用者人均使用時長

新老使用者啟動/通路次數

每日/每周啟動時段

使用者每日通路産品的時段分布

使用者每周通路産品的星期分布

使用者留存

提供使用者7日,次日,次周,次月留存的資料,幫助了解新老使用者的使用粘性

7日留存/流失

次日留存/流失

次周留存/流失

次月留存/流失

事件轉化

事件轉化

各類關鍵事件(如收藏,分享,轉發,加購等),發生次數、人數以及分布情況

新老使用者事件發生次數/人數/人均次數

事件次數的分布

收益類事件轉化

使用者自定義收益類事件,神策會自動生成該事件的發生次數、人數以及分布情況,會根據您選擇的數值類型屬性,計算該數值的總值、人均值以及次均值

新老使用者收益事件發生次數/人數/人均次數

新老使用者收益事件

使用者特征

通路省份分布

通路城市分布

通路性别分布

通路作業系統分布

新老使用者占比

技術選型

資料采集:flume

存儲平台:hdfs

基礎設施:hive

運算引擎:mapreduce/spark

資源排程:yarn

任務排程:azkaban/oozie

中繼資料管理:atlas(或自研系統)

OLAP引擎:kylin/presto (或clickhouse)

前端界面:superset(或自研javaweb系統)

分層設計

分層原因

資料倉庫中的資料表,往往是分層管理、分層計算的;

所謂分層,具體來說,就是将大量的資料表按照一定規則和定義來進行邏輯劃分;

ADS層: 應用服務層

DWS層:數倉服務(service/summary)層(輕度聚合)

DWD層:數倉明細層

ODS層:操作資料(最原始的資料)層 – 貼源層

DIM層:存儲維表

ODS層:對應着外部資料源ETL到數倉體系之後的表!

DWD層:數倉明細層;一般是對ODS層的表按主題進行加工和劃分;本層中表記錄的還是明細資料;

DWS層:數倉服務層;

ADS層: 應用層,主要是一些結果報表!

分層的意義:資料管理更明晰!運算複用度更高!需求開發更快捷!便于解耦底層業務(資料)變化!

分層詳解

ODS層

資料内容:存放flume采集過來的原始日志

存儲格式:以json格式文本檔案存儲

存儲周期:3個月

DWD層

資料内容:對ODS層資料做ETL處理後的扁平化明細資料

存儲格式:以orc / parquet檔案格式存儲

存儲周期:6個月

DWS層

資料内容:根據主題分析需求,從DWD中輕度聚合後的資料

存儲格式:以ORC/PARQUET檔案格式存儲

存儲周期:1年

ADS層

資料内容:根據業務人員需求,從DWS計算出來的報表

存儲格式:以ORC/PARQUET檔案格式存儲

存儲周期:3年

DIM層

存儲各種維表

模型設計

ODS層

與原始日志資料保持完全一緻

我們有APP端日志,PC端日志,微信小程式端日志,分别對應ODS的三個表

ODS.ACTION_APP_LOG

ODS.ACTION_WEB_LOG

ODS.ACTION_WXAPP_LOG

建表時,一般采用外部表;

表的資料檔案格式:跟原始日志檔案一緻

分區:按天分區(視資料量和計算頻度而定,如資料量大且需每小時計算一次,則可按小時粒度分區)

DWD層

模組化思想

通常是對ODS層資料進行精細化加工處理

不完全星型模型

事實表中,不是所有次元都按次元主鍵資訊存儲(次元退化)

地域次元資訊:年月日周等時間次元資訊,這些次元資訊,基本不會發生任何改變,并且在大部分主題分析場景中,都需要使用,直接在事實表中存儲次元值

頁面資訊:頁面類别資訊,頻道資訊,業務活動資訊,會員等級資訊等,可能發生緩慢變化的次元資訊,事實表中遵循經典理論存儲次元主鍵,具體次元值則在主題分析計算時臨時關聯

事實表

app_event_detail: APP-Event事件明細表

web_event_detail: WEB-Event事件明細表

wxapp_event_detail: 小程式-Event事件明細表

次元表

coupon_info

ad_info

campain_info

lanmu_info

page_info

page_type

pindao_info

promotion_location

huodong_info

miaosha_info

product

product_detail

product_type

shop_info

tuangou_info

user_info

DWS層

模組化思想

主題模組化

次元模組化

最主要思路:按照分析主題,"彙總"各類資料成大寬表

也有一些做法是,将DWS層的表設計成“輕度聚合表”

主要表模型

流量會話聚合天/月表

日新日活次元聚合表

事件會話聚合天/月表

訪客連續活躍區間表

新使用者留存次元聚合表

營運位次元聚合表

管道拉新次元聚合表

訪客分布次元聚合表

使用者事件鍊聚合表(支撐轉化分析,進階留存分析等)

……更多

ODS層詳細設計ODS層功能

ODS:操作資料層

主要作用:直接映射操作資料(原始資料),資料備份;

模組化方法:與原始資料結構保持完全一緻

存儲周期:相對來說,存儲周期較短;視資料規模,增長速度,以及業務的需求而定;對于埋點日志資料ODS層存儲,通常可以選擇3個月或者半年;存1年的是土豪公司(或者确有需要,當然,也有可能是資料量很小)

ODS層開發手冊

3類日志資料

資料類型 輸入路徑(HDFS目錄) 目标位置(HIVE表)

app端埋點日志 /logdata/app/2020-07-26/ ods.event_app_log 分區:2020/07/26

web端埋點日志 /logdata/web/2020-07-26/ ods.event_web_log 分區:2020/07/26

wx小程式埋點日志 /logdata/wxapp/2020-07-26/ ods.event_wxapp_log 分區:2020/07/26

H5頁面埋點日志 /

表明命名規範:層級.主題_資料域_功能_粒度

比如:app事件明細表: dwd.event_app_detail_d

要求:

原始日志格式

普通文本檔案,JSON資料格式,導入hive表後,要求可以很友善地select各個字段

分區表

外部表

建立外部表

由于原始資料是普通文本檔案,而檔案内容是json格式的一條一條記錄

在建立hive表結構進行映射時,有兩種選擇:

1.将資料視為無結構的string

2.将資料按json格式進行映射(這需要外部工具包JsonSerde 的支援)

本項目采用方案2來進行建表映射

hive完整建表語句和解析

--指定表名
create table t3(
id int,
name string
)
--指定分區
PARTITIONED BY (dt string)
--指定分隔符
ROW FORMAT 
SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (   
  'field.delim'=',',   
  'line.delim'='\n'
)
--指定輸入輸出格式
STORED AS
  INPUTFORMAT 
    'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
--表屬性
TBLPROPERTIES(
  'EXTERNAL'='TRUE',
  'comment'='this is a ods table',
  'orc.compress'='snappy'
)
;
           
-- ODS層,app埋點日志對應表模型建立
--先删除表防止有這個表明
DROP TABLE IF EXISTS `ods.event_app_log`;
--指定表明和外部表的屬性
CREATE EXTERNAL TABLE `ods.event_app_log`(         
  `account` string ,    
  `appid` string ,      
  `appversion` string , 
  `carrier` string ,    
  `deviceid` string ,   
  `devicetype` string , 
  `eventid` string ,    
  `ip` string ,         
  `latitude` double ,   
  `longitude` double ,  
  `nettype` string ,    
  `osname` string ,     
  `osversion` string ,  
  `properties` map<string,string> ,  
  `releasechannel` string ,   
  `resolution` string ,   
  `sessionid` string ,   
  `timestamp` bigint 
)   
--指定分區
PARTITIONED BY (`dt` string)    
--指定分隔符這邊是以json格式切割                                  
ROW FORMAT SERDE                                    
  'org.apache.hive.hcatalog.data.JsonSerDe' 
--指定輸入輸出格式
STORED AS INPUTFORMAT                               
  'org.apache.hadoop.mapred.TextInputFormat'        
OUTPUTFORMAT                                        
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'  
--指定輸出路徑
LOCATION                                            
  'hdfs://linux01:8020/user/hive/warehouse/ods.db/event_app_log'  
--表屬性版本号之類的沒啥用
TBLPROPERTIES (                                     
  'bucketing_version'='2',                          
  'transient_lastDdlTime'='1610337798'
);
           

接下來就是具體操作

流程是我們會從别的資料庫通過flime導到自己的hdfs中,然後經過自己的hive處理成結構化資料再放回到自己的hdfs上,原來的資料就沒了

flime就是上面的上遊下遊,hive建表就是上面的埋點資料模組化然後load一下

-- 資料入庫
load data inpath '/logdata/app/2021-01-10' into table ods.event_app_log partition(dt='2021-01-10');

-- 如何删除一個表中已存在分區
alter table ods.event_app_log drop partition(dt='2020-01-10');

-- 不适用load,如何添加一個分區到已存在的表中
alter table ods.event_app_log add partition(dt='2020-01-11') location '/abc/ddd/'
           
-- 入庫腳本開發
#!/bin/bash
######################################
#
#  @author :  
#  @date   :  2021-01-11
#  @desc   :  app端埋點日志入庫
#  @other  
######################################

export JAVA_HOME=/opt/apps/jdk1.8.0_141/
export HIVE_HOME=/opt/apps/hive-3.1.2/


DT=$(date -d'-1 day' +%Y-%m-%d)

${HIVE_HOME}/bin/hive -e "
load data inpath '/logdata/app/${DT}' into table ods.event_app_log partition(dt='${DT}')
"

if [ $? -eq 0 ]
then
 echo "${DT}app success"
else
 echo "wx failed"
fi
           

繼續閱讀