博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop rpc客户端初始化和调用过程详解
阅读量:7211 次
发布时间:2019-06-29

本文共 7793 字,大约阅读时间需要 25 分钟。

  hot3.png

本文主要记录hadoop rpc的客户端部分的初始化和调用的过程,下面的介绍中主要通过DFSClient来说明,为什么用DFSClient呢?DFSClient作为namenode的客户端,通过rpc来操作hdfs。限于篇幅,本文对下文引用到的类,做了较大的剪裁,只给出了关键的部分,如有疑问,可以一起交流。

DFSClient的初始化

DFSClient的初始化主要看其构造函数,其中rpc部分我们主要关注属性final ClientProtocol namenode,DFSClient的文件系统操作都是由他代理完成,构造函数中的关键代码如下:

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,      Configuration conf, FileSystem.Statistics stats)    throws IOException {	proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class); 	this.dtService = proxyInfo.getDelegationTokenService(); 	this.namenode = proxyInfo.getProxy();}

显然,DFSClient中的namenode是一个代理类。

接着NameNodeProxies类的createProxy方法,下面给出了NameNodeProxies中需要用到的一些方法:

public class NameNodeProxies {public static 
ProxyAndInfo
createProxy(Configuration conf, URI nameNodeUri, Class
xface) throws IOException { return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true);}public static
ProxyAndInfo
createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class
xface, UserGroupInformation ugi, boolean withRetries) throws IOException { proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries); return new ProxyAndInfo
(proxy, dtService);}/** 这部分是重点*/private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries) throws IOException { ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy).getProxy(); proxy = (ClientNamenodeProtocolPB) RetryProxy.create( ClientNamenodeProtocolPB.class, new DefaultFailoverProxyProvider
( ClientNamenodeProtocolPB.class, proxy), methodNameToPolicyMap, defaultPolicy); return new ClientNamenodeProtocolTranslatorPB(proxy);}}

该类中前面两个方法做跳转用,直接看createNNProxyWithClientProtocol方法,这里两行很关键的代码,proxy实例的初始化,这里先提示注意前一行中的getProxy() 对于这个方法是需要注意的,这样也保证了类型的一致。

这时候就不得不调出RPC这个类来看看他是怎么生成proxy的实例的了,看代码:ProtobufRpcEngineProtobufRpcEngineProtobufRpcEngineProtobufRpcEngine

public class RPC {public static 
ProtocolProxy
getProtocolProxy(Class
protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); }}

RPC中还是需要进一步的跳转,但是这里需要注意,getProtocolEngine这个方法,这里做一个说明,查看
RpcEngine的依赖,看图:
 在我的2.4.1的hadoop的版本中,hadoop的序列化框架已经用了Protobuf,所以getProtocolEngine方法得到的是ProtobufRpcEngine类的一个实例,那好,我们进一步跟踪ProtobufRpcEngine类的getProxy方法,看代码:
public class ProtobufRpcEngine implements RpcEngine {	public 
ProtocolProxy
getProxy(Class
protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy ) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy); return new ProtocolProxy
(protocol, (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker), false); }}

对java的动态代理有点了解的人看到Proxy.newProxyInstance这个方法应该都很清楚这就是生成一个远程代理类实例(特别注意:在NameNodeProxies类的createNNProxyWithClientProtocol方法中getProxy方法拿到的对象也就是这个对象),其中的invoker参数,确实我们不能忽略的,因为他暗藏玄机,java的动态代理中,invoker的类需要实现InvocationHandler接口,该接口只听过一个方法invoke,共代理类使用,及通过Proxy.newProxyInstance生成的代理类,在使用的时候是通过InvocationHandler的invoke方法来起作用的。好吧,现在我们可以顺便看看在ProtobufRpcEngine类的getProxy方法中invoker局部变量的类依赖图:,显然有刚才提到的实现关系,现在再让我们看看Invoker的内部,包括构造函数和invoke方法:

private Invoker(Class
protocol, Client.ConnectionId connId, Configuration conf, SocketFactory factory) { this.remoteId = connId; this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class); this.protocolName = RPC.getProtocolName(protocol); this.clientProtocolVersion = RPC .getProtocolVersion(protocol); }public Object invoke(Object proxy, Method method, Object[] args) throws ServiceException { val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);}

在构造函数请注意一个属性client,他的类型正式 org.apache.hadoop.ipc.Client,而且在invoke方法中发起远程调用的正是这个client属性,能够读到这里的同学,相信应该比较清楚了,在DFSClient中发起远程访问的就是这个Client类的实例。

