天天看點

Logstash + DataHub + MaxCompute/StreamCompute 進行實時資料分析

logstash是一款開源日志收集處理架構,有各種不同的input、filter、output插件,使用者使用這些插件可以将各種資料源導入到其他系統。

logstash-output-datahub插件,實作将資料導入datahub的功能,通過簡單的配置即可完成資料采集和向datahub的傳輸任務。

結合streamcompute(galaxy)使用者可以友善的完成流式資料從采集,傳輸,開發到結果資料存儲與展示的整套解決方案。

同時,還可以通過建立collector同步任務将資料同步到maxcompute(odps),之後在maxcompute上進行完備的資料開發工作。

接下來,會将各個流程步驟在文章中作較長的描述,以幫助使用者使用logstash+datahub+streamcompute/maxcompute快速建構起自己的流式資料應用。

建立用于資料采集與傳輸的datahub topic是我們的第一步。

公共雲datahub服務endpoint清單:

公有網絡

經典網絡ecs endpoint

vpc ecs endpoint

<a href="http://dh-cn-hangzhou.aliyuncs.com">http://dh-cn-hangzhou.aliyuncs.com</a>

<a href="http://dh-cn-hangzhou.aliyun-inc.com">http://dh-cn-hangzhou.aliyun-inc.com</a>

<a href="http://dh-cn-hangzhou-vpc.aliyuncs.com">http://dh-cn-hangzhou-vpc.aliyuncs.com</a>

shard: shard表示對一個topic進行資料傳輸的并發通道,每個shard會有對應的id。每個shard會有多種狀态 : "opening" - 啟動中,"active" - 啟動完成可服務

lifecycle: 表示一個topic中寫入資料可以儲存的時間,以天為機關

record: 使用者資料和datahub服務端互動的基本機關

schema: 描述record必須遵守的格式,以及每個字段的類型,包括:bigint、string、boolean、double和timestamp

目前datahub提供的工具包括datahub java sdk和datahub webconsole,另外console還處于試用階段,若有需要可聯系我們提供。

webconsole

使用者可在webconsole上完成對所屬資源的基本操作,包括建立、檢視、删除topic以及資料抽樣等。在webconsole中建立topic如下所示:

Logstash + DataHub + MaxCompute/StreamCompute 進行實時資料分析

sdk

由于datahub提供建立具有schema的topic的功能,是以使用者在使用logstash将資料采集到datahub時,可同時完成對原始資料清洗工作。這樣在後續的資料分析工作中,使用者能更加友善的進行資料開發。

我們以一條典型的日志為例,說明如何配置logstash和datahub topic.

示例日志為:

對應的datahub topic的schema定義為:

字段名稱

字段類型

request_time

string

thread_id

log_level

class_name

request_id

detail

logstash配置檔案為:

使用指令啟動logstash開始資料采集

可使用參數 -b 指定每次batch大小,即每次請求的記錄條數,可進行性能調試

目前datahub和計算引擎streamcompute(galaxy)和maxcompute(odps)已打通。

在streamcompute中,可以通過配置datahub資料源,直接進行資料開發,寫入datahub的資料會被streamcompute訂閱并進行實時計算。

同時,通過建立同步到maxcompute的collector,可以将datahub資料同步到maxcompute,進而在maxcompute中進行資料開發。

可以通過建立connector,将datahub資料導入到maxcompute(odps).

建立connector之前,使用者必須已建立好maxcompute的table,并且所使用的賬号必須具備該maxcompute project的createinstance權限和歸檔odps表的desc、alter、update權限。

Logstash + DataHub + MaxCompute/StreamCompute 進行實時資料分析

繼續閱讀