
Building a Big Data ETL Pipeline -- Streaming Conversion of JSON Data -- Avro to Parquet (Part 2)

    If the logs we generate are files in Avro format, we can use kite-dataset to stream-convert the Avro data into Parquet for storage, and then query it in Hive. The concrete steps are as follows:

1. Create the dataset:

    ./kite-dataset create dataset:hdfs://test/user/litao/test/parquet/litao/ --schema litao.avsc --format parquet

2. View the schema to verify that the dataset was created successfully:

    ./kite-dataset schema dataset:hdfs://test/user/litao/test/parquet/litao/

To delete the dataset, if needed:

    ./kite-dataset delete dataset:hdfs://test/user/litao/test/parquet/litao/
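For reference, a minimal litao.avsc consistent with the name/age fields of the Hive table created later in this article might look like the sketch below (the record name and namespace are assumptions; the original schema file is not shown in this article):

```json
{
  "type": "record",
  "name": "litao",
  "namespace": "com.example",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age",  "type": "int"}
  ]
}
```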

3. Configure the Kite dataset sink:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Sources info
a1.sources.r1.channels = c1
#a1.sources.r1.deserializer = AVRO
#a1.sources.r1.deserializer.schemaType = LITERAL
a1.sources.r1.type = com.bigo.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = kafka1:9093,kafka2:9093,kafka3:9093,kafka4:9093,kafka5:9093,kafka6:9093
a1.sources.r1.kafka.topics = test_2018-03-14
a1.sources.r1.kafka.consumer.group.id = test_2018-03-14.conf_flume_group
a1.sources.r1.kafka.consumer.timeout.ms = 100
#a1.sources.r1.batchSize = 2

# Inject the schema into the header so the AvroEventSerializer can pick it up
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = flume.avro.schema.url
a1.sources.r1.interceptors.i1.value = hdfs://test/user/litao/litao.avsc

# Channels info
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 50

# Sink info
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://test/flume/test/parquet
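With the fragments above assembled into a single properties file (the file name conf/a1.conf below is an assumption), the agent can be launched with the standard flume-ng command:

```shell
# Start agent a1 with the configuration assembled above
# (conf/a1.conf is an assumed file name for the combined config)
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/a1.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console
```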

4. Simulate generating data and storing it in HDFS. The flow is:

JSON data ---avro-tools.jar---> Avro data ---> Flume spooldir source ---> Kafka ---Kite dataset sink---> HDFS ---create Hive Parquet table---> query and display in Hive
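To drive this simulation, we first need a few JSON records matching the schema. A minimal Python sketch (record values and file names are made up for illustration):

```python
import json

def make_records(n):
    """Generate n sample records matching the assumed {name, age} schema."""
    return [{"name": f"user_{i}", "age": 20 + i} for i in range(n)]

def write_json_lines(records, path):
    """Write one JSON object per line, the input format avro-tools expects."""
    with open(path, "w") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")

if __name__ == "__main__":
    write_json_lines(make_records(3), "sample.json")
    # The JSON file can then be converted to Avro with avro-tools, e.g.:
    #   java -jar avro-tools.jar fromjson --schema-file litao.avsc sample.json > sample.avro
    # and the resulting .avro file dropped into the Flume spooldir.
```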

5. Create the Hive Parquet table, parse the data, and verify it:
-- data completeness
-- whether the data is parsed correctly

CREATE TABLE `tmp.test_hdfs_litao_parquet`(
  `name` string,
  `age` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://bigocluster/flume/test/parquet/'
TBLPROPERTIES (
  'transient_lastDdlTime'='1521541748')
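Once the table exists, a couple of simple queries (illustrative only) can check the two points above -- completeness and correct parsing:

```sql
-- Row count should match the number of records produced upstream
SELECT COUNT(*) FROM tmp.test_hdfs_litao_parquet;

-- Spot-check that fields landed in the expected columns
SELECT name, age FROM tmp.test_hdfs_litao_parquet LIMIT 10;
```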
