天天看點

【原創】自己動手實作RPC服務調用架構

引言

本文利用java自帶的socket程式設計實作了一個簡單的rpc調用架構,由兩個工程組成分别名為battercake-provider(服務提供者)、battercake-consumer(服務調用者)。

設計思路如下:

1、在battercake-provider中,寫一個服務叫BatterCakeService

2、在battercake-provider中,啟動RpcProvider,釋出該服務

3、在battercake-consumer中,啟動測試類RpcTest

4、在battercake-consumer中,利用jdk動态代理,獲得BatterCakeService的動态代理類BatterCakeService$Proxy0

5、在battercake-consumer中,動态代理類BatterCakeService$Proxy0,與battercake-provider建立socket連接配接,battercake-provider針對每一個連接配接,都會啟動一個ServerThread處理請求,代理類則發送服務參數等相關資訊

6、在battercake-consumer中,接收battercake-provider的ServerThread請求傳回的結果。

上述過程時序圖如下所示

【原創】自己動手實作RPC服務調用架構

接下來上代碼!!

服務提供者

本部分的工程為battercake-provider,項目結構圖如下圖所示

【原創】自己動手實作RPC服務調用架構

先上使用的部分的代碼

先建立一個微服務,接口如下

package com.rjzheng.service;     public interface BatterCakeService {     	/**     	 * 賣煎餅的服務     	 * @param name     	 * @return     	 */     	public String sellBatterCake(String name);     }           

實作類如下

