天天看點

dubbo遠端調用源碼分析(二):服務端接收請求

dubbo遠端調用的源碼分析,分成了三篇文章位址分别如下:

dubbo遠端調用源碼分析(一):用戶端發送請求

dubbo遠端調用源碼分析(二):服務端接收請求

dubbo遠端調用源碼分析(三):用戶端接收回報後的處理

下面看provider端接收到消息後的處理過程

首先要經過NettyClient的事件處理器,順序是NettyCodecAdapter.DeCoder和NettyHandler

NettyCodecAdapter的Decoder是在NettyCodecAdapter類中定義的

private class InternalDecoder extends SimpleChannelUpstreamHandler {


        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =

               com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;


        @Override

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception{

            Object o =event.getMessage();

            if (!(o instanceof ChannelBuffer)) {

               ctx.sendUpstream(event);

                return;

            }


           ChannelBuffer input = (ChannelBuffer) o;

            int readable = input.readableBytes();

            if(readable <= 0) {

                return;

            }

 

           com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;

            if(buffer.readable()) {

                if(buffer instanceof DynamicChannelBuffer) {

                   buffer.writeBytes(input.toByteBuffer());

                   message = buffer;

                } else{

                   int size = buffer.readableBytes() + input.readableBytes();

                   message =com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(

                           size > bufferSize ? size : bufferSize);

                   message.writeBytes(buffer, buffer.readableBytes());

                   message.writeBytes(input.toByteBuffer());

                }

            } else {

                message= com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(

                       input.toByteBuffer());

            }


           NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(),url, handler);

            Object msg;

            int saveReaderIndex;


            try {

                //decode object.

                do {

                   saveReaderIndex = message.readerIndex();

                    try{

                       msg = codec.decode(channel, message);

                    }catch (IOException e) {

                       buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

                       throw e;

                    }

                    if(msg == Codec2.DecodeResult.NEED_MORE_INPUT) {

                       message.readerIndex(saveReaderIndex);

                       break;

                    }else {

                       if (saveReaderIndex == message.readerIndex()) {

                           buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

                           throw new IOException("Decode without read data.");

                       }

                       if (msg != null) {

                           Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());

                       }

                    }

                } while(message.readable());

            } finally {

                if(message.readable()) {

                   message.discardReadBytes();

                    buffer= message;

                } else{

                   buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

                }

               NettyChannel.removeChannelIfDisconnected(ctx.getChannel());

            }

        }

 

        @Override

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {

           ctx.sendUpstream(e);

        }

    }
           

其中的messageReceived()方法就是接收消息之後執行的方法

首先從event把接收到的消息讀到ChannelBuffer中,然後用Codec2.decode方法解碼

Codec2接口由DubboCodec類實作,decode()方法具體是在在DubboCodec的父類ExchangeCodec類中:

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {

        int readable =buffer.readableBytes();

        byte[] header =new byte[Math.min(readable, HEADER_LENGTH)];

       buffer.readBytes(header);

        return decode(channel, buffer, readable, header);

    }
           

最後調用的decode方法如下:

protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)throws IOException {

        // check magicnumber.

        if (readable> 0 && header[0] != MAGIC_HIGH

                ||readable > 1 && header[1] != MAGIC_LOW) {

            int length= header.length;

            if(header.length < readable) {

                header= Bytes.copyOf(header, readable);

               buffer.readBytes(header, length, readable - length);

            }

            for (int i= 1; i < header.length - 1; i++) {

                if(header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {

                   buffer.readerIndex(buffer.readerIndex() - header.length + i);

                   header = Bytes.copyOf(header, i);

                   break;

                }

            }

            return super.decode(channel, buffer, readable, header);

        }

        // checklength.

        if (readable< HEADER_LENGTH) {

            return DecodeResult.NEED_MORE_INPUT;

        }


        // get datalength.

        int len =Bytes.bytes2int(header, 12);

        checkPayload(channel,len);


        int tt = len +HEADER_LENGTH;

        if (readable< tt) {

            return DecodeResult.NEED_MORE_INPUT;

        }


        // limit inputstream.

       ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);


        try {

            return decodeBody(channel, is, header);

        } finally {

            if(is.available() > 0) {

                try {

                    if(logger.isWarnEnabled()) {

                       logger.warn("Skip input stream " + is.available());

                    }

                   StreamUtils.skipUnusedStream(is);

                } catch(IOException e) {

                   logger.warn(e.getMessage(), e);

                }

            }

        }

    }
           

