天天看点

Hadoop源代码分析【RPC】

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论HDFS的,通信可能发生在:

  • Client-NameNode之间,其中NameNode是服务器
  • Client-DataNode之间,其中DataNode是服务器
  • DataNode-NameNode之间,其中NameNode是服务器
  • DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端

如果我们考虑Hadoop 的Map/Reduce以后,这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信,Hadoop引入了一个RPC框架。该RPC框架利用 的Java的反射能力,避免了某些RPC解决方案中需要根据某种接口语言(如CORBA的IDL)生成存根和框架的问题。但是,该RPC框架要求调用的参 数和返回结果必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出 IOException异常。(参考自http://zhangyu8374.javaeye.com/blog/86306 )

既然是RPC,当然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。但是类Server是一个抽象类,类RPC封装了Server,利用反射,把某个对象的方法开放出来,变成RPC中的服务器。

下图是org.apache.hadoop.rpc的类图。

Hadoop源代码分析【RPC】

hadoop中RPC的使用方法

1
 
     public

      

     class

      UseRPC {



      2
 
     



      3
 
        

     //

     hadoop配置信息

     



      4
 
      

        

     private

      

     static

      Configuration conf

     =

     new

      Configuration();



      5
 
       



      6
 
        

     //

     定义接口

     



      7
 
      

        

     public

      

     interface

      Serverif{



      8
 
            

     public

      String method(String args);



      9
 
        }



     10
 
       



     11
 
        

     //

     服务器端的实现

     



     12
 
        

     public

      

     static

      

     class

      ServerImpl 

     implements

      Serverif{



     13
 
       



     14
 
            

     //

     业务逻辑的实现

     



     15
 
            

     public

      String method(String args){



     16
 
                

     return

      args;



     17
 
            }



     18
 
     



     19
 
        }



     20
 
       



     21
 
        

     public

      

     static

      

     void

      main(String args[]) 

     throws

      Exception {



     22
 
            ServerImpl si 

     =

      

     new

      ServerImpl();



     23
 
            

     int

      port

     =

     8668

     ;



     24
 
            org.apache.hadoop.ipc.Server server 

     =

      RPC.getServer(si, port, 

     10

     , 

     true

     ,conf);



     25
 
            server.start();



     26
 
            server.join();



     27
 
        }



     28
 
     



     29
 
        

     //

     客户端的实现

     



     30
 
        

     public

      

     static

      

     class

      Client {



     31
 
     



     32
 
            

     private

      

     static

      

     final

      Method GETTASK;



     33
 
           



     34
 
           



     35
 
            

     //

     利用代理的方式调用,如果通过代理方式,服务器只能有一个

     



     36
 
            

     public

      String method_proxy(String args) 

     throws

      Exception{



     37
 
            InetSocketAddress sa

     =

     new

      InetSocketAddress(

     "

     192.168.1.1

     "

     ,

     8668

     );



     38
 
                Serverif si

     =

     (Serverif) RPC.getProxy(Serverif.

     class

     , sa, conf);



     39
 
                

     return

      si.method(args);



     40
 
            }



     41
 
       



     42
 
            

     //

     利用反射的方式调用,如果通过反射方式,服务器可以有多个,



     43
 
            

     //

     参数为一个二维数据,相对应每个服务器的方法

     



     44
 
            

     public

      String method_reflected(String args) 

     throws

      Exception{



     45
 
            InetSocketAddress[] sa

     =

     new

      InetSocketAddress[]{



     46
 
                    

     new

      InetSocketAddress(

     "

     192.168.1.1

     "

     ,

     8668

     ),



     47
 
                    

     new

      InetSocketAddress(

     "

     192.168.1.2

     "

     ,

     8668

     )};



     48
 
                Object[][] params 

     =

      

     new

      Object[

     2

     ][

     1

     ];



     49
 
                params[

     0

     ][

     0

     ]

     =

     String.

     class

     ;



     50
 
                params[

     1

     ][

     0

     ]

     =

     String.

     class

     ;



     51
 
                

     //

     得方法的反射

     



     52
 
                METHOD 

     =

      Serverif.

     class

     .getMethod(

     "

     method

     "

     , 

     new

      Class[] {String.

     class

     });



     53
 
                

     return

      (String)RPC.call(METHOD, params, sa, conf);



     54
 
            }



     55
 
           



     56
 
            

     public

      

     static

      

     void

      main(String args[]) 

     throws

      Exception {



     57
 
                String remoteIP

     =

     "

     192.168.1.1

     "

     ;



     58
 
                

     int

      port

     =

     8668

     ;



     59
 
                Client c

     =

     new

      Client();



     60
 
                System.out.println(c.method_proxy(

     "

     hello world

     "

     ));



     61
 
            }



     62
 
           



     63
 
        }



     64
 
       



     65
 
     }

    
      

【2-反射方式】该RPC框架利用 的Java的反射能力,避免了某些RPC解决方案中需要根据某种接口语言(如CORBA的IDL)生成存根和框架的问题。为什么Corba使用IDL要生 成存根和框架?因为调用对象方法在编译期间会做静态检查,所以生成存根作为代理,可以使用代理通过静态检查。同【1-代理方式】,不过Hadoop的 RPC可以直接通过传递参数在运行时生成本地代理,这样使用更加方便。

http://labs.chinamobile.com/groups/10216_20254

http://www.tbdata.org/archives/1413

【异步消息机制兼谈Hadoop RPC】http://blog.csdn.net/historyasamirror/archive/2011/01/22/6159248.aspx