文章目錄
- FlinkX的安裝與簡單使用
-
- 1、FlinkX的安裝
-
- 1)、上傳并解壓
- 2)、配置環境變量
- 3)、給bin/flinkx這個檔案加上執行權限
- 4)、修改配置檔案,設定運作端口
- 2、FlinkX的簡單使用
-
- 1)、MySQLToHDFS
- 2)、MySQLToHive
- 3)、MySQLToHBase
FlinkX的安裝與簡單使用
1、FlinkX的安裝
1)、上傳并解壓
unzip flinkx-1.10.zip -d /usr/local/soft/
2)、配置環境變量
3)、給bin/flinkx這個檔案加上執行權限
chmod a+x flinkx
4)、修改配置檔案,設定運作端口
vim flinkconf/flink-conf.yaml
## web服務端口,不指定的話會随機生成一個
rest.bind-port: 8888
2、FlinkX的簡單使用
1)、MySQLToHDFS
- 配置檔案
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '理科二班'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "hdfs://master:9000/data/flinkx/student",
"defaultFS": "hdfs://master:9000",
"column": [
{
"name": "col1",
"index": 0,
"type": "string"
},
{
"name": "col2",
"index": 1,
"type": "string"
},
{
"name": "col3",
"index": 2,
"type": "string"
},
{
"name": "col4",
"index": 3,
"type": "string"
},
{
"name": "col5",
"index": 4,
"type": "string"
},
{
"name": "col6",
"index": 5,
"type": "string"
}
],
"fieldDelimiter": ",",
"fileType": "text",
"writeMode": "overwrite"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
- 啟動任務
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHDFS.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
- 監聽日志
flinkx 任務啟動後,會在執行指令的目錄下生成一個nohup.out檔案
tail -f nohup.out
- 通過web界面檢視任務運作情況
http://master:8888
2)、MySQLToHive
- 配置檔案
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz < '文科二班'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://master:10000/testflinkx",
"username": "",
"password": "",
"fileType": "text",
"fieldDelimiter": ",",
"writeMode": "overwrite",
"compress": "",
"charsetName": "UTF-8",
"maxFileSize": 1073741824,
"tablesColumn": "{\"student\":[{\"key\":\"id\",\"type\":\"string\"},{\"key\":\"name\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"string\"}]}",
"defaultFS": "hdfs://master:9000"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
- 在hive中建立testflinkx資料庫,并建立student分區表
create database testflinkx;
use testflinkx;
CREATE TABLE `student`(
`id` string,
`name` string,
`age` string)
PARTITIONED BY (
`pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
- 啟動hiveserver2
# 第一種方式:
hiveserver2
# 第二種方式:
hive --service hiveserver2
- 啟動任務
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
- 檢視日志及運作情況同上
3)、MySQLToHBase
- 配置檔案
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student"
],
"table": [
"score"
]
}
],
"column": [
"*"
],
"customSql": "",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hbasewriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://master:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"table": "testFlinkx",
"rowkeyColumn": "$(info:student_id)_$(info:course_id)",
"column": [
{
"name": "info:student_id",
"type": "string"
},
{
"name": "info:course_id",
"type": "string"
},
{
"name": "info:score",
"type": "string"
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
- 啟動hbase 并建立testflinkx表
create 'testFlinkx','info'
- 啟動任務
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHBase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
- 檢視日志及運作情況同上