天天看点

canal,elasticsearch,mq,增删改查示例

作者:独一无二的abcdefg

1,

Canal Server 配置:

  • 创建 conf/example/instance.properties 文件,并添加以下内容:

# The instance name is the directory that holds this file (conf/example -> "example").

# Connection settings for the MySQL server whose changes Canal captures.
canal.instance.master.address = your_mysql_address:3306
canal.instance.dbUsername = your_username
canal.instance.dbPassword = your_password
canal.instance.defaultDatabaseName = your_database

# Tables to capture: comma-separated regular expressions in schema\\.table form.
# The dot is escaped with a double backslash so it matches a literal "." only;
# an unescaped dot would match any character.
canal.instance.filter.regex = your_database\\.a_table,your_database\\.b_table

2,

Spring Boot 项目配置:

  • 添加所需的依赖:在 pom.xml 文件中添加以下依赖:

<!-- Canal client; provides the com.alibaba.otter.canal.protocol.CanalEntry types -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

<!-- Spring Data Elasticsearch (ElasticsearchOperations, IndexQueryBuilder) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.0</version>
</dependency>

<!-- Required by the @RabbitListener used in CanalListener -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Required by the @RestController used in DataController -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3,配置 Elasticsearch:在 application.properties 文件中配置 Elasticsearch 的连接信息。

# Elasticsearch connection: REST endpoint URI(s) of the cluster
spring.elasticsearch.rest.uris=http://localhost:9200

创建 Canal 监听器:消费 RabbitMQ 队列 canalQueue 中的 Canal 变更消息,并把 a_table、b_table 的行数据同步到 Elasticsearch。

import com.alibaba.otter.canal.protocol.CanalEntry;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.elasticsearch.core.ElasticsearchOperations;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

@Component

public class CanalListener {

private static final String INDEX_NAME = "your_index";

private static final String TYPE_NAME_A = "a_type";

private static final String TYPE_NAME_B = "b_type";

@Autowired

private ElasticsearchOperations elasticsearchOperations;

@RabbitListener(queues = "canalQueue")

public void processMessage(CanalEntry.Entry entry) {

CanalEntry.RowChange rowChange;

try {

rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("Failed to parse row change", e);

}

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {

Map<String, Object> data = new HashMap<>();

if (entry.getHeader().getTableName().equals("a_table")) {

// 处理 A 表数据

data.put("id", rowData.getAfterColumns(0).getValue());

data.put("name", rowData.getAfterColumns(1).getValue());

data.put("age", rowData.getAfterColumns(2).getValue());

data.put("createds", rowData.getAfterColumns(3).getValue());

// 插入到 Elasticsearch

elasticsearchOperations.index(new IndexQueryBuilder()

.withIndexName(INDEX_NAME)

.withType(TYPE_NAME_A)

.withSource(data)

.build());

} else if (entry.getHeader().getTableName().equals("b_table")) {

// 处理 B 表数据

data.put("id", rowData.getAfterColumns(0).getValue());

data.put("aid", rowData.getAfterColumns(1).getValue());

data.put("name", rowData.getAfterColumns(2).getValue());

data.put("port", rowData.getAfterColumns(3).getValue());

data.put("class", rowData.getAfterColumns(4).getValue());

data.put("createds", rowData.getAfterColumns(5).getValue());

// 插入到 Elasticsearch

elasticsearchOperations.index(new IndexQueryBuilder()

.withIndexName(INDEX_NAME)

.withType(TYPE_NAME_B)

.withSource(data)

.build());

}

}

}

}

配置 RabbitMQ:在 application.properties 文件中配置 RabbitMQ 的连接信息(监听器消费的队列为 canalQueue,需要提前创建)。

# RabbitMQ broker connection: host, port and credentials
spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

创建增删改查的 API:在 Spring Boot 项目的 Controller 中,创建相应的 API 来处理增删改查操作。

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * REST endpoints under {@code /api} that create, update, delete and search documents of the
 * A and B tables in Elasticsearch.
 */