該方法一開始根據head資訊,判斷目前接收到的消息是否完整,這個判定是和dubbo的響應機制有關,dubbo讀取消息是以事件機制(Event)為基礎的,是一種NIO的設計,當事件響應機制得到事件後,資訊的傳輸可能不全,如果不全,則終止該次處理,等消息傳輸完整後再處理。

如果消息完整,需要解碼,調用了decodeBody()方法:

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {

        byte flag =header[2], proto = (byte) (flag & SERIALIZATION_MASK);

        Serialization s= CodecSupport.getSerialization(channel.getUrl(), proto);

        ObjectInput in= s.deserialize(channel.getUrl(), is);

        // get requestid.

        long id =Bytes.bytes2long(header, 4);

        if ((flag &FLAG_REQUEST) == 0) {

            // decode response.

            Response res = new Response(id);

            if ((flag& FLAG_EVENT) != 0) {

               res.setEvent(Response.HEARTBEAT_EVENT);

            }

            // getstatus.

            byte status= header[3];

           res.setStatus(status);

            if (status== Response.OK) {

                try {

                   Object data;

                    if(res.isHeartbeat()) {

                       data = decodeHeartbeatData(channel, in);

                    }else if (res.isEvent()) {

                       data = decodeEventData(channel, in);

                    }else {

                       data = decodeResponseData(channel, in, getRequestData(id));

                    }

                   res.setResult(data);

                } catch(Throwable t) {

                   res.setStatus(Response.CLIENT_ERROR);

                   res.setErrorMessage(StringUtils.toString(t));

                }

            } else {

               res.setErrorMessage(in.readUTF());

            }

            return res;

        } else {

            // decoderequest.

            Request req= new Request(id);

           req.setVersion("2.0.0");

           req.setTwoWay((flag & FLAG_TWOWAY) != 0);

            if ((flag& FLAG_EVENT) != 0) {

               req.setEvent(Request.HEARTBEAT_EVENT);

            }

            try {

                Objectdata;

                if(req.isHeartbeat()) {

                   data = decodeHeartbeatData(channel, in);

                } else if (req.isEvent()) {

                   data = decodeEventData(channel, in);

                } else{

                   data = decodeRequestData(channel, in);

                }

               req.setData(data);

            } catch(Throwable t) {

                // badrequest

               req.setBroken(true);

               req.setData(t);

            }

            return req;

        }

    }
           

該方法最開始從header裡面擷取了flag和proto,并且擷取序列化方式,下面有個flag的if判斷,是因為provider接收消息和consumer接收provider回報時都用這段代碼,當provider接收消息時,代碼走的是else部分,也就是建立了一個Request對象,然後把消息解碼,放入Request對象。

得到Request對象後,回到InternalDecoder的messageReceived()方法,codec.decode()方法獲得的Request對象指派給msg,如果消息是完整的而且正常解碼了,會調用Channels.fireMessageReceived()方法做一些資訊設定,該方法代碼如下:

/**

     * Sends a {@code"messageReceived"} event to the

     * {@linkChannelUpstreamHandler} which is placed in the closest upstream

     * from the handlerassociated with the specified

     * {@linkChannelHandlerContext}.

     *

     * @param message        the received message

     * @param remoteAddress  the remote address wherethe received message

     *                       came from

     */

    public static void fireMessageReceived(

           ChannelHandlerContext ctx, Object message, SocketAddress remoteAddress){

       ctx.sendUpstream(new UpstreamMessageEvent(

               ctx.getChannel(), message, remoteAddress));

    }
           

至此NettyCodecAdapter.DeCoder結束,下面是NettyHandler,觸發的是NettyHandler的messageReceived()方法:

@Override

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);

        try {

           handler.received(channel, e.getMessage());

        } finally {

           NettyChannel.removeChannelIfDisconnected(ctx.getChannel());

        }

    }
           

其中調用了handler的received()方法,該方法的實作在DecodeHandler類中:

public void received(Channel channel, Object message) throws RemotingException {

        if (message instanceof Decodeable) {

           decode(message);

        }


        if (message instanceof Request) {

           decode(((Request) message).getData());

        }


        if (message instanceof Response) {

           decode(((Response) message).getResult());

        }


       handler.received(channel, message);

    }
           

