本文完成对hadoop输入、输出文件方式的控制,完成的功能如下:
1、改写map读取数据的格式:默认的<文件偏移量,行内容>----------->变为<文件名,文件内容>
2、改写输出的格式,输出文件时每个输入文件对应一个输出文件,输出文件的名字跟输入文件名字相同。
直接上代码:
coAuInputFormat
package an.hadoop.code.audit;/** * The function of this class is revise the input format * the <key,value > ---> map * <path,content> of the map * */import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class coAuInputFormat extends FileInputFormat<Text, Text>{ private CompressionCodecFactory compressionCodecs = null; public void configure(Configuration conf) { compressionCodecs = new CompressionCodecFactory(conf); } /** * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理 * * @param fs * @param file * * @return false */ protected boolean isSplitable(FileSystem fs, Path file) { CompressionCodec codec = compressionCodecs.getCodec(file); return false;//以文件为单位,每个单位作为一个split,即使单个文件的大小超过了64M,也就是Hadoop一个块得大小,也不进行分片 } @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new coAuRecordReader(context, split); }}
coAuRecordReader
package an.hadoop.code.audit;import java.io.IOException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class coAuRecordReader extends RecordReader<Text, Text> { private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private byte[] buffer; private String keyName; private FSDataInputStream fileIn; private Text key = null; private Text value = null; public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit) throws IOException { // TODO Auto-generated constructor stub Configuration job = context.getConfiguration(); FileSplit split = (FileSplit) genericSplit; start = ((FileSplit) split).getStart(); //从中可以看出每个文件是作为一个split的 end = split.getLength() + start; final Path path = split.getPath();// keyName = path.toString();//key 的值是文件路径 LOG.info("filename in hdfs is : " + keyName);//写入日志文件,去哪里查看日志呢? final FileSystem fs = path.getFileSystem(job); fileIn = fs.open(path); fileIn.seek(start); buffer = new byte[(int)(end - start)]; this.pos = start; /*if(key == null){ key = new Text(); key.set(keyName); } if(value == null){ value = new Text(); value.set(utf8); }*/ }//coAuRecordReader() @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); //this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); keyName = file.toString();//key 的值是文件路径 LOG.info("filename in hdfs is : " + keyName);//写入日志文件,去哪里查看日志呢? final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); fileIn.seek(start); buffer = new byte[(int)(end - start)]; this.pos = start; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub //这个是需要做的 if(key == null){ key = new Text(); } key.set(keyName); if(value == null){ value = new Text(); } key.clear(); key.set(keyName);// set the key value.clear();//clear the value while(pos < end){ fileIn.readFully(pos,buffer); value.set(buffer); pos += buffer.length; LOG.info("end is : " + end + " pos is : " + pos); return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return value; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float)(end - start)); } } @Override public void close() throws IOException { // TODO Auto-generated method stub if (fileIn != null) { fileIn.close(); } } }
coAuOutputFormat
package an.hadoop.code.audit;/** * the name of the output file name * * */import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;public class coAuOutputFormat extends MultipleOutputFormat<Text, Text> { private final static String suffix = "_its4"; @Override protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) { // TODO Auto-generated method stub String path = key.toString(); //文件的路径及名字 String[] dir = path.split("/"); int length = dir.length; String filename = dir[length -1]; return filename + suffix;//输出的文件名,输出的文件名 } }
MultipleOutputFormat
package an.hadoop.code.audit;/** * the mutiply * */import java.io.DataOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> { //默认的是TextOutputFormat private MultiRecordWriter writer = null; public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job));//job ,output path } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {//获得输出路径 Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) {//如果是 workPath = ((FileOutputCommitter) committer).getWorkPath();//工作路径 } else { Path outputPath = super.getOutputPath(conf);//获得conf路径 if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; // } /**通过key, value, conf来确定输出文件名(含扩展名)*/ protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);//抽象方法,被之后的方法重写了 public class MultiRecordWriter extends RecordWriter<K, V> { /**RecordWriter的缓存*/ private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; /**输出目录*/ private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) {//构造函数 super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {//多个writer都要关掉 Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { //得到输出文件名 String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());//生成输出文件名 RecordWriter<K, V> rw = this.recordWriters.get(baseName);//?? if (rw == null) { rw = getBaseRecordWriter(job, baseName);// this.recordWriters.put(baseName, rw); } rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = ","; RecordWriter<K, V> recordWriter = null; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, baseName + codec.getDefaultExtension()); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec .createOutputStream(fileOut)), keyValueSeparator); } else { Path file = new Path(workPath, baseName); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);//file 是指的file name of the output file recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);//这里调用的LineRecordWriter } return recordWriter; } }}
LineRecordWriter
package an.hadoop.code.audit;/*RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在, * protected访问权限,因此普通程序无法访问。这里仅仅是把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共*/import java.io.DataOutputStream;import java.io.IOException;import java.io.UnsupportedEncodingException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/**摘自{@link TextOutputFormat}中的LineRecordWriter。 */public class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8);// 相当与分隔符 } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "/t");//"/t"默认的分隔符 } private void writeObject(Object o) throws IOException {//被write函数调用 if (o instanceof Text) {// Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength());//将指定 byte 数组中从偏移量 off 开始的 len 个字节写入基础输出流 } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException {//这个要修改成 只是写成一个文件的格式, boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable;//重点是要改写Key,value,之类,value是一个文本,key是地址,这里不写入key了 if (nullKey && nullValue) { return; } /*if (!nullKey) {//这个可以控制是否写入key,seperate and value writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); }*/ if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); }}
CodeAudit
package an.hadoop.code.audit;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class CodeAudit { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: code audit <in> <out>"); System.exit(2); } Job job = new Job(conf, "code audit"); job.setJarByClass(CodeAudit.class); job.setMapperClass(coAuMapper.class); job.setInputFormatClass(coAuInputFormat.class); //job.setOutputKeyClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(coAuOutputFormat.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
联系客服