天天看點

DataLakeAnalysis: 使用DataX同步Kafka資料到OSS進行分析

平常業務開發中我們經常有流式資料儲存在Kafka裡面,這部分資料很多場景也是需要分析的,今天給大家介紹下如果使用DataX把資料從Kafka同步到OSS,儲存成對分析友好的Parquet格式,然後利用DLA進行分析的全流程。

為了後續叙述的友善,我們假設Kafka裡面儲存的是訂單的資料,它包含如下字段:

  • id, int類型
  • name, string類型
  • gmt_create, bigint類型, 時間戳字段
  • map_col: MAP 類型
  • array_col: ARRAY 類型
  • struct_col: STRUCT 類型

我們最終希望把Kafka上的這個資料最終儲存到OSS上,并且映射到DLA裡面的一個分區表,表結構如下:

CREATE EXTERNAL TABLE orders_p (
    id int,
    name string,
    gmt_create timestamp,
    map_col MAP<string, string>,
    array_col ARRAY<string>,
    struct_col STRUCT<id:bigint,name:string>
)
PARTITIONED BY (dt string)
STORED AS PARQUET 
LOCATION 'oss://test-bucket/datasets/oss_demo/orders_p/';           

注意我們最終表結構裡面有一個分區字段 dt, 因為分析場景下資料量都很大,進行分區才能提高分析的效率。而這個分區字段在原始資料裡面是沒有直接對應的。

因為從DataX過來的資料無法自動根據目錄分區,是以我們建議從Kafka過來的資料放到中間表 orders 裡面去, 在資料進入orders之後我們再跟一個任務做一個“資料拆分”的工作,把資料拆分到具體的分區裡面去。總結一下:要把Kafka的資料正确的落到DLA裡面的 orders_p 需要經過如下步驟:

  • kafka -> oss: DataX定時把資料同步到中間表: orders。
  • orders -> orders_p: DLA任務定時把資料從中間表orders同步到orders_p

這兩個任務串行執行,第二個任務依賴第一個任務,每5分鐘排程一次。這樣就可以Kafka裡面的資料以5分鐘延時的粒度不斷地寫入到OSS裡面去,然後使用DLA進行高效的分析。

kafka -> oss: DataX定時把資料同步到中間表: orders

因為Kafka上的資料量很大,在DLA中一般會進行分區處理以獲得更好的分析性能,但是DataX目前還無法支援直接把資料寫入到分區表,是以我們要搞一個中間表: orders 過度一下,它的表結構跟最終表orders_p幾乎一樣,隻是沒有分區

CREATE EXTERNAL TABLE orders (
    id int,
    name string,
    gmt_create timestamp,
    map_col MAP<string, string>,
    array_col ARRAY<string>,
    struct_col STRUCT<id:bigint,name:string>
)
STORED AS PARQUET 
LOCATION 'oss://test-bucket/datasets/oss_demo/orders/';           

那麼我們第一步要做的事情就是要通過DataX把資料寫到這個 orders 表對應的LOCATION: oss://test-bucket/datasets/oss_demo/orders/。

整個DataX的任務的JSON配置蠻複雜的,我們直接貼在這裡:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 5
            }
        },
        "content": [
            {
                "reader": {
                    "name": "kafkareader",
                    "parameter": {
                        "server": "127.0.0.2:9093",
                        "column": [
                            "id",
                            "name",
                            "gmt_create",
                            "map_col",
                            "array_col",
                            "struct_col"
                        ],
                        "kafkaConfig": {
                            "group.id": "demo_test",
                            "java.security.auth.login.config": "/the-path/kafka/kafka_client_jaas.conf",
                            "ssl.truststore.location": "/the-path/kafka.client.truststore.jks",
                            "ssl.truststore.password": "KafkaOnsClient",
                            "security.protocol": "SASL_SSL",
                            "sasl.mechanism": "PLAIN",
                            "ssl.endpoint.identification.algorithm": ""
                        },
                        "topic": "yucha",
                        "waitTime": "10",
                        "partition_": "0",
                        "keyType": "ByteArray",
                        "valueType": "ByteArray",
                        "seekToBeginning_": "true",
                        "seekToLast_": "true",
                        "beginDateTime": "20190501010000",
                        "endDateTime":   "20190501010500"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "oss://test-bucket",
                        "fileType": "parquet",
                        "path": "/datasets/oss_demo/kpt",
                        "fileName": "test",
                        "writeMode": "truncate",
                        "compress":"SNAPPY",
                        "encoding":"UTF-8",
                        "hadoopConfig": {
                            "fs.oss.accessKeyId": "the-access-id",
                            "fs.oss.accessKeySecret": "the-access-key",
                            "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com"
                        },
                        "parquetSchema": "message test {\n    required int64 id;\n    optional binary name (UTF8);\n    optional int64 gmt_create;\n    required group map_col (MAP) {\n        repeated group key_value {\n            required binary key (UTF8);\n            required binary value (UTF8);\n        }\n    }\n    required group array_col (LIST) {\n        repeated group list {\n            required binary element (UTF8);\n        }\n    }\n    required group struct_col {\n        required int64 id;\n        required binary name (UTF8);\n    }    \n}",
                        "dataxParquetMode": "fields"
                    }
                }
            }
        ]
    }
}           

