天天看点

Hadoop-0.20.0源代码分析(13)

通过对org.apache.hadoop.ipc包中,Hadoop实现了基于IPC模型的RPC机制,可以不需要像Java中实现的RMI机制一样,在RPC调用的C/S两端分别创建Stub和Skeleton,而是通过一组协议来进行RPC调用就可以实现通信。这主要是由于Hadoop所采用的序列化机制简化了RPC调用的复杂性。Hadoop定义了自己的通信协议,这些协议都是建立在TCP/IP协议之上的,规范了通信两端的约定。

为了阅读分析org.apache.hadoop.hdfs.DFSClient类的实现,我们还需要对Hadoop定义的通信协议簇来进行了解,因为DFSClient所执行的操作都是基于协议的规定来实现的,了解协议的内容可能会对DFSClient实例连接到HDFS执行的任务更好理解一点。

首先,了解一下Hadoop定义的通信双方需要遵循的一组协议,下面是协议接口的继承层次关系,并作简单介绍:

。org.apache.hadoop.ipc.VersionedProtocol

。org.apache.hadoop.hdfs.protocol.ClientProtocol

。org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol

。org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol

。org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol

。org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol

上述协议接口的基本含义简述如下:

VersionedProtocol协议是Hadoop的最顶层协议接口的抽象;

ClientProtocol协议是用户进程(包括客户端进程与Datanode进程)与Namenode进程之间进行通信所使用的协议,例如,(1)客户端进程需要向Datanode数据结点复制数据块,需要与Namenode进程通信,获取Datanode结点列表;(2)Datanode进程向Namenode进程发送心跳状态报告和块状态报告需要与Namenode进程交互;

ClientDatanodeProtocol协议是客户端进程与Datanode进程之间进行通信所使用的协议;

DatanodeProtocol协议是当Datanode进程需要与NameNode进程进行通信是需要基于此协议,例如发送心跳报告和块状态报告;

InterDatanodeProtocol协议是Datanode进程之间进行通信的协议,例如客户端进程启动复制数据块,此时可能需要在Datanode结点之间进行块副本的流水线复制操作。

下面我们一个一个来看:

  • VersionedProtocol协议

该接口的定义如下:

package org.apache.hadoop.ipc;

import java.io.IOException;

/**

* 使用Hadoop RPC机制的所有协议的超类

* 该接口的子类同样支持具有一个static final long的版本属性字段

*/

public interface VersionedProtocol {

/**

* 返回与指定协议protocol相关的协议版本

* @param protocol 协议接口的类名

* @param clientVersion 客户端欲与服务器进行交互,它所使用的协议版本

* @return 返回服务器将要与客户端进行交互,所需要使用的协议版本

*/

public long getProtocolVersion(String protocol, long clientVersion) throws IOException;

}

该接口是所有与Hadoop RPC调用相关的协议的最高层抽象。

  • ClientProtocol协议

当客户端进程想要与Namenode进程进行通信的时候,需要通过org.apache.hadoop.hdfs.DistributedFileSystem类,基于ClientProtocol协议来实现交互过程。用户代码通过ClientProtocol协议,可以操纵HDFS的目录命名空间、打开与关闭文件流等。

该接口中定义了一个static final的versionID字段,如下所示:

/**

* 与先前的版本进行比较

* 41: saveNamespace introduced.

*/

public static final long versionID = 41L;

下面介绍,该接口协议中定义的与文件内容相关的操作:

1、获取块位置信息

定义的方法为getBlockLocations,该方法的参数如下:

/**

* @param src 文件名称

* @param offset 范围的起始偏移位置

* @param length 范围大小,即offset+length

*/

public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException;

getBlockLocations方法的主要功能说明如下:

