一、背景介紹
一、背景介紹
項目有個接收UDP消息的需求,于是在網上找了一些基于integration的實作代碼,調通了。沒想到在實際生産環境中,由于接收的消息資料量太大,redis又是單線程的原因,導緻處理速度越來越慢,最後執行的jar包都不報nohup.out,隻能重新開機。而且在觀察運作狀态時,每一條接收到的消息都是順序執行的,是以當時懷疑org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter這個管道預設是隊列的,于是去網上查了查UnicastReceivingChannelAdapter的多線程配置,始終沒查到,又看到大家都是基于netty來接收UDP消息,于是也跟着實作了。今日有暇,參考了TCP and UDP Support (spring.io)這篇文章,發現UnicastReceivingChannelAdapter是可以實作多線程接收UDP消息的。
二、實作方法
在上述參考文章的IP Configuration Attributes一節,表2中提到一個名為pool-size的參數,它指定可以同時處理的資料包數量,其預設值為5(完全感受不到是多線程)。是以我調高了該參數,然後發送999條UDP消息進行測試,果然是多線程的感覺了。
/**
* 啟動UDP服務
*
* @author c 2021/4/20
*/
@Configuration
@Slf4j
public class UDPServiceImpl {
//UDP port
@Value("${SYSLOG.UDP_PORT}")
public Integer UDP_PORT;
@Value("${SYSLOG.POOL_SIZE}")
public Integer POOL_SIZE;
@Bean
public IntegrationFlow processUniCastUdpMessage() {
log.info("UDP服務啟動成功,端口号為: {}", UDP_PORT);
UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(UDP_PORT);
unicastReceivingChannelAdapter.setPoolSize(POOL_SIZE);
return IntegrationFlows
.from(unicastReceivingChannelAdapter)
.handle("businessHandle", "handleMessage")
.get();
}
}
解釋一下,businessHandle是業務類,handleMessage是類裡面接收UDP消息的方法。
/**
* 接收syslog日志
*
* @param message 日志資訊
*/
public void handleMessage(Message message) {
long startTime = System.currentTimeMillis();
MessageHeaders headers = message.getHeaders();
Object ip = headers.get("ip_address");
String data = new String((byte[]) message.getPayload());
log.info("syslog傳輸進來的結果:【{}】", JsonUtils.object2Json(data));
}