天天看點

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

壓縮流和解壓縮流

Java最初版本的輸入/輸出系統是基于流的,流抽象了任何有能力産出資料的資料源,或者是有行能力接收資料的接收端。一般來說,通過設計模式裝飾,可以為流添加一些額外的功能,如前面提及的序列化流ObjectInutStream和ObjectOutputStream。

壓縮流(CompressionOutputStream)和解壓縮流(CompressioninputStream)是Hadoop壓縮架構中的另一對重要概念,它提供了基于流的壓縮解壓縮能力。

這裡隻分析和壓縮相關的代碼,即CompressionOutputStream及其子類。

OutputStream是一個抽象類,提供了進行流輸出的基本方法,它包含三個write成員函數,分別用幹往流中寫入一個位元組、一個位元組數組或一個位元組數組的一部分(需要提供起始偏移量和長度)。

注意

流實作中一般需要支援的close()和flush()方法,是java.io包中的相應接口的成員函數。

CompressionOutputStream繼承自OutputStream,也個抽象類。如前面提到的ObjectOutputStream、CompressionOutputStream為其他流添加了附加額外的壓縮功能,其它流儲存在類的成員變量out中,并在構造的時候被指派。

CompressionOutputStream實作了OutputStream的close()方法和flush()方法,但用于輸出資料的write方法、用于結束壓縮過程并将輸入寫到底層流的finish()方法和重置壓縮狀态的resetState()方法還是抽象方法,需要CompressionOutputStream的子類實作,相關代碼如下:

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

CompressionOutputStream規定了壓縮流對外接口,如果已經有了一個壓縮器的實作,能否提供一個通用的、使用壓縮器的壓縮流實作呢?答案是肯定的,CompressorStream使用的壓縮器實作了一個通用的壓縮流,代碼如下:

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

CompressorStream提供了幾個不同的構進函數,用子初始化相關的成員變量。其中,Compressorstream需要的底層輸出流out和壓縮時使用的壓縮器,都作為參數傳入構造函數,另一個參數是CompressorStrem工作時使用的緩沖區buffer的大小,構造時會利用這個簽數配置設定該緩沖區。

CompressorStream.write()方法用将待壓縮的資料寫入流中,待壓縮的資料在進行一番檢查後。最終調用壓縮器的setlnput()方法進人壓縮器。setlnput方法調用結束後,通過Comprcssor.needsInput()判斷足否需要調用compress()方法,擷取壓縮後的輸出數椐。

當finish()被調用(往往是CompressorStream被關閉),這時CompressorStream流調用壓縮器的finish()方法通知輸人已經結束,然後進人另一個循環,該循環不斷讀取壓縮器中未讀取的數,然後輸出到底層流out中。

支援Snappy壓縮

Snappye的前身是Zippy,雖然隻是一個資料壓縮庫,卻被Google用于許多内部項目,如BigTable、MapReduce等,Google表示泫箅法庫針對性能做了調整,針對64位x86處理器進行優化,并在英特爾酷睿i7處理器單一核心上實作了至少毎秒250MB的壓縮性能和每秒500MB的解壓縮性能。Snappy在Google的生産環境中經過了PB級資料壓縮的考驗,并使用NewBSD協定開源。

本文不介紹Snappy壓縮算法是如何實作的,而是在前面已有的基礎上,介沼如何在Hadoop提供的壓縮架構下內建新的壓縮算法。本文隻介紹和壓縮相關的實作,将涉及org.apache.hadoop.io.compress.snappy包下的代碼和org.apache.hadoop.io.compress.SnappyCodec類。

org.apache.hadoop.io.compress.snappy包括支援Snappy的壓縮器SnappyCompressor和解Qi器SnappyDecompressor.LoadSnappy類用于判斷Snappy的本地庫是否可用,如果可用,則通過System.loadLibrary()加栽本地庫。

SnappyCompressor實作了Compressor接口,是重點。前面提過,壓縮器的一般用法是循環調用setlnput()、finish()和compress()三個方法對資料進行壓縮。在分析這些方法前,了解SnappyCompressor的主要成員變量,如下所示:

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

