天天看點

阿裡雲Dataworks離線資料同步寫入Kafka

Step By Step

1、kafka執行個體的建立&獨享資料內建資源組的建立參考部落格(資源建立部分):

Dataworks實時資料同步(Kafka -> maxcompute)

2、資料內建配置Kafka資料源&測試連通性

阿裡雲Dataworks離線資料同步寫入Kafka

3、maxcompute建立測試資料表

CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);

INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");

SELECT * FROM odps_to_kafka1;           
阿裡雲Dataworks離線資料同步寫入Kafka

4、配置離線同步腳本(注意目前Kafka僅支援腳本模式,不支援想到模式)

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "envType": 1,
                "column": [
                    "key1",
                    "value1"
                ],
                "table": "odps_to_kafka1"  // maxcompute中表的名稱
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",
            "parameter": {
                "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // 注意配置kafka内網位址
                "keyIndex": 0,   // key值對應maxcompute讀取column的第一列
                "valueIndex": 1,  // value值對應maxcompute讀取column的第二列
                "valueType": "BYTEARRAY",
                "topic": "from_odps1",  // kafka 中表的名稱
                "batchSize": 1024,
                "keyType": "BYTEARRAY"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "2"
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}           
注意: 儲存腳本的時候如果提示不滿足json格式規範,将注釋部分删除即可。

5、執行同步任務

阿裡雲Dataworks離線資料同步寫入Kafka

6、Kafka控制台檢視資料同步情況

阿裡雲Dataworks離線資料同步寫入Kafka

更多參考

Kafka Writer 新增和使用獨享資料內建資源組