天天看点

使用RabbitMQ实现业务解耦

在一个系统中,对于一些允许异步处理的业务,消息中间件在业务解耦上总能起到很重要的作用,一定程度上能够提高系统的相应时间以及吞吐量。消息中间件本质是一个队列,工作方式就是生产者-消费者模式。

使用RabbitMQ实现业务解耦

市场上消息队列的产品有很多,比如:RabbitMQ、RocketMQ、Kafka,产品之间有不同的性能。如果对于消息队列没有太多了解,可看一下这篇文章:一个用消息队列的人,不知道为啥用MQ,这就有点尴尬 ,文章来源于网上,个人感觉写得不错。

本文实现对某小说网站内容的爬取,同时接入RabbitMQ实现业务解耦。首先说明为什么使用消息队列,一方面,数据库的CRUD的速度与爬虫的速度不一样,利用RabbitMQ可以实现数据库读取业务和爬虫业务解耦,保证爬虫整体的效率;另一方面,使用RabbitMQ的监控平台可以很好的监控爬取的进度情况。实现的功能大体如下:

  • 基于schedule的定时功能,固定时间爬取小说信息和章节列表保存到数据库中
  • 跟随系统启动单独开启一条线程,每隔一定时间查询数据库中还没有内容的章节,同时将查询到的内容添加到消息队列中。
  • 消费者监听队列获取章节信息,爬取对应链接的章节内容,内容的爬取需要考虑分页的问题。
  • 定时从数据库中查询还在更新中的小说,爬取新的章节信息。

RabbitMQ有三种消息的传递模式,这里我们采用的是“发布-订阅”模式。在RabbitMQ中,生产者并非直接把消息发送到队列,而是先发送到转换器,转换器根据路由键再将消息传递到对应的队列中。RabbitMQ正是通过路由键实现Exchange和队列的绑定。因此通过路由键我们便可将消息发送到指定队列或从指定队列中获取数据。

使用RabbitMQ实现业务解耦

该功能涉及到的路由键:

mq.config.exchange=kd.novel.direct
#存储小说的队列
mq.config.queue.novel=mq.config.queue.novel
mq.config.routing.novel.key=novel.routing
#存储章节内容的队列
mq.config.queue.chapter=mq.config.queue.chapter
mq.config.routing.chapter.key=chapter.routing
#存储章节下一页内容的队列
mq.config.queue.next.chapter=mq.config.queue.chapter.next
mq.config.routing.next.chapter.key=chapter.next.routing
#存储章节最后一页内容的队列
mq.config.queue.end.chapter=mq.config.queue.chapter.end
mq.config.routing.end.chapter.key=chapter.end.routing
#存储更新的章节信息
mq.config.queue.chapter.update=mq.config.queue.chapter.update
mq.config.routing.chapter.update.key=chapter.update.routing
           

查询所有还没有章节内容的章节信息,添加到消息队列中:

@Component
public class ChapterInfoSync implements ApplicationRunner {

    @Autowired
    private ChapterService chapterService;
    @Value("${mq.config.exchange}")
    private String novelExchange;
    @Value("${mq.config.routing.chapter.key}")
    private String chapterRouting;
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        while (true){
            int counts = chapterService.getInfoCounts();
            List<Chapter> chapterList = null;
            for(int i = 1; i <= counts; i++){
                chapterList = chapterService.getChapterList(i , SIZE);
                for(Chapter chapter : chapterList){
                    //将查询到的数据添加到消息队列中
                    amqpTemplate.convertAndSend(novelExchange, chapterRouting, JsonUtils.objectToJson(chapter));
                }
                TimeUnit.MINUTES.sleep(5);
            }
        }
    }

    private static final int SIZE = 500;
}
           

使用@RabbitListener实现队列监听,获取消息队列中的信息,解析数据得到章节内容的访问链接,从而爬取章节的内容。

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.chapter.update}", autoDelete = "false"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.routing.chapter.update.key}"
        )
)
public class ChapterReceiver {

    @Autowired
    private NovelService novelService;
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Value("${mq.config.exchange}")
    private String novelExchange;
    @Value("${mq.config.routing.novel.key}")
    private String novelRouting;

    @RabbitHandler
    public void novelInfo(String msg) throws Exception{
        Novel novel = JsonUtils.jsonToPojo(msg, Novel.class);
        HttpClientBuilder builder = HttpClients.custom();
        builder.setUserAgent("Mozilla/5.0(Windows;U;Windows NT 5.1;en-US;rv:0.9.4)");
        CloseableHttpClient httpclient = builder.build();
        CloseableHttpResponse response = null;
        try {
            HttpGet httpget = new HttpGet(novel.getLink());
            response = httpclient.execute(httpget); //执行
            int code = response.getStatusLine().getStatusCode(); //获取响应状态码
            String html = "";
            if (code == 200) {
                html = EntityUtils.toString(response.getEntity(), "utf-8");
            } else {
                EntityUtils.consume(response.getEntity());
            }
            if ("".equals(html)) {
                return;
            }
            // 解析数据
            Document document = Jsoup.parse(html);
            Elements chapter = document.select("ul[class=_chapter] li a");
            //章节列表
            List<Chapter> chapterList = new ArrayList<Chapter>();
            int nId = novel.getId();
            for (Element element : chapter) {
                Chapter info = new Chapter();
                String url = Encoder.encodeUrl(element.attr("href"));
                String content = Encoder.encodeHtml(element.text());
                info.setId(nId);
                info.setName(content);
                info.setLink(url);
                chapterList.add(info);
            }
            int size = novelService.getNovelSize(nId);
            //大小有变化则加入消费队列
            if(size != chapterList.size()){
                NovelColl novelColl = new NovelColl();
                novelColl.setNovel(novel);
                novelColl.setChapters(chapterList);
                amqpTemplate.convertAndSend(novelExchange, novelRouting, JsonUtils.objectToJson(novelColl));
            }
        }catch (Exception e){
            logger.error("crawel chapter err;", e.getMessage());
        }finally {
            if(response != null){
                response.close();
            }
            if(httpclient != null){
                httpclient.close();
            }
        }
    }

    private static final Logger logger = LoggerFactory.getLogger(ChapterReceiver.class);
}
           

小说的爬虫是基于WebCollector实现和HttpClient实现。WebCollector默认使用多线程进行内容爬取,同时可以设置爬取的深度,因此速度很快,适合小说基本信息和章节列表的爬取。而基于HttpClient实现的章节爬虫则是单线程,方便控制章节内容和小说进行对应。

如果你感兴趣,可以点击链接查看源码:https://gitee.com/hsfeng/bicrawel