天天看點

大資料與機器學習:實踐方法與行業案例.2.2 資料緩沖區

<b>2.2 資料緩沖區</b>

資料緩沖區是處于生産環境和分析環境之間的中間區域,它是資料閉環中各個系統間的資料中轉站,從各個系統接收原始資料,并将其暫存在對應的目錄中。其他系統可以從資料緩沖區中擷取需要的資料檔案。

為了便于管理和遷移資料,我們規定存入資料緩沖區中的資料使用文本檔案的格式,這樣一來,資料緩沖區就可以使用一台或幾台檔案伺服器實作。幾乎所有的應用系統都支援文本檔案的資料互動,新的系統可以輕松加入資料閉環之中。

資料緩沖區的一端連接配接生産環境中的大量應用系統,另一端連接配接分析環境中的資料平台,避免了生産環境和分析環境的互相影響,同時也為系統之間資料檔案的互動制定了統一标準(見圖2-2)。

圖2-2 資料緩沖區連接配接生産環境和分析環境

資料緩沖區的另一個優勢在于友善自動化和資料管理,多個應用系統的檔案存檔在同一個檔案伺服器中,便于資料的統一管理和分發。比如,在一個多部門、跨地域的企業中,不同地區、不同部門的資料檔案之間的互動,如果沒有資料緩沖區的統一收集與分發,那将會形成一個複雜的交叉網絡。

表2-2列舉了資料緩沖區的一些主要優點,本書主要專注于系統解耦,并基于資料緩沖區完成原始資料的自動加載過程。

表2-2 資料緩沖區的優點

作 用 備 注

系統解耦 系統間互不影響

便于資料平台切換 基于檔案,檔案兩端的資料平台互不影響

便于資料系統擴充 基于檔案系統,便于不同資料平台之間對接

便于資料管理 檔案備份,存儲空間管理

便于資料分發 企業不同機構間的資料分發

2.2.1 系統解耦

1. 資料直連互動

資料直連的方式是将原始資料從生産系統直接導出至分析系統,是資料互動的緊耦合方式。這種方式在資料規模較小時并不會出現問題,是以很多企業搭建資料體系時采用了資料直連的方式,但随着資料規模的增大,資料直連的弊端會逐漸展現,讓我們看一下如下的場景:

某個星期一的早晨,資料工程師小王走入辦公室,發現開發工程師小李、系統工程師小周、開發經理和産品經理正聚在一起商讨問題。

開發工程師小李:我昨晚上線完成後,業務人員驗證通過,當時系統沒有任何問題的,程式肯定沒問題!

産品經理:可是現在系統反應奇慢無比,基本處于癱瘓狀态,很多業務人員都等着開工呢,怎麼辦?

系統工程師小周:你自己看,資料庫伺服器磁盤i/o好大……這種情況之前可是沒有的……這個資料庫程序是怎麼回事?磁盤i/o就是被它拖垮的。

開發經理(一臉黑線):趕緊查一查,看看誰幹的!

系統工程師小周:這好像是分析資料庫在抽資料。……小王你剛好來了,你看這個作業是你的吧?

資料工程師小王(緊張中):這個我看看……平時都是20分鐘就抽取結束了啊!今天怎麼還沒有完成?怎麼回事?

開發經理:那先趕緊停下來,解決了生産問題再說。小王,最近資料這塊怎麼老出問題啊!

資料工程師小王(委屈):etl作業跑了半年多了,都沒問題。開發昨晚上線,今天就出問題了……

開發工程師小李(打斷小王):上線前都經過測試了,上線後也驗證了,沒有問題的。現在是資料庫的問題,和系統沒有關系好吧?

資料工程師小王:那為什麼上線後,etl作業就這麼慢呢?!也不能怪我啊……

上面的場景是資料直連方式經常會遇到的問題,這種問題可能在生産系統上線後突然出現,也可能在平常的日子裡莫名奇妙地發生。由于生産系統和分析系統之間的緊耦合,一旦出現問題,生産系統和分析系統都可能受到影響,而問題産生的原因卻很難查清。表2-3總結了資料直連的弊端。

表2-3 資料直連的弊端

弊 端 說 明

系統緊耦合 雙方互相影響,生産系統切換影響分析系統,分析系統也會影響生産系統效率

不利于資料庫權限管理 生産資料庫需要為etl作業開放權限,不利于生産資料的安全

不利于資料平台擴充 由于采用資料直連,其他資料平台(如hadoop平台)加入時比較困難

2. 資料緩沖區互動

前面已經論述使用資料緩沖區進行互動的優點,本節将進一步研究資料緩沖區進行資料互動的詳細流程,圖2-3是整個過程的示意圖。

圖2-3 資料緩沖區進行資料互動的流程

在這個過程中,資料由生産環境流入分析環境,共經過以下四個步驟。

1)批量導出。資料從生産資料庫批量導出為文本檔案,該過程使用dbms系統自帶的批量導出指令,對于大資料平台資料庫,使用對應的指令或者使用第三方插件。2.2.2節将對批量導出指令進行詳細介紹。

2)ftp傳輸第一階段。将步驟1)導出的資料檔案通過ftp上傳至資料緩沖區。步驟1)和步驟2)的自動化過程可以通過etl定時作業完成,實作方法參閱etl作業章節。

