1 Basic Hadoop program
2 Input and output formats (see the format sketch after the WordCount example below)
3 Chaining several map/reduce stages into one pipeline (tried it, works reliably; can it also write to different output files? see the MultipleOutputs sketch after the ChainJob example)
4 Data joins.
map-side join
reduce-side join (group the records, tag each one with its data source, use the join field as the key)
distributed cache (asymmetric join, small side table)
Bloom filter (zero false-negative rate, a bitmap, shrinks the storage footprint; a common technique for joining asymmetric tables, sketch below after the link)
http://www.google.com.hk/ggblog/googlechinablog/2007/07/bloom-filter_7469.html
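For the Bloom filter item above, a minimal self-contained sketch in plain Java on java.util.BitSet; the vector size, probe count, and double-hashing scheme are my own illustrative choices, not from the article. The point it demonstrates: a key that was added always tests positive (no false negatives), a key that was never added only rarely tests positive, and the whole structure is just a bitmap, so the small table's key set can be shipped to every mapper cheaply and used to drop non-matching big-table records before the shuffle. Hadoop 1.x also ships a Writable implementation under org.apache.hadoop.util.bloom, which is the more natural thing to serialize into the DistributedCache.

import java.util.BitSet;

// Minimal Bloom filter sketch: an m-bit vector and k hash probes per key.
// No false negatives; the false-positive rate shrinks as m grows.
public class SimpleBloomFilter {

    private final BitSet bits;
    private final int m;      // number of bits in the vector
    private final int k;      // number of hash probes per key

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Derive the i-th probe position from two base hashes (double hashing).
    private int probe(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16 | h1 << 16) | 1;  // cheap second mix, forced odd
        int combined = h1 + i * h2;
        return Math.abs(combined % m);
    }

    public void add(String key) {
        for (int i = 0; i < k; i++) {
            bits.set(probe(key, i));
        }
    }

    // true  -> key was possibly added (may be a false positive)
    // false -> key was definitely never added (no false negatives)
    public boolean mightContain(String key) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(probe(key, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 20, 5);
        filter.add("customer_42");
        System.out.println(filter.mightContain("customer_42")); // true
        System.out.println(filter.mightContain("customer_99")); // almost certainly false
    }
}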
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Basic Hadoop program (item 1): word count on the new (mapreduce) API.
public class WordCountDemo {

    private static final Log LOG = LogFactory.getLog(WordCountDemo.class);

    // Mapper: split each line into tokens and emit (word, 1).
    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: sum the counts emitted for each word.
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            //LOG.error(key.toString() + ":" + result.get());
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountDemo.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        long start = System.currentTimeMillis();
        boolean done = job.waitForCompletion(true);
        LOG.error("cost:" + (System.currentTimeMillis() - start));
        System.exit(done ? 0 : 1);
    }
}
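For item 2 (input/output formats), a minimal driver sketch that reuses TokenizerMapper and IntSumReducer from WordCountDemo above but swaps the output side to a SequenceFile; the argument-based paths are placeholders of mine, everything else is the standard new-API lib classes in Hadoop 1.0.3.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FormatDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "format demo");
        job.setJarByClass(FormatDemo.class);
        job.setMapperClass(WordCountDemo.TokenizerMapper.class);
        job.setReducerClass(WordCountDemo.IntSumReducer.class);
        // The input format turns raw bytes into (key, value) pairs for the mapper:
        // TextInputFormat yields (byte offset, line of text).
        job.setInputFormatClass(TextInputFormat.class);
        // The output format decides how reducer output is written:
        // SequenceFileOutputFormat produces a binary, splittable file that a
        // follow-up job can read back with SequenceFileInputFormat.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}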
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

// Chained pipeline (item 3): several map/reduce links in one job, built with
// the old-API ChainMapper / ChainReducer helpers.
public class WordCountDemo2 {

    private static final Log LOG = LogFactory.getLog(WordCountDemo2.class);

    // Optional pre-filter link: only pass lines containing "security" downstream.
    public static class CleanMapper extends MapReduceBase implements
            Mapper<Object, Text, Object, Text> {

        @Override
        public void map(Object key, Text value,
                OutputCollector<Object, Text> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken();
                if (s.contains("security")) {
                    LOG.error("bingo:" + value.toString());
                    output.collect(key, value);
                    break;
                }
            }
        }
    }

    // Optional post-reduce link: only keep counts whose word contains "security".
    public static class PostCleanMapper extends MapReduceBase implements
            Mapper<Text, IntWritable, Text, IntWritable> {

        @Override
        public void map(Text key, IntWritable value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            if (key.toString().contains("security")) {
                output.collect(key, value);
            }
        }
    }

    // Main map link: split each line into tokens and emit (word, 1).
    public static class TokenizerMapper extends MapReduceBase implements
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Reduce link: sum the counts for each word.
    public static class IntSumReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
            //LOG.error(key.toString() + ":" + result.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf);
        job.setJobName("ChainJob");
        job.setJarByClass(WordCountDemo2.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input"), new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/bin"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        // Each link in the chain gets its own (here empty) local JobConf.
        JobConf map1Conf = new JobConf(false);
        JobConf map2Conf = new JobConf(false);
        //ChainMapper.addMapper(job, CleanMapper.class, Object.class, Text.class, Object.class, Text.class, false, map1Conf);
        ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, true, map2Conf);
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
        //ChainReducer.addMapper(job, PostCleanMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, map1Conf);
        long start = System.currentTimeMillis();
        JobClient.runJob(job);
        LOG.error("cost:" + (System.currentTimeMillis() - start));
    }
}
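On the open question in item 3 (writing to different output files from one pipeline): the old API's org.apache.hadoop.mapred.lib.MultipleOutputs lets a single job write named side outputs next to its normal output. Below is a sketch of the reducer side only; the named output "longwords" and the length-8 rule are made up for illustration, and I have not tested how this composes with ChainReducer's extra map links.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

public class SplitOutputReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    private MultipleOutputs mos;

    @Override
    public void configure(JobConf conf) {
        mos = new MultipleOutputs(conf);
    }

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        IntWritable result = new IntWritable(sum);
        if (key.toString().length() > 8) {
            // Goes to side files named longwords-r-*.
            mos.getCollector("longwords", reporter).collect(key, result);
        } else {
            // Everything else goes to the job's normal output.
            output.collect(key, result);
        }
    }

    @Override
    public void close() throws IOException {
        mos.close();
    }

    /* Driver side, before submitting the JobConf:
     *   MultipleOutputs.addNamedOutput(job, "longwords",
     *       TextOutputFormat.class, Text.class, IntWritable.class);
     */
}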
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Reduce-side join (item 4): each record is tagged with its data source in the map,
// grouped on the join key, and the sources are combined per key in the reduce,
// using the hadoop-datajoin contrib classes.
public class DataJoin extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(DataJoin.class);

    public static class MapClass extends DataJoinMapperBase {

        // The source tag is the part of the file name before '@',
        // e.g. "customer@.txt" -> tag "customer".
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("@")[0];
            LOG.error("datasource:" + datasource);
            return new Text(datasource);
        }

        // The join (group) key is the first comma-separated field of the record.
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            LOG.error("line:" + line);
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            LOG.error("groupKey:" + groupKey);
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable();
            LOG.error("value:" + value);
            LOG.error("inputTag:" + inputTag);
            retv.setData((Writable) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // Called once per combination of records sharing a group key;
        // returning null drops the combination (inner join of the two sources).
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            LOG.error("tags:" + tags);
            LOG.error("values:" + values);
            if (tags.length < 2)
                return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                LOG.error("line2:" + line);
                // Keep everything after the join key; the group key itself
                // becomes the output key.
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable();
            LOG.error("joinedStr:" + joinedStr);
            LOG.error("tags[0]:" + tags[0]);
            retv.setData(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    // Writable wrapper that carries both the record and its source tag between
    // map and reduce; the payload's class name is serialized so readFields can
    // re-create the right Writable type.
    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    LOG.error("readFields", e);
                }
            }
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);
        FileInputFormat.setInputPaths(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input/*.txt"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Map-side join via the DistributedCache (asymmetric join): the small customer
// table is loaded into memory in every mapper and the big order table is
// streamed through as normal input, so no reduce phase is needed.
public class DataJoinDC extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(DataJoinDC.class);

    public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
    public static final String CACHE_FILES = "mapred.cache.files";

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDC(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoinDC.class);
        LOG.error("cacheFile:"
                + new Path("/c:/cygwin/usr/hadoop-1.0.3/input/customer@.txt")
                        .toUri());
        DistributedCache.addCacheFile(new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input/customer@.txt").toUri(),
                conf);
        FileInputFormat.setInputPaths(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/input/order@.txt"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/c:/cygwin/usr/hadoop-1.0.3/output"));
        LOG.error("CACHE_LOCALFILES:" + conf.getStrings(CACHE_LOCALFILES));
        LOG.error("CACHE_FILES:" + conf.getStrings(CACHE_FILES));
        job.setJobName("DataJoin with DistributedCache");
        job.setMapperClass(MapClass.class);
        // Map-only job: the join happens entirely in the mapper.
        job.setNumReduceTasks(0);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.set("key.value.separator.in.input.line", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static class MapClass extends MapReduceBase implements
            Mapper<Text, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // Load the small table into memory once per task. Here it is read from a
        // fixed local path, which works in local/standalone mode; on a real
        // cluster the file would be located via DistributedCache.getLocalCacheFiles(conf).
        @Override
        public void configure(JobConf conf) {
            try {
                String line;
                String[] tokens;
                BufferedReader joinReader = new BufferedReader(new FileReader(
                        "/c:/cygwin/usr/hadoop-1.0.3/input/customer@.txt"));
                try {
                    while ((line = joinReader.readLine()) != null) {
                        LOG.error("line:" + line);
                        tokens = line.split(",", 2);
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        // For every order record, look up the customer by key and emit the joined row.
        public void map(Text key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            LOG.error("key:" + key);
            Set<Entry<String, String>> set = joinData.entrySet();
            for (Entry<String, String> entry : set) {
                LOG.error("key:" + entry.getKey());
                LOG.error("value:" + entry.getValue());
            }
            String joinValue = joinData.get(key.toString());
            LOG.error("joinValue:" + joinValue);
            LOG.error("value:" + value);
            if (joinValue != null) {
                output.collect(key,
                        new Text(value.toString() + "," + joinValue));
            }
        }
    }
}
Trial ("try") data statistics:
Aggregate the trial-item table with the product table (small data volume; Hive table join).
Access-log table, covering direct visits to the trial pages plus traffic led from the trial pages to other pages; run basic UV statistics and access-path pattern statistics (Hive filtering).
Trial-transaction table: transactions on the trial items themselves and transactions driven by the trials (Hive table join).
Trial-user table aggregated with the uic table (Hive table join).
Trial-application table, used to deduplicate applications coming from the same address (MapReduce; see the sketch below).
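For the last item, a sketch of the same-address dedup in the same old API as the examples above. The assumption (mine, not from the table definitions) is that the application records are comma-separated with the address in the third field; the reducer simply keeps the first record it sees per address.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Dedup-by-address sketch: the map emits (address, full record); the reduce keeps
// the first record per address, so only one application per address survives.
public class AddressDedup {

    public static class DedupMapper extends MapReduceBase implements
            Mapper<Object, Text, Text, Text> {

        private final Text address = new Text();

        @Override
        public void map(Object key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String[] fields = value.toString().split(",");
            if (fields.length > 2) {
                address.set(fields[2]);   // assumed position of the address column
                output.collect(address, value);
            }
        }
    }

    public static class DedupReducer extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // All records for the same address arrive together; emit only the first.
            if (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }
}

The driver would be the same boilerplate as in WordCountDemo2: TextInputFormat in, TextOutputFormat out, with DedupMapper and DedupReducer wired into a JobConf.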