Spark Source Code Reading - Miscellaneous Notes (2): TransportConf in Spark's Built-in RPC Framework
1 Spark Version
Spark 2.1.0.
2 Notes
Early in my time at NetEase last year, I had already built a complete RPC framework whose core technology was also Netty, so Spark's RPC framework did not feel particularly unfamiliar when I first read it. That personal framework only became fully usable this year; I plan to polish it and open-source it next year, because I believe that understanding the technical principles of even a simple RPC framework is an enormous help for learning about big data and distributed computing.
This post walks through TransportContext, TransportConf, ConfigProvider, and SparkTransportConf, again purely as a personal reading record.
TransportContext is the key class for creating RPC servers and clients. The configuration it needs is held in a TransportConf object, and the TransportConf in turn stores its core settings in a ConfigProvider. In practice, a TransportConf is usually created through SparkTransportConf; you could say that SparkTransportConf uses a ConfigProvider to bridge SparkConf and TransportConf, which is why a TransportConf can actually read configuration values from a SparkConf, as shown in the sketch below.
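To make that chain concrete, here is a minimal hypothetical sketch (the key and module name are invented for illustration, and it assumes code running inside Spark's own modules, where these internal classes are visible) of a SparkConf setting surfacing through a TransportConf:

import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

// Hypothetical module-scoped setting under the "shuffle" module.
val sparkConf = new SparkConf().set("spark.shuffle.io.mode", "epoll")

// SparkTransportConf wraps the SparkConf in a ConfigProvider and hands it to TransportConf.
val transportConf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle")

transportConf.ioMode() // "EPOLL": the value came from the SparkConf via the ConfigProvider bridge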
3 Source Code
As before, I have added my own comments in the key places; where the existing English comments already explain things clearly, I have not added anything.
3.1 TransportConf
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import com.google.common.primitives.Ints;

/**
 * A central location that tracks all the settings we expose to users.
 */
public class TransportConf {

  static {
    // Set this due to Netty PR #5661 for Netty 4.0.37+ to work
    System.setProperty("io.netty.maxDirectMemory", "0");
  }

  private final String SPARK_NETWORK_IO_MODE_KEY;
  private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
  private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
  private final String SPARK_NETWORK_IO_BACKLOG_KEY;
  private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
  private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
  private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
  private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
  private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
  private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
  private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
  private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
  private final String SPARK_NETWORK_IO_LAZYFD_KEY;

  private final ConfigProvider conf; // the provider of the configuration values
  private final String module;       // the module name of this configuration

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
    SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
    SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
    SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
    SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
    SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
    SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
    SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
    SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
    SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
    SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
  }

  public int getInt(String name, int defaultValue) {
    return conf.getInt(name, defaultValue);
  }

  private String getConfKey(String suffix) {
    return "spark." + module + "." + suffix;
  }

  /** IO mode: nio or epoll */
  public String ioMode() {
    return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase();
  }

  /** If true, we will prefer allocating off-heap byte buffers within Netty. */
  public boolean preferDirectBufs() {
    return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
  }

  /** Connect timeout in milliseconds. Default 120 secs. */
  public int connectionTimeoutMs() {
    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
      conf.get("spark.network.timeout", "120s"));
    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
    return (int) defaultTimeoutMs;
  }

  /** Number of concurrent connections between two nodes for fetching data. */
  public int numConnectionsPerPeer() {
    return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
  }

  /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
  public int backLog() {
    return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1);
  }

  /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
  public int serverThreads() {
    return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0);
  }

  /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores. */
  public int clientThreads() {
    return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0);
  }

  /**
   * Receive buffer size (SO_RCVBUF).
   * Note: the optimal size for receive buffer and send buffer should be
   *  latency * network_bandwidth.
   * Assuming latency = 1ms, network_bandwidth = 10Gbps
   *  buffer size should be ~ 1.25MB
   */
  public int receiveBuf() {
    return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1);
  }

  /** Send buffer size (SO_SNDBUF). */
  public int sendBuf() {
    return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1);
  }

  /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
  public int saslRTTimeoutMs() {
    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s")) * 1000;
  }

  /**
   * Max number of times we will try IO exceptions (such as connection timeouts) per request.
   * If set to 0, we will not do any retries.
   */
  public int maxIORetries() {
    return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3);
  }

  /**
   * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
   * Only relevant if maxIORetries > 0.
   */
  public int ioRetryWaitTimeMs() {
    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s")) * 1000;
  }

  /**
   * Minimum size of a block that we should start using memory map rather than reading in through
   * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
   * memory mapping has high overhead for blocks close to or below the page size of the OS.
   */
  public int memoryMapBytes() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.storage.memoryMapThreshold", "2m")));
  }

  /**
   * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
   * created only when data is going to be transferred. This can reduce the number of open files.
   */
  public boolean lazyFileDescriptor() {
    return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
  }

  /**
   * Maximum number of retries when binding to a port before giving up.
   */
  public int portMaxRetries() {
    return conf.getInt("spark.port.maxRetries", 16);
  }

  /**
   * Maximum number of bytes to be encrypted at a time when SASL encryption is enabled.
   */
  public int maxSaslEncryptedBlockSize() {
    return Ints.checkedCast(JavaUtils.byteStringAsBytes(
      conf.get("spark.network.sasl.maxEncryptedBlockSize", "64k")));
  }

  /**
   * Whether the server should enforce encryption on SASL-authenticated connections.
   */
  public boolean saslServerAlwaysEncrypt() {
    return conf.getBoolean("spark.network.sasl.serverAlwaysEncrypt", false);
  }
}
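As a quick illustration of how getConfKey prefixes every module setting with "spark.<module>.", here is a hypothetical sketch that backs a TransportConf with a simple Map-based ConfigProvider (the map contents are invented for the example):

import java.util.NoSuchElementException

import org.apache.spark.network.util.{ConfigProvider, TransportConf}

// Invented settings; note the "spark.rpc." prefix that getConfKey("io.mode") will build.
val settings = Map(
  "spark.rpc.io.mode" -> "epoll",
  "spark.network.timeout" -> "60s")

val conf = new TransportConf("rpc", new ConfigProvider {
  override def get(name: String): String =
    settings.getOrElse(name, throw new NoSuchElementException(name))
})

conf.ioMode()              // "EPOLL": looked up under "spark.rpc.io.mode"
conf.connectionTimeoutMs() // 60000: "spark.rpc.io.connectionTimeout" is unset, so the
                           // "spark.network.timeout" fallback of 60s applies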
3.2 ConfigProvider
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.util.NoSuchElementException;

/**
 * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
 * Leaf Note:
 * This provides a way to construct a TransportConf from some source of configuration. What does
 * that mean in practice? Notice the abstract get(String name) method: every non-abstract method
 * in this class ultimately calls it, so when constructing a ConfigProvider you only need to
 * override get(String name). How? Have the override return values obtained from a SparkConf
 * object's get() method. Looking at SparkTransportConf below, that is exactly how ConfigProvider
 * is used.
 */
public abstract class ConfigProvider {
  /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
  public abstract String get(String name);

  public String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  public int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  public long getLong(String name, long defaultValue) {
    return Long.parseLong(get(name, Long.toString(defaultValue)));
  }

  public double getDouble(String name, double defaultValue) {
    return Double.parseDouble(get(name, Double.toString(defaultValue)));
  }

  public boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}
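The pattern worth noticing here is the template method: every typed getter funnels through the single abstract get(String), so a subclass overrides only that one method, and defaults fall out of the NoSuchElementException handling. A minimal hypothetical subclass:

import java.util.NoSuchElementException

import org.apache.spark.network.util.ConfigProvider

// Invented provider that knows exactly one key.
val provider = new ConfigProvider {
  override def get(name: String): String =
    if (name == "spark.rpc.io.maxRetries") "5"
    else throw new NoSuchElementException(name)
}

provider.getInt("spark.rpc.io.maxRetries", 3)    // 5: parsed from the overridden get()
provider.getBoolean("spark.rpc.io.lazyFD", true) // true: get() threw, so the default wins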
3.3 SparkTransportConf
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.netty

import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

/**
 * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
 * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
 * like the number of cores that are allocated to this JVM.
 * Leaf Note:
 * A TransportConf is generally created through SparkTransportConf. An important role of
 * SparkTransportConf is to connect SparkConf with TransportConf. How does it do that? It
 * implements ConfigProvider's abstract get method by delegating to SparkConf's get method,
 * and TransportConf happens to hold a ConfigProvider field.
 */
object SparkTransportConf {

  /**
   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
   * at a premium.
   *
   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
   * manually in Spark's configuration.
   */
  private val MAX_DEFAULT_NETTY_THREADS = 8

  /**
   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
   * @param _conf the [[SparkConf]]
   * @param module the module name
   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
   *                       use the given number of cores, rather than all of the machine's cores.
   *                       This restriction will only occur if these properties are not already set.
   */
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    // Specify thread configuration based on our JVM's allocation of cores (rather than
    // necessarily assuming we have all the machine's cores).
    // NB: Only set if serverThreads/clientThreads not already set.
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
    })
  }

  /**
   * Returns the default number of threads for both the Netty client and server thread pools.
   * If numUsableCores is 0, we will use the Runtime to get an approximate number of available
   * cores.
   */
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}
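A hypothetical sketch of the thread-count defaulting (values invented for illustration): setIfMissing only fills in serverThreads/clientThreads when they are not already set, and defaultNumThreads caps the default at MAX_DEFAULT_NETTY_THREADS:

import org.apache.spark.SparkConf
import org.apache.spark.network.netty.SparkTransportConf

// serverThreads set explicitly; clientThreads left unset.
val sparkConf = new SparkConf().set("spark.shuffle.io.serverThreads", "16")

val tc = SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 4)

tc.serverThreads() // 16: setIfMissing keeps the explicitly set value
tc.clientThreads() // 4: min(numUsableCores = 4, MAX_DEFAULT_NETTY_THREADS = 8)

Note that fromSparkConf clones the SparkConf first, so the caller's SparkConf is never mutated by the setIfMissing calls.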