3)ftp傳輸第二階段。将資料檔案從資料緩沖區下載下傳至分析環境中。

4)批量加載。使用批量導入指令将檔案加載至資料平台。

這種方式解決了職責不清的問題。從圖2-3中可以發現,步驟1)和步驟2)處于資料緩沖區之前,它們屬于生産環境的範疇,由開發工程師負責;步驟3)和步驟4)屬于分析系統的範疇,由資料工程師負責。

這個架構還解決了資料直連方式面臨的以下問題。

1)生産系統與分析系統的耦合問題。通過資料緩沖區實作了生産系統和分析系統的解耦,無論生産系統如何變更,隻要傳輸至資料緩沖區中的文本檔案格式不變,分析系統就不受影響;而分析系統在将資料加載至資料平台的時候,也不會影響到生産系統的性能。

2)資料權限的問題,讓資料更加安全。資料緩沖區的左邊完全由開發工程師負責,是以生産資料庫權限不會流轉至後端的資料工程師;而通過在資料批量導出過程中對敏感資料的屏蔽處理(如對手機号碼加密等)後,後端資料平台無法看到敏感資料,提高了資料安全性。

3)增強了資料平台的可擴充性。由于各種資料平台,如傳統資料倉庫、hadoop平台、mpp資料庫等均對文本檔案有良好的支援,不同平台之間的資料互動,均可以通過資料緩沖區實作資料互動。例如,hadoop平台的hive資料倉庫可以通過資料緩沖區與傳統資料倉庫中的關系資料庫實作互動。

2.2.2 批量導出

批量導出是将資料庫中的資料一次性導出至文本檔案中,導出的檔案有固定的列分隔符和行分隔符,或者有固定的字段長度。批量導出的方法大緻可以分為以下兩種(見表2-4)。

1)使用odbc或jdbc接口,如etl工具或定制的java程式等。

2)使用批量導出指令,一般是資料庫自帶的指令。

表2-4 常用資料庫批量導出方法

方法 舉例 性能 适用場景

odbc/jdbc接口 etl工具、java程式 速度慢,對源資料庫影響大 資料量較小的場景

批量導出指令 批量導出指令、hadoop shell指令 速度快,對源資料庫影響小 資料量大或跨系統的場景,如生産資料到分析資料,或資料倉庫到大資料平台

表2-5中列出了常用資料庫的批量導出指令,這些指令将在第3章詳細介紹。

表2-5 常用資料庫批量導出指令

資料庫系統 批量導出指令

sql server bcp out

db2 export

oracle 第三方插件sqluldr

mysql mysqldump

hive hadoop shell

1. sql server : bcp out

bcp是sql server自帶的批量導出/導入指令,它包括批量導出指令bcp out和批量導入指令bcp in。bcp out指令的文法如代碼清單2-1所示(表2-6為參數說明)。

代碼清單 2-1

bcp table_name  out data_file

  [-f format_file]

  [-u login_id]

  [-p password]

    [-s [server_name[\instance_name]]

表2-6 參數說明

參 數 參數含義

table_name 表名稱

data_file 輸出的資料檔案的完整路徑

-f format_file 指定格式化檔案的完整路徑

-u login_id 指定用于連接配接到 sql server 的登入 id

-p password 指定登入 id 的密碼

-s[server_name[\instance_name] 指定要連接配接的 sql server 執行個體

例如,從sql server資料庫表“dbo.巡檢商戶明細”中批量導出資料到檔案“巡檢商戶明細.dat”中的bcp out指令如代碼清單2-2所示。

代碼清單 2-2

bcp dbo.巡檢商戶明細 out d:\巡檢商戶明細.dat -fd:\巡檢商戶明細.fmt -u test -p 123 -s localhost

*注:實際輸入指令時,應在一行,中間不能有換行。

代碼清單2-1中各參數說明如下。

1)dbo.巡檢商戶明細是需要導出資料的表名稱。

2)d:\巡檢商戶明細.dat為導出的檔案路徑及檔案名稱。

3)-f d:\巡檢商戶明細.fmt 指明格式檔案的路徑及名稱。

4)-u test指明登入資料庫所用的使用者名稱為“test”。

5)-p 123指明資料庫使用者“test”的密碼為“123”。

6)-s localhost指明登入的資料庫為“localhost”。

bcp out指令使用了格式檔案選項“-f format_file”,用來指明資料檔案的格式。格式檔案是用來描述資料檔案格式的檔案,在格式檔案中需指明要導出的字段名稱、長度、列分割符、行分隔符、排序方式等,可以用在bcp out和bcp in指令中。用在bcp out指令中時,它用來定義導出檔案的格式;用在bcp in指令中時,它用來描述待導入文本檔案的格式。圖2-4所示為格式檔案的式樣。

圖2-4 sql server 控制檔案格式

對圖2-4中涉及參數的說明如下:

1)version是微軟公司對sql server系列産品的版本編号,如表2-7所示。

2)number of columns是所要導出的資料庫表所含的字段個數。

3)host file field order是輸出的資料檔案字段列的序号,與server column order對應,一般與server column order保持一緻。