這裡的message是Request,是以走的是第二個if裡的判斷,這裡面是解碼,前面已經解碼過了,其實不會再解碼了,方法的最後是其他handler.received()方法,這個handler的實作類是HeaderExchangeHandler類,他的received()方法如下:

public void received(Channel channel, Object message) throws RemotingException {

       channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());

        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);

        try {

            if (message instanceof Request) {

                //handle request.

                Request request = (Request) message;

                if(request.isEvent()) {

                   handlerEvent(channel, request);

                } else{

                    if(request.isTwoWay()) {

                       Response response = handleRequest(exchangeChannel, request);

                       channel.send(response);

                    }else {

                       handler.received(exchangeChannel, request.getData());

                    }

                }

            } else if(message instanceof Response) {

               handleResponse(channel, (Response) message);

            } else if(message instanceof String) {

                if(isClientSide(channel)) {

                   Exception e = new Exception("Dubbo client can not supported stringmessage: " + message + " in channel: " + channel + ", url:" + channel.getUrl());

                   logger.error(e.getMessage(), e);

                } else{

                    String echo =handler.telnet(channel, (String) message);

                    if(echo != null && echo.length() > 0) {

                       channel.send(echo);

                    }

                }

            } else {

               handler.received(exchangeChannel, message);

            }

        } finally {

           HeaderExchangeChannel.removeChannelIfDisconnected(channel);

        }

    }
           

該方法中的message是Request類型,并且request.isTwoWay是true,會調用handleReuqest方法得到處理結果Response,然後用Channel.send()方法把處理結果發送給consumer,下面是handleRequest()方法的代碼:

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {

        Response res =new Response(req.getId(), req.getVersion());

        if(req.isBroken()) {

            Object data = req.getData();

 
            String msg;

            if (data ==null) msg = null;

            else if(data instanceof Throwable) msg = StringUtils.toString((Throwable) data);

            else msg =data.toString();

            res.setErrorMessage("Failto decode request due to: " + msg);

           res.setStatus(Response.BAD_REQUEST);


            return res;

        }

        // find handlerby message class.

        Object msg =req.getData();

        try {

            // handledata.

            Object result = handler.reply(channel, msg);

           res.setStatus(Response.OK);

           res.setResult(result);

        } catch(Throwable e) {

           res.setStatus(Response.SERVICE_ERROR);

           res.setErrorMessage(StringUtils.toString(e));

        }

        return res;

    }
           

方法建立了一個Response對象,首先判斷請求是否有效,無效則會直接傳回辨別錯誤的Response,否則從Request中得到封裝的對象,然後調用handler.reply()方法,得到結果,這個handler是由DubboProtocol類中定義的ExchangeHandler類實作的,ExchangeHandler實際上是一個ExchangeHandlerAdapter,除了reply方法之外還實作了Channel接口的很多方法比如connected,received等,ExchangeHandler類的定義和reply方法如下:

private ExchangeHandler requestHandler = newExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (message instanceof Invocation) {

               Invocation inv = (Invocation) message;

               Invoker<?> invoker = getInvoker(channel, inv);

                //如果是callback需要處理高版本調用低版本的問題

                if(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){

                   String methodsStr =invoker.getUrl().getParameters().get("methods");

                   boolean hasMethod = false;

                    if (methodsStr == null ||methodsStr.indexOf(",") == -1) {

                       hasMethod = inv.getMethodName().equals(methodsStr);

                    }else {

                       String[] methods = methodsStr.split(",");

                        for (String method : methods) {

                           if (inv.getMethodName().equals(method)) {

                                hasMethod =true;

                                break;

                           }

                       }

                    }

                    if(!hasMethod) {

                       logger.warn(new IllegalStateException("The methodName " +inv.getMethodName() + " not found in callback service interface ,invokewill be ignored. please update the api interface. url is:" +invoker.getUrl()) + " ,invocation is :" + inv);

                       return null;

                    }

                }

               RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

                return invoker.invoke(inv);

            }

            throw new RemotingException(channel, "Unsupported request: " + message == null? null : (message.getClass().getName() + ": " + message) + ",channel: consumer: " + channel.getRemoteAddress() + " -->provider: " + channel.getLocalAddress());

        }

    //還有很多别的方法,此處省略

    }
           

