残缺极品 发表于 2017-12-17 23:36:19

二次排序问题(分别使用Hadoop和Spark实现)

  不多说,直接上干货!
  这篇博客里的算法部分的内容来自《数据算法:Hadoop/Spark大数据处理技巧》一书,不过书中的代码虽然思路正确,但是代码不完整,并且只有java部分的编程,我在它的基础上又加入scala部分,当然是在使用Spark的时候写的scala。

一、输入、期望输出、思路。
  输入为SecondarySort.txt,内容为:
  

2000,12,04,10  
2000,11,01,20
  
2000,12,02,-20
  
2000,11,07,30
  
2000,11,24,-40
  
2012,12,21,30
  
2012,12,22,-20
  
2012,12,23,60
  
2012,12,24,70
  
2012,12,25,10
  
2013,01,23,90
  
2013,01,24,70
  
2013,01,20,-10
  

  意义为:年,月,日,温度
  期望输出:
  

2013-01 90,70,-10  
2012-12 70,60,30,10,-20
  
2000-12 10,-20
  
2000-11 30,20,-40
  

  意义为:
  年-月 温度1,温度2,温度3,……
  年-月从上至下降序排列,
  温度从左到右降序排列
  思路:
  抛弃不需要的代表日的那一列数据
  将年月作为组合键(key),比较大小,降序排列
  将对应年月(key)的温度的值(value)进行降序排列和拼接

二、使用Java编写MapReduce程序实现二次排序
  代码要实现的类有:

  除了常见的SecondarySortingMapper,SecondarySortingReducer,和SecondarySortDriver以外
  这里还多出了两个插件类(DateTemperatureGroupingComparator和DateTemperaturePartitioner)和一个自定义类型(DateTemperaturePair)
  以下是实现的代码(注意以下每个文件的代码段我去掉了包名,所以要使用的话自己加上吧):
  SecondarySortDriver.java
  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.IntWritable;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.util.Tool;
  
import org.apache.hadoop.util.ToolRunner;
  

  

public>
public int run(String[] args) throws Exception {  
Configuration configuration
= getConf();  
Job job
= Job.getInstance(configuration, "SecondarySort");  
job.setJarByClass(SecondarySortDriver.
class);  
job.setJobName(
"SecondarySort");  

  
Path inputPath
= new Path(args);  
Path outputPath
= new Path(args);  
FileInputFormat.setInputPaths(job, inputPath);
  
FileOutputFormat.setOutputPath(job, outputPath);
  

  

// 设置map输出key value格式  
job.setMapOutputKeyClass(DateTemperaturePair.class);
  
job.setMapOutputValueClass(IntWritable.class);
  
// 设置reduce输出key value格式
  
job.setOutputKeyClass(DateTemperaturePair.class);
  
job.setOutputValueClass(IntWritable.class);
  

  
job.setMapperClass(SecondarySortingMapper.class);
  
job.setReducerClass(SecondarySortingReducer.class);
  
job.setPartitionerClass(DateTemperaturePartitioner.class);
  
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
  

  
boolean status = job.waitForCompletion(true);
  
return status ? 0 : 1;
  
}
  

  
public static void main(String[] args) throws Exception {
  
if (args.length != 2) {
  
throw new IllegalArgumentException(
  
"!!!!!!!!!!!!!! Usage!!!!!!!!!!!!!!: SecondarySortDriver"
  
+ "<input-path> <output-path>");
  
}
  
int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
  
System.exit(returnStatus);
  
}
  
}
  

  DateTemperaturePair.java
  

import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;
  
import org.apache.hadoop.io.WritableComparable;
  

  
import java.io.DataInput;
  
import java.io.DataOutput;
  
import java.io.IOException;
  

  

public>
WritableComparable<DateTemperaturePair> {  

private String yearMonth;  

private String day;  

protected Integer temperature;  

  

public int compareTo(DateTemperaturePair o) {  

int compareValue = this.yearMonth.compareTo(o.getYearMonth());  

if (compareValue == 0) {  
compareValue
= temperature.compareTo(o.getTemperature());  
}
  

return -1 * compareValue;  
}
  

  

public void write(DataOutput dataOutput) throws IOException {  
Text.writeString(dataOutput, yearMonth);
  
dataOutput.writeInt(temperature);
  

  
}
  

  

public void readFields(DataInput dataInput) throws IOException {  

this.yearMonth = Text.readString(dataInput);  

this.temperature = dataInput.readInt();  

  
}
  

  
@Override
  

public String toString() {  

return yearMonth.toString();  
}
  

  

public String getYearMonth() {  

return yearMonth;  
}
  

  

public void setYearMonth(String text) {  

this.yearMonth = text;  
}
  

  

public String getDay() {  

return day;  
}
  

  

public void setDay(String day) {  

this.day = day;  
}
  

  

public Integer getTemperature() {  

return temperature;  
}
  

  

public void setTemperature(Integer temperature) {  

this.temperature = temperature;  
}
  
}
  

  SecondarySortingMapper.java
  

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Mapper;
  

  
import java.io.IOException;
  

  

