package org.myorg;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**摘{@link TextOutputFormat}碌ineRecordWriter隆拢 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
/**
 * Output format that derives each record's output name from the task id and the key.
 */
public static class myOutput extends MultipleOutputFormat<Text, Text> {
    /**
     * Builds the output name as the last three characters of the task id followed by the
     * first three characters of the key.
     *
     * <p>Task ids or keys shorter than three characters contribute all of their characters
     * instead of raising {@link StringIndexOutOfBoundsException}. The value is not consulted.
     *
     * @param key record key; its leading characters become the name suffix
     * @param value record value; unused
     * @param taskID context of the running task attempt
     * @return the concatenated name, at most six characters long
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext taskID) {
        String taskName = taskID.getTaskAttemptID().getTaskID().toString();
        String keyText = key.toString();

        // Clamp both slices so short inputs cannot index outside the string.
        String fileNum = taskName.substring(Math.max(0, taskName.length() - 3));
        String keyPrefix = keyText.substring(0, Math.min(3, keyText.length()));
        return fileNum + keyPrefix;
    }
}
目的:
根据输出数据的某些特征,分类输出到不同的文件夹下以便管理,而不是放在同一个文件夹下。
实现:
1、重写MultipleOutputFormat的某些方法,参考org.apache.hadoop.mapred.lib.MultipleOutputFormat,需要在程序中实现的子类方法是:
protected String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job),即通过key、value及任务上下文(job)决定文件名
(含扩展名)。其中,需要改写的方法是最后一个方法:private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job,
String baseName),baseName 即为在程序中重写的子类方法 generateFileNameForKeyValue 的返回值。
代码:
package org.myorg;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable>
extends FileOutputFormat<K, V> {
private MultiRecordWriter writer = null;
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
InterruptedException {
if (writer == null) {
writer = new MultiRecordWriter(job, getTaskOutputPath(job));
}
return writer;
}
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
Path workPath = null;
OutputCommitter committer = super.getOutputCommitter(conf);
if (committer instanceof FileOutputCommitter) {
workPath = ((FileOutputCommitter) committer).getWorkPath();
} else {
Path outputPath = super.getOutputPath(conf);
if (outputPath == null) {
throw new IOException("Undefined job output-path");
}
workPath = outputPath;
}
return workPath;
}
protected abstract String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job);//Configuration conf);
public class MultiRecordWriter extends RecordWriter<K, V> {
private HashMap<String, RecordWriter<K, V>> recordWriters = null;
private TaskAttemptContext job = null;
private Path workPath = null;
public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
super();
this.job = job;
this.workPath = workPath;
recordWriters = new HashMap<String, RecordWriter<K, V>>();
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
while (values.hasNext()) {
values.next().close(context);
}
this.recordWriters.clear();
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
String baseName = generateFileNameForKeyValue(key, value, job);//job.getConfiguration());
RecordWriter<K, V> rw = this.recordWriters.get(baseName);
if (rw == null) {
rw = getBaseRecordWriter(job, baseName);
this.recordWriters.put(baseName, rw);
}
//LongWritable keys=(LongWritable)key;
//long ret=keys.get()>>1;
//keys.set(ret);
rw.write(key, value);//change
}
private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = "\t"; //change
String pathname=baseName.substring(12); //change
RecordWriter<K, V> recordWriter = null;
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class);
//String pathname=baseName.substring(12);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11) + codec.getDefaultExtension()); //change
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec
.createOutputStream(fileOut)), keyValueSeparator);
} else {
Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11)); //change
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
return recordWriter;
}
}
}
2、把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。它是RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个内部类存在,protected访问权限,因此普通程序无法访问。
代码如下:
3、在主程序中加载generateFileNameForKeyValue方法:
在main函数中需添加 job.setOutputFormatClass(myOutput.class);
更多信息请查看 java进阶网 http://www.javady.com
|