打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
mapreduce top n

在最初接触mapreduce时,top n 问题的解决办法是将mapreduce输出(排序后)放入一个集合中,取前n个,但这种写法过于简单,内存能够加载的集合的大小是有上限的,一旦数据量大,很容易出现内存溢出。
    今天在这里介绍另一种实现方式,当然这也不是最好的方式,不过正所谓一步一个脚印,迈好每一步,以后的步伐才能更坚定,哈哈说了点题外话。恩恩,以后还会有更好的方式

    需求,得到top 最大的前n条记录
    这里只给出一些核心的代码,其他job等配置的代码略

Configuration conf = new Configuration();conf.setInt("N", 5);

    初始化job之前需要 conf.setInt("N",5); 意在在mapreduce阶段读取N,N就代表着top N
    以下是map

package com.lzz.one;import java.io.IOException;import java.util.Arrays;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/** * topN* 	#orderid,userid,payment,productid* [root@x00 hd]# cat seventeen_a.txt* 1,9819,100,121* 2,8918,2000,111* 3,2813,1234,22* 4,9100,10,1101* 5,3210,490,111* 6,1298,28,1211* 7,1010,281,90* 8,1818,9000,20* [root@x00 hd]# cat seventeen_b.txt* 100,3333,10,100* 101,9321,1000,293* 102,3881,701,20* 103,6791,910,30* 104,8888,11,39 * 预测结果:(求 Top N=5 的结果)* 1	9000* 2	2000* 3	1234* 4	1000* 5	910 * @author Administrator * */public class TopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{    int len;    int top[];    @Override    public void setup(Context context) throws IOException,InterruptedException {        len = context.getConfiguration().getInt("N", 10);        top = new int[len+1];    }     @Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {    String line = value.toString();    String arr []= line.split(",");    if(arr != null && arr.length == 4){        int pay = Integer.parseInt(arr[2]);        add(pay);    }}public void add(int pay){    top[0] = pay;    Arrays.sort(top);} @Overridepublic void cleanup(Context context) throws IOException,InterruptedException {    for(int i=1;i<=len;i++){        context.write(new IntWritable(top[i]),new IntWritable(top[i]));    } } }        

    接下来是reduce

package com.lzz.one;import java.io.IOException;import java.util.Arrays;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Reducer;public class TopNReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{	int len;	int top[];	@Override	public void setup(Context context)			throws IOException, InterruptedException {		len = context.getConfiguration().getInt("N", 10);		top = new int[len+1];	}		@Override	public void reduce(IntWritable key, Iterable<IntWritable> values,			Context context)			throws IOException, InterruptedException {		for(IntWritable val : values){			add(val.get());		}	}		public void add(int pay){		top[0] = pay;		Arrays.sort(top);	}		@Override	public void cleanup(Context context)			throws IOException, InterruptedException {		for(int i=len;i>0;i--){			context.write(new IntWritable(len-i+1),new IntWritable(top[i]));		}	}}

    说一下逻辑,虽然画图比较清晰,但是时间有限,画图水平有限,只用语言来描述吧,希望能说的明白
    如果要取top 5,则应该定义一个长度为为6的数组,map所要做的事情就是将每条日志的那个需要排序的字段放入数组第一个元素中,调用Arrays.sort(Array[])方法可以将数组按照正序,从数字角度说是从小到大排序,比如第一条记录是9000,那么排序结果是[0,0,0,0,0,9000],第二条日志记录是8000,排序结果是[0,0,0,0,8000,9000],第三条日志记录是8500,排序结果是[0,0,0,8000,8500,9000],以此类推,每次放进去一个数字如果大于数组里面最小的元素,相当于将最小的覆盖掉了,也就是说数组中元素永远是拿到日志中最大的那些个记录
    ok,map将数组原封不动按照顺序输出,reduce接收到从每个map拿到的五个排好序的元素,在进行跟map一样的排序,排序后数组里面就是按照从小到大排好序的元素,将这些元素倒序输出就是最终我们要的结果了

    与之前的方式做个比较,之前的map做的事情很少,在reduce中排序后哪前5条,reduce的压力是很大的,要把所有的数据都处理一遍,而一般设置reduce的个数较少,一旦数据较多,reduce就会承受不了,悲剧了。而现在的方式巧妙的将reduce的压力转移到了map,而map是集群效应的,很多台服务器来做这件事情,减少了一台机器上的负担,每个map其实只是输出了5个元素而已,如果有5个map,其实reduce才对5*5个数据进行了操作,也就不会出现内存溢出等问题了


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
MapReduce简单实例:wordcount--大数据纪录片第五记
【Hadoop】:手动实现WordCount案例
多个mapreduce工作相互依赖处理方法完整实例(JobControl)
Hadoop MapReduce新旧API区别
Hadoop学习之路(6)MapReduce自定义分区实现
[Hadoop源码解读](六)MapReduce篇之MapTask类
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服