千家信息网

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,学习spark任何的技术之前,请正确理解spark,可以参考:正确理解sparkRDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用sca
千家信息网最后更新 2025年01月25日spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

学习spark任何的技术之前,请正确理解spark,可以参考:正确理解spark


RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext


一、简单实现scala版本的RDD和SparkContext

class RDD[T](value: Seq[T]) {  //RDD的map操作  def map[U](f: T => U): RDD[U] = {    new RDD(value.map(f))  }    def iterator[T] = value.iterator  }class SparkContext {  //创建一个RDD  def createRDD(): RDD[Integer] = new RDD[Integer](Seq(1, 2, 3))}


二、简单实现java版本的RDD和SparkContext

//这个时java中的一个接口//我们可以将scala中的map需要的函数其实就是对应着java中的一个接口package com.twq.javaapi.java7.function;public interface Function extends Serializable {  R call(T1 v1) throws Exception;}//这边实现的java版的RDD和SparkContext其实还是用scala代码实现,只不过这些scala代码可以被java代码调用了import java.util.{Iterator => JIterator}import scala.collection.JavaConverters._import com.twq.javaapi.java7.function.{Function => JFunction}//每一个JavaRDD都会含有一个scala的RDD,用于调用该RDD的apiclass JavaRDD[T](val rdd: RDD[T]) {  def map[R](f: JFunction[T, R]): JavaRDD[R] =    //这里是关键,调用scala RDD中的map方法    //我们将java的接口构造成scala RDD的map需要的函数函数    new JavaRDD(rdd.map(x => f.call(x)))  //我们需要将scala的Iterator转成java版的Iterator  def iterator: JIterator[T] = rdd.iterator.asJava}//每个JavaSparkContext含有一个scala版本的SparkContextclass JavaSparkContext(sc: SparkContext) {  def this() = this(new SparkContext())  //转调scala版本的SparkContext来实现JavaSparkContext的功能  def createRDD(): JavaRDD[Integer] = new JavaRDD[Integer](sc.createRDD())}

三、写java代码调用rdd java api

package com.twq.javaapi.java7;import com.twq.javaapi.java7.function.Function;import com.twq.rdd.api.JavaRDD;import com.twq.rdd.api.JavaSparkContext;import java.util.Iterator;/** * Created by tangweiqun on 2017/9/16. */public class SelfImplJavaRDDTest {    public static void main(String[] args) {        //初始化JavaSparkContext        JavaSparkContext jsc = new JavaSparkContext();        //调用JavaSparkContext的api创建一个RDD        JavaRDD firstRDD = jsc.createRDD();        //对创建好的firstRDD应用JavaRDD中的map操作        JavaRDD strRDD = firstRDD.map(new Function() {            @Override            public String call(Integer v1) throws Exception {                return v1 + "test";            }        });        //将得到的RDD的结果打印,结果为        //1test        //2test        //3test        Iterator result = strRDD.iterator();        while (result.hasNext()) {            System.out.println(result.next());        }    }}


以上就是RDD java api调用scala api的实现原理,虽然只举了map操作,但是其他的类似于flatMap操作的实现都是类似的


接下来可以详细了解RDD java的每一个api


我们可以spark core RDD api来详细理解scala中的每一个api。。。


系统学习spark:
1、[老汤] Spark 2.x 之精讲Spark Core:https://edu.51cto.com/sd/88429
2、[老汤]Spark 2.x 之精讲Spark SQL专题:https://edu.51cto.com/sd/16f3d
3、[老汤]Scala内功修炼系列专题:https://edu.51cto.com/sd/8e85b
4、[老汤]Spark 2.x之精讲Spark Streamig:https://edu.51cto.com/sd/8c525
5、[老汤]Spark 2.x精讲套餐:https://edu.51cto.com/sd/ff9a4
6、从Scala到Spark 2.x专题:https://edu.51cto.com/sd/d72af

0