4)host file data type保持預設值“sqlchar”即可。

5)prefix length(字首長度),一般導出的資料檔案字段都不填充字首,是以此處為0。

6)host file data length為輸出檔案字段的長度,此處如果terminator指明的分隔符不為“”,則該參數并不起實際作用。

7)terminator指明資料檔案的列分隔符和行分隔符(格式檔案最後一行該字段為行分隔符)。

8)server column order為資料表中字段的資料,與host file field order保持一緻。

9)server column name是資料庫中的字段名稱。

10)column collation指明字段的collation,一般僅為字元格式的字段使用,保持預設即可。

表2-7 sql server版本号對照表

産品名稱 版本号

sql server 2014 12

sql server 2012 11

sql server 2008 r2 10

sql server 2008 10

sql server 2005 9

sql server 2000 8

格式檔案中應重點關注的部分為number of columns、host file field order、terminator、server column order、server column name,其餘保持預設即可。

如何制作格式檔案呢?下面使用一個具體的例子說明如何建立格式檔案。假設在資料庫中建立表“dbo.巡檢商戶明細”,其建立的表腳本如代碼清單2-3所示。

代碼清單 2-3

create table [dbo].[巡檢商戶明細]

(

[商戶代碼]     [varchar](30)    null,

[商戶名稱]     [nvarchar](100)  null,

[機構号]       [varchar](20)    null,

[合作方名稱]   [nvarchar](100)  null,

[分公司]       [nvarchar](50)   null

    );

使用bcp format指令,可得到該表的控制檔案模闆“巡檢商戶明細.fmt”,指令如代碼清單2-4所示。

代碼清單 2-4

bcp dbo.巡檢商戶明細 format nul -c -f 巡檢商戶明細.fmt -utest -p123 -slocalhost

巡檢商戶明細.fmt檔案中預設的列分隔符為“\t”(制表符)、行分隔符為“\r\n”(回車換行符),其内容如代碼清單2-5所示。

代碼清單 2-5

9.0

5

1 sqlchar 0 30 "\t" 1 [商戶代碼]

sql_latin1_general_cp1_ci_as

2 sqlchar 0 100 "\t" 2 [商戶名稱]

3 sqlchar 0 20 "\t" 3 [機構号]

4 sqlchar 0 100 "\t" 4 [合作方名稱]

5 sqlchar 0 50 "\r\n" 5 [分公司]

注:格式檔案的最後一行必須為空白行,否則在使用bcp指令時會報格式錯誤。

得到格式檔案模闆之後,就可以在此基礎上進行修改了,比如使用“#”作為列分隔符,使用“\n”作為行分隔符,修改後的格式檔案内容如代碼清單2-6所示。

代碼清單 2-6

1 sqlchar 0 30 "#" 1 [商戶代碼] sql_latin1_general_cp1_ci_as

2 sqlchar 0 100 "#" 2 [商戶名稱] sql_latin1_general_cp1_ci_as

3 sqlchar 0 20 "#" 3 [機構号] sql_latin1_general_cp1_ci_as

4 sqlchar 0 100 "#" 4 [合作方名稱] sql_latin1_general_cp1_ci_as

5 sqlchar 0 50 "\n" 5 [分公司] sql_latin1_general_cp1_ci_as

這樣,将修改後的格式檔案用于bcp out指令,則輸出的資料檔案将使用“#”作為列分隔符,使用“\n”作為行分隔符。作為驗證,可以執行如代碼清單2-7所示的bcp out指令。

代碼清單 2-7

bcp dbo.巡檢商戶明細 out d:\巡檢商戶明細.dat -f d:\巡檢商戶明細.fmt -utest -p123

-slocalhost

檢視輸出的資料檔案“巡檢商戶明細資料.dat”,其内容如代碼清單2-8所示。

代碼清單 2-8

000391952#新白鹿餐廳(百聯中環店)#0880#安智餐飲有限公司#安徽分公司

000472873#頤和四季體驗館#0880#萬源城娛樂城#北京分公司

000901032#書院人家#0880#萬家燈火餐飲文化傳播公司#北京分公司

000900109#三陽灣食府#0880#三陽衆城文化管理有限公司#北京分公司

......

從以上代碼清單中可以看到,輸出的文本檔案使用“#”作為列分隔符,使用“\n”作為行分隔符。可以根據需要修改格式檔案,進而得到滿足要求的資料檔案輸出。更詳細的bcp指令可參閱微軟官方幫助文檔。

2. db2 : export

ibm db2資料庫中資料的批量導出可以使用export指令,其基礎文法如代碼清單2-9所示(表2-8為參數說明)。

代碼清單 2-9

export to filename of {ixf |del | wsf }

[ modified by {filetype-mod …} ]  { select-statement |[ where … ]}

例如,在db2中自帶sample資料庫中的一張表,其建立的表腳本如代碼清單2-10所示。

表2-8 db2 export指令的參數說明

filebname 導出的檔案名稱

ixf | del | wsf 輸出格式,對于文本檔案,選擇del格式

filetype-mod 檔案類型修飾符,包括設定字元串定界符、列分隔符等多種選項

chardel 指定字元串定界符

coldel 指定列分隔符

select-statement 用于提取資料的select語句