這個配置分為兩段: kafkareader 和 hdfswriter, 分别負責讀寫資料。我們分别詳細介紹一下。

kafkareader 裡面大多數參數都比較好了解,比較重要的參數是 beginDateTime, endDateTime, 指定這個任務要消費的kafka資料的範圍,比如我們任務每5分鐘跑一次,那麼這裡指定的可能就是目前時間往前推5分鐘的時間範圍,比如我們示例代碼裡面的是 20190501010000 到 20190501010500, 時間精确到秒。關于KafkaReader更詳細的資訊可以參考KafkaReader文檔。

hdfswriter, 這裡我們使用hdfswriter來寫oss資料是因為OSS實作了Hadoop File System的接口,我們可以通過HDFS Writer來向OSS導資料,因為倒過來的資料後面要通過DLA來分析,推薦使用Parquet這種列存格式來儲存,目前HDFS Writer支援PARQUET的絕大部分類型,包括基本類型以及複雜類型如array, map, struct, 要以Parquet格式同步資料,我們首先要描述一下這個Parquet的格式, 我們示例資料對應的Parquet的Schema如下:

message test {
    required int64 id;
    optional binary name (UTF8);
    optional int64 gmt_create;
    required group map_col (MAP) {
        repeated group key_value {
            required binary key (UTF8);
            required binary value (UTF8);
        }
    }
    required group array_col (LIST) {
        repeated group list {
            required binary element (UTF8);
        }
    }
    required group struct_col {
        required int64 id;
        required binary name (UTF8);
    }    
}           

上面DataX任務描述檔案裡面的parquetSchema字段裡面的内容就是上面這段,隻不過縮成了一行以保證整個DataX描述檔案符合JSON格式。 關于Parquet Schema更多的資訊可以檢視Parquet Logical Type Definitions。

另外一個注意的配置點是 writeMode, 在我們的這個方案裡面,我們推薦使用 truncate, 因為這個任務是每5分鐘排程一次,下一次執行的時候需要把前一次執行的資料清空掉(truncate)。

拆分的SQL每個具體的業務會不一樣,我們這個示例裡面比較簡單,主要幹了兩件事:

把原始的 bigint 類型的 gmt_create轉成了timestamp類型。

從gmt_create裡面生成新的dt字段。

INSERT INTO orders_p
SELECT 
id, 
name, 
from_unixtime(gmt_create),  -- bigint -> timestamp
map_col, array_col, struct_col, 
cast(date(from_unixtime(gmt_create)) as string) -- 添加分區字段 
FROM orders           

這兩個任務的串行操作可以通過任務排程服務比如阿裡雲上DataWorks來進行串聯,在 kafka -> oss 的任務完成後,運作這個 資料拆分 的任務。

總結

這篇文章介紹了如何把Kafka裡面的資料實時地流入OSS,利用DLA進行高效的資料分析。借助于DataX對于Parquet複雜類型的支援,我們已經可以幫助使用者把各種複雜資料搬進OSS,希望對有類似場景的客戶有所幫助。

歡迎關注資料湖技術社群

資料湖開發者社群由 阿裡雲開發者社群 與 阿裡雲Data Lake Analytics團隊 共同發起,緻力于推廣資料湖相關技術,包括hudi、delta、spark、presto、oss、中繼資料、存儲加速、格式發現等,學習如何建構資料湖分析系統,打造适合業務的資料架構。

DataLakeAnalysis: 使用DataX同步Kafka資料到OSS進行分析