打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
MapReduce框架中全排序的算法思想
关于全排序的问题  Tom White的书中提出的数据取样方法 ,最近学习了一下,下面做个比较,以防后患!!

 主要思想就是在要排序的所有数据中随机取出一定量的数据,这些数据取自三个部分,

1.选取总得数据(键值对)数目

2.选取的split数目

3.每个split选取的键值对数目(只要达到总得键值对数目马上停止采集)

 

接下来对整个选取得键值对进行全局排序,然后根据工作配置的reducer task数目R来选取关键key,将采集后而且排序的key分成R个部分,即出现R-1个分割点,然后取出R-1个分割点处的key,将其写到一个二进制文件(默认是_partition.lst),然后将这个文件设置为DistributedCache,及所有map reducer 共享的文件。接下来PartitionerClass来读取这个共享的二进制文件,读取其中的R-1个可以的值,根据着R-1key生成一个Binary形式的Tire树,可以加快查找(以空间换取时间),将所有的map输出根据这个R-1key将不同范围内的key 输出到不同reducer,然后每个reducer进行一下局部排序即可,这样可以保证第ireducer 输出的键值对所有的可以都比第i+1reducer的键值对的key小。从而达到所有的key全局有序,

 

note :这其中的mapClass ReducerClass都设置成默认的即可,即直接输出键值对,mapreduce框架自带有sort功能,即可满足条件。(为简单测试目的)

 

在mapreduce自带的框架内的TotalOrderPartitioner类就是通过读取这个_partition.lst,并生成对应的trie树,加快前缀匹配速度,以空间换取时间的思想,从而分配可以到对应的reduce内部,然后使用框架自带的排序功能进行局部的排序,从而达到整体的有序。我们也没有必要将这些文件进行合并,

 

Trie树,又称单词查找树或键树,是一种树形结构,是一种哈希树的变种。典型应用是用于统计和排序大量的字符串(但不仅限于字符串),所以经常被搜索引擎系统用于文本词频统计。它的优点是:最大限度地减少无谓的字符串比较,查询效率比哈希表高。

它有3个基本特性:

  1)根节点不包含字符,除根节点外每一个节点都只包含一个字符。

  2)从根节点到某一节点,路径上经过的字符连接起来,为该节点对应的字符串。

3)每个节点的所有子节点包含的字符都不相同。

 

数据结构:

const int MaxKeySize = 25; //关键码最大位数

  typedef struct { //关键码类型

  KeyType * ch[MaxKeySize]; //关键码存放数组

  int currentSize; //关键码当前位数

} KeyType;

 

下面贴下我自己实现的采样全局排序(为了简单  没有实现trie树  直接进行比较  速度会有一定的影响)

自定义的InputFormat,从而在分片的时候进行采集:

 

