前言
Protocol 還有一個實作分支是 AbstractProxyProtocol,如下圖所示:

從圖中我們可以看到:gRPC、HTTP、WebService、Hessian、Thrift 等協定對應的 Protocol 實作,都是繼承自 AbstractProxyProtocol 抽象類。
目前網際網路的技術棧百花齊放,很多公司會使用 Node.js、Python、Rails、Go 等語言來開發 一些 Web 端應用,同時又有很多服務會使用 Java 技術棧實作,這就出現了大量的跨語言調用的需求。Dubbo 作為一個 RPC 架構,自然也希望能實作這種跨語言的調用,目前 Dubbo 中使用“HTTP 協定 + JSON-RPC”的方式來達到這一目的,其中 HTTP 協定和 JSON 都是天然跨語言的标準,在各種語言中都有成熟的類庫。
下面就重點來分析 Dubbo 對 HTTP 協定的支援。首先,會介紹 JSON-RPC 的基礎,并通過一個示例,快速入門,然後介紹 Dubbo 中 HttpProtocol 的具體實作,也就是如何将 HTTP 協定與 JSON-RPC 結合使用,實作跨語言調用的效果。
JSON-RPC
Dubbo 中支援的 HTTP 協定實際上使用的是 JSON-RPC 協定。
JSON-RPC 是基于 JSON 的跨語言遠端調用協定。Dubbo 中的 dubbo-rpc-xml、dubbo-rpc-webservice 等子產品支援的 XML-RPC、WebService 等協定與 JSON-RPC 一樣,都是基于文本的協定,隻不過 JSON 的格式比 XML、WebService 等格式更加簡潔、緊湊。與 Dubbo 協定、Hessian 協定等二進制協定相比,JSON-RPC 更便于調試和實作,可見 JSON-RPC 協定還是一款非常優秀的遠端調用協定。
在 Java 體系中,有很多成熟的 JSON-RPC 架構,例如 jsonrpc4j、jpoxy 等,其中,jsonrpc4j 本身體積小巧,使用友善,既可以獨立使用,也可以與 Spring 無縫集合,非常适合基于 Spring 的項目。
下面先來看看 JSON-RPC 協定中請求的基本格式:
{
"id":1,
"method":"sayHello",
"params":[
"Dubbo json-rpc"
]
}
JSON-RPC請求中各個字段的含義如下:
- id 字段,用于唯一辨別一次遠端調用。
- method 字段,指定了調用的方法名。
- params 數組,表示方法傳入的參數,如果方法無參數傳入,則傳入空數組。
在 JSON-RPC 的服務端收到調用請求之後,會查找到相應的方法并進行調用,然後将方法的傳回值整理成如下格式,傳回給用戶端:
{
"id":1,
"result":"Hello Dubbo json-rpc",
"error":null
}
JSON-RPC響應中各個字段的含義如下:
- id 字段,用于唯一辨別一次遠端調用,該值與請求中的 id 字段值保持一緻。
- result 字段,記錄了方法的傳回值,若無傳回值,則傳回空;若調用錯誤,傳回 null。
- error 字段,表示調用發生異常時的異常資訊,方法執行無異常時該字段為 null。
jsonrpc4j 基礎使用
Dubbo 使用 jsonrpc4j 庫來實作 JSON-RPC 協定,下面使用 jsonrpc4j 編寫一個簡單的 JSON-RPC 服務端示例程式和用戶端示例程式,并通過這兩個示例程式說明 jsonrpc4j 最基本的使用方式。
首先,需要建立服務端和用戶端都需要的 domain 類以及服務接口。先來建立一個 User 類,作為最基礎的資料對象:
public class User implements Serializable {
private int userId;
private String name;
private int age;
// 省略上述字段的getter/setter方法以及toString()方法
}
接下來建立一個 UserService 接口作為服務接口,其中定義了 5 個方法,分别用來建立 User、查詢 User 以及相關資訊、删除 User:
public interface UserService {
User createUser(int userId, String name, int age);
User getUser(int userId);
String getUserName(int userId);
int getUserId(String name);
void deleteAll();
}
UserServiceImpl 是 UserService 接口的實作類,其中使用一個 ArrayList 集合管理 User 對象,具體實作如下:
public class UserServiceImpl implements UserService {
// 管理所有User對象
private List<User> users = new ArrayList<>();
@Override
public User createUser(int userId, String name, int age) {
System.out.println("createUser method");
User user = new User();
user.setUserId(userId);
user.setName(name);
user.setAge(age);
users.add(user); // 建立User對象并添加到users集合中
return user;
}
@Override
public User getUser(int userId) {
System.out.println("getUser method");
// 根據userId從users集合中查詢對應的User對象
return users.stream().filter(u -> u.getUserId() == userId).findAny().get();
}
@Override
public String getUserName(int userId) {
System.out.println("getUserName method");
// 根據userId從users集合中查詢對應的User對象之後,擷取該User的name
return getUser(userId).getName();
}
@Override
public int getUserId(String name) {
System.out.println("getUserId method");
// 根據name從users集合中查詢對應的User對象,然後擷取該User的id
return users.stream().filter(u -> u.getName().equals(name)).findAny().get().getUserId();
}
@Override
public void deleteAll() {
System.out.println("deleteAll");
users.clear(); // 清空users集合
}
}
整個使用者管理業務的核心大緻如此。下面我們來看服務端如何将 UserService 與 JSON-RPC 關聯起來。
首先,建立 RpcServlet 類,它是 HttpServlet 的子類,并覆寫了 HttpServlet 的 service() 方法。我們知道,HttpServlet 在收到 GET 和 POST 請求的時候,最終會調用其 service() 方法進行處理;HttpServlet 還會将 HTTP 請求和響應封裝成 HttpServletRequest 和 HttpServletResponse 傳入 service() 方法之中。這裡的 RpcServlet 實作之中會建立一個 JsonRpcServer,并在 service() 方法中将 HTTP 請求委托給 JsonRpcServer 進行處理:
public class RpcServlet extends HttpServlet {
private JsonRpcServer rpcServer = null;
public RpcServlet() {
super();
// JsonRpcServer會按照json-rpc請求,調用UserServiceImpl中的方法
rpcServer = new JsonRpcServer(new UserServiceImpl(), UserService.class);
}
@Override
protected void service(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
rpcServer.handle(request, response);
}
}
最後,建立一個 JsonRpcServer 作為服務端的入口類,在其 main() 方法中會啟動 Jetty 作為 Web 容器,具體實作如下:
public class JsonRpcServer {
public static void main(String[] args) throws Throwable {
// 伺服器的監聽端口
Server server = new Server(9999);
// 關聯一個已經存在的上下文
WebAppContext context = new WebAppContext();
// 設定描述符位置
context.setDescriptor("/dubbo-demo/json-rpc-demo/src/main/webapp/WEB-INF/web.xml");
// 設定Web内容上下文路徑
context.setResourceBase("/dubbo-demo/json-rpc-demo/src/main/webapp");
// 設定上下文路徑
context.setContextPath("/");
context.setParentLoaderPriority(true);
server.setHandler(context);
server.start();
server.join();
}
}
這裡使用到的 web.xml 配置檔案如下:
<?xml version="1.0" encoding="UTF-8"?>
<web-app
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<servlet>
<servlet-name>RpcServlet</servlet-name>
<servlet-class>com.demo.RpcServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>RpcServlet</servlet-name>
<url-pattern>/rpc</url-pattern>
</servlet-mapping>
</web-app>
完成服務端的編寫之後,下面再繼續編寫 JSON-RPC 的用戶端。在 JsonRpcClient 中會建立 JsonRpcHttpClient,并通過 JsonRpcHttpClient 請求服務端:
public class JsonRpcClient {
private static JsonRpcHttpClient rpcHttpClient;
public static void main(String[] args) throws Throwable {
// 建立JsonRpcHttpClient
rpcHttpClient = new JsonRpcHttpClient(new URL("http://127.0.0.1:9999/rpc"));
JsonRpcClient jsonRpcClient = new JsonRpcClient();
jsonRpcClient.deleteAll(); // 調用deleteAll()方法删除全部User
// 調用createUser()方法建立User
System.out.println(jsonRpcClient.createUser(1, "testName", 30));
// 調用getUser()、getUserName()、getUserId()方法進行查詢
System.out.println(jsonRpcClient.getUser(1));
System.out.println(jsonRpcClient.getUserName(1));
System.out.println(jsonRpcClient.getUserId("testName"));
}
public void deleteAll() throws Throwable {
// 調用服務端的deleteAll()方法
rpcHttpClient.invoke("deleteAll", null);
}
public User createUser(int userId, String name, int age) throws Throwable {
Object[] params = new Object[]{userId, name, age};
// 調用服務端的createUser()方法
return rpcHttpClient.invoke("createUser", params, User.class);
}
public User getUser(int userId) throws Throwable {
Integer[] params = new Integer[]{userId};
// 調用服務端的getUser()方法
return rpcHttpClient.invoke("getUser", params, User.class);
}
public String getUserName(int userId) throws Throwable {
Integer[] params = new Integer[]{userId};
// 調用服務端的getUserName()方法
return rpcHttpClient.invoke("getUserName", params, String.class);
}
public int getUserId(String name) throws Throwable {
String[] params = new String[]{name};
// 調用服務端的getUserId()方法
return rpcHttpClient.invoke("getUserId", params, Integer.class);
}
}
// 輸出:
// User{userId=1, name='testName', age=30}
// User{userId=1, name='testName', age=30}
// testName
// 1
AbstractProxyProtocol
在 AbstractProxyProtocol 的 export() 方法中,首先會根據 URL 檢查 exporterMap 緩存,如果查詢失敗,則會調用 ProxyFactory.getProxy() 方法将 Invoker 封裝成業務接口的代理類,然後通過子類實作的 doExport() 方法啟動底層的 ProxyProtocolServer,并初始化 serverMap 集合。具體實作如下:
public abstract class AbstractProxyProtocol extends AbstractProtocol {
protected ProxyFactory proxyFactory;
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
// 首先查詢exporterMap集合
final String uri = serviceKey(invoker.getUrl());
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
// When modifying the configuration through override, you need to re-expose the newly modified service.
if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
return exporter;
}
}
// 通過ProxyFactory建立代理類,将Invoker封裝成業務接口的代理類
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
// doExport()方法傳回的Runnable是一個回調,其中會銷毀底層的Server,将會在unexport()方法中調用該Runnable
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
super.unexport();
exporterMap.remove(uri);
if (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
exporterMap.put(uri, exporter);
return exporter;
}
}
在 HttpProtocol 的 doExport() 方法中,與前面介紹的 DubboProtocol 的實作類似,也要啟動一個 RemotingServer。為了适配各種 HTTP 伺服器,例如,Tomcat、Jetty 等,Dubbo 在 Transporter 層抽象出了一個 HttpServer 的接口。
dubbo-remoting-http 子產品的入口是 HttpBinder 接口,它被 @SPI 注解修飾,是一個擴充接口,有三個擴充實作,預設使用的是 JettyHttpBinder 實作,如下圖所示:
HttpBinder 接口中的 bind() 方法被 @Adaptive 注解修飾,會根據 URL 的 server 參數選擇相應的 HttpBinder 擴充實作,不同 HttpBinder 實作傳回相應的 HttpServer 實作。HttpServer 的繼承關系如下圖所示:
這裡以 JettyHttpServer 為例簡單介紹 HttpServer 的實作,在 JettyHttpServer 中會初始化 Jetty Server,其中會配置 Jetty Server 使用到的線程池以及處理請求 Handler:
public class JettyHttpServer extends AbstractHttpServer {
private Server server;
private URL url;
public JettyHttpServer(URL url, final HttpHandler handler) {
// 初始化AbstractHttpServer中的url字段和handler字段
super(url, handler);
this.url = url;
// TODO we should leave this setting to slf4j
// we must disable the debug logging for production use
Log.setLog(new StdErrLog());
Log.getLog().setDebugEnabled(false);
// 添加HttpHandler
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler);
// 建立線程池
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setDaemon(true);
threadPool.setMaxThreads(threads);
threadPool.setMinThreads(threads);
// 建立Jetty Server
server = new Server(threadPool);
// 建立ServerConnector,并指定綁定的ip和port
ServerConnector connector = new ServerConnector(server);
String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
connector.setHost(bindIp);
}
connector.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
server.addConnector(connector);
// 建立ServletHandler并與Jetty Server關聯,由DispatcherServlet處理全部的請求
ServletHandler servletHandler = new ServletHandler();
ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, "/*");
servletHolder.setInitOrder(2);
// dubbo's original impl can't support the use of ServletContext
// server.addHandler(servletHandler);
// TODO Context.SESSIONS is the best option here? (In jetty 9.x, it becomes ServletContextHandler.SESSIONS)
// 建立ServletContextHandler并與Jetty Server關聯
ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
context.setServletHandler(servletHandler);
ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext());
try {
server.start();
} catch (Exception e) {
throw new IllegalStateException("Failed to start jetty server on " + url.getParameter(Constants.BIND_IP_KEY) + ":" + url.getParameter(Constants.BIND_PORT_KEY) + ", cause: "
+ e.getMessage(), e);
}
}
}
可以看到 JettyHttpServer 收到的全部請求将委托給 DispatcherServlet 這個 HttpServlet 實作,而 DispatcherServlet 的 service() 方法會把請求委托給對應接端口的 HttpHandler 處理:
public class DispatcherServlet extends HttpServlet {
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// 從HANDLERS集合中查詢端口對應的HttpHandler對象
HttpHandler handler = HANDLERS.get(request.getLocalPort());
if (handler == null) {// service not found.
// 端口沒有對應的HttpHandler實作
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found.");
} else {
// 将請求委托給HttpHandler對象處理
handler.handle(request, response);
}
}
}
了解了 Dubbo 對 HttpServer 的抽象以及 JettyHttpServer 的核心之後,回到 HttpProtocol 中的 doExport() 方法繼續分析。
在 HttpProtocol.doExport() 方法中會通過 HttpBinder 建立前面介紹的 HttpServer 對象,并記錄到 serverMap 中用來接收 HTTP 請求。這裡初始化 HttpServer 以及處理請求用到的 HttpHandler 是 HttpProtocol 中的内部類,在其他使用 HTTP 協定作為基礎的 RPC 協定實作中也有類似的 HttpHandler 實作類,如下圖所示:
在 HttpProtocol.InternalHandler 中的 handle() 實作中,會将請求委托給 skeletonMap 集合中記錄的 JsonRpcServer 對象進行處理:
public class HttpProtocol extends AbstractProxyProtocol {
private final Map<String, JsonRpcServer> skeletonMap = new ConcurrentHashMap<>();
private class InternalHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws ServletException {
String uri = request.getRequestURI();
JsonRpcServer skeleton = skeletonMap.get(uri);
if (cors) {
// 處理跨域問題
response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
response.setHeader(ACCESS_CONTROL_ALLOW_METHODS_HEADER, "POST");
response.setHeader(ACCESS_CONTROL_ALLOW_HEADERS_HEADER, "*");
}
if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
// 處理OPTIONS請求
response.setStatus(200);
} else if (request.getMethod().equalsIgnoreCase("POST")) {
// 隻處理POST請求
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
skeleton.handle(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
} else {
// 其他Method類型的請求,例如,GET請求,直接傳回500
response.setStatus(500);
}
}
}
}
skeletonMap 集合中的 JsonRpcServer 是與 HttpServer 對象一同在 doExport() 方法中初始化的。最後,我們來看 HttpProtocol.doExport() 方法的實作:
public class HttpProtocol extends AbstractProxyProtocol {
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
// 先查詢serverMap緩存
ProtocolServer protocolServer = serverMap.get(addr);
if (protocolServer == null) {// 查詢緩存失敗
// 建立HttpServer,注意,傳入的HttpHandler實作是InternalHandler
RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false)));
serverMap.put(addr, new ProxyProtocolServer(remotingServer));
}
// 建立JsonRpcServer對象,并将URL與JsonRpcServer的映射關系記錄到skeletonMap集合中
final String path = url.getAbsolutePath();
final String genericPath = path + "/" + GENERIC_KEY;
JsonRpcServer skeleton = new JsonRpcServer(impl, type);
JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class);
skeletonMap.put(path, skeleton);
skeletonMap.put(genericPath, genericServer);
return () -> {// 傳回Runnable回調,在Exporter中的unexport()方法中執行
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
};
}
}
介紹完 HttpProtocol 暴露服務的相關實作之後,下面再來看 HttpProtocol 中引用服務相關的方法實作,即 protocolBindinRefer() 方法實作。該方法首先通過 doRefer() 方法建立業務接口的代理,這裡會使用到 jsonrpc4j 庫中的 JsonProxyFactoryBean 與 Spring 進行內建,在其 afterPropertiesSet() 方法中會建立 JsonRpcHttpClient 對象:
public class JsonProxyFactoryBean extends UrlBasedRemoteAccessor implements MethodInterceptor, InitializingBean, FactoryBean<Object>, ApplicationContextAware {
public void afterPropertiesSet() {
super.afterPropertiesSet();
this.proxyObject = ProxyFactory.getProxy(this.getServiceInterface(), this);
if (this.objectMapper == null && this.applicationContext != null && this.applicationContext.containsBean("objectMapper")) {
this.objectMapper = (ObjectMapper)this.applicationContext.getBean("objectMapper");
}
if (this.objectMapper == null && this.applicationContext != null) {
try {
this.objectMapper = (ObjectMapper)BeanFactoryUtils.beanOfTypeIncludingAncestors(this.applicationContext, ObjectMapper.class);
} catch (Exception var3) {
}
}
if (this.objectMapper == null) {
this.objectMapper = new ObjectMapper();
}
try {
// 建立JsonRpcHttpClient,用于後續發送json-rpc請求
this.jsonRpcHttpClient = new JsonRpcHttpClient(this.objectMapper, new URL(this.getServiceUrl()), this.extraHttpHeaders);
this.jsonRpcHttpClient.setRequestListener(this.requestListener);
this.jsonRpcHttpClient.setSslContext(this.sslContext);
this.jsonRpcHttpClient.setHostNameVerifier(this.hostNameVerifier);
} catch (MalformedURLException var2) {
throw new RuntimeException(var2);
}
}
}
public class HttpProtocol extends AbstractProxyProtocol {
@Override
protected <T> T doRefer(final Class<T> serviceType, URL url) throws RpcException {
final String generic = url.getParameter(GENERIC_KEY);
final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean();
JsonRpcProxyFactoryBean jsonRpcProxyFactoryBean = new JsonRpcProxyFactoryBean(jsonProxyFactoryBean);
jsonRpcProxyFactoryBean.setRemoteInvocationFactory((methodInvocation) -> {
RemoteInvocation invocation = new JsonRemoteInvocation(methodInvocation);
if (isGeneric) {
invocation.addAttribute(GENERIC_KEY, generic);
}
return invocation;
});
String key = url.setProtocol("http").toIdentityString();
if (isGeneric) {
key = key + "/" + GENERIC_KEY;
}
jsonRpcProxyFactoryBean.setServiceUrl(key);
jsonRpcProxyFactoryBean.setServiceInterface(serviceType);
jsonProxyFactoryBean.afterPropertiesSet();
// 傳回的是serviceType類型的代理對象
return (T) jsonProxyFactoryBean.getObject();
}
}
public abstract class AbstractProxyProtocol extends AbstractProtocol {
@Override
protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = target.invoke(invocation);
// FIXME result is an AsyncRpcResult instance.
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
}
}
};
// 将Invoker添加到invokers集合中
invokers.add(invoker);
return invoker;
}
}