代碼清單 2-10

create table hb_static (

sta_mth varchar(8) default null,

face_value integer default 5,

total_amt bigint default 0

);

現在需要把這張表中的資料導出成文本檔案,運作db2cmd,依次運作如代碼清單2-11所示的指令。

代碼清單 2-11

db2 connect to sample

db2 export to d:\\hb_static.del of del modified by chardel'' coldel; select *

from hb_static

export指令指明導出檔案路徑及名稱“d:\\hb_static.del”,chardel“指明使用”作為字元串定界符(資料庫表中的字元類型的資料使用該字元串包裹),coldel; 指明使用;作為列分隔符。最終得到的文本檔案d:\\hb_static.del的内容(部分)如代碼清單2-12所示。

代碼清單 2-12

'201402';20;480

'201402';5;165

'201402';10;200

'201402';50;1100

'201403';5;915

由于db2的export指令沒有提供形如sql server格式檔案之類的控制檔案,而僅通過指令選項指定列分隔符、字元串界定符,使得db2的export指令導出的文本檔案格式較為單一(例如,隻能使用一個字元作為列分隔符),但在大多數場景中,export可以滿足要求。

3. oracle : sqluldr2

oracle資料庫未提供批量導出指令,一般采用第三方工具進行批量導出。sqlplus提供的spool工具雖然可以進行資料導出,但是它并不适合大量資料的快速導出,主要原因是其導出效率很低,大量資料導出會非常耗時。

另一款批量導出工具sqluldr2适用于大批量資料的導出,速度非常快,可以将資料以csv、txt等格式導出。

首先需要下載下傳sqluldr2.exe(可上網搜尋),如果安裝的是64位的oracle,則需要下載下傳sqluldr264.exe,然後将sqluldr2.exe複制到$oracle_home的bin目錄(該目錄中有oracle自帶的sqlldr.exe,這是oracle的批量導入工具。沒錯,oracle提供了批量導入工具。卻沒有提供批量導出工具)中。現在就可以開始使用sqluldr2.exe了,sqluldr2的指令格式如代碼清單2-13所示(表2-9為其參數說明)。

代碼清單 2-13

sqluldr2 logon_str {query="select_statement" | sql=sql_file }

[file=output_file]

[field=col_del]

[record=row_del]

[quote=quote]

表2-9 sqluldr2的參數說明

參 數 含 義

logon_str 資料庫登入資訊,必需參數

select_statement 查詢語句,用于提取資料,與sql_file兩者二選一

sql_file 指定的sql語句腳本檔案

output_file 輸出的資料檔案路徑及名稱,如果不指定此選項,則預設輸出uldrdata.txt

col_del 列分隔符,預設為逗号

row_del 行分隔符,預設為\r\n

quote 字元串界定符

例如,在oracle資料庫中有一張表,其建立表的腳本如代碼清單2-14所示。

代碼清單 2-14

create table  “ods"."dp_comment"

"memberid" varchar2(50),

"taste" varchar2(50),

"environment" varchar2(50),

"service" varchar2(50),

"level_score" varchar2(50),

"content" varchar2(2000),

"shopid" varchar2(50)

使用sqluldr2将表中的資料導出至文本檔案dp_comment.txt中,如代碼清單2-15所示。

代碼清單 2-15

sqluldr2 ods/ods@yfb_orc query="select * from ods.dp_comment"

     file=d:\\dp_comment.txt field=#$

sqluldr2指令将表中的資料導出至dp_comment.txt,field=#$指明導出的文本檔案中的列使用#$進行分割。檢視檔案dp_comment.txt,其内容如代碼清單2-16所示(部分)。

代碼清單 2-16

195084790#$3#$3#$3#$40#$菜品味道中規中矩,價錢稍貴!#$17222108

630568#$4#$4#$4#$50#$很滿意 這是很好的一次體驗 謝謝#$17222108

178216498#$3#$3#$3#$50#$嘗試了真不錯#$17222108

該指令的一個很有用的選項為table選項,該選項可以生成一個預設的控制檔案,該控制檔案可以用于oracle的sqlldr指令進行資料批量導入。在上述導出的指令中加入table選項,如代碼清單2-17所示。

代碼清單 2-17

file=d:\\dianping_comment.txt field=#$ table=dp_comment

執行上述指令後,除了輸出檔案d:\\dp_comment.txt外,還生成了控制檔案dp_comment_sqlldr.ctl,代碼清單2-18是dp_comment_sqlldr.ctl的内容(略做了修改,删除了注釋部分)。

代碼清單 2-18

load data

infile 'd:\\dp_comment.txt'

insert into table dp_comment

fields terminated by x'2324' trailing nullcols

  "memberid" char(50) nullif "memberid"=blanks,

  "taste"         char(50) nullif "taste"=blanks,

  "environment" char(50) nullif "environment"=blanks,

  "service" char(50) nullif "service"=blanks,

  "level_score" char(50) nullif "level_score"=blanks,

  "content" char(2000) nullif "content"=blanks,

  "shopid" char(50) nullif "shopid"=blanks

)

代碼清單2-18中,x'2324'是十六進制的字元#$(由上文中sqluldr2的field選項指明)。上述控制檔案可以用于資料批量導入oracle資料庫中,控制檔案用于批量導入的方法請參閱批量導入章節的内容。

