天天看點

Dubbo——HTTP 協定 + JSON-RPC

前言

Protocol 還有一個實作分支是 AbstractProxyProtocol,如下圖所示:

Dubbo——HTTP 協定 + JSON-RPC

從圖中我們可以看到:gRPC、HTTP、WebService、Hessian、Thrift 等協定對應的 Protocol 實作,都是繼承自 AbstractProxyProtocol 抽象類。

目前網際網路的技術棧百花齊放,很多公司會使用 Node.js、Python、Rails、Go 等語言來開發 一些 Web 端應用,同時又有很多服務會使用 Java 技術棧實作,這就出現了大量的跨語言調用的需求。Dubbo 作為一個 RPC 架構,自然也希望能實作這種跨語言的調用,目前 Dubbo 中使用“HTTP 協定 + JSON-RPC”的方式來達到這一目的,其中 HTTP 協定和 JSON 都是天然跨語言的标準,在各種語言中都有成熟的類庫。

下面就重點來分析 Dubbo 對 HTTP 協定的支援。首先,會介紹 JSON-RPC 的基礎,并通過一個示例,快速入門,然後介紹 Dubbo 中 HttpProtocol 的具體實作,也就是如何将 HTTP 協定與 JSON-RPC 結合使用,實作跨語言調用的效果。

JSON-RPC

Dubbo 中支援的 HTTP 協定實際上使用的是 JSON-RPC 協定。

JSON-RPC 是基于 JSON 的跨語言遠端調用協定。Dubbo 中的 dubbo-rpc-xml、dubbo-rpc-webservice 等子產品支援的 XML-RPC、WebService 等協定與 JSON-RPC 一樣,都是基于文本的協定,隻不過 JSON 的格式比 XML、WebService 等格式更加簡潔、緊湊。與 Dubbo 協定、Hessian 協定等二進制協定相比,JSON-RPC 更便于調試和實作,可見 JSON-RPC 協定還是一款非常優秀的遠端調用協定。

在 Java 體系中,有很多成熟的 JSON-RPC 架構,例如 jsonrpc4j、jpoxy 等,其中,jsonrpc4j 本身體積小巧,使用友善,既可以獨立使用,也可以與 Spring 無縫集合,非常适合基于 Spring 的項目。

下面先來看看 JSON-RPC 協定中請求的基本格式:

{
    "id":1,
    "method":"sayHello",
    "params":[
        "Dubbo json-rpc"
    ]
}
           

JSON-RPC請求中各個字段的含義如下:

  • id 字段,用于唯一辨別一次遠端調用。
  • method 字段,指定了調用的方法名。
  • params 數組,表示方法傳入的參數,如果方法無參數傳入,則傳入空數組。

在 JSON-RPC 的服務端收到調用請求之後,會查找到相應的方法并進行調用,然後将方法的傳回值整理成如下格式,傳回給用戶端:

{
    "id":1,
    "result":"Hello Dubbo json-rpc",
    "error":null
}
           

JSON-RPC響應中各個字段的含義如下:

  • id 字段,用于唯一辨別一次遠端調用,該值與請求中的 id 字段值保持一緻。
  • result 字段,記錄了方法的傳回值,若無傳回值,則傳回空;若調用錯誤,傳回 null。
  • error 字段,表示調用發生異常時的異常資訊,方法執行無異常時該字段為 null。

jsonrpc4j 基礎使用

Dubbo 使用 jsonrpc4j 庫來實作 JSON-RPC 協定,下面使用 jsonrpc4j 編寫一個簡單的 JSON-RPC 服務端示例程式和用戶端示例程式,并通過這兩個示例程式說明 jsonrpc4j 最基本的使用方式。

首先,需要建立服務端和用戶端都需要的 domain 類以及服務接口。先來建立一個 User 類,作為最基礎的資料對象:

public class User implements Serializable {
    private int userId;
    private String name;
    private int age;
    // 省略上述字段的getter/setter方法以及toString()方法
}
           

接下來建立一個 UserService 接口作為服務接口,其中定義了 5 個方法,分别用來建立 User、查詢 User 以及相關資訊、删除 User:

public interface UserService {
    User createUser(int userId, String name, int age); 
    User getUser(int userId);
    String getUserName(int userId);
    int getUserId(String name);
    void deleteAll();
}
           

UserServiceImpl 是 UserService 接口的實作類,其中使用一個 ArrayList 集合管理 User 對象,具體實作如下:

public class UserServiceImpl implements UserService {
    // 管理所有User對象
    private List<User> users = new ArrayList<>(); 
    @Override
    public User createUser(int userId, String name, int age) {
        System.out.println("createUser method");
        User user = new User();
        user.setUserId(userId);
        user.setName(name);
        user.setAge(age);
        users.add(user); // 建立User對象并添加到users集合中
        return user;
    }
    @Override
    public User getUser(int userId) {
        System.out.println("getUser method");
        // 根據userId從users集合中查詢對應的User對象
        return users.stream().filter(u -> u.getUserId() == userId).findAny().get();
    }
    @Override
    public String getUserName(int userId) {
        System.out.println("getUserName method");
        // 根據userId從users集合中查詢對應的User對象之後,擷取該User的name
        return getUser(userId).getName();
    }
    @Override
    public int getUserId(String name) {
        System.out.println("getUserId method");
        // 根據name從users集合中查詢對應的User對象,然後擷取該User的id
        return users.stream().filter(u -> u.getName().equals(name)).findAny().get().getUserId();
    }
    @Override
    public void deleteAll() {
        System.out.println("deleteAll");
        users.clear(); // 清空users集合
    }
}
           

整個使用者管理業務的核心大緻如此。下面我們來看服務端如何将 UserService 與 JSON-RPC 關聯起來。

首先,建立 RpcServlet 類,它是 HttpServlet 的子類,并覆寫了 HttpServlet 的 service() 方法。我們知道,HttpServlet 在收到 GET 和 POST 請求的時候,最終會調用其 service() 方法進行處理;HttpServlet 還會将 HTTP 請求和響應封裝成 HttpServletRequest 和 HttpServletResponse 傳入 service() 方法之中。這裡的 RpcServlet 實作之中會建立一個 JsonRpcServer,并在 service() 方法中将 HTTP 請求委托給 JsonRpcServer 進行處理:

public class RpcServlet extends HttpServlet {
    private JsonRpcServer rpcServer = null;
    public RpcServlet() {
        super();
        // JsonRpcServer會按照json-rpc請求,調用UserServiceImpl中的方法
        rpcServer = new JsonRpcServer(new UserServiceImpl(), UserService.class);
    }
    @Override
    protected void service(HttpServletRequest request,
                           HttpServletResponse response) throws ServletException, IOException {
        rpcServer.handle(request, response);
    }
}
           

最後,建立一個 JsonRpcServer 作為服務端的入口類,在其 main() 方法中會啟動 Jetty 作為 Web 容器,具體實作如下:

public class JsonRpcServer {
    public static void main(String[] args) throws Throwable {
        // 伺服器的監聽端口
        Server server = new Server(9999);
        // 關聯一個已經存在的上下文
        WebAppContext context = new WebAppContext();
        // 設定描述符位置
        context.setDescriptor("/dubbo-demo/json-rpc-demo/src/main/webapp/WEB-INF/web.xml");
        // 設定Web内容上下文路徑
        context.setResourceBase("/dubbo-demo/json-rpc-demo/src/main/webapp");
        // 設定上下文路徑
        context.setContextPath("/");
        context.setParentLoaderPriority(true);
        server.setHandler(context);
        server.start();
        server.join();
    }
}
           

這裡使用到的 web.xml 配置檔案如下:

<?xml version="1.0" encoding="UTF-8"?>
<web-app
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://xmlns.jcp.org/xml/ns/javaee"
        xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
        version="3.1">
    <servlet>
        <servlet-name>RpcServlet</servlet-name>
        <servlet-class>com.demo.RpcServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>RpcServlet</servlet-name>
        <url-pattern>/rpc</url-pattern>
    </servlet-mapping>
</web-app>
           

完成服務端的編寫之後,下面再繼續編寫 JSON-RPC 的用戶端。在 JsonRpcClient 中會建立 JsonRpcHttpClient,并通過 JsonRpcHttpClient 請求服務端:

public class JsonRpcClient {
    private static JsonRpcHttpClient rpcHttpClient;
    public static void main(String[] args) throws Throwable {
        // 建立JsonRpcHttpClient
        rpcHttpClient = new JsonRpcHttpClient(new URL("http://127.0.0.1:9999/rpc"));
        JsonRpcClient jsonRpcClient = new JsonRpcClient();
        jsonRpcClient.deleteAll(); // 調用deleteAll()方法删除全部User
        // 調用createUser()方法建立User
        System.out.println(jsonRpcClient.createUser(1, "testName", 30));
        // 調用getUser()、getUserName()、getUserId()方法進行查詢
        System.out.println(jsonRpcClient.getUser(1));
        System.out.println(jsonRpcClient.getUserName(1));
        System.out.println(jsonRpcClient.getUserId("testName"));
    }
    public void deleteAll() throws Throwable {
        // 調用服務端的deleteAll()方法
        rpcHttpClient.invoke("deleteAll", null); 
    }
    public User createUser(int userId, String name, int age) throws Throwable {
        Object[] params = new Object[]{userId, name, age};
        // 調用服務端的createUser()方法
        return rpcHttpClient.invoke("createUser", params, User.class);
    }
    public User getUser(int userId) throws Throwable {
        Integer[] params = new Integer[]{userId};
        // 調用服務端的getUser()方法
        return rpcHttpClient.invoke("getUser", params, User.class);
    }
    public String getUserName(int userId) throws Throwable {
        Integer[] params = new Integer[]{userId};
        // 調用服務端的getUserName()方法
        return rpcHttpClient.invoke("getUserName", params, String.class);
    }
    public int getUserId(String name) throws Throwable {
        String[] params = new String[]{name};
        // 調用服務端的getUserId()方法
        return rpcHttpClient.invoke("getUserId", params, Integer.class);
    }
}
// 輸出:
// User{userId=1, name='testName', age=30}
// User{userId=1, name='testName', age=30}
// testName
// 1
           

AbstractProxyProtocol

在 AbstractProxyProtocol 的 export() 方法中,首先會根據 URL 檢查 exporterMap 緩存,如果查詢失敗,則會調用 ProxyFactory.getProxy() 方法将 Invoker 封裝成業務接口的代理類,然後通過子類實作的 doExport() 方法啟動底層的 ProxyProtocolServer,并初始化 serverMap 集合。具體實作如下:

public abstract class AbstractProxyProtocol extends AbstractProtocol {

    protected ProxyFactory proxyFactory;
	
    @Override
    @SuppressWarnings("unchecked")
    public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
		// 首先查詢exporterMap集合
        final String uri = serviceKey(invoker.getUrl());
        Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
        if (exporter != null) {
            // When modifying the configuration through override, you need to re-expose the newly modified service.
            if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
                return exporter;
            }
        }
		// 通過ProxyFactory建立代理類,将Invoker封裝成業務接口的代理類
        final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
        // doExport()方法傳回的Runnable是一個回調,其中會銷毀底層的Server,将會在unexport()方法中調用該Runnable
		exporter = new AbstractExporter<T>(invoker) {
            @Override
            public void unexport() {
                super.unexport();
                exporterMap.remove(uri);
                if (runnable != null) {
                    try {
                        runnable.run();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            }
        };
        exporterMap.put(uri, exporter);
        return exporter;
    }
}
           

在 HttpProtocol 的 doExport() 方法中,與前面介紹的 DubboProtocol 的實作類似,也要啟動一個 RemotingServer。為了适配各種 HTTP 伺服器,例如,Tomcat、Jetty 等,Dubbo 在 Transporter 層抽象出了一個 HttpServer 的接口。

Dubbo——HTTP 協定 + JSON-RPC

dubbo-remoting-http 子產品的入口是 HttpBinder 接口,它被 @SPI 注解修飾,是一個擴充接口,有三個擴充實作,預設使用的是 JettyHttpBinder 實作,如下圖所示:

Dubbo——HTTP 協定 + JSON-RPC

HttpBinder 接口中的 bind() 方法被 @Adaptive 注解修飾,會根據 URL 的 server 參數選擇相應的 HttpBinder 擴充實作,不同 HttpBinder 實作傳回相應的 HttpServer 實作。HttpServer 的繼承關系如下圖所示:

Dubbo——HTTP 協定 + JSON-RPC

這裡以 JettyHttpServer 為例簡單介紹 HttpServer 的實作,在 JettyHttpServer 中會初始化 Jetty Server,其中會配置 Jetty Server 使用到的線程池以及處理請求 Handler:

public class JettyHttpServer extends AbstractHttpServer {

    private Server server;

    private URL url;

    public JettyHttpServer(URL url, final HttpHandler handler) {
		// 初始化AbstractHttpServer中的url字段和handler字段
        super(url, handler);
        this.url = url;
        // TODO we should leave this setting to slf4j
        // we must disable the debug logging for production use
        Log.setLog(new StdErrLog());
        Log.getLog().setDebugEnabled(false);

		// 添加HttpHandler
        DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler);

