一、背景介绍
一、背景介绍
项目有个接收UDP消息的需求,于是在网上找了一些基于integration的实现代码,调通了。没想到在实际生产环境中,由于接收的消息数据量太大,redis又是单线程的原因,导致处理速度越来越慢,最后执行的jar包都不报nohup.out,只能重启。而且在观察运行状态时,每一条接收到的消息都是顺序执行的,所以当时怀疑org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter这个管道默认是队列的,于是去网上查了查UnicastReceivingChannelAdapter的多线程配置,始终没查到,又看到大家都是基于netty来接收UDP消息,于是也跟着实现了。今日有暇,参考了TCP and UDP Support (spring.io)这篇文章,发现UnicastReceivingChannelAdapter是可以实现多线程接收UDP消息的。
二、实现方法
在上述参考文章的IP Configuration Attributes一节,表2中提到一个名为pool-size的参数,它指定可以同时处理的数据包数量,其默认值为5(完全感受不到是多线程)。因此我调高了该参数,然后发送999条UDP消息进行测试,果然是多线程的感觉了。
/**
* 启动UDP服务
*
* @author c 2021/4/20
*/
@Configuration
@Slf4j
public class UDPServiceImpl {
//UDP port
@Value("${SYSLOG.UDP_PORT}")
public Integer UDP_PORT;
@Value("${SYSLOG.POOL_SIZE}")
public Integer POOL_SIZE;
@Bean
public IntegrationFlow processUniCastUdpMessage() {
log.info("UDP服务启动成功,端口号为: {}", UDP_PORT);
UnicastReceivingChannelAdapter unicastReceivingChannelAdapter = new UnicastReceivingChannelAdapter(UDP_PORT);
unicastReceivingChannelAdapter.setPoolSize(POOL_SIZE);
return IntegrationFlows
.from(unicastReceivingChannelAdapter)
.handle("businessHandle", "handleMessage")
.get();
}
}
解释一下,businessHandle是业务类,handleMessage是类里面接收UDP消息的方法。
/**
* 接收syslog日志
*
* @param message 日志信息
*/
public void handleMessage(Message message) {
long startTime = System.currentTimeMillis();
MessageHeaders headers = message.getHeaders();
Object ip = headers.get("ip_address");
String data = new String((byte[]) message.getPayload());
log.info("syslog传输进来的结果:【{}】", JsonUtils.object2Json(data));
}