Reactor Netty参考指南目录
原文地址
Reactor Netty
提供了易于使用和配置的
UdpServer
。它隐藏了创建
UDP
服务器所需的大部分
Netty
的功能,并增加了
Reactive Streams
背压。
7.1.启动和停止
如果要启动一个
UDP
服务器,您必须创建并且配置一个
UdpServer
实例对象。默认情况下,host为
localhost
,port为
12012
。下面是创建并且启动一个
UdpServer
服务器的例子:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/create/Application.java<1> 创建一个import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() //<1> .bindNow(Duration.ofSeconds(30)); //<2> server.onDispose() .block(); } }
UdpServer
实例用来做配置操作。
<2> 使用阻塞等待的方式启动服务器,直到初始化完成。
返回的
Connection
提供了简单的服务器API,包括
disposeNow()
,这个方法可以以阻塞等待的方式来关闭服务器。
7.1.1.Host和Port
想要设置特定
host
和
port
,您可以用下面的方式来配置
UDP
服务器:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/address/Application.java<1> 配置import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .host("localhost") //<1> .port(8080) //<2> .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
UDP
服务器的host
<2> 配置
服务器的port
UDP
7.2.预先初始化
默认情况下,
UdpServer
初始化资源的操作在需要使用的时候才进行。这意味着初始化加载的时候
bind operation
会占用额外的时间:
- 事件循环组
- native传输库(当使用了native传输的时候)
当您需要预加载这些资源的时候,您可以按照以下方式来配置
UdpServer
:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/warmup/Application.java<1> 初始化和加载事件循环组和native传输库import io.netty.channel.socket.DatagramPacket; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { UdpServer udpServer = UdpServer.create() .handle((in, out) -> out.sendObject( in.receiveObject() .map(o -> { if (o instanceof DatagramPacket) { DatagramPacket p = (DatagramPacket) o; return new DatagramPacket(p.content().retain(), p.sender()); } else { return Mono.error(new Exception("Unexpected type of the message: " + o)); } }))); udpServer.warmup() //<1> .block(); Connection server = udpServer.bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
7.3.写出数据
如果要发送数据到一个已连接的客户端,您必须添加一个I/O处理器。这个I/O处理器可以通过
UdpOutbound
来写出数据。下面是一个发送
hello
字符串的例子:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/send/Application.java<1> 发送一个import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.socket.DatagramPacket; import io.netty.util.CharsetUtil; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .handle((in, out) -> out.sendObject( in.receiveObject() .map(o -> { if (o instanceof DatagramPacket) { DatagramPacket p = (DatagramPacket) o; ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8); return new DatagramPacket(buf, p.sender()); //<1> } else { return Mono.error(new Exception("Unexpected type of the message: " + o)); } }))) .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
字符串到远端
hello
7.4.消费数据
如果要接收从远端发过来的数据,您必须添加一个I/O处理器。这个I/O处理器可以通过
UdpInbound
来读取数据。示例如下:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/read/Application.java<1> 接收从远端发过来的数据import io.netty.channel.socket.DatagramPacket; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .handle((in, out) -> out.sendObject( in.receiveObject() .map(o -> { if (o instanceof DatagramPacket) { DatagramPacket p = (DatagramPacket) o; return new DatagramPacket(p.content().retain(), p.sender()); //<1> } else { return Mono.error(new Exception("Unexpected type of the message: " + o)); } }))) .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
7.5.生命周期回调
下面的生命周期回调用参数是提供给您用来扩展
UdpServer
的:
Callback | Description |
---|---|
| 当服务器channel即将被绑定的时候调用。 |
| 当服务器channel已经被绑定的时候调用。 |
| 当channel初始化的时候被调用。 |
| 当服务器channel解绑的时候被调用。 |
下面是使用
doOnBound
和
doOnChannelInit
回调的例子:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/lifecycle/Application.java<1> 当channel绑定的时候添加一个import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.logging.LoggingHandler; import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .doOnBound(conn -> conn.addHandler(new LineBasedFrameDecoder(8192))) //<1> .doOnChannelInit((observer, channel, remoteAddress) -> channel.pipeline() .addFirst(new LoggingHandler("reactor.netty.examples"))) //<2> .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
到
LineBasedFrameDecoder
Netty
pipeline。
<2> 当初始化channel的时候添加一个
到
LoggingHandler
pipeline。
Netty
7.6.连接配置
这一章将给您介绍以下三种UDP层的配置方式:
- Channel Options
- Wire Logger
- Event Loop Group
7.6.1.Channel Options
默认情况下,
UDP
服务器配置了以下options:
./…/…/reactor-netty-core/src/main/java/reactor/netty/udp/UdpServerBind.javaUdpServerBind() { this.config = new UdpServerConfig( Collections.singletonMap(ChannelOption.AUTO_READ, false), () -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT)); }
如果需要添加新的option或者修改已有的option,您可以使用如下的方式:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/channeloptions/Application.javaimport io.netty.channel.ChannelOption; import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
您可以通过以下的链接找到更多关于Netty channel options的信息:
-
ChannelOption
- Socket Options
7.6.2.Wire Logger
Reactor Netty提供了线路记录(wire logging)用来检查点对点的流量。默认情况下,线路记录是关闭的。如果想要开启它,您必须将日志
reactor.netty.udp.UdpServer
的设置为
DEBUG
等级并且按如下方式进行配置:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/wiretap/Application.java<1> 开启线路记录import reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .wiretap(true) //<1> .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
默认情况下,线路记录在输出内容的时候会使用AdvancedByteBufFormat#HEX_DUMP。您也可以通过配置
UdpServer
改为AdvancedByteBufFormat#SIMPLE或者AdvancedByteBufFormat#TEXTUAL:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/wiretap/custom/Application.java<1> 开启线路记录并使用AdvancedByteBufFormat#TEXTUAL来输出内容。import io.netty.handler.logging.LogLevel; import reactor.netty.Connection; import reactor.netty.transport.logging.AdvancedByteBufFormat; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1> .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
7.6.3.Event Loop Group
默认情况下,UDP服务器使用"Event Loop Group",工作线程数等于初始化的时候可以用的处理器数量(但最小是4)。您也可以使用LoopResource
#create
其中的一个方法来修改配置。
默认的
Event Loop Group
配置如下:
./…/…/reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java/** * Default worker thread count, fallback to available processor * (but with a minimum value of 4) */ public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount"; /** * Default selector thread count, fallback to -1 (no selector thread) */ public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount"; /** * Default worker thread count for UDP, fallback to available processor * (but with a minimum value of 4) */ public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount"; /** * Default quiet period that guarantees that the disposal of the underlying LoopResources * will not happen, fallback to 2 seconds. */ public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod"; /** * Default maximum amount of time to wait until the disposal of the underlying LoopResources * regardless if a task was submitted during the quiet period, fallback to 15 seconds. */ public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout"; /** * Default value whether the native transport (epoll, kqueue) will be preferred, * fallback it will be preferred when available */ public static final String NATIVE = "reactor.netty.native";
如果需要修改这些设置,您也可以通过如下方式进行配置:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/eventloop/Application.javaimport reactor.netty.Connection; import reactor.netty.resources.LoopResources; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { LoopResources loop = LoopResources.create("event-loop", 1, 4, true); Connection server = UdpServer.create() .runOn(loop) .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
7.7.度量
UDP服务器支持与Micrometer的内置集成。它暴露了所有前缀为
reactor.netty.udp.server
的度量。
下面的表格提供了UDP服务器度量的相关信息:
度量名称 | 类型 | 描述 |
---|---|---|
reactor.netty.udp.server.data.received | DistributionSummary | 收到的数据量,以字节为单位 |
reactor.netty.udp.server.data.sent | DistributionSummary | 发送的数据量,以字节为单位 |
reactor.netty.udp.server.errors | Counter | 发生的错误数量 |
下面额外的度量也是可用的:
ByteBufAllocator
度量
度量名称 | 类型 | 描述 |
---|---|---|
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | 堆内存的字节数 |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | 堆外内存的字节数 |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | 堆内存的个数(当使用 的时候) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | 堆外内存的个数(当使用 的时候) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | threadlocal的缓存数量(当使用 的时候) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | 微小缓存的大小(当使用 的时候) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | 小缓存的大小(当使用 的时候) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | 一般缓存的大小(当使用 的时候) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | 一个区域的块大小(当使用 的时候) |
下面是开启集成的度量的例子:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/metrics/Application.java<1> 开启内建集成的Micrometerimport reactor.netty.Connection; import reactor.netty.udp.UdpServer; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .metrics(true) //<1> .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
如果您想让UDP服务端度量与除了
Micrometer
之外的系统集成或者想提供自己与
Micrometer
的集成来添加自己的度量记录器,您可以按如下方式实现:
https://github.com/reactor/reactor-netty/blob/master/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/udp/server/metrics/custom/Application.java<1> 开启UDP服务端度量并且提供import reactor.netty.Connection; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.udp.UdpServer; import java.net.SocketAddress; import java.time.Duration; public class Application { public static void main(String[] args) { Connection server = UdpServer.create() .metrics(true, CustomChannelMetricsRecorder::new) //<1> .bindNow(Duration.ofSeconds(30)); server.onDispose() .block(); } }
的实现。
ChannelMetricsRecorder
Suggest Edit to “UDP Server”
Reactor Netty参考指南目录
版权声明:如需转载,请带上本文链接、注明来源和本声明。否则将追究法律责任。 https://www.immuthex.com/posts/reactor-netty-reference-guide/udp-server