maxcompute作為阿裡雲大資料平台的核心計算元件,擁有強大的計算能力,能夠排程大量的節點做并行計算,同時對分布式計算中的failover,重試等均有一套行之有效的處理管理機制。 而maxcompute sql能在簡明的語義上實作各種資料處理邏輯,在集團内外更是廣為應用,在其上實作與各種資料源的互通,對于打通整個阿裡雲的資料生态具有重要意義。基于這一點,最近maxcompute團隊依托maxcompute2.0系統架構,引入了非結構化資料處理架構:通過外部表,為各種資料在maxcompute上的計算處理提供了入口。這裡以maxcompute處理存儲在oss上的資料為例,介紹這些新功能。
現階段maxcompute sql面對的主要是以cfile列格式,存儲在内部maxcompute表格中的結構化資料。而對于maxcompute表外的各種使用者資料(包括文本以及各種非結構化的資料),需要首先通過各種工具導入maxcompute表,才能在其上面進行計算。這個資料導入的過程,具有較大的局限性。以oss為例子,想要在maxcompute裡處理oss上的資料,通常有兩種做法:
通過oss sdk或者其他工具從oss下載下傳資料,然後再通過maxcompute tunnel将資料導入表裡。
寫udf,在udf裡直接調用oss sdk通路oss資料。
但這兩種做法都不夠好。#1需要在maxcompute系統外部做一次中轉,如果oss資料量太大,還需要考慮如何并發來加速,無法充分利用maxcompute大規模計算的能力;#2通常需要申請udf網絡通路權限,還要開發者自己控制作業并發數和資料如何分片的問題。
本文介紹了一種外部表的功能 ,支援旨在提供處理除了maxcompute現有表格以外的其他資料的能力。在這個架構中,通過一條簡單的ddl語句,即可在maxcompute上建立一張外部表,建立maxcompute表與外部資料源的關聯,提供各種資料的接入和輸出能力。建立好的外部表可以像普通的maxcompute表一樣使用(大部分場景),充分利用maxcompute sql的強大計算功能。
這裡的“各種資料”涵蓋兩個次元:
多樣的資料存儲媒體
插件式的架構可以對接多種資料存儲媒體,比如oss、ots
多樣的資料格式:maxcompute表是結構化的資料,而外部表可以不限于結構化資料
完全無結構資料;比如圖像,音頻,視訊檔案,raw binaries等
半結構化資料;比如csv,tsv等隐含一定schema的文本檔案
非cfile的結構化資料; 比如orc/parquet檔案,甚至hbase/ots資料
下面通過一個簡單例子,來示範如何在maxcompute上輕松通路oss上的資料。
使用maxcompute内置的extractor,可以非常友善的讀取按照約定格式存儲的oss資料。我們隻需要建立一個外部表,就能以這張表為源表做查詢。假設有一份csv資料存在oss上,endpoint為<code>oss-cn-hangzhou-zmf.aliyuncs.com</code>,bucket為<code>oss-odps-test</code>,資料檔案放在<code>/demo/sampledata/csv/ambulancedata/vehicle.csv</code>。
首先需要在ram中授權maxcompute通路oss的權限。登入ram控制台,建立角色<code>aliyunodpsdefaultrole</code>,并将政策内容設定為:
然後編輯該角色的授權政策,将權限<code>aliyunodpsrolepolicy</code>授權給該角色。
執行一條ddl語句,建立外部表:
注釋:
這個功能是maxcompute2.0的一部分,目前處于試用狀态。這裡需要提前設定一些開關來臨時打開這個功能(這個開關設定前需要申請試用maxcompute2.0,文章末尾有介紹),本文後面所有的sql例子語句運作前都需要設定這些開關,為了便于閱讀,我隻在這裡明确寫出。
<code>com.aliyun.odps.csvstoragehandler</code>是内置的處理csv格式檔案的storagehandler,它定義了如何讀寫csv檔案。我們隻需要指明這個名字,相關邏輯已經由系統實作。
location必須指定一個oss目錄,預設系統會讀取這個目錄下所有的檔案。
外部表隻是在系統中記錄了與oss目錄的關聯,當drop這張表時,對應的location資料不會被删除。
前面我們已經通過ram将賬号中的oss資源通路權限授權給了maxcompute。在後續的通路oss過程中,maxcompute将通過sts拿到對oss資源的臨時權限。需要注意的是,maxcompute在擷取權限時,是以表的建立者的身份去sts申請的,是以,這裡建立表的賬号和oss必須是同一個雲賬号,而且必須是主賬号,不能是子賬号。
外部表建立成功後,我們可以像對普通表一樣使用這個外部表。
假設<code>/demo/sampledata/csv/ambulancedata/vehicle.csv</code>資料為:
執行:
這條語句會送出一個作業,調用内置csv extractor,從oss讀取資料進行處理。
結果:
當oss中資料格式比較複雜,内置的extractor無法滿足需求時,需要自定義extractor來讀取oss檔案中的資料。
例如有一個text資料檔案,并不是csv格式,記錄之間的列通過<code>|</code>分割。<code>/demo/sampledata/customtxt/ambulancedata/vehicle.csv</code>資料為:
這個時候可以寫一個通用的extractor,将分隔符作為參數傳進來,可以處理所有類似格式的text檔案。
<code>inputs</code>是一個<code>inputstreamset</code>,每次調用<code>next()</code>傳回一個<code>inputstream</code>,這個<code>inputstream</code>可以讀取一個oss檔案的所有内容。
<code>delimiter</code>通過ddl語句傳參
<code>extactor()</code>調用傳回一條<code>record</code>,代表外部表中的一條記錄
傳回<code>null</code>來表示這個表中已經沒有記錄可讀了
storagehandler作為external table自定義邏輯的統一入口。
将自定義代碼編譯打包,并上傳到maxcompute。
與使用内置extractor類似,我們同樣需要建立一個外部表,不同的是這次需要指定外部表通路資料的時候,使用自定義的storagehandler。
<code>stored by</code>指定自定義storagehandler的類名
<code>serdeproperites</code>可以指定參數,這些參數會通過<code>dataattributes</code>傳遞到extractor代碼中
同時需要指定類定義所在的jar包
這條語句會送出一個作業,調用自定義的extractor,從oss讀取資料進行處理。
在前面我們看到了通過内置與自定義的extractor可以輕松處理存儲在oss上的csv等文本資料。接下來我們以語音資料(wav格式檔案)為例,來看看怎樣通過自定義的extractor來通路處理oss上的非文本檔案。
這裡我們從最終執行的sql query開始,介紹以maxcompute sql為入口,處理存放在oss上的語音檔案的使用方法:
這裡我們依然建立的外部表,并且通過外部表的schema定義了我們希望通過外部表從語音檔案中抽取出來的資訊:
一個語音檔案中的語句信噪比(snr):sentence_snr
對應語音檔案的名字:id
建立了外部表後,通過标準的select語句進行查詢,則會觸發extractor運作計算。
從這我們可以更直接的感受到,在讀取處理oss資料時,除了之前介紹過的對文本檔案做簡單的反序列化處理,還可以通過在自定義的extractor中實作更複雜的資料處理抽取邏輯:在這個例子中,我們通過自定義的com.aliyun.odps.udf.example.speech. speechstoragehandler 中封裝的extractor, 實作了對語音檔案計算平均有效語句信噪比的功能,并将抽取出來的結構化資料直接進行sql運算(where sentence_snr > 10), 最終傳回所有信噪比大于10的語音檔案以及對應的信噪比值。
在oss位址<code>oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/speechsentencetest/</code>上,存儲了原始的多個wav格式的語音檔案,maxcompute 架構将讀取該位址上的所有檔案,并在必要的時候進行檔案級别的分片,自動将檔案配置設定給多個計算節點處理。每個計算節點上的extractor則負責處理通過inputstreamset配置設定給該節點的檔案集。具體的處理邏輯則與使用者單機程式相仿,使用者不用關心分布計算中的種種細節,按照類單機方式實作其使用者算法即可。
這裡簡單介紹一下定制化的speechsentencesnrextractor主體邏輯。首先我們在setup接口中讀取參數,進行初始化,并且導入語音處理模型(通過resource引入):
<code>extractor()</code>接口中,實作了對語音檔案的具體讀取和處理邏輯,對讀取的資料根據語音模型進行信噪比的計算,并且将結果填充成[snr, id]格式的record。
這個例子中對實作進行了簡化,同時也沒有包括涉及語音處理的算法邏輯,具體實作可參見:
<a href="https://github.com/aliyun/aliyun-odps-java-sdk/blob/master/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/speech/speechsentencesnrextractor.java">https://github.com/aliyun/aliyun-odps-java-sdk/blob/master/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/speech/speechsentencesnrextractor.java</a>
執行一開始的sql語句,可獲得計算結果:
可以看到,通過自定義extractor,我們在sql語句上即可分布式地處理多個oss上語音資料檔案。同樣的,用類似的方法,我們可以友善的利用maxcompute的大規模計算能力,完成對圖像,視訊等各種類型非結構化資料的處理。
在前面的例子中,一個外部表關聯的資料通過location上指定的oss“目錄”來實作,而在處理的時候,maxcompute對讀取“目錄”下面的所有資料,包括子目錄中的所有檔案。在資料量比較大,尤其是對于随着時間不斷積累的資料目錄,這樣子的全目錄掃描,可能帶來不必要的額外io以及資料處理時間。 解決這個問題通常有兩種做法:
比較直接的減少通路資料量的方法是使用者自己負責對資料存放位址做好規劃,同時考慮使用多個external table來描述不同部分的資料,讓每個externaltable的location指向資料的一個子集。
另一個方面,external table與内部表一樣,支援分區表的功能,使用者可以利用這個功能來對資料做系統化的管理。這個章節主要介紹一下external table的分區功能。
與maxcompute内部表不同,對于存放在外部存儲上(如oss)上面的資料,maxcompute不擁有資料的管理權,是以使用者如果希望系統的使用分區表功能的話,在oss上的資料檔案的存放路徑應該符合一定的格式,具體說來就是路徑為如下格式:
<code>partitionkey1=value1\partitionkey2=value2\...</code>
舉一個例子,假設使用者每天的log檔案存放在oss上面,然後希望能在通過maxcompute處理的時候,能夠按照粒度為“天”來通路一部分資料。簡單假設這些log檔案就是csv的格式(複雜自定義格式用法也類似),那麼可以使用如下的分區外部表來定義資料:
可以看到,這裡和前面的例子不一樣的主要就是在定義external table的時候,我們通過<code>partitioned by</code>的文法,來指定該外部表為分區表,這裡舉了一個三層分區的例子,分區的key分别是 <code>year</code>, <code>month</code> 和 <code>day</code>。為了讓這樣的分區能生效,在oss上面存儲資料的時候需要遵循上面提到的路徑格式。這裡舉一個有效的路徑存儲layout的例子:
需要再次強調的,因為使用者自己離線準備了資料,即通過osscmd或者其他oss工具自行上載到oss存儲服務上,是以資料路徑格式是由使用者決定的,如果想要external table的分區表功能正常工作的話,我們推薦使用者上載資料的時候遵循如上路徑格式。在這個前提下,通過<code>alter table add partition</code>ddl語句,就可以把這些分區資訊引入maxcompute。在這裡對應的ddl語句是:
比如如果隻想分析2016年6月1号當天,有多少不同的ip出現在log裡面,可以通過如下語句實作:
這種情況下, 對log_table_external這個外表對應的目錄,将隻通路<code>log_data/year=2016/month=06/day=01</code>子目錄下的檔案(<code>logfile</code>和<code>logfile.1</code>), 而不會對整個<code>log_data/</code>目錄作全量資料掃描,避免大量無用的io操作。
同樣如果隻希望對2016年下半年的資料做分析, 則可以通過
來隻通路oss上面存儲的下半年的log資料.
如果使用者已經有事先存在oss上面的曆史資料,但是又不是根據<code>partitionkey1=value1\partitionkey2=value2\...</code>的路徑格式來組織存放的,那也是可以通過maxcompute的分區方式來進行通路計算的。雖然通常情況下不這樣推薦,但是在非結構化資料處理這方面,maxcompute同樣提供了通過自定義路徑來引入partition的方法。
比如假設使用者的資料路徑上隻有簡單的分區值(而無分區key資訊),也就是資料的layout為:
那麼要綁定不同的子目錄到不同的分區上,可以通過類似如下自定義分區路徑的ddl語句實作:
在add partition的時候增加了location資訊,進而實作自定義分區資料路徑後,即使資料存放不符合推薦的<code>partitionkey1=value1\partitionkey2=value2\...</code>格式,也能正确的實作對子目錄資料的分區通路了。
最後在一些特殊情況下,使用者可能會有通路某個oss路徑上的任意檔案子集的需求,而這個檔案子集中的檔案在路徑格式上沒有明顯的規律性。在這方面maxcompute非結構化資料處理架構也提供了相應的支援,但是在這裡不展開做介紹了。如果對于這些高階的特殊用法,有詳細的具體需求和場景描述的話,可以聯系maxcompute技術團隊。
maxcompute上增添處理非結構化資料的能力,能夠有效的利用maxcompute架構上成熟的大規模計算資源,處理來自各種資料源上的資料,進而真正實作資料計算互通。随着maxcompute 2.0架構的逐漸上線,這些新功能将為集團内外使用者提供更多的計算價值。目前對于oss資料的讀取計算功能,在集團内一些急需大規模非結構化資料處理能力的團隊中已經使用。maxcompute團隊将進一步完善相關功能,并且提供對更多資料源的支援,例如tablestore(ots)等。後繼我們也會在ata上做更多的介紹。
