第五章 构建分布式搜索引擎
Elasticsearch 简介
- 一个分布式的、Restful风格的搜索引擎
- 支持对各种类型的数据的检索
- 搜索速度快,可以提供实时的搜索服务
- 便于水平扩展,每秒可以处理PB级海量数据
Elasticsearch 术语
- 索引、类型、文档、字段
-
对应数据库的一张表索引
-
对应数据库的一张表里面的一条数据文档
-
对应数据库的一张表里面的一个字段字段
- 类型逐渐地在es中进行废弃
- 集群、节点、分片、副本
中文分词插件
https://github.com/medcl/elasticsearch-analysis-ik
测试es
查询es中一共有多少个索引
(GET)localhost:9200/_cat/indices?v
新建一条索引
删除一条索引
往es里面插入一条数据
- 提交数据,用
请求PUT
- test表示索引,相当于数据库中的表名
- _doc表示类型,在6.0版本已经逐渐被废弃,新版本中完全废弃,这里使用的是6.0版本的es,所以使用
充当占位符_doc
- 1表示id,相当于数据库中的id字段
查询数据
删除一条数据
分词搜索
- 搜索指定索引下的全部数据
- 搜索test索引下title包含互联网的数据,注意默认格式都是
_search?q=
Spring整合Elasticsearch
导入相关依赖
<!-- elasticsearch依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
配置es
spring:
# 配置Elasticsearch
data:
elasticsearch:
cluster-name: nowcoder
# es有两个端口:9200用于http连接,9300用于spring连接
cluster-nodes: localhost:9300
配置实体类
// indexName表示存储在es中的索引名字
@Document(indexName = "discusspost", shards = 6, replicas = 3)
public class DiscussPost {
/**
* 帖子id
*/
@Id
private int id;
/**
* 创建帖子的用户id
*/
@Field(type = FieldType.Integer)
private int userId;
/**
* 帖子标题
* analyzer:存储数据的时候采用这个分词器,可以拆分出更多的词汇,增加搜索范围
* searchAnalyzer:搜索的时候采用这个分词器,拆分出较少的词汇,满足搜索需求即可
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String title;
/**
* 帖子内容
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String content;
/**
* 帖子类型,0表示普通,1表示置顶
*/
@Field(type = FieldType.Integer)
private int type;
/**
* 帖子状态,0表示正常,1表示精华,2表示拉黑
*/
@Field(type = FieldType.Integer)
private int status;
/**
* 帖子创建时间
*/
@Field(type = FieldType.Date)
private Date createTime;
/**
* 帖子评论数量
*/
@Field(type = FieldType.Integer)
private int commentCount;
/**
* 帖子评分
*/
@Field(type = FieldType.Double)
private double score;
}
数据层
/**
* ElasticsearchRepository<DiscussPost, Integer>:
* 第一个参数表示该接口需要处理的实体类是谁
* 第二个参数表示该实体类的主键的类型
*
* @author xiexu
* @create 2022-07-10 18:07
*/
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
}
开发社区搜索功能
搜索服务
- 将帖子保存至Elasticsearch服务器。
- 从Elasticsearch服务器删除帖子。
- 从Elasticsearch服务器搜索帖子。
发布事件
- 发布帖子时,将帖子异步的提交到Elasticsearch服务器。
- 增加评论时,将帖子异步的提交到Elasticsearch服务器。
- 在消费组件中增加一个方法,消费帖子发布事件。
显示结果
- 在控制器中处理搜索请求,在HTML上显示搜索结果。
业务层
@Service
public class ElasticsearchService {
@Autowired
private DiscussPostRepository discussRepository;
@Autowired
private ElasticsearchTemplate elasticTemplate;
/**
* 将帖子信息保存到es
*
* @param post
*/
public void saveDiscussPost(DiscussPost post) {
discussRepository.save(post);
}
/**
* 在es中删除对应id的帖子
*
* @param id
*/
public void deleteDiscussPost(int id) {
discussRepository.deleteById(id);
}
/**
* 根据关键字搜索帖子
*
* @param keyword 关键字
* @param current 当前显示第几页,从0开始
* @param limit 每页显示多少条数据
* @return
*/
public Page<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
// 只在title和content字段进行搜索
.withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
// 优先根据type字段进行倒序排序
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
// 然后根据score字段进行倒序排序
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
// 最后根据createTime字段进行倒序排序
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
// 从第几页开始,每页显示多少条数据
.withPageable(PageRequest.of(current, limit))
// 高亮显示字段
.withHighlightFields(
// 对title字段进行高亮显示,在前端页面显示<em>title信息</em>
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
// 对content字段进行高亮显示,在前端页面显示<em>content信息</em>
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")).build();
Page<DiscussPost> page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
// 先获取这次搜索命中的数据
SearchHits hits = response.getHits();
// 如果没查到数据就直接返回null
if (hits.getTotalHits() <= 0) {
return null;
}
List<DiscussPost> list = new ArrayList<>();
// 遍历搜索命中的数据
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
// 这是原始的title,并不是高亮显示的title,高亮显示的下面单独处理
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
// 处理高亮显示的结果
// 获取与title有关的高亮显示的内容
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
// 高亮有可能是多个数据,所以我们这里只设置第一个数据高亮即可
post.setTitle(titleField.getFragments()[0].toString());
}
// 获取与content有关的高亮显示的内容
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
// 高亮有可能是多个数据,所以我们这里只设置第一个数据高亮即可
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable, hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
}
});
return page;
}
}
控制层
/**
* 发布帖子
*
* @param title
* @param content
* @return
*/
@RequestMapping(path = "/add", method = RequestMethod.POST)
@ResponseBody
public String addDiscussPost(String title, String content) {
User user = hostHolder.getUser();
if (user == null) {
return CommunityUtil.getJSONString(403, "您还没有登录哦!");
}
DiscussPost post = new DiscussPost();
post.setUserId(user.getId());
post.setTitle(title);
post.setContent(content);
post.setCreateTime(new Date());
discussPostService.addDiscussPost(post);
// 触发发帖事件,把新发布的帖子存到es中
Event event = new Event()
// 事件主题
.setTopic(TOPIC_PUBLISH)
// 事件的触发人
.setUserId(user.getId())
// 事件发生的实体类型
.setEntityType(ENTITY_TYPE_POST)
// 实体id
.setEntityId(post.getId());
// 发布事件
eventProducer.fireEvent(event);
// 报错的情况以后统一处理
return CommunityUtil.getJSONString(0, "发布成功!");
}
// 添加评论
@RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
comment.setUserId(hostHolder.getUser().getId());
comment.setStatus(0);
comment.setCreateTime(new Date());
commentService.addComment(comment);
// 添加评论以后触发评论事件
Event event = new Event().setTopic(TOPIC_COMMENT)
// 当前用户去评论
.setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId())
// 帖子id
.setData("postId", discussPostId);
if (comment.getEntityType() == ENTITY_TYPE_POST) { // 如果评论的是帖子
DiscussPost target = discussPostService.findDiscusspostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
// 计算帖子分数
String redisKey = RedisKeyUtil.getPostScoreKey();
redisTemplate.opsForSet().add(redisKey, discussPostId);
} else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) { // 评论的是评论
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
// 发布事件
eventProducer.fireEvent(event);
// 只有评论的是帖子,才触发发帖事件
if (comment.getEntityType() == ENTITY_TYPE_POST) {
// 触发发帖事件,把新发布的帖子存到es中
event = new Event()
// 事件主题
.setTopic(TOPIC_PUBLISH)
// 事件的触发人
.setUserId(comment.getUserId())
// 事件发生的实体类型
.setEntityType(ENTITY_TYPE_POST)
// 实体id
.setEntityId(discussPostId);
// 发布事件
eventProducer.fireEvent(event);
}
// 重定向到帖子详情页
return "redirect:/discuss/detail/" + discussPostId;
}
消费事件类
// 消费者
@Component
public class EventConsumer implements CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@Autowired
private DiscussPostService discussPostService;
@Autowired
private ElasticsearchService elasticsearchService;
// 消费发帖事件
@KafkaListener(topics = {TOPIC_PUBLISH})
public void handlePublishMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误!");
return;
}
// 根据帖子id查询帖子信息
DiscussPost post = discussPostService.findDiscusspostById(event.getEntityId());
// 将帖子信息添加到es中
elasticsearchService.saveDiscussPost(post);
}
}
控制层
@Controller
public class SearchController implements CommunityConstant {
@Autowired
private ElasticsearchService elasticsearchService;
@Autowired
private UserService userService;
@Autowired
private LikeService likeService;
// search?keyword=xxx
@RequestMapping(path = "/search", method = RequestMethod.GET)
public String search(String keyword, Page page, Model model) {
// 搜索帖子
org.springframework.data.domain.Page<DiscussPost> searchResult = elasticsearchService.searchDiscussPost(keyword, page.getCurrent() - 1, page.getLimit());
// 聚合数据
List<Map<String, Object>> discussPosts = new ArrayList<>();
if (searchResult != null) {
for (DiscussPost post : searchResult) {
Map<String, Object> map = new HashMap<>();
// 把帖子存进去
map.put("post", post);
// 查询搜索到的数据对应的用户信息
User user = userService.findUserById(post.getUserId());
map.put("user", user);
// 查询搜索到的数据对应的帖子点赞数
long likeCount = likeService.findEntityLikeCount(ENTITY_TYPE_POST, post.getId());
map.put("likeCount", likeCount);
discussPosts.add(map);
}
}
model.addAttribute("discussPosts", discussPosts);
model.addAttribute("keyword", keyword);
// 分页信息
page.setPath("/search?keyword=" + keyword);
page.setRows(searchResult == null ? 0 : (int) searchResult.getTotalElements());
return "/site/search";
}
}