设为首页 收藏本站
查看: 994|回复: 0

[经验分享] hadoop patition 分区简介和自定义

[复制链接]

尚未签到

发表于 2016-12-9 06:09:58 | 显示全部楼层 |阅读模式
  0 简介:
  0) 类比于新生<k,v>入学,不同的学生实现分配好了宿舍,然后进入到不同的宿舍(reduce task)
  如果map发送来的数据量太大,意味着这些数据都到这个默认reduce节点执行,没有发挥reduce
  并行计算的目的,IO压力也很大。 这就是分区的原因。
  a) 默认下分配一个区
  b) 分配几个区,则对应几个reduce任务,每个任务在执行的时候都会公用reduce内的代码
  c) 自定义分区下 返回的分区数量一定要和 定义的reduce任务相同,具体来说就是:
  自定义分区类 extends HashPartitioner,重写getPartition时,返回的分支个数要和
  job.setNumReduceTasks(X); 中的X个数相同
  如果分区格式和reducetask任务个数不同下,在hadoop不同版本中的运行情况如下:

  HashPartitioner.java  key.hashcode() & integer.maxvalue % numreducetasks =  模1恒等于0 返回值恒为0 返回值是分区的标记或者索引 part-00000 part-00001 等等
默认的是job.setPartitionerClass(HashPartitioner.class)  自定义分区返回的是索引数字,从0开始依次递增1返回。
以 手机号和座机号写在一个文件中为例:
如果分区数量 大于/小于 reduce数量时,
2个分区 1个reduce --->  hadoop2中依旧能正常执行 只不过不会分区 所有数据都写到一个输出中   hadoop1中会报错
2个分区 4个reduce --->  hadoop2中依旧能正常执行 输出结果写到4个区中,第一个分区结果为手机号 第二个为座机号 剩下两个为空文件 所有数据都写到一个输出中
  d) 需要打包放在hadoop环境内运行,否则在本机运行eg:eclipse环境下,会报错如下:

14/12/09 14:12:58 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for 84138413 (1)
  map-shuffle-reduce过程图如下:
  
DSC0000.png
 
  1 代码
  结果处理成2个区, 一个是放手机号的 一个是放固话的:

package partition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
*
* 实现单词计数功能,指定分区个数(分区下必须通过打包方式来运行)
* 1 自定义规约
* 1.1  规约定义好处:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短
*  1.2 因为不是所有的算法都适合使用Combiner处理,例如求平均数,因此Combiner不作为MR运行的标配
*  1.3 Combiner本身已经执行了reduce操作仅仅是处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据
*      这也是Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作的原因。
* 2 自定义分区
*   2.1 分区运行必须打成jar运行
*   2.2 map分几个区,则reduce有几个任务数量,每个reduce任务将对应一个输出文件
*   2.3 分区不是越多越好,要根据业务需求,分区太多,也会造成资源创建,等待等消耗
*   2.4 多个reduce任务在运行的好处是提高整体job的运行效率
*   
*   结果处理成2个区, 一个是放手机号的 一个是放固话的
*  [iyunv@master ~]# hadoop fs -ls /out
Warning: $HADOOP_HOME is deprecated.
Found 4 items
-rw-r--r--   1 root supergroup          0 2014-08-24 16:02 /out/_SUCCESS
drwxr-xr-x   - root supergroup          0 2014-08-24 16:02 /out/_logs
-rw-r--r--   1 root supergroup        556 2014-08-24 16:02 /out/part-r-00000
-rw-r--r--   1 root supergroup         79 2014-08-24 16:02 /out/part-r-00001
*/
public class KpiAppPatition {

// 0 定义操作地址
static final String FILE_ROOT = "hdfs://master:9000/";
static final String INPUT_PATH = "hdfs://master:9000/hello";
static final String OUT_PATH = "hdfs://master:9000/out";
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
Path outpath = new Path(OUT_PATH);
if(fileSystem.exists(outpath)){
fileSystem.delete(outpath, true);
}
// 0 定义干活的人
Job job = new Job(conf);
job.setJarByClass(KpiAppPatition.class);
// 1.1 告诉干活的人 输入流位置     读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
FileInputFormat.setInputPaths(job, INPUT_PATH);
// 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
job.setInputFormatClass(TextInputFormat.class);
//1.2 指定自定义的map类
job.setMapperClass(MyMapper2.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable2.class);
//1.3 分区
job.setPartitionerClass(KpiPartitioner.class);
job.setNumReduceTasks(2);
//1.4 TODO 排序、分组    目前按照默认方式执行
//1.5 TODO 规约
//2.2 指定自定义reduce类
job.setReducerClass(MyReducer2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.3 指定写出到哪里
FileOutputFormat.setOutputPath(job, outpath);
job.setOutputFormatClass(TextOutputFormat.class);
// 让干活的人干活
job.waitForCompletion(true);
}
}
// 自定义分区写法如 注意返回的值是从0开始依次累加1的int值,不能跳跃
// 否则报错 说找不到编号为X的
// 你的reduce有几个,那么就会从0开始以1为累加数字返回对应个数的分区编码 然后
// 在去你代码里找对应编码  代码中随意返回patition的num 找不到就会报错
class KpiPartitioner extends HashPartitioner<Text, KpiWritable2>{
@Override
public int getPartition(Text key, KpiWritable2 value, int numReduceTasks) {
System.out.println("KpiPartitioner numReduceTasks is : " + numReduceTasks );
return (key.toString().length()==11)?0:1; // key为key2 即 电话号码,这里 如果是手机号(11位)则返回0,否则返回1 这样会生成2个分区,1个存放手机号的 1个存放固话的
}
}
/**
* 将 <k1,v1> --->  <k2,v2>
* @author zm
*/
class MyMapper2 extends Mapper<LongWritable, Text, Text, KpiWritable2>{
/**
* key 表示k1 即 当前行号
* value 表示v1 即当前行内容
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//格式: 1363157990043 1392505741300-1F-64-E1-E6-9A:CMCC120.196.100.55t3.baidu.com搜索引擎69631105848243200
String[] elements = v1.toString().split("\t");
String phoneNum = elements[1];
KpiWritable2 v2 = new KpiWritable2(elements[6],elements[7],elements[8],elements[9]);
Text k2 = new Text(phoneNum);
context.write(k2, v2);
}
}
/**
* 将 <k2,v2> --->  <k3,v3>
* @author zm
*/
class MyReducer2 extends Reducer<Text, KpiWritable2,Text, LongWritable>{
protected void reduce(Text k2, Iterable<KpiWritable2> v2s,
org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
for(KpiWritable2 kpiWritable1 : v2s){
upPackNum = kpiWritable1.upPackNum;
downPackNum = kpiWritable1.downPackNum;
upPayLoad = kpiWritable1.upPayLoad;
downPayLoad = kpiWritable1.downPayLoad;
}
KpiWritable2 v3 = new KpiWritable2(upPackNum+"",downPackNum+"",upPayLoad+"",downPayLoad+"");
context.write(k2, v3);
}

}

/**
* 自定义类型类,里面封装 上网流量信息
* @author zm
*
*/
class KpiWritable2 implements Writable{
long upPackNum; // 上传数据包个数
long downPackNum;// 下载数据包个数
long upPayLoad;// 上传数据
long downPayLoad;// 下载数据
public KpiWritable2(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){
this.upPackNum = Long.parseLong(upPackNum);
this.downPackNum = Long.parseLong(downPackNum);
this.upPayLoad = Long.parseLong(upPayLoad);
this.downPayLoad = Long.parseLong(downPayLoad);
}
public KpiWritable2(){}
@Override
public void write(DataOutput out) throws IOException {
// 先写后读
out.writeLong(this.upPackNum);
out.writeLong(this.downPackNum);
out.writeLong(this.upPayLoad);
out.writeLong(this.downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
// 读取的时候, 按照写方法的顺序( 队列方式) 顺序读取
this.upPackNum = in.readLong();
this.downPackNum = in.readLong();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
}
@Override
public String toString() {
return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311494-1-1.html 上篇帖子: Hadoop配置文件配置项定义说明 下篇帖子: Hadoop运行原理详解
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表