Java代码
 
  1. package com.zxx.smpler;  
  2.   
  3. public class SamplerInputFormat extends FileInputFormat<Text, Text>  
  4. {  
  5.       
  6.     static final String PARTITION_FILENAME = "_partition.lst";  
  7.     static final String SAMPLE_SIZE = "terasort.partitions.sample";  
  8.     private static JobConf lastConf = null;  
  9.     private static InputSplit[] lastResult = null;  
  10.       
  11.     static class TextSampler implements IndexedSortable  
  12.     {  
  13.   
  14.         public ArrayList<Text> records=new ArrayList<Text>();  
  15.           
  16.         @Override  
  17.         public int compare(int arg0, int arg1)  
  18.         {  
  19.             Text right=records.get(arg0);  
  20.             Text left=records.get(arg1);  
  21.               
  22.             return right.compareTo(left);  
  23.         }  
  24.   
  25.         @Override  
  26.         public void swap(int arg0, int arg1)  
  27.         {  
  28.             Text right=records.get(arg0);  
  29.             Text left=records.get(arg1);  
  30.               
  31.             records.set(arg0, left);  
  32.             records.set(arg1, right);  
  33.         }  
  34.           
  35.         public void addKey(Text key)   
  36.         {  
  37.             records.add(new Text(key));  
  38.         }  
  39.           
  40.         public Text[] createPartitions(int numPartitions)  
  41.         {  
  42.             int numRecords=records.size();  
  43.             if(numPartitions>numRecords)  
  44.             {  
  45.                 throw new IllegalArgumentException  
  46.                   ("Requested more partitions than input keys (" + numPartitions +  
  47.                    " > " + numRecords + ")");  
  48.             }  
  49.             new QuickSort().sort(this0, records.size());  
  50.             float stepSize=numRecords/(float)numPartitions;  
  51.             Text[] result=new Text[numPartitions-1];  
  52.             for(int i=1;i<numPartitions;++i)  
  53.             {  
  54.                 result[i-1]=records.get(Math.round(stepSize*i));  
  55.             }  
  56.             return result;  
  57.         }  
  58.     }  
  59.       
  60.     public static void writePartitionFile(JobConf conf, Path partFile) throws IOException  
  61.     {  
  62.         SamplerInputFormat inputFormat=new SamplerInputFormat();  
  63.         TextSampler sampler=new TextSampler();  
  64.         Text key =new Text();  
  65.         Text value=new Text();  
  66.           
  67.         int partitions = conf.getNumReduceTasks();    //Reducer任务的个数  
  68.         long sampleSize = conf.getLong(SAMPLE_SIZE, 100); //采集数据-键值对的个数  
  69.         InputSplit[] splits=inputFormat.getSplits(conf, conf.getNumMapTasks());//获得数据分片  
  70.         int samples=Math.min(10, splits.length);//采集分片的个数  
  71.         long recordsPerSample = sampleSize / samples;//每个分片采集的键值对个数  
  72.         int sampleStep = splits.length / samples; //采集分片的步长  
  73.         long records = 0;  
  74.           
  75.         for(int i=0;i<samples;i++)  
  76.         {  
  77.             RecordReader<Text, Text> reader=inputFormat.getRecordReader(splits[sampleStep*i], conf, null);  
  78.             while(reader.next(key, value))  
  79.             {  
  80.                 sampler.addKey(key);  
  81.                 records+=1;  
  82.                 if((i+1)*recordsPerSample<=records)  
  83.                 {  
  84.                     break;  
  85.                 }  
  86.             }  
  87.         }  
  88.         FileSystem outFs = partFile.getFileSystem(conf);  
  89.         if (outFs.exists(partFile)) {  
  90.             outFs.delete(partFile, false);  
  91.           }  
  92.         SequenceFile.Writer writer=SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);  
  93.         NullWritable nullValue = NullWritable.get();  
  94.         for(Text split:sampler.createPartitions(partitions))  
  95.         {  
  96.             writer.append(split, nullValue);  
  97.         }  
  98.         writer.close();  
  99.           
  100.     }  
  101.   
  102.     static class TeraRecordReader implements RecordReader<Text,Text>  
  103.     {  
  104.   
  105.         private LineRecordReader in;  
  106.         private LongWritable junk = new LongWritable();  
  107.         private Text line = new Text();  
  108.         private static int KEY_LENGTH = 10;  
  109.           
  110.         public TeraRecordReader(Configuration job, FileSplit split) throws IOException  
  111.         {  
  112.             in = new LineRecordReader(job, split);  
  113.         }  
  114.   
  115.         @Override  
  116.         public void close() throws IOException  
  117.         {  
  118.             in.close();   
  119.         }  
  120.   
  121.         @Override  
  122.         public Text createKey()  
  123.         {  
  124.             // TODO Auto-generated method stub  
  125.             return new Text();  
  126.         }  
  127.   
  128.         @Override  
  129.         public Text createValue()  
  130.         {  
  131.             return new Text();  
  132.         }  
  133.   
  134.         @Override  
  135.         public long getPos() throws IOException  
  136.         {  
  137.             // TODO Auto-generated method stub  
  138.             return in.getPos();  
  139.         }  
  140.   
  141.         @Override  
  142.         public float getProgress() throws IOException  
  143.         {  
  144.             // TODO Auto-generated method stub  
  145.             return in.getProgress();  
  146.         }  
  147.   
  148.         @Override  
  149.         public boolean next(Text arg0, Text arg1) throws IOException  
  150.         {  
  151.             if(in.next(junk, line))  
  152.             {  
  153.                 if(line.getLength()<KEY_LENGTH)  
  154.                 {  
  155.                     arg0.set(line);  
  156.                     arg1.clear();  
  157.                 }else{  
  158.                     byte[] bytes=line.getBytes();  //默认知道读取要比较值的前10个字节  作为key 后面的字节作为value;  
  159.                     arg0.set(bytes, 0,KEY_LENGTH);  
  160.                     arg1.set(bytes,KEY_LENGTH, line.getLength()-KEY_LENGTH);  
  161.                 }  
  162.                 return true;  
  163.             }else {  
  164.                 return false;     
  165.             }  
  166.         }  
  167.           
  168.     }  
  169.     @Override  
  170.     public InputSplit[] getSplits(JobConf conf, int splits) throws IOException  
  171.     {  
  172.         if(conf==lastConf)  
  173.         {  
  174.             return lastResult;  
  175.         }  
  176.         lastConf=conf;  
  177.         lastResult=super.getSplits(lastConf, splits);  
  178.         return lastResult;  
  179.   
  180.     }  
  181.     @Override  
  182.     public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException  
  183.     {  
  184.         // TODO Auto-generated method stub  
  185.         return new TeraRecordReader(arg1,(FileSplit)arg0);  
  186.     }  
  187.      
  188. }  

 主函数:

