天天看點

Springboot+Rabbitmq全手動批量建立隊列以及批量指定消費者測試性

需求

做消息中心要求測試以及預估一下rabbitmq的消費能力,需求是建立1千個隊列,每個隊列1000條資料合計100W資料,然後每個隊列指定一定數量的消費者進行消費,看下吞吐量,監控下cpu以及記憶體變化.

手動建立1000隊列以及push1000消息

@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestApplication.class)
@RequiredArgsConstructor
public class Queue_Producer {


    @Autowired
    RabbitTemplate rabbitTemplate;


    /**
     * 手動建立隊列以及push消息
     * @throws IOException
     */
    @Test
    public void createQueue() throws IOException {
      //擷取工廠建立連接配接
        final ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
        final Connection connection = factory.createConnection();
        for (int i = 1; i < 1000; i++) {
            String currQueueName = "test_quque_" + i;
           //建立隊列  
          connection.createChannel(false).queueDeclare(currQueueName, true, false, false, null);

            AtomicInteger current = new AtomicInteger(0);
            while (current.get() < 1000) {
                current.incrementAndGet();
               //對每個隊列進行push消息
               rabbitTemplate.convertAndSend(currQueueName, "目前進入隊列" + i + "的為" + current.get());
                System.out.println("目前進入隊列" + currQueueName + "的為" + current.get());
            }
        }
        connection.close();
    }
}           

複制

指定消費隊列以及控制并發消費者

這裡指定了1000個隊列的隊列名,并設定每個隊列并發消費者數量

@Component
@RequiredArgsConstructor
@Slf4j
public class BatchQueueCustomer_demo {
    private final RabbitTemplate rabbitMq;


    /**
     * 手動批量建立隊列的消費者
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer dealQueue(){
        final ExecutorService pool = Executors.newCachedThreadPool();
        List<String> queueNames=new LinkedList<>();
        for (int i = 1; i <1000 ; i++) {
            queueNames.add("test_quque_"+i);
        }

        final ConnectionFactory factory = rabbitMq.getConnectionFactory();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        //加入所有的待消費隊列到監聽器内
        container.setQueueNames(queueNames.toArray(new String[queueNames.size()]));
        container.setExposeListenerChannel(true);
        //每個隊列的消費者個數
        container.setConcurrentConsumers(1);
        //設定最大消費者個數---當消息堆積過多時候我們這裡會自動增加消費者
        container.setMaxConcurrentConsumers(3);
        //設定每個消費者擷取的最大的消息數量
        container.setPrefetchCount(1);
        //設定确認模式為手工确認
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //監聽處理
        container.setMessageListener(new MqConsumerHandle());
        return container;
    }

    /**
     * 消息消費處理類
     */
    @Component
    public class MqConsumerHandle implements ChannelAwareMessageListener {

        @Override
        public void onMessage(Message message, Channel channel) {

            try {
                //處理消息
                System.out.println(message);


                //成功傳回碼 該消息的index
                //是否批量. true:将一次性ack所有小于deliveryTag的消息。
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                System.out.println("mq消息轉發具體實作異常"+e);
                try {
                    //deliveryTag:該消息的index。
                    //multiple:是否批量. true:将一次性拒絕所有小于deliveryTag的消息。
                    //requeue:被拒絕的是否重新入隊列。
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

}           

複制

接下裡就是控制變量了,主要測試系統消費能力,可以分别控制隊列數量,以及每個隊列的并發消費數量,也可以不同隊列按權重靈活控制的并發消費數量,看看我們的系統能力