4. hive : hadoop fs

hive是hadoop平台上一款非常流行的資料倉庫分析工具,由于類似sql的語言風格,使得其學習成本很低,是以是大資料分析的必學工具。

hive資料導出,主要的應用場景是将hive表中的資料導出到linux作業系統中,然後供其他資料産品使用。該導出過程可以使用etl工具(參閱第2.3節),也可以使用hadoop shell指令完成。

例如,現在有一張hive表,其建立表的腳本如代碼清單2-19所示(未列出全部字段)。

代碼清單 2-19

create table adobe_nwd_prd

  accept_language string comment '浏覽器中可接受的語言标題' ,

browser bigint comment '實際用于單擊的浏覽器id' ,

domain string comment '使用者isp域' ,

duplicate_events string comment '列出計為重複的每個事件' ,

)                                                                                           

comment 'adobe pc端原始資料'

partitioned by(load_day string)

row format delimited

fields terminated by '\t'

stored as textfile;

hive表使用了load_day字段作為partition字段,即每天的資料存放在一個partition中,通過hive shell指令可以檢視目前表中的partition情況,如代碼清單2-20所示。

代碼清單 2-20

hive&gt; show partitions adobe_nwd_app;

ok

load_day=20150908

load_day=20150909

load_day=20150910

load_day=20150911

load_day=20150912

load_day=20150913

load_day=20150914

time taken: 0.379 seconds, fetched: 7 row(s)

每個partition對應一個hdfs目錄,按照1.2.3節中“hive分區表與增量更新”中的規則,hive表的資料存放路徑如代碼清單2-21所示。

代碼清單 2-21

[root@qzy ~]# hadoop fs -ls /data/adobe.adobe_nwd_app

found 7 items

drwxr-xr-x   -root supergroup 0 2015-09-09 00:40

/data/adobe.adobe_nwd_app/20150908

drwxr-xr-x   - root supergroup 0 2015-09-10 00:39

/data/adobe.adobe_nwd_app/20150909

drwxr-xr-x   - root supergroup 0 2015-09-11 06:18

/data/adobe.adobe_nwd_app/20150910

drwxr-xr-x   - root supergroup 0 2015-09-12 00:33

/data/adobe.adobe_nwd_app/20150911

drwxr-xr-x   - root supergroup 0 2015-09-13 00:33

/data/adobe.adobe_nwd_app/20150912

drwxr-xr-x   - root supergroup 0 2015-09-14 00:38

/data/adobe.adobe_nwd_app/20150913

drwxr-xr-x   - root supergroup 0 2015-09-14 10:33

/data/adobe.adobe_nwd_app/20150914

使用代碼清單2-22所示指令将partition(load_day=20150908)的所有資料導出至linux系統的本地目錄/tmp/datafiles中。

代碼清單 2-22

hadoop fs -copytolocal /data/adobe.adobe_nwd_app/20150908 /tmp/datafiles

通過程式可以循環導出指定時間範圍内的資料,第3章将提供一種java批量導出hive資料的多線程實作。

上述方式實作的是不含條件參數的批量導出,如果希望導出where條件限定的資料,則需要将資料事先生成一張中間表,然後将此中間表的全部資料導出,此處不再贅述。

2.2.3 ftp傳輸

由于資料緩沖區實際上是檔案伺服器,在内網環境中使用ftp進行傳輸是一種很友善的方式。在資料閉環中,ftp傳輸連接配接了資料緩沖區的上遊和下遊,穩定高效的檔案傳輸對整個資料閉環起重要作用。

對資料工程師來說,ftp自動傳輸可以通過etl工具、指令行、定制程式等方式實作。

etl工具一般都自帶檔案傳輸子產品,可以直接使用ftp檔案傳輸功能。例如,開源etl工具pentaho kettle的作業功能中,即有檔案傳輸子產品,包含ftp上傳、ftp下載下傳等元件,圖2-5所示的為kettle的檔案傳輸元件。

圖2-5 kettle中的檔案傳輸子產品

但etl工具提供的ftp傳輸子產品有一個局限,就是待傳輸的檔案名如果是動态的,例如檔案名稱以日期作為字尾,或者根據條件選擇不同的檔案進行傳輸,則使用etl工具實作起來會比較困難(雖然etl工具本身也提供簡單的參數輸入及檔案名稱表達式比對,但總體實作成本比其他兩種方式要高很多)。

一種替代的方案是使用腳本語言,也就是通過腳本将ftp指令包裝起來,以實作參數的傳入,解決動态檔案名問題。例如在linux系統上,使用shell腳本或python腳本,調用ftp指令來實作檔案的ftp上傳或ftp下載下傳。

本書推薦的方式是使用進階程式語言進行ftp檔案傳輸的包裝,因為進階程式語言一般都提供ftp檔案傳輸的程式包,可以根據需要記錄傳輸日志,并且可以友善實作多線程ftp檔案傳輸,提高傳輸效率。比如java的commons-net-x.x.x.jar(x.x.x代表版本号)包就提供了ftp指令的api接口,代碼清單2-23是該接口的調用示例(具體實作請參閱第3章)。

