中国做网站最好的,不用登录的传奇游戏,网店设计美工培训,做网站的域名怎样买1、DFSClient类简介 DFSClient 是 Hadoop 分布式文件系统#xff08;HDFS#xff09;中的一个核心类#xff0c;用于客户端与 HDFS 之间的交互。它提供了一组方法#xff0c;使客户端应用程序可以方便地与 HDFS 进行通信#xff0c;包括文件的读取、写入、创建、删除、重命…1、DFSClient类简介 DFSClient 是 Hadoop 分布式文件系统HDFS中的一个核心类用于客户端与 HDFS 之间的交互。它提供了一组方法使客户端应用程序可以方便地与 HDFS 进行通信包括文件的读取、写入、创建、删除、重命名等操作。DFSClient 封装了与 NameNode 和 DataNode 的通信细节使得客户端开发者可以通过高级 API 进行文件系统操作而不必关心底层的实现细节。
2、DFSClient主要功能
2.1、文件读取和写入
提供方法用于读取和写入 HDFS 上的文件。例如open 方法用于打开文件以读取create 方法用于创建新文件以写入。
2.2、文件操作
支持文件的创建、删除、重命名、追加等操作。例如delete 方法用于删除文件或目录rename 方法用于重命名文件或目录。
2.3、目录操作
支持创建、删除和列出目录。例如mkdirs 方法用于创建目录listPaths 方法用于列出目录内容。
2.4、获取文件和目录信息
提供方法获取文件和目录的元数据信息。例如getFileInfo 方法用于获取文件或目录的详细信息getLocatedBlocks 方法用于获取文件的块位置。
2.5、与NN、DN通信
管理与 NameNode 的通信用于获取文件的元数据和块位置信息。管理与 DataNode 的通信用于读取和写入实际的数据块。
3、DFSClient核心源码 DFSClient源码主要包括创建客户端连接(配置获取、令牌处理、连接地址解析)
3.1、构造方法
3.1.1、代码概述
该构造函数已废弃接受一个Configuration对象并调用另一个构造函数获取NameNode地址 Deprecatedpublic DFSClient(Configuration conf) throws IOException {this(DFSUtilClient.getNNAddress(conf), conf);}
该构造函数接受一个InetSocketAddress对象和一个Configuration对象并将InetSocketAddress 转换为URI然后调用另一个基于URI的构造函数 public DFSClient(InetSocketAddress address, Configuration conf)throws IOException {this(DFSUtilClient.getNNUri(address), conf);}
该构造函数接受一个URI对象和一个Configuration对象并将FileSystem.Statistics参数设置为 null然后调用另一个更完整的构造函数 public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {this(nameNodeUri, conf, null);}
该构造函数接受一个URI对象、一个Configuration对象和一个FileSystem.Statistics对象然后调用最完整的构造函数 public DFSClient(URI nameNodeUri, Configuration conf,FileSystem.Statistics stats) throws IOException {this(nameNodeUri, null, conf, stats);} 最底层构造函数该方法不建议直接调用。 VisibleForTestingpublic DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,Configuration conf, FileSystem.Statistics stats) throws IOException {// Copy only the required DFSClient configurationthis.tracer FsTracer.get(conf);this.dfsClientConf new DfsClientConf(conf);this.conf conf;this.stats stats;this.socketFactory NetUtils.getSocketFactory(conf, ClientProtocol.class);this.dtpReplaceDatanodeOnFailure ReplaceDatanodeOnFailure.get(conf);this.dtpReplaceDatanodeOnFailureReplication (short) conf.getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION_DEFAULT);LOG.debug(Sets {} to {},HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, dtpReplaceDatanodeOnFailureReplication);this.ugi UserGroupInformation.getCurrentUser();this.namenodeUri nameNodeUri;this.clientName DFSClient_ dfsClientConf.getTaskId() _ ThreadLocalRandom.current().nextInt() _ Thread.currentThread().getId();int numResponseToDrop conf.getInt(DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);ProxyAndInfoClientProtocol proxyInfo null;AtomicBoolean nnFallbackToSimpleAuth new AtomicBoolean(false);if (numResponseToDrop 0) {// This case is used for testing.LOG.warn({} is set to {} , this hacked client will proactively drop responses,DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, numResponseToDrop);proxyInfo NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,nameNodeUri, ClientProtocol.class, numResponseToDrop,nnFallbackToSimpleAuth);}if (proxyInfo ! null) {this.dtService proxyInfo.getDelegationTokenService();this.namenode proxyInfo.getProxy();} else if (rpcNamenode ! null) {// This case is used for testing.Preconditions.checkArgument(nameNodeUri null);this.namenode rpcNamenode;dtService null;} else {Preconditions.checkArgument(nameNodeUri ! null,null URI);proxyInfo NameNodeProxiesClient.createProxyWithClientProtocol(conf,nameNodeUri, nnFallbackToSimpleAuth);this.dtService proxyInfo.getDelegationTokenService();this.namenode proxyInfo.getProxy();}String localInterfaces[] conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES);localInterfaceAddrs getLocalInterfaceAddrs(localInterfaces);if (LOG.isDebugEnabled() 0 ! localInterfaces.length) {LOG.debug(Using local interfaces [{}] with addresses [{}],Joiner.on(,).join(localInterfaces),Joiner.on(,).join(localInterfaceAddrs));}Boolean readDropBehind (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) null) ?null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);Long readahead (conf.get(DFS_CLIENT_CACHE_READAHEAD) null) ?null : conf.getLongBytes(DFS_CLIENT_CACHE_READAHEAD, 0);this.serverDefaultsValidityPeriod conf.getTimeDuration(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT,TimeUnit.MILLISECONDS);Boolean writeDropBehind (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) null) ?null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);this.defaultReadCachingStrategy new CachingStrategy(readDropBehind, readahead);this.defaultWriteCachingStrategy new CachingStrategy(writeDropBehind, readahead);this.clientContext ClientContext.get(conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),dfsClientConf, conf);if (dfsClientConf.getHedgedReadThreadpoolSize() 0) {this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());}this.initThreadsNumForStripedReads(dfsClientConf.getStripedReadThreadpoolSize());this.saslClient new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);}
3.1.2、重点剖析
DFSClient的核心构建方式是传入namenode节点对应的URI以及配置信息也是我们构建DFSClient通常使用的方法
public DFSClient(URI nameNodeUri, Configuration conf) throws IOException {this(nameNodeUri, conf, null);
}
3.2、委托令牌处理 这段源码是一个用于续约和取消 HDFS 委托令牌Delegation Token的 Renewer 类它继承自 TokenRenewer 类。主要功能是通过与 NameNode 通信维护和管理委托令牌的生命周期。
3.2.1、代码概述 3.2.2、重点剖析
static静态代码块为初始化hdfs配置文件handleKind方法用于判断是否处理指定类型的委托令牌在当前源码中会默认判定是否为HDFS的委托令牌类型renew 方法用于续约委托令牌。它通过 getNNProxy 方法获取到与委托令牌对应的 NameNode 代理然后调用 renewDelegationToken 方法进行委托令牌的续约操作cancel 方法用于取消委托令牌。它也通过 getNNProxy 方法获取 NameNode 代理然后调用 cancelDelegationToken 方法执行委托令牌的取消操作getNNProxy 方法根据委托令牌获取对应的 NameNode 代理。它首先根据委托令牌的信息构建 URI然后通过 NameNodeProxiesClient 类的静态方法创建 NameNode 的代理对象并返回该代理对象。
3.3、getLocalInterfaceAddrs
3.3.1、代码概述 这个方法的作用是接受一个接口名称的数组并根据每个接口名称解析成对应的本地地址可以是 IP 地址、子网或域名。它首先尝试将接口名称视为一个 IP 地址如果不是则检查它是否是一个有效的子网如果仍然不是则假定它是一个域名并通过 DNS 解析。最终所有解析出的地址都被封装为 InetSocketAddress 对象并返回一个包含这些地址的数组。
private static SocketAddress[] getLocalInterfaceAddrs(String interfaceNames[]) throws UnknownHostException {ListSocketAddress localAddrs new ArrayList();for (String interfaceName : interfaceNames) {if (InetAddresses.isInetAddress(interfaceName)) {localAddrs.add(new InetSocketAddress(interfaceName, 0));} else if (NetUtils.isValidSubnet(interfaceName)) {for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {localAddrs.add(new InetSocketAddress(addr, 0));}} else {for (String ip : DNS.getIPs(interfaceName, false)) {localAddrs.add(new InetSocketAddress(ip, 0));}}}return localAddrs.toArray(new SocketAddress[localAddrs.size()]);}
3.3.2、重点剖析
该方法首先检查interfaceName是否是一个有效的IP地址如果不是IP地址检查interfaceName是否是一个有效的子网如果是有效的子网获取该子网中的所有IP地址并将每个IP地址封装为InetSocketAddress对象添加到localAddrs列表中。如果既不是IP地址也不是子网假定它是一个域名通过DNS解析获取该域名的所有IP地址并将每个IP地址封装为InetSocketAddress对象添加到localAddrs列表中。
3.4、getRandomLocalInterfaceAddr
3.4.1、代码概述 这个方法的作用是从一组预先配置的本地接口地址 (localInterfaceAddrs 数组) 中随机选择一个地址并返回。
SocketAddress getRandomLocalInterfaceAddr() {if (localInterfaceAddrs.length 0) {return null;}final int idx r.nextInt(localInterfaceAddrs.length);final SocketAddress addr localInterfaceAddrs[idx];LOG.debug(Using local interface {}, addr);return addr;}
3.4.2、重点剖析
检查 localInterfaceAddrs 数组是否为空如果为空则返回 null。使用随机数生成器 r 生成一个随机索引 idx。获取并返回 localInterfaceAddrs 数组中对应索引 idx 的 SocketAddress 对象。在返回之前记录调试日志以便于跟踪选中的本地接口地址。
3.5、读写超时时间判定
3.5.1、代码概述 这段代码包含两个方法getDatanodeWriteTimeout 和 getDatanodeReadTimeout它们用于计算数据节点写入和读取的超时时间。每个方法都接收一个参数 numNodes表示数据节点的数量。
int getDatanodeWriteTimeout(int numNodes) {final int t dfsClientConf.getDatanodeSocketWriteTimeout();return t 0? t HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
}int getDatanodeReadTimeout(int numNodes) {final int t dfsClientConf.getSocketTimeout();return t 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes t: 0;
}
3.5.2、重点剖析
通过dfsclientconf获取写入\读取超时时间t如果t大于0则返回 t 加上一个扩展超时时间这个扩展超时时间是常量 HdfsConstants.WRITE_TIMEOUT_EXTENSION 乘以 numNodes数据节点数量如果t0则返回0
3.6、租约管理
3.6.1、代码概述 这段代码定义了三个方法getLeaseRenewer、beginFileLease 和 endFileLease用于管理HDFS中的文件租约。文件租约机制确保文件在写入过程中不会被其他客户端修改或删除。
public LeaseRenewer getLeaseRenewer() {return LeaseRenewer.getInstance(namenodeUri ! null ? namenodeUri.getAuthority() : null, ugi, this);}/** Get a lease and start automatic renewal */private void beginFileLease(final String key, final DFSOutputStream out) {synchronized (filesBeingWritten) {putFileBeingWritten(key, out);LeaseRenewer renewer getLeaseRenewer();boolean result renewer.put(this);if (!result) {// Existing LeaseRenewer cannot add another Daemon, so remove existing// and add new one.LeaseRenewer.remove(renewer);renewer getLeaseRenewer();renewer.put(this);}}}/** Stop renewal of lease for the file. */void endFileLease(final String renewLeaseKey) {synchronized (filesBeingWritten) {removeFileBeingWritten(renewLeaseKey);// remove client from renewer if no files are openif (filesBeingWritten.isEmpty()) {getLeaseRenewer().closeClient(this);}}}3.6.2、重点剖析
获取租约续约器getLeaseRenewer 方法返回一个 LeaseRenewer 实例用于管理租约的续约。 获取租约续约器调用 LeaseRenewer.getInstance 方法获取 LeaseRenewer 实例。如果 namenodeUri 不为空则使用其权限部分authority否则使用 null。ugi用户组信息和当前 DFSClient 实例this作为参数传递给 LeaseRenewer.getInstance开始文件租约beginFileLease 方法将文件添加到写入记录中并确保当前客户端的租约续约器能够处理该文件的续约。 使用 key 和 outDFSOutputStream 实例调用 putFileBeingWritten 方法记录正在写入的文件获取 LeaseRenewer 实例调用 renewer.put(this) 方法将当前客户端添加到租约续约器中如果返回结果为 false表示现有的 LeaseRenewer 不能添加新的守护线程则移除现有的 LeaseRenewer,获取新的 LeaseRenewer 实例并将当前客户端添加到新的 LeaseRenewer 中结束文件租约endFileLease 方法移除文件写入记录并在没有文件写入时关闭客户端的租约续约 使用 renewLeaseKey 调用 removeFileBeingWritten 方法从记录中移除正在写入的文件如果没有文件在写入filesBeingWritten 为空则获取 LeaseRenewer 实例调用 renewer.closeClient(this) 方法关闭当前客户端的租约续约。
todo未完待续