打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
使用Spark core和SparkSQL的窗口函数分别实现分组取topN的操作

Spark 1.4及以上版本中,针对sparkSQL,添加了很多新的函数,进一步扩展了SparkSQL对数据的处理能力。

本篇介绍一个强大的窗口函数 row_number()函数,常用于对数据进行分组并取每个分组中的TopN数据。

示例数据如下:

class1 90class2 56class1 87class1 76class2 88class1 95class1 74class2 87class2 67class2 77
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

1、直接使用Spark core中的api来实现分组取topN功能:
首先将数据源读入代JavaRDD中,然后解析每一行数据,将每一行的第一个元素作为key,第二元素作为value构成tuple的RDD

SparkConf conf = new SparkConf().setAppName("groupTopN").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> lines = sc.textFile("C:\\Temp\\groupTopN.txt");JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {    String[] lineSplited = line.split(" ");    return new Tuple2<String,Integer>(lineSplited[0],Integer.valueOf(lineSplited[1]));}});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

得到pairs是一个二元组的RDD,直接调用groupByKey()函数,就可以按照key来进行分组了

JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();
  • 1
  • 1

分组后每个key对应的这一个value的集合,这里,需要对每个key对应的value集合首先进行排序,然后取其前N个元素即可

JavaPairRDD<String,Iterable> groupedTopN = grouped.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable>() {        @Override        public Tuple2<String, Iterable> call(Tuple2<String, Iterable<Integer>> values) throws Exception {            Iterator<Integer> iter = values._2.iterator();            List<Integer> list = new ArrayList<Integer>();            while(iter.hasNext()){                list.add(iter.next());            }            //将list中的元素排序            list.sort(new Comparator<Integer>() {                @Override                public int compare(Integer t1, Integer t2) {                    int i1 = t1;                    int i2 = t2;                    return -(i1 - i2);//逆序排列                }            });             List<Integer> top3 = list.subList(0, 3);//直接去前3个元素            return new Tuple2<String,Iterable>(values._1,top3);        }    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

为了便于验证,直接咋本地进行测试,并打印显示

groupedTopN.foreach(new VoidFunction<Tuple2<String,Iterable>>() {    @Override    public void call(Tuple2<String, Iterable> t) throws Exception {        System.out.println(t._1);        Iterator iterator = t._2.iterator();        while(iterator.hasNext()){            System.out.println(iterator.next());        }        System.out.println("====华丽的分割线=======");    }});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、使用SparkSQL的窗口函数来时上同样的功能

思路:
窗口函数是HiveSQL中特有的,因此,首先将数据导入到Hive表中,然后映射到Spark的DataFrame,在sql语句中直接调用窗口函数即可实现该功能

首先,直接在HiveSQL中创建对应的hive表,然后导入本地数据到hive表中

SparkConf conf = new SparkConf().setAppName("WindowFunctionTopN").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);HiveContext hiveContext = new HiveContext(sc.sc());//将数据导入到hive表中hiveContext.sql("DROP TABLE IF EXISTS class_info");hiveContext.sql("CREATE TABLE IF NOT EXISTS class_info ("        + "class STRING,"        + "score INT");hiveContext.sql("LOAD DATA "        + "LOCAL INPATH '/cqt/testdata/groupTopN.txt' "        + "INTO TABLE class_info");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

然后,直接调用窗口函数row_number(),注意窗口函数的调用语法

DataFrame tom3DF = hiveContext.sql("select class,score from" +"(select class,score,"                + "row_number() OVER (PARTITION BY class ORDER BY score DESC) rank from class_info) tmp where rank<=3");
  • 1
  • 2
  • 1
  • 2

将得到的数据回写到hive表中保存即可

// 将每组排名前3的数据,保存到一个表中hiveContext.sql("DROP TABLE IF EXISTS grouped_top3");  tom3DF.saveAsTable("grouped_top3");
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

至此,代码,编写完毕,相比于第一种方式,代码清爽很多!

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
Flink Program Guide (2)
Spark在Yarn上运行Wordcount程序 – 过往记忆
第20课 :SPARK Top N彻底解秘 TOPN 排序(Scala)SPARK分组TOPN 算法(JAVA) 必须掌握!
「Flink」Flink中的时间类型
Spark菜鸟学习营Day1 从Java到RDD编程
Ceylon: Quick introduction
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服