Hadoop RPC远程过程调用源码解析及实例

什么是RPC?

1、RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的。经常用于分布式网络通信中。

2、Hadoop的进程间交互都是通过RPC来进行的,比如Namenode与Datanode之间,Jobtracker与Tasktracker之间等。

RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中, RPC跨越了传输层和应用层。 RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。

首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息,在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息给client然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

RPC特点

1、透明性:远程调用其他机器上的程序,对用户来说就像是调用本地方法一样。

2、高性能:RPC server能够并发处理多个来自Client的请求(请求队列)。3、可控性:jdk中已经提供了一个RPC框架–RMI,但是该RPC框架过于重量级并且可控之处比较少,所以Hadoop RPC实现了自定义的RPC框架。

Hadoop RPC通信

1、序列化层:Client与Server端 通信传递的信息采用了Hadoop里提供的序列化类或自定义Writable类型。

2、函数调用层:Hadoop RPC通过动态代理以及Java反射机制实现函数调用。

3、网络传输层:Hadoop RPC采用了基于TCP/IP的socket机制。

4、服务器端框架层:RPC Server利用Java NIO以及采用了事件驱动的I/O模型,提高RPC Server的并发处理能力

Hadoop的整个体系结构就是构建在RPC之上(org.apache.hadoop.ipc)。

Hadoop RPC设计技术

1、动态代理

2、反射3、序列化4、非阻塞的异步IO(NIO)

动态代理

1、动态代理可以提供对另一个对象的访问,同时隐藏实际对象的具体事实,代理对象对客户隐藏了实际对象。

2、动态代理可以对请求进行其他的一些处理,在不允许直接访问某些类,或需要对访问做一些特殊处理等,这时候可以考虑使用代理。3)目前Java开发包中提供了对动态代理的支持,但现在只支持对接口的实现。相关的类与接口:java.lang.reflect.Proxy--类 java.lang.reflect.InvocationHandler--接口

动态代理创建对象过程:

InvocationHandler handler = new InvocationHandlerImpl(...) Proxy.newInstance(...)

具体实现可参考如下:

根据上图查看hadoop2.6.0源码

Client

Server

RPC

几个重要的协议

ClientProtocol是客户端(FileSystem)与NameNode通信的接口。

DatanodeProtocol是DataNode与NameNode通信的接口NamenodeProtocol是SecondaryNameNode与NameNode通信的接口。DFSClient是直接调用NameNode接口的对象。用户代码是通过DistributedFileSystem调用DFSClient对象,才能与NameNode打交道。

