天天看點

日志資料如何同步到MaxCompute

摘要:日常工作中,企業需要将通過ECS、容器、移動端、開源軟體、網站服務、JS等接入的實時日志資料進行應用開發。包括對日志實時查詢與分析、采集與消費、資料清洗與流計算、資料倉庫對接等場景。本次分享主要介紹日志資料如何同步到MaxCompute。具體講解如何通過Tunnel,DataHub,日志服務SLS以及Kafka将日志資料投遞到MaxCompute的參數介紹和詳細同步過程等内容。

演講嘉賓簡介:劉建偉,阿裡雲智能技術支援工程師

本次直播視訊精彩回顧,戳這裡!

https://yq.aliyun.com/live/1575

以下内容根據演講視訊以及PPT整理而成。

本次分享主要圍繞以下四個方面:

一、實驗目的

二、方案介紹

三、方案比較及場景應用

四、操作步驟

一、實驗目的及方案介紹

1.實驗目的

日常工作中,企業需要将通過ECS、容器、移動端、開源軟體、網站服務、JS等接入的實時日志資料進行應用開發。包括日志實時查詢與分析、采集與消費、資料清洗與流計算、資料倉庫對接等場景。

日志資料如何同步到MaxCompute

2.方案介紹

日志資料同步到MaxCompute的場景主要有四個方案。

方案一:使用Tunnel指令上傳日志資料到MaxCompute。

方案二:通過DataHub投遞資料到MaxCompute。DataHub DataConnector是把DataHub服務中的流式資料同步到其他雲産品中的功能,目前支援将Topic中的資料實時/準實時同步到MaxCompute、OSS、ElasticSearch、RDS Mysql、ADS.、TableStore中。使用者隻需要向DataHub中寫入一次資料,并在DataHub服務中配好同步功能,便可以在各個雲産品中使用這份資料。

方案三:通過SLS實時采集與消費( LogHub )投遞資料到MaxCompute。也可通過DataWorks的資料內建( Data Integration )功能投遞至MaxCompute。

方案四:通過Kafka訂閱實時資料投遞至MaxCompute。

其中方案二(DataHub)和方案三( LogHub )差異化不強,均屬于消息隊列。一般來說DataHub用于進行公測或自研。

日志資料如何同步到MaxCompute

三、方案比較及場景應用

1. Tunnel

Tunnel主要用于批量上傳資料到離線表中,适用于離線計算的場景。對于特殊格式日志,一般建議将日志作為一個整體字段上傳到MaxCompute表中,再進行拆分。

2. DataHub

DataHub用于實時上傳資料的場景,主要用于流式計算場景。資料上傳後會儲存到實時表裡,後續會在幾分鐘内通過定時任務的形式同步到離線表裡,供離線計算使用。

3.日志服務(SLS)

LogHub:可适用于資料清洗(ETL)、流計算( Stream Compute)、監控與報警、機器學習與疊代計算等場景。其實時性強,寫入即可消費。

Logtail (采集Agent ):實時采集傳輸,1秒内到達服務端( 99.9%)。寫入即可查詢分析。此外可支援海量資料,對資料量不設上限。種類豐富,支援行、列、TextFile等各種存儲格式。而且配置靈活,支援使用者自定義Partition等配置。

LogShipper(投遞數倉):可支援穩定可靠的日志投遞。将日志中樞資料投遞至存儲類服務進行存儲。支援壓縮、自定義Parition,以及行列等各種存儲方式。可以進行資料倉庫、資料分析、審計c推薦系統與使用者畫像場景的開發。支援通過控制台資料接入向導一站式配置正則模式采集日志與設定索引。

4.Kafka

Kafka是一款分布式釋出與訂閱的消息中間件,有高性能、高吞量的特點,每秒能處理上百萬消息。Kafka适用于流式資料處理。可應用場景分别是大資料領域和資料內建。大資料領域主要應用于使用者行為跟蹤、日志收集等場景。結合數倉将消息導入MaxCompute、 OSS、RDS、Hadoop.、HBase等離線資料倉庫。

四、操作步驟

1. 方案一:通過Tunnel指令上傳日志資料到MaxCompute

環境準備及步驟:

(1) 開通MaxCompute服務,安裝odpscmd用戶端。

(2) 準備日志服務資料。

(3) 建立MaxCompute表,用于儲存日志資料。

(4) 使用特征指令:tunnel u C:UsersDesktopweijing_loghub_demo.csv tunnel_log。

u即upload,随後是上傳路徑和用于存儲日志資料的表名。如下圖所示,執行後日志資料投遞成功,說明已經通過Tunnel指令将日志資料上傳到MaxCompute中。

