天天看点

第五章 构建分布式搜索引擎

第五章 构建分布式搜索引擎

Elasticsearch 简介

  • 一个分布式的、Restful风格的搜索引擎
  • 支持对各种类型的数据的检索
  • 搜索速度快,可以提供实时的搜索服务
  • 便于水平扩展,每秒可以处理PB级海量数据
Elasticsearch 术语
  • 索引、类型、文档、字段
  • ​索引​

    ​对应数据库的一张表
  • ​文档​

    ​对应数据库的一张表里面的一条数据
  • ​字段​

    ​对应数据库的一张表里面的一个字段
  • 类型逐渐地在es中进行废弃
  • 集群、节点、分片、副本

中文分词插件

​​https://github.com/medcl/elasticsearch-analysis-ik​​

测试es

查询es中一共有多少个索引
(GET)localhost:9200/_cat/indices?v      
第五章 构建分布式搜索引擎
新建一条索引
第五章 构建分布式搜索引擎
删除一条索引
第五章 构建分布式搜索引擎
往es里面插入一条数据
  • 提交数据,用​

    ​PUT​

    ​请求
  • test表示索引,相当于数据库中的表名
  • _doc表示类型,在6.0版本已经逐渐被废弃,新版本中完全废弃,这里使用的是6.0版本的es,所以使用​

    ​_doc​

    ​充当占位符
  • 1表示id,相当于数据库中的id字段
第五章 构建分布式搜索引擎
查询数据
第五章 构建分布式搜索引擎
删除一条数据
第五章 构建分布式搜索引擎
分词搜索
  • 搜索指定索引下的全部数据
第五章 构建分布式搜索引擎
  • 搜索test索引下title包含互联网的数据,注意默认格式都是​

    ​_search?q=​

第五章 构建分布式搜索引擎

Spring整合Elasticsearch

导入相关依赖

<!-- elasticsearch依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>      

配置es

spring:
# 配置Elasticsearch
  data:
    elasticsearch:
      cluster-name: nowcoder
      # es有两个端口:9200用于http连接,9300用于spring连接
      cluster-nodes: localhost:9300      

配置实体类

// indexName表示存储在es中的索引名字
@Document(indexName = "discusspost", shards = 6, replicas = 3)
public class DiscussPost {

    /**
     * 帖子id
     */
    @Id
    private int id;

    /**
     * 创建帖子的用户id
     */
    @Field(type = FieldType.Integer)
    private int userId;

    /**
     * 帖子标题
     * analyzer:存储数据的时候采用这个分词器,可以拆分出更多的词汇,增加搜索范围
     * searchAnalyzer:搜索的时候采用这个分词器,拆分出较少的词汇,满足搜索需求即可
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String title;

    /**
     * 帖子内容
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String content;

    /**
     * 帖子类型,0表示普通,1表示置顶
     */
    @Field(type = FieldType.Integer)
    private int type;

    /**
     * 帖子状态,0表示正常,1表示精华,2表示拉黑
     */
    @Field(type = FieldType.Integer)
    private int status;

    /**
     * 帖子创建时间
     */
    @Field(type = FieldType.Date)
    private Date createTime;

    /**
     * 帖子评论数量
     */
    @Field(type = FieldType.Integer)
    private int commentCount;

    /**
     * 帖子评分
     */
    @Field(type = FieldType.Double)
    private double score;

}      

数据层

/**
 * ElasticsearchRepository<DiscussPost, Integer>:
 * 第一个参数表示该接口需要处理的实体类是谁
 * 第二个参数表示该实体类的主键的类型
 *
 * @author xiexu
 * @create 2022-07-10 18:07
 */
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {

}      

开发社区搜索功能

搜索服务
  • 将帖子保存至Elasticsearch服务器。
  • 从Elasticsearch服务器删除帖子。
  • 从Elasticsearch服务器搜索帖子。
发布事件
  • 发布帖子时,将帖子异步的提交到Elasticsearch服务器。
  • 增加评论时,将帖子异步的提交到Elasticsearch服务器。
  • 在消费组件中增加一个方法,消费帖子发布事件。
