千家信息网

Spark RDD的创建方式及算子的使用方法是什么

发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,这篇文章主要介绍"Spark RDD的创建方式及算子的使用方法是什么",在日常操作中,相信很多人在Spark RDD的创建方式及算子的使用方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的
千家信息网最后更新 2025年01月26日Spark RDD的创建方式及算子的使用方法是什么

这篇文章主要介绍"Spark RDD的创建方式及算子的使用方法是什么",在日常操作中,相信很多人在Spark RDD的创建方式及算子的使用方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Spark RDD的创建方式及算子的使用方法是什么"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

一:简单了解RDD和RDD处理数据

RDD,全称为Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。

RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

RDD本质上是一个内存数据集,在访问RDD时,指针只会指向与操作相关的部分。例如存在一个面向列的数据结构,其中一个实现为Int的数组,另一个实现为Float的数组。如果只需要访问Int字段,RDD的指针可以只访问Int数组,避免了对整个数据结构的扫描。

RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。而在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

在实现时,RDD针对transformation操作,都提供了对应的继承自RDD的类型,例如map操作会返回MappedRDD,而flatMap则返回FlatMappedRDD。当我们执行map或flatMap操作时,不过是将当前RDD对象传递给对应的RDD对象而已。

注意:创建的Maven工程,以下是 pom.xml 中的依赖:

                                        junit                        junit                        4.12                                                        org.apache.spark                        spark-core_2.10                        1.6.1                                                        org.apache.hadoop                        hadoop-client                        2.6.4                                                        org.apache.spark                        spark-sql_2.10                        1.6.1                        

二:从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive,HBase)输出(HDFS)创建。

eg: 求HDFS文件中内容所有行数据长度及总长度。

