打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服务

开通VIP
Hadoop 文件输入和文件输出 | 学步园

本文完成对hadoop输入、输出文件方式的控制,完成的功能如下:

1、改写map读取数据的格式:由默认的<文件偏移量,行内容>变为<文件路径,文件内容>

2、改写输出的格式:每个输入文件对应一个输出文件,输出文件名为输入文件名加上后缀"_its4"。

直接上代码

coAuInputFormat

package an.hadoop.code.audit;/** * The function of this class is revise the input format  * the <key,value > ---> map * <path,content> of the map * */import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class coAuInputFormat extends FileInputFormat<Text, Text>{	private CompressionCodecFactory compressionCodecs = null;	public void configure(Configuration conf) {		compressionCodecs = new CompressionCodecFactory(conf);	}		/**	 * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理	 *	 * @param fs	 * @param file	 *	 * @return false	 */	protected boolean isSplitable(FileSystem fs, Path file) {		CompressionCodec codec = compressionCodecs.getCodec(file);		return false;//以文件为单位,每个单位作为一个split,即使单个文件的大小超过了64M,也就是Hadoop一个块得大小,也不进行分片	}	@Override	public RecordReader<Text, Text> createRecordReader(InputSplit split,			TaskAttemptContext context) throws IOException,			InterruptedException {		// TODO Auto-generated method stub		return new coAuRecordReader(context, split);	}}

coAuRecordReader

package an.hadoop.code.audit;import java.io.IOException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class coAuRecordReader extends RecordReader<Text, Text> {		private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName());	private CompressionCodecFactory compressionCodecs = null;	private long start;	private long pos;	private long end;	private byte[] buffer;	private String keyName;	private FSDataInputStream fileIn;	private Text key = null;    private Text value = null;	public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit) throws IOException {		// TODO Auto-generated constructor stub				Configuration job = context.getConfiguration();		FileSplit split = (FileSplit) genericSplit;		start = ((FileSplit) split).getStart(); //从中可以看出每个文件是作为一个split的		end = split.getLength() + start;		final Path path = split.getPath();//		keyName = path.toString();//key 的值是文件路径		LOG.info("filename in hdfs is : " + keyName);//写入日志文件,去哪里查看日志呢?		
final FileSystem fs = path.getFileSystem(job);		fileIn = fs.open(path);		fileIn.seek(start);		buffer = new byte[(int)(end - start)];		this.pos = start;		/*if(key == null){			key = new Text();			key.set(keyName);		}		if(value == null){			value = new Text();			value.set(utf8);		}*/					}//coAuRecordReader()		@Override	public void initialize(InputSplit genericSplit, TaskAttemptContext context)			throws IOException, InterruptedException {		// TODO Auto-generated method stub		FileSplit split = (FileSplit) genericSplit;	    Configuration job = context.getConfiguration();	    //this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);	    start = split.getStart();	    end = start + split.getLength();	    final Path file = split.getPath();	    compressionCodecs = new CompressionCodecFactory(job);	    final CompressionCodec codec = compressionCodecs.getCodec(file);	    keyName = file.toString();//key 的值是文件路径		LOG.info("filename in hdfs is : " + keyName);//写入日志文件,去哪里查看日志呢?		
final FileSystem fs = file.getFileSystem(job);		fileIn = fs.open(file);		fileIn.seek(start);		buffer = new byte[(int)(end - start)];		this.pos = start;					}	@Override	public boolean nextKeyValue() throws IOException, InterruptedException {		// TODO Auto-generated method stub		//这个是需要做的				if(key == null){			key = new Text();		}		key.set(keyName);		if(value == null){			value = new Text();		}		key.clear();		key.set(keyName);// set the key		value.clear();//clear the value		while(pos < end){			fileIn.readFully(pos,buffer);			value.set(buffer);						pos += buffer.length;			LOG.info("end is : " + end  + " pos is : " + pos);			return true;		}						return false;	}	@Override	public Text getCurrentKey() throws IOException, InterruptedException {		// TODO Auto-generated method stub		return key;	}	@Override	public Text getCurrentValue() throws IOException, InterruptedException {		// TODO Auto-generated method stub		return value;	}	@Override	public float getProgress() throws IOException, InterruptedException {		// TODO Auto-generated method stub		if (start == end) {			return 0.0f;		} else {			return Math.min(1.0f, (pos - start) / (float)(end - start));		}	}	@Override	public void close() throws IOException {		// TODO Auto-generated method stub		if (fileIn != null) {	        fileIn.close(); 	    }	}	}

coAuOutputFormat

package an.hadoop.code.audit;/** * the name of the output file name *  * */import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;public class coAuOutputFormat extends MultipleOutputFormat<Text, Text> {		private final static String suffix = "_its4";		@Override	protected String generateFileNameForKeyValue(Text key, Text value,			Configuration conf) {		// TODO Auto-generated method stub		String path =  key.toString(); //文件的路径及名字		String[] dir = path.split("/");				int length = dir.length; 		String filename = dir[length -1];		return filename + suffix;//输出的文件名,输出的文件名	}	}

MultipleOutputFormat

package an.hadoop.code.audit;/** * the mutiply  * */import java.io.DataOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>		extends FileOutputFormat<K, V> { //默认的是TextOutputFormat	private MultiRecordWriter writer = null;	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,			InterruptedException {		if (writer == null) {			writer = new MultiRecordWriter(job, getTaskOutputPath(job));//job ,output path		}		return writer;	}	private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {//获得输出路径		Path workPath = null;		OutputCommitter committer = super.getOutputCommitter(conf);		if (committer instanceof FileOutputCommitter) {//如果是			workPath = ((FileOutputCommitter) committer).getWorkPath();//工作路径		} else {			Path outputPath = super.getOutputPath(conf);//获得conf路径			if (outputPath == null) {				throw new IOException("Undefined job output-path");			}			workPath = outputPath;		}		return workPath; //	}	/**通过key, value, conf来确定输出文件名(含扩展名)*/	protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);//抽象方法,被之后的方法重写了	public class MultiRecordWriter extends RecordWriter<K, V> {		/**RecordWriter的缓存*/		private HashMap<String, RecordWriter<K, V>> 
recordWriters = null;		private TaskAttemptContext job = null;		/**输出目录*/		private Path workPath = null;		public MultiRecordWriter(TaskAttemptContext job, Path workPath) {//构造函数			super();			this.job = job;			this.workPath = workPath;			recordWriters = new HashMap<String, RecordWriter<K, V>>();		}		@Override		public void close(TaskAttemptContext context) throws IOException, InterruptedException {//多个writer都要关掉			Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();			while (values.hasNext()) {				values.next().close(context);			}			this.recordWriters.clear();		}		@Override		public void write(K key, V value) throws IOException, InterruptedException {			//得到输出文件名			String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());//生成输出文件名			RecordWriter<K, V> rw = this.recordWriters.get(baseName);//??			if (rw == null) {				rw = getBaseRecordWriter(job, baseName);//				this.recordWriters.put(baseName, rw);			}			rw.write(key, value);		}		// ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}		private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)				throws IOException, InterruptedException {			Configuration conf = job.getConfiguration();			boolean isCompressed = getCompressOutput(job);			String keyValueSeparator = ",";			RecordWriter<K, V> recordWriter = null;			if (isCompressed) {				Class<? 
extends CompressionCodec> codecClass = getOutputCompressorClass(job,						GzipCodec.class);				CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);				Path file = new Path(workPath, baseName + codec.getDefaultExtension());				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);				recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec						.createOutputStream(fileOut)), keyValueSeparator);			} else {				Path file = new Path(workPath, baseName);				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);//file 是指的file name of the output file				recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);//这里调用的LineRecordWriter			}			return recordWriter;		}	}}