reply方法開始的時候調用getInvoker()方法,得到了Invoker,getInvoker()方法如下:

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {

        boolean isCallBackServiceInvoke = false;

        boolean isStubServiceInvoke = false;

        int port =channel.getLocalAddress().getPort();

        String path =inv.getAttachments().get(Constants.PATH_KEY);

        //如果是用戶端的回調服務.

       isStubServiceInvoke =Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));

        if(isStubServiceInvoke) {

            port =channel.getRemoteAddress().getPort();

        }

        //callback

        isCallBackServiceInvoke= isClientSide(channel) && !isStubServiceInvoke;

        if(isCallBackServiceInvoke) {

            path =inv.getAttachments().get(Constants.PATH_KEY) + "." +inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);

           inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE,Boolean.TRUE.toString());

        }

        String serviceKey = serviceKey(port, path,inv.getAttachments().get(Constants.VERSION_KEY),inv.getAttachments().get(Constants.GROUP_KEY));

 
       DubboExporter<?> exporter = (DubboExporter<?>)exporterMap.get(serviceKey);


        if (exporter ==null)

            throw new RemotingException(channel, "Not found exported service: " +serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " +channel.getRemoteAddress() + " --> provider: " +channel.getLocalAddress() + ", message:" + inv);

 
        return exporter.getInvoker();

    }
           

這個方法用端口号,接口位址,版本,分組号等資訊擷取了一個serviceKey,根據這個serviceKey從exporterMap中獲得了目标Invoker,實際上這個exporterMap是在provider初始化的時候生成的,放在DubboProtocol的父類AbstractProtocol中,每個接口都會生成一個Exporter對象放在exporterMap裡,用端口号、接口位址、版本、分組号當做map的key,Exporter對象會包含一個Invoker對象。

擷取serviceKey的方法serviceKey()的代碼在DubboProtocol的父類AbstractProtocol中:

protected static String serviceKey(int port, String serviceName, String serviceVersion, StringserviceGroup) {

        return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);

    }
           

ProtocolUtils.serviceKey()方法代碼:

public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {

        StringBuilderbuf = new StringBuilder();

        if(serviceGroup != null && serviceGroup.length() > 0) {

           buf.append(serviceGroup);

           buf.append("/");

        }

       buf.append(serviceName);

        if(serviceVersion != null && serviceVersion.length() > 0 &&!"0.0.0".equals(serviceVersion)) {

           buf.append(":");

           buf.append(serviceVersion);

        }

       buf.append(":");

       buf.append(port);

       return buf.toString();

    }
           

其實就是用各種參數組合成的一個字元串

回到ExchangeHandler類的reply()方法,方法最後調用的invoker.invoke()方法,這個Invoker反射實際的接口實作類并處理資訊,使用了工廠模式和動态代理的方式,這裡的代理是Javassist代理,工廠代碼在JavassistProxyFactory類中,如下:

public class JavassistProxyFactory extends AbstractProxyFactory {

 
   @SuppressWarnings("unchecked")

    public <T>T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {

        return (T)Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

    }

 
    public <T>Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {

        // TODO Wrapper類不能正确處理帶$的類名

        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ?proxy.getClass() : type);

        return new AbstractProxyInvoker<T>(proxy, type, url) {

            @Override

            protected Object doInvoke(T proxy, String methodName,

                                     Class<?>[] parameterTypes,

                                      Object[] arguments) throws Throwable {

                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

            }

        };

    }


}
           

處理完資訊并得到傳回值後,回到HeaderExchangeHandler類的received()方法,上面的代碼就是調用handleRequest()方法獲得Response的過程,然後就調用channel.send()方法,把傳回值發送給consumer,channel的實作類NettyChannel,他的send方法就是上面consumer給provider發資訊的那個方法,

public void send(Object message, boolean sent) throws RemotingException {

       super.send(message, sent);


        boolean success= true;

        int timeout =0;

        try {

           ChannelFuture future = channel.write(message);

            if (sent) {

                timeout= getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);

                success= future.await(timeout);

            }

            Throwable cause = future.getCause();

            if (cause!= null) {

                throw cause;

            }

        } catch(Throwable e) {

            throw new RemotingException(this, "Failed to send message " + message + "to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);

        }


        if (!success) {

            throw new RemotingException(this, "Failed to send message " + message + "to " + getRemoteAddress()

                    +"in timeout(" + timeout + "ms) limit");

        }

    }
           