代碼清單 2-23

import org.apache.commons.net.ftp.ftpclient;

ftpclient ftpclient = new ftpclient();

ftpclient.connect(serverip);

ftpclient.login(user,passsword));

ftpclient.retrievefile(remotefilename, new fileoutputstream(localfilepath));

2.2.4 批量導入

1. sql server:bcp in

sql server提供的批量導入指令bcp in是bcp out的反向操作。其指令的格式與bcp out的也基本一緻,相比bcp out,bcpin增加了幾個重要選項,即錯誤檔案-e選項、最大允許錯誤行數-m選項,如代碼清單2-24所示(各選項的詳細說明見表2-10)。

代碼清單 2-24

bcp table_name  in data_file

[-f format_file]

[-e err_file]

[-m max_errors]

[-u login_id]

[-p password]

[-s server_name]

表2-10 bcp in的選項說明

選 項 說 明

-e err_file 用于指定錯誤檔案的完整路徑,此檔案用于存儲 bcp實用工具無法從檔案傳輸到資料庫的所有行,相當于錯誤日志

-m max_errors 用于指定bcp指令允許出現的最大錯誤行數,錯誤行數未達到max_errors時,bcp導入将繼續進行,預設值為10

-f format_file 用于指定格式化檔案的完整路徑,格式化檔案用于指定行列分隔符、是否跳過某些列等

-m選項在批量導入大檔案時是非常有用的,由于一些資料會含有少量的錯誤資料,而這些錯誤資料并不影響整體的資料效果,這些場景本身對資料的完整性要求并不高(不同于交易明細資料),這時使用-m選項指定一個門檻值,當錯誤行數小于門檻值的時候,bcp繼續執行,可以将正确的資料導入,確定業務的正常進行。

-m選項通常與-e選項配合使用,可以從-e選項指定的err_file中檢視出現錯誤的資料行。

目前許多網際網路公司網站中會嵌入網站日志分析工具,實時收集使用者的單擊行為,如adobe omniture、webtrends等,這些工具會産生大量日志資料,而這些日志資料中不可避免地存在部分無法正常導入資料庫的記錄,通過-m選項可以跳過這些錯誤資料,進而保證絕大部分資料可用。

2. db2:import

db2批量資料導入可使用db2 import 指令,該指令是db2 export指令的反向指令,其格式與db2 export指令的一緻。例如将第2.2.2節中“db2:export”導出的文本檔案“d:\\hb_static.del”再導入至表hb_static2(該表結構與表hb_static的相同)中,方法如代碼清單2-25所示。

代碼清單 2-25

db2 import from d:\\hb_static.del of del modified by chardel'' coldel; insert

into hb_static2

上述指令成功執行後,檔案中的資料即被導入至資料庫中。該指令執行後的回報資訊如代碼清單2-26所示。

代碼清單 2-26

sql3109n  實用程式正在開始從檔案 "d:\\hb_static.del" 裝入資料中。

sql3110n  實用程式已完成處理。從輸入檔案讀了 "115" 行。

sql3221w  ...開始 commit work。輸入記錄計數 = "115"。

sql3222w  ...對任何資料庫更改的 commit 都成功。

sql3149n  處理了輸入檔案中的 "115" 行。已将 "115" 行成功插入表中。拒絕了 "0"行。

讀取行數         = 115

跳過行數         = 0

插入行數         = 115

更新行數         = 0

拒絕行數         = 0

落實行數         = 115

3. oracle:sqlldr

oracle自帶的批量導入工具sqlldr可以實作資料的快速批量導入,其指令格式如代碼清單2-27所示(表2-11為其參數說明)。

代碼清單 2-27

sqlldr logon_str control=ctr_file log=log_file bad=bad_file errors=max_errors

表2-11 sqlldr的參數說明

參 數 說 明

ctr_file 控制檔案完整路徑

log_file 日志檔案完整路徑

bad_file 錯誤檔案完整路徑

max_errors 最大允許錯誤行數

其中控制檔案的作用類似于sql server中的格式檔案,可以通過此控制檔案指明輸入文本的格式,以及導入資料庫時的加載方式,圖2-6所示的為oracle控制檔案的格式說明。

圖2-6 oracle控制檔案的格式

oracle sqlldr批量導入有多種方式,表2-12對此做了總結。

表2-12 sqlldr的四種導入方式

導入方式 說 明

append into 若原表有資料,則在後面追加資料

insert into 裝載空表,如果原表有資料,則sqlldr會停止加載,此為預設值

replace into 原表資料全部删除後加載

truncate into 功能和replace的相同,會用truncate語句删除原有資料

下面嘗試将adobe omniture對某網站的監控日志導入oracle的資料庫中,指令如代碼清單2-28所示。

代碼清單 2-28

sqlldr ods/ods@yfb_orc control=e:\adobe\adobe_src_data.ctl

log=e:\adobe\log.txt bad=e:\adobe\error_record.txt errors=1000000

由于該檔案是對網站單擊行為的日志記錄,是以對錯誤行的容忍度是比較大的,為了保證能夠将正确的資料導入,我們設定errors=1000000,即允許導入過程出現1000000條錯誤記錄,這些錯誤記錄會被記錄到bad選項指定的“e:\adobe\error_record.txt”中。

