Getting started with Hadoop took me a week, working through two PDFs: 《Hadoop权威指南第2版中文版》 (Hadoop: The Definitive Guide, 2nd edition) and 《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》.
Then it was time for real work. The first task: extract each user id and the URLs that user visited from the raw data, and load them into a cache. (I won't go into the business background; Hadoop only implements a small piece of the job here.)
In testing, the job generated the output file part-00000 correctly and the sampled records looked accurate. But when the Reduce code pushed the results into the cache, the cached data turned out to be exactly twice what was in the generated output file; inspecting the records showed everything had been written to the cache twice.
Reducer
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    String rhost = "";
    int rport = 0;
    Jedis redis = null;

    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        this.rhost = context.getConfiguration().get("redis.host", "localhost");
        this.rport = context.getConfiguration().getInt("redis.port", 6379);
        redis = new Jedis(rhost, rport);
        redis.select(1);
    }

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //context.write(key, new Text(""));
        context.getCounter("counter", "CountReduceTimes").increment(1);
        Map<String, String> hash = null;
        Integer idx = 1;
        if (null != redis && redis.isConnected()) {
            hash = redis.hgetAll(key.toString());
            _Log.info(key.toString() + " redis old map size:" + (hash == null ? 0 : hash.size()));
            if (null != hash)
                idx = hash.size() + 1;
            else
                hash = new HashMap<String, String>();
        } else {
            // guard the log call: redis itself may be null here
            _Log.info("redis invalid, host:" + rhost + " port:" + rport
                    + " isConnected:" + (redis != null && redis.isConnected()));
        }
        for (Text val : values) {
            context.write(key, val);
            if (null != hash) {
                _Log.info(key.toString() + " oldMap size before put:" + hash.size());
                hash.put((idx++).toString(), val.toString());
            }
        }
        if (null != hash)
            redis.hmset(key.toString(), hash);
    }
}
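One thing the class above never does is release the Redis connection opened in setup(). A minimal cleanup() sketch, assuming a Jedis version (2.x or later) where Jedis implements Closeable:

// Added to MyReducer: close the connection opened in setup().
// Assumes Jedis 2.x+ (Closeable); older versions would use redis.disconnect().
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    if (redis != null && redis.isConnected()) {
        redis.close();
    }
    super.cleanup(context);
}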
Mapper
public static class MyMapper extends Mapper<Object, WBCDR, Text, Text> {
    private List<String> urlLst = new ArrayList<String>();

    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        HdfsRwService service = new HdfsRwService(context.getConfiguration());
        try {
            String urlSrc = service.readText("/datas/wbcache/TargetUrl.txt");
            if (!"".equals(urlSrc)) {
                // the file comes back as \r\n-separated lines
                String[] urlItem = urlSrc.split("\r\n");
                if (null != urlItem)
                    for (String s : urlItem)
                        // strip any leftover whitespace (\r\n, \n, spaces)
                        if (!(s = s.trim()).equals(""))
                            urlLst.add(/*s.endsWith("/") ? s.substring(0, s.length() - 1) :*/ s);
            }
        } catch (Exception e) {
            _Log.error(e, e);
        }
    }

    public boolean IsTargetUrl(String requestUrl) {
        boolean res = false;
        for (String s : urlLst)
            if (requestUrl.startsWith(s)) {
                res = true;
                break;
            }
        return res;
    }

    public void map(Object key, WBCDR value, Context context)
            throws IOException, InterruptedException {
        String userId = value.getUserId();
        // null-check before trim()/toLowerCase(); the original checked after dereferencing
        String rawUrl = value.getRequestUrl();
        String requestUrl = (rawUrl == null) ? null : rawUrl.trim().toLowerCase();
        String requestMethod = value.getRequestMethod();
        if ("GET".equals(requestMethod)) {
            if (requestUrl != null && IsTargetUrl(requestUrl))
                context.write(new Text(userId), new Text(requestUrl));
            else
                context.getCounter("counter", "UrlOutOfTarget").increment(1);
        } else {
            context.getCounter("counter", "UrlOutOfGetMethod").increment(1);
        }
    }
}
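An incidental note on map() above: it allocates two new Text objects per record. The usual Hadoop idiom is to reuse writable instances across calls, since the framework serializes their contents during write(). A sketch of map() with that change, the rest of the class unchanged:

// Reusable output writables: a common Hadoop idiom to cut per-record allocation.
private final Text outKey = new Text();
private final Text outVal = new Text();

public void map(Object key, WBCDR value, Context context)
        throws IOException, InterruptedException {
    String userId = value.getUserId();
    String rawUrl = value.getRequestUrl();
    String requestUrl = (rawUrl == null) ? null : rawUrl.trim().toLowerCase();
    if ("GET".equals(value.getRequestMethod())) {
        if (requestUrl != null && IsTargetUrl(requestUrl)) {
            outKey.set(userId);       // reuse instead of new Text(userId)
            outVal.set(requestUrl);   // safe: contents are serialized inside write()
            context.write(outKey, outVal);
        } else {
            context.getCounter("counter", "UrlOutOfTarget").increment(1);
        }
    } else {
        context.getCounter("counter", "UrlOutOfGetMethod").increment(1);
    }
}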
So I went back to the books and re-studied the MapReduce principles and details. No luck. Then I re-read the code:
main
public static void main(String[] args) throws Exception {
    if (args.length < 1) {
        _Log.info("please input date [YYYYMMDD]...");
        return;
    }
    final String statDate = args[0];
    //final String reg = URLDecoder.decode(args[1], "UTF-8").trim().toLowerCase();
    //_Log.info("statDate:" + statDate + ",url=" + reg);
    Configuration conf = new Configuration();
    Job job = new Job(conf, GetRespCdrUserUrlDomain.class.getSimpleName());
    job.setJarByClass(GetRespCdrUserUrlDomain.class);
    //job.getConfiguration().set("conf.reg", reg);
    String[] datas = statDate.split(",");
    for (String data : datas) {
        Path inputPath = new Path(input + data.substring(0, 6) + "/" + data);
        FileInputFormat.addInputPath(job, inputPath);
        _Log.info("inputPath:" + inputPath);
    }
    Path outputPath = new Path(output + "_" + statDate + "_" + System.currentTimeMillis());
    FileOutputFormat.setOutputPath(job, outputPath);
    _Log.info("outputPath:" + outputPath);
    job.setMapperClass(MyMapper.class);
    // the culprit: with MyReducer also registered as the Combiner, its Redis
    // write ran once on the map side and again in the reduce phase
    //job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.waitForCompletion(true);
}
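Since MyReducer already increments a CountReduceTimes counter, one quick way to confirm the double execution is to read that counter back after the job finishes. A diagnostic sketch, replacing the bare job.waitForCompletion(true); at the end of main():

// When MyReducer also runs as the Combiner, reduce() executes in both the
// combine and reduce phases, so this count comes out roughly doubled.
if (job.waitForCompletion(true)) {
    long reduceTimes = job.getCounters()
            .findCounter("counter", "CountReduceTimes").getValue();
    _Log.info("CountReduceTimes = " + reduceTimes);
}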
Time was too tight to dig into the Hadoop source right away (that will have to come later), so I searched the web for Combiner and found a good write-up: http://blog.csdn.net/ipolaris/article/details/8723782
The gist:
In MapReduce, when the map output gets large, network bandwidth becomes the bottleneck. How do you trim and compress the data handed to Reduce without changing the final result? One answer is the Combiner, billed as a local Reduce: the Reduce phase's final input is the Combiner's output.
A Combiner is defined as a Reducer. In most cases the Combiner and the reduce apply the same logic, so job.setCombinerClass() can simply be passed the reducer you already defined; you can also write a separate Combiner distinct from the reduce by extending Reducer, in essentially the same way a reduce is written. The catch, and the cause of my bug: the framework may run the Combiner zero, one, or several times per key, so a reducer with side effects, like my Redis writes, must never be reused as a Combiner. Commenting out job.setCombinerClass(MyReducer.class) ended the double caching.
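The classic word count shows when reusing a reducer as a Combiner actually is safe: summation is associative, commutative, and side-effect-free, so pre-aggregating on the map side cannot change the result. A minimal sketch for contrast with MyReducer above:

// A reducer that safely doubles as a Combiner: pure aggregation, no side effects.
public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values)
            sum += v.get();
        result.set(sum);
        context.write(key, result);
    }
}
// job.setCombinerClass(SumReducer.class); // fine here: the framework may invoke a
// Combiner zero, one, or several times per key, which only a pure function tolerates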