获取到指定范围内,指定文件的所有块的位置信息。 其中,调用该方法返回的org.apache.hadoop.hdfs.protocol.LocatedBlocks对象包含的内容为:文件长度、组成文件的块及其存储位置(所在的Datanode数据结点)。对于每个块所在DataNode结点的位置,市基于“到文件系统客户端地址的距离最近”的原则进行了排序。文件系统客户端必须与制定的Datanode进行交互,才能获取到所需要的实际的块据块。

2、在命名空间中创建一个文件入口(entry)

定义的方法为create,该方法的参数如下所示:

/**

* @param src 被创建文件的路径

* @param masked 权限

* @param clientName 当前客户端名称

* @param overwrite 如果待创建的文件已经存在,指定是否重写该文件

* @param replication 块副本因子

* @param blockSize 块的最大长度

*/

public void create(

String src,

FsPermission masked,

String clientName,

boolean overwrite,

short replication,

long blockSize) throws IOException;

create方法的主要功能说明如下:

在命名空间中创建一个文件入口。该方法将创建一个由src路径指定的空文件,该路径src应该反映了从root目录开始的一个完整路径名称。从客户端的角度,Namenode并没有“当前”目录的概念。一旦文件创建成功,该文件就是可见的,并可以被其它客户端来执行读操作。但是,其它客户端不能够对该文件进行删除、重命名、重写,而这些操作只有在该文件被完全或明确指定为租约到期,才可以执行。

每个块都具有最大长度限制,如果客户端想要创建多个块,可以调用addBlock(String, String)方法来实现。

3、追加写文件操作

定义的方法为append,该方法 的参数如下所示:

/**

* @param src 被创建文件的路径

* @param clientName 当前客户端的名称

*/

public LocatedBlock append(String src, String clientName) throws IOException;

append方法的主要功能说明如下:

向文件src中追加写入内容,返回一个org.apache.hadoop.hdfs.protocol.LocatedBlock对象,该对象封装了Block(Hadoop基本文件块)和DatanodeInfo[](Datanode的状态信息),通过追加写操作后的返回信息,可以定位到追加写入最后部分块的信息。

4、设置副本因子

定义的方法为setReplication,该方法 的参数如下所示:

/**

* Set replication for an existing file.

* <p>

* The NameNode sets replication to the new value and returns.

* The actual block replication is not expected to be performed during

* this method call. The blocks will be populated or removed in the

* background as the result of the routine block maintenance procedures.

*

* @param src 文件名称

* @param replication 新的副本因子

*/

public boolean setReplication(String src, short replication) throws IOException;

setReplication方法的主要功能说明如下:

该方法为一个指定的文件修改块副本因子。Namenode会为指定文件设置副本因子,但是,不期望在调用该方法的过程修改实际块的副本因子,而是由后台块维护进程来执行:如果当前副本因子小于设置的新副本因子,需要增加一些块副本,如果当前副本因子大于设置的新副本因子,就会删除一些块副本。

另外,基于文件操作的方法,还有如下一些:

/**

* 为已经存在的目录或者文件,设置给定的操作权限

*/

public void setPermission(String src, FsPermission permission) throws IOException;

/**

* 设置文件或目录属主

*/

public void setOwner(String src, String username, String groupname) throws IOException;

/**

* 客户端放弃对指定块的操作

*/

public void abandonBlock(Block b, String src, String holder) throws IOException;

/**

* 客户端向一个当前为写操作打开的文件写入数据块

*/

public LocatedBlock addBlock(String src, String clientName) throws IOException;

/**

* 客户端完成对指定文件的写操作,并期望能够写完,在写完以后关闭文件

*/

public boolean complete(String src, String clientName) throws IOException;

/**

* 客户端向Namenode报告corrupted块的信息(块在Datanode上的位置信息)

* @param blocks 待报告的块信息的数组

*/

public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;

/**

* 在文件系统命令空间中重命名一个文件或目录

*/

public boolean rename(String src, String dst) throws IOException;

/**

* 删除文件或目录src

*/

public boolean delete(String src) throws IOException;

/**

* 删除文件或目录src,根据recursive选项来执行

*/

public boolean delete(String src, boolean recursive) throws IOException;

