天天看点

使用UnicastReceivingChannelAdapter多线程接收UDP消息的方法

一、背景介绍

一、背景介绍

项目有个接收UDP消息的需求,于是在网上找了一些基于integration的实现代码,调通了。没想到在实际生产环境中,由于接收的消息数据量太大,redis又是单线程的原因,导致处理速度越来越慢,最后执行的jar包都不报nohup.out,只能重启。而且在观察运行状态时,每一条接收到的消息都是顺序执行的,所以当时怀疑org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter这个管道默认是队列的,于是去网上查了查UnicastReceivingChannelAdapter的多线程配置,始终没查到,又看到大家都是基于netty来接收UDP消息,于是也跟着实现了。今日有暇,参考了TCP and UDP Support (spring.io)这篇文章,发现UnicastReceivingChannelAdapter是可以实现多线程接收UDP消息的。

二、实现方法

在上述参考文章的IP Configuration Attributes一节,表2中提到一个名为pool-size的参数,它指定可以同时处理的数据包数量,其默认值为5(完全感受不到是多线程)。因此我调高了该参数,然后发送999条UDP消息进行测试,果然是多线程的感觉了。

/**
 * 启动UDP服务
 *
 * @author c 2021/4/20
 */
@Configuration
@Slf4j
public class UDPServiceImpl {

    //UDP port
    @Value("${SYSLOG.UDP_PORT}")
    public Integer UDP_PORT;

    @Value("${SYSLOG.POOL_SIZE}")
    public Integer POOL_SIZE;

    @Bean
    public IntegrationFlow processUniCastUdpMessage() {
        log.info("UDP服务启动成功,端口号为: {}", UDP_PORT);
        UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(UDP_PORT);
        unicastReceivingChannelAdapter.setPoolSize(POOL_SIZE);
        return IntegrationFlows
                .from(unicastReceivingChannelAdapter)
                .handle("businessHandle", "handleMessage")
                .get();
    }

}
           

解释一下,businessHandle是业务类,handleMessage是类里面接收UDP消息的方法。

/**
     * 接收syslog日志
     *
     * @param message 日志信息
     */
    public void handleMessage(Message message) {

        long startTime = System.currentTimeMillis();
        MessageHeaders headers = message.getHeaders();
        Object ip = headers.get("ip_address");

        String data = new String((byte[]) message.getPayload());
        log.info("syslog传输进来的结果:【{}】", JsonUtils.object2Json(data));
}