模拟Hadoop RPC通信


  1. package MyRPC; 
  2. import org.apache.hadoop.io.Text; 
  3. import org.apache.hadoop.ipc.VersionedProtocol; 
  4.  
  5. public interface MyRPCProtocal extends VersionedProtocol{ 
  6.     public static long versionID = 23234l;//很重要很重要,搞了一下午才解决掉。 
  7.     public Text test(Text t); 

  1. package MyRPC; 
  2.  
  3. import java.io.IOException; 
  4. import org.apache.hadoop.conf.Configuration; 
  5. import org.apache.hadoop.io.Text; 
  6. import org.apache.hadoop.ipc.ProtocolSignature; 
  7. import org.apache.hadoop.ipc.RPC; 
  8. import org.apache.hadoop.ipc.RPC.Server; 
  9.  
  10. public class RPCServer implements MyRPCProtocal{     
  11.     Server server = null; 
  12.     public RPCServer() throws IOException, InterruptedException{ 
  13.         //server = RPC.getServer(this,"localhost",8888,new Configuration()); 
  14.         //相对于以前的版本有略微的改动 
  15.         RPC.Builder ins = new RPC.Builder(new Configuration()); 
  16.         ins.setInstance(this); 
  17.         ins.setBindAddress("localhost"); 
  18.         ins.setPort(9999); 
  19.         ins.setProtocol(MyRPCProtocal.class); 
  20.         //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class); 
  21.         server = ins.build();//获得一个server实例 
  22.         server.start(); 
  23.         server.join();   
  24.     } 
  25.  
  26.     public static void main(String[] args) throws IOException, InterruptedException { 
  27.         new RPCServer(); 
  28.     } 
  29.  
  30.     @Override 
  31.     public long getProtocolVersion(String protocol, long clientVersion) 
  32.             throws IOException { 
  33.         return MyRPCProtocal.versionID; 
  34.     } 
  35.  
  36.     @Override 
  37.     public ProtocolSignature getProtocolSignature(String protocol, 
  38.             long clientVersion, int clientMethodsHash) throws IOException {      
  39.         return new ProtocolSignature(); 
  40.     } 
  41.  
  42.     @Override 
  43.     public Text test(Text t) { 
  44.         if(t.toString().equals("RPC")){ 
  45.             return new Text("ok"); 
  46.         } 
  47.         return new Text("false"); 
  48.     } 
  49. package MyRPC; 
  50.  
  51. import java.net.InetSocketAddress; 
  52.  
  53. import org.apache.hadoop.conf.Configuration; 
  54. import org.apache.hadoop.io.Text; 
  55. import org.apache.hadoop.ipc.RPC; 
  56.  
  57. public class RPCClient { 
  58.  
  59.     private MyRPCProtocal protocal; 
  60.  
  61.     public RPCClient() throws Exception{ 
  62.         InetSocketAddress address = new InetSocketAddress("localhost",9999); 
  63.  
  64.         protocal = (MyRPCProtocal)RPC.waitForProxy 
  65.                 (MyRPCProtocal.class,MyRPCProtocal.versionID, address, new Configuration()); 
  66.         //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class); 
  67.     } 
  68.  
  69.     public void call(String s){ 
  70.         final Text string = protocal.test(new Text(s)); 
  71.         System.out.println(string.toString()); 
  72.     } 
  73.  
  74.     public static void main(String[] args) throws Exception { 
  75.         RPCClient client = new RPCClient(); 
  76.         client.call("RPC"); 
  77.     } 

 

本文作者:佚名

来源:51CTO

时间: 2024-11-19 01:00:22

Hadoop RPC远程过程调用源码解析及实例的相关文章

Java集合学习(十七) TreeSet详细介绍(源码解析)和使用示例

这一章,我们对TreeSet进行学习. 我们先对TreeSet有个整体认识,然后再学习它的源码,最后再通过实例来学会使用TreeSet. 第1部分 TreeSet介绍 TreeSet简介 TreeSet 是一个有序的集合,它的作用是提供有序的Set集合.它继承于AbstractSet抽象类,实现了NavigableSet<E>, Cloneable, java.io.Serializable接口. TreeSet 继承于AbstractSet,所以它是一个Set集合,具有Set的属性和方法.

Java集合学习(十六) HashSet详细介绍(源码解析)和使用示例

这一章,我们对HashSet进行学习. 我们先对HashSet有个整体认识,然后再学习它的源码,最后再通过实例来学会使用HashSet. 第1部分 HashSet介绍 HashSet 简介 HashSet 是一个没有重复元素的集合. 它是由HashMap实现的,不保证元素的顺序,而且HashSet允许使用 null 元素. HashSet是非同步的.如果多个线程同时访问一个哈希 set,而其中至少一个线程修改了该 set,那么它必须 保持外部同步.这通常是通过对自然封装该 set 的对象执行同步

Java集合学习(十三) WeakHashMap详细介绍(源码解析)和使用示例

这一章,我们对WeakHashMap进行学习. 我们先对WeakHashMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用WeakHashMap. 第1部分 WeakHashMap介绍 WeakHashMap简介    WeakHashMap 继承于AbstractMap,实现了Map接口.    和HashMap一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是null.   不过WeakHashMap的键是"弱键&

Java集合学习(十二) TreeMap详细介绍(源码解析)和使用示例

这一章,我们对TreeMap进行学习. 第1部分 TreeMap介绍 TreeMap 简介 TreeMap 是一个有序的key-value集合,它是通过红黑树实现的. TreeMap继承于AbstractMap,所以它是一个Map,即一个key-value集合. TreeMap 实现了NavigableMap接口,意味着它支持一系列的导航方法.比如返回有序的key集合. TreeMap 实现了Cloneable接口,意味着它能被克隆. TreeMap 实现了java.io.Serializabl

Java集合学习(十一) Hashtable详细介绍(源码解析)和使用示例

这一章,我们对Hashtable进行学习. 我们先对Hashtable有个整体认识,然后再学习它的源码,最后再通过实例来学会使用Hashtable. 第1部分 Hashtable介绍 Hashtable 简介 和HashMap一样,Hashtable 也是一个散列表,它存储的内容是键值对(key-value)映射. Hashtable 继承于Dictionary,实现了Map.Cloneable.java.io.Serializable接口. Hashtable 的函数都是同步的,这意味着它是线

Java集合学习(十) HashMap详细介绍(源码解析)和使用示例

这一章,我们对HashMap进行学习. 我们先对HashMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用HashMap. 第1部分 HashMap介绍 HashMap简介 HashMap 是一个散列表,它存储的内容是键值对(key-value)映射. HashMap 继承于AbstractMap,实现了Map.Cloneable.java.io.Serializable接口. HashMap 的实现不是同步的,这意味着它不是线程安全的.它的key.value都可以为null.此外

Java集合学习(六) Vector详细介绍(源码解析)和使用示例

学完ArrayList和LinkedList之后,我们接着学习Vector.学习方式还是和之前一样,先对Vector有个整体认识,然后再学习它的源码:最后再通过实例来学会使用它. 第1部分 Vector介绍 Vector简介 Vector 是矢量队列,它是JDK1.0版本添加的类.继承于AbstractList,实现了List, RandomAccess, Cloneable这些接口. Vector 继承了AbstractList,实现了List:所以,它是一个队列,支持相关的添加.删除.修改.

Java集合学习(五) LinkedList详细介绍(源码解析)和使用示例

前面,我们已经学习了ArrayList,并了解了fail-fast机制.这一章我们接着学习List的实现类--LinkedList. 和学习ArrayList一样,接下来呢,我们先对LinkedList有个整体认识,然后再学习它的源码:最后再通过实例来学会使用LinkedList. 第1部分 LinkedList介绍 LinkedList简介 LinkedList 是一个继承于AbstractSequentialList的双向链表.它也可以被当作堆栈.队列或双端队列进行操作. LinkedLis

knockout.js源码解析

简介 本文主要对源码和内部机制做较深如的分析,基础部分请参阅官网文档. knockout.js (以下简称 ko )是最早将 MVVM 引入到前端的重要功臣之一.目前版本已更新到 3 .相比同类主要有特点有: 双工绑定基于 observe 模式,性能高. 插件和扩展机制非常完善,无论在数据层还是展现层都能满足各种复杂的需求. 向下支持到IE6 文档.测试完备,社区较活跃. 入口 以下分析都将对照 github 上3.x的版本.有一点需要先了解:ko 使用 google closure compi