
Reading the Spark Source, Miscellaneous Notes (2): TransportConf in Spark's Built-in RPC Framework

Published: 2025-01-24

1 Spark Version

Spark 2.1.0.

2 Notes

Last year, shortly after I joined NetEase, I had already built a complete RPC framework of my own, also with Netty at its core, so Spark's RPC framework did not feel unfamiliar when I first read it. That framework only became truly usable this year; next year I plan to polish it and open-source it, because I believe that working through the technical principles of even a simple RPC framework helps enormously with big data and distributed computing in general.
This post covers TransportContext, TransportConf, ConfigProvider, and SparkTransportConf, again purely as my personal reading notes.
TransportContext is the key class for creating RPC servers and clients. The configuration it needs lives in a TransportConf object, and the TransportConf in turn stores its core settings in a ConfigProvider. In practice a TransportConf is usually created through SparkTransportConf, which uses a ConfigProvider to wire SparkConf and TransportConf together; as a result, a TransportConf can effectively read configuration values straight out of a SparkConf.
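To make the wiring concrete before diving into the source, here is a minimal usage sketch (the module name "shuffle" and the key values are my own illustrative choices, not Spark defaults):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

val sparkConf = new SparkConf()
  .set("spark.shuffle.io.mode", "epoll")              // module-scoped setting in SparkConf
  .set("spark.shuffle.io.numConnectionsPerPeer", "2")

// SparkTransportConf wraps the SparkConf in an anonymous ConfigProvider,
// so every TransportConf getter is ultimately answered by sparkConf.get(...)
val transportConf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle")

transportConf.ioMode()                // "EPOLL" -- read from SparkConf, uppercased
transportConf.numConnectionsPerPeer() // 2
```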

3 Source Code

As before, I have added my own comments at the key points; where the existing English comments are already clear enough, I left them alone.

3.1 TransportConf

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import com.google.common.primitives.Ints;

/**
 * A central location that tracks all the settings we expose to users.
 */
public class TransportConf {

  static {
    // Set this due to Netty PR #5661 for Netty 4.0.37+ to work
    System.setProperty("io.netty.maxDirectMemory", "0");
  }

  private final String SPARK_NETWORK_IO_MODE_KEY;
  private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
  private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
  private final String SPARK_NETWORK_IO_BACKLOG_KEY;
  private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
  private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
  private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
  private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
  private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
  private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
  private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
  private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
  private final String SPARK_NETWORK_IO_LAZYFD_KEY;

  private final ConfigProvider conf;  // the configuration provider
  private final String module;        // name of the module these settings belong to

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
    SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
    SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
    SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
    SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
    SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
    SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
    SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
    SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
    SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
    SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
  }

  public int getInt(String name, int defaultValue) {
    return conf.getInt(name, defaultValue);
  }

  private String getConfKey(String suffix) {
    return "spark." + module + "." + suffix;
  }

  /** IO mode: nio or epoll */
  public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }

  /** If true, we will prefer allocating off-heap byte buffers within Netty. */
  public boolean preferDirectBufs() {
    return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
  }

  /** Connect timeout in milliseconds. Default 120 secs. */
  public int connectionTimeoutMs() {
    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
      conf.get("spark.network.timeout", "120s"));
    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
    return (int) defaultTimeoutMs;
  }

  /** Number of concurrent connections between two nodes for fetching data. */
  public int numConnectionsPerPeer() {
    return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
  }

  /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
  public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }

  /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

  /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }

  /**
   * Receive buffer size (SO_RCVBUF).
   * Note: the optimal size for receive buffer and send buffer should be
   *  latency * network_bandwidth.
   * Assuming latency = 1ms, network_bandwidth = 10Gbps
   *  buffer size should be ~ 1.25MB
   */
  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }

  /** Send buffer size (SO_SNDBUF). */
  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }

  /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
  public int saslRTTimeoutMs() {
    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s")) * 1000;
  }

  /**
   * Max number of times we will try IO exceptions (such as connection timeouts) per request.
   * If set to 0, we will not do any retries.
   */
  public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }

  /**
   * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
   * Only relevant if maxIORetries > 0.
   */
  public int ioRetryWaitTimeMs() {
    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
  }

  /**
   * Minimum size of a block that we should start using memory map rather than reading in through
   * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
   * memory mapping has high overhead for blocks close to or below the page size of the OS.
   */
  public int memoryMapBytes() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.storage.memoryMapThreshold", "2m")));
  }

  /**
   * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
   * created only when data is going to be transferred. This can reduce the number of open files.
   */
  public boolean lazyFileDescriptor() {
    return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
  }

  /**
   * Maximum number of retries when binding to a port before giving up.
   */
  public int portMaxRetries() {
    return conf.getInt("spark.port.maxRetries", 16);
  }

  /**
   * Maximum number of bytes to be encrypted at a time when SASL encryption is enabled.
   */
  public int maxSaslEncryptedBlockSize() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.network.sasl.maxEncryptedBlockSize", "64k")));
  }

  /**
   * Whether the server should enforce encryption on SASL-authenticated connections.
   */
  public boolean saslServerAlwaysEncrypt() {
    return conf.getBoolean("spark.network.sasl.serverAlwaysEncrypt", false);
  }
}
```
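One detail worth pausing on is getConfKey: every module-scoped setting is looked up under the prefix "spark.<module>.", so the same TransportConf code serves different modules (shuffle, rpc, ...) with independent settings. A small sketch of this, using a toy map-backed ConfigProvider of my own (not Spark code) as the source of values:

```scala
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

// Toy provider backed by an immutable Map. Map#apply throws
// NoSuchElementException for a missing key, which is exactly the
// contract the abstract get(name) requires.
class MapConfigProvider(settings: Map[String, String]) extends ConfigProvider {
  override def get(name: String): String = settings(name)
}

val provider = new MapConfigProvider(Map(
  "spark.shuffle.io.serverThreads" -> "4",
  "spark.rpc.io.serverThreads"     -> "16"
))

// Same suffix "io.serverThreads", different module, different key:
new TransportConf("shuffle", provider).serverThreads() // 4  (spark.shuffle.io.serverThreads)
new TransportConf("rpc", provider).serverThreads()     // 16 (spark.rpc.io.serverThreads)
```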

3.2 ConfigProvider

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.util.NoSuchElementException;

/**
 * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
 *
 * Leaf Note:
 * It provides a way to construct a TransportConf using "some sort of configuration" -- what does
 * that actually mean? Notice the abstract get method: every non-abstract method in this class
 * ultimately calls it, so when constructing a ConfigProvider you simply override get(String name).
 * How? Just have it return the value obtained from a SparkConf object via SparkConf's get()
 * method. Look at SparkTransportConf: that is exactly how it uses ConfigProvider.
 */
public abstract class ConfigProvider {
  /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
  public abstract String get(String name);

  public String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  public int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  public long getLong(String name, long defaultValue) {
    return Long.parseLong(get(name, Long.toString(defaultValue)));
  }

  public double getDouble(String name, double defaultValue) {
    return Double.parseDouble(get(name, Double.toString(defaultValue)));
  }

  public boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}
```
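The design here is a classic template method: only get(String name) is abstract, every typed getter funnels through the two-argument get, and a NoSuchElementException from the abstract method is what triggers the default value. A quick sketch of that fallback path (the inline provider is illustrative):

```scala
import org.apache.spark.network.util.ConfigProvider

// A provider that knows exactly one key; anything else "does not exist".
val provider = new ConfigProvider {
  override def get(name: String): String =
    if (name == "spark.rpc.io.maxRetries") "5"
    else throw new NoSuchElementException(name)
}

provider.getInt("spark.rpc.io.maxRetries", 3) // 5: found, parsed via Integer.parseInt
provider.getInt("spark.rpc.io.retryWait", 3)  // 3: NoSuchElementException caught, default used
```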

3.3 SparkTransportConf

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.netty

import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

/**
 * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
 * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
 * like the number of cores that are allocated to this JVM.
 *
 * Leaf Note:
 * A TransportConf is normally created through SparkTransportConf. One of SparkTransportConf's
 * most important roles is to connect SparkConf with TransportConf. How? By using SparkConf's
 * get method as the delegate implementation of ConfigProvider's abstract get method -- and
 * TransportConf happens to hold a ConfigProvider field.
 */
object SparkTransportConf {
  /**
   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
   * at a premium.
   *
   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
   * manually in Spark's configuration.
   */
  private val MAX_DEFAULT_NETTY_THREADS = 8

  /**
   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
   * @param _conf the [[SparkConf]]
   * @param module the module name
   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
   *                       use the given number of cores, rather than all of the machine's cores.
   *                       This restriction will only occur if these properties are not already set.
   */
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
    // assuming we have all the machine's cores).
    // NB: Only set if serverThreads/clientThreads not already set.
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
    })
  }

  /**
   * Returns the default number of threads for both the Netty client and server thread pools.
   * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
   */
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}
```
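Note that fromSparkConf only fills in serverThreads/clientThreads when they are not already set, and the computed default is clamped at MAX_DEFAULT_NETTY_THREADS (8). A sketch of the observable behavior (the core counts are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

// Nothing set, 4 usable cores: default is min(4, 8) = 4
SparkTransportConf.fromSparkConf(new SparkConf(), "rpc", 4).serverThreads()  // 4

// Nothing set, 32 usable cores: clamped to MAX_DEFAULT_NETTY_THREADS
SparkTransportConf.fromSparkConf(new SparkConf(), "rpc", 32).serverThreads() // 8

// Explicitly configured: setIfMissing leaves the user's value alone
val conf = new SparkConf().set("spark.rpc.io.serverThreads", "24")
SparkTransportConf.fromSparkConf(conf, "rpc", 4).serverThreads()             // 24
```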