資料組織形式、存儲格式及Parquet格式介紹
在介紹parquet資料格式之前,我們先介紹資料的幾種組織形式以及存儲形式。

結構化、半結構化、非結構化資料
結構化資料
結構化資料源對資料定義了一種模式。通過這些關于底層資料的額外資訊,結構化資料源提供高效的存儲和性能。例如,列式資料存儲Parquet和ORC,使得從一個列子集中提取資料更加容易。當資料查詢隻需要擷取一少部分列的資料時,通過周遊每行資料的方式需要查詢出過多的資料。基于行的存儲格式,如Avro通過高效的序列化存儲資料提供了存儲優勢。但是,這種優勢是以複雜性為代價的。例如,由于結構不夠靈活,模式轉換将成為挑戰。
半結構化資料
半結構化資料源是每行記錄一個結構,但不需要對整體記錄有一個全局的模式定義。是以,每行記錄是通過其自身的模式資訊對其進行擴充。JSON和XML就是其中最流行的例子。半結構化資料的優勢在于通過每行記錄自身的描述資訊,增強了展示資料資訊的靈活性。由于有很多輕量級的解析器用于處理這些記錄,是以半結構化資料格式在很多應用中普遍被使用,并且在可讀性上存在優勢。但是,它的主要缺陷也在于會産生額外的解析開銷,不能專門應用于即席查詢。
非結構化資料
相比之下,非結構化資料源是任意格式的文本或不包含标記或中繼資料的二進制對象(例如以逗号分隔的CSV檔案)來組織資料。新聞文章,醫療記錄,圖像斑點,應用日志經常被當成是非結構化資料。這些資料源分類一般需要根據資料的上下文才能解析。是以,需要清楚知道某個檔案是圖檔還是新聞,才能正确進行解析。大多數資料源都是非結構化的,要從這些非結構化的資料中擷取資料價值,由于其格式本身的笨重,需要經過大量轉換和特征提取技術去解釋這些資料集,成本較高。
列式存儲、行式存儲
列式存儲
列式存儲(Column-oriented Storage)并不是一項新技術,最早可以追溯到 1983 年的論文 Cantor。然而,受限于早期的硬體條件和使用場景,主流的事務型資料庫(OLTP)大多采用行式存儲,直到近幾年分析型資料庫(OLAP)的興起,列式存儲這一概念又變得流行。
總的來說,列式存儲的優勢一方面展現在存儲上能節約空間、減少 IO,另一方面依靠列式資料結構做了計算上的優化。
行式存儲
行式存儲通過逐行組織資料,所有的資料在存儲媒體上通過首位相連、逐條存儲,行式存儲是一種傳統的組織資料的方法。
parquet格式介紹
Apache Parquet 是Hadoop生态系統中通用的列式存儲格式,獨立于資料處理架構、資料模型、程式設計語言;
Parquet的靈感來自于2010年Google發表的Dremel論文,文中介紹了一種支援嵌套結構的存儲格式,并且使用了列式存儲的方式提升查詢性能。
Parquet跟Json對比
對比特征 | parquet | json |
---|---|---|
存儲形式 | 列式 | 行式 |
資料組織形式 | 結構化 | 半結構化 |
檔案大小 | 小 | 大 |
讀取速度 | 快 | 慢 |
Json轉Parquet代碼
如果有大批量的Json格式資料需要轉為Parquet格式資料,參考以下代碼;
import os
import multiprocessing
from json2parquet import convert_json
def split_file(file_name, path):
result_path = "parquet/"
file_path = path + file_name
res_path = result_path + file_name + ".parquet"
convert_json(file_path, res_path)
def main():
path = "data/"
file_list = os.listdir(path)
pool = multiprocessing.Pool(processes=20)
for file_name in file_list:
pool.apply_async(split_file, (file_name, path,))
pool.close()
pool.join()
if __name__ == '__main__':
main()
Parquet格式運作任務
使用parquet資料格式,來運作作業,使用spark read api中的parquet接口;
其中包括可以讀指定的單個檔案,或者一組檔案;
spark.read.parquet("your parquet file or files")
讀取單個parquet 檔案方法
/**
* Loads a Parquet file, returning the result as a `DataFrame`. See the documentation
* on the other overloaded `parquet()` method for more details.
*
* @since 2.0.0
*/
def parquet(path: String): DataFrame = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
parquet(Seq(path): _*)
}
讀取一組paruqet 檔案方法
/**
* Loads a Parquet file, returning the result as a `DataFrame`.
*
* You can set the following Parquet-specific option(s) for reading Parquet files:
* <ul>
* <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
* whether we should merge schemas collected from all Parquet part-files. This will override
* `spark.sql.parquet.mergeSchema`.</li>
* </ul>
* @since 1.4.0
*/
@scala.annotation.varargs
def parquet(paths: String*): DataFrame = {
format("parquet").load(paths: _*)
}
簡單作業使用parquet資料源示例
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
object OSSExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("OSSExample")
.getOrCreate()
val data=spark.read.parquet.load("oss://your-bucket-name/parquet file")
val data1 = data.groupBy("subject", "level").count()
val window = Window.partitionBy("subject").orderBy(org.apache.spark.sql.functions.col("count").desc)
val data2 = data1.withColumn("topn", row_number().over(window)).where("topn <= 1" )
data2.write.format("parquet").save("your store path")
}
}
作業性能對比
資料格式 | 計算用時 | 讀OSS流量 | 讀OSS次數 | 寫OSS流量 |
---|---|---|---|---|
15min | 1384G | 1387458 | 12M | |
9min | 104G | 286029 |
Parquet的優勢
1、可以跳過不符合條件的資料,隻讀取需要的資料,降低 IO 資料量,提升作業運作性能;
2、壓縮編碼可以降低磁盤存儲空間。由于同一列的資料類型是一樣的,可以使用更高效的壓縮編碼(例如 Run Length Encoding 和 Delta Encoding)進一步節約存儲空間;這樣能夠更少的使用OSS存儲空間,減少資料存儲成本;
3、隻讀取需要的列,支援向量運算,能夠擷取更好的掃描性能。