/**

* 创建目录src,并赋予目录src指定的nasked权限

*/

public boolean mkdirs(String src, FsPermission masked) throws IOException;

/**

* 获取指定目录src中的文件列表

*/

public FileStatus[] getListing(String src) throws IOException;

下面是接口ClientProtocol定义的与系统的管理相关的方法的定义:

1、监听客户端

方法renewLease定义为:

public void renewLease(String clientName) throws IOException;

该方法主要是Namenode监听到某个客户端发送的心跳状态,如果再一段时间内无法获取到某个客户端的心跳状态,很可能是该客户端因为某些异常崩溃掉了,被加上了不能继续正常工作的状态锁,Namenode进程会周期地调用该方法来确定指定的客户端clientName是否确实挂掉了,如果又重新接收到该客户端发送的心跳报告,则为该客户端进行解锁操作,恢复其正常的工作。

2、获取文件系统的状态统计数据

定义的方法为getStats,如下所示:

/**

* 返回一组标识文件系统不同信息的索引数组:

* <ul>

* <li> [0] 包含文件系统总存储容量(按字节计算)</li>

* <li> [1] 包含文件系统已使用空间(按字节计算)</li>

* <li> [2] 包含文件系统可使用空间(按字节计算)</li>

* <li> [3] 包含文件系统中不满足副本因子数量的块的数量</li>

* <li> [4] 包含corrupt副本的块的数量 </li>

* <li> [5] 包含没有任何可以用的块副本的块的数量</li>

* </ul>

*/

public long[] getStats() throws IOException;

上面返回的long[]对应定义的如下几个常量:

public int GET_STATS_CAPACITY_IDX = 0;

public int GET_STATS_USED_IDX = 1;

public int GET_STATS_REMAINING_IDX = 2;

public int GET_STATS_UNDER_REPLICATED_IDX = 3;

public int GET_STATS_CORRUPT_BLOCKS_IDX = 4;

public int GET_STATS_MISSING_BLOCKS_IDX = 5;

3、安全模式开关操作

定义的方法为setSafeMode,如下所示:

public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException;

通过调用该方法可以执行如下操作:进入安全模式、退出安全模式、获取安全模式。

当一个Namenode启动时候,首先会进入到安全模式这种特殊状态,在该状态下不能够进行数据块的复制操作。Namenode接收HDFS集群中所有的Datanode结点的心跳状态报告和数据块状态报告,根据状态报告来决定是否开始进入工作状态。如果某些Datanode结点发送的心跳状态报告不正常或者根本无从接收到,Namenode会将这些Datanode视为故障结点,在进入工作状态的时候,将这些故障结点排除在工作集群之外。如果某些Datanode结点上的数据块状态报告存在问题,会根据要求进行处理,比如某Datanode据结点上数据块的块副本未达到副本因子,则会在退出安全模式之后,进行块复制操作,满足副本要求。

4、保存命名空间映像

定义的方法为saveNamespace,如下所示:

/**

* 保存命名空间映像

*

* 保存FsImage映像,同时将更新同步到EditLog中。

* 要求具有超级权限,并且在安全模式下进行。

*/

public void saveNamespace() throws IOException;

5、持久化文件系统元数据

定义的方法为metaSave,如下所示:

/**

* 将Namenode结点上的数据结构写入到指定的文件中,如果指定文件已经存在,则追加到该文件中

*/

public void metaSave(String filename) throws IOException;

这里,引用Hadoop架构设计要点中的一段文字来描述一下,文件系统元数据的持久化:

“Namenode上保存着HDFS的名字空间。对于任何对文件系统元数据产生修改的操作,Namenode都会使用一种称为EditLog的事务日志记录下来。例如,在HDFS中创建一个文件,Namenode就会在Editlog中插入一条记录来表示;同样地,修改文件的副本系数也将往Editlog插入一条记录。Namenode在本地操作系统的文件系统中存储这个Editlog。整个文件系统的名字空间,包括数据块到文件的映射、文件的属性等,都存储在一个称为FsImage的文件中,这个文件也是放在Namenode所在的本地文件系统上。

