千家信息网

第19课:Spark高级排序彻底解密

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,本节课内容:1、基础排序算法实战2、二次排序算法实战3、更高级别排序算法4、排序算法内幕解密排序在Spark运用程序中使用的比较多,且维度也不一样,如二次排序,三次排序等,在机器学习算法中经常碰到,所
千家信息网最后更新 2025年02月01日第19课:Spark高级排序彻底解密

本节课内容:

1、基础排序算法实战

2、二次排序算法实战

3、更高级别排序算法

4、排序算法内幕解密


排序在Spark运用程序中使用的比较多,且维度也不一样,如二次排序,三次排序等,在机器学习算法中经常碰到,所以非常重要,必须掌握!

所谓二次排序,就是根据两列值进行排序,如下测试数据:

2 3

4 1

3 2

4 3

8 7

2 1

经过二次排序后的结果(升序):

2 1

2 3

3 2

4 1

4 3

8 7

在编写二次排序代码前,先简单的写下单个key排序的代码:

val conf = new SparkConf().setAppName("SortByKey").setMaster("local")

val sc = new SparkContext(conf)

val lines = sc.textFile("C:\\User\\Test.txt")

valwords = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

val wordcount = words.map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1))

wordcount.collect().foreach(println)

以上就是简单的wordcount程序,程序中使用了sortByKey排序


下面我们通过代码实现二次排序算法

首先我们先通过Java代码实现上面测试数据进行二次排序

排序最主要的就是Key的准备,我们先用Java编写二次排序的key,参考代码如下:


import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered, Serializable {

private int first;

private int second;

@Override

public int hashCode() {

final int prime = 31;

int result = 1;

result = prime * result + first;

result = prime * result + second;

return result;

}

@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (obj == null)

return false;

if (getClass() != obj.getClass())

return false;

SecondarySortKey other = (SecondarySortKey) obj;

if (first != other.first)

return false;

if (second != other.second)

return false;

return true;

}

public int getFirst() {

return first;

}

public void setFirst(int first) {

this.first = first;

}

public int getSecond() {

return second;

}

public void setSecond(int second) {

this.second = second;

}

public SecondarySortKey(int first, int second) {

this.first = first;

this.second = second;

}

public boolean $greater(SecondarySortKey other) {

if (this.first > other.getFirst()) {

return true;

} else if (this.first == other.getFirst() && this.second > other.getSecond()) {

return true;

}

return false;

}

public boolean $greater$eq(SecondarySortKey other) {

if (this.$greater(other)) {

return true;

} else if (this.first == other.getFirst() && this.second == other.getSecond()) {

return true;

}

return false;

}

public boolean $less(SecondarySortKey other) {

if (this.first < other.getFirst()) {

return true;

} else if (this.first == other.getFirst() && this.second < other.getSecond()) {

return true;

}

return false;

}

public boolean $less$eq(SecondarySortKey other) {

if (this.$less(other)) {

return true;

} else if (this.first == other.getFirst() && this.second < other.getSecond()) {

return true;

}

return false;

}

public int compare(SecondarySortKey other) {

if (this.first - other.getFirst() != 0) {

return this.first - other.getFirst();

} else {

return this.second - other.getSecond();

}

}

public int compareTo(SecondarySortKey other) {

if (this.first - other.getFirst() != 0) {

return this.first - other.getFirst();

} else {

return this.second - other.getSecond();

}

}

根据上面生成的排序key编写对测试数据的二次排序

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**

* DT_Spark大数据梦工厂

* 二次排序,具体的实现步骤:

* 第一步:按照Ordered和Serializable接口实现自定义排序的key

* 第二步:将要进行二次排序的文件加载进来生成类型的RDD

* 第三步:使用sortByKey基于自定义的Key进行二次排序

* 第四步:去除掉排序的Key,只保留排序的结果

*/

public class SecondarySortKeyApp {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD lines = sc.textFile("C:\\Users\\Test.txt");

//将自定义的key添加进来

JavaPairRDD pairs = lines

.mapToPair(new PairFunction() {

private static final long serialVersionUID = 1L;

public Tuple2 call(String line) throws Exception {

String[] splited = line.split(" ");

SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]),

Integer.valueOf(splited[1]));

return new Tuple2(key, line);

}

});

//根据我们自定义的key进行升序排序

JavaPairRDD sorted = pairs.sortByKey(); //sortByKey(false) 降序

// 过滤掉排序后的自定义的Key,保留排序的结果

JavaRDD secondarySort = sorted.map(new Function, String>() {

public String call(Tuple2 sortedContent) throws Exception {

return sortedContent._2;

}

});

secondarySort.foreach(new VoidFunction() {

public void call(String sorted) throws Exception {

System.out.println(sorted);

}

});

}

运行结果:

2 1

2 3

3 2

4 1

4 3

8 7


下面我通过Scala方式实现上述二次排序,scala代码非常简洁

先创建我们自定义排序key

/**

*DT_Spark大数据梦工厂
* 自定义二次排序的key
*/
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
def compare(other: SecondarySortKey): Int = {
if (this.first - other.first != 0) {
this.first - other.first
}
else {
this.second - other.second
}
}


根据自定义排序Key实现二次排序

import org.apache.spark.{SparkConf, SparkContext}

/**
* 二次排序,具体的实现步骤:
* 第一步:按照OrderedSerializable接口实现自定义排序的key
* 第二步:将要进行的二次排序的文件加载进来生成<key,value>类型的RDD
* 第三步:使用sortByKey基于自定义的Key进行二次排序 第
* 四步:去除掉排序的Key,只保留排序的结果
*/

object

SecondarySortKeyApp {

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");
val sc = new SparkContext(conf)
val lines = sc.textFile("C:\\Users\\Test.txt")//添加key,组合成(key,value)格式
val pairWithSortKey = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line))
//格式:Tuple2(key,value)
val sorted = pairWithSortKey.sortByKey()

//过滤掉key,只保留value
val sortedResult = sorted.map(sort => sort._2)

//显示结果
sortedResult.collect().foreach(println)

}

}


运行结果:

2 1

2 3

3 2

4 1

4 3

8 7

从上面的代码可以看出,通过scala代码实现二次排序确实非常简洁,这也是scala的强大之处所在。

更高级别排序算法和内幕解密在后续课程中在分享。


备注:

资料来源于:DT_大数据梦工厂

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上2000开设的Spark永久免费公开课,地址YY房间号:68917580


0