其中的channel.write就是發送的方法,在調用這個方法的時候會觸發下行事件的事件處理器,也和consumer往provider發消息時一樣,觸發NettyCodecAdapter.encoder和NettyHandler,隻不過此時調用的是DubboCodec的encodeResponse()方法(具體實作在父類ExchangeCodec中),而不是之前的encodeRequest()方法,是以encodeResponse()方法如下:

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {

        int savedWriteIndex = buffer.writerIndex();

        try {

           Serialization serialization = getSerialization(channel);

            // header.

            byte[]header = new byte[HEADER_LENGTH];

            // set magic number.

           Bytes.short2bytes(MAGIC, header);

            // set request and serialization flag.

            header[2] =serialization.getContentTypeId();

            if(res.isHeartbeat()) header[2] |= FLAG_EVENT;

            // set response status.

            byte status= res.getStatus();

            header[3] =status;

            // set request id.

           Bytes.long2bytes(res.getId(), header, 4);

 

           buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);

           ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

            // encode response data or error message.

            if (status== Response.OK) {

                if(res.isHeartbeat()) {

                   encodeHeartbeatData(channel, out, res.getResult());

                } else{

                   encodeResponseData(channel, out, res.getResult());

                }

            } else out.writeUTF(res.getErrorMessage());

           out.flushBuffer();

           bos.flush();

            bos.close();

 

            int len =bos.writtenBytes();

           checkPayload(channel, len);

           Bytes.int2bytes(len, header, 12);

            // write

           buffer.writerIndex(savedWriteIndex);

           buffer.writeBytes(header); // write header.

           buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);

        } catch(Throwable t) {

            // 将buffer内容清空

           buffer.writerIndex(savedWriteIndex);

            // 發送失敗資訊給Consumer,否則Consumer隻能等逾時了

            if(!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {

               Response r = new Response(res.getId(), res.getVersion());

               r.setStatus(Response.BAD_RESPONSE);

 

                if (t instanceof ExceedPayloadLimitException) {

                   logger.warn(t.getMessage(), t);

                    try{

                       r.setErrorMessage(t.getMessage());

                       channel.send(r);

                       return;

                    } catch (RemotingException e) {

                       logger.warn("Failed to send bad_response info back: " +t.getMessage() + ", cause: " + e.getMessage(), e);

                    }

                } else{

                    //FIXME 在Codec中列印出錯日志?在IoHanndler的caught中統一處理?

                   logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);

                    try{

                       r.setErrorMessage("Failed to send response: " + res + ",cause: " + StringUtils.toString(t));

                       channel.send(r);

                       return;

                    }catch (RemotingException e) {

                       logger.warn("Failed to send bad_response info back: " + res +", cause: " + e.getMessage(), e);

                    }

                }

            }

 

            // 重新抛出收到的異常

            if (t instanceof IOException) {

                throw (IOException) t;

            } else if(t instanceof RuntimeException) {

                throw(RuntimeException) t;

            } else if(t instanceof Error) {

                throw(Error) t;

            } else {

                throw new RuntimeException(t.getMessage(), t);

            }

        }

    }
           

和encodeRequest差不多,拼裝消息頭,編碼,序列化消息體,最後調用encodeResponseData方法,因為這裡本身調用的是ExchangeCodec的子類DubboCodec,是以方法中的encodeResponseData方法調用的是DubboCodec重寫的方法,代碼如下:

@Override

    protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {

        Result result =(Result) data;

 
        Throwable th =result.getException();

        if (th == null){

            Object ret= result.getValue();

            if (ret ==null) {

               out.writeByte(RESPONSE_NULL_VALUE);

            } else {

               out.writeByte(RESPONSE_VALUE);

               out.writeObject(ret);

            }

        } else {

           out.writeByte(RESPONSE_WITH_EXCEPTION);

           out.writeObject(th);

        }

    }
           

首先判斷結果中否包含異常,沒有異常再判斷結果中有沒有傳回值類型,要是沒有傳回值類型就不寫消息體了。

至此provider的處理過程結束

繼續閱讀