在使用odps強大的資料處理能力之前,大家最關心的是自己的資料如何導入到odps中。 下面介紹一款向odps導入資料的工具-fluentd。
fluentd是一個開源的軟體,用來收集各種源頭日志(包括application log、sys log及access log),允許使用者選擇插件對日志資料進行過濾、并存儲到不同的資料處理端(包括mysql、oracle、mongodb、hadoop、treasure data、aws services、google services以及odps等)。fluentd以小巧靈活而著稱,允許使用者自定義資料源、過濾處理及目标端等插件,目前在這款軟體中已經有300+個插件運作fluentd的架構上,而且這些插件全部是開源的。 odps也在這款軟體上開源了資料導入插件。
使用這款軟體,向odps導入資料,需要具備如下環境:
ruby 2.1.0 或更新
gem 2.4.5 或更新
fluentd-0.10.49 或從fluentd 官網http://www.fluentd.org/ 查找最新,fluentd為不同的os提供了不同的版本,詳見http://docs.fluentd.org/articles/quickstart
protobuf-3.5.1 或更新(ruby protobuf)
接下來可以通過以下兩種方式中的任意一種來安裝odps fluentd 導入插件。
<b>方式一:通過ruby gem</b><b>安裝:</b>
$ gem install fluent-plugin-aliyun-odps
odps已經将這個插件釋出到gem庫中, 名稱為 fluent-plugin-aliyun-odps,隻需要通過gem install 指令來安裝即可(大家在使用gem 時在國内可能會遇到gem庫無法通路,可以在網上搜一下更改gem 庫源來解決)。
<b>方式二:通過插件源碼安裝:</b>
$ gem install protobuf
$ gem install fluentd –no-ri –no-rdoc
$ git clone https://github.com/aliyun/aliyun-odps-fluentd-plugin.git
$ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* {your_fluentd_directory}/lib/fluent/plugin/ -r
其中第二條指令是安裝fluentd,如果已經安裝可以省略。 odps fluentd插件源碼在github上,clone下來之後直接放到fluentd的plugin目錄中即可。
使用fluentd導入資料時,最主要的是配置fluentd的conf檔案,更多conf檔案 的介紹請參見: http://docs.fluentd.org/articles/config-file
<b>示例一</b>:導入nginx日志 。conf中source的配置如下:
<source>
type tail
path /opt/log/in/in.log
pos_file /opt/log/in/in.log.pos
refresh_interval 5s
tag in.log
format /^(?<remote>[^ ]*) – - \[(?<datetime>[^\]]*)\] “(?<method>\s+)(?: +(?<path>[^\"]*?)(?: +\s*)?)?” (?<code>[^ ]*) (?<size>[^ ]*) “-” “(?<agent>[^\"]*)”$/
time_format %y%b%d %h:%m:%s %z
</source>
fluentd 以tail方式監控指定的檔案内容是否有變化,更多的tail配置參見:http://docs.fluentd.org/articles/in_tail
match 配置如下:
<match in.**>
type aliyun_odps
aliyun_access_id ************
aliyun_access_key *********
aliyun_odps_endpoint http://service.odps.aliyun.com/api
aliyun_odps_hub_endpoint http://dh.odps.aliyun.com
buffer_chunk_limit 2m
buffer_queue_limit 128
flush_interval 5s
project projectforlog
<table in.log>
table nginx_log
fields remote,method,path,code,size,agent
partition ctime=${datetime.strftime(‘%y%m%d’)}
time_format %d/%b/%y:%h:%m:%s %z
</table>
</match>
資料會導入到projectforlog project的nginx_log表中,其中會以源中的datetime字段作為分區,插件遇到不同的值時會自動建立分區;
<b>示例二</b>:導入mysql中的資料。導入mysql中資料時,需要安裝fluent-plugin-sql插件作為source:
$ gem install fluent-plugin-sql
配置conf中的source:
type sql
host 127.0.0.1
database test
adapter mysql
username xxxx
password xxxx
select_interval 10s
select_limit 100
state_file /path/sql_state
<table>
table test_table
tag in.sql
update_column id
這個例子是從test_table中select資料,每間隔10s去讀取100條資料出來,select 時将id列作為主鍵(id字段是自增型)。關于fluent-plugin-sql的更多說明參見:https://github.com/fluent/fluent-plugin-sql
project your_projectforlog
table mysql_data
fields id,field1,field2,fields3
資料會導出到odps projectforlog project的mysql_data表中,導入的字段包括id,field1,field2,field3。
通過fluentd導入資料是走的odps實時資料流入通道-datahub,這個通道需要一個特殊的odps表,這個表在建立時需要指定為hub table。建立表時可以使用如下語名:
create table <table_name) (field_name type,…) partitioned by (pt_name type) into <n1> shards hublifecycle <n2>;
其中:n1 是指shards數量,有效值為1-20。在導入資料時,每個shard的流入量是10m/秒。n2是指資料在datahub上的保留期,有效值1-7,主要用于流計算場景中使用曆史資料。 例如:
create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7 string) partitioned by(ctime string) into 5 shards hublifecycle 7;
如果向已經存在的表導入資料,也需要将表修改為hub表, 其指令為:
alter table table_name enable huttable with <n1> shards hublifecycle <n2>;
向odps導入資料,需要将odps插件配置在conf檔案中match項中。插件支援的參數說明如下:
type(fixed): 固定值 aliyun_odps.
aliyun_access_id(required):雲賬号access_id.
aliyun_access_key(required):雲賬号access key.
aliyun_odps_hub_endpoint(required):如果你的服務部署在esc上,請把本值設定為 http://dh-ext.odps.aliyun-inc.com, 否則設定為http://dh.odps.aliyun.com.
aliyunodps_endpoint(required):如果你的服務部署在esc上,請把本值設定為 http://odps-ext.aiyun-inc.com/api, 否則設定為http://service.odps.aliyun.com/api .
buffer_chunk_limit(optional): 塊大小,支援“k”(kb),“m”(mb),“g”(gb)機關,預設 8mb,建議值2mb.
buffer_queue_limit(optional): 塊隊列大小,此值與buffer_chunk_limit共同決定整個緩沖區大小。
flush_interval(optional): 強制發送間隔,達到時間後塊資料未滿則強制發送, 預設 60s.
project(required): project名稱.
table(required): table名稱.
fields(required): 與source對應,字段名必須存在于source之中.
partition(optional):若為分區表,則設定此項.
分區名支援的設定模式:
固定值: partition ctime=20150804
關鍵字: partition ctime=${remote} (其中remote為source中某字段)
時間格式關鍵字: partition ctime=${datetime.strftime(‘%y%m%d’)} (其中datetime為source中某時間格式字段,輸出為%y%m%d格式作為分區名稱)
time_format(optional):如果使用時間格式關鍵字為<partition>, 請設定本參數. 例如: source[datetime]=”29/aug/2015:11:10:16 +0800″,則設定<time_format>為”%d/%b/%y:%h:%m:%s %z”
除了使用fluentd可以導入資料外,odps還支援通過flume導入資料。flume是apache的一款開源軟體,odps團隊基于flume開源了導入插件源代碼,感興趣的朋友可以參見https://github.com/aliyun/aliyun-odps-flume-plugin 了解更多細節。