liyeho posted on 2017-12-18 12:40:40

Hadoop MapReduce Programming API Primer Series: FOF (Friend of Friend) (Part 23)

  Not much to say, let's go straight to the code. The example builds a friend-of-friend (FOF) recommender out of two chained MapReduce jobs: the first counts the common friends of every pair of users who are not yet direct friends, and the second sorts and groups those counts into a per-user recommendation list.

  Code
  package zhouls.bigdata.myMapReduce.friend;

  import org.apache.hadoop.io.Text;

  // Composite key for a pair of users. Extending Text stores the pair as a
  // single tab-separated string.
  public class Fof extends Text {

      public Fof() { // no-arg constructor, required by Hadoop serialization
          super();
      }

      public Fof(String a, String b) { // build the key from two user names
          super(getFof(a, b));
      }

      // Normalize the pair so that (a, b) and (b, a) produce the same key:
      // the lexicographically smaller name always comes first.
      public static String getFof(String a, String b) {
          int r = a.compareTo(b);
          if (r < 0) {
              return a + "\t" + b;
          } else {
              return b + "\t" + a;
          }
      }
  }
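
  A quick way to see why this normalization matters: both orderings of a pair collapse to one key, so all emissions for the same unordered pair reach the same reducer. A minimal sketch I added for illustration (the names "tom" and "alice" are made up; this is not from the original post):

  public class FofDemo {
      public static void main(String[] args) {
          Fof x = new Fof("tom", "alice");
          Fof y = new Fof("alice", "tom");
          System.out.println(x);           // prints: alice<TAB>tom (smaller name first)
          System.out.println(x.equals(y)); // true: both orderings yield the same key
      }
  }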
  package zhouls.bigdata.myMapReduce.friend;

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableComparable;

  // Implementing WritableComparable takes noticeably more work than a plain
  // Writable: readFields reads the record in, write writes it out, and
  // compareTo defines the default sort order.
  public class User implements WritableComparable<User> {

      private String uname;
      private int friendsCount;

      public String getUname() {
          return uname;
      }

      public void setUname(String uname) {
          this.uname = uname;
      }

      public int getFriendsCount() {
          return friendsCount;
      }

      public void setFriendsCount(int friendsCount) {
          this.friendsCount = friendsCount;
      }
      // This whole block of getters and setters can be auto-generated in the
      // IDE: right-click > Source > Generate Getters and Setters.

      public User() { // no-arg constructor, required by Hadoop serialization
      }

      public User(String uname, int friendsCount) {
          this.uname = uname;
          this.friendsCount = friendsCount;
      }

      public void write(DataOutput out) throws IOException { // serialize
          out.writeUTF(uname);
          out.writeInt(friendsCount);
      }

      public void readFields(DataInput in) throws IOException { // deserialize
          this.uname = in.readUTF();
          this.friendsCount = in.readInt();
      }

      public int compareTo(User o) { // the core comparison
          int result = this.uname.compareTo(o.getUname());
          if (result == 0) {
              return Integer.compare(this.friendsCount, o.getFriendsCount());
          }
          return result;
      }
  }
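
  Since write and readFields must mirror each other field for field, a round-trip check is a cheap sanity test. A minimal sketch (my addition; the name and count are made up):

  public class UserRoundTrip {
      public static void main(String[] args) throws Exception {
          java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
          new User("tom", 3).write(new java.io.DataOutputStream(bos));   // serialize
          User copy = new User();
          copy.readFields(new java.io.DataInputStream(
                  new java.io.ByteArrayInputStream(bos.toByteArray()))); // deserialize
          System.out.println(copy.getUname() + ":" + copy.getFriendsCount()); // prints tom:3
      }
  }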
  package zhouls.bigdata.myMapReduce.friend;

  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  // Sort comparator for the second job: by user name ascending, then by
  // common-friend count descending, so the best recommendation comes first.
  public class FoFSort extends WritableComparator {

      public FoFSort() { // register the custom User key; true asks the parent to create instances
          super(User.class, true);
      }

      public int compare(WritableComparable a, WritableComparable b) { // the sorting core
          User u1 = (User) a;
          User u2 = (User) b;
          int result = u1.getUname().compareTo(u2.getUname());
          if (result == 0) {
              // negate the comparison so friendsCount sorts in descending order
              return -Integer.compare(u1.getFriendsCount(), u2.getFriendsCount());
          }
          return result;
      }
  }
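
  The comparator can be exercised directly, with no cluster, to confirm the descending tie-break. A small sketch using the same made-up names:

  public class FoFSortDemo {
      public static void main(String[] args) {
          FoFSort sort = new FoFSort();
          // equal names, counts 5 vs 3: a negative result means (tom,5) orders before (tom,3)
          System.out.println(sort.compare(new User("tom", 5), new User("tom", 3))); // -1
      }
  }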
  package zhouls.bigdata.myMapReduce.friend;

  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  // Grouping comparator for the second job: compare by user name only, so
  // all User records for the same name reach a single reduce() call even
  // though their friendsCount values differ.
  public class FoFGroup extends WritableComparator {

      public FoFGroup() { // register the custom User key; true asks the parent to create instances
          super(User.class, true);
      }

      public int compare(WritableComparable a, WritableComparable b) { // the grouping core
          User u1 = (User) a;
          User u2 = (User) b;
          return u1.getUname().compareTo(u2.getUname());
      }
  }
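
  The contrast between the two comparators is the whole point of this secondary-sort setup: during job 2's shuffle, FoFSort orders (tom,5) ahead of (tom,3), and FoFGroup then treats both as the same key, so the reducer sees each user's candidates already ranked best first. A one-line check with the same made-up values:

  public class FoFGroupDemo {
      public static void main(String[] args) {
          // 0 means "same group": both records are fed to a single reduce() call
          System.out.println(new FoFGroup().compare(new User("tom", 5), new User("tom", 3))); // 0
      }
  }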
  package zhouls.bigdata.myMapReduce.friend;

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.StringUtils;

  public class RunJob {

      // Sample input friend.txt, tab-separated: a user, then that user's friends.
      //   小明    老王  如花  林志玲
      //   老王    小明  凤姐                      sorting is implemented in FoFSort.java
      //   如花    小明  李刚  凤姐
      //   林志玲  小明  李刚  凤姐  郭美美        grouping is implemented in FoFGroup.java
      //   李刚    如花  凤姐  林志玲
      //   郭美美  凤姐  林志玲
      //   凤姐    如花  老王  林志玲  郭美美
      public static void main(String[] args) {
          Configuration config = new Configuration();
          // Uncomment to run against a cluster instead of locally:
          // config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
          // config.set("yarn.resourcemanager.hostname", "HadoopMaster");
          // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
          // The default key/value separator is the tab character "\t"; it can be
          // overridden here, e.g. to a comma:
          // config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
          if (run1(config)) {
              run2(config); // two chained MR jobs: job 2 starts only if job 1 succeeds
          }
      }
      // Second job: sort and group the FOF counts into per-user recommendations.
      public static void run2(Configuration config) {
          try {
              FileSystem fs = FileSystem.get(config);
              Job job = Job.getInstance(config);
              job.setJarByClass(RunJob.class);
              job.setJobName("fof2");
              job.setMapperClass(SortMapper.class);
              job.setReducerClass(SortReducer.class);
              job.setSortComparatorClass(FoFSort.class);
              job.setGroupingComparatorClass(FoFGroup.class);
              job.setMapOutputKeyClass(User.class);
              job.setMapOutputValueClass(User.class);
              job.setInputFormatClass(KeyValueTextInputFormat.class);

              // HDFS paths, for cluster runs:
              // FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/f1"));
              // Path outputPath = new Path("hdfs://HadoopMaster:9000/out/f2");

              // Input for this job is the output of job 1
              FileInputFormat.addInputPath(job, new Path("./out/f1"));
              // Output directory for the results; it must not already exist
              Path outputPath = new Path("./out/f2");
              if (fs.exists(outputPath)) {
                  fs.delete(outputPath, true);
              }
              FileOutputFormat.setOutputPath(job, outputPath);
              boolean f = job.waitForCompletion(true);
              if (f) {
                  System.out.println("job finished successfully");
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
      // First job: count common friends for every pair of users who are not
      // already direct friends.
      public static boolean run1(Configuration config) {
          try {
              FileSystem fs = FileSystem.get(config);
              Job job = Job.getInstance(config);
              job.setJarByClass(RunJob.class);
              job.setJobName("friend");
              job.setMapperClass(FofMapper.class);
              job.setReducerClass(FofReducer.class);
              job.setMapOutputKeyClass(Fof.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setInputFormatClass(KeyValueTextInputFormat.class);

              // HDFS paths, for cluster runs:
              // FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/friend/friend.txt"));
              // Path outpath = new Path("hdfs://HadoopMaster:9000/out/f1");

              FileInputFormat.addInputPath(job, new Path("./data/friend/friend.txt")); // the friend.txt sample shown above
              Path outpath = new Path("./out/f1");
              if (fs.exists(outpath)) {
                  fs.delete(outpath, true);
              }
              FileOutputFormat.setOutputPath(job, outpath);
              boolean f = job.waitForCompletion(true);
              return f;
          } catch (Exception e) {
              e.printStackTrace();
          }
          return false;
      }
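
  A note on the input format used by both jobs: KeyValueTextInputFormat hands each line to the mapper already split at the first tab, so for friend.txt the map key is the user and the map value is the remaining tab-separated list of that user's friends.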

      static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
          protected void map(Text key, Text value, Context context)
                  throws IOException, InterruptedException {
              String user = key.toString();
              String[] friends = StringUtils.split(value.toString(), '\t');
              for (int i = 0; i < friends.length; i++) {
                  String f1 = friends[i];
                  // flag 0: user and f1 are direct friends
                  Fof ofof = new Fof(user, f1);
                  context.write(ofof, new IntWritable(0));
                  for (int j = i + 1; j < friends.length; j++) {
                      // flag 1: f1 and f2 share "user" as a common friend
                      String f2 = friends[j];
                      Fof fof = new Fof(f1, f2);
                      context.write(fof, new IntWritable(1));
                  }
              }
          }
      }
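
  To make the 0/1 flags concrete, trace the sample line 老王 → (小明, 凤姐) through map(): it emits (小明\t老王, 0) and (凤姐\t老王, 0) for the two direct friendships, plus (凤姐\t小明, 1) because 小明 and 凤姐 share 老王 as a common friend. The keys are shown after Fof's ordering normalization, which is what lets a 0 flag emitted from one user's line meet the 1 flags for the same pair in a single reduce call.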

      static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable> {
          protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Context arg2)
                  throws IOException, InterruptedException {
              int sum = 0;
              boolean f = true;
              for (IntWritable i : arg1) {
                  if (i.get() == 0) {
                      // a 0 flag means the pair are already direct friends:
                      // drop the pair entirely
                      f = false;
                      break;
                  } else {
                      sum = sum + i.get(); // count common friends
                  }
              }
              if (f) {
                  arg2.write(arg0, new IntWritable(sum));
              }
          }
      }
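
  One detail that makes the second job work: the reducer above writes a Fof key whose toString() is already "f1\tf2", and the default text output format adds another tab before the count, so each line of ./out/f1 reads "f1 \t f2 \t count". When job 2 re-reads that directory with KeyValueTextInputFormat, the line is split at the first tab only: the key is f1 and the value is "f2 \t count", which is exactly what SortMapper below parses apart.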

      static class SortMapper extends Mapper<Text, Text, User, User> {
          protected void map(Text key, Text value, Context context)
                  throws IOException, InterruptedException {
              // key is the first user of the pair; value is "otherUser \t count"
              String[] args = StringUtils.split(value.toString(), '\t');
              String other = args[0];
              int friendsCount = Integer.parseInt(args[1]);
              // emit the pair in both directions so each user collects all of
              // his or her recommendations
              context.write(new User(key.toString(), friendsCount), new User(other, friendsCount));
              context.write(new User(other, friendsCount), new User(key.toString(), friendsCount));
          }
      }

      static class SortReducer extends Reducer<User, User, Text, Text> {
          protected void reduce(User arg0, Iterable<User> arg1, Context arg2)
                  throws IOException, InterruptedException {
              String user = arg0.getUname();
              StringBuffer sb = new StringBuffer();
              // thanks to FoFSort/FoFGroup, the values arrive ranked by
              // common-friend count, highest first
              for (User u : arg1) {
                  sb.append(u.getUname() + ":" + u.getFriendsCount());
                  sb.append(",");
              }
              arg2.write(new Text(user), new Text(sb.toString()));
          }
      }
  } // end of RunJob (the closing brace was missing in the original listing)
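
  The final lines in ./out/f2 therefore have the shape "user \t name1:count1,name2:count2," (the code leaves a trailing comma), with each user's recommendations already ordered by descending common-friend count thanks to FoFSort.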
