1. Canal Service 配置:
- 建立 conf/example/instance.properties 檔案,並加入以下內容:
# MySQL 主庫的連線位址、帳號與預設資料庫(請替換為實際值)
canal.instance.master.address = your_mysql_address:3306
canal.instance.dbUsername = your_username
canal.instance.dbPassword = your_password
canal.instance.defaultDatabaseName = your_database
# 監聽的表配置
canal.instance.filter.regex = your_database.a_table,your_database.b_table
2. Spring Boot 項目配置:
- 添加所需的依賴:在 pom.xml 檔案中添加以下依賴(後文的監聽器使用 @RabbitListener,因此還需另外加入 spring-boot-starter-amqp 依賴):
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.1.0</version>
</dependency>
3. 配置 Elasticsearch:在 application.properties 檔案中配置 Elasticsearch 的連線資訊。
spring.elasticsearch.rest.uris=http://localhost:9200
建立 Canal 監聽器:
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumes Canal binlog entries from the {@code canalQueue} RabbitMQ queue and mirrors
 * row changes of {@code a_table} and {@code b_table} into the Elasticsearch index
 * {@code your_index}.
 *
 * <p>INSERT and UPDATE rows are indexed using the value of the row's first column as the
 * document id, so a later UPDATE overwrites the same document instead of adding a
 * duplicate. DELETE rows remove the document with that id. Entries that carry no row
 * data, entries for other tables, and other event types are ignored.
 *
 * <p>NOTE(review): the column-to-field mapping is positional; it assumes the binlog
 * column order matches {@code FIELDS_A} / {@code FIELDS_B} and that column 0 is the
 * primary key — confirm against the table definitions.
 *
 * <p>NOTE(review): Elasticsearch 7.x is documented as allowing a single mapping type per
 * index; confirm that writing {@code a_type} and {@code b_type} into one index works on
 * the targeted cluster.
 *
 * <p>NOTE(review): {@code IndexQueryBuilder.withSource} appears to be declared with a
 * {@code String} parameter in Spring Data Elasticsearch 3.x — confirm that passing a
 * {@code Map} compiles with the version in use.
 */
@Component
public class CanalListener {
    private static final String INDEX_NAME = "your_index";
    private static final String TYPE_NAME_A = "a_type";
    private static final String TYPE_NAME_B = "b_type";
    private static final String TABLE_NAME_A = "a_table";
    private static final String TABLE_NAME_B = "b_table";

    /** Document field names for a_table, in binlog column order. */
    private static final String[] FIELDS_A = {"id", "name", "age", "createds"};
    /** Document field names for b_table, in binlog column order. */
    private static final String[] FIELDS_B = {"id", "aid", "name", "port", "class", "createds"};

    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    /**
     * Applies one Canal entry to Elasticsearch.
     *
     * @param entry the binlog entry delivered by the queue
     * @throws RuntimeException if the entry payload cannot be parsed as a row change
     * @throws IllegalStateException if a row carries fewer columns than the mapping needs
     */
    @RabbitListener(queues = "canalQueue")
    public void processMessage(CanalEntry.Entry entry) {
        // Transaction begin/end and similar entries carry no row data to mirror.
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return;
        }

        String tableName = entry.getHeader().getTableName();
        String typeName;
        String[] fieldNames;
        if (TABLE_NAME_A.equals(tableName)) {
            typeName = TYPE_NAME_A;
            fieldNames = FIELDS_A;
        } else if (TABLE_NAME_B.equals(tableName)) {
            typeName = TYPE_NAME_B;
            fieldNames = FIELDS_B;
        } else {
            return;
        }

        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse row change", e);
        }

        CanalEntry.EventType eventType = rowChange.getEventType();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == CanalEntry.EventType.DELETE) {
                // A deleted row only has a before-image; reading after-columns would fail.
                deleteDocument(typeName, rowData);
            } else if (eventType == CanalEntry.EventType.INSERT
                    || eventType == CanalEntry.EventType.UPDATE) {
                indexDocument(typeName, fieldNames, rowData);
            }
        }
    }

    /** Removes the document whose id is the first before-image column of the deleted row. */
    private void deleteDocument(String typeName, CanalEntry.RowData rowData) {
        if (rowData.getBeforeColumnsCount() == 0) {
            throw new IllegalStateException(
                    "DELETE row for " + typeName + " carries no before-image columns");
        }
        String id = rowData.getBeforeColumns(0).getValue();
        elasticsearchOperations.delete(INDEX_NAME, typeName, id);
    }

    /** Indexes the after-image of an inserted or updated row under the row's id. */
    private void indexDocument(String typeName, String[] fieldNames, CanalEntry.RowData rowData) {
        if (rowData.getAfterColumnsCount() < fieldNames.length) {
            throw new IllegalStateException(
                    "Row for " + typeName + " has " + rowData.getAfterColumnsCount()
                            + " columns, expected at least " + fieldNames.length);
        }
        Map<String, Object> data = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            data.put(fieldNames[i], rowData.getAfterColumns(i).getValue());
        }
        elasticsearchOperations.index(new IndexQueryBuilder()
                .withIndexName(INDEX_NAME)
                .withType(typeName)
                // Stable id: a later UPDATE replaces this document, a DELETE can find it.
                .withId(rowData.getAfterColumns(0).getValue())
                .withSource(data)
                .build());
    }
}
配置 RabbitMQ:在 application.properties 檔案中配置 RabbitMQ 的連線資訊:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
建立增刪改查的 API:在 Spring Boot 項目的 Controller 中,建立相應的 API 來處理增刪改查操作。
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.Map;
/**
 * REST endpoints under {@code /api} for creating, updating, deleting and searching
 * documents of the two types stored in the Elasticsearch index {@code your_index}.
 */