日志資料如何同步到MaxCompute
(5) 查詢表資料是否導入成功。
日志資料如何同步到MaxCompute

查詢後顯示表中已有資料導入,說明日志資料成功導入MaxCompute表中。

注意事項:

(1) 使用Tunnel指令行工具上傳資料目前不支援通配符或正規表達式指令。若使用者想借助正規表達式上傳資料,可使用方案三(LogHub支援正規表達式)。

(2) 對于特殊格式的日志資料,一般建議作為一個整體字段上傳,到MaxCompute裡,再進行拆分。

2. 方案二:通過DataHub投遞日志資料到MaxCompute

(1) 登入阿裡雲DataHub控制台,建立Project。

(2) 進入Project管理頁面 Project清單->Project檢視,建立Topic。建立Topic有兩種方式,一是直接建立,或者是導入MaxCompute表結構。

(3) 選擇導入MaxCompute表結構。輸入MaxCompute項目,選擇項目名稱。輸入MaxCompute表,此MaxCompute表可以是已經建立的表,也可建立表名,會自動建立一個MaxCompute表。然後填寫AccessidAccessKey資訊。勾選自動建立DataConnector,勾選後會在建立Topic時自動建立一個DataConnector。填寫Topic名稱。Shard數量預設為1k KPS,使用者可根據自己的資料流量進行設定。再設定生命周期。

日志資料如何同步到MaxCompute

注意:Schema對應MaxCompute表,該表字段類型、名稱、順序必須與DataHub Topic字段完全一緻,如果三個條件中的任意一個不滿足,則歸檔Connector無法建立。

(4) 建立好的DataConnector詳細資訊如下圖所示。包括已同步時間、最新寫入資料時間、MaxCompute Endpoint、運作狀态、髒資料量、目前點位。目前點位從0開始,下圖示例中為2,說明已導入三條資料。

日志資料如何同步到MaxCompute

直接建立Topic: 首先輸入一個MaxCompute項目,再輸入MaxCompute表,可為已知表或建立表。再輸入相應ACCESSIDACCESSKEY資訊。

(5) 如果已經建立Topic,隻需要在詳情頁的右上角點選 + DataConnector。

分區範圍包含SYSTEM_TIME、EVENT_TIME、USER_DEFINE三種模式,SystemTime模式會使用寫入時間轉化為字元串進行分區。EventTime模式會根據topic中固定的event_time字段時間進行分區(需要在建立Topic時增加一個TIMESTAMP類型名稱為event_time的字段,并且寫入資料時向這個字段寫入其微秒時間)。UserDefine模式将會直接使用使用者自定義的分區字段字元串分區。分區格式目前僅支援固定格式。

日志資料如何同步到MaxCompute

(6) 回到DataHub控制台,點選Topic。

如下圖所示,可以看到Shard通道,會記錄資料寫入時間、最新資料時間、資料量以及目前存儲量。

日志資料如何同步到MaxCompute
點選DataConnector,可檢視其詳細資訊。包括配置的目标服務、目标描述以及最新寫入資料的時間。
日志資料如何同步到MaxCompute
(7) 日志資料抽樣。如下圖所示點選資料抽樣可以檢視寫入DataHub中的日志資料。
日志資料如何同步到MaxCompute
點選檢視DataConnector詳細資訊,可看到歸檔資訊包括目前點位、髒資料、運作狀态。如果運作狀态為失敗,需要檢查原因,一般可能為Endpoint配置問題。DataHub投入資料到MaxCompute離線表中預設每60M commit一次,或五分鐘進行一次強行寫入,可以保證至少五分鐘一次資料同步。
日志資料如何同步到MaxCompute
日志資料如何同步到MaxCompute
(8) 測試日志資料是否投遞成功。如下圖所示,進行掃描發現有資料寫入,說明通過DataHub投遞資料至MaxCompute成功。
日志資料如何同步到MaxCompute

(1) 目前所有DataConnector均僅支援同一Region的雲服務之間同步資料,不支援同步資料到跨Region的服務。

(2) DataConnector所配置的目标服務Endpoint需要填寫相應的内網域名(經典網絡),不支援使用公網域名同步。

(3) 資料同步目前僅支援at least once語義,在網絡服務異常等小機率場景下可能會導緻目的端的資料産生重複但不會丢失,需要做去重處理。

(4) topic預設可建立20個,如果需要建立更多,需送出工單申請。

3. 方案三:通過LogHub投遞日志資料到MaxCompute

直接通過LogHub投遞:

(1) 開通日志服務,登入日志服務控制台,建立新的Project或者單擊已建立的Project名稱。