SnappyCompressor的主要屬性有compressedDirectBuf和uncompressedDirectBuf,分別用于儲存壓縮前後的資料,類型都是Buffer。緩沖區Buffer代表一個有限容董的容器,是JavaNIO(新輸入/輸出系統)中的重要槪念,和基于流的JavaIO不同,緩沖區可以用于輸入,也可以用于輸出。為了支援這些特性,緩沖區會維持一些标記,記錄目前緩沖區中的資料存放情況。

成員變量userBuf、userBufDff和userBufLen用于儲存通過setlnput()設定的,但超過壓縮器工作空間uncompressedDirectBuf剩餘可用空間的資料。後面在分析setlnput()方法的時候,可以看到這三個變董是如何使用的。

在分析壓縮器/解壓器和壓縮流/解壓縮流時,一直強調Compressor的setlnput()、needslnput()、finish〇、finished()和compress()5個方法間的配合,那麼為什麼需要這樣的配合呢?讓我們先從setlnput()開始了解這些方法的實作。

1.setlnput()

seUnput()方法為壓縮器提供資料,在做了一番輸人資料的合法性檢査後,先将finished标志位置為false,并嘗試将輸人資料複制到内部緩沖區中,如果内部緩存器剩餘空間不夠大,那麼,壓縮器将'借用'輸人資料對應的緩沖區,即利用userBuLuserBufDff和userBufLen記錄輸入的資料。否則,setlnput()複制資料到uncompressedDirectBuf中。

需要注意的是,當'借用'發生時,使用的是引用,即資料并沒有發生實際的複制,使用者不能随便修改傳入的資料。同時,緩沖區隻能借用一次,使用者如果再次調用setlnputO,将會替代原來儲存的相關資訊,造成資料錯誤。代碼如下:

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

setlnput()借用外部緩沖區後就不能再就收資料,這是,使用者調用needsInput()将傳回false,就可以獲知這個資訊。

2.needsInput()

needsInput()方法傳回false有三種情況:輸出緩沖區(即儲存壓縮結果的緩沖區)有未讀取的資料、輸入緩沖區沒有空間,記憶壓縮器已經借用外部緩沖區。這時,使用者需要通過compress()方法取走已經壓縮的資料,知道needsInput()傳回true,才可以再次通過setInput()方法添加壓縮資料,代碼如下

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

3.compress

compress()方法用于擷取壓縮後的資料,它需要處理needslnput()返冋false的幾種情況。

如果壓縮資料緩沖區有資料,即comprcssedDirectBuf中還冇資料,則讀取這部分資料,并傳回。

如果該緩沖區為空,則需要壓縮資料。首先清理compressedDirectBuf,這個淸理是一個典型的Buffer操作。待壓縮的資料有兩個來源,輸人緩沖區uncompressedDirectBuf或者'借用'的資料緩沖區。

如果輸人緩沖區沒有資料,那待壓縮資料可能(可以在沒有任何帶壓縮資料的情況下調用compress()方法)在'借用'的資料緩沖區裡,這時使用seUnputFromSavedData()方法複制'借用'資料緩沖區中的資料到uncompressedDirectBuf中,setInputFromSavedData()函數調用結束後,待壓縮資料緩沖區裡還沒有資料,則設finished标記位,并傳回0,表明壓縮資料已經讀完。

uncompressedDirectBuf中的資料,利用native方法compressBytesDirect()進行壓縮,壓縮後的資料儲存在compressedDirectBuf中,由于待壓縮資料緩沖區和壓縮資料緩沖區的大小是一樣的,是以uncompressedDirectBuf中的資料是一次被處理完的。

compressBytesDirect()調用結束後,需要再次設定緩沖區的标記,并根據情況複制資料到Compress()的參數b提供的緩沖區中。相關代碼如下:

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮
大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

最後是finished()

finished()傳回true,表明壓縮過程已經結束,壓縮過程包含多個條件,包括finish标志位和finished标志位必須都為true,已經compressedDirectBuf沒有未取走的資料。finish為true表明使用者确認已經完成資料的輸入過程,finished為true表示壓縮器中沒有待壓縮的資料,這三個條件缺一不可,代碼如下

大資料與Hadoop系列之壓縮(二)壓縮流和解壓縮流支援Snappy壓縮

其他方法就不一一介紹了。

希望大家通過在本文能自己掌握如何自己添加內建一個壓縮方式。

繼續閱讀