天天看点

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

logstash是一款开源日志收集处理框架,有各种不同的input、filter、output插件,用户使用这些插件可以将各种数据源导入到其他系统。

logstash-output-datahub插件,实现将数据导入datahub的功能,通过简单的配置即可完成数据采集和向datahub的传输任务。

结合streamcompute(galaxy)用户可以方便的完成流式数据从采集,传输,开发到结果数据存储与展示的整套解决方案。

同时,还可以通过创建collector同步任务将数据同步到maxcompute(odps),之后在maxcompute上进行完备的数据开发工作。

接下来,会将各个流程步骤在文章中作详细描述,以帮助用户使用logstash+datahub+streamcompute/maxcompute快速构建起自己的流式数据应用。

创建用于数据采集与传输的datahub topic是我们的第一步。

公共云datahub服务endpoint列表:

公有网络

经典网络ecs endpoint

vpc ecs endpoint

<a href="http://dh-cn-hangzhou.aliyuncs.com">http://dh-cn-hangzhou.aliyuncs.com</a>

<a href="http://dh-cn-hangzhou.aliyun-inc.com">http://dh-cn-hangzhou.aliyun-inc.com</a>

<a href="http://dh-cn-hangzhou-vpc.aliyuncs.com">http://dh-cn-hangzhou-vpc.aliyuncs.com</a>

shard: shard表示对一个topic进行数据传输的并发通道,每个shard会有对应的id。每个shard会有多种状态 : "opening" - 启动中,"active" - 启动完成可服务

lifecycle: 表示一个topic中写入数据可以保存的时间,以天为单位

record: 用户数据和datahub服务端交互的基本单位

schema: 描述record必须遵守的格式,以及每个字段的类型,包括:bigint、string、boolean、double和timestamp

目前datahub提供的工具包括datahub java sdk和datahub webconsole,另外console还处于试用阶段,若有需要可联系我们提供。

webconsole

用户可在webconsole上完成对所属资源的基本操作,包括创建、查看、删除topic以及数据抽样等。在webconsole中创建topic如下所示:

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

sdk

由于datahub提供创建具有schema的topic的功能,所以用户在使用logstash将数据采集到datahub时,可同时完成对原始数据清洗工作。这样在后续的数据分析工作中,用户能更加方便的进行数据开发。

我们以一条典型的日志为例,说明如何配置logstash和datahub topic.

示例日志为:

对应的datahub topic的schema定义为:

字段名称

字段类型

request_time

string

thread_id

log_level

class_name

request_id

detail

logstash配置文件为:

使用命令启动logstash开始数据采集

可使用参数 -b 指定每次batch大小,即每次请求的记录条数,可进行性能调试

目前datahub和计算引擎streamcompute(galaxy)和maxcompute(odps)已打通。

在streamcompute中,可以通过配置datahub数据源,直接进行数据开发,写入datahub的数据会被streamcompute订阅并进行实时计算。

同时,通过创建同步到maxcompute的collector,可以将datahub数据同步到maxcompute,从而在maxcompute中进行数据开发。

可以通过创建connector,将datahub数据导入到maxcompute(odps).

创建connector之前,用户必须已创建好maxcompute的table,并且所使用的账号必须具备该maxcompute project的createinstance权限和归档odps表的desc、alter、update权限。

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

继续阅读