Namenode在内存中保存着整个文件系统的名字空间和文件数据块映射(Blockmap)的映像。这个关键的元数据结构设计得很紧凑,因而一个有4G内存的Namenode足够支撑大量的文件和目录。当Namenode启动时,它从硬盘中读取Editlog和FsImage,将所有Editlog中的事务作用在内存中的FsImage上,并将这个新版本的FsImage从内存中保存到本地磁盘上,然后删除旧的Editlog,因为这个旧的Editlog的事务都已经作用在FsImage上了。这个过程称为一个检查点(checkpoint)。在当前实现中,检查点只发生在Namenode启动时,在不久的将来将实现支持周期性的检查点。

Datanode将HDFS数据以文件的形式存储在本地的文件系统中,它并不知道有关HDFS文件的信息。它把每个HDFS数据块存储在本地文件系统的一个单独的文件中。Datanode并不在同一个目录创建所有的文件,实际上,它用试探的方法来确定每个目录的最佳文件数目,并且在适当的时候创建子目录。在同一个目录中创建所有的本地文件并不是最优的选择,这是因为本地文件系统可能无法高效地在单个目录中支持大量的文件。当一个Datanode启动时,它会扫描本地文件系统,产生一个这些本地文件对应的所有HDFS数据块的列表,然后作为报告发送到Namenode,这个报告就是块状态报告。”

另外,ClientProtocol协议接口还定义了其它一些方法,如下所示:

/**

* 获取集群系统中当前的Datanode的状态报告

* 每个Datanode返回一个DatanodeInfo对象,包括Datanode的类型LIVE、DEAD或ALL.

*/

public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException;

/**

* 获取指定文件的块大小

*/

public long getPreferredBlockSize(String filename) throws IOException;

/**

* 告诉Namenode重新读取集群结点列表

*/

public void refreshNodes() throws IOException;

/**

* 完成之前的升级操作

* 删除在升级期间保存的文件系统的状态,一旦执行该方法,改变将不可逆转。

*/

public void finalizeUpgrade() throws IOException;

/**

* 报告分布式升级进程,或强制使当前升级执行

*/

public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException;

/**

* 获取指定文件或目录的状态

*/

public FileStatus getFileInfo(String src) throws IOException;

/**

* 获取某个目录的ContentSummary信息

*/

public ContentSummary getContentSummary(String path) throws IOException;

/**

* 将指定文件的全部元数据写入到持久存储中

* 其中,必须保证文件为写操作而打开。

*/

public void fsync(String src, String client) throws IOException;

/**

* 设置指定文件的修改和访问时间

*/

public void setTimes(String src, long mtime, long atime) throws IOException;

/**

* 为指定目录设置分配配额信息

*/

public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException;

  • NamenodeProtocol协议

该协议接口定义了次级Namenode(Secondary NameNode)与Namenode进行通信所需进行的操作。其中,Secondary NameNode是一个用来辅助Namenode的服务器端进程,主要是对映像文件执行特定的操作,另外还包括获取指定Datanode上块的操作。

该接口定义的操作如下所示:

/**

* 获取datanode上大小为size的块

*/

public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException;

/**

* 获取当前EditLog文件的大小(in bytes)

*/

public long getEditLogSize() throws IOException;

/**

* 关闭当前EditLog文件,并重新打开一个新的

* 当系统处于安全模式下,执行该方法会失败

*/

public CheckpointSignature rollEditLog() throws IOException;

/**

* 回滚FsImage日志:删除旧的FsImage,拷贝新的映像到FsImage文件中,删除旧的EditLog文件并重命名edits.new为edits。

*/

public void rollFsImage() throws IOException;

  • ClientDatanodeProtocol协议

当客户端进程需要与Datanode进程进行通信的时候,需要基于该协议。该协议接口定义数据块恢复的方法,如下所示:

