天天看點

ODPS功能介紹之資料導入

在使用odps強大的資料處理能力之前,大家最關心的是自己的資料如何導入到odps中。 下面介紹一款向odps導入資料的工具-fluentd。

fluentd是一個開源的軟體,用來收集各種源頭日志(包括application log、sys log及access log),允許使用者選擇插件對日志資料進行過濾、并存儲到不同的資料處理端(包括mysql、oracle、mongodb、hadoop、treasure data、aws services、google services以及odps等)。fluentd以小巧靈活而著稱,允許使用者自定義資料源、過濾處理及目标端等插件,目前在這款軟體中已經有300+個插件運作fluentd的架構上,而且這些插件全部是開源的。 odps也在這款軟體上開源了資料導入插件。

使用這款軟體,向odps導入資料,需要具備如下環境:

ruby 2.1.0 或更新

gem 2.4.5 或更新

fluentd-0.10.49 或從fluentd 官網http://www.fluentd.org/ 查找最新,fluentd為不同的os提供了不同的版本,詳見http://docs.fluentd.org/articles/quickstart

protobuf-3.5.1 或更新(ruby protobuf)

接下來可以通過以下兩種方式中的任意一種來安裝odps fluentd 導入插件。

<b>方式一:通過ruby gem</b><b>安裝:</b>

$ gem install fluent-plugin-aliyun-odps

odps已經将這個插件釋出到gem庫中, 名稱為 fluent-plugin-aliyun-odps,隻需要通過gem install 指令來安裝即可(大家在使用gem 時在國内可能會遇到gem庫無法通路,可以在網上搜一下更改gem 庫源來解決)。

<b>方式二:通過插件源碼安裝:</b>

$ gem install protobuf

$ gem install fluentd –no-ri –no-rdoc

$ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git

$ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {your_fluentd_directory}/lib/fluent/plugin/ -r

其中第二條指令是安裝fluentd,如果已經安裝可以省略。 odps fluentd插件源碼在github上,clone下來之後直接放到fluentd的plugin目錄中即可。

使用fluentd導入資料時,最主要的是配置fluentd的conf檔案,更多conf檔案 的介紹請參見: http://docs.fluentd.org/articles/config-file

<b>示例一</b>:導入nginx日志 。conf中source的配置如下:

&lt;source&gt;

type tail

path /opt/log/in/in.log

pos_file /opt/log/in/in.log.pos

refresh_interval 5s

tag in.log

format /^(?&lt;remote&gt;[^ ]*) – - \[(?&lt;datetime&gt;[^\]]*)\] “(?&lt;method&gt;\s+)(?: +(?&lt;path&gt;[^\"]*?)(?: +\s*)?)?” (?&lt;code&gt;[^ ]*) (?&lt;size&gt;[^ ]*) “-” “(?&lt;agent&gt;[^\"]*)”$/

time_format %y%b%d %h:%m:%s %z

&lt;/source&gt;

fluentd 以tail方式監控指定的檔案内容是否有變化,更多的tail配置參見:http://docs.fluentd.org/articles/in_tail

match 配置如下:

&lt;match in.**&gt;

type aliyun_odps

aliyun_access_id ************

aliyun_access_key *********

aliyun_odps_endpoint http://service.odps.aliyun.com/api

aliyun_odps_hub_endpoint http://dh.odps.aliyun.com

buffer_chunk_limit 2m

buffer_queue_limit 128

flush_interval 5s

project projectforlog

&lt;table in.log&gt;

table nginx_log

fields remote,method,path,code,size,agent

partition ctime=${datetime.strftime(‘%y%m%d’)}

time_format %d/%b/%y:%h:%m:%s %z

&lt;/table&gt;

&lt;/match&gt;

資料會導入到projectforlog project的nginx_log表中,其中會以源中的datetime字段作為分區,插件遇到不同的值時會自動建立分區;

<b>示例二</b>:導入mysql中的資料。導入mysql中資料時,需要安裝fluent-plugin-sql插件作為source:

$ gem install fluent-plugin-sql

配置conf中的source:

type sql

host 127.0.0.1

database test

adapter mysql

username xxxx

password xxxx

select_interval 10s

select_limit 100

state_file /path/sql_state

&lt;table&gt;

table test_table

tag in.sql

update_column id

這個例子是從test_table中select資料,每間隔10s去讀取100條資料出來,select 時将id列作為主鍵(id字段是自增型)。關于fluent-plugin-sql的更多說明參見:https://github.com/fluent/fluent-plugin-sql

project your_projectforlog

table mysql_data

fields id,field1,field2,fields3

資料會導出到odps projectforlog project的mysql_data表中,導入的字段包括id,field1,field2,field3。

通過fluentd導入資料是走的odps實時資料流入通道-datahub,這個通道需要一個特殊的odps表,這個表在建立時需要指定為hub table。建立表時可以使用如下語名:

create table &lt;table_name) (field_name type,…) partitioned by (pt_name type) into &lt;n1&gt; shards hublifecycle &lt;n2&gt;;

其中:n1 是指shards數量,有效值為1-20。在導入資料時,每個shard的流入量是10m/秒。n2是指資料在datahub上的保留期,有效值1-7,主要用于流計算場景中使用曆史資料。 例如:

create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7 string) partitioned by(ctime string) into 5 shards hublifecycle 7;

如果向已經存在的表導入資料,也需要将表修改為hub表, 其指令為:

alter table table_name enable huttable with &lt;n1&gt; shards hublifecycle &lt;n2&gt;;

向odps導入資料,需要将odps插件配置在conf檔案中match項中。插件支援的參數說明如下:

type(fixed): 固定值 aliyun_odps.

aliyun_access_id(required):雲賬号access_id.

aliyun_access_key(required):雲賬号access key.

aliyun_odps_hub_endpoint(required):如果你的服務部署在esc上,請把本值設定為 http://dh-ext.odps.aliyun-inc.com, 否則設定為http://dh.odps.aliyun.com.

aliyunodps_endpoint(required):如果你的服務部署在esc上,請把本值設定為 http://odps-ext.aiyun-inc.com/api, 否則設定為http://service.odps.aliyun.com/api .

buffer_chunk_limit(optional): 塊大小,支援“k”(kb),“m”(mb),“g”(gb)機關,預設 8mb,建議值2mb.

buffer_queue_limit(optional): 塊隊列大小,此值與buffer_chunk_limit共同決定整個緩沖區大小。

flush_interval(optional): 強制發送間隔,達到時間後塊資料未滿則強制發送, 預設 60s.

project(required): project名稱.

table(required): table名稱.

fields(required): 與source對應,字段名必須存在于source之中.

partition(optional):若為分區表,則設定此項.

分區名支援的設定模式:

固定值: partition ctime=20150804

關鍵字: partition ctime=${remote} (其中remote為source中某字段)

時間格式關鍵字: partition ctime=${datetime.strftime(‘%y%m%d’)} (其中datetime為source中某時間格式字段,輸出為%y%m%d格式作為分區名稱)

time_format(optional):如果使用時間格式關鍵字為&lt;partition&gt;, 請設定本參數. 例如: source[datetime]=”29/aug/2015:11:10:16 +0800″,則設定&lt;time_format&gt;為”%d/%b/%y:%h:%m:%s %z”

除了使用fluentd可以導入資料外,odps還支援通過flume導入資料。flume是apache的一款開源軟體,odps團隊基于flume開源了導入插件源代碼,感興趣的朋友可以參見https://github.com/aliyun/aliyun-odps-flume-plugin 了解更多細節。

繼續閱讀