
[Big Data] The Hadoop Journey (Part 2): Analyzing a Weather Dataset with Hadoop

1. Download the weather dataset
    wget -r -c
  
    List the downloaded files:
  
    # ls
  
    010030-99999-2008.gz  010231-99999-2008.gz  010460-99999-2008.gz  010570-99999-2008.gz  010881-99999-2008.gz
  
    010070-99999-2008.gz  010260-99999-2008.gz  010490-99999-2008.gz  010750-99999-2008.gz  010883-99999-2008.gz
  
    010150-99999-2008.gz  010330-99999-2008.gz  010520-99999-2008.gz  010780-99999-2008.gz  010890-99999-2008.gz
  
    010230-99999-2008.gz  010450-99999-2008.gz  010550-99999-2008.gz  010830-99999-2008.gz
  

  
2. Decompress the data and merge it into a single file named example
  
    # zcat *.gz > example
  
    Check that the file looks correct:
  
    # tail -10 example
  
    0101010980999992008031013004+70367+031100FM-12+001599999V0201801N006019999999N9999999N1-00081-00291099591ADDMA1999999099411MD1710101+9999REMSYN060AAXX10131 01098 46/// /1806 11008 21029 39941 49959 57010;
  
    0101010980999992008031014004+70367+031100FM-12+001599999V0201901N006019999999N9999999N1-00071-00241099601ADDMA1999999099411MD1710051+9999REMSYN060AAXX10141 01098 46/// /1906 11007 21024 39941 49960 57005;
  
    0171010980999992008031015004+70367+031100FM-12+001599999V0202201N004010042019N0060001N1-00151-00261099611ADDAY171031AY221031GF107991061071004501021999MA1999999099431MD1510021+9999MW1221REMSYN082AAXX10151 01098 41456 72204 11015 21026 39943 49961 55002 72272 8672/ 333 4////;
  
    0101010980999992008031016004+70367+031100FM-12+001599999V0202101N005019999999N9999999N1-00121-00211099581ADDMA1999999099401MD1010011+9999REMSYN060AAXX10161 01098 46/// /2105 11012 21021 39940 49958 50001;
  
    0101010980999992008031017004+70367+031100FM-12+001599999V0202201N004019999999N9999999N1-00131-00231099591ADDMA1999999099411MD1410001+9999REMSYN060AAXX10171 01098 46/// /2204 11013 21023 39941 49959 54000;
  
    0213010980999992008031018004+70367+031100FM-12+001599999V0201901N004010042019N0150001N1-00061-00151099601ADDAA112000021AY171061AY221061GF107991051071004501021999KA1120M-00061MA1999999099421MD1510011+9999MW1701REMSYN100AAXX10181 01098 11465 71904 11006 21015 39942 49960 55001 69912 77072 8572/ 333 11006 4//// 91107;
  
    0101010980999992008031019004+70367+031100FM-12+001599999V0201901N006019999999N9999999N1+00001-00101099591ADDMA1999999099411MD1210011+9999REMSYN060AAXX10191 01098 46/// /1906 10000 21010 39941 49959 52001;
  
    0101010980999992008031020004+70367+031100FM-12+001599999V0201801N006019999999N9999999N1+00041-00091099621ADDMA1999999099441MD1210031+9999REMSYN060AAXX10201 01098 46/// /1806 10004 21009 39944 49962 52003;
  
    0171010980999992008031021004+70367+031100FM-12+001599999V0201901N005010042019N0300001N1+00071-00061099621ADDAY171031AY221031GF107991071071004501999999MA1999999099441MD1210021+9999MW1021REMSYN082AAXX10211 01098 41480 71905 10007 21006 39944 49962 52002 70272 877// 333 4////;
  
    0101010980999992008031022004+70367+031100FM-12+001599999V0201901N005019999999N9999999N1+00091-0004
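
    Incidentally, if zcat is not available, the same concatenation can be reproduced in plain Java with GZIPInputStream. A minimal sketch (the class name Zcat is ours, not part of the original post; transferTo requires Java 9+):

    import java.io.*;
    import java.util.zip.GZIPInputStream;

    // Rough Java equivalent of `zcat *.gz > example`: decompress every
    // .gz file in the current directory and append the bytes to one file.
    // (File order is unspecified, which does not matter for this job.)
    public class Zcat {
        public static void main(String[] args) throws IOException {
            File[] gzFiles = new File(".").listFiles((dir, name) -> name.endsWith(".gz"));
            if (gzFiles == null) throw new IOException("cannot list current directory");
            try (OutputStream out = new FileOutputStream("example")) {
                for (File f : gzFiles) {
                    try (InputStream in = new GZIPInputStream(new FileInputStream(f))) {
                        in.transferTo(out); // Java 9+: streams the decompressed bytes
                    }
                }
            }
        }
    }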
  

  
3. Upload the dataset to the file test under Hadoop's in directory
  
    /root/hadoop-1.1.2/bin/hadoop fs -put ./example ./in/test
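
    The upload can also be done programmatically through the HDFS FileSystem API. A minimal sketch against the Hadoop 1.x API (the class name PutExample is ours, not part of the original post):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Programmatic equivalent of `hadoop fs -put ./example ./in/test`.
    public class PutExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // reads core-site.xml from the classpath
            FileSystem fs = FileSystem.get(conf);     // the configured default filesystem (HDFS)
            fs.copyFromLocalFile(new Path("./example"), new Path("./in/test"));
            fs.close();
        }
    }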
  

  
4. Write the MapReduce program
  
    cd /root/hadoop-1.1.2/myclass
  
    # cat MaxTemperature.java
  
    // cc MaxTemperature Application to find the maximum temperature in the weather dataset
  
    // vv MaxTemperature
  
    import org.apache.hadoop.fs.Path;
  
    import org.apache.hadoop.io.IntWritable;
  
    import org.apache.hadoop.io.Text;
  
    import org.apache.hadoop.mapreduce.Job;
  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  

  
    public class MaxTemperature {
  

  
      public static void main(String[] args) throws Exception {
  
      if (args.length != 2) {
  
          System.err.println("Usage: MaxTemperature <input path> <output path>");
  
          System.exit(-1);
  
      }
  

  
      Job job = new Job();
  
      job.setJarByClass(MaxTemperature.class);
  
      job.setJobName("Max temperature");
  

  
      FileInputFormat.addInputPath(job, new Path(args[0]));
  
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
  

  
      job.setMapperClass(MaxTemperatureMapper.class);
  
      job.setReducerClass(MaxTemperatureReducer.class);
  

  
      job.setOutputKeyClass(Text.class);
  
      job.setOutputValueClass(IntWritable.class);
  

  
      System.exit(job.waitForCompletion(true) ? 0 : 1);
  
      }
  
    }
  
    // ^^ MaxTemperature
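
    A side note on the driver: the new Job() constructor used above works on Hadoop 1.1.2 but is deprecated from Hadoop 2.x onward. Only the job-construction lines would change; a sketch of the 2.x+ form (org.apache.hadoop.conf.Configuration is the one extra import):

    // Hadoop 2.x+ replacement for `Job job = new Job();` —
    // the rest of the driver stays exactly as above.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Max temperature");
    job.setJarByClass(MaxTemperature.class);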
  

  
    # cat MaxTemperatureMapper.java
  
    // cc MaxTemperatureMapper Mapper for maximum temperature example
  
    // vv MaxTemperatureMapper
  
    import java.io.IOException;
  

  
    import org.apache.hadoop.io.IntWritable;
  
    import org.apache.hadoop.io.LongWritable;
  
    import org.apache.hadoop.io.Text;
  
    import org.apache.hadoop.mapreduce.Mapper;
  

  
    public class MaxTemperatureMapper
  
      extends Mapper<LongWritable, Text, Text, IntWritable> {
  

  
      private static final int MISSING = 9999;
  

  
      @Override
  
      public void map(LongWritable key, Text value, Context context)
  
          throws IOException, InterruptedException {
  

  
      String line = value.toString();
  
      String year = line.substring(15, 19);
  
      int airTemperature;
  
      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
  
          airTemperature = Integer.parseInt(line.substring(88, 92));
  
      } else {
  
          airTemperature = Integer.parseInt(line.substring(87, 92));
  
      }
  
      String quality = line.substring(92, 93);
  
      if (airTemperature != MISSING && quality.matches("[01459]")) {
  
          context.write(new Text(year), new IntWritable(airTemperature));
  
      }
  
      }
  
    }
  
    // ^^ MaxTemperatureMapper
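
    The fixed offsets in the mapper (year at offsets 15-18, signed temperature at 87-91, quality code at 92) can be sanity-checked locally, without Hadoop, against the first record printed by tail above. A throwaway harness (the class name ParseCheck is ours; the record string is copied from step 2):

    // Applies the mapper's substring logic to one real record.
    // Expected output: year=2008 temp=-8 quality=1  (i.e. -0.8 degrees C)
    public class ParseCheck {
        public static void main(String[] args) {
            String line = "0101010980999992008031013004+70367+031100FM-12+001599999"
                + "V0201801N006019999999N9999999N1-00081-00291099591ADDMA1999999099411"
                + "MD1710101+9999";
            String year = line.substring(15, 19);             // yyyy of yyyymmdd
            int airTemperature = line.charAt(87) == '+'
                ? Integer.parseInt(line.substring(88, 92))    // skip the leading plus
                : Integer.parseInt(line.substring(87, 92));   // keep the leading minus
            String quality = line.substring(92, 93);          // quality code
            System.out.println("year=" + year + " temp=" + airTemperature
                + " quality=" + quality);
        }
    }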
  

  
    # cat MaxTemperatureReducer.java
  
    // cc MaxTemperatureReducer Reducer for maximum temperature example
  
    // vv MaxTemperatureReducer
  
    import java.io.IOException;
  

  
    import org.apache.hadoop.io.IntWritable;
  
    import org.apache.hadoop.io.Text;
  
    import org.apache.hadoop.mapreduce.Reducer;
  

  
    public class MaxTemperatureReducer
  
      extends Reducer<Text, IntWritable, Text, IntWritable> {
  

  
      @Override
  
      public void reduce(Text key, Iterable<IntWritable> values,
  
          Context context)
  
          throws IOException, InterruptedException {
  

  
      int maxValue = Integer.MIN_VALUE;
  
      for (IntWritable value : values) {
  
          maxValue = Math.max(maxValue, value.get());
  
      }
  
      context.write(key, new IntWritable(maxValue));
  
      }
  
    }
  
    // ^^ MaxTemperatureReducer
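
    One observation before running: because max is commutative and associative, MaxTemperatureReducer can double as a combiner and pre-aggregate each mapper's output before the shuffle. The run below does not do this (note "Combine input records=0" in the counters). Enabling it is a one-line driver change, sketched here (not part of the original post):

    // Added in main(), after setReducerClass: run the reducer as a
    // combiner too, shrinking the ~1.6 MB shuffled to the reducer.
    job.setCombinerClass(MaxTemperatureReducer.class);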
  

  
5. Compile the program
  
    javac -classpath ../hadoop-core-1.1.2.jar *.java
  
    Check:
  
    # ls
  
    MaxTemperature.class  MaxTemperatureMapper.class  MaxTemperatureReducer.class
  
    MaxTemperature.java   MaxTemperatureMapper.java   MaxTemperatureReducer.java
  

  
6. Build the jar
  
    # jar cvf ../MaxTemperature.jar *.class
  
    added manifest
  
    adding: MaxTemperature.class(in = 1413) (out= 799)(deflated 43%)
  
    adding: MaxTemperatureMapper.class(in = 1876) (out= 805)(deflated 57%)
  
    adding: MaxTemperatureReducer.class(in = 1687) (out= 717)(deflated 57%)
  
    Remove the class files: rm -rf *.class
  

  
7. Run the program
  
    Run the job over the dataset uploaded to in/test above, writing the results to ./out_result:
  
    # ./bin/hadoop jar MaxTemperature.jar MaxTemperature ./in/test ./out_result
  
    16/07/13 23:07:54 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  
    16/07/13 23:07:55 INFO input.FileInputFormat: Total input paths to process : 1
  
    16/07/13 23:07:55 INFO util.NativeCodeLoader: Loaded the native-hadoop library
  
    16/07/13 23:07:55 WARN snappy.LoadSnappy: Snappy native library not loaded
  
    16/07/13 23:07:59 INFO mapred.JobClient: Running job: job_201607131558_0001
  
    16/07/13 23:08:00 INFO mapred.JobClient:  map 0% reduce 0%
  
    16/07/13 23:08:23 INFO mapred.JobClient:  map 100% reduce 0%
  
    16/07/13 23:08:38 INFO mapred.JobClient:  map 100% reduce 100%
  
    16/07/13 23:08:40 INFO mapred.JobClient: Job complete: job_201607131558_0001
  
    16/07/13 23:08:40 INFO mapred.JobClient: Counters: 29
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Map-Reduce Framework
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Spilled Records=300506
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Map output materialized bytes=1652789
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Reduce input records=150253
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Virtual memory (bytes) snapshot=3868651520
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Map input records=150656
  
    16/07/13 23:08:40 INFO mapred.JobClient:   SPLIT_RAW_BYTES=108
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Map output bytes=1352277
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Reduce shuffle bytes=1652789
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Physical memory (bytes) snapshot=295931904
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Reduce input groups=1
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Combine output records=0
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Reduce output records=1
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Map output records=150253
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Combine input records=0
  
    16/07/13 23:08:40 INFO mapred.JobClient:   CPU time spent (ms)=12220
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Total committed heap usage (bytes)=177016832
  
    16/07/13 23:08:40 INFO mapred.JobClient:   File Input Format Counters
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Bytes Read=35197493
  
    16/07/13 23:08:40 INFO mapred.JobClient:   FileSystemCounters
  
    16/07/13 23:08:40 INFO mapred.JobClient:   HDFS_BYTES_READ=35197601
  
    16/07/13 23:08:40 INFO mapred.JobClient:   FILE_BYTES_WRITTEN=3409028
  
    16/07/13 23:08:40 INFO mapred.JobClient:   FILE_BYTES_READ=1652789
  
    16/07/13 23:08:40 INFO mapred.JobClient:   HDFS_BYTES_WRITTEN=9
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Job Counters
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Launched map tasks=1
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Launched reduce tasks=1
  
    16/07/13 23:08:40 INFO mapred.JobClient:   SLOTS_MILLIS_REDUCES=12976
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Total time spent by all reduces waiting after reserving slots (ms)=0
  
    16/07/13 23:08:40 INFO mapred.JobClient:   SLOTS_MILLIS_MAPS=18769
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Total time spent by all maps waiting after reserving slots (ms)=0
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Data-local map tasks=1
  
    16/07/13 23:08:40 INFO mapred.JobClient:   File Output Format Counters
  
    16/07/13 23:08:40 INFO mapred.JobClient:   Bytes Written=9
  

  

  
8. View the results
  
    # ./bin/hadoop fs -ls ./out_result
  
    Found 3 items
  
    -rw-r--r--   3 root supergroup          0 2016-07-13 23:08 /user/root/out_result/_SUCCESS
  
    drwxr-xr-x   - root supergroup          0 2016-07-13 23:08 /user/root/out_result/_logs
  
    -rw-r--r--   3 root supergroup          9 2016-07-13 23:08 /user/root/out_result/part-r-00000
  
    The _logs directory holds the job logs; the part file holds the result.
  
    View the contents of the part file:
  
    # ./bin/hadoop fs -cat ./out_result/part-r-00000
  
2008	290

The key is the year and the value is the highest reading, in tenths of a degree Celsius, so the 2008 maximum in this dataset is 29.0 °C. (The tab-separated pair plus a newline is exactly the 9 bytes reported as HDFS_BYTES_WRITTEN in the counters above.)