public class TestRDD1 {                public static void main(String[] args) {                createRDDFromHDFS();        }                private static void createRDDFromHDFS(){                SparkConf conf = new SparkConf();                conf.set("spark.testing.memory", "269522560000");                JavaSparkContext sc = new JavaSparkContext("local", "Spark Test", conf);                System.out.println(  sc );                                JavaRDD rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt");                                JavaRDD newRDD = rdd.map( new Function(){                        private static final long serialVersionUID = 1L;                        public Integer call(String string) throws Exception {                                System.out.println(  string + "  " + string.length() );                                return string.length();                        }                });                                System.out.println(   newRDD.count() );                                int length = newRDD.reduce( new Function2(){                        private static final long serialVersionUID = 1L;                        public Integer call(Integer int1, Integer int2) throws Exception {                                return int1+int2;                        }                                        });                                System.out.println("总和" + length);        }        }

三:通过parallelize或makeRDD将单击数据创建为分布式RDD。

eg:求总和。

public class TestRDD2 {                public static void main(String[] args) {                createRDDFromSuperRDD();        }        /**         * JavaSparkContext(String master, String appName, SparkConf conf)         * master - Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).         * appName - A name for your application, to display on the cluster web UI         * conf - a SparkConf object specifying other Spark parameters         * */        private static void createRDDFromSuperRDD(){                SparkConf conf = new SparkConf();                conf.set("spark.testing.memory", "269522560000");                JavaSparkContext sc = new JavaSparkContext("local", "Spark Test", conf);                System.out.println(  sc );                                List list = new ArrayList();                                for( int i=1;i<=10;i++){                        list.add(i);                }                                JavaRDD rdd = sc.parallelize(list);                                JavaRDD newRDD = rdd.map( new Function(){                        private static final long serialVersionUID = 1L;                        public Integer call(Integer int1) throws Exception {                                return int1;                        }                });                                int count = newRDD.reduce( new Function2(){                        private static final long serialVersionUID = 1L;                        public Integer call(Integer int1, Integer int2) throws Exception {                                return int1+int2;                        }                                        });                                System.out.println("总和" + count);        }        }

注意: 上述两段代码中,在获取 JavaSparkContext的时候,是这样写的:

SparkConf conf = new SparkConf();

conf.set("spark.testing.memory", "269522560000"); // 给jvm足够的资源。

JavaSparkContext sc = new JavaSparkContext("local", "Spark Test", conf);

而对于标记的加粗红色部分,参照API如下:

JavaSparkContext(String master, String appName, SparkConf conf)


-master - Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-appName - A name for your application, to display on the cluster web UI
-conf - a SparkConf object specifying other Spark parameters

对于master,官网有详细的介绍:

我这里写的是 "local",表示的是:

对于本地模式测试和单元测试,可以通过"local"在spark内运行程序。

******************************

另外写的一段,对算子中一些基本方法的使用

参考学习:

RDD算子分类: http://my.oschina.net/gently/blog/686800 (自己的。)

public class TestRDD3 {                private static String appName = "Test Spark RDD";        private static String master = "local";        public static void main(String[] args) {                SparkConf conf = new SparkConf();                conf.set("spark.testing.memory", "269522560000");                JavaSparkContext sc = new JavaSparkContext(master, appName, conf);                System.out.println(  sc );                                List list = new ArrayList();                list.add( "Berg" );                list.add( "Hadoop" );                list.add( "HBase" );                list.add( "Hive" );                list.add( "Spark" );                                JavaRDD rdd =  sc.parallelize(list);                                JavaRDD newrdd = rdd.map( new Function(){                        private static final long serialVersionUID = 1L;                        public Integer call(String string) throws Exception {                                System.out.println(  string + "\t" +string.length() );                                return string.length();                        }                });                                Integer length = newrdd.reduce( new Function2() {                        private static final long serialVersionUID = 1L;                        public Integer call(Integer i1, Integer i2) throws Exception {                                return i1+i2;                        }                });                                long count = newrdd.count();                List listnewrdd = newrdd.collect();                for (Integer integer : listnewrdd) {                        System.out.print(integer + " \t" );                }                                System.out.println(  "\nlength --> " + length + "  " + count );                System.out.println( "\n\n**************************************\n\n");                                List list1 = new ArrayList();                for( int i=1; i<=5;i++){                        list1.add( i );                }                                JavaRDD rdd1 = sc.parallelize(list1);                JavaRDD unionrdd = newrdd.union(rdd1);                                JavaRDD rdd2 = unionrdd.map( new Function(){                        private static final long serialVersionUID = 1L;                        public Integer call(Integer i) throws Exception {                                return i;                        }                });                                long count2 = rdd2.reduce( new Function2() {                        private static final long serialVersionUID = 1L;                        public Integer call(Integer arg0, Integer arg1) throws Exception {                                return arg0 + arg1;                        }                });                                System.out.println("count2 --> " +count2 );                                rdd2.foreach( new VoidFunction(){                        private static final long serialVersionUID = 1L;                        public void call(Integer arg0) throws Exception {                                System.out.println(  "foreach--> " + arg0 );                        }                                        });        }}

到此,关于"Spark RDD的创建方式及算子的使用方法是什么"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

数据 方法 算子 学习 使用方法 方式 内存 总和 数据结构 数组 结构 部分 分布式 对象 指针 文件 更多 系统 长度 e.g. 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络技术与数据库管理 武大网络安全夏令营名单 ajax跨服务器访问 黑客文化于网络安全章节测试答案 惠普服务器开机显示器不显示 网络安全领域涉及网络信息的 多举措网络安全 数据库查询学校总人数 查询数据库中有多少表格 服务器 安全防御 织梦初始化数据库 服务器带外管理和带内管理的区别 国家网络安全宣传周折页 怎样提高数据库的安全 m1电脑适合软件开发吗 服务器发送短信 resset数据库的优势 上海蒂姆维澳网络技术有限公司 网络安全事件调查取证教程 互联网科技创新利与弊 华为云服务器免费6个月 西安电子科大 网络安全学院 数据库的处理与应用 网络技术发开属于哪类行业 cf连接服务器没反应 南京erp软件开发费用多少钱 3306数据库是什么 小学三年级网络安全教育主题教案 软件开发哪个城市好找工作吗 命令模式下如何构建FTP服务器
0