显示结果
  • 在控制器中处理搜索请求,在HTML上显示搜索结果。
第五章 构建分布式搜索引擎

业务层

@Service
public class ElasticsearchService {

    @Autowired
    private DiscussPostRepository discussRepository;

    @Autowired
    private ElasticsearchTemplate elasticTemplate;

    /**
     * 将帖子信息保存到es
     *
     * @param post
     */
    public void saveDiscussPost(DiscussPost post) {
        discussRepository.save(post);
    }

    /**
     * 在es中删除对应id的帖子
     *
     * @param id
     */
    public void deleteDiscussPost(int id) {
        discussRepository.deleteById(id);
    }

    /**
     * 根据关键字搜索帖子
     *
     * @param keyword 关键字
     * @param current 当前显示第几页,从0开始
     * @param limit   每页显示多少条数据
     * @return
     */
    public Page<DiscussPost> searchDiscussPost(String keyword, int current, int limit) {
        SearchQuery searchQuery = new NativeSearchQueryBuilder()
                // 只在title和content字段进行搜索
                .withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content"))
                // 优先根据type字段进行倒序排序
                .withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
                // 然后根据score字段进行倒序排序
                .withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
                // 最后根据createTime字段进行倒序排序
                .withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
                // 从第几页开始,每页显示多少条数据
                .withPageable(PageRequest.of(current, limit))
                // 高亮显示字段
                .withHighlightFields(
                        // 对title字段进行高亮显示,在前端页面显示<em>title信息</em>
                        new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
                        // 对content字段进行高亮显示,在前端页面显示<em>content信息</em>
                        new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")).build();

        Page<DiscussPost> page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
            @Override
            public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
                // 先获取这次搜索命中的数据
                SearchHits hits = response.getHits();
                // 如果没查到数据就直接返回null
                if (hits.getTotalHits() <= 0) {
                    return null;
                }

                List<DiscussPost> list = new ArrayList<>();
                // 遍历搜索命中的数据
                for (SearchHit hit : hits) {
                    DiscussPost post = new DiscussPost();

                    String id = hit.getSourceAsMap().get("id").toString();
                    post.setId(Integer.valueOf(id));

                    String userId = hit.getSourceAsMap().get("userId").toString();
                    post.setUserId(Integer.valueOf(userId));

                    // 这是原始的title,并不是高亮显示的title,高亮显示的下面单独处理
                    String title = hit.getSourceAsMap().get("title").toString();
                    post.setTitle(title);

                    String content = hit.getSourceAsMap().get("content").toString();
                    post.setContent(content);

                    String status = hit.getSourceAsMap().get("status").toString();
                    post.setStatus(Integer.valueOf(status));

                    String createTime = hit.getSourceAsMap().get("createTime").toString();
                    post.setCreateTime(new Date(Long.valueOf(createTime)));

                    String commentCount = hit.getSourceAsMap().get("commentCount").toString();
                    post.setCommentCount(Integer.valueOf(commentCount));

                    // 处理高亮显示的结果
                    // 获取与title有关的高亮显示的内容
                    HighlightField titleField = hit.getHighlightFields().get("title");
                    if (titleField != null) {
                        // 高亮有可能是多个数据,所以我们这里只设置第一个数据高亮即可
                        post.setTitle(titleField.getFragments()[0].toString());
                    }

                    // 获取与content有关的高亮显示的内容
                    HighlightField contentField = hit.getHighlightFields().get("content");
                    if (contentField != null) {
                        // 高亮有可能是多个数据,所以我们这里只设置第一个数据高亮即可
                        post.setContent(contentField.getFragments()[0].toString());
                    }

                    list.add(post);
                }

                return new AggregatedPageImpl(list, pageable, hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
            }
        });

        return page;
    }

}      

控制层

