
[Experience Sharing] Hadoop Secondary Sort (Collection)


Posted on 2016-12-06 07:27:43

  1. Principle
  At the end of the map phase, the map output is first partitioned by the Partitioner set with job.setPartitionerClass; each partition is sent to one reducer. Within each partition, the keys are then sorted by the key comparator class set with job.setSortComparatorClass. As you can see, this is already a two-level (secondary) sort. If no key comparator class is set with job.setSortComparatorClass, the key's own compareTo method is used instead.
In the reduce phase, after a reducer has received all the map output assigned to it, it again sorts the key/value pairs with the comparator set by job.setSortComparatorClass. It then builds a value iterator for each key. This is where grouping comes in, using the grouping comparator class set with job.setGroupingComparatorClass: whenever this comparator considers two keys equal, they belong to the same group, their values go into one value iterator, and the key exposed for that iterator is the first key of the group. Finally the Reducer's reduce method is called once per (key, value-iterator) pair. As always, the input and output types must match the ones declared in your custom Reducer.
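
  To make the three hooks concrete, here is a minimal sketch (not part of the original post) of how they are registered on a Job. StringPair, FirstPartitioner, SortComparator and GroupingComparator stand for the user-defined classes shown in the examples further down; they are assumptions here, not part of any Hadoop API.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;

// Hedged sketch: wiring the secondary-sort hooks described above onto a Job.
// The custom classes referenced here are the ones defined in Example 2 below.
public class SecondarySortWiring {
    public static void wire(Job job) {
        job.setMapOutputKeyClass(StringPair.class);    // composite key: (first, second)
        job.setMapOutputValueClass(LongWritable.class);
        job.setPartitionerClass(FirstPartitioner.class);           // 1) partition on "first" only
        job.setSortComparatorClass(SortComparator.class);          // 2) order keys inside each partition
        job.setGroupingComparatorClass(GroupingComparator.class);  // 3) keys equal here share one reduce() call
    }
}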

  2. Steps
  (1) Custom key
  Every custom key should implement the WritableComparable interface, since it must be both serializable and comparable, and it should override the following methods:
@Override
public void write(DataOutput out) throws IOException {}
@Override
public void readFields(DataInput in) throws IOException {}
@Override
public int hashCode() {}
@Override
public boolean equals(Object right) {}
@Override
public int compareTo(StringPair o) {}

  (2) Because the key is user-defined, the following classes also need to be defined:
(2.1) Partitioner class. This is the first comparison of the key.

static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable>
Register it on the job with setPartitionerClass.
  (2.2) Key comparator class. This is the second comparison of the key. It is a comparator and must extend WritableComparator.
  public static class SortComparator extends WritableComparator

  It must have a constructor and override public int compare(WritableComparable w1, WritableComparable w2).
  An alternative is to implement the RawComparator interface (a sketch is given after this list).
Register it on the job with setSortComparatorClass.
(2.3) Grouping comparator class. In the reduce phase, when the value iterator for a key is built, all keys whose first field is equal belong to the same group, and their values are placed in one value iterator. It is a comparator and must extend WritableComparator.
public static class GroupingComparator extends WritableComparator
The grouping comparator class must also have a constructor and override public int compare(WritableComparable w1, WritableComparable w2).
Alternatively, it too can implement the RawComparator interface.
Register it on the job with setGroupingComparatorClass.
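
  For the RawComparator alternative mentioned in (2.2) and (2.3), here is a hedged sketch (not part of the original post): extending WritableComparator and overriding its byte-level compare lets Hadoop order serialized keys without deserializing them. It assumes the key layout produced by StringPair.write() in Example 2, namely [int length][string bytes][long], and that StringPair is visible from this class; the class name is hypothetical.

import org.apache.hadoop.io.WritableComparator;

// Hedged sketch: a raw (byte-level) sort comparator for a key serialized as
// [int length][string bytes][long], the layout used by StringPair.write() in Example 2.
public class RawStringPairSortComparator extends WritableComparator {

    protected RawStringPairSortComparator() {
        super(StringPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // compare the string portion first
        int len1 = readInt(b1, s1);
        int len2 = readInt(b2, s2);
        int cmp = compareBytes(b1, s1 + 4, len1, b2, s2 + 4, len2);
        if (cmp != 0) {
            return cmp;
        }
        // then the trailing long, negated so larger values sort first (descending)
        long v1 = readLong(b1, s1 + 4 + len1);
        long v2 = readLong(b2, s2 + 4 + len2);
        return -Long.compare(v1, v2);
    }
}
// register it with: job.setSortComparatorClass(RawStringPairSortComparator.class);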

One more note: if the reducer's input and output are not of the same types, do not reuse the reducer as the Combiner, because the Combiner's output becomes the reducer's input; define a separate Combiner instead (see the sketch below).
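
  As an illustration of that rule, here is a hedged sketch (not part of the original post) of a standalone Combiner whose input and output types both equal the map output types, (StringPair, LongWritable). The class name and the max-per-key logic are assumptions made for this example only; StringPair is the custom key from Example 2.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hedged sketch: a combiner must consume and emit the map output types.
// The reducers in the examples below emit (NullWritable, Put), so they cannot be
// reused as combiners; a separate class like this one keeps the types consistent.
public class MaxFansCombiner
        extends Reducer<StringPair, LongWritable, StringPair, LongWritable> {

    @Override
    protected void reduce(StringPair key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long max = Long.MIN_VALUE;
        for (LongWritable v : values) {   // keep only the largest value seen for this key
            max = Math.max(max, v.get());
        }
        context.write(key, new LongWritable(max));
    }
}
// enable it with: job.setCombinerClass(MaxFansCombiner.class);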
  

  Example 1:

package example;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text.Comparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


import util.HbaseServiceUtil;


public class TwoSortMR {
static class TwoSortMapper extends
Mapper<ImmutableBytesWritable, Result, Text, LongWritable> {
HTable htable;
HTable htable1;
byte[] family = Bytes.toBytes("baseInfo");
HbaseServiceUtil u;


protected void setup(Context context) throws IOException,
InterruptedException {
u = new HbaseServiceUtil();
this.htable = u.getHtable("wb_hbase_relation_attentions", 1);
this.htable1 = u.getHtable("wb_hbase_user", 1);
}


@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
String uid = new String(key.get());
try {
int cachsize = 5000;
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(uid + "_"));
scan.setStopRow(Bytes.toBytes(uid + "_sz"));
scan.setCaching(cachsize);
ResultScanner scanner = htable.getScanner(scan);


Result[] results = scanner.next(cachsize);
// check whether the user has any corresponding Weibo records
if (results.length <= 0) {
return;
}
for (Result result : results) {
if (result.isEmpty()) {
continue;
}
byte[] obj = result.getValue(family, Bytes.toBytes("uid2"));
if (obj == null) {
continue;
}
String uid2 = new String(obj);
Get get = new Get(uid2.getBytes());
Result result1 = htable1.get(get);
obj = result1.getValue(family, Bytes.toBytes("fansCount"));
if (obj == null) {
continue;
}
String fansCount = new String(obj);
if (!fansCount.matches("[0123456789].*")) {
continue;
}
long aa = Long.parseLong(fansCount);
try {
context.write(new Text(uid+"|"+aa), new LongWritable(aa));
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}


}


static class TwoSortReducer extends
Reducer<Text, LongWritable, NullWritable, Put> {


@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// long max = values.iterator().next().get();
StringBuffer max = new StringBuffer();
for (LongWritable value : values) {
max.append(value.get()+"|");
}
System.out.println(key.toString()+" "+max.toString());


Put put = new Put(Bytes.toBytes(key.toString().split("\\|")[0]));
put.add("baseInfo".getBytes(), "maxFansCount".getBytes(),
(max.toString() + "").getBytes());
context.write(NullWritable.get(), put);

}


}


// At the end of the map phase the map output is partitioned; each partition is sent to one reducer
static class FirstPartitioner extends HashPartitioner<Text, LongWritable> {
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
return (key.toString().split("\\|")[0].hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
// Within each partition, keys are sorted by the comparator set via job.setSortComparatorClass (or by the key's own compareTo)
public static class SortComparator extends WritableComparator {
protected SortComparator() {
super(Text.class, true);
}
// Note: compare the secondary field only when the grouping field is equal.
// Here: compare the user id first, then the fans count, with fans count in descending order.
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
Text t1 =(Text)w1;
Text t2 =(Text)w2;
long a1 =Long.parseLong(t1.toString().split("\\|")[0].replaceAll("si", ""));
long a2 = Long.parseLong(t2.toString().split("\\|")[0].replaceAll("si", ""));
long a3 = Long.parseLong(t1.toString().split("\\|")[1]);
long a4 = Long.parseLong(t2.toString().split("\\|")[1]);

int cmp = TwoSortMR.compare(a1, a2);
if (cmp != 0) {
  return cmp;
}
return -TwoSortMR.compare(a3, a4); //reverse


}
}

// Any two keys that this comparator reports as equal belong to the same group.
// Their values go into one value iterator, whose key is the first key of the group.
public static class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(Text.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
//      return w1.toString().split("\\|")[0].compareTo(w2.toString().split("\\|")[0]);
Text t1 =(Text)w1;
Text t2 =(Text)w2;
long l = Long.parseLong(t1.toString().split("\\|")[0].replaceAll("si", ""));
long r = Long.parseLong(t2.toString().split("\\|")[0].replaceAll("si", ""));
return TwoSortMR.compare(l, r);
}
}
public static int compare(long left, long right) {
return left > right ? 1 : (left == right ? 0 : -1);
}


public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HbaseServiceUtil.getConfiguration();
HbaseServiceUtil.setConf(conf, "hdfs4");


String inputTableName = "analyzer_wuqilong1";
String OutputTableName = "analyzer_wuqilong3";
Scan scan = new Scan();
// scan.setStartRow("si".getBytes());
// scan.setStopRow("0000000000000si\uFFFF".getBytes());
scan.setCaching(100);
scan.setCacheBlocks(false); // don't set to true for MR jobs
conf.set(TableInputFormat.SCAN,
HbaseServiceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
conf.set(TableOutputFormat.OUTPUT_TABLE, OutputTableName);



Job job = new Job(conf);


job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);


job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(Text.class);


job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);


job.setMapperClass(TwoSortMapper.class);
job.setReducerClass(TwoSortReducer.class);

// partitioner
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(SortComparator.class);
// grouping comparator
job.setGroupingComparatorClass(GroupingComparator.class);


job.setNumReduceTasks(5);
job.setJarByClass(TwoSortMR.class);
job.setJobName("Test2Sort");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}


}
  

  Example 2:

package temp;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;


public class StringPair implements WritableComparable<StringPair> {


private String first;
private long second;


public StringPair() {
}


public void set(String first, long second) {
this.first = first;
this.second = second;
}


public String toString() {
return first + "|" + second;
}


public String getFirst() {
return first;
}


public long getSecond() {
return second;
}


@Override
public void write(DataOutput out) throws IOException {
byte[] buf = first.getBytes("UTF-8");
// write the byte length of the string first (byte length rather than character
// count, so that multi-byte characters are not truncated)
out.writeInt(buf.length);
// then the string bytes
out.write(buf, 0, buf.length);
// then the long
out.writeLong(second);
}


@Override
public void readFields(DataInput in) throws IOException {
// read the string's byte length first
int length = in.readInt();
byte[] buf = new byte[length];
in.readFully(buf, 0, length);
first = new String(buf, "UTF-8");
second = in.readLong();
}


@Override
public int hashCode() {
return first.hashCode();
}


@Override
public boolean equals(Object right) {
return first.equals(((StringPair) right).first);
}


@Override
public int compareTo(StringPair o) {
int c1 = StringPair.compare(getFirst(), o.getFirst());
if (c1 != 0) {
return c1;
} else {
return StringPair.compare(getSecond(), o.getSecond());
}
}


public static int compare(StringPair o1, StringPair o2) {
int c1 = StringPair.compare(o1.getFirst(), o2.getFirst());
if (c1 != 0) {
return c1;
} else {
return -StringPair.compare(o1.getSecond(), o2.getSecond());
}
}


public static int compare(long left, long right) {
return left > right ? 1 : (left == right ? 0 : -1);
}


public static int compare(String left, String right) {
return left.compareTo(right);
}



  }
  

package temp;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


import util.HbaseServiceUtil;




public class TwoSortMR {
static class TwoSortMapper extends
Mapper<ImmutableBytesWritable, Result, StringPair, LongWritable> {
HTable htable;
HTable htable1;
byte[] family = Bytes.toBytes("baseInfo");
HbaseServiceUtil u;


protected void setup(Context context) throws IOException,
InterruptedException {
u = new HbaseServiceUtil();
this.htable = u.getHtable("wb_hbase_relation_attentions", 1);
this.htable1 = u.getHtable("wb_hbase_user", 1);
}


@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
String uid = new String(key.get());
try {
int cachsize = 5000;
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(uid + "_"));
scan.setStopRow(Bytes.toBytes(uid + "_sz"));
scan.setCaching(cachsize);
ResultScanner scanner = htable.getScanner(scan);


Result[] results = scanner.next(cachsize);
// check whether the user has any corresponding Weibo records
if (results.length <= 0) {
return;
}
for (Result result : results) {
if (result.isEmpty()) {
continue;
}
byte[] obj = result.getValue(family, Bytes.toBytes("uid2"));
if (obj == null) {
continue;
}
String uid2 = new String(obj);
Get get = new Get(uid2.getBytes());
Result result1 = htable1.get(get);
obj = result1.getValue(family, Bytes.toBytes("fansCount"));
if (obj == null) {
continue;
}
String fansCount = new String(obj);
if (!fansCount.matches("[0123456789].*")) {
continue;
}
long aa = Long.parseLong(fansCount);
try {
StringPair strs = new StringPair();
strs.set(uid,aa);
context.write(strs, new LongWritable(aa));
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}


}


static class TwoSortReducer extends
Reducer<StringPair, LongWritable, NullWritable, Put> {


@Override
protected void reduce(StringPair key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// long max = values.iterator().next().get();
StringBuffer max = new StringBuffer();
for (LongWritable value : values) {
max.append(value.get()+"|");
}
System.out.println(key.toString()+" "+max.toString());


Put put = new Put(Bytes.toBytes(key.getFirst()));
put.add("baseInfo".getBytes(), "maxFansCount".getBytes(),
(max.toString() + "").getBytes());
context.write(NullWritable.get(), put);

}


}


// At the end of the map phase the map output is partitioned; each partition is sent to one reducer
static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable> {
@Override
public int getPartition(StringPair key, LongWritable value, int numPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
// Within each partition, keys are sorted by the comparator set via job.setSortComparatorClass (or by the key's own compareTo)
public static class SortComparator extends WritableComparator {
protected SortComparator() {
super(StringPair.class, true);
}

@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPair t1 = (StringPair) a;
StringPair t2 = (StringPair) b;
return StringPair.compare(t1, t2);


}
}
// Any two keys that this comparator reports as equal belong to the same group.
// Their values go into one value iterator, whose key is the first key of the group.
public static class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(StringPair.class, true);
}


// The grouping comparator only looks at the first field (the user id);
// keys that share the same first field are handled in one reduce() call.
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPair strs1 = (StringPair) a;
StringPair strs2 = (StringPair) b;
  return StringPair.compare(strs1.getFirst(), strs2.getFirst());
}
}


public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HbaseServiceUtil.getConfiguration();
HbaseServiceUtil.setConf(conf, "hdfs4");


String inputTableName = "analyzer_wuqilong1";
String OutputTableName = "analyzer_wuqilong3";
Scan scan = new Scan();
// scan.setStartRow("si".getBytes());
// scan.setStopRow("0000000000000si\uFFFF".getBytes());
scan.setCaching(100);
scan.setCacheBlocks(false); // don't set to true for MR jobs
conf.set(TableInputFormat.SCAN,
HbaseServiceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
conf.set(TableOutputFormat.OUTPUT_TABLE, OutputTableName);



Job job = new Job(conf);


job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);


job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(StringPair.class);


job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);


job.setMapperClass(TwoSortMapper.class);
job.setReducerClass(TwoSortReducer.class);

// partitioner
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(SortComparator.class);
// grouping comparator
job.setGroupingComparatorClass(GroupingComparator.class);


job.setNumReduceTasks(5);
job.setJarByClass(TwoSortMR.class);
job.setJobName("TestCombine");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}


}