LineRecordWriter

package an.hadoop.code.audit;/*RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在, * protected访问权限,因此普通程序无法访问。这里仅仅是把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共*/import java.io.DataOutputStream;import java.io.IOException;import java.io.UnsupportedEncodingException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/**摘自{@link TextOutputFormat}中的LineRecordWriter。 */public class LineRecordWriter<K, V> extends RecordWriter<K, V> {	private static final String utf8 = "UTF-8";	private static final byte[] newline;	static {		try {			newline = "\n".getBytes(utf8);// 相当与分隔符		} catch (UnsupportedEncodingException uee) {			throw new IllegalArgumentException("can't find " + utf8 + " encoding");		}	}	protected DataOutputStream out;	private final byte[] keyValueSeparator;	public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {		this.out = out;		try {			this.keyValueSeparator = keyValueSeparator.getBytes(utf8);		} catch (UnsupportedEncodingException uee) {			throw new IllegalArgumentException("can't find " + utf8 + " encoding");		}	}	public LineRecordWriter(DataOutputStream out) {		this(out, "/t");//"/t"默认的分隔符	}	private void writeObject(Object o) throws IOException {//被write函数调用		if (o instanceof Text) {//			Text to = (Text) o;			out.write(to.getBytes(), 0, to.getLength());//将指定 byte 数组中从偏移量 off 开始的 len 个字节写入基础输出流		} else {			out.write(o.toString().getBytes(utf8));		}	}	public synchronized void write(K key, V value) throws IOException {//这个要修改成 只是写成一个文件的格式,		boolean nullKey = key == null || key instanceof NullWritable;		boolean nullValue = value == null || value instanceof NullWritable;//重点是要改写Key,value,之类,value是一个文本,key是地址,这里不写入key了		if (nullKey && nullValue) {			return;		}		/*if (!nullKey) {//这个可以控制是否写入key,seperate and value			writeObject(key);		
}		if (!(nullKey || nullValue)) {			out.write(keyValueSeparator);		}*/		if (!nullValue) {			writeObject(value);		}		out.write(newline);	}	public synchronized void close(TaskAttemptContext context) throws IOException {		out.close();	}}

CodeAudit

package an.hadoop.code.audit;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class CodeAudit {	public static void main(String[] args) throws Exception {	    Configuration conf = new Configuration();	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();	    if (otherArgs.length != 2) {	      System.err.println("Usage: code audit <in> <out>");	      System.exit(2);	    }	    Job job = new Job(conf, "code audit");	    job.setJarByClass(CodeAudit.class);	    job.setMapperClass(coAuMapper.class);	    job.setInputFormatClass(coAuInputFormat.class);	    //job.setOutputKeyClass(NullWritable.class);	    job.setOutputKeyClass(Text.class);	    job.setOutputValueClass(Text.class);	    	    job.setOutputFormatClass(coAuOutputFormat.class);	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	    System.exit(job.waitForCompletion(true) ? 0 : 1);	  }}
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
hadoop简单实现文本数据全局排序
MapReduce编程之通过MapReduce读取数据,往Hbase中写数据
Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat
Hadoop学习之路(6)MapReduce自定义分区实现
MapReduce简单实例:wordcount--大数据纪录片第五记
Hadoop MapReduce新旧API区别
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服