打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
大数据IMF传奇行动绝密课程第19课:Spark高级排序彻底解密

基础排序算法实战
二次排序算法实战
更高级排序算法
排序算法内幕解密

sc.setLogLevel("WARN")

基础排序算法:

sc.textFile().flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).collect

所谓二次排序,就是指,排序的时候考虑两个维度

2 3
4 1
3 2
4 3
9 7
2 1

构造器要有val,因为要做个成员
Scala实现

package com.tom.sparkimport org.apache.spark.{SparkConf, SparkContext}class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable{  override def compare(other: SecondarySortKey): Int = {    if(this.first - other.first != 0) {this.first - other.first}    else {this.second - other.second}  }}object SecondarySortKey {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("SecondarySortKey").setMaster("local")    val sc = new SparkContext(conf)    val lines = sc.textFile("F:/helloSpark2.txt")    val pairWithSortKey = lines.map(line => {      (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)    }    )    val sorted = pairWithSortKey.sortByKey()    val sortedResult = sorted.map(pair => pair._2)    sortedResult.collect().foreach(println)  }}

java实现

/** * SecondarySortKey.java */package com.tom.spark.SparkApps.cores;import java.io.Serializable;import scala.math.Ordered;/** * 自定义二次排序的Key */public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable{    //需要二次排序的Key    private int first;    private int second;    //二次排序的公开构造器    public SecondarySortKey(int first, int second) {        this.first = first;        this.second = second;    }    public int getFirst() {        return first;    }    public void setFirst(int first) {        this.first = first;    }    public int getSecond() {        return second;    }    public void setSecond(int second) {        this.second = second;    }    public boolean $greater(SecondarySortKey other) {        // TODO Auto-generated method stub        if(this.first > other.getSecond())            return true;        else if(this.first == other.getFirst() && this.second > other.getSecond())            return true;        else return false;    }    public boolean $greater$eq(SecondarySortKey other) {        // TODO Auto-generated method stub        if($greater(other))            return true;        else if ( this.first == other.getFirst() && this.second == other.second)            return true;        else return false;    }    public boolean $less(SecondarySortKey other) {        // TODO Auto-generated method stub        return !$greater$eq(other);    }    public boolean $less$eq(SecondarySortKey other) {        // TODO Auto-generated method stub        return !$greater(other);    }    public int compare(SecondarySortKey other) {        // TODO Auto-generated method stub        if(this.first != other.getFirst())            return this.first - other.getFirst();        else return this.second - other.getSecond();    }    public int compareTo(SecondarySortKey other) {        // TODO Auto-generated method stub        if(this.first != other.getFirst())            return this.first - other.getFirst();        else return this.second - other.getSecond();    }    @Override    public int hashCode() {        final int prime = 31;        int result = 1;        result = prime * result + first;        result = prime * result + second;        return result;    }    @Override    public boolean equals(Object obj) {        if (this == obj)            return true;        if (obj == null)            return false;        if (getClass() != obj.getClass())            return false;        SecondarySortKey other = (SecondarySortKey) obj;        if (first != other.first)            return false;        if (second != other.second)            return false;        return true;    }    /**     * @param args     */    public static void main(String[] args) {        // TODO Auto-generated method stub    }}
/** * SecondarySortKeyApp.java */package com.tom.spark.SparkApps;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;import com.tom.spark.SparkApps.cores.SecondarySortKey;/** * 二次排序,具体实现步骤: * 第一步:按照Ordered和Serializable接口实现自定义排序的Key * 第二步:将要排序的二次排序的文件加载进<Key, Value>类型的RDD * 第三步:使用sortByKey基于自定义的Key进行二次排序 * 第四步:去除掉排序的Key,只保留排序后的结果 * */public class SecondarySortKeyApp {    /**     * @param args     */    public static void main(String[] args) {        // TODO Auto-generated method stub        SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(conf);        JavaRDD<String> line = sc.textFile("F:/helloSpark2.txt",1);        JavaPairRDD<SecondarySortKey, String> pairs = line.mapToPair(new PairFunction<String, SecondarySortKey, String>() {            public Tuple2<SecondarySortKey, String> call(String line)                    throws Exception {                // TODO Auto-generated method stub                return new Tuple2<SecondarySortKey, String>(new SecondarySortKey(Integer.valueOf(line.split(" ")[0]), Integer.valueOf(line.split(" ")[1])), line);            }                   });        JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey(false); //完成二次排序        //过滤掉排序后自定的Key,保留排序的结果        JavaRDD<String> values = sortedPairs.map(new Function<Tuple2<SecondarySortKey,String>, String>() {            public String call(Tuple2<SecondarySortKey, String> pair)                    throws Exception {                // TODO Auto-generated method stub                return pair._2;            }        });        values.foreach(new VoidFunction<String>() {            public void call(String line) throws Exception {                // TODO Auto-generated method stub                System.out.println(line);            }        });        sc.close();    }}
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
大数据学习路线分享Master的jps
Spark- word Count案例
SparkRDD算子案例:统计出每一个省份每个广告被点击数量排行的Top3
spark的优化-控制数据分区和分布
Spark Rdd & DataFrame
用spark streaming实时读取hdfs数据并写入elasticsearch中
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服