package com.rjzheng.service.impl;     import com.rjzheng.service.BatterCakeService;     public class BatterCakeServiceImpl implements BatterCakeService {     	@Override     	public String sellBatterCake(String name) {     		// TODO Auto-generated method stub     		return name+"煎餅,賣的特别好";     	}     }           

接下來就是釋出服務

package com.rjzheng.start;     import com.rjzheng.rpc.RpcProvider;     import com.rjzheng.service.BatterCakeService;     import com.rjzheng.service.impl.BatterCakeServiceImpl;     public class RpcBootStrap {     	public static void main(String[] args) throws Exception {     		BatterCakeService batterCakeService =new BatterCakeServiceImpl();     		//釋出賣煎餅的服務,注冊在20006端口     		RpcProvider.export(20006,batterCakeService);     	}     }           

接下來是rpc架構調用部分的代碼,RpcProvider,該部分代碼可以總結為兩步

  1. 将需要釋出的服務存儲在一個記憶體變量serviceList中
  2. 啟動socket,server.accept()方法阻塞在那,監聽輸入
  3. 針對每一個請求,單獨啟動一個線程處理
package com.rjzheng.rpc;     import java.net.ServerSocket;     import java.net.Socket;     import java.util.ArrayList;     import java.util.Arrays;     import java.util.List;     /**      * RPC服務提供器      * @author zhengrongjun      *      */     public class RpcProvider {     	//存儲注冊的服務清單     	private static List<Object> serviceList;     	/**     	 * 釋出rpc服務     	 * @param object     	 * @param port     	 * @throws Exception     	 */     	public static void export(int port,Object... services) throws Exception {     		serviceList=Arrays.asList(services);     		ServerSocket server = new ServerSocket(port);     		Socket client = null;     		while (true) {     			//阻塞等待輸入     			client = server.accept();     			//每一個請求,啟動一個線程處理     			new Thread(new ServerThread(client,serviceList)).start();     		}     	}     }           

接下來ServerThread線程處理類的代碼,ServerThread主要做以下幾個步驟

  1. 讀取用戶端發送的服務名
  2. 判斷服務是否釋出
  3. 如果釋出,則走反射邏輯,動态調用,傳回結果
  4. 如果未釋出,則傳回提示通知
package com.rjzheng.rpc;     import java.io.IOException;     import java.io.ObjectInputStream;     import java.io.ObjectOutputStream;     import java.lang.reflect.Method;     import java.net.Socket;     import java.util.List;     public class ServerThread implements Runnable {     	private Socket client = null;     	private List<Object> serviceList = null;     	public ServerThread(Socket client, List<Object> service) {     		this.client = client;     		this.serviceList = service;     	}     	@Override     	public void run() {     		ObjectInputStream input = null;     		ObjectOutputStream output = null;     		try {     			input = new ObjectInputStream(client.getInputStream());     			output = new ObjectOutputStream(client.getOutputStream());     			// 讀取用戶端要通路那個service     			Class serviceClass = (Class) input.readObject();     			// 找到該服務類     			Object obj = findService(serviceClass);     			if (obj == null) {     				output.writeObject(serviceClass.getName() + "服務未發現");     			} else {     				//利用反射調用該方法,傳回結果     				try {     					String methodName = input.readUTF();     					Class<?>[] parameterTypes = (Class<?>[]) input.readObject();     					Object[] arguments = (Object[]) input.readObject();     					Method method = obj.getClass().getMethod(methodName, parameterTypes);                           Object result = method.invoke(obj, arguments);                           output.writeObject(result);      				} catch (Throwable t) {     					output.writeObject(t);     				}     			}     		} catch (Exception e) {     			e.printStackTrace();     		} finally {     			try {     				client.close();     				input.close();     				output.close();     			} catch (IOException e) {     				// TODO Auto-generated catch block     				e.printStackTrace();     			}     		}     	}     	private Object findService(Class serviceClass) {     		// TODO Auto-generated method stub     		for (Object obj : serviceList) {     			boolean isFather = serviceClass.isAssignableFrom(obj.getClass());     			if (isFather) {     				return obj;     			}     		}     		return null;     	}     }           

服務消費者

本部分的工程為battercake-consumer,項目結構圖如下圖所示

【原創】自己動手實作RPC服務調用架構

先上rpc架構調用部分的代碼RpcConsumer,步驟分兩步

  1. 封裝一個代理類處理器
  2. 傳回service的代理類對象
package com.rjzheng.rpc;     import java.lang.reflect.Proxy;     public class RpcConsumer {     	public static <T> T getService(Class<T> clazz,String ip,int port) {     		ProxyHandler proxyHandler =new ProxyHandler(ip,port);     		return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);     	}     }           

接下來上代理類處理器的代碼,代理類處理步驟分以下幾步

  1. 建立socket連接配接
  2. 封裝請求資料,發送給服務提供者
  3. 傳回結果
package com.rjzheng.rpc;     import java.io.ObjectInputStream;     import java.io.ObjectOutputStream;     import java.lang.reflect.InvocationHandler;     import java.lang.reflect.Method;     import java.net.Socket;     import com.rjzheng.service.BatterCakeService;     public class ProxyHandler implements InvocationHandler {     	private String ip;     	private int port;     	public ProxyHandler(String ip, int port) {     		// TODO Auto-generated constructor stub     		this.ip = ip;     		this.port = port;     	}     	@Override     	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {     		// TODO Auto-generated method stub     		Socket socket = new Socket(this.ip, this.port);     		ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());     		ObjectInputStream input = new ObjectInputStream(socket.getInputStream());     		try {     			output.writeObject(proxy.getClass().getInterfaces()[0]);     			output.writeUTF(method.getName());     			output.writeObject(method.getParameterTypes());     			output.writeObject(args);     			output.flush();     			Object result = input.readObject();     			if(result instanceof Throwable) {     				throw (Throwable) result;     			}     				return result;     		} finally {     			socket.shutdownOutput();     		}     	}     }           

接下來建立一個測試類RpcTest如下(跑該測試類前,記得運作在battercake-provider端的RpcBootstrap類釋出BatterCakeService服務)

package com.rjzheng.start;     import com.rjzheng.rpc.RpcConsumer;     import com.rjzheng.service.BatterCakeService;     public class RpcTest {     	public static void main(String[] args) {     		BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);     		String result=batterCakeService.sellBatterCake("雙蛋");     		System.out.println(result);     	}     }           

輸出結果如下

雙蛋煎餅,賣的特别好           

至此,我們就實作了一個簡易的rpc服務調用架構

作者:孤獨煙

出處: http://rjzheng.cnblogs.com/

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】。