Java代码
 
  1. package com.zxx.smpler;  
  2.   
  3.   
  4. public class SamplerSort extends Configured implements Tool  
  5. {  
  6.        //自定义的Partitioner  
  7.     public static class TotalOrderPartitioner implements Partitioner<Text,Text>  
  8.     {  
  9.   
  10.         private Text[] splitPoints;  
  11.           
  12.         public TotalOrderPartitioner(){}  
  13.         @Override  
  14.         public int getPartition(Text arg0, Text arg1, int arg2)  
  15.         {  
  16.             // TODO Auto-generated method stub  
  17.             return findPartition(arg0);  
  18.         }  
  19.   
  20.         @Override  
  21.         public void configure(JobConf arg0)  
  22.         {  
  23.             try  
  24.             {  
  25.                 FileSystem fs = FileSystem.getLocal(arg0);  
  26.                 Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);  
  27.                 splitPoints = readPartitions(fs, partFile, arg0);  //读取采集文件  
  28.             } catch (IOException ie)  
  29.             {  
  30.                 throw new IllegalArgumentException("can't read paritions file", ie);  
  31.             }  
  32.               
  33.         }  
  34.           
  35.         public int findPartition(Text key)          //分配可以到多个reduce  
  36.         {  
  37.             int len=splitPoints.length;  
  38.             for(int i=0;i<len;i++)  
  39.             {  
  40.                 int res =key.compareTo(splitPoints[i]);  
  41.                 if(res>0&&i<len-1)  
  42.                 {  
  43.                     continue;  
  44.                 }else if (res==0) {  
  45.                     return i;  
  46.                 }else if(res<0){  
  47.                     return i;  
  48.                 }else if (res>0&&i==len-1) {  
  49.                     return i+1;  
  50.                 }  
  51.             }  
  52.             return 0;  
  53.         }  
  54.           
  55.         private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException    
  56.         {  
  57.             SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);  
  58.             List<Text> parts = new ArrayList<Text>();  
  59.             Text key=new Text();  
  60.             NullWritable value = NullWritable.get();  
  61.             while(reader.next(key,value))  
  62.             {  
  63.                 parts.add(key);  
  64.             }  
  65.             reader.close();  
  66.             return parts.toArray(new Text[parts.size()]);  
  67.         }  
  68.           
  69.     }  
  70.     @Override  
  71.     public int run(String[] args) throws Exception  
  72.     {  
  73.         JobConf job=(JobConf)getConf();  
  74.         Path inputDir = new Path(args[0]);  
  75.         inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));  
  76.         Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);  
  77.           
  78.         URI partitionUri = new URI(partitionFile.toString() +  
  79.                 "#" + SamplerInputFormat.PARTITION_FILENAME);  
  80.           
  81.         SamplerInputFormat.setInputPaths(job, new Path(args[0]));  
  82.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  83.           
  84.         job.setJobName("SamplerTotalSort");  
  85.         job.setJarByClass(SamplerSort.class);  
  86.         job.setOutputKeyClass(Text.class);  
  87.         job.setOutputValueClass(Text.class);  
  88.         job.setInputFormat(SamplerInputFormat.class);  
  89.         job.setOutputFormat(TextOutputFormat.class);  
  90.         job.setPartitionerClass(TotalOrderPartitioner.class);  
  91.         job.setNumReduceTasks(4);  
  92.           
  93.         SamplerInputFormat.writePartitionFile(job, partitionFile);  //数据采集并写入文件  
  94.           
  95.         DistributedCache.addCacheFile(partitionUri, job);  //将这个文件作为共享文件  提供给partition使用  
  96.         DistributedCache.createSymlink(job);  
  97.           
  98.         //SamplerInputFormat.setFinalSync(job, true);  
  99.         JobClient.runJob(job);  
  100.         return 0;  
  101.     }  
  102.     public static void main(String[] args) throws Exception  
  103.     {  
  104.         int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);  
  105.         System.exit(res);  
  106.     }  
  107.   
  108.   
  109. }  
 

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
算法设计与分析 2.8 快速排序
爬网页数据,C# 也可以像 Jquery 那样吗?
各种排序算法的分析及java实现
hadoop in china: 用Hadoop进行分布式并行编程二
最全的Java笔试题库之选择题篇-总共234道【1~60】
快速排序
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服