[Experience Sharing] Counting Active Users by Category with Hadoop

/**
 * Count distinct users per UGC event type from pinge.log.
 */
package cn.focus.dc.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * @author qiaowang
 */
public class PingeUgcGroupStat {

    private static final String PINGE_ACTIVE = "pinge.log";

    private static java.util.Map<String, Set<Integer>> map = new HashMap<String, Set<Integer>>();

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String[] ugc = { "user.login", "user.register", "pic.fav", "pic.unfav", "user.follow",
                    "user.unfollow", "pic.like", "pic.unlike" };
            // Build the set of UGC event names we want to count.
            Set<String> ugcSet = new HashSet<String>();
            for (int i = 0; i < ugc.length; i++) {
                ugcSet.add(ugc[i]);
            }
            String line = value.toString();
            String[] words = line.split("\\|");
            if (ugcSet.contains(words[3]) && !"".equals(words[4])) {
                // Line without a version field: words[3] is the event, words[4] the uid.
                StringBuilder buf = new StringBuilder();
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[3]);
                word.set(buf.toString());
                int uid = Integer.valueOf(words[4]);
                output.collect(word, new IntWritable(uid));
            } else if (ugcSet.contains(words[4]) && !"".equals(words[5])) {
                // Line with a version field: words[4] is the event, words[5] the uid.
                StringBuilder buf = new StringBuilder();
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[4]);
                word.set(buf.toString());
                int uid = Integer.valueOf(words[5]);
                output.collect(word, new IntWritable(uid));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            // Collect the distinct uids seen for this key, then emit the set size.
            while (values.hasNext()) {
                int uid = values.next().get();
                Set<Integer> set = map.get(key.toString());
                if (set == null) {
                    set = new HashSet<Integer>();
                    map.put(key.toString(), set);
                }
                set.add(uid);
            }
            int size = map.get(key.toString()).size();
            output.collect(key, new IntWritable(size));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PingeUgcGroupStat.class);
        conf.setJobName("pingeUgcGroupStat");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        // Note: Reduce emits counts rather than uids, so it cannot be reused as a combiner.
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(args[1]), config); // HDFS file system
        FileSystem local = FileSystem.getLocal(config);                // local file system
        Path inputDir = new Path(args[0]);  // local input directory
        Path hdfsFile = new Path(args[1]);  // merged output file on HDFS
        try {
            FileStatus[] inputFiles = local.listStatus(inputDir); // entries of the local input directory
            FSDataOutputStream out = hdfs.create(hdfsFile, new Progressable() {
                @Override
                public void progress() {
                    System.out.print(".");
                }
            }); // create the merged file on HDFS
            for (int i = 0; i < inputFiles.length; i++) { // walk the local directory entries
                if (inputFiles[i].isDir()) {
                    // Each sub-directory is expected to contain a pinge.log file; build its full name.
                    String fileName = args[0] + inputFiles[i].getPath().getName() + "/" + PINGE_ACTIVE;
                    Path filePath = new Path(fileName);
                    FSDataInputStream in = null;
                    try {
                        in = local.open(filePath); // open the local file
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (null != in) {
                        byte buffer[] = new byte[256];
                        int bytesRead = 0;
                        while ((bytesRead = in.read(buffer)) > 0) {
                            out.write(buffer, 0, bytesRead); // append to the merged file
                        }
                        in.close();
                    }
                }
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Delete the output directory, then run the job.
        deleteFromHdfs(args[2]);
        JobClient.runJob(conf);
    }

    /** Delete a path from HDFS. */
    private static void deleteFromHdfs(String dst) throws FileNotFoundException, IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        fs.deleteOnExit(new Path(dst));
        fs.close();
    }
}
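
The log format itself never appears in the post; everything about it has to be read off the indices the mapper uses. A line is split on "|", the event name is expected in field 3 (or field 4 when the line also carries a client-version field), and the uid sits in the field right after the event. The self-contained sketch below replays that parsing on a made-up line; the field labels (timestamp, date, platform) are assumptions for illustration, not documented facts about the pinge logs.

public class LogLineParseDemo {
    public static void main(String[] args) {
        // Hypothetical pipe-delimited line: only the positions of the event name
        // and uid are inferred from the mapper code, the other fields are guesses.
        String line = "1376755200|2013-08-18|android|user.login|12345";
        String[] words = line.split("\\|");
        // Branch the mapper takes when no version field is present:
        // words[3] = event name, words[4] = uid, key = words[1] \t words[2] \t words[3].
        String key = words[1] + "\t" + words[2] + "\t" + words[3];
        int uid = Integer.valueOf(words[4]);
        System.out.println(key + " -> " + uid); // prints: 2013-08-18	android	user.login -> 12345
    }
}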


Counting active users

/**
 * Count distinct active users (user.active events) from pinge.access.log.
 */
package cn.focus.dc.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * @author qiaowang
 */
public class PingeActiveStat {

    private static final String PINGE_ACTIVE = "pinge.access.log";

    private static java.util.Map<String, Set<Integer>> map = new HashMap<String, Set<Integer>>();

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] words = line.split("\\|");
            StringBuilder buf = new StringBuilder();
            if ("user.active".equals(words[3]) && !"".equals(words[4])) {
                // Line without a version field: words[3] is the event, words[4] the uid.
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[3]);
                int uid = Integer.valueOf(words[4]);
                //int uidEnd = uid%10;
                //word.set(String.valueOf(uidEnd));
                word.set(buf.toString());
                output.collect(word, new IntWritable(uid));
            } else if ("user.active".equals(words[4]) && !"".equals(words[5])) {
                // Line with a version field: words[4] is the event, words[5] the uid.
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[4]);
                int uid = Integer.valueOf(words[5]);
                //int uidEnd = uid%10;
                //word.set(String.valueOf(uidEnd));
                word.set(buf.toString());
                output.collect(word, new IntWritable(uid));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            // Collect the distinct uids seen for this key, then emit the set size.
            while (values.hasNext()) {
                int uid = values.next().get();
                Set<Integer> set = map.get(key.toString());
                if (set == null) {
                    set = new HashSet<Integer>();
                    map.put(key.toString(), set);
                }
                set.add(uid);
            }
            int size = map.get(key.toString()).size();
            output.collect(key, new IntWritable(size));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PingeActiveStat.class);
        conf.setJobName("pingeActiveStat");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        // Note: Reduce emits counts rather than uids, so it cannot be reused as a combiner.
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(args[1]), config); // HDFS file system
        FileSystem local = FileSystem.getLocal(config);                // local file system
        Path inputDir = new Path(args[0]);  // local input directory
        Path hdfsFile = new Path(args[1]);  // merged output file on HDFS
        try {
            FileStatus[] inputFiles = local.listStatus(inputDir); // entries of the local input directory
            FSDataOutputStream out = hdfs.create(hdfsFile, new Progressable() {
                @Override
                public void progress() {
                    System.out.print(".");
                }
            }); // create the merged file on HDFS
            for (int i = 0; i < inputFiles.length; i++) { // walk the local directory entries
                if (inputFiles[i].isDir()) {
                    // Each sub-directory is expected to contain a pinge.access.log file; build its full name.
                    String fileName = args[0] + inputFiles[i].getPath().getName() + "/" + PINGE_ACTIVE;
                    Path filePath = new Path(fileName);
                    FSDataInputStream in = null;
                    try {
                        in = local.open(filePath); // open the local file
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (null != in) {
                        byte buffer[] = new byte[256];
                        int bytesRead = 0;
                        while ((bytesRead = in.read(buffer)) > 0) {
                            out.write(buffer, 0, bytesRead); // append to the merged file
                        }
                        in.close();
                    }
                }
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Delete the output directory, then run the job.
        deleteFromHdfs(args[2]);
        JobClient.runJob(conf);
    }

    /** Delete a path from HDFS. */
    private static void deleteFromHdfs(String dst) throws FileNotFoundException, IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        fs.deleteOnExit(new Path(dst));
        fs.close();
    }
}
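
Both jobs share the same reduce-side idea: gather the uids reported for a key into a set and emit the set size, i.e. the number of distinct users per key. This is also why the Reduce class cannot double as a combiner, since its output is a count rather than a uid. The stand-alone sketch below, with made-up uids, shows the value the reducer would produce for a single key.

import java.util.HashSet;
import java.util.Set;

public class DistinctUidCountDemo {
    public static void main(String[] args) {
        // Made-up uids for one key; a repeated uid stands for a user
        // who triggered the event more than once in the same period.
        int[] uids = { 101, 101, 202, 303, 202 };
        Set<Integer> distinct = new HashSet<Integer>();
        for (int uid : uids) {
            distinct.add(uid);
        }
        // The reducer emits this size as the value for the key: 3 here.
        System.out.println("distinct users: " + distinct.size());
    }
}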


Compile:
/usr/lib/jdk1.6.0_33/bin/javac -classpath /opt/hadoop/hadoop-core-1.1.2.jar -d /home/hadoop/pinge_classes/ /home/hadoop/pinge_classes/PingeActiveStat.java
hadoop@Master:~/pinge_classes$ /usr/lib/jdk1.6.0_33/bin/jar -cvf /home/hadoop/PingeActiveStat.jar -C /home/hadoop/pinge_classes/ .
Run:
/opt/hadoop/bin/hadoop jar /home/hadoop/PingeActiveStat.jar cn.focus.dc.hadoop.PingeActiveStat /opt/tmp_log/pinge-access-2013-08-18.log hdfs://10.1.77.213:54310/user/hadoop/pinge_access/pinge-access-2013-08-18.log hdfs://10.1.77.213:54310/user/hadoop/pinge_access_output/

Grant permissions:
/opt/apps/hadoop/bin/hadoop fs -chmod -R 777 /user

/usr/lib/jdk1.6.0_33/bin/javac -classpath /opt/hadoop/hadoop-core-1.1.2.jar -d /home/hadoop/pinge_ugc_classes/ /home/hadoop/pinge_ugc_classes/PingeUgcStat.java
/usr/lib/jdk1.6.0_33/bin/jar -cvf /home/hadoop/PingeUgcStat.jar -C /home/hadoop/pinge_ugc_classes/ .

/opt/hadoop/bin/hadoop jar /home/hadoop/PingeUgcStat.jar cn.focus.dc.hadoop.PingeUgcStat /opt/tmp_log/pinge-2013-08-24.log  hdfs://10.1.77.213:54310/user/hadoop/pinge_ugc/pinge-ugc-2013-08-24.log hdfs://10.1.77.213:54310/user/hadoop/pinge_ugc_output/
/opt/apps/hadoop/bin/hadoop jar /opt/stat/PingeUgcStat.jar cn.focus.dc.hadoop.PingeUgcStat /opt/tmp_log/pinge-2013-08-24.log hdfs://localhost:54310/user/hadoop/pinge_ugc/pinge-ugc-2013-08-24.log hdfs://localhost:54310/user/hadoop/pinge_ugc_output/