@RestController
@RequestMapping("/api")
public class DataController {

    private static final String INDEX_NAME = "your_index";

    // NOTE(review): Elasticsearch 6+ allows only one mapping type per index, so two types sharing
    // one index will be rejected there - confirm the target version or use one index per table.
    private static final String TYPE_NAME_A = "a_type";

    private static final String TYPE_NAME_B = "b_type";

    /** Document field matched by {@code /search}; placeholder, replace with a real field name. */
    private static final String SEARCH_FIELD = "field";

    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    /**
     * Indexes a new A document.
     *
     * @param data the document fields
     * @return the id of the stored document, needed to update or delete it later
     */
    @PostMapping("/a")
    public String createAData(@RequestBody Map<String, Object> data) throws IOException {
        return indexDocument(TYPE_NAME_A, null, data);
    }

    /**
     * Indexes a new B document.
     *
     * @param data the document fields
     * @return the id of the stored document, needed to update or delete it later
     */
    @PostMapping("/b")
    public String createBData(@RequestBody Map<String, Object> data) throws IOException {
        return indexDocument(TYPE_NAME_B, null, data);
    }

    /**
     * Deletes the A document with the given id.
     *
     * @param id the document id
     */
    @DeleteMapping("/a/{id}")
    public void deleteAData(@PathVariable String id) throws IOException {
        elasticsearchOperations.delete(INDEX_NAME, TYPE_NAME_A, id);
    }

    /**
     * Deletes the B document with the given id.
     *
     * @param id the document id
     */
    @DeleteMapping("/b/{id}")
    public void deleteBData(@PathVariable String id) throws IOException {
        elasticsearchOperations.delete(INDEX_NAME, TYPE_NAME_B, id);
    }

    /**
     * Replaces the A document stored under the given id with the supplied fields.
     *
     * @param id the document id
     * @param data the new document fields
     */
    @PutMapping("/a/{id}")
    public void updateAData(@PathVariable String id, @RequestBody Map<String, Object> data)
            throws IOException {
        indexDocument(TYPE_NAME_A, id, data);
    }

    /**
     * Replaces the B document stored under the given id with the supplied fields.
     *
     * @param id the document id
     * @param data the new document fields
     */
    @PutMapping("/b/{id}")
    public void updateBData(@PathVariable String id, @RequestBody Map<String, Object> data)
            throws IOException {
        indexDocument(TYPE_NAME_B, id, data);
    }

    /**
     * Runs a match query for the keyword against {@link #SEARCH_FIELD} and returns the hits.
     *
     * @param keyword the text to match
     * @return the source of every hit in the response, in response order
     * @throws IOException if the search request fails
     */
    @GetMapping("/search")
    public List<Map<String, Object>> searchData(@RequestParam String keyword) throws IOException {
        SearchRequest request = new SearchRequest(INDEX_NAME)
                .source(new SearchSourceBuilder()
                        .query(QueryBuilders.matchQuery(SEARCH_FIELD, keyword)));

        // Return the hit sources; previously the response was discarded and the caller got nothing.
        return Arrays.stream(
                        elasticsearchClient.search(request, RequestOptions.DEFAULT).getHits().getHits())
                .map(hit -> hit.getSourceAsMap())
                .collect(Collectors.toList());
    }

    /**
     * Indexes one document into {@link #INDEX_NAME}.
     *
     * @param typeName the mapping type to index under
     * @param id the document id, or {@code null} to let Elasticsearch generate one
     * @param data the document fields
     * @return the id of the stored document
     */
    private String indexDocument(String typeName, String id, Map<String, Object> data) {
        // withObject is used because withSource expects an already-serialized JSON string.
        IndexQueryBuilder query = new IndexQueryBuilder()
                .withIndexName(INDEX_NAME)
                .withType(typeName)
                .withObject(data);
        if (id != null) {
            query.withId(id);
        }
        return elasticsearchOperations.index(query.build());
    }
}

继续阅读