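What is CDC?
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (INSERTs, UPDATEs, DELETEs and so on, whether to data or to tables), record every change completely and in the order it occurred, and write the changes to message middleware so that other services can subscribe to and consume them.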
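To make the idea concrete, here is a minimal Python sketch of CDC's core contract. It assumes an in-memory list stands in for the message middleware, and the `ChangeEvent` type and the `capture`/`replay` helpers are hypothetical names for illustration only. Each change is appended to the log in the order it happened, and a subscriber rebuilds an identical copy of the table by replaying the log from the start.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ChangeEvent:
    """One captured change (hypothetical format, not Debezium's)."""
    op: str              # "INSERT", "UPDATE" or "DELETE"
    key: int             # primary key of the affected row
    row: Optional[dict]  # row image after the change; None for DELETE


def capture(log: List[ChangeEvent], op: str, key: int, row: Optional[dict] = None) -> None:
    """Append a change to the log in the order it occurred (the 'capture' half of CDC)."""
    log.append(ChangeEvent(op, key, row))


def replay(log: List[ChangeEvent]) -> Dict[int, dict]:
    """A subscriber rebuilds the table state by consuming events strictly in log order."""
    table: Dict[int, dict] = {}
    for event in log:
        if event.op in ("INSERT", "UPDATE"):
            table[event.key] = dict(event.row)
        elif event.op == "DELETE":
            table.pop(event.key, None)
        else:
            raise ValueError(f"unknown op: {event.op}")
    return table


if __name__ == "__main__":
    log: List[ChangeEvent] = []
    capture(log, "INSERT", 1, {"user_id": 1, "duration": 120})
    capture(log, "INSERT", 2, {"user_id": 8, "duration": 120})
    capture(log, "UPDATE", 1, {"user_id": 1, "duration": 300})
    capture(log, "DELETE", 2)
    print(replay(log))  # {1: {'user_id': 1, 'duration': 300}}
```

Replaying the same events in a different order (for example, the DELETE of id 2 before its INSERT) leaves a different result, which is why CDC insists on recording changes in the order they happened.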

1. Prepare the environment
- mysql
- elasticsearch
- flink on yarn
说æï¼å¦ææ²¡æå®è£ hadoopï¼é£ä¹å¯ä»¥ä¸ç¨yarnï¼ç´æ¥ç¨flink standaloneç¯å¢å§ã
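2. Download the dependency jars
Download the following two Flink dependency jars and put them in Flink's lib directory:
- flink-sql-connector-elasticsearch7_2.11-1.13.5.jar
- flink-sql-connector-mysql-cdc-1.4.0.jar
For flink-sql-connector-mysql-cdc, the newest version I could download there was 1.4.
Alternatively, you can download a newer version from https://github.com/ververica/flink-cdc-connectors and build it yourself with mvn clean install -DskipTests.
I did build the latest version, 2.2, but after uploading it I found it was too new. Switching to another version would have meant downloading the source from Gitee (GitHub was too slow), then building and packaging it in IDEA, which again pulls down a pile of dependencies. I gave up and simply downloaded a prebuilt 1.4 jar from the web.
The jars I downloaded and placed in Flink's lib directory: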
flink-sql-connector-elasticsearch7_2.11-1.13.5.jar
flink-sql-connector-mysql-cdc-1.4.0.jar
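Since this kind of version juggling is easy to get wrong, here is a small Python helper that inspects Flink's lib directory before you start the SQL client. `check_connector_jars` is a hypothetical name, not part of Flink, and the jar-name patterns are assumptions based on the two file names above. It can only compare the Elasticsearch connector's embedded Flink version, because the mysql-cdc jar name carries the CDC project's own version rather than Flink's.

```python
import re
from pathlib import Path
from typing import List

# Assumed jar-name patterns, based on the two file names used in this article.
ES_JAR = re.compile(r"flink-sql-connector-elasticsearch7_(?P<scala>[\d.]+)-(?P<flink>[\d.]+)\.jar$")
CDC_JAR = re.compile(r"flink-sql-connector-mysql-cdc-(?P<cdc>[\d.]+)\.jar$")


def check_connector_jars(lib_dir: str, flink_version: str) -> List[str]:
    """Return problems found in Flink's lib directory; an empty list means nothing obvious is wrong."""
    names = sorted(p.name for p in Path(lib_dir).glob("*.jar"))
    problems: List[str] = []

    es_matches = [m for m in (ES_JAR.match(n) for n in names) if m]
    if not es_matches:
        problems.append("missing flink-sql-connector-elasticsearch7 jar")
    for m in es_matches:
        # The ES connector's file name ends with the Flink version it was built against.
        if m.group("flink") != flink_version:
            problems.append(
                f"{m.group(0)} targets Flink {m.group('flink')}, but the cluster runs {flink_version}"
            )

    cdc_jars = [n for n in names if CDC_JAR.match(n)]
    if not cdc_jars:
        problems.append("missing flink-sql-connector-mysql-cdc jar")
    elif len(cdc_jars) > 1:
        # e.g. a leftover 2.2 build next to 1.4: keep only one to avoid classpath conflicts.
        problems.append("more than one mysql-cdc jar: " + ", ".join(cdc_jars))
    return problems


if __name__ == "__main__":
    import sys
    # Usage (hypothetical path): python check_jars.py /opt/flink-1.13.5/lib 1.13.5
    if len(sys.argv) == 3:
        for problem in check_connector_jars(sys.argv[1], sys.argv[2]) or ["looks OK"]:
            print(problem)
```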
3. Start the Flink SQL client
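- First start a session application on YARN. Go into the flink-1.13.5 directory and run the command below, which starts a detached session (-d) with 1 slot per TaskManager (-s 1), 1024 MB for the JobManager (-jm 1024) and 2048 MB per TaskManager (-tm 2048):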
bin/yarn-session.sh -d -s 1 -jm 1024 -tm 2048
- Then open the Flink SQL command line:
bin/sql-client.sh embedded -s flink-cdc
4. Set up the tables for synchronization
Here is a MySQL table:
CREATE TABLE `product_view` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`server_id` int(11) NOT NULL,
`duration` int(11) NOT NULL,
`times` varchar(11) NOT NULL,
`time` datetime NOT NULL,
PRIMARY KEY (`id`),
KEY `time` (`time`),
KEY `user_product` (`user_id`,`product_id`) USING BTREE,
KEY `times` (`times`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Sample data
INSERT INTO `product_view` VALUES ('1', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('2', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('3', '1', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('4', '1', '1', '2', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('5', '8', '1', '1', '120', '120', '2020-05-14 13:14:00');
INSERT INTO `product_view` VALUES ('6', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
INSERT INTO `product_view` VALUES ('7', '8', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('8', '8', '1', '3', '120', '120', '2020-04-23 13:14:00');
INSERT INTO `product_view` VALUES ('9', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
- Create a Flink table linked to MySQL
CREATE TABLE product_view_source (
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.2',
'port' = '3306',
'username' = 'bigdata',
'password' = 'bigdata',
'database-name' = 'test',
'table-name' = 'product_view'
);
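This way, querying this table in the Flink SQL client amounts to reading the corresponding MySQL table: by default the connector first reads the rows already in the table and then keeps following new changes from the MySQL binlog. The table is read-only; mysql-cdc can only be used as a source.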
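Under the hood, mysql-cdc reads the binlog through Debezium and turns each change into Flink changelog rows. The Python sketch below models that mapping under stated assumptions: events are simplified dicts carrying Debezium's `op` codes (`r` snapshot read, `c` create, `u` update, `d` delete) plus `before`/`after` row images, and the output uses Flink's short row-kind notation (`+I`, `-U`, `+U`, `-D`). The function name `to_changelog` is hypothetical; this illustrates the idea and is not the connector's code.

```python
from typing import List, Tuple

Row = dict
ChangelogRow = Tuple[str, Row]


def to_changelog(event: dict) -> List[ChangelogRow]:
    """Map one simplified Debezium-style event to Flink-style changelog rows."""
    op = event["op"]
    if op in ("r", "c"):  # snapshot read or INSERT -> one insert row
        return [("+I", event["after"])]
    if op == "u":         # UPDATE -> retract the old image, then emit the new image
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":         # DELETE -> delete row carrying the old image
        return [("-D", event["before"])]
    raise ValueError(f"unsupported op: {op!r}")


if __name__ == "__main__":
    before = {"id": 1, "duration": 120}
    after = {"id": 1, "duration": 300}
    for kind, row in to_changelog({"op": "u", "before": before, "after": after}):
        print(kind, row)
    # -U {'id': 1, 'duration': 120}
    # +U {'id': 1, 'duration': 300}
```

So an `UPDATE product_view SET duration = 300 WHERE id = 1` in MySQL would surface in Flink as a `-U`/`+U` pair rather than as a single row.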
- Create a Flink table linked to Elasticsearch
CREATE TABLE product_view_sink(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.1.2:9200',
'index' = 'product_view_index',
'username' = 'elastic',
'password' = 'elastic'
);
Elasticsearch will create the product_view_index index automatically. If you want to set specific index properties, create the index manually beforehand. Now, if we insert rows into the product_view_sink table, the data shows up in ES.
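Because product_view_sink declares PRIMARY KEY (`id`), the Elasticsearch connector works in upsert mode: the key value becomes the document _id (several key columns are joined with a delimiter, `_` by default via the `document-id.key-delimiter` option), so a change to an existing MySQL row overwrites the same document instead of adding a new one, and a delete removes it. The Python sketch below models that behavior with a dict standing in for the index. `document_id` and `apply_to_index` are hypothetical names, and simply skipping `-U` rows is a simplification: in upsert mode the following `+U` row overwrites the document anyway.

```python
from typing import Dict, Iterable, Sequence, Tuple


def document_id(row: dict, key_fields: Sequence[str], delimiter: str = "_") -> str:
    """Build a document id from the primary-key columns, in declaration order."""
    return delimiter.join(str(row[f]) for f in key_fields)


def apply_to_index(index: Dict[str, dict], changes: Iterable[Tuple[str, dict]],
                   key_fields: Sequence[str] = ("id",)) -> Dict[str, dict]:
    """Apply Flink-style changelog rows to a dict that stands in for the ES index."""
    for kind, row in changes:
        doc_id = document_id(row, key_fields)
        if kind in ("+I", "+U"):
            index[doc_id] = dict(row)  # upsert: create or overwrite the document
        elif kind == "-D":
            index.pop(doc_id, None)    # delete the document if present
        elif kind == "-U":
            continue                   # skipped: the matching +U overwrites the document
        else:
            raise ValueError(f"unknown row kind: {kind!r}")
    return index


if __name__ == "__main__":
    index: Dict[str, dict] = {}
    apply_to_index(index, [
        ("+I", {"id": 1, "duration": 120}),
        ("-U", {"id": 1, "duration": 120}),
        ("+U", {"id": 1, "duration": 300}),
        ("+I", {"id": 2, "duration": 120}),
        ("-D", {"id": 2, "duration": 120}),
    ])
    print(index)  # {'1': {'id': 1, 'duration': 300}}
```

Without a primary key the sink would append a new document per row, so an UPDATE in MySQL would leave the old version behind in ES; declaring the key is what makes the sync idempotent.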
5. Synchronize the data
建ç«åæ¥ä»»å¡ï¼å¯ä»¥ä½¿ç¨sqlå¦ä¸ï¼
insert into product_view_sink select * from product_view_source;
At this point you can exit the Flink SQL client. Open the Flink web UI and you will see the job; the MySQL table data has been synchronized to Elasticsearch, and any inserts, deletes, or updates made in MySQL are synchronized to Elasticsearch as well.
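To check that ES really stays in sync, one option is to compare both sides by primary key. The sketch below is a hypothetical `diff_by_key` helper working on plain Python data; in practice you would fill `mysql_rows` from `SELECT * FROM product_view` and `es_docs` from the index's documents with your own clients (not shown). It assumes the document _id is the MySQL `id` rendered as a string, which is what a single-column primary key yields in upsert mode, and the optional `fields` argument lets you skip columns such as `time` whose formatting may differ between MySQL and ES.

```python
from typing import Dict, List, Optional, Sequence


def diff_by_key(mysql_rows: Sequence[dict], es_docs: Dict[str, dict],
                fields: Optional[Sequence[str]] = None) -> Dict[str, List[str]]:
    """Compare MySQL rows with ES documents keyed by _id; report missing, extra and changed ids."""
    source = {str(row["id"]): row for row in mysql_rows}

    def project(doc: dict) -> dict:
        # Optionally compare only the selected columns.
        return {f: doc.get(f) for f in fields} if fields else doc

    return {
        "missing_in_es": sorted(k for k in source if k not in es_docs),
        "extra_in_es": sorted(k for k in es_docs if k not in source),
        "changed": sorted(k for k in source
                          if k in es_docs and project(source[k]) != project(es_docs[k])),
    }


if __name__ == "__main__":
    mysql_rows = [{"id": 1, "duration": 300}, {"id": 3, "duration": 120}]
    es_docs = {"1": {"id": 1, "duration": 120}, "2": {"id": 2, "duration": 120}}
    print(diff_by_key(mysql_rows, es_docs))
    # {'missing_in_es': ['3'], 'extra_in_es': ['2'], 'changed': ['1']}
```

Right after an insert, update, or delete in MySQL the lists may briefly be non-empty while the job catches up; all three should become empty once the change has been applied to ES.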