		// 建立線程池
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        QueuedThreadPool threadPool = new QueuedThreadPool();
        threadPool.setDaemon(true);
        threadPool.setMaxThreads(threads);
        threadPool.setMinThreads(threads);
		
		// 建立Jetty Server
        server = new Server(threadPool);

		// 建立ServerConnector,并指定綁定的ip和port
        ServerConnector connector = new ServerConnector(server);

        String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
        if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
            connector.setHost(bindIp);
        }
        connector.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));

        server.addConnector(connector);
		// 建立ServletHandler并與Jetty Server關聯,由DispatcherServlet處理全部的請求
        ServletHandler servletHandler = new ServletHandler();
        ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, "/*");
        servletHolder.setInitOrder(2);

        // dubbo's original impl can't support the use of ServletContext
        //        server.addHandler(servletHandler);
        // TODO Context.SESSIONS is the best option here? (In jetty 9.x, it becomes ServletContextHandler.SESSIONS)
        // 建立ServletContextHandler并與Jetty Server關聯
		ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
        context.setServletHandler(servletHandler);
        ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext());

        try {
            server.start();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to start jetty server on " + url.getParameter(Constants.BIND_IP_KEY) + ":" + url.getParameter(Constants.BIND_PORT_KEY) + ", cause: "
                + e.getMessage(), e);
        }
    }
}
           

可以看到 JettyHttpServer 收到的全部請求将委托給 DispatcherServlet 這個 HttpServlet 實作,而 DispatcherServlet 的 service() 方法會把請求委托給對應接端口的 HttpHandler 處理:

public class DispatcherServlet extends HttpServlet {

    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // 從HANDLERS集合中查詢端口對應的HttpHandler對象
		HttpHandler handler = HANDLERS.get(request.getLocalPort());
        if (handler == null) {// service not found.
			// 端口沒有對應的HttpHandler實作
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found.");
        } else {
			// 将請求委托給HttpHandler對象處理
            handler.handle(request, response);
        }
    }
}
           

了解了 Dubbo 對 HttpServer 的抽象以及 JettyHttpServer 的核心之後,回到 HttpProtocol 中的 doExport() 方法繼續分析。

在 HttpProtocol.doExport() 方法中會通過 HttpBinder 建立前面介紹的 HttpServer 對象,并記錄到 serverMap 中用來接收 HTTP 請求。這裡初始化 HttpServer 以及處理請求用到的 HttpHandler 是 HttpProtocol 中的内部類,在其他使用 HTTP 協定作為基礎的 RPC 協定實作中也有類似的 HttpHandler 實作類,如下圖所示:

Dubbo——HTTP 協定 + JSON-RPC

在 HttpProtocol.InternalHandler 中的 handle() 實作中,會将請求委托給 skeletonMap 集合中記錄的 JsonRpcServer 對象進行處理:

public class HttpProtocol extends AbstractProxyProtocol {

	private final Map<String, JsonRpcServer> skeletonMap = new ConcurrentHashMap<>();

    private class InternalHandler implements HttpHandler {
	
	    @Override
        public void handle(HttpServletRequest request, HttpServletResponse response)
                throws ServletException {
            String uri = request.getRequestURI();
            JsonRpcServer skeleton = skeletonMap.get(uri);
            if (cors) {
				// 處理跨域問題
                response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
                response.setHeader(ACCESS_CONTROL_ALLOW_METHODS_HEADER, "POST");
                response.setHeader(ACCESS_CONTROL_ALLOW_HEADERS_HEADER, "*");
            }
            if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
				// 處理OPTIONS請求
                response.setStatus(200);
            } else if (request.getMethod().equalsIgnoreCase("POST")) {
				// 隻處理POST請求
                RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
                try {
                    skeleton.handle(request.getInputStream(), response.getOutputStream());
                } catch (Throwable e) {
                    throw new ServletException(e);
                }
            } else {
				// 其他Method類型的請求,例如,GET請求,直接傳回500
                response.setStatus(500);
            }
        }
	}
}
           

skeletonMap 集合中的 JsonRpcServer 是與 HttpServer 對象一同在 doExport() 方法中初始化的。最後,我們來看 HttpProtocol.doExport() 方法的實作:

public class HttpProtocol extends AbstractProxyProtocol {