/**
     * 发布帖子
     *
     * @param title
     * @param content
     * @return
     */
    @RequestMapping(path = "/add", method = RequestMethod.POST)
    @ResponseBody
    public String addDiscussPost(String title, String content) {
        User user = hostHolder.getUser();
        if (user == null) {
            return CommunityUtil.getJSONString(403, "您还没有登录哦!");
        }
        DiscussPost post = new DiscussPost();
        post.setUserId(user.getId());
        post.setTitle(title);
        post.setContent(content);
        post.setCreateTime(new Date());
        discussPostService.addDiscussPost(post);

        // 触发发帖事件,把新发布的帖子存到es中
        Event event = new Event()
                // 事件主题
                .setTopic(TOPIC_PUBLISH)
                // 事件的触发人
                .setUserId(user.getId())
                // 事件发生的实体类型
                .setEntityType(ENTITY_TYPE_POST)
                // 实体id
                .setEntityId(post.getId());
        // 发布事件
        eventProducer.fireEvent(event);

        // 报错的情况以后统一处理
        return CommunityUtil.getJSONString(0, "发布成功!");
    }      
// 添加评论
    @RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);
        // 添加评论以后触发评论事件
        Event event = new Event().setTopic(TOPIC_COMMENT)
                // 当前用户去评论
                .setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId())
                // 帖子id
                .setData("postId", discussPostId);

        if (comment.getEntityType() == ENTITY_TYPE_POST) { // 如果评论的是帖子
            DiscussPost target = discussPostService.findDiscusspostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
            // 计算帖子分数
            String redisKey = RedisKeyUtil.getPostScoreKey();
            redisTemplate.opsForSet().add(redisKey, discussPostId);
        } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) { // 评论的是评论
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }

        // 发布事件
        eventProducer.fireEvent(event);

        // 只有评论的是帖子,才触发发帖事件
        if (comment.getEntityType() == ENTITY_TYPE_POST) {
            // 触发发帖事件,把新发布的帖子存到es中
            event = new Event()
                    // 事件主题
                    .setTopic(TOPIC_PUBLISH)
                    // 事件的触发人
                    .setUserId(comment.getUserId())
                    // 事件发生的实体类型
                    .setEntityType(ENTITY_TYPE_POST)
                    // 实体id
                    .setEntityId(discussPostId);
            // 发布事件
            eventProducer.fireEvent(event);
        }

        // 重定向到帖子详情页
        return "redirect:/discuss/detail/" + discussPostId;
    }      

消费事件类

// 消费者
@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    @Autowired
    private DiscussPostService discussPostService;

    @Autowired
    private ElasticsearchService elasticsearchService;

    // 消费发帖事件
    @KafkaListener(topics = {TOPIC_PUBLISH})
    public void handlePublishMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 根据帖子id查询帖子信息
        DiscussPost post = discussPostService.findDiscusspostById(event.getEntityId());
        // 将帖子信息添加到es中
        elasticsearchService.saveDiscussPost(post);
    }

}      

控制层

@Controller
public class SearchController implements CommunityConstant {

    @Autowired
    private ElasticsearchService elasticsearchService;

    @Autowired
    private UserService userService;

    @Autowired
    private LikeService likeService;

    // search?keyword=xxx
    @RequestMapping(path = "/search", method = RequestMethod.GET)
    public String search(String keyword, Page page, Model model) {
        // 搜索帖子
        org.springframework.data.domain.Page<DiscussPost> searchResult = elasticsearchService.searchDiscussPost(keyword, page.getCurrent() - 1, page.getLimit());
        // 聚合数据
        List<Map<String, Object>> discussPosts = new ArrayList<>();
        if (searchResult != null) {
            for (DiscussPost post : searchResult) {
                Map<String, Object> map = new HashMap<>();
                // 把帖子存进去
                map.put("post", post);
                // 查询搜索到的数据对应的用户信息
                User user = userService.findUserById(post.getUserId());
                map.put("user", user);
                // 查询搜索到的数据对应的帖子点赞数
                long likeCount = likeService.findEntityLikeCount(ENTITY_TYPE_POST, post.getId());
                map.put("likeCount", likeCount);

                discussPosts.add(map);
            }
        }
        model.addAttribute("discussPosts", discussPosts);
        model.addAttribute("keyword", keyword);

        // 分页信息
        page.setPath("/search?keyword=" + keyword);
        page.setRows(searchResult == null ? 0 : (int) searchResult.getTotalElements());

        return "/site/search";
    }

}      

继续阅读