天天看点

消息队列积压问题处理

问题:做的项目中,出现了mq消息大量积压,一方面是入队数量很多,另一方面就是消费很慢,解决消费慢的问题,这边主要采用了两种方式

1、多线程消费
/**
* 线程池
*/
public final class ThreadPoolUtil {
    /**
     * 默认线程数
     */
    private static final int DEFAULT_POOL_SIZE = 20;

    /**
     * 默认阻塞队列数1024
     */
    private static final int DEFAULT_CAPACITY = 1 << 10;

    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 40;

    /**
     * 最大阻塞队列数2048
     */
    private static final int MAX_CAPACITY = 1 << 11;

    /**
     * 默认线程空闲时长(seconds)
     */
    private static final long KEEP_ALIVE_TIME = 200L;

    /**
     * 获取JDK的线程池
     * @return
     */
    public static ThreadPoolExecutor getThreadPool(String threadPoolName) {
        ThreadFactory threadFactory
                = new ThreadFactoryBuilder().setNameFormat(threadPoolName + "_thread").build();
        return new ThreadPoolExecutor(DEFAULT_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(DEFAULT_CAPACITY),
                threadFactory, new ThreadPoolExecutor.DiscardPolicy());
    }
}

/**
* 处理消费
*/
@Test
public void testThread() throws InterruptedException {
    ThreadPoolExecutor executorService = ThreadPoolUtil.getThreadPool("executeMessage");
    List<Message> messages = listener();
    //countDownLatch阻塞主线程
    CountDownLatch countDownLatch = new CountDownLatch(messages.size());
    for (Message message : messages) {
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getId() + " 线程消费消息: " + JacksonUtil.obj2json(message));
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        });
    }
    countDownLatch.await();
}
           

这里使用

countdownLatch

的目的有两个:一个是控制线程数量,避免将线程池打满,导致走阻塞队列或者拒绝策略;另一个是阻塞主线程,减轻机器压力。

mq消费的机制就是,这个机器从消息队列中抓取消息后,会加锁,当机器处理完毕返回一个ack标识,则解锁,继续抓取后面的消息。

如果我们不阻塞主线程,开线程的时候,主线程直接执行完毕返回ack,而实际上机器并没有处理完毕,随后就是一波又一波的消息涌入,从而将机器压垮。

2、压力转移到机器

如果机器性能完全hold的住,那么我们完全可以将mq的压力转移到机器上。

List<Message> messageList = Lists.newArrayList();
messageList.addAll(messages);

//....我们可以将这个messageList开线程处理或存入缓存处理或再存入缓冲消息队列处理

return; //直接返回ack