    @Override
    protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
        String addr = getAddr(url);
		// 先查詢serverMap緩存
        ProtocolServer protocolServer = serverMap.get(addr);
        if (protocolServer == null) {// 查詢緩存失敗
			// 建立HttpServer,注意,傳入的HttpHandler實作是InternalHandler
            RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false)));
            serverMap.put(addr, new ProxyProtocolServer(remotingServer));
        }
		// 建立JsonRpcServer對象,并将URL與JsonRpcServer的映射關系記錄到skeletonMap集合中
        final String path = url.getAbsolutePath();
        final String genericPath = path + "/" + GENERIC_KEY;
        JsonRpcServer skeleton = new JsonRpcServer(impl, type);
        JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class);
        skeletonMap.put(path, skeleton);
        skeletonMap.put(genericPath, genericServer);
        return () -> {// 傳回Runnable回調,在Exporter中的unexport()方法中執行
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        };
    }
}
           

介紹完 HttpProtocol 暴露服務的相關實作之後,下面再來看 HttpProtocol 中引用服務相關的方法實作,即 protocolBindinRefer() 方法實作。該方法首先通過 doRefer() 方法建立業務接口的代理,這裡會使用到 jsonrpc4j 庫中的 JsonProxyFactoryBean 與 Spring 進行內建,在其 afterPropertiesSet() 方法中會建立 JsonRpcHttpClient 對象:

public class JsonProxyFactoryBean extends UrlBasedRemoteAccessor implements MethodInterceptor, InitializingBean, FactoryBean<Object>, ApplicationContextAware {

    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        this.proxyObject = ProxyFactory.getProxy(this.getServiceInterface(), this);
        if (this.objectMapper == null && this.applicationContext != null && this.applicationContext.containsBean("objectMapper")) {
            this.objectMapper = (ObjectMapper)this.applicationContext.getBean("objectMapper");
        }

        if (this.objectMapper == null && this.applicationContext != null) {
            try {
                this.objectMapper = (ObjectMapper)BeanFactoryUtils.beanOfTypeIncludingAncestors(this.applicationContext, ObjectMapper.class);
            } catch (Exception var3) {
            }
        }

        if (this.objectMapper == null) {
            this.objectMapper = new ObjectMapper();
        }

        try {
			// 建立JsonRpcHttpClient,用于後續發送json-rpc請求
            this.jsonRpcHttpClient = new JsonRpcHttpClient(this.objectMapper, new URL(this.getServiceUrl()), this.extraHttpHeaders);
            this.jsonRpcHttpClient.setRequestListener(this.requestListener);
            this.jsonRpcHttpClient.setSslContext(this.sslContext);
            this.jsonRpcHttpClient.setHostNameVerifier(this.hostNameVerifier);
        } catch (MalformedURLException var2) {
            throw new RuntimeException(var2);
        }
    }
}
           
public class HttpProtocol extends AbstractProxyProtocol {

    @Override
    protected <T> T doRefer(final Class<T> serviceType, URL url) throws RpcException {
        final String generic = url.getParameter(GENERIC_KEY);
        final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
        JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean();
        JsonRpcProxyFactoryBean jsonRpcProxyFactoryBean = new JsonRpcProxyFactoryBean(jsonProxyFactoryBean);
        jsonRpcProxyFactoryBean.setRemoteInvocationFactory((methodInvocation) -> {
            RemoteInvocation invocation = new JsonRemoteInvocation(methodInvocation);
            if (isGeneric) {
                invocation.addAttribute(GENERIC_KEY, generic);
            }
            return invocation;
        });
        String key = url.setProtocol("http").toIdentityString();
        if (isGeneric) {
            key = key + "/" + GENERIC_KEY;
        }

        jsonRpcProxyFactoryBean.setServiceUrl(key);
        jsonRpcProxyFactoryBean.setServiceInterface(serviceType);

        jsonProxyFactoryBean.afterPropertiesSet();
		// 傳回的是serviceType類型的代理對象
        return (T) jsonProxyFactoryBean.getObject();
    }
}
           
public abstract class AbstractProxyProtocol extends AbstractProtocol {

    @Override
    protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
        final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
        Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    Result result = target.invoke(invocation);
                    // FIXME result is an AsyncRpcResult instance.
                    Throwable e = result.getException();
                    if (e != null) {
                        for (Class<?> rpcException : rpcExceptions) {
                            if (rpcException.isAssignableFrom(e.getClass())) {
                                throw getRpcException(type, url, invocation, e);
                            }
                        }
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                        e.setCode(getErrorCode(e.getCause()));
                    }
                    throw e;
                } catch (Throwable e) {
                    throw getRpcException(type, url, invocation, e);
                }
            }
        };
		// 将Invoker添加到invokers集合中
        invokers.add(invoker);
        return invoker;
    }
}
           

總結