xxqyzsc 发表于 2017-12-18 13:18:26

Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)

1 package zhouls.bigdata.myMapReduce.ScoreCount;  

2  
3 import java.io.IOException;
  
4 import org.apache.hadoop.conf.Configuration;
  
5 import org.apache.hadoop.fs.FSDataInputStream;
  
6 import org.apache.hadoop.fs.FileSystem;
  
7 import org.apache.hadoop.fs.Path;
  
8 import org.apache.hadoop.io.Text;
  
9 import org.apache.hadoop.mapreduce.InputSplit;
  
10 import org.apache.hadoop.mapreduce.JobContext;
  
11 import org.apache.hadoop.mapreduce.RecordReader;
  
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
  
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  
15 import org.apache.hadoop.util.LineReader;
  
16 /**
  
17 * 自定义学生成绩读写InputFormat
  
18 * 数据格式参考:19020090017 小讲 90 99 100 89 95
  
19 * @author Bertron
  
20 */
  
21
  
22             //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
  
23             //比如   ScoreInputFormatextends FileInputFormat implements InputFormat。
  
24
  
25             //问:自定义输入格式 ScoreInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
  
26

  
27 public>  
28
  
29 //    线路是: booleanisSplitable()   ->   RecordReader<Text,ScoreWritable> createRecordReader()   ->   ScoreRecordReader extends RecordReader<Text, ScoreWritable >
  
30   
  
31   @Override
  
32   protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
  
33             //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
  
34 //      如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
  
35 //      如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
  
36         return false;    //整个文件封装到一个InputSplit
  
37         //要么就是return true;      //切分64MB大小的一块一块,再封装到InputSplit
  
38   }
  
39   
  
40   @Override
  
41   public RecordReader<Text,ScoreWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
  
42 //      RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
  
43 //      createRecordReader是方法,在这里是,ScoreInputFormat.createRecordReader。ScoreInputFormat是InputFormat类的实例
  
44 //      InputSplit input和TaskAttemptContext context是传入参数
  
45         
  
46 //      isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
  
47 //      isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
  
48         
  
49         //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类ScoreRecordReader。
  
50         //类似与Excel、WeiBo、TVPlayData代码写法
  
51         return new ScoreRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
  
52   }
  
53   
  
54   
  
55   //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为ScoreWritable类型封装学生所有成绩

  
56   public static>  
57         public LineReader in;//行读取器
  
58         public Text line;//每行数据类型
  
59         public Text lineKey;//自定义key类型,即k1
  
60         public ScoreWritable lineValue;//自定义value类型,即v1
  
61         
  
62         @Override
  
63         public void close() throws IOException {//关闭输入流
  
64             if(in !=null){
  
65               in.close();
  
66             }
  
67         }
  
68         @Override
  
69         public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
  
70             return lineKey;//返回类型是Text,即Text lineKey
  
71         }
  
72         @Override
  
73         public ScoreWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
  
74             return lineValue;//返回类型是ScoreWritable,即ScoreWritable lineValue
  
75         }
  
76         @Override
  
77         public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
  
78             return 0;//返回类型是float,即float 0
  
79         }
  
80         @Override
  
81         public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
  
82             FileSplit split=(FileSplit)input;
  
83             Configuration job=context.getConfiguration();
  
84             Path file=split.getPath();
  
85             FileSystem fs=file.getFileSystem(job);
  
86            
  
87             FSDataInputStream filein=fs.open(file);
  
88             in=new LineReader(filein,job);//输入流in
  
89             line=new Text();//每行数据类型
  
90             lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
  
91             lineValue = new ScoreWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
  
92         }
  
93         
  
94         //此方法读取每行数据,完成自定义的key和value
  
95         @Override
  
96         public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
  
97             int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
  
98            
  
99 //            是SplitLineReader.readLine->SplitLineReaderextends   LineReader->org.apache.hadoop.util.LineReader
  
100            
  
101 //            in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
  
102 //            in.readLine(str, maxLineLength)//只读到maxLineLength行
  
103 //            in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
  
104
  
105             if(linesize==0) return false;
  
106            
  
107            
  
108             String[] pieces = line.toString().split("\\s+");//解析每行数据
  
109                     //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
  
110            
  
111             if(pieces.length != 7){
  
112               throw new IOException("Invalid record received");
  
113             }
  
114             //将学生的每门成绩转换为 float 类型
  
115             float a,b,c,d,e;
  
116             try{
  
117               a = Float.parseFloat(pieces.trim());//将String类型,如pieces转换成,float类型,给a
  
118               b = Float.parseFloat(pieces.trim());
  
119               c = Float.parseFloat(pieces.trim());
  
120               d = Float.parseFloat(pieces.trim());
  
121               e = Float.parseFloat(pieces.trim());
  
122             }catch(NumberFormatException nfe){
  
123               throw new IOException("Error parsing floating poing value in record");
  
124             }
  
125             lineKey.set(pieces+"\t"+pieces);//完成自定义key数据
  
126             lineValue.set(a, b, c, d, e);//封装自定义value数据
  
127 //            或者写
  
128 //            lineValue.set(Float.parseFloat(pieces.trim()),Float.parseFloat(pieces.trim()),Float.parseFloat(pieces.trim()),
  
129 //                  Float.parseFloat(pieces.trim()),Float.parseFloat(pieces.trim()));
  
130            
  
131 //            pieces   pieces pieces... pieces
  
132 //            19020090040 秦心芯 123 131 100 95 100
  
133 //            19020090006 李磊 99 92 100 90 100
  
134 //            19020090017 唐一建 90 99 100 89 95
  
135 //            19020090031 曾丽丽 100 99 97 79 96
  
136 //            19020090013 罗开俊 105 115 94 45 100
  
137 //            19020090039 周世海 114 116 93 31 97
  
138 //            19020090020 王正伟 109 98 88 47 99
  
139 //            19020090025 谢瑞彬 94 120 100 50 73
  
140 //            19020090007 于微 89 78 100 66 99
  
141 //            19020090012 刘小利 87 82 89 71 99
  
142            
  
143            
  
144            
  
145             return true;
  
146         }      
  
147   }
  
148 }
页: [1]
查看完整版本: Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)