public interface ClientDatanodeProtocol extends VersionedProtocol {

public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);

/**

* 3: add keepLength parameter.

*/

public static final long versionID = 3L;

/**

* @param block 指定的数据块

* @param keepLength 保持数据块的长度

* @param targets 指定的块的可能位置列表

* @return 如果恢复成功,返回块ID和时间戳

*/

LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets) throws IOException;

}

  • DatanodeProtocol协议

该协议用于一个DFS Datanode用户与Namenode进行通信的协议。该接口定义如下所示:

package org.apache.hadoop.hdfs.server.protocol;

import java.io.*;

import org.apache.hadoop.hdfs.protocol.Block;

import org.apache.hadoop.hdfs.protocol.DatanodeID;

import org.apache.hadoop.hdfs.protocol.LocatedBlock;

import org.apache.hadoop.ipc.VersionedProtocol;

public interface DatanodeProtocol extends VersionedProtocol {

/**

* 19: 发送心跳,返回一个DatanodeCommand对象数组

*/

public static final long versionID = 19L;

// 定义错误代码

final static int NOTIFY = 0;

final static int DISK_ERROR = 1;

final static int INVALID_BLOCK = 2;

/**

* 当接收到Datanode的命令的时候,根据下述状态码确定Datanode应该执行何种操作,

*/

final static int DNA_UNKNOWN = 0; // 未知

final static int DNA_TRANSFER = 1; // 将数据块从一个Datanode转移到另一个Datanode

final static int DNA_INVALIDATE = 2; // 未验证数据块

final static int DNA_SHUTDOWN = 3; // 关闭Datanode

final static int DNA_REGISTER = 4; // 重新注册

final static int DNA_FINALIZE = 5; // 完成先前执行的升级操作

final static int DNA_RECOVERBLOCK = 6; // 数据块恢复操作请求

/**

* 注册Datanode

*

* @return 更新后的{@link org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration},包含:一个新的storageID(如果Datanode没有storageID)、新的registration ID

*/

public DatanodeRegistration register(DatanodeRegistration registration) throws IOException;

/**

* Datanode向Namenode发送心跳状态报告

*/

public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount) throws IOException;

/**

* Datanode向Namenode发送块状态报告

*/

public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws IOException;

/**

* Datanode向Namenode报告最近接收到的数据块、删除的多余块副本

*/

public void blockReceived(DatanodeRegistration registration, Block blocks[], String[] delHints) throws IOException;

/**

* 向Namenode报告错误信息

*/

public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws IOException;

public NamespaceInfo versionRequest() throws IOException;

/**

* Datanode向Namenode发送一个升级命令

*/

UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException;

/**

* Datanode向Namenode报告Bad Blocks

*/

public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;

/**

* 为数据块生成新的时间戳

*/

public long nextGenerationStamp(Block block) throws IOException;

/**

* 在恢复数据块期间,提交事务:数据块同步

*/

public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) throws IOException;

}

一般来说Namenode不直接对Datanode进行RPC调用,如果一个Namenode需要与Datanode通信,唯一的方式就通过调用该协议接口定义的方法。

  • InterDatanodeProtocol协议

该协议用于Datanode进程之间进行通信。

该接口的定义如下所示:

package org.apache.hadoop.hdfs.server.protocol;

import java.io.IOException;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hdfs.protocol.Block;

import org.apache.hadoop.ipc.VersionedProtocol;

public interface InterDatanodeProtocol extends VersionedProtocol {

public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);

/**

* 3: 向更新块中增加一个表示完成的参数

*/

public static final long versionID = 3L;

/**

* 获取指定块的元数据

*/

BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;

/**

* 更新数据块

*/

void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;

}

上面,对不同进程之间通信所使用的协议的接口进行了阅读分析,应该能够了解每种协议应用的场景,如果想要基于某种场景实现端端通信,可以选择合适的协议接口来实现它。

继续阅读