Step By Step
1. For creating the Kafka instance and the exclusive Data Integration resource group, see the resource-creation section of the blog post: DataWorks real-time data sync (Kafka -> MaxCompute).

2. Configure the Kafka data source in Data Integration and test connectivity.
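Connectivity is normally tested with the "Test Connectivity" button in the Data Integration console. As a rough local stand-in, a plain TCP check against the broker addresses (the same ones used in the sync script of step 4) can at least confirm the ports are reachable from your network; this is a hypothetical sketch, not part of the DataWorks product:

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds.

    Only proves the port is open from this machine; it does not
    verify that Kafka itself answers correctly.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Broker addresses taken from the sync script in step 4.
for server in ["192.168.0.67:9092", "192.168.0.66:9092", "192.168.0.65:9092"]:
    host, port = server.rsplit(":", 1)
    state = "reachable" if broker_reachable(host, int(port)) else "unreachable"
    print(server, state)
```

Note that the 192.168.x.x addresses are internal; the check only makes sense from a host inside the same VPC as the Kafka instance.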

3. Create a test table in MaxCompute:
CREATE TABLE IF NOT EXISTS odps_to_kafka1(key1 STRING,value1 STRING);
INSERT INTO odps_to_kafka1 VALUES ("key_key1","value_value1");
INSERT INTO odps_to_kafka1 VALUES ("key_key2","value_value2");
INSERT INTO odps_to_kafka1 VALUES ("key_key3","value_value3");
INSERT INTO odps_to_kafka1 VALUES ("key_key4","value_value4");
INSERT INTO odps_to_kafka1 VALUES ("key_key5","value_value5");
SELECT * FROM odps_to_kafka1;
4. Configure the offline sync script (note: Kafka currently supports script mode only, not wizard mode):
{
  "type": "job",
  "steps": [
    {
      "stepType": "odps",
      "parameter": {
        "partition": [],
        "datasource": "odps_first",
        "envType": 1,
        "column": [
          "key1",
          "value1"
        ],
        "table": "odps_to_kafka1" // name of the MaxCompute table
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "Kafka",
      "parameter": {
        "server": "192.168.0.67:9092,192.168.0.66:9092,192.168.0.65:9092", // use the Kafka internal endpoint
        "keyIndex": 0, // the key comes from the first column read from MaxCompute
        "valueIndex": 1, // the value comes from the second column read from MaxCompute
        "valueType": "BYTEARRAY",
        "topic": "from_odps1", // name of the Kafka topic
        "batchSize": 1024,
        "keyType": "BYTEARRAY"
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "version": "2.0",
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  },
  "setting": {
    "errorLimit": {
      "record": "2"
    },
    "speed": {
      "throttle": false,
      "concurrent": 2
    }
  }
}
Note: if saving the script fails with a message that it does not conform to the JSON format, delete the // comments.
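Stripping the comments by hand is error-prone; a small helper can do it before pasting the script. This is a minimal sketch that assumes no `//` appears inside string values (true for the script above, whose `server` field holds host:port pairs rather than URLs):

```python
import json
import re

def strip_line_comments(script: str) -> str:
    """Remove trailing // comments so the sync script parses as strict JSON.

    Naive sketch: it cuts each line at the first "//", so it would
    mangle string values that themselves contain "//".
    """
    return "\n".join(
        re.sub(r'\s*//.*$', '', line) for line in script.splitlines()
    )

# A small commented fragment in the same style as the sync script.
commented = '''{
  "topic": "from_odps1", // name of the Kafka topic
  "batchSize": 1024
}'''

config = json.loads(strip_line_comments(commented))
print(config["topic"])  # from_odps1
```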
5. Run the sync task.

6. Check the synced data in the Kafka console.
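Besides the console, the topic can be read directly with Kafka's command-line consumer from a host that can reach the brokers (a sketch; topic name and broker address are taken from the script above):

```shell
# Read the topic from the beginning and print keys as well as values;
# the five key/value pairs inserted in step 3 should appear.
kafka-console-consumer.sh \
  --bootstrap-server 192.168.0.67:9092 \
  --topic from_odps1 \
  --from-beginning \
  --property print.key=true
```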