public>
Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {  
@Override
  

protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {
  
String[] tokens
= value.toString().split(",");  

// YYYY = tokens  

// MM = tokens  

// DD = tokens  

// temperature = tokens  
String yearMonth = tokens + "-" + tokens;
  
String day = tokens;
  
int temperature = Integer.parseInt(tokens);
  

  
DateTemperaturePair reduceKey = new DateTemperaturePair();
  
reduceKey.setYearMonth(yearMonth);
  
reduceKey.setDay(day);
  
reduceKey.setTemperature(temperature);
  
context.write(reduceKey, new IntWritable(temperature));
  
}
  
}
  

  DateTemperaturePartitioner.java
  

import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Partitioner;
  

  

public>
Partitioner<DateTemperaturePair, Text> {  
@Override
  

public int getPartition(DateTemperaturePair dataTemperaturePair, Text text,  

int i) {  

return Math.abs(dataTemperaturePair.getYearMonth().hashCode() % i);  
}
  
}
  

  DateTemperatureGroupingComparator.java
  

import org.apache.hadoop.io.WritableComparable;  
import org.apache.hadoop.io.WritableComparator;
  

  

public>
  

public DateTemperatureGroupingComparator() {  
super(DateTemperaturePair.
class, true);  
}
  

  
@Override
  

public int compare(WritableComparable a, WritableComparable b) {  
DateTemperaturePair pair1
= (DateTemperaturePair) a;  
DateTemperaturePair pair2
= (DateTemperaturePair) b;  

return pair1.getYearMonth().compareTo(pair2.getYearMonth());  
}
  
}
  

  SecondarySortingReducer.java
  

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Reducer;
  

  
import java.io.IOException;
  

  

public>
Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {  

  
@Override
  

protected void reduce(DateTemperaturePair key,  
Iterable
<IntWritable> values, Context context) throws IOException,  
InterruptedException {
  
StringBuilder sortedTemperatureList
= new StringBuilder();  

for (IntWritable temperature : values) {  
sortedTemperatureList.append(temperature);
  
sortedTemperatureList.append(
",");  
}
  
sortedTemperatureList.deleteCharAt(sortedTemperatureList.length()
-1);  
context.write(key,
new Text(sortedTemperatureList.toString()));  
}
  

  
}
  

  三、使用scala编写Spark程序实现二次排序
  相比之下,这个代码就比较简洁了。如下:
  SecondarySort.scala
  

package spark  
import org.apache.spark.{SparkContext, SparkConf}
  
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
  
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
  

  

/**
 * Spark implementation of the secondary sort: groups temperatures by
 * (year, month), orders the groups by key in descending order and the
 * temperatures inside each group in descending numeric order, then prints
 * one line per group as `year-month t1,t2,...`.
 */
object SecondarySort {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath = "e:\\SecondarySort.txt"

  /**
   * @param args optional; `args(0)` is the input path (a local path or a URI
   *             such as an `hdfs://` location). Falls back to
   *             [[DefaultInputPath]] when absent.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(" Secondary Sort ")
      .setMaster("local")
    val sc = new SparkContext(conf)

    try {
      // Upper-case spelling is accepted by every Spark version that offers
      // setLogLevel; older releases reject mixed-case names.
      sc.setLogLevel("WARN")

      val inputPath = if (args.nonEmpty) args(0) else DefaultInputPath
      val file = sc.textFile(inputPath)

      val rdd = file
        .map(line => line.split(",").map(_.trim))
        // Drops blank or truncated lines; fields are year, month, day, temperature.
        .filter(fields => fields.length >= 4)
        // Temperatures are parsed to Int so they sort numerically rather than
        // lexicographically as strings.
        .map(fields => ((fields(0), fields(1)), fields(3).toInt))
        .groupByKey()
        .sortByKey(false)
        .map { case ((year, month), temps) =>
          (year + "-" + month, temps.toList.sortWith(_ > _))
        }

      // collect() brings the sorted result to the driver so lines are printed
      // in key order regardless of how the RDD is partitioned.
      rdd.collect().foreach { case (yearMonth, temps) =>
        println(yearMonth + " " + temps.mkString(","))
      }
    } finally {
      sc.stop()
    }
  }
}
  
页: [1]
查看完整版本: 二次排序问题(分别使用Hadoop和Spark实现)