关于DFSClient的初始化阶段中关于rpc的部分,总结一句,就是创建一个namenode的代理对象,供后续的文件系统操作调用。

DFSClient的getFileLinkInfo方法

DFSClient提供了相当丰富的API供客户端操作hadoop的文件系统,这里以 getFileLinkInfo为例,讲解rpc客户端的调用过程。注意:如果是FileSystem类的话,请使用方法getFileLinkStatus,他对DFSClient提供的getFileLinkInfo做了一层包装,仅此而已。

直接看DFSClient中的代码:

public HdfsFileStatus getFileLinkInfo(String src) throws IOException {    checkOpen();    try {      return namenode.getFileLinkInfo(src);    } catch(RemoteException re) {      throw re.unwrapRemoteException(AccessControlException.class,                                     UnresolvedPathException.class);     }   }

很简答的一行代码,通过namenode属性的调用操作完成,看了DFSClient的初始化过程,我们很容易知道namenode的实例化类是ClientNamenodeProtocolTranslatorPB,继续看调用过程,代码转到了ClientNamenodeProtocolTranslatorPB中:

@Override  public HdfsFileStatus getFileLinkInfo(String src)      throws AccessControlException, UnresolvedLinkException, IOException {    GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()        .setSrc(src).build();    try {      GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);      return result.hasFs() ?            PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null;    } catch (ServiceException e) {      throw ProtobufHelper.getRemoteException(e);    }  }

这时候我们会发现一个属性rpcProxy,再回过头看看NameNodeProxies类的createProxy方法,我们就可以很清楚的知道,rpcProxy就是那个能发起远程调用的代理类,它封装了Invoker对象,当然就也有了使用Client类的能力,很好,这里我们稍微总结下,在DFSClient类中,调用getFileLinkInfo方法,最终就是通过Client的call方法,发起远程访问,获取数据。

这时候,我们可以进一步来探讨下Hadoop中RPC的Client类了,下面我把Client类主要的部分抽取出来了,看下面的代码:

public class Client {Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {    		return new Call(rpcKind, rpcRequest);    }public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,      	ConnectionId remoteId, int serviceClass) throws IOException {		final Call call = createCall(rpcKind, rpcRequest);    		Connection connection = getConnection(remoteId, call, serviceClass);		connection.sendRpcRequest(call);                 // send the rpc request		return call.getRpcResponse();}private class Connection extends Thread {		private void receiveRpcResponse() {					}		public void sendRpcRequest(final Call call)        		throws InterruptedException, IOException {		}}}

看了DFSclient的初始化部分,我们就可以知道,DFSClient的远程调用,是通过Client的call方法起作用的。其实Client的call方法已经很能够说明问题了,先封装一个call,然后获取连接,再得到结果。简单的说Client就是这样了。可以在稍微复杂一点,在Client的call方法中,封装了call后,getConnection的方法不仅是获取一个连接,同时会启动连接代表的线程,这个线程的作用就是等待请求的完成,完成后,将结果写到call中(该过程天内各国Connection的receiveRpcRespoce方法完成),在call方法中获取连接后,会发送请求的参数到namenode的服务端,等待namenode处理完毕,Connection的receiveRpcRespoce方法写返回结果,最后call方法中返回结果。大概的过程就是这个样子了。

好像整个过程也不太复杂,只是不熟悉的情况下跟踪代码会比较累点。

转载于:https://my.oschina.net/psuyun/blog/372492

你可能感兴趣的文章
使用react的一点提醒17/10/26
查看>>
Java 内部类的阐述
查看>>
redis-大key寻找
查看>>
EF 数据查询(更改默认排序)
查看>>
求连续子数组的最大和
查看>>
SpringMVC最简单配置应用
查看>>
jQuery与Zepto的异同
查看>>
jsp注册页面的省份联动(网上copy别人的,然后自己弄了一下才知道怎么用)
查看>>
CRC检错技术原理
查看>>
读取Ini文件字段
查看>>
asp获取来源Url
查看>>
第一次实验
查看>>
Redis基础操作
查看>>
clob大数据转换为多行数据
查看>>
bootstrap的流式布局
查看>>
如何通过线程池异步调用
查看>>
Squid配置详解
查看>>
070104_微积分:随机变量及其分布(二项分布,均匀分布,正态分布)
查看>>
LeetCode – Refresh – Binary Tree Zigzag Level Order Traversal
查看>>
python操作三大主流数据库(13)python操作redis之新闻项目实战①新闻数据的导入
查看>>