千家信息网

HDFS中FileSystem是什么类

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,小编给大家分享一下HDFS中FileSystem是什么类,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!首先来看一下, Fi
千家信息网最后更新 2025年01月23日HDFS中FileSystem是什么类

小编给大家分享一下HDFS中FileSystem是什么类,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

首先来看一下, FileSystem(org.apache.hadoop.fs.FileSystem), 这是一个抽象类, 是所有文件系统的父类.

而我们要从HDFS(Hadoop Distributed FileSystem)下载数据, 应该获取一个DistributedFileSystem的实例,那么如何获取一个DistributedFileSystem的实例呢?

FileSystem fs = FileSystem.get(new Configuration());

在FileSystem中有3个重载的get()方法

// 1.通过配置文件获取一个FileSystem实例public static FileSystem get(Configuration conf)// 2.通过指定的FileSystem的URI, 配置文件获取一个FileSystem实例public static FileSystem get(URI uri, Configuration conf)// 3.通过指定的FileSystem的URI, 配置文件, FileSystem用户名获取一个FileSystem实例public static FileSystem get(final URI uri, final Configuration conf, final String user)

先调用FileSystem.get(Configuration conf)方法,再调用重载方法FileSystem.get(URI uri, Configuration conf)

public static FileSystem get(URI uri, Configuration conf) throws IOException {    // schem是FileSystem具体的URI方案如: file, hdfs, Webhdfs, har等等    String scheme = uri.getScheme();    // scheme = hdfs    // authority是NameNode的主机名, 端口号    String authority = uri.getAuthority();    // authority = node1:9000    ...    // disableCacheName = fs.hdfs.impl.disable.cache    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);        // 读取配置文件, 判断是否禁用缓存    if (conf.getBoolean(disableCacheName, false)) {    // 若禁用缓存      return createFileSystem(uri, conf);    // 直接调用创建FileSystem实例的方法    }    // 不禁用缓存, 先从FileSystem的静态成员变量CACHE中获取FileSystem实例    return CACHE.get(uri, conf);}

再调用FileSystem$Cache.get(URI uri, Configuration conf)方法(Cache是FileSystem的静态内部类)

FileSystem get(URI uri, Configuration conf) throws IOException{      Key key = new Key(uri, conf);    // key = (root (auth:SIMPLE))@hdfs://node1:9000      return getInternal(uri, conf, key);}

再调用FileSystem$Cache.getInternal(URI uri, Configuration conf, FileSystem$Cache$Key key)方法(Key又是Cache的静态内部类)

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{      FileSystem fs;      synchronized (this) {        // map是Cache中用来缓存FileSystem实例的成员变量, 其类型为HashMap        fs = map.get(key);      }      if (fs != null) {    // 如果从缓存map中获取到了相应的FileSystem实例        return fs;    // 则返回这个实例      }      // 否则, 调用FileSystem.createFileSystem(URI uri, Configuration conf)方法, 创建FileSystem实例      fs = createFileSystem(uri, conf);      /* 分割线1, 期待着createFileSystem()方法的返回 */      synchronized (this) { // refetch the lock again        /*         * 在多线程环境下, 可能另一个客户端(另一个线程)创建好了一个DistributedFileSystem实例, 并缓存到了map中         * 所以, 这时候就把当前客户端新创建的DistributedFileSystem实例注销         * 其实这是一个特殊的单例模式, 一个key映射一个DistributedFileSystem实例         */        FileSystem oldfs = map.get(key);        if (oldfs != null) { // a file system is created while lock is releasing          fs.close(); // close the new file system          return oldfs;  // return the old file system        }        /*         * now insert the new file system into the map         * 缓存当前新创建的DistributedFileSystem实例到map中         */        fs.key = key;        map.put(key, fs);        ...        return fs;      }}

来自分割线1, 先调用FileSystem.createFileSystem(URI uri, Configuration conf)方法

private static FileSystem createFileSystem(URI uri, Configuration conf      ) throws IOException {    // 通过读取配置文件, 获取FileSystem具体的URI模式: hdfs的类对象    Class clazz = getFileSystemClass(uri.getScheme(), conf); // clazz = org.apache.hadoop.hdfs.DistributedFileSystem    ...    // 反射出一个DistributedFileSystem实例    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);    // 对DistributedFileSystem实例初始化    fs.initialize(uri, conf);    return fs;}

在调用DistributedFileSystem.initialize(URI uri, Configuration conf)方法之前, 先来看一下DistributedFileSystem类吧.

DistributedFileSystem是抽象类FileSystem的子类实现,

public class DistributedFileSystem extends FileSystem {  ...  DFSClient dfs;    // DistributedFileSystem持有一个DFSClient类型的成员变量dfs, 最重要的成员变量!  ...}

调用DistributedFileSystem.initialize(URI uri, Configuration conf)方法

public void initialize(URI uri, Configuration conf) throws IOException {    ...    // new一个DFSClient实例, 成员变量dfs引用这个DFSClient实例    this.dfs = new DFSClient(uri, conf, statistics );    /* 分割线2, 期待着new DFSClient()的返回 */    ...}

在new DFSClient实例之前, 先来看一下DFSClient类吧! 看一下到底要为哪些成员变量赋值

public class DFSClient implements java.io.Closeable, RemotePeerFactory {  ...  final ClientProtocol namenode;    //DFSClient持有一个ClientProtocol类型的成员变量namenode, 一个RPC代理对象  /* The service used for delegation tokens */  private Text dtService;  ...}

来自分割线2, 调用DFSClient的构造函数DFSClient(URI nameNodeUri, Configuration conf, FileSystem$Statistics statistics), 再调用重载构造函数DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem$Statistics statistics)

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,     FileSystem.Statistics stats) throws IOException {    ...    NameNodeProxies.ProxyAndInfo proxyInfo = null;    if (numResponseToDrop > 0) {    // numResponseToDrop = 0      // This case is used for testing.      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY          + " is set to " + numResponseToDrop          + ", this hacked client will proactively drop responses");      proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,          nameNodeUri, ClientProtocol.class, numResponseToDrop);    }        if (proxyInfo != null) { // proxyInfo = null      this.dtService = proxyInfo.getDelegationTokenService();      this.namenode = proxyInfo.getProxy();    } else if (rpcNamenode != null) { // rpcNamenode = null      // This case is used for testing.      Preconditions.checkArgument(nameNodeUri == null);      this.namenode = rpcNamenode;      dtService = null;    } else {    // 前面两个if只在测试的情况下成立, 这个else的代码块才是重点      ...      /*       * 创建一个NameNodeProxies.ProxyAndInfo类型的对象, proxyInfo引用这个对象        * createProxy(conf, nameNodeUri, ClientProtocol.class)方法是不是和RPC.getProxy(Class protocol,       * long clientVersion, InetSocketAddress addr, Configuration conf)很像?       * 没错! 你没看错! 这说明createProxy()方法内部一定会调用RPC的相关方法       * conf    都是Configuration类型的conf       * nameNodeUri = hdfs://node1:9000    这不就是InetSocketAddress类型的addr的hostName和port       * ClientProtocol.class    都是RPC protocol接口的类对象       * ClientProtocol is used by user code via DistributedFileSystem class to communicate        * with the NameNode       * ClientProtocol是DistributedFileSystem用来与NameNode通信的       */      proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);      /* 分割线3, 期待着createProxy()方法的返回 */      this.dtService = proxyInfo.getDelegationTokenService();      this.namenode = proxyInfo.getProxy();    }    ...}

来自分割线3, 调用NameNodeProxies.createProxy(Configuration conf, URI nameNodeUri, Class xface)方法

/**   * Creates the namenode proxy with the passed protocol. This will handle   * creation of either HA- or non-HA-enabled proxy objects, depending upon   * if the provided URI is a configured logical URI.   * 通过传过来的protocol参数, 创建namenode的代理对象. 至于是HA还是非HA的namenode代理对象,    * 这取决于实际搭建的Hadoop环境   **/public static  ProxyAndInfo createProxy(Configuration conf, URI nameNodeUri, Class xface)    throws IOException {    // 获取Hadoop实际环境中HA的配置    Class> failoverProxyProviderClass =        getFailoverProxyProviderClass(conf, nameNodeUri, xface);    if (failoverProxyProviderClass == null) {    // 非HA,这里是Hadoop的伪分布式搭建      // Non-HA case, 创建一个非HA的namenode代理对象      return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,          UserGroupInformation.getCurrentUser(), true);    } else {    // HA      // HA case      FailoverProxyProvider failoverProxyProvider = NameNodeProxies          .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,              nameNodeUri);      Conf config = new Conf(conf);      T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,          RetryPolicies.failoverOnNetworkException(              RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,              config.maxRetryAttempts, config.failoverSleepBaseMillis,              config.failoverSleepMaxMillis));            Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);      // 返回一个proxy, dtService的封装对象proxyInfo      return new ProxyAndInfo(proxy, dtService);    }}

调用NameNodeProxies.createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries)方法

public static  ProxyAndInfo createNonHAProxy(Configuration conf, InetSocketAddress nnAddr,    Class xface, UserGroupInformation ugi, boolean withRetries) throws IOException {    Text dtService = SecurityUtil.buildTokenService(nnAddr);    //dtService = 192.168.8.101:9000    T proxy;    if (xface == ClientProtocol.class) {    // xface = ClientProtocol.class      // 创建一个namenode代理对象      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries);      /* 分割线4, 期待着createNNProxyWithClientProtocol()方法返回 */    } else if {      ...    }    // 把proxy, dtService封装成一个ProxyAndInfo对象, 并返回    return new ProxyAndInfo(proxy, dtService);  }

以上是"HDFS中FileSystem是什么类"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0