
[Experience Sharing] Hadoop Programming Notes

  1 Basic Hadoop program (the WordCountDemo below).
  2 Input and output formats.
  3 Chaining multiple map/reduce stages into one pipeline (already tried, works reliably; can it also write to different output files? See the MultipleOutputs sketch after WordCountDemo2).
  4 Data joins:
  Map-side join.
  Reduce-side join (group records, tag each one with its data source, use the join field as the key).
  Distributed cache (for asymmetric joins where one table is small).
  Bloom filter (zero false-negative rate; a bit vector that trades a small false-positive rate for much less storage; a common trick for asymmetric table joins, sketched after the link below).
  http://www.google.com.hk/ggblog/googlechinablog/2007/07/bloom-filter_7469.html
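To go with the Bloom filter note above, here is a minimal sketch (not from the original post) using Hadoop's built-in org.apache.hadoop.util.bloom.BloomFilter, which ships with hadoop-1.0.3. The vector size, hash count, sample keys and the customer.bloom output path are illustrative assumptions; the idea is to build the filter from the small table's join keys and hand it to the map tasks (for example via the distributed cache) so that most non-matching big-table records are dropped before the shuffle.

import java.io.DataOutputStream;
import java.io.FileOutputStream;

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

// Build a Bloom filter over the join keys of the small table, serialize it,
// and ship it to every map task of the big-table job.
public class BloomFilterSketch {
    public static void main(String[] args) throws Exception {
        // vectorSize and nbHash are tuning knobs: a bigger bit vector lowers the false-positive rate.
        BloomFilter filter = new BloomFilter(100000, 6, Hash.MURMUR_HASH);

        // In a real job these keys would come from the small table's join column (illustrative values here).
        String[] smallTableKeys = { "c001", "c002", "c003" };
        for (String k : smallTableKeys) {
            filter.add(new Key(k.getBytes("UTF-8")));
        }

        // No false negatives: every inserted key tests positive.
        System.out.println(filter.membershipTest(new Key("c002".getBytes("UTF-8")))); // true
        // A missing key is almost always rejected; an occasional false positive only
        // lets a useless record through to the reducer, it never loses a join match.
        System.out.println(filter.membershipTest(new Key("c999".getBytes("UTF-8"))));

        // BloomFilter implements Writable, so it can be written to a file and
        // distributed to the mappers (path below is an assumption for this local setup).
        DataOutputStream out = new DataOutputStream(
                new FileOutputStream("/c:/cygwin/usr/hadoop-1.0.3/input/customer.bloom"));
        filter.write(out);
        out.close();
    }
}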


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Basic word count with the new mapreduce API.
public class WordCountDemo {
    private static final Log LOG = LogFactory.getLog(WordCountDemo.class);

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        // Emit each token with a count of 1 (the original used 10, which inflates every count tenfold).
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            //LOG.error(key.toString() + ":" + result.get());
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountDemo.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        long start = System.currentTimeMillis();
        boolean done = job.waitForCompletion(true);
        LOG.error("cost:" + (System.currentTimeMillis() - start));
        System.exit(done ? 0 : 1);
    }
}


import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

// Chains several mappers (and an optional post-reduce mapper) inside one job
// using the old mapred API's ChainMapper/ChainReducer: [MAP+ / REDUCE MAP*].
public class WordCountDemo2 {
    private static final Log LOG = LogFactory.getLog(WordCountDemo2.class);

    // Pre-filter: only lines containing "security" are passed down the chain.
    public static class CleanMapper extends MapReduceBase implements
            Mapper<Object, Text, Object, Text> {
        @Override
        public void map(Object key, Text value,
                OutputCollector<Object, Text> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken();
                if (s.contains("security")) {
                    LOG.error("bingo:" + value.toString());
                    output.collect(key, value);
                    break;
                }
            }
        }
    }

    // Post-filter: runs after the reducer and keeps only keys containing "security".
    public static class PostCleanMapper extends MapReduceBase implements
            Mapper<Text, IntWritable, Text, IntWritable> {
        @Override
        public void map(Text key, IntWritable value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            if (key.toString().contains("security")) {
                output.collect(key, value);
            }
        }
    }

    public static class TokenizerMapper extends MapReduceBase implements
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class IntSumReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
            //LOG.error(key.toString() + ":" + result.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf);
        job.setJobName("ChainJob");
        job.setJarByClass(WordCountDemo2.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input"), new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/bin"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        JobConf map1Conf = new JobConf(false);
        JobConf map2Conf = new JobConf(false);
        //ChainMapper.addMapper(job, CleanMapper.class, Object.class, Text.class, Object.class, Text.class, false, map1Conf);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, true, map2Conf);
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
        //ChainReducer.addMapper(job, PostCleanMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, map1Conf);
        long start = System.currentTimeMillis();
        JobClient.runJob(job);
        LOG.error("cost:" + (System.currentTimeMillis() - start));
    }
}
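Regarding the question in note 3 (can the job also write to different files): one option, sketched below as an assumption rather than taken from the original post, is the old mapred API's org.apache.hadoop.mapred.lib.MultipleOutputs. The class name SplitOutputReducer and the named outputs "security" and "normal" are made-up examples; whether this combines cleanly with ChainReducer in every setup is worth verifying.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

// Reducer that routes each key's sum to one of two named outputs,
// "security" or "normal", instead of the single default output file.
public class SplitOutputReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs mos;
    private IntWritable result = new IntWritable();

    @Override
    public void configure(JobConf job) {
        mos = new MultipleOutputs(job);
    }

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        // Pick the named output based on the key; the regular collector is bypassed here.
        String name = key.toString().contains("security") ? "security" : "normal";
        mos.getCollector(name, reporter).collect(key, result);
    }

    @Override
    public void close() throws IOException {
        mos.close();
    }
}

Each named output also has to be registered in the driver before submitting the job, e.g. MultipleOutputs.addNamedOutput(job, "security", TextOutputFormat.class, Text.class, IntWritable.class); and likewise for "normal"; the named outputs are then written to separate part files alongside the regular output.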

 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Reduce-side join built on the hadoop-datajoin contrib package: each record is
// tagged with its data source, grouped by the join key, and combined in the reducer.
public class DataJoin extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(DataJoin.class);

    public static class MapClass extends DataJoinMapperBase {
        // The tag is derived from the file name, e.g. "customer@.txt" -> "customer".
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("@")[0];
            LOG.error("datasource:" + datasource);
            return new Text(datasource);
        }

        // The join key is the first comma-separated field of each line.
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            LOG.error("line:" + line);
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            LOG.error("groupKey:" + groupKey);
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable();
            LOG.error("value:" + value);
            LOG.error("inputTag:" + inputTag);
            retv.setData((Writable) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {
        // Called for each combination of one record per data source; returning null
        // for keys seen in fewer than two sources makes this an inner join.
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            LOG.error("tags:" + tags);
            LOG.error("values:" + values);
            if (tags.length < 2)
                return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                LOG.error("line2:" + line);
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable();
            LOG.error("joinedStr:" + joinedStr);
            LOG.error("tags[0]:" + tags[0]);
            retv.setData(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {
        private Writable data;

        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            // Record the concrete class name so readFields can reconstruct the value.
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    LOG.error("readFields", e);
                }
            }
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);
        FileInputFormat.setInputPaths(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input/*.txt"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}
 
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Map-side join via the distributed cache: the small table (customer) is loaded
// into an in-memory hashtable in every map task and joined against the big
// table (order); no reduce phase is needed.
public class DataJoinDC extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(DataJoinDC.class);
    public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
    public static final String CACHE_FILES = "mapred.cache.files";

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoinDC.class);
        LOG.error("cacheFile:"
                + new Path("/c:/cygwin/usr/hadoop-1.0.3/input/customer@.txt")
                        .toUri());
        DistributedCache.addCacheFile(new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input/customer@.txt").toUri(),
                conf);
        FileInputFormat.setInputPaths(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input/order@.txt"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        LOG.error("CACHE_LOCALFILES:" + Arrays.toString(conf.getStrings(CACHE_LOCALFILES)));
        LOG.error("CACHE_FILES:" + Arrays.toString(conf.getStrings(CACHE_FILES)));
        job.setJobName("DataJoin with DistributedCache");
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0); // map-only job
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.set("key.value.separator.in.input.line", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static class MapClass extends MapReduceBase implements
            Mapper<Text, Text, Text, Text> {
        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        @Override
        public void configure(JobConf conf) {
            try {
                String line;
                String[] tokens;
                // For this local/cygwin test the cached file is read from a fixed path;
                // on a real cluster it should be located via DistributedCache.getLocalCacheFiles(conf).
                BufferedReader joinReader = new BufferedReader(new FileReader(
                        "/c:/cygwin/usr/hadoop-1.0.3/input/customer@.txt"));
                try {
                    while ((line = joinReader.readLine()) != null) {
                        LOG.error("line:" + line);
                        tokens = line.split(",", 2);
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        public void map(Text key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            LOG.error("key:" + key);
            Set<Entry<String, String>> set = joinData.entrySet();
            for (Entry<String, String> entry : set) {
                LOG.error("key:" + entry.getKey());
                LOG.error("value:" + entry.getValue());
            }
            String joinValue = joinData.get(key.toString());
            LOG.error("joinValue:" + joinValue);
            LOG.error("value:" + value);
            if (joinValue != null) {
                output.collect(key,
                        new Text(value.toString() + "," + joinValue));
            }
        }
    }
}
  "Try" (product trial) data statistics plan:
  Aggregate the trial-item table with the product table (small data volume; Hive table join).
  Access-log table, covering direct visits to trial pages and traffic led from trials to other pages; basic UV statistics and access-path pattern statistics (Hive filtering).
  Trial transaction table: transactions on trial items plus the transactions they lead to elsewhere (Hive table join).
  Trial user table aggregated with the UIC table (Hive table join).
  Trial application table, deduplicated by shipping address (MapReduce; a sketch follows below).
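For the last item, a minimal MapReduce dedup sketch (my own illustration, not from the original notes), using the new mapreduce API like WordCountDemo above. The record layout (comma-separated, shipping address in field index 2) and the class names are assumptions.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Deduplicate trial applications that share the same shipping address:
// map each record to (address, full record), then keep one record per address.
public class AddressDedup {
    public static class DedupMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        private Text address = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length > 2) {
                address.set(fields[2]);        // assumed address column (third field)
                context.write(address, value); // group all applications by address
            }
        }
    }

    public static class DedupReducer extends
            Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text address, Iterable<Text> records, Context context)
                throws IOException, InterruptedException {
            // Emit only the first application seen for this address.
            for (Text record : records) {
                context.write(record, NullWritable.get());
                break;
            }
        }
    }
}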
