千家信息网

【总结】Spark优化(1)-多Job并发执行

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,
千家信息网最后更新 2024年11月20日【总结】Spark优化(1)-多Job并发执行

Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等

在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,其中两个Job采用串行方式,另外两个Job采用并行方式。将任务提交到Yarn中执行。能够明显看出串行与兵线处理的性能。


每个Job执行时间:

JobID开始时间结束时间耗时
Job 016:59:4517:00:3449s
Job 117:00:3417:01:1339s
Job 217:01:1517:01:55
40s
Job 317:01:1617:02:1256s

四个Job都是自执行相同操作,Job0,Job1一组采用串行方式,Job2,Job3采用并行方式。

Job0,Job1串行方式耗时等于两个Job耗时之和 49s+39s=88s

Job2,Job3并行方式耗时等于最先开始和最后结束时间只差17:02:12-17:01:15=57s


代码:

package com.cn.ctripotb;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.hive.HiveContext;import java.util.*;import java.util.concurrent.Callable;import java.util.concurrent.Executors;/** * Created by Administrator on 2016/9/12. */public class HotelTest {    static ResourceBundle rb = ResourceBundle.getBundle("filepath");    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setAppName("MultiJobWithThread")                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");        JavaSparkContext sc = new JavaSparkContext(conf);        HiveContext hiveContext = new HiveContext(sc.sc());  //测试真实数据时要把这里放开        final DataFrame df = getHotelInfo(hiveContext);        //没有多线程处理的情况,连续执行两个Action操作,生成两个Job        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",com.hadoop.compression.lzo.LzopCodec.class);        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",com.hadoop.compression.lzo.LzopCodec.class);        //用Executor实现多线程方式处理Job        java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);        executorService.submit(new Callable() {            @Override            public Void call(){                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);                return null;            }        });        executorService.submit(new Callable() {            @Override            public Void call(){                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);                return null;            }        });        executorService.shutdown();    }    public static DataFrame getHotelInfo(HiveContext hiveContext){        String sql = "select * from common.dict_hotel_ol";        return  hiveContext.sql(sql);    }}


0