(2) 建立新的Logstore或者單擊已經建立好的Logstore名稱。

(3) 單擊對應的Logstore,查詢分析導入到LogHub的日志資料。如下圖示範所示幾條資料即為LogHub的日志資料,需要将其同步到MaxCompute表中。

日志資料如何同步到MaxCompute

(4) 選擇需要投遞的日志庫名稱并依次展開節點,日志庫名稱->資料處理->導出->MaxCompute。單擊開始投遞。

(5) 單擊開啟投遞以進入LogHub->資料投遞頁面。進行相關配置。

(6) 配置投遞規則,在LogHub->資料投遞頁面配置字段關聯等相關内容。如下圖所示。

a) 自定義一個投遞名稱

b) MaxCompute表名稱,項目名以及日志表明。輸入自定義的建立的MaxCompute表名稱或者選擇已有的MaxCompute表。

c) 字段關聯。左邊是LogHub表的字段列,右邊是MaxCompute表中的字段列。按序,左邊填寫與MaxCompute表資料列相映射的日志服務字段名稱,右邊填寫或選擇MaxCompute表的普通字段名稱及字段類型。

d) 分區字段:__partition_time__

格式:将日志時間作為分區字段,通過日期來篩選資料是MaxCompute常見的過濾資料方法。__partition_time__是根據日志__time__值計算得到(不是日志寫入服務端時間,也不是日志投遞時間),結合分區時間格式,向下取整。

日志資料如何同步到MaxCompute

(7) 在投遞管理頁面,單擊修改即可對之前的配置資訊進行編輯。如果想新增列,可以在大資料計算服務MaxCompute修改投遞的資料表列資訊,單擊修改後會加載最新的資料表資訊。

(8) 投遞任務管理。如下圖所示,配置成功後點選确定,會提示成功配置了一個資料投遞到MaxCompute。

日志資料如何同步到MaxCompute

啟動投遞功能後,日志服務背景會定期啟動離線投遞任務。使用者可以在控制台上看到這些投遞任務的狀态和錯誤資訊。當投遞任務發生錯誤時,需要檢視錯誤資訊,問題解決後可以通過雲控制台中日志投遞任務管理或SDK來重試失敗任務。

(9) 檢視日志投遞的運作狀态。如下圖所示日志開始投遞後狀态是運作中,運作行數為0,日志投遞成功後運作行數為11,狀态轉變為成功。說明通過LogHub投遞日志到MaxCompute成功。

日志資料如何同步到MaxCompute
日志資料如何同步到MaxCompute

(10) 日志投遞MaxCompute後,檢查資料完整性。

a) 通過控制台或API/SDK判斷(推薦)。使用API、SDK或者控制台擷取指定Project/Logstore投遞任務清單。控制台會對該傳回結果進行可視化展示。

b) 通過MaxCompute分區粗略估計,比如在MaxCompute中以半小時做一次分區,投遞任務為每30分鐘一次,當表中包含以下分區:

2019_10_25_10_00
2019_10_25_10_30           

當發現分區2019_10_25_11_00出現時,說明11:00之前分區資料已經完整。該方法不依賴API,判斷方式簡單但結果并不精确,僅用作粗略估計。

(11) 查詢MaxCompute表中資料。

日志資料如何同步到MaxCompute

(1) 數加控制台建立、修改投遞配置必須由主賬号完成,不支援子賬号操作。

(2) 不同Logstore的資料不可導入到同一個MaxCompute表中,否則會造成分區沖突、丢失資料等後果。

(3) MaxCompute表至少包含一個資料列、一個分區列。

(4) MaxCompute單表有分區數目6萬的限制,分區數超出後無法再寫入資料,是以日志服務導入MaxCompute表至多支援3個分區列。需謹慎選擇自定義字段作為分區列,保證其值可枚舉。

(5) 日志服務資料的一個字段最多允許映射到一個MaxCompute表的列(資料列或分區列),不支援字段備援,同一個字段名第二次使用時其投遞的值為null,如果null出現在分區列會導緻資料無法被投遞。

(6) 投遞MaxCompute是批量任務,需謹慎設定分區列及其類型:保證一個同步任務内處理的資料分區數小于512個;用作分區列的字段值不能為空或包括‘/’等MaxCompute保留字段。

(7) 不支援海外Region的MaxCompute投遞,海外Region的MaxCompute可使用DataWorks進行資料同步。

DataWorks:

(1) 登入阿裡雲LogHub控制台,建立Project。

(2) 登入DataWorks控制台,單擊對應項目進入資料內建。

(3) 配置LogHub資料源。進入同步資源管理->資料源頁面,單擊右上角的新增資料源。

