天天看點

湖倉一體電商項目(四):項目資料種類與采集

作者:Lansonli

#頭條創作挑戰賽#

項目資料種類與采集

湖倉一體電商項目(四):項目資料種類與采集

實時數倉項目中的資料分為兩類,一類是業務系統産生的業務資料,這部分資料存儲在MySQL資料庫中,另一類是實時使用者日志行為資料,這部分資料是使用者登入系統産生的日志資料。

針對MySQL日志資料我們采用maxwell全量或者增量實時采集到大資料平台中,針對使用者日志資料,通過log4j日志将資料采集到目錄中,再通過Flume實時同步到大資料平台,總體資料采集思路如下圖所示:

湖倉一體電商項目(四):項目資料種類與采集

;

針對MySQL業務資料和使用者日志資料建構離線+實時湖倉一體資料分析平台,我們暫時劃分為會員主題和商品主題。下面了解下主題各類表情況。

一、MySQL業務資料

1、配置MySQL支援UTF8編碼

在node2節點上配“/etc/my.cnf”檔案,在對應的标簽下加入如下配置,更改mysql資料庫編碼格式為utf-8:

[mysqld]
character-set-server=utf8

[client]
default-character-set = utf8           

修改完成之後重新開機mysql即可。

2、MySQL資料表

MySQL業務資料存儲在庫“lakehousedb”中,此資料庫中的業務資料表如下:

2.1、會員基本資訊表 : mc_member_info

湖倉一體電商項目(四):項目資料種類與采集

;

2.2、 會員收貨位址表 : mc_member_address

湖倉一體電商項目(四):項目資料種類與采集

;

2.3、使用者登入資料表 : mc_user_login

湖倉一體電商項目(四):項目資料種類與采集

;

2.4、商品分類表 : pc_product_category

湖倉一體電商項目(四):項目資料種類與采集

;

2.5、商品基本資訊表 : pc_product

湖倉一體電商項目(四):項目資料種類與采集

;

3、MySQL業務資料采集

我們通過maxwell資料同步工具監控MySQL binlog日志将MySQL日志資料同步到Kafka topic “KAFKA-DB-BUSSINESS-DATA”中,詳細步驟如下:

3.1、配置maxwell config.properties檔案

進入node3“/software/maxwell-1.28.2”目錄,配置config.properties檔案,主要是配置監控mysql日志資料對應的Kafka topic,配置詳細内容如下:

producer=kafka kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092 kafka_topic=KAFKA-DB-BUSSINESS-DATA #設定根據表将binlog寫入Kafka不同分區,還可指定:database, table, primary_key, transaction_id, thread_id, column producer_partition_by=table #mysql 節點 host=node2 #連接配接mysql使用者名和密碼 user=maxwell password=maxwell #指定maxwell 目前連接配接mysql的執行個體id,這裡用于全量同步表資料使用 client_id=maxwell_first

3.2、啟動kafka,建立Kafka topic,并監控Kafka topic

啟動Zookeeper叢集、Kafka 叢集,建立topic“KAFKA-DB-BUSSINESS-DATA” topic:

#進入Kafka路徑,建立對應topic
[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3

#監控Kafak topic 中的資料
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DB-BUSSINESS-DATA           

3.3、啟動maxwell

#在node3節點上啟動maxwell
[root@node3 ~]# cd /software/maxwell-1.28.2/bin/
[root@node3 bin]#  maxwell --config ../config.properties           

3.4、在mysql中建立“lakehousedb”并導入資料

#進入mysql ,建立資料庫lakehousedb
[root@node2 ~]# mysql -u root -p123456
mysql> create database lakehousedb;           

打開“Navicat”工具,将資料中的“lakehousedb.sql”檔案導入到MySQL資料庫“lakehousedb”中,我們可以看到在對應的kafka topic “KAFKA-DB-BUSSINESS-DATA”中會有資料被采集過來。

湖倉一體電商項目(四):項目資料種類與采集

;

二、使用者日志資料

1、使用者日志資料

目前使用者日志資料隻有“會員浏覽商品日志資料”,其詳細資訊如下:

  • 接口位址:/collector/common/browselog
  • 請求方式:post
  • 請求資料類型:application/json
  • 接口描述:使用者登入系統後,會有目前登入時間資訊及目前使用者登入後浏覽商品,跳轉連結、浏覽所獲積分等資訊
  • 請求示例:
{
    "logTime": 1646393162044,
    "userId": "uid53439497",
    "userIp": "216.36.11.233",
    "frontProductUrl": "https://fo0z7oZj/rInrtrb/ui",
    "browseProductUrl": "https://2/5Rwwx/SqqwwwOUsK4",
    "browseProductTpCode": "202",
    "browseProductCode": "q6HCcpwfdgfgfxd2I",
    "obtainPoints": 16,
}           
  • 請求參數解釋如下:
參數名稱 參數說明
logTime 浏覽日志時間
userId 使用者編号
userIp 浏覽Ip位址
frontProductUrl 跳轉前URL位址,有為null,有的不為null
browseProductUrl 浏覽商品URL
browseProductTpCode 浏覽商品二級分類
browseProductCode 浏覽商品編号
obtainPoints 浏覽商品所獲積分

2、使用者日志資料采集

日志資料采集是通過log4j日志配置來将使用者的日志資料集中擷取,這裡我們編寫日志采集接口項目“LogCollector”來采集使用者日志資料。

當使用者浏覽網站觸發對應的接口時,日志采集接口根據配合的log4j将使用者浏覽資訊寫入對應的目錄中,然後通過Flume監控對應的日志目錄,将使用者日志資料采集到Kafka topic “KAFKA-USER-LOG-DATA”中。

這裡我們自己模拟使用者浏覽日志資料,将使用者浏覽日志資料采集到Kafka中,詳細步驟如下:

2.1、将日志采集接口項目打包,上傳到node5節點

将日志采集接口項目“LogCollector”項目配置成生産環境prod,打包,上傳到node5節點目錄/software下。

2.2、編寫Flume 配置檔案a.properties

将a.properties存放在node5節點/software目錄下,檔案配置内容如下:

#設定source名稱
a.sources = r1
#設定channel的名稱
a.channels = c1
#設定sink的名稱
a.sinks = k1

# For each one of the sources, the type is defined
#設定source類型為TAILDIR,監控目錄下的檔案
#Taildir Source可實時監控目錄一批檔案,并記錄每個檔案最新消費位置,agent程序重新開機後不會有重複消費的問題
a.sources.r1.type = TAILDIR
#檔案的組,可以定義多種
a.sources.r1.filegroups = f1
#第一組監控的是對應檔案夾中的什麼檔案:.log檔案
a.sources.r1.filegroups.f1 = /software/lakehouselogs/userbrowse/.*log

# The channel can be defined as follows.
#設定source的channel名稱
a.sources.r1.channels = c1
a.sources.r1.max-line-length = 1000000
#a.sources.r1.eventSize = 512000000

# Each channel's type is defined.
#設定channel的類型
a.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
#設定channel道中最大可以存儲的event數量
a.channels.c1.capacity = 1000
#每次最大從source擷取或者發送到sink中的資料量
a.channels.c1.transcationCapacity=100

# Each sink's type must be defined
#設定Kafka接收器
a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#設定Kafka的broker位址和端口号
a.sinks.k1.brokerList=node1:9092,node2:9092,node3:9092
#設定Kafka的Topic
a.sinks.k1.topic=KAFKA-USER-LOG-DATA
#設定序列化方式
a.sinks.k1.serializer.class=kafka.serializer.StringEncoder 
#Specify the channel the sink should use
#設定sink的channel名稱
a.sinks.k1.channel = c1           

2.3、在Kafka中建立對應的topic并監控

#進入Kafka路徑,建立對應topic
[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3

#監控Kafak topic 中的資料
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-USER-LOG-DATA           

2.4、啟動日志采集接口

在node5節點上啟動日志采集接口,啟動指令如下:

[root@node5 ~]# cd /software/
[root@node5 software]# java -jar ./logcollector-0.0.1-SNAPSHOT.jar            

啟動之後,根據日志采集接口配置會在“/software/lakehouselogs/userbrowse”目錄中彙集使用者浏覽商品日志資料。

2.5、 啟動Flume,監控使用者日志資料到Kafka

在node5節點上啟動Flume,監控使用者浏覽日志資料到Kafka “KAFKA-USER-LOG-DATA” topic。

[root@node5 ~]# cd /software/
[root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console           

2.6、啟動模拟使用者浏覽日志代碼,向日志采集接口生産資料

在window本地啟動“LakeHouseMockData”項目下的“RTMockUserLogData”代碼,向日志采集接口中生産使用者浏覽商品日志資料。

啟動代碼後,我們會在Kafka “KAFKA-USER-LOG-DATA” topic 中看到監控到的使用者日志資料。

湖倉一體電商項目(四):項目資料種類與采集

;

三、錯誤解決

如果在向mysql中建立庫及表時有如下錯誤:

Err 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains nonaggregated column 'information_schema.PROFILING.SEQ' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by

以上錯誤是由于MySQL sql_mode引起,對于group by聚合操作,如果在select中的列沒有在group by中出現,那麼這個SQL是不合法的。按照以下步驟來處理。

1、首先停止mysql,然後在mysql節點配置my.ini檔案

[root@node2 ~]# service mysqld stop           

打開/etc/my.cnf檔案,在mysqld标簽下配置如下内容:

mysqld sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

2、重新開機mysql即可解決

[root@node2 ~]# service mysqld start           

繼續閱讀