需求
做消息中心要求測試以及預估一下rabbitmq的消費能力,需求是建立1千個隊列,每個隊列1000條資料合計100W資料,然後每個隊列指定一定數量的消費者進行消費,看下吞吐量,監控下cpu以及記憶體變化.
手動建立1000隊列以及push1000消息
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestApplication.class)
@RequiredArgsConstructor
public class Queue_Producer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 手動建立隊列以及push消息
* @throws IOException
*/
@Test
public void createQueue() throws IOException {
//擷取工廠建立連接配接
final ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
final Connection connection = factory.createConnection();
for (int i = 1; i < 1000; i++) {
String currQueueName = "test_quque_" + i;
//建立隊列
connection.createChannel(false).queueDeclare(currQueueName, true, false, false, null);
AtomicInteger current = new AtomicInteger(0);
while (current.get() < 1000) {
current.incrementAndGet();
//對每個隊列進行push消息
rabbitTemplate.convertAndSend(currQueueName, "目前進入隊列" + i + "的為" + current.get());
System.out.println("目前進入隊列" + currQueueName + "的為" + current.get());
}
}
connection.close();
}
}
複制
指定消費隊列以及控制并發消費者
這裡指定了1000個隊列的隊列名,并設定每個隊列并發消費者數量
@Component
@RequiredArgsConstructor
@Slf4j
public class BatchQueueCustomer_demo {
private final RabbitTemplate rabbitMq;
/**
* 手動批量建立隊列的消費者
* @return
*/
@Bean
public SimpleMessageListenerContainer dealQueue(){
final ExecutorService pool = Executors.newCachedThreadPool();
List<String> queueNames=new LinkedList<>();
for (int i = 1; i <1000 ; i++) {
queueNames.add("test_quque_"+i);
}
final ConnectionFactory factory = rabbitMq.getConnectionFactory();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
//加入所有的待消費隊列到監聽器内
container.setQueueNames(queueNames.toArray(new String[queueNames.size()]));
container.setExposeListenerChannel(true);
//每個隊列的消費者個數
container.setConcurrentConsumers(1);
//設定最大消費者個數---當消息堆積過多時候我們這裡會自動增加消費者
container.setMaxConcurrentConsumers(3);
//設定每個消費者擷取的最大的消息數量
container.setPrefetchCount(1);
//設定确認模式為手工确認
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//監聽處理
container.setMessageListener(new MqConsumerHandle());
return container;
}
/**
* 消息消費處理類
*/
@Component
public class MqConsumerHandle implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) {
try {
//處理消息
System.out.println(message);
//成功傳回碼 該消息的index
//是否批量. true:将一次性ack所有小于deliveryTag的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("mq消息轉發具體實作異常"+e);
try {
//deliveryTag:該消息的index。
//multiple:是否批量. true:将一次性拒絕所有小于deliveryTag的消息。
//requeue:被拒絕的是否重新入隊列。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
複制
接下裡就是控制變量了,主要測試系統消費能力,可以分别控制隊列數量,以及每個隊列的并發消費數量,也可以不同隊列按權重靈活控制的并發消費數量,看看我們的系統能力