(4) 選擇資料源類型為LogHub,填寫新增LogHub資料源對話框中的配置,如下圖所示。選擇環境,填寫資料源名稱、LogHub Endpoint、在LogHub中建立的Project以及ACCESSIDACCESSKEY資訊。填寫完成後單擊測試連通性。測試連通性通過說明資料源可以正确使用。單擊确定。

日志資料如何同步到MaxCompute

(5) 配置同步任務。可選擇向導模式,通過簡單便捷的可視化頁面完成任務配置;或者選擇腳本模式,深度自定義配置同步任務。

建立業務流程->資料內建->建立資料內建節點->資料同步進入資料同步任務配置頁面。

使用向導模式需要填寫日志開始時間和結束時間。日志開始時間表示資料消費的開始時間位點,為yyyyMMddHHmmss格式的時間字元串(比如20191025103000),左閉右開。日志結束時間表示資料消費的結束時間位點,格式與日志開始時間相同。

批量條數:一次讀取的資料條數,預設為256。

日志資料如何同步到MaxCompute

a) 向導模式配置同步任務。

配置資料源及資料去向。資料源為LogHub,填寫Logstore,配置日志開始時間和日志結束時間。資料去向可選擇MaxCompute的一個資料源,選擇需要導入的表,填寫資料資訊,進行字段映射。儲存資料同步的配置。送出并運作。查詢MaxCompute表中資料,確定日志服務資料已經成功同步到MaxCompute。

b) 腳本模式配置同步任務。

導入模闆,選擇資料源和目标資料源。編輯腳本。需要填寫reader和writer的腳本配置。

如下圖紅色框框選内容所示,reader端需要填寫beginDataTime(日志開始時間)、datasourse(資料源,填寫已配置好的資料源名稱)、encoding(編碼格式)、endDataTime(日志結束時間)、logstore(日志庫)等。下圖綠色框框選内容所示為writer端即(MaxCompute端)相應配置資訊。需要填寫列名,需要和LogHub配置的列名對應。填寫datasourse、partition、table、truncate。

日志資料如何同步到MaxCompute

儲存腳本資料節點,腳本模式配置成功後單擊送出并運作。運作後資料即會從LogHub投遞到MaxCompute表中。

日志資料如何同步到MaxCompute

如上圖示範截圖所示,投遞成功。查詢MaxCompute表中資料,確定日志服務資料已經成功同步到MaxCompute。如下圖所示。

日志資料如何同步到MaxCompute

4. 方案四:通過Kafka投遞日志資料到MaxCompute

(1) 搭建Kafka叢集。

(2) 在控制台建立Topic和Consumer Group。

(3) Flume讀取日志檔案資料寫入到Kafka。首先為flume建構agent:進入flume下的配檔案夾中編寫建構agent的配置檔案。再通過此指令啟動flume的agent:bin/flume-ng agent -c conf -f 配置檔案夾名/配置檔案名-n a1 -Dflume.root.logger=INFO,console。然後啟動Kafka的消費者進行資料寫入bin/Kafka-console-consumer.sh:--zookeeper 主機名:2181 --topic Kafka_odps 。這樣就開啟了日志采集,檔案會寫入到Kafka中。

(4) 資料寫入到Kafka,通過資料內建DataWorks同步資料到MaxCompute。

建立業務流程->建立資料同步節點->轉換為腳本->配置腳本。

(5) 配置腳本資訊,運作腳本進行資料節點的同步。如下圖紅色框框選内容為reader端配置,綠色框框選内容為writer端配置。注意reader端需要配置server,為Kafka broke Server,格式為:ip 端口号。框選的列為Kafka的屬性列、資料列。下圖示範中所填寫内容為屬性列,再填寫相應Topic。需要填寫begin offset,即資料消費開始節點。MaxCompute writer端,一一對應Kafka左邊的列,需要填寫table表名、datasourse等。腳本配置完成後點選送出并運作。運作成功後日志資料就會投遞到MaxCompute表中。

日志資料如何同步到MaxCompute

(6) 查詢MaxCompute表中資料,確定日志服務資料成功同步到MaxCompute。如下圖示範所示。

日志資料如何同步到MaxCompute

(1) 資料同步的腳本編寫,Reader、Writer的配置。如果腳本配置有誤,資料投遞将會失敗,運作失敗。可以結合官方文檔進行配置。

(2) Flume采集日志資料中配置檔案的配置。配置檔案中的Topic必須與Kafka建立的Topic保持一緻。

歡迎加入“MaxCompute開發者社群2群”,點選連結申請加入或掃描二維碼

https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
日志資料如何同步到MaxCompute