天天看點

使用UnicastReceivingChannelAdapter多線程接收UDP消息的方法

一、背景介紹

一、背景介紹

項目有個接收UDP消息的需求,于是在網上找了一些基于integration的實作代碼,調通了。沒想到在實際生産環境中,由于接收的消息資料量太大,redis又是單線程的原因,導緻處理速度越來越慢,最後執行的jar包都不報nohup.out,隻能重新開機。而且在觀察運作狀态時,每一條接收到的消息都是順序執行的,是以當時懷疑org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter這個管道預設是隊列的,于是去網上查了查UnicastReceivingChannelAdapter的多線程配置,始終沒查到,又看到大家都是基于netty來接收UDP消息,于是也跟着實作了。今日有暇,參考了TCP and UDP Support (spring.io)這篇文章,發現UnicastReceivingChannelAdapter是可以實作多線程接收UDP消息的。

二、實作方法

在上述參考文章的IP Configuration Attributes一節,表2中提到一個名為pool-size的參數,它指定可以同時處理的資料包數量,其預設值為5(完全感受不到是多線程)。是以我調高了該參數,然後發送999條UDP消息進行測試,果然是多線程的感覺了。

/**
 * 啟動UDP服務
 *
 * @author c 2021/4/20
 */
@Configuration
@Slf4j
public class UDPServiceImpl {

    //UDP port
    @Value("${SYSLOG.UDP_PORT}")
    public Integer UDP_PORT;

    @Value("${SYSLOG.POOL_SIZE}")
    public Integer POOL_SIZE;

    @Bean
    public IntegrationFlow processUniCastUdpMessage() {
        log.info("UDP服務啟動成功,端口号為: {}", UDP_PORT);
        UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(UDP_PORT);
        unicastReceivingChannelAdapter.setPoolSize(POOL_SIZE);
        return IntegrationFlows
                .from(unicastReceivingChannelAdapter)
                .handle("businessHandle", "handleMessage")
                .get();
    }

}
           

解釋一下,businessHandle是業務類,handleMessage是類裡面接收UDP消息的方法。

/**
     * 接收syslog日志
     *
     * @param message 日志資訊
     */
    public void handleMessage(Message message) {

        long startTime = System.currentTimeMillis();
        MessageHeaders headers = message.getHeaders();
        Object ip = headers.get("ip_address");

        String data = new String((byte[]) message.getPayload());
        log.info("syslog傳輸進來的結果:【{}】", JsonUtils.object2Json(data));
}