On the problem of total order sorting: I recently studied the data sampling method presented in Tom White's book (Hadoop: The Definitive Guide). Below is a write-up and comparison for future reference.
The main idea is to randomly sample a certain amount of data from the full input to be sorted. The sampling is controlled by three quantities:
1. The total number of key-value pairs to sample
2. The number of splits to sample from
3. The number of key-value pairs to take from each split (sampling stops as soon as the total target is reached)
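As a worked example of these three quantities (using the numbers from the implementation below): with a target of 100 sampled pairs and 20 input splits, min(10, 20) = 10 splits are visited, every 20/10 = 2nd split is chosen, and up to 100/10 = 10 pairs are read from each chosen split.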
Next, all of the sampled key-value pairs are sorted globally. Based on the number of reducer tasks R configured for the job, the sorted sample is divided into R ranges, which yields R-1 split points. The keys at these R-1 split points are written to a binary file (by default _partition.lst), and this file is registered with the DistributedCache so that it is shared by all map and reduce tasks. The partitioner class then reads the R-1 key values from this shared binary file and builds a binary trie over them, which speeds up lookups (trading space for time). Each map output key is matched against the R-1 split points and routed to the reducer responsible for its range; each reducer then performs an ordinary local sort. This guarantees that every key emitted by the i-th reducer is smaller than every key emitted by the (i+1)-th reducer, so the keys end up globally ordered across all output files.
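To make the split-point selection concrete, here is a minimal, hypothetical sketch in plain Java (not tied to Hadoop): given a sorted sample of 12 keys and R = 4 reducers, the step is 12/4 = 3, so the split points are the sample keys at indices 3, 6 and 9.

public class SplitPointDemo {
    public static void main(String[] args) {
        // sorted sample of 12 keys, R = 4 reducers -> R-1 = 3 split points
        String[] sortedSample = {"a","b","c","d","e","f","g","h","i","j","k","l"};
        int R = 4;
        float stepSize = sortedSample.length / (float) R;
        String[] splitPoints = new String[R - 1];
        for (int i = 1; i < R; i++) {
            splitPoints[i - 1] = sortedSample[Math.round(stepSize * i)];
        }
        // splitPoints = {"d", "g", "j"}: keys < "d" go to reducer 0,
        // ["d","g") to reducer 1, ["g","j") to reducer 2, >= "j" to reducer 3
        System.out.println(java.util.Arrays.toString(splitPoints));
    }
}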
Note: the mapper and reducer classes can both be left as the defaults (IdentityMapper and IdentityReducer in the old mapred API), which simply pass key-value pairs through; the MapReduce framework's built-in sort is then enough to satisfy the requirement. (This keeps the test simple.)
The TotalOrderPartitioner class that ships with the MapReduce framework works exactly this way: it reads _partition.lst, builds the corresponding trie to speed up prefix matching (trading space for time), assigns each key to the corresponding reducer, and then relies on the framework's built-in sort for the local ordering, which yields a total order overall. There is also no need to merge the resulting output files.
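For reference, here is a minimal sketch of driving the framework's own TotalOrderPartitioner together with InputSampler (assuming Hadoop's old mapred API, ~0.20/1.x; the class name FrameworkTotalSort is hypothetical, and this framework class is not to be confused with the custom partitioner of the same name defined later in this post):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class FrameworkTotalSort {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FrameworkTotalSort.class);
        conf.setJobName("FrameworkTotalSort");
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(4);
        conf.setPartitionerClass(TotalOrderPartitioner.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // Sample ~10% of the keys, at most 10000 samples from at most 10 splits,
        // then write the R-1 split points to the partition file the partitioner reads.
        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        TotalOrderPartitioner.setPartitionFile(conf, new Path(args[0], "_partition.lst"));
        InputSampler.writePartitionFile(conf, sampler);

        JobClient.runJob(conf);
    }
}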
A trie, also known as a prefix tree or digital search tree, is a tree structure and a variant of the hash tree. Its typical application is counting and sorting large numbers of strings (though not only strings), so it is often used by search engine systems for word frequency statistics. Its advantage is that it minimizes unnecessary string comparisons, making queries more efficient than a hash table.
It has three basic properties:
1) The root node contains no character; every node other than the root contains exactly one character.
2) Concatenating the characters on the path from the root to a given node yields the string corresponding to that node.
3) The children of any node all contain distinct characters.
Data structure (the classic textbook form; the original listing mistakenly declared ch as an array of KeyType pointers, which is self-referential):
const int MaxKeySize = 25;      // maximum number of key digits
typedef struct {                // key type
    char ch[MaxKeySize];        // array holding the key's characters
    int currentSize;            // current number of digits in the key
} KeyType;
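To make this concrete, here is a minimal trie sketch in Java (a hypothetical illustration restricted to lowercase a-z keys, not the framework's implementation). A lookup walks one character per level, so a mismatch ends the comparison immediately without examining the rest of the string:

class TrieNode {
    TrieNode[] children = new TrieNode[26]; // one slot per character
    boolean isKey;                          // marks the end of an inserted key
}

class Trie {
    private final TrieNode root = new TrieNode();

    void insert(String key) {
        TrieNode node = root;
        for (char c : key.toCharArray()) {
            int idx = c - 'a';
            if (node.children[idx] == null) node.children[idx] = new TrieNode();
            node = node.children[idx];
        }
        node.isKey = true;
    }

    boolean contains(String key) {
        TrieNode node = root;
        for (char c : key.toCharArray()) {
            node = node.children[c - 'a'];
            if (node == null) return false; // mismatch: stop immediately
        }
        return node.isKey;
    }
}

In TotalOrderPartitioner the same idea is applied to the raw key bytes of the R-1 split points, with each trie level indexed by one byte of the key.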
Below is my own implementation of sampling-based total order sorting. (For simplicity it does not build a trie; the partitioner compares keys directly, which costs some speed.)
The custom InputFormat, which performs the sampling when the input is split:
package com.zxx.smpler;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;

public class SamplerInputFormat extends FileInputFormat<Text, Text>
{
    static final String PARTITION_FILENAME = "_partition.lst";
    static final String SAMPLE_SIZE = "terasort.partitions.sample";
    private static JobConf lastConf = null;
    private static InputSplit[] lastResult = null;

    static class TextSampler implements IndexedSortable
    {
        public ArrayList<Text> records = new ArrayList<Text>();

        @Override
        public int compare(int arg0, int arg1)
        {
            Text right = records.get(arg0);
            Text left = records.get(arg1);
            return right.compareTo(left);
        }

        @Override
        public void swap(int arg0, int arg1)
        {
            Text right = records.get(arg0);
            Text left = records.get(arg1);
            records.set(arg0, left);
            records.set(arg1, right);
        }

        public void addKey(Text key)
        {
            records.add(new Text(key));
        }

        public Text[] createPartitions(int numPartitions)
        {
            int numRecords = records.size();
            if (numPartitions > numRecords)
            {
                throw new IllegalArgumentException(
                        "Requested more partitions than input keys (" + numPartitions +
                        " > " + numRecords + ")");
            }
            new QuickSort().sort(this, 0, records.size());
            float stepSize = numRecords / (float) numPartitions;
            Text[] result = new Text[numPartitions - 1];
            for (int i = 1; i < numPartitions; ++i)
            {
                result[i - 1] = records.get(Math.round(stepSize * i));
            }
            return result;
        }
    }

    public static void writePartitionFile(JobConf conf, Path partFile) throws IOException
    {
        SamplerInputFormat inputFormat = new SamplerInputFormat();
        TextSampler sampler = new TextSampler();
        Text key = new Text();
        Text value = new Text();

        int partitions = conf.getNumReduceTasks();                // number of reducer tasks
        long sampleSize = conf.getLong(SAMPLE_SIZE, 100);         // total number of key-value pairs to sample
        InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks()); // the input splits
        int samples = Math.min(10, splits.length);                // number of splits to sample
        long recordsPerSample = sampleSize / samples;             // key-value pairs sampled per split
        int sampleStep = splits.length / samples;                 // step between sampled splits
        long records = 0;

        for (int i = 0; i < samples; i++)
        {
            RecordReader<Text, Text> reader = inputFormat.getRecordReader(splits[sampleStep * i], conf, null);
            while (reader.next(key, value))
            {
                sampler.addKey(key);
                records += 1;
                if ((i + 1) * recordsPerSample <= records)
                {
                    break;
                }
            }
        }
        FileSystem outFs = partFile.getFileSystem(conf);
        if (outFs.exists(partFile)) {
            outFs.delete(partFile, false);
        }
        SequenceFile.Writer writer = SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        for (Text split : sampler.createPartitions(partitions))
        {
            writer.append(split, nullValue);
        }
        writer.close();
    }

    static class TeraRecordReader implements RecordReader<Text, Text>
    {
        private LineRecordReader in;
        private LongWritable junk = new LongWritable();
        private Text line = new Text();
        private static int KEY_LENGTH = 10;

        public TeraRecordReader(Configuration job, FileSplit split) throws IOException
        {
            in = new LineRecordReader(job, split);
        }

        @Override
        public void close() throws IOException
        {
            in.close();
        }

        @Override
        public Text createKey()
        {
            return new Text();
        }

        @Override
        public Text createValue()
        {
            return new Text();
        }

        @Override
        public long getPos() throws IOException
        {
            return in.getPos();
        }

        @Override
        public float getProgress() throws IOException
        {
            return in.getProgress();
        }

        @Override
        public boolean next(Text arg0, Text arg1) throws IOException
        {
            if (in.next(junk, line))
            {
                if (line.getLength() < KEY_LENGTH)
                {
                    arg0.set(line);
                    arg1.clear();
                } else {
                    // by convention, the first 10 bytes of each line are the key and the rest is the value
                    byte[] bytes = line.getBytes();
                    arg0.set(bytes, 0, KEY_LENGTH);
                    arg1.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
                }
                return true;
            } else {
                return false;
            }
        }
    }

    @Override
    public InputSplit[] getSplits(JobConf conf, int splits) throws IOException
    {
        if (conf == lastConf)
        {
            return lastResult;
        }
        lastConf = conf;
        lastResult = super.getSplits(lastConf, splits);
        return lastResult;
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException
    {
        return new TeraRecordReader(arg1, (FileSplit) arg0);
    }
}
The driver class:
package com.zxx.smpler;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SamplerSort extends Configured implements Tool
{
    // the custom Partitioner
    public static class TotalOrderPartitioner implements Partitioner<Text, Text>
    {
        private Text[] splitPoints;

        public TotalOrderPartitioner() {}

        @Override
        public int getPartition(Text arg0, Text arg1, int arg2)
        {
            return findPartition(arg0);
        }

        @Override
        public void configure(JobConf arg0)
        {
            try
            {
                FileSystem fs = FileSystem.getLocal(arg0);
                Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);
                splitPoints = readPartitions(fs, partFile, arg0); // read the sampled split points
            } catch (IOException ie)
            {
                throw new IllegalArgumentException("can't read partitions file", ie);
            }
        }

        public int findPartition(Text key) // assign keys to reducers by linear search over the split points
        {
            int len = splitPoints.length;
            for (int i = 0; i < len; i++)
            {
                int res = key.compareTo(splitPoints[i]);
                if (res > 0 && i < len - 1)
                {
                    continue;
                } else if (res == 0) {
                    return i;
                } else if (res < 0) {
                    return i;
                } else if (res > 0 && i == len - 1) {
                    return i + 1;
                }
            }
            return 0;
        }

        private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException
        {
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
            List<Text> parts = new ArrayList<Text>();
            Text key = new Text();
            NullWritable value = NullWritable.get();
            while (reader.next(key, value))
            {
                // copy the key: the reader reuses the same Text instance on every call
                parts.add(new Text(key));
            }
            reader.close();
            return parts.toArray(new Text[parts.size()]);
        }
    }

    @Override
    public int run(String[] args) throws Exception
    {
        JobConf job = (JobConf) getConf();
        Path inputDir = new Path(args[0]);
        inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
        Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);

        URI partitionUri = new URI(partitionFile.toString() +
                "#" + SamplerInputFormat.PARTITION_FILENAME);

        SamplerInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJobName("SamplerTotalSort");
        job.setJarByClass(SamplerSort.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormat(SamplerInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        job.setNumReduceTasks(4);

        SamplerInputFormat.writePartitionFile(job, partitionFile); // sample the input and write the partition file

        DistributedCache.addCacheFile(partitionUri, job); // share the file with every partitioner via the DistributedCache
        DistributedCache.createSymlink(job);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);
        System.exit(res);
    }
}
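Assuming the two classes are packaged into a jar (the jar name and HDFS paths here are hypothetical), the job can be launched in the usual way:

hadoop jar sampler-sort.jar com.zxx.smpler.SamplerSort /user/hadoop/input /user/hadoop/output

With 4 reduce tasks, every key in part-00000 precedes every key in part-00001, and so on, so reading part-00000 through part-00003 in order gives the fully sorted result without any merging.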