@RestController
@RequestMapping("/api")
public class DataController {
    private static final String INDEX_NAME = "your_index";
    private static final String TYPE_NAME_A = "a_type";
    private static final String TYPE_NAME_B = "b_type";
    /** Document field matched by the search endpoint. */
    private static final String SEARCH_FIELD = "field";

    @Autowired
    private ElasticsearchOperations elasticsearchOperations;
    @Autowired
    private RestHighLevelClient elasticsearchClient;

    /**
     * Indexes a new type-A document; no id is supplied with the request.
     *
     * @param data the document body
     */
    @PostMapping("/a")
    public void createAData(@RequestBody Map<String, Object> data) throws IOException {
        indexDocument(TYPE_NAME_A, null, data);
    }

    /**
     * Indexes a new type-B document; no id is supplied with the request.
     *
     * @param data the document body
     */
    @PostMapping("/b")
    public void createBData(@RequestBody Map<String, Object> data) throws IOException {
        indexDocument(TYPE_NAME_B, null, data);
    }

    /**
     * Deletes the type-A document with the given id.
     *
     * @param id the document id
     */
    @DeleteMapping("/a/{id}")
    public void deleteAData(@PathVariable String id) throws IOException {
        elasticsearchOperations.delete(INDEX_NAME, TYPE_NAME_A, id);
    }

    /**
     * Deletes the type-B document with the given id.
     *
     * @param id the document id
     */
    @DeleteMapping("/b/{id}")
    public void deleteBData(@PathVariable String id) throws IOException {
        elasticsearchOperations.delete(INDEX_NAME, TYPE_NAME_B, id);
    }

    /**
     * Indexes the given body as the type-A document with the given id.
     *
     * @param id the document id
     * @param data the new document body
     */
    @PutMapping("/a/{id}")
    public void updateAData(@PathVariable String id, @RequestBody Map<String, Object> data) throws IOException {
        indexDocument(TYPE_NAME_A, id, data);
    }

    /**
     * Indexes the given body as the type-B document with the given id.
     *
     * @param id the document id
     * @param data the new document body
     */
    @PutMapping("/b/{id}")
    public void updateBData(@PathVariable String id, @RequestBody Map<String, Object> data) throws IOException {
        indexDocument(TYPE_NAME_B, id, data);
    }

    /**
     * Runs a match query for {@code keyword} against the index and returns the result.
     * Previously the response was discarded and the endpoint returned nothing.
     *
     * @param keyword the text to match
     * @return the string form of the Elasticsearch search response, served as JSON
     * @throws IOException if the request to Elasticsearch fails
     */
    @GetMapping(value = "/search", produces = "application/json")
    public String searchData(@RequestParam String keyword) throws IOException {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.matchQuery(SEARCH_FIELD, keyword));
        SearchRequest request = new SearchRequest(INDEX_NAME).source(source);
        return elasticsearchClient.search(request, RequestOptions.DEFAULT).toString();
    }

    /** Indexes {@code data} under the given type; the id is set only when one is given. */
    private void indexDocument(String typeName, String id, Map<String, Object> data) {
        IndexQueryBuilder builder = new IndexQueryBuilder()
                .withIndexName(INDEX_NAME)
                .withType(typeName)
                .withSource(data);
        if (id != null) {
            builder.withId(id);
        }
        elasticsearchOperations.index(builder.build());
    }
}