|
前一段时间在写一个MapReduce程序时,需要用到一些第三方的Jar包,于是运行程序时需要把这些Jar包一一上传到所有Hadoop节点。
当集群中增加一个节点或换一套集群环境时,又需要重新布置一遍这些Jar包,这样不仅费时,而且还容易遗漏、出错。并且,当我不再需要这些Jar包时,又得一个一个节点去清理。非常耗时。
后来查阅相关资料,发现Hadoop提供一个DistributedCache,用它可以把HDFS中的文件加载到DistributedCache中,当我们需要这些文件时,DistributedCache自动把这些文件下载到集群中节点的本地存储上(mapred.local.dir)。这样就不需要一一布置第三方的Jar包,并且Hadoop集群增加节点也不需要再上传了。
DistributedCache的使用方法,参考http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/filecache/DistributedCache.html。
// Setting up the cache for the application
1. Copy the requisite files to the FileSystem:
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. Setup the application's JobConf:
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
3. Use the cached files in the Mapper
or Reducer:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {
private Path[] localArchives;
private Path[] localFiles;
public void configure(JobConf job) {
// Get the cached archives/files
localArchives = DistributedCache.getLocalCacheArchives(job);
localFiles = DistributedCache.getLocalCacheFiles(job);
}
public void map(K key, V value,
OutputCollector<K, V> output, Reporter reporter)
throws IOException {
// Use data from the cached archives/files here
// ...
// ...
output.collect(k, v);
}
}
DistributedCache 不光对第三方Jar有用,对于Read-Only的数据都可以,这样也扩展了我们编写MapReduce程序的思路。 |
|
|