打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
hadoop setCombinerClass Combiner Reduce

hadoop的入门用了一个星期,看了《Hadoop权威指南第2版中文版》、《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》两个PDF。

然后就开始干活了,首先要实现的是将原始数据中的用户id 和用户访问的url进行提取,并放入缓存,(工作内容背景就不介绍了,这里只是用hadoop实现工作中的一小部分)。

测试时,可以成功将数据生成文件part-00000,提取的数据样本也准确,当在Reduce代码里将结果加入缓存时,发现存入缓存的数据,是生成的数据文件中的数据的两倍,看了下数据结果,是缓存被存入了两次。

Reducer

public static class MyReducer extends Reducer<Text, Text, Text, Text> {		String rhost = "";		int rport=0;		Jedis redis = null;		protected void setup(Context context) throws IOException, InterruptedException {			super.setup(context);			this.rhost = context.getConfiguration().get("redis.host","localhost");			this.rport = context.getConfiguration().getInt("redis.port", 6379);			redis = new Jedis(rhost,rport);			redis.select(1);		}		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {			//context.write(key, new Text(""));			context.getCounter("counter", "CountReduceTimes").increment(1);			Map<String, String> hash = null;			Integer idx = 1;			if(null != redis&&redis.isConnected()){				hash = redis.hgetAll(key.toString());				_Log.info(key.toString()+" redis old map size:"+(hash==null?0:hash.size()));				if(null != hash)					idx = hash.size() + 1;				else					hash = new HashMap<String,String>();			}			else				_Log.info("redis invalid, host:"+rhost+" port:"+rport+" isConnected:"+redis.isConnected());						for (Text val : values) {				context.write(key, val);				if(null != hash){					_Log.info(key.toString()+" oldMap size before put:"+hash.size());					hash.put((idx++).toString(), val.toString());				}			}						if(null != hash)					redis.hmset(key.toString(),hash);		}	}


Mapper

public static class MyMapper extends Mapper<Object, WBCDR, Text, Text> {		private List<String> urlLst = new ArrayList<String>();		protected void setup(Context context) throws IOException, InterruptedException {			super.setup(context);			HdfsRwService service = new HdfsRwService(context.getConfiguration());			try {				String urlSrc = service.readText("/datas/wbcache/TargetUrl.txt");				if(!"".equals(urlSrc))				{					//返回的是\r\n分隔的行					String[] urlItem = urlSrc.split("\r\n");					if(null != urlItem)						for(String s:urlItem)							//把该有的各种空白去掉\r\n \n							if(!(s = s.trim()).equals(""))								urlLst.add(/*s.endsWith("/")?s.substring(0, s.length()-1):*/s);				}			} catch (Exception e) {				_Log.error(e, e);			}		}		public boolean IsTargetUrl(String requestUrl)		{			boolean res = false;			for(String s:urlLst)				if(requestUrl.startsWith(s)){					res = true;					break;				}			return res;		}		public void map(Object key, WBCDR value, Context context) throws IOException, InterruptedException {			String userId = value.getUserId();			String requestUrl = value.getRequestUrl().trim().toLowerCase();			String requestMethod = value.getRequestMethod();			if("GET".equals(requestMethod))			{				if(requestUrl != null&&IsTargetUrl(requestUrl))					context.write(new Text(userId), new Text(requestUrl));				else					context.getCounter("counter", "UrlOutOfTarget").increment(1);			}			else{				context.getCounter("counter", "UrlOutOfGetMethod").increment(1);			}		}	}

从map和reduce上看应该没什么问题,然后去看logs/userlog日志文件,貌似reduce被执行了两次,(也只有执行了两次才会重复存入缓存)。

然后翻书,重新理解mapreduce原理跟细节,无果。

然后重新看代码,

main

public static void main(String[] args) throws Exception {		// TODO Auto-generated method stub		if(args.length < 1) {			_Log.info("plase input date [YYYYMMDD]...");			return;		}		final String statDate = args[0];		//final String reg = URLDecoder.decode(args[1], "UTF-8").trim().toLowerCase();		//_Log.info("statDate:" + statDate + ",url=" + reg);				Configuration conf = new Configuration();		Job job = new Job(conf, GetRespCdrUserUrlDomain.class.getSimpleName());		job.setJarByClass(GetRespCdrUserUrlDomain.class);		//job.getConfiguration().set("conf.reg", reg);				String [] datas = statDate.split(",");		for (String data : datas) {			Path inputPath = new Path(input + data.substring(0, 6) + "/" + data);			FileInputFormat.addInputPath(job, inputPath);			_Log.info("inputPath:" + inputPath);		}				Path outputPath = new Path(output + "_" + statDate + "_" + System.currentTimeMillis());		FileOutputFormat.setOutputPath(job, outputPath);		_Log.info("outputPath:" + outputPath);				job.setMapperClass(MyMapper.class);		//job.setCombinerClass(MyReducer.class);		job.setReducerClass(MyReducer.class);		job.setInputFormatClass(SequenceFileInputFormat.class);		job.setMapOutputKeyClass(Text.class);		job.setMapOutputValueClass(Text.class);		job.setOutputFormatClass(TextOutputFormat.class);		job.setOutputKeyClass(Text.class);		job.setOutputValueClass(Text.class);		job.waitForCompletion(true);	}


发现main多了job.setCombinerClass(MyReducer.class);,注释掉,打包,测试,看结果,一切OK。

也是时间太仓促,后面再结合源码深入学习了,网上查了下Combiner,这里说得比较好:http://blog.csdn.net/ipolaris/article/details/8723782

基本内容是:

在MapReduce中,当map生成的数据过大时,带宽就成了瓶颈,怎样精简压缩传给Reduce的数据,有不影响最终的结果呢。有一种方法就是使用Combiner,Combiner号称本地的Reduce,Reduce最终的输入,是Combiner的输出。

Combiner是用reducer来定义的,多数的情况下Combiner和reduce处理的是同一种逻辑,所以job.setCombinerClass()的参数可以直接使用定义的reduce,当然也可以单独去定义一个有别于reduce的Combiner,继承Reducer,写法基本上定义reduce一样。



本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
分布式中使用Redis实现Session共享(二)
Hadoop MapReduce 任务执行流程源代码详细解析
Hadoop集群(第9期)_MapReduce初级案例
Hadoop MapReduce新旧API区别
MapReduce Job中全局共享数据的处理办法
Hive的使用以及常用语法(Hive语法即Hql语法)
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服