网络世界离不开的就是通信,不管是任何框架只要是提供互联网服务就必须有通信的能力。作为业务开发者可能我们业务中也会遇到这样的需求,或者是有啥好的创意性框架,但是无奈现在通信层框架都太厚重了,想要快速学习使用显然不太可能,所以呢 本文就是教你如何来实现一个封装一个通信层框架。

当看完本篇你将会对Java技术有一个更深入的了解。你可以利用本文学习的内容去实现一下业务
- 自己设计一个Tomcat容器;
- 自己设计一款RPC框架;
- 也可以在你的应用程序内部去在启动一个通信服务。
文章后面有演示。项目github地址关注私信: 01 自动回复
一、通信框架设计要考虑的点
通信肯定是双方间的,客户端发送数据,服务端处理数据。我们日常的开发都是基于http协议的,是不用考虑服务端和客户端如何去发送数据的。因为我们理解的数据都是明文的模型,而http协议底层会将其转换成二进制数据通过TCP/IP协议传递给服务端和客户端。而下面内容是要讨论的如何将明文数据转换成二进制数据,以及让客户端和服务端都能理解这样的二进制数据。
1. 什么是协议?以及如何去设计协议?
协议就是通信双方约定的明文和二进制数据的转换格式。客户端按照约定将明文数据转换成二进制数据,服务端按照约定将二进制数据转换成明文数据。
如我们约定读取的第一个字节是协议类型,第二个字节是序列化类型,第三个是报文长度,第四是报文内容。
1.1 客户端根据协议去构建报文
那么客户端在发数据给服务端的时候就要根据前面定的协议去拼装二进制数据。
@Override protected void doEncode(ChannelHandlerContext ctx, RpcProtocolHeader msg, ByteBuf out) throws Exception { //1. 获取协议类型(1个字节) out.writeByte(msg.getProtocolType()); //2. 获取序列化类型(1个字节) out.writeByte(msg.getSerializationType()); //3. 根据序列化类型找到数据转换器生成二进制数据 Serializer serializer = SerializeEnum.ofByType(msg.getSerializationType()).getSerialize().newInstance(); byte[] data = serializer.serialize(msg); //4. 写入报文长度(4个字节) out.writeInt(data.length); //5. 写入报文内容(数组) out.writeBytes(data); }
1.2 服务端根据协议去解析报文
这里通常会遇到很多问题,比如拆包和粘包问题
- 拆包是说一个数据发送时候发送的是hello,但是服务端收到的是2次的请求,第一次是hel,第二次是lo。
- 粘包是说发了客户端发了2次hello,但是服务端收到的是在一起的hellohello,两个报文粘在一起的。 如果要自定义协议就必须要解决这两个问题,如何来解决呢? 其实很简单,就是要知道,报文什么时候结束的。 就像上面的说的协议,为什么要把报文长度写放到报文里面呢? 前面的协议接受到的数据最少是6个字节。
1个字节的协议类型,1个字节的序列化类型,4个字节的数据长度,剩下的是数据包内容。
当服务端在处理二进制数据时候如果发现可读的字节不到6个字节,那肯定说明报文不完整,就先不处理,等待报文都到了在处理。 如果到了6个字节,那么我们肯定就能知道真正的报文长度是多少,然后再读取真正的报文内容,就能知道什么时候报文是结束了。 如果报文真正内容不够,继续等待,等待数据都到齐。
@Override protected void doDecode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List out) throws Exception { byte[] dataArr; //1. 不可读就关闭 if (!inByteBuf.isReadable()) { Channel channel = ctx.channel(); SocketAddress socketAddress = channel.remoteAddress(); channel.close(); System.err.println(">>>>>>>>>[" + socketAddress + "]客户端已主动断开连接...."); return; } //2. 可读的数据大小 int dataHeadSize = inByteBuf.readableBytes(); //3. 不是完整的数据头(6个字节) if (!isFullMessageHeader(dataHeadSize)) { // 如果不到就直接返回,继续等待数据 return; } //4. 完整的数据头就开始看数据长度是否满足(防止数据真正报文不够,在这里先打个标记) inByteBuf.markReaderIndex(); //协议类型 byte protocolType = inByteBuf.readByte(); //序列化类型 byte serializationType = inByteBuf.readByte(); //数据长度 int dataSize = inByteBuf.readInt(); //5. 拆包的直接返回下次数据完整了,在处理 if (!isFullMessage(inByteBuf, dataSize)) { // 回滚到4所在的标记,让下次再处理 inByteBuf.resetReaderIndex(); System.out.println(); System.err.println("######################数据不足已重置buffer######################"); return; } //6. 数据都到了,就直接根据报文长度来读取数据 dataArr = new byte[dataSize]; inByteBuf.readBytes(dataArr, 0, dataSize); //7. 根据序列化类型找到指定的处理类 SerializeEnum serializeEnum = SerializeEnum.ofByType(serializationType); Class extends Serializer> serialize = serializeEnum.getSerialize(); //根据类型获取序列化器 Serializer serializerNewInstance = serialize.newInstance(); //8. 转换成java模型 Object deserialize = serializerNewInstance.deserialize(dataArr); out.add(deserialize); }
2. 通信层如何选技术
到底是使用Java提供的通信层API,还是选其他封装好的通信层框架呢? 小编明显选择了后者
Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
为什么要使用Netty呢? 因为Java的通信API实在太难用了,学习成本高,所以就用Netty已经封装好的简单的API吧。
只要告诉Netty你们的通信解码编码器,就是告诉Netty你们客户端和服务端约定好的协议类型。Netty就能帮你把二进制数据
转化成Java模型。
现在市面上的RPC框架尤其是阿里体系内的Dubbo还是Hsf都是依赖Netty来实现的通信层。还有ElasticSearch等等。
3. 高性能高可靠
这些都是要有考虑的点,需要精益求精的去code设计和思考另外也要方方面面的压测和验证,但是不在本文的讨论范围。 本文是想先告诉你怎么去实现,只有当知道如何去实现才能去考虑如何优化。
二、框架演示展示
下面演示的框架就是小编基于上面的内容来开发的。目的就是让我们能自定义数据模型,而不用关心底层二进制转换。 当我们想进行通信的时候不去用启动一个tomcat或者是不用启动一个dubbo服务,只用两行代码就能搞定了。
1. 定义一个数据模型
继承一个协议头RpcProtocolHeader。框架会自动将你定义的模型去转换成二进制数据,也可以将二进制数据转换成你的定义的数据模型,而这里的协议,就是前面说的协议。协议叫啥名字都无所谓, 如果你框架的名字叫dubbox,那你可以把前面约定的协议叫做dubbox协议。
class RpcUserRequest extends RpcProtocolHeader { private String message; public RpcUserRequest(String message) { this.message = message; } @Override public String toString() { return "RpcUserRequest{" + "message='" + message + ''' + '}'; } } class RpcUserResponse extends RpcProtocolHeader { private String message; public RpcUserResponse(String message) { this.message = message; } @Override public String toString() { return "RpcUserResponse{" + "message='" + message + ''' + '}'; } }
2. 创建一个服务端
API非常简单,如果使用链式编程风格真的就是几行行代码。
这里举一个简单的例子,收到数据后将请求的message直接在返回给客户端即可。
@Test public void serverTest() throws Exception { Installer.server(RpcUserRequest.class, RpcUserResponse.class) //这里接受客户端的请求,并返回一个相应 .serverHandler((channel, rpcRequest) -> new RpcUserResponse("服务端返回: " + rpcRequest.message)) .create().start(12306); }
3. 创建一个客户端并发送数据
最后会打印出//RpcUserResponse{message='服务端返回: 关注微信公众号:程序猿升级课'}
@Test public void clientTest() throws Exception { //构建一个客户端端 Client client = Installer.client(RpcUserRequest.class, RpcUserResponse.class) .conncet("127.0.0.1", 12306); //发送数据 MojitoFuture mojitoFuture = client.sendAsync(new RpcUserRequest("关注微信公众号:程序猿升级课")); //RpcUserResponse{message='服务端返回: 关注微信公众号:程序猿升级课'} System.out.println(mojitoFuture.get()); client.close(); }
三、这个框架能做什么?
1. 实现一个最小的HTTP服务端
快速就可以构建出一个服务端,这个服务端即可以像上面那样使用RPC的风格调用(前提要自己定义模型),也可以简单 的构建一个支持http协议的服务端。就像下文那样。
在SpringBoot应用启动类后面重新启动一个http服务器端口号是8083。当然这么写不够优雅,建议你自己封装一个SpringBoot的Start来使用。
@SpringBootApplicationpublic class LayuiWebLearnApplication { public static void main(String[] args) { SpringApplication.run(LayuiWebLearnApplication.class, args); // 启动一个监听8083端的支持http协议的服务器 HttpCodecFactory httpCodecFactory = new HttpCodecFactory(new SubServerHandler() { @Override public HttpResponseFacade handler(EnhanceChannel enhanceChannel, HttpRequestFacade httpRequestFacade) throws RemotingException { Map requestParams = httpRequestFacade.getRequestParams(); Constant.setName(requestParams.getOrDefault("name", "请给我的爱人来一杯Mojito")); return HttpResponseFacade.ok(); } }); httpCodecFactory.getServer().startAsync(8083); }}
这是一个页面,name展示在页面这个地方,通过自定义框架的能力,将这个值给改变了。
访问http://localhost:8083?name=请给我的爱人来一杯mojito2
2. 实现一个RPC风格的框架
框架提供了最简单的创建服务端和客户端的API。当然里面还允许指定序列化方式。可以自己去摸索。
创建一个要执行的类
public class UserInvoker { public String getName(String name) { return "反射执行:" + name; }}
构建服务端
@Test public void testRpcServer() { Installer.ServerCreator serverCreator = Installer.server(RpcRequest.class, RpcResponse.class) .serverHandler((channel, request) -> { RpcResponse response = new RpcResponse(); try { //1. 拿到要执行的类 Class> serviceType = request.getServiceType(); //2. 拿到要执行类的方法 Method method = serviceType.getMethod(request.getMethodName(), request.getArgsType()); Constructor> constructor = serviceType.getConstructor(); Object instance = constructor.newInstance(); //3. 反射执行结果 Object invoke = method.invoke(instance, request.getArgs()); response.setSuccess(true); response.setResult(invoke); } catch (Exception e) { e.printStackTrace(); response.setSuccess(false); } return response; }); serverCreator.create().start(8084); }
构建客户端
RpcResponse{id=1, protocolType=2, serializationType=3, type=1, timeout=0, serviceType=null, methodName='null', argsType=null, result=反射执行:欢迎关注程序猿升级课, success=true, message='null', code=null, returnType=null}
@Test public void testRpcClient() throws Exception { Client client = Installer.client(RpcRequest.class, RpcResponse.class).create(); client.connect("127.0.0.1", 8084); RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setServiceType(UserInvoker.class); rpcRequest.setMethodName("getName"); rpcRequest.setArgsType(new Class[]{String.class}); rpcRequest.setArgs(new Object[]{"欢迎关注程序猿升级课"}); MojitoFuture future = client.sendAsync(rpcRequest); System.out.println(future.get()); }
3. 实现一个MQ消息系统
消息系统如何实现?
服务端
@Test public void queueTest() { Installer.server(Message.class, QueueStatus.class) .serverHandler(new SubServerHandler() { //1. 收到消息之后如果处理成功就返回给客户端。 //2. 如果是订阅的链接,就启动服务去消费。 private Map> routeKeyChannelMap = new ConcurrentHashMap<>(); private Map> routeKeyQueueMap = new ConcurrentHashMap<>(); private AtomicBoolean atomicBoolean = new AtomicBoolean(false); @Override public QueueStatus handler(EnhanceChannel channel, Message message) { ProtocolEnum protocolEnum = ProtocolEnum.byType(message.getProtocolType()); //1. 如果发现是注册协议,就将当前客户端的连接给保存到指定的topic里面 if (protocolEnum == ProtocolEnum.MQ_REG) { List enhanceChannels = routeKeyChannelMap.computeIfAbsent(message.routeKey, k -> new ArrayList<>()); enhanceChannels.add(channel); return new QueueStatus("订阅成功"); } else if (protocolEnum == ProtocolEnum.MQ_SEND) { //2. 如果发现是发送协议,就向指定的topic去添加一条消息 Queue queue = routeKeyQueueMap.get(message.routeKey); if (queue == null) { queue = new ArrayBlockingQueue(23); routeKeyQueueMap.put(message.routeKey, queue); } queue.add(message.message); //3. 启动一个线程去将topic信息,推送给客户端 messageProcessing(); return new QueueStatus("发送成功"); } //推送给客户端,并收到客户端消息。看是否要把消息移出。 return new QueueStatus("未知"); } private void messageProcessing() { if (!atomicBoolean.get()) { atomicBoolean.compareAndSet(atomicBoolean.get(), true); new Thread(() -> { while (true) { for (Map.Entry> queueEntry : routeKeyQueueMap.entrySet()) { String routeKey = queueEntry.getKey(); Queue routeKeyMessages = queueEntry.getValue(); String message = routeKeyMessages.poll(); if (!StringTools.isBlank(message)) { //获取到订阅的链接 List enhanceChannels = routeKeyChannelMap.get(routeKey); for (EnhanceChannel enhanceChannel : enhanceChannels) { if (enhanceChannel.isConnected()) { Message serverMsg = new Message(routeKey, message); serverMsg.setProtocolType(ProtocolEnum.MQ_SEND.getType()); enhanceChannel.send(serverMsg); } else { enhanceChannels.remove(enhanceChannel); } } } } } }).start(); } } }).create().start(12306); }