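This source-code analysis of Dubbo remote invocation is split into three articles:
Dubbo remote invocation source analysis (1): the client sends a request
Dubbo remote invocation source analysis (2): the server receives the request
Dubbo remote invocation source analysis (3): the client handles the response
This article looks at how the provider processes a message after receiving it.
The message first passes through the event handlers on the Netty pipeline (set up by NettyServer on the provider side), in this order: the decoder of NettyCodecAdapter, then NettyHandler.
The decoder is the inner class InternalDecoder, defined in NettyCodecAdapter: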
private class InternalDecoder extends SimpleChannelUpstreamHandler {
private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception{
Object o =event.getMessage();
if (!(o instanceof ChannelBuffer)) {
ctx.sendUpstream(event);
return;
}
ChannelBuffer input = (ChannelBuffer) o;
int readable = input.readableBytes();
if(readable <= 0) {
return;
}
com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
if(buffer.readable()) {
if(buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;
} else{
int size = buffer.readableBytes() + input.readableBytes();
message =com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
size > bufferSize ? size : bufferSize);
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());
}
} else {
message= com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
input.toByteBuffer());
}
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(),url, handler);
Object msg;
int saveReaderIndex;
try {
//decode object.
do {
saveReaderIndex = message.readerIndex();
try{
msg = codec.decode(channel, message);
}catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
if(msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
}else {
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while(message.readable());
} finally {
if(message.readable()) {
message.discardReadBytes();
buffer= message;
} else{
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
}
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
ctx.sendUpstream(e);
}
}
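messageReceived() is the method that runs when a message arrives.
It first reads the received bytes from the event into a ChannelBuffer (appending them to any bytes left over from a previous read) and then decodes them with Codec2.decode().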
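The accumulate, decode, and rewind pattern used by messageReceived() can be sketched without Netty. The example below is only an illustration under simplifying assumptions: it uses java.nio.ByteBuffer and a made-up frame format (a 4-byte length followed by a UTF-8 payload), and the names AccumulatingDecoder, feed, and frame are hypothetical, not part of Dubbo.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy decoder: frames are [4-byte length][UTF-8 payload].
final class AccumulatingDecoder {
    // Bytes left over from the previous read, kept in read mode.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feeds one network read and returns every frame that is now complete. */
    List<String> feed(byte[] chunk) {
        // Join the leftovers from the previous read with the new bytes.
        ByteBuffer message = ByteBuffer.allocate(pending.remaining() + chunk.length);
        message.put(pending).put(chunk);
        message.flip();

        List<String> frames = new ArrayList<>();
        while (message.hasRemaining()) {
            int savedPosition = message.position();
            if (message.remaining() < 4) {
                break; // not even a full length field yet
            }
            int length = message.getInt();
            if (message.remaining() < length) {
                message.position(savedPosition); // rewind: more input is needed
                break;
            }
            byte[] payload = new byte[length];
            message.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = message.slice(); // keep the unread tail for the next call
        return frames;
    }

    /** Builds one frame in the toy format, for trying the decoder out. */
    static byte[] frame(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding one whole frame plus the first few bytes of the next returns only the first frame; the tail stays buffered until a later call completes it. This mirrors how InternalDecoder restores readerIndex and keeps the remaining bytes in its buffer field.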
The Codec2 interface is implemented by the DubboCodec class; the decode() method itself lives in DubboCodec's parent class, ExchangeCodec:
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable =buffer.readableBytes();
byte[] header =new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
The overloaded decode() method that it ends up calling is:
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length= header.length;
if(header.length < readable) {
header= Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i= 1; i < header.length - 1; i++) {
if(header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header);
} finally {
if(is.available() > 0) {
try {
if(logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch(IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
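The method starts by using the header to decide whether the message received so far is complete. The check is needed because of how Dubbo reads data: reads are event-driven (an NIO design), so when a read event fires, only part of a message may have arrived. If the message is incomplete, the method returns NEED_MORE_INPUT and decoding stops for this round; it resumes once the rest of the message has arrived.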
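The completeness check can be sketched on a plain java.nio.ByteBuffer. The 16-byte header length, the magic number 0xdabb, and the body length at offset 12 match the code quoted above; everything else is an assumption made for illustration. In particular, DubboFrameCheck and its Status enum are hypothetical names, the payload-size check is omitted, and a bad magic number is simply reported here, whereas the real method hands such input to the parent codec.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; only the header layout follows the quoted Dubbo code.
final class DubboFrameCheck {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;

    enum Status { NEED_MORE_INPUT, BAD_MAGIC, COMPLETE }

    /** Inspects the bytes from buf.position() onward without consuming them. */
    static Status check(ByteBuffer buf) {
        int start = buf.position();
        int readable = buf.remaining();
        // Bytes 0-1: magic number (ByteBuffer is big-endian by default).
        if (readable >= 2 && buf.getShort(start) != MAGIC) {
            return Status.BAD_MAGIC;
        }
        // The header itself has not fully arrived yet.
        if (readable < HEADER_LENGTH) {
            return Status.NEED_MORE_INPUT;
        }
        // Bytes 12-15: length of the body that follows the header.
        int bodyLength = buf.getInt(start + 12);
        return readable < HEADER_LENGTH + bodyLength
                ? Status.NEED_MORE_INPUT
                : Status.COMPLETE;
    }
}
```

Because the check uses absolute reads, the buffer position is untouched, so a caller can retry the same check after appending more bytes.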
If the message is complete, it is decoded by calling decodeBody():
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag =header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s= CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in= s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag& FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
if (status== Response.OK) {
try {
Object data;
if(res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
}else if (res.isEvent()) {
data = decodeEventData(channel, in);
}else {
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} catch(Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
} else {
res.setErrorMessage(in.readUTF());
}
return res;
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag& FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if(req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
} else{
data = decodeRequestData(channel, in);
}
req.setData(data);
} catch(Throwable t) {
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
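The method first reads flag and proto from the header and looks up the serialization to use. The if on flag exists because the same code runs both when the provider receives a request and when the consumer receives the provider's response. When the provider receives a request, the else branch runs: it creates a Request object, decodes the body, and stores the decoded data in the Request.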
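How the single flag byte (header[2]) carries both the message kind and the serialization id can be shown in isolation. The constant values below are the ones ExchangeCodec defines; the class DubboFlags and its describe() helper are hypothetical, written only for this illustration, and the serialization id 2 is just an example value.

```java
// Hypothetical helper showing how the flag byte in header[2] is read.
final class DubboFlags {
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    static boolean isRequest(byte flag) { return (flag & FLAG_REQUEST) != 0; }
    static boolean isTwoWay(byte flag) { return (flag & FLAG_TWOWAY) != 0; }
    static boolean isEvent(byte flag) { return (flag & FLAG_EVENT) != 0; }
    // The low five bits select the serialization implementation.
    static int serializationId(byte flag) { return flag & SERIALIZATION_MASK; }

    static String describe(byte flag) {
        String kind = isRequest(flag)
                ? (isTwoWay(flag) ? "two-way request" : "one-way request")
                : "response";
        return kind + (isEvent(flag) ? " (event)" : "")
                + ", serialization id " + serializationId(flag);
    }

    public static void main(String[] args) {
        // 0xC2 = request bit + two-way bit + serialization id 2
        System.out.println(describe((byte) 0xC2)); // prints "two-way request, serialization id 2"
        // 0x02 = no request bit, so the message is a response
        System.out.println(describe((byte) 0x02)); // prints "response, serialization id 2"
    }
}
```

The first case is what sends decodeBody() into its else branch on the provider; the second is what the consumer sees when the response comes back.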
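With the Request object in hand, control returns to InternalDecoder.messageReceived(), where the Request returned by codec.decode() is assigned to msg. If the message was complete and decoded normally, Channels.fireMessageReceived() is called, which wraps the decoded message in a new event and passes it to the next upstream handler: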
/**
 * Sends a {@code "messageReceived"} event to the
 * {@link ChannelUpstreamHandler} which is placed in the closest upstream
 * from the handler associated with the specified
 * {@link ChannelHandlerContext}.
 *
 * @param message       the received message
 * @param remoteAddress the remote address where the received message
 *                      came from
 */
public static void fireMessageReceived(
        ChannelHandlerContext ctx, Object message, SocketAddress remoteAddress) {
    ctx.sendUpstream(new UpstreamMessageEvent(
            ctx.getChannel(), message, remoteAddress));
}
That completes the NettyCodecAdapter decoder. Next comes NettyHandler, whose messageReceived() method is triggered:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
It calls the handler's received() method; the implementation of interest here is in the DecodeHandler class:
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
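Here message is a Request, so the second if applies. It calls decode(), but the data was already decoded earlier, so nothing is decoded a second time. The method ends by calling received() on the next handler, whose implementation class is HeaderExchangeHandler. Its received() method is: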
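Why a second decode pass is harmless can be illustrated with a guard flag. This is a sketch under assumptions, not Dubbo's code: LazyDecodeable, LazyBody, and DecodeStep are hypothetical stand-ins, and decoding is reduced to turning bytes into a String.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a body that can be decoded on demand.
interface LazyDecodeable {
    void decode();
}

final class LazyBody implements LazyDecodeable {
    private final byte[] raw;
    private volatile boolean hasDecoded;
    private String value;
    int decodeCount; // only here so the example can show how often real work happened

    LazyBody(byte[] raw) {
        this.raw = raw;
    }

    @Override
    public void decode() {
        if (hasDecoded) {
            return; // already decoded earlier: nothing left to do
        }
        value = new String(raw, StandardCharsets.UTF_8);
        decodeCount++;
        hasDecoded = true;
    }

    String value() {
        return value;
    }
}

final class DecodeStep {
    /** Same shape as the handler above: decode what supports it, then pass it on. */
    static Object received(Object data) {
        if (data instanceof LazyDecodeable) {
            ((LazyDecodeable) data).decode();
        }
        return data;
    }
}
```

Calling DecodeStep.received() twice on the same LazyBody performs the conversion once; the second call returns immediately. Data that does not implement the interface passes through untouched.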
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
//handle request.
Request request = (Request) message;
if(request.isEvent()) {
handlerEvent(channel, request);
} else{
if(request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
}else {
handler.received(exchangeChannel, request.getData());
}
}
} else if(message instanceof Response) {
handleResponse(channel, (Response) message);
} else if(message instanceof String) {
if(isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url:" + channel.getUrl());
logger.error(e.getMessage(), e);
} else{
String echo =handler.telnet(channel, (String) message);
if(echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
Here message is a Request and request.isTwoWay() is true, so handleRequest() is called to produce a Response, which is then sent back to the consumer with channel.send(). The code of handleRequest() is:
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res =new Response(req.getId(), req.getVersion());
if(req.isBroken()) {
Object data = req.getData();
String msg;
if (data ==null) msg = null;
else if(data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg =data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch(Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
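The method creates a Response object and first checks whether the request is valid; if the request is broken, it immediately returns a Response flagged as an error. Otherwise it takes the wrapped object out of the Request and calls handler.reply() to get the result. This handler is the ExchangeHandler defined in the DubboProtocol class. It is actually an ExchangeHandlerAdapter and, besides reply(), also implements many ChannelHandler methods such as connected() and received(). The definition of the ExchangeHandler and its reply() method are:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {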
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// for a callback, handle the case of a newer version calling an older version
if(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr =invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null ||methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
}else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod =true;
break;
}
}
}
if(!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
// many other methods follow; omitted here
};
reply() begins by calling getInvoker() to obtain the Invoker. getInvoker() is:
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port =channel.getLocalAddress().getPort();
String path =inv.getAttachments().get(Constants.PATH_KEY);
// if this is a client-side callback service
isStubServiceInvoke =Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if(isStubServiceInvoke) {
port =channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke= isClientSide(channel) && !isStubServiceInvoke;
if(isCallBackServiceInvoke) {
path =inv.getAttachments().get(Constants.PATH_KEY) + "." +inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE,Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path,inv.getAttachments().get(Constants.VERSION_KEY),inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>)exporterMap.get(serviceKey);
if (exporter ==null)
throw new RemotingException(channel, "Not found exported service: " +serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " +channel.getRemoteAddress() + " --> provider: " +channel.getLocalAddress() + ", message:" + inv);
return exporter.getInvoker();
}
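This method builds a serviceKey from the port, interface path, version, and group, and uses it to look up the target Invoker in exporterMap. The exporterMap is populated when the provider initializes and lives in AbstractProtocol, the parent class of DubboProtocol. Each exported interface gets an Exporter object, stored in exporterMap under a key made from the port, interface path, version, and group; each Exporter holds an Invoker.
The serviceKey() method that builds the key is defined in AbstractProtocol, the parent class of DubboProtocol: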
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
    return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
The code of ProtocolUtils.serviceKey():
public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
    StringBuilder buf = new StringBuilder();
    if (serviceGroup != null && serviceGroup.length() > 0) {
        buf.append(serviceGroup);
        buf.append("/");
    }
    buf.append(serviceName);
    if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
        buf.append(":");
        buf.append(serviceVersion);
    }
    buf.append(":");
    buf.append(port);
    return buf.toString();
}
It simply concatenates the parameters into one string.
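A few concrete keys make the format easier to see. The sketch below re-implements the same concatenation rule in a standalone class so that it runs without Dubbo on the classpath. ServiceKeyDemo, the service name com.example.DemoService, and the plain HashMap standing in for exporterMap are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the key format; not Dubbo's own class.
final class ServiceKeyDemo {
    // Same concatenation rule as the serviceKey() method quoted above.
    static String serviceKey(int port, String name, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(name);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            buf.append(':').append(version);
        }
        return buf.append(':').append(port).toString();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for exporterMap: key -> exported service.
        Map<String, Object> exporters = new HashMap<>();
        exporters.put(serviceKey(20880, "com.example.DemoService", "1.0.0", "g1"), new Object());

        System.out.println(serviceKey(20880, "com.example.DemoService", "1.0.0", "g1"));
        // prints g1/com.example.DemoService:1.0.0:20880
        System.out.println(serviceKey(20880, "com.example.DemoService", "0.0.0", null));
        // prints com.example.DemoService:20880

        // A request carrying a different version builds a different key and finds nothing.
        System.out.println(exporters.containsKey(
                serviceKey(20880, "com.example.DemoService", "2.0.0", "g1")));
        // prints false
    }
}
```

The failed lookup in the last step corresponds to the "Not found exported service ... may be version or group mismatch" exception thrown by getInvoker().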
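Back in the reply() method of the ExchangeHandler, the last call is invoker.invoke(). This Invoker calls the actual interface implementation class and handles the request. It is built with the factory pattern and dynamic proxies; the proxy here is Javassist-based, and the factory code is in the JavassistProxyFactory class: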
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T>T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T)Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T>Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot correctly handle class names that contain $
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ?proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
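What wrapper.invokeMethod() does can be pictured as a dispatch on the method name that ends in a direct call, so no java.lang.reflect lookup happens per invocation. The classes below are a hand-written sketch of that idea under assumptions: GreetingService, GreetingServiceImpl, and GreetingWrapper are hypothetical, and the bytecode Dubbo actually generates with Javassist may differ in detail.

```java
// Hypothetical service used only for this illustration.
interface GreetingService {
    String sayHello(String name);
}

final class GreetingServiceImpl implements GreetingService {
    @Override
    public String sayHello(String name) {
        return "Hello, " + name;
    }
}

// Hand-written approximation of what a generated wrapper could look like.
final class GreetingWrapper {
    static Object invokeMethod(Object target, String methodName,
                               Class<?>[] parameterTypes, Object[] args)
            throws NoSuchMethodException {
        GreetingService service = (GreetingService) target;
        // Dispatch by name and arity, then make a plain, direct method call.
        if ("sayHello".equals(methodName) && parameterTypes.length == 1) {
            return service.sayHello((String) args[0]);
        }
        throw new NoSuchMethodException(
                "Method " + methodName + " not found in GreetingService");
    }
}
```

With this in place, GreetingWrapper.invokeMethod(new GreetingServiceImpl(), "sayHello", new Class<?>[] {String.class}, new Object[] {"dubbo"}) returns "Hello, dubbo", which is the role doInvoke() plays in the anonymous AbstractProxyInvoker above.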
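After the request has been handled and a return value obtained, control returns to HeaderExchangeHandler.received(). The code above is the path by which handleRequest() produces the Response; channel.send() is then called to send the return value to the consumer. The channel's implementation class is NettyChannel, and its send() method is the same one the consumer used to send its request to the provider: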
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success= true;
int timeout =0;
try {
ChannelFuture future = channel.write(message);
if (sent) {
timeout= getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
success= future.await(timeout);
}
Throwable cause = future.getCause();
if (cause!= null) {
throw cause;
}
} catch(Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+"in timeout(" + timeout + "ms) limit");
}
}
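channel.write() does the actual sending. Calling it triggers the downstream event handlers, just as when the consumer sent its request to the provider: the encoder of NettyCodecAdapter and NettyHandler. The difference is that this time DubboCodec's encodeResponse() method is called (implemented in the parent class ExchangeCodec) rather than encodeRequest(). encodeResponse() is: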
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
Serialization serialization = getSerialization(channel);
// header.
byte[]header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] =serialization.getContentTypeId();
if(res.isHeartbeat()) header[2] |= FLAG_EVENT;
// set response status.
byte status= res.getStatus();
header[3] =status;
// set request id.
Bytes.long2bytes(res.getId(), header, 4);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status== Response.OK) {
if(res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else{
encodeResponseData(channel, out, res.getResult());
}
} else out.writeUTF(res.getErrorMessage());
out.flushBuffer();
bos.flush();
bos.close();
int len =bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch(Throwable t) {
// discard whatever was written to the buffer
buffer.writerIndex(savedWriteIndex);
// send failure information to the consumer; otherwise the consumer can only wait for the timeout
if(!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
if (t instanceof ExceedPayloadLimitException) {
logger.warn(t.getMessage(), t);
try{
r.setErrorMessage(t.getMessage());
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " +t.getMessage() + ", cause: " + e.getMessage(), e);
}
} else{
// FIXME log the error in the codec? Or handle it uniformly in the IoHandler's caught()?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
try{
r.setErrorMessage("Failed to send response: " + res + ",cause: " + StringUtils.toString(t));
channel.send(r);
return;
}catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res +", cause: " + e.getMessage(), e);
}
}
}
// rethrow the caught exception
if (t instanceof IOException) {
throw (IOException) t;
} else if(t instanceof RuntimeException) {
throw(RuntimeException) t;
} else if(t instanceof Error) {
throw(Error) t;
} else {
throw new RuntimeException(t.getMessage(), t);
}
}
}
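The order of operations in encodeResponse() above (reserve the header, write the body, then go back and fill in the header once the body length is known) can be sketched with java.nio.ByteBuffer. The offsets and the FLAG_EVENT bit follow the quoted code; ResponseHeaderSketch and its encode() signature are hypothetical, and the body is passed in as ready-made bytes instead of being serialized.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; only the header layout follows the quoted Dubbo code.
final class ResponseHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_EVENT = (byte) 0x20;

    static byte[] encode(byte serializationId, boolean heartbeat,
                         byte status, long requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        // Skip over the header and write the body first.
        buf.position(HEADER_LENGTH);
        buf.put(body);
        // Now that the body length is known, fill in the header with absolute writes.
        byte flag = serializationId;
        if (heartbeat) {
            flag |= FLAG_EVENT;
        }
        buf.putShort(0, MAGIC);        // bytes 0-1: magic number
        buf.put(2, flag);              // byte 2: serialization id and event flag
        buf.put(3, status);            // byte 3: response status
        buf.putLong(4, requestId);     // bytes 4-11: id of the request being answered
        buf.putInt(12, body.length);   // bytes 12-15: body length
        return buf.array();
    }
}
```

Note that a response never sets the request bit (0x80) in the flag byte, which is how the receiving side tells it apart from a request.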
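encodeResponse() is much like encodeRequest(): it assembles the message header, then serializes the message body by calling encodeResponseData(). Because the codec in use is DubboCodec, a subclass of ExchangeCodec, the encodeResponseData() that runs is DubboCodec's override: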
@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
Result result =(Result) data;
Throwable th =result.getException();
if (th == null){
Object ret= result.getValue();
if (ret ==null) {
out.writeByte(RESPONSE_NULL_VALUE);
} else {
out.writeByte(RESPONSE_VALUE);
out.writeObject(ret);
}
} else {
out.writeByte(RESPONSE_WITH_EXCEPTION);
out.writeObject(th);
}
}
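It first checks whether the result contains an exception. If not, it checks whether there is a return value: if the value is null, only the RESPONSE_NULL_VALUE marker byte is written and no value follows; otherwise the RESPONSE_VALUE marker is followed by the serialized value. If there is an exception, the RESPONSE_WITH_EXCEPTION marker is written, followed by the exception.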
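The three cases can be tried out with a small stand-in. The marker values 0, 1, and 2 are the ones DubboCodec defines for RESPONSE_WITH_EXCEPTION, RESPONSE_VALUE, and RESPONSE_NULL_VALUE. The rest is an assumption for illustration: ResponseBodySketch is a hypothetical class, the value is limited to a String, and DataOutputStream.writeUTF() takes the place of the real serializer's writeObject().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for the body layout; not Dubbo's serializer.
final class ResponseBodySketch {
    static final byte RESPONSE_WITH_EXCEPTION = 0;
    static final byte RESPONSE_VALUE = 1;
    static final byte RESPONSE_NULL_VALUE = 2;

    static byte[] encode(String value, Throwable error) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (error != null) {
            out.writeByte(RESPONSE_WITH_EXCEPTION);
            // Simplification: only the message is written, not the exception object.
            out.writeUTF(String.valueOf(error.getMessage()));
        } else if (value == null) {
            out.writeByte(RESPONSE_NULL_VALUE); // marker only, no payload
        } else {
            out.writeByte(RESPONSE_VALUE);
            out.writeUTF(value);
        }
        out.flush();
        return bytes.toByteArray();
    }
}
```

A null result therefore costs a single byte on the wire, while the other two cases start with their marker and continue with the serialized payload. The consumer reads the marker first to know what, if anything, follows.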
This completes the provider-side processing.