天天看点

Hadoop RPC 框架解析   概述    分析MapReduce通信协议解析

   概述 

        网络通信模块是分布式系统中最底层的模块。它直接支撑了上层分布式环境下复杂的进程间通信(Inter-Process Communication IPC)逻辑,是所有分布式系统的基础。远程过程调用(Remote Procedure Call RPC)是一种常用的分布式网络通信协议。它允许运行于一台计算机的程序调用另一台计算机的子程序,同时将网络通信细节隐藏起来,使得用户无须额外地为这个交互作用编程。

        作为一个分布式系统,hadoop实现了自己的RPC通信协议,它是上层多个分布式子系统(如Mapreduce,HDFS,HBase)公用的网络通信协议。

         RPC实际上是分布式计算机中客户机/服务器模型的一个应用实例。对于hadoop RPC而言,具有

         透明性:这是所有RPC框架的最根本特征,即当用户在一台计算机的程序调用另一台计算机上的子程序时,用户自身不应感觉到其间涉及跨机器间的通信,而是感觉是在执行一个本地调用。

         高性能:Hadoop各个系统均采用了Master/Slave结构。其中,Master实际上一个RPC Server,它负责处理集群中所有Slave发送的服务请求。为了保证Master的并发处理能力,RPC Server应是一个高性能服务器,能够有效处理来自多个Client的并发RPC请求。

        可控性:JDK中已经自带一个RPC框架-RMI(Remote Method Invocation,远程方法调用)。之所以不直接使用该框架,主要是因为考虑到RPC是Hadoop最底层、最核心的模块之一,保证其轻量级、高性能和可控性显得尤为重要,而RMI过于重量级且用户可控之处太少(如网络连接、超时和缓冲等均难以定制或者修改)。

        Hadoop RPC主要分为四个部分,分别是序列化层、函数调用层、网络传输层和服务器端处理框架

        序列化层:序列化层的主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储。在RPC框架中,它主要用于将用户请求中的参数或者应答转化成字节流以便于跨机器传输。Hadoop类实现Writable接口即可实现对象的序列化和反序列化。

        函数调用层:函数调用层的主要功能是定位要调用的函数并执行该函数。Hadoop RPC采用java反射机制与动态代理实现了函数调用。

        网络传输层:网络传输层描述了Client和Server之间的休息传输的方式。Hadoop RPC采用了基于TCP/IP的Socket机制。

       服务器端处理框架:服务器端处理框架可被抽象为网络I/O模型。它描述了客户端与服务器端间信息交互的方式。它的设计直接决定着服务器端的并发处理能力。

   分析

基本概念

        RPC是一种通过网络从远程计算机上请求服务,但不需要了解底层网络技术的协议。RPC协议假定某些传输协议已经存在,如TCP/UDP等,并通过这些传输协议为通信程序之间传递访问请求或者应答信息。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发分布式应用程序更加容易。         包括:         通信模块:两个相互协作的通信模块实现请求-应答协议。它们在客户机和服务器之间传递请求和应答消息,一般不会对数据包进行任何处理。        stub程序:客户端和服务器端均包含stub程序,可将之看做代理程序。它使得远程函数调用表现的跟本地调用一样,对用户程序完全透明。在客户端,它表现的就像一个本地程序,但不直接执行本地调用,而是将请求请求信息通过网络模块发送给服务器端。此外,当服务器发送应答后,他会解码对应结果。在服务器端,stub程序依次进行一下处理:解码请求消息中的参数、调用相应的服务过程和编码应答结果的返回值。        调度程序:调度程序接受来自通信模块的请求信息,并根据其中的标识选择Stub程序。通常客户端并发请求量比较大时,会采用线程池提高处理效率。       客户程序/服务过程:请求的发出者和请求的处理者。如果是单机环境,客户程序直接通过函数调用访问服务过程,但在分布式环境下,需要考虑网络通信,这不得不增加通信模块和Stub程序(保证函数调用透明性)。

基本框架

1,Hadoop RPC使用    RPC主要对外提供两种接口    public static VersionedProtocal getProxy/waitForProxy();构造一个客户端代理对象(该对象实现了某个协议),用于向服务器端发送RPC请求。    public static Server getServer():为某个协议(实际上是Java接口)实例构造一个服务器对象,用于处理客户端发送的请求。       通常Hadoop RPC使用方法可分为:

步骤1 

        定义RPC协议。RPC协议是客户端和服务器端之间的通信接口,它定义了服务器端对外提供的服务接口。如以下代码所示,我们定义了一个ClientProtocol通信接口,声明两个方法:echo()和add()。需要注意的是,Hadoop中所有自定义RPC接口都要继承VersionedProtocol接口,它描述了协议的版本信息。

      interface ClientProtocol extends VersionedProtocol{            public static final long versionID=1L;//版本号。默认情况下,不同版本的RPC Client和RPC Server之间不能相互通信            String echo(String value) throws IOException;            int add(int v1,int v2) throws IOException;       }

步骤2

实现RPC协议。Hadoop RPC协议通常是一个Java接口,用户需要实现该接口。如以下代码所示,对ClientProtocol接口进行简单的实现: public static class ClientProtocolImpl implments ClientProtocol{    public long getProtocolVersion(String protocol ,long clientVersion){        return ClientProtocol.versionID;    }    public String echo(String value) throws IOException{        return value;     }    public int add(int v1,int v2) throws IOException{       return v1+v2;     } }

步骤3

构造并启动RPC Server。直接使用静态方法getServer()构造一个RPC Server,并调用函数start()启动该Server: server=RPC.getServer(new ClientProtocolImpl(),serverHost,serverport,numHandlers,false,conf); server.start(); 其中serverHost和serverPort分别表示服务器的host和监听端口号,而numHandlers表示服务器端处理请求的线程数据。到此为止,服务器处理监听状态,等待客户端请求到达。

步骤4

       构造RPC Client,并发送RPC请求。使用静态方法getProxy()构造客户端代理对象,直接通过代理对象调用远程端的方法,具体如下所示: proxy=(ClientProtocol)RPC.getProxy(ClientProtocol.class,ClientProtocol.versionID,addr,conf); int result=proxy.add(5,6); String echoResult=proxy.echo("result");        至此搭建了一个RPC server/client网络模型。        Hadoop RPC主要由三个大类组成,分别是RPC,Client,Server,分别对应对外编程接口,客户端实现和服务器端实现。        RPC类实际上是对底层客户机/服务器网络模型的封装,以便为程序员提供一套更方便简洁的编程接口。        Client主要完成的功能是发送远程过程调用信息并接收执行结果。        Server充当master角色,Server接收并处理所有Slave发送的请求,这就要求ipc.Server将高并发和可扩展性作为设计目标。为此,Server采用了许多具有高并发处理能力的技术,主要包括线程池,事件驱动和Reactor设计模式等。这些技术均采用了JDK自带的库实现。

MapReduce通信协议解析

        在Hadoop Mapreduce中,不同组件之间的通信协议均是基于RPC的。他们就像系统的骨架,支撑起整个mapreduce系统。         在Hadoop 1.x版本中,MapReduce框架中共有6个主要的通信协议。其中,直接面向Client的通信协议有4个。         JobSubmissionProtocol:Client(一般为普通用户)与JobTracker之间的通信协议。用户通过该协议提交作业,查看作业的运行情况等。         RefreshUserMappingProtocol:Client(一般为管理员)通过该协议更新用户-用户组映射关系。         RefreshAuthorizationPolicyProtocol:Client(一般为管理员)通过该协议更新MapReduce服务级别访问控制列表。         AdminOperationProtocol:Client(一般为管理员)通过该协议更新队列(存在于JonTracker或者Scheduler中)访问控制列表和节点列表。         在Hadoop线上环境中,考虑到安全因素,通常将JobSubmissionProtocol使用权授予普通用户,而其他三个通信协议的权限授予管理员。         另外其他两个通信协议位于MapReduce框架内部,如下。          InterTrackerProtocol:TaskTracker与JobTracker之间通信协议。TaskTracker通过相关接口汇报本节点的资源使用情况和任务运行状态灯信息,并执行JobTracker发送的命令。        TaskUmbilicalProtocol:Task和TaskTracker之间的通信协议。每个Task实际上是其同节点TaskTracker的子进程,他们通过该协议汇报Task运行状态、运行进度等信息。 在Hadoop中,所有Hadoop RPC的协议基类均为VersionProtocol。该类主要用于描述版本号,以防止不同版本号的客户端与服务器端之间通信。          InterTrackerProtocol定义了jobtracker和tasktracker之间的通信和信息交流:        定义了方法heartbeat,TaskTracker定期调用更新在jobTracker中的状态。然后JobTracker返回HeartbeatResponse对象指导 TaskTracker作出一系列的action操作。有  LAUNCH_TASK,KILL_TASK

    KILL_JOB,  

    REINIT_TRACKER,  

    COMMIT_TASK          HeartbeatResponse heartbeat(TaskTrackerStatus status, 

                              boolean restarted, 

                              boolean initialContact,

                              boolean acceptNewTasks,

                              short responseId)

    throws IOException; 等等方法。

继续阅读