控制檔案“e:\adobe\adobe_src_data.ctl”指明了資料檔案、檔案分隔符資訊、加載方式等内容,如代碼清單2-29所示。

代碼清單 2-29

load data infile 'e:\adobe\01-niwodai-prd_2015-07-26.tsv'

append into table adobe_src_data

fields terminated by '\t'  

trailing nullcols

  accept_language

,browser

,browser_height

)   

上述sqlldr指令在windows的cmd視窗中執行後,檢視log檔案“e:\adobe\log.txt ”的内容,如代碼清單2-30(僅列出部分内容)所示。

代碼清單 2-30

sql*loader: release 11.2.0.1.0 - production on 星期五 7月 31 09:22:22 2015

copyright (c) 1982, 2009, oracle and/or its affiliates.  all rights reserved.

控制檔案:      e:\adobe\adobe_src_data.ctl

資料檔案:      e:\adobe\01-niwodai-prd_2015-07-26.tsv

錯誤檔案:      e:\adobe\error_record.txt

廢棄檔案:      未作指定

 (可廢棄所有記錄)

要加載的數: all

要跳過的數: 0

允許的錯誤: 1000000

綁定數組: 64 行, 最大 256000 位元組

繼續:    未作指定

所用路徑:       正常

表 adobe_src_data,已加載從每個邏輯記錄

插入選項對此表 append 生效

trailing nullcols 選項生效

   列名                  位置      長度   中止  包裝資料類型

------------------------------ ---------- ----- ---- ---- ---------------------

accept_language             first     *   wht      character            

browser                     next      *   wht      character            

……

記錄 4245: 被拒絕 - 表 adobe_src_data 的列 user_agent 出現錯誤。

資料檔案的字段超出最大長度

表 adobe_src_data:

  417707 行 加載成功。

  由于資料錯誤, 是以913 行 沒有加載。

  由于所有 when 子句失敗, 是以 0 行 沒有加載。

  由于所有字段都為空的, 是以 0 行 沒有加載。

為綁定數組配置設定的空間:      168216 位元組 (1 行)

讀取緩沖區位元組數:          1048576

跳過的邏輯記錄總數:        0

讀取的邏輯記錄總數:        418620

拒絕的邏輯記錄總數:        913

廢棄的邏輯記錄總數:        0

從 星期五 7月  31 09:22:22 2015 開始運作

在 星期五 7月  31 10:19:56 2015 處運作結束

經過時間為: 00: 57: 34.13

cpu 時間為: 00: 03: 21.71

從以上代碼中可以看到,日志檔案記錄了很多重要資訊,如sqlldr指令中指定的控制檔案(control=e:\adobe\adobe_src_data.ctl)、錯誤檔案(bad= e:\adobe\ error_record.txt)、允許的錯誤條數(errors=1000000)。

日志還記錄了控制檔案指定的資料檔案(e:\adobe\01-niwodai-prd_2015-07-26.tsv)、插入選項對表 append 生效(即加載方式,append into)、trailing nullcols 選項生效(trailing nullcols,該選項生效時,當資料檔案中出現連續兩個列分隔符時,對應字段值将被置為null)。

在羅列了表中所有字段資訊之後,日志檔案記錄了被拒絕的記錄在資料檔案中所在的行,以及被拒絕的原因,之後的資訊還展示了加載成功的行數,以及由于錯誤被拒絕的行數,最後記錄了導入該檔案的耗時為57分鐘34秒,之是以消耗這麼長時間,是由于資料檔案每行包含了265個字段,且字段長度都比較大。

日志檔案有很多潛在的用途,通過程式讀取日志檔案可以将其中的重要資訊展示在頁面上,或寫入日志資料庫,進而便于作業的管理和監控。對日志檔案的處理是資料閉環監控中的重要手段之一。

4. hive:add partition

hive有多種批量加載方式,根據資料檔案存放的位置不同,hive加載資料面臨兩種情形:從本地檔案系統加載資料以及從hdfs中加載資料。hive shell提供load data指令可以完成上述兩種情形下的資料批量導入,如代碼清單2-31所示。

代碼清單 2-31

load data [local] inpath 'data_file' into table table_name;

當導入本地檔案至hive表中時,需要指明local關鍵字,并且随後的data_file參數用于指明基于本地檔案系統的完整檔案路徑;當導入hdfs檔案至hive表中時,不需要local關鍵字,且data_file為hdfs檔案系統的檔案路徑或hdfs檔案url。

為了便于程式化實作,這裡采用hadoop shell結合hve shell 的方式實作hive表的批量導入。

根據第1章的hive表更新規則,我們通過兩個步驟完成資料的批量導入。首先通過hive shell為hive表增加一個partition,并指定location;然後使用hadoop shell将檔案copy至該partition對應的location目錄,圖2-7展示了這個過程。

按照圖2-7所示的方式,我們嘗試将上述adobe資料檔案“e:\adobe\01-niwodai-prd_2015-07-26.tsv”導入至hive表中。

為了確定location指定的hdfs目錄存在,先執行hadoop shell指令,建立一個目錄,如代碼清單2-32所示。

圖2-7 hive shell + hadoop shell批量導入資料至hive表

代碼清單 2-32

hadoop fs -mkdir /data/adobe_log_app/20150726

下面就可以分兩步完成資料的導入。首先為表adobe_log_app增加一個新的partition,在hive shell環境中,執行如代碼清單2-33所示的指令。

代碼清單 2-33

hive&gt;alter table adobe_log_app add partition(load_day='20150726') location

'/data/adobe_log_app/20150726'

然後,使用hadoop shell将檔案複制到hdfs目錄“/data/adobe_log_app/20150726”中,如代碼清單2-34所示。

代碼清單 2-34

hadoop fs -copyfromlocal

/usr/queziyang/data/01-niwodai-prd_2015-07-26.tsv

/data/adobe_log_app/20150726

至此,即完成了資料檔案從本地檔案系統批量導入至hive表中,整個過程相對比較耗時的操作僅出現在資料檔案複制的過程中,且複制效率要比hive shell的load data的高。上述的各個指令,可以通過程式設計語言調用,進而實作自動化。

5. hbase:bulk load

hbase的資料加載方式也有很多種,但最高效的方式是使用hbase的bulk load指令。hbase的bulk load指令分為兩步:

1)使用一個mapreduce作業将資料轉換為hbase的内部資料格式。

2)将生成的storefiles直接加載到hbase叢集中。

代碼清單2-35展示了mapreduce生成hfile的過程,這個過程僅包含map,不需要reduce。

代碼清單 2-35

public class hfilegenerator {

    public static class hfilemapper extends mapper&lt;longwritable, text, immutablebyteswritable, keyvalue&gt; {

            @override

            protected void map(longwritable key, text value, context context)

            throws ioexception, interruptedexception {

                    // 擷取rowkey

                    immutablebyteswritable rowkey = new

                    immutablebyteswritable(items[0].getbytes());

                    //此處添加處理輸入資料檔案的代碼……

                    //按照org.apache.hadoop.hbase.keyvalue的格式輸出

                    keyvalue kv = new keyvalue(bytes.tobytes(items[0]),

                    bytes.tobytes("item"), bytes.tobytes(column),system.currenttimemillis(),

                    bytes.tobytes(prefvalue));

                    context.write(rowkey, kv);

}

    public static void main(string[] args) throws ioexception,

    interruptedexception, classnotfoundexception {

            configuration conf = new configuration();

            string[] dfsargs = new genericoptionsparser(conf, args).getremainingargs();

            job job = new job(conf, "hfile bulk load job");

            job.setjarbyclass(hfilegenerator.class);

            job.setmapperclass(hfilemapper.class);

            job.setreducerclass(keyvaluesortreducer.class);

            job.setmapoutputkeyclass(immutablebyteswritable.class);

            job.setmapoutputvalueclass(keyvalue.class);

            job.setpartitionerclass(simpletotalorderpartitioner.class);

            fileinputformat.addinputpath(job, new path(dfsargs[0]));

            fileoutputformat.setoutputpath(job, new path(dfsargs[1]));

            hfileoutputformat.configureincrementalload(job, hbasetablename);

            system.exit(job.waitforcompletion(true) ? 0 : 1);

    }

代碼主要包含hfilemapper和一個main()函數,在hfilemapper的map方法中處理輸入資料檔案,并按照org.apache.hadoop.hbase.keyvalue的格式輸出。

在main()函數中, fileinputformat.addinputpath用于指定輸入檔案的路徑(hdfs路徑),fileoutputformat.setoutputpath則用于指定mapreduce作業的輸出路徑,即生成的hfile最終的存放路徑。

最後通過hfileoutputformat.configureincrementalload(job, hbasetablename)導入hbase表的相關資訊,進而使得map的輸出最終與hbasetablename相比對。

上述過程在指定的輸出路徑中生成hfile檔案,可以通過org.apache.hadoop.hbase.mapreduce.loadincrementalhfiles的dobulkload方法将其挂載到對應的hbase表中,代碼清單2-36展示了這個過程。

代碼清單 2-36

public class hfileloader {

public static void main(string[] args) {

string[] dfsargs = null;

try {

dfsargs = new

genericoptionsparser(hbaseutils.getconfiguration(),

args).getremainingargs();

loadincrementalhfiles loader = new

loadincrementalhfiles(hbaseutils.getconfiguration());

loader.dobulkload(new path(dfsargs[0]),

hbaseutils.gettable(hbasetablename));

} catch (exception e) {

e.printstacktrace();

這個過程非常簡單,隻需要指明已經生成的hfile的路徑,其他的工作交給dobulkload方法即可。加載非常快,因為它實際上是執行類似hadoop的mv操作。

bulk load主要的耗時階段在于生成hfile,而在加載階段則非常迅速。這種方式比較适用于往hbase空表中加載資料的情況,是以當對hbase進行全量更新時,這是首選方式。

但這種方式在增量加載時,就沒有那麼高效了,因為新的hfile的加入,會觸發hbase的split和rebalance操作,這會使dobulkload的過程非常緩慢。是以在對hbase進行批量加載的時候,應該盡量使用全量更新的方式,如果增量更新不可避免,則使用原生的api接口逐條put入庫将是最後的選擇。