设为首页 收藏本站
查看: 582|回复: 0

[经验分享] Hadoop-常用操作

[复制链接]

尚未签到

发表于 2016-12-6 07:32:16 | 显示全部楼层 |阅读模式
使用URL的方式读取一个文件内容,需要设置一个handler工厂,这个工厂只能设置一次
// Registers Hadoop's URL stream handler factory so that java.net.URL can
// resolve "hdfs://" URLs (used by test1 below). The JVM allows
// setURLStreamHandlerFactory to be called at most once per process — a second
// call throws an Error — hence the one-shot static initializer.
static {
URL.setURLStreamHandlerFactory( new FsUrlStreamHandlerFactory() );
}
/**
 * Reads a file from HDFS through plain java.net.URL and prints it line by
 * line. Relies on the FsUrlStreamHandlerFactory registered in the static
 * initializer to make the "hdfs" scheme resolvable.
 *
 * Fix: the original never closed the reader/stream when readLine() threw;
 * try-with-resources guarantees cleanup on every path.
 *
 * @throws IOException if the URL cannot be opened or read
 */
public void test1() throws IOException {
    URL u = new URL("hdfs://IP:8020/test");
    try (BufferedReader br = new BufferedReader(new InputStreamReader(u.openStream()))) {
        String line;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
    }
}

使用hadoop的FileSystem读取文件
/**
 * Reads an HDFS file through Hadoop's FileSystem API and copies it to stdout.
 *
 * Fix: the original opened the stream and never closed it (copyBytes was
 * called with close=false and there was no finally). We keep close=false —
 * letting copyBytes close the streams would also close System.out — and
 * close the input explicitly instead.
 *
 * @throws IOException if the file cannot be opened or read
 */
public void test2() throws IOException {
    String url = "hdfs://IP:8020/test-data/hello.txt";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    InputStream is = fs.open(new Path(url));
    try {
        // close=false: do not let copyBytes close System.out.
        IOUtils.copyBytes(is, System.out, 4096, false);
    } finally {
        is.close(); // original leaked this stream
    }
}


将一个本地文件拷贝到hadoop文件系统中
/**
 * Copies a local Windows file into HDFS, printing a "." for every progress
 * callback fired by the write pipeline.
 *
 * @throws IOException if either side of the copy fails
 */
public void test3() throws IOException {
    String src = "C:\\test.txt";
    String dest = "hdfs://IP:8020/test-data/hello.txt";
    Configuration conf = new Configuration();
    FileSystem hdfs = FileSystem.get(URI.create(dest), conf);
    Progressable reporter = new Progressable() {
        @Override
        public void progress() {
            System.out.print(".");
        }
    };
    InputStream in = new BufferedInputStream(new FileInputStream(src));
    OutputStream out = hdfs.create(new Path(dest), reporter);
    // close=true: copyBytes closes both streams when the copy finishes.
    IOUtils.copyBytes(in, out, 4096, true);
    System.out.println("ok~");
}


列出文件属性
/**
 * Prints the metadata (FileStatus) of a single HDFS file.
 *
 * Fix: the original URI was "hdfs:/IP:8020/..." — only one slash after the
 * scheme, so URI parsing treats "IP:8020" as part of the path instead of as
 * the namenode authority and the lookup cannot reach the cluster. It must be
 * "hdfs://IP:8020/...".
 *
 * @throws IOException if the file status cannot be fetched
 */
public void test4() throws IOException {
    String url = "hdfs://IP:8020/test-data/hello.txt";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    FileStatus status = fs.getFileStatus(new Path(url));
    System.out.println("AccessTime : "+status.getAccessTime());
    System.out.println("BlockSize : "+status.getBlockSize());
    System.out.println("group : "+status.getGroup());
    System.out.println("len : "+status.getLen());
    System.out.println("ModificationTime : "+status.getModificationTime());
    System.out.println("owner : "+status.getOwner());
    // NOTE(review): isDir() is deprecated in newer Hadoop in favor of
    // isDirectory(); kept as-is to stay compatible with older releases.
    System.out.println("is dir ? : "+status.isDir());
    System.out.println("path : "+status.getPath());
    System.out.println("permission : "+status.getPermission());
    System.out.println("replication : "+status.getReplication());
}

通过路径过滤器查找文件
/**
 * Globs the test-data directory and prints only the paths accepted by a
 * regex-based PathFilter (RegexPathFilter, defined in this file).
 *
 * @throws IOException if the glob cannot be evaluated
 */
public void test5() throws IOException {
    String pattern = "hdfs://IP:8020/test-data/*";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(pattern), conf);
    PathFilter filter = new RegexPathFilter("^.*hello.*$");
    FileStatus[] matches = fs.globStatus(new Path(pattern), filter);
    for (FileStatus match : matches) {
        System.out.println(match.getPath().toString());
    }
    System.out.println("filter execute ok");
}
//路径正则过滤器类
/**
 * PathFilter that accepts a path exactly when its full string form matches
 * the configured regular expression.
 */
public class RegexPathFilter implements PathFilter {

    private final String regex;

    public RegexPathFilter(String regex) {
        this.regex = regex;
    }

    @Override
    public boolean accept(Path path) {
        String candidate = path.toString();
        return candidate.matches(regex);
    }
}

删除:fs.delete 的第二个参数控制是否递归删除(示例中传 false,即非递归;删除非空目录需传 true)
/**
 * Deletes a single HDFS path. The second argument to fs.delete is the
 * "recursive" flag — false here, so a non-empty directory would be rejected;
 * this call only removes files or empty directories.
 *
 * @throws IOException if the delete fails
 */
public void delete() throws IOException {
    String target = "hdfs://IP:8020/test-data/xxx.txt";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(target), conf);
    Path victim = new Path(target);
    fs.delete(victim, false); // recursive = false
    System.out.println("delete ok");
}

重命名
/**
 * Renames (moves) one HDFS file to another path on the same filesystem and
 * reports whether the rename succeeded.
 *
 * @throws IOException if the filesystem cannot be reached
 */
public void test5_rename() throws IOException {
    String from = "hdfs://IP:8020/test-data/xx.txt";
    String to = "hdfs://IP:8020/test-data/modify-xx.txt";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(from), conf);
    boolean isok = fs.rename(new Path(from), new Path(to));
    System.out.println("complete : "+isok);
}

检查文件是否存在
/**
 * Checks whether a path exists in HDFS.
 *
 * Fix: the original URI was "hdfs:/IP:8020/..." — a single slash after the
 * scheme, so the namenode authority was never parsed (same defect as test4).
 * It must be "hdfs://IP:8020/...".
 *
 * @throws IOException if the filesystem cannot be reached
 */
public void exist() throws IOException {
    String url = "hdfs://IP:8020/test-data/modify-xx.txt";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    boolean isExist = fs.exists(new Path(url));
    System.out.println("exist ? "+isExist);
}

查找某个文件在HDFS中的位置
/**
 * Prints, for each block of an HDFS file, the datanode hosts that store it.
 *
 * Fixes:
 *  - the original called bls.getHosts() on the BlockLocation ARRAY, which
 *    does not compile; it must be bls[i].getHosts();
 *  - printing the String[] directly would only print an array reference
 *    (e.g. "[Ljava.lang.String;@1a2b3c"); join the host names instead.
 *
 * @throws IOException if the block locations cannot be fetched
 */
public void test5_location() throws IOException {
    String url = "hdfs://IP:8020/test-data/hello.txt";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    FileStatus status = fs.getFileStatus(new Path(url));
    BlockLocation[] bls = fs.getFileBlockLocations(status, 0, status.getLen());
    for (int i = 0; i < bls.length; i++) {
        String[] hosts = bls[i].getHosts();
        System.out.println("block :" + i + "\tlocation : " + String.join(",", hosts));
    }
}


获取HDFS集群上所有节点的名称
/**
 * Prints the hostnames of every datanode in the cluster. The FileSystem must
 * actually be a DistributedFileSystem (an hdfs:// URI); otherwise the cast
 * throws ClassCastException.
 *
 * @throws IOException if the datanode report cannot be fetched
 */
public void test5_allnode() throws IOException {
    String uri = "hdfs://IP:8020/test-data/modify-xx.txt";
    FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
    DistributedFileSystem hdfs = (DistributedFileSystem) fs;
    for (DatanodeInfo node : hdfs.getDataNodeStats()) {
        System.out.println(node.getHostName());
    }
}


创建本地和远端的checksum
/**
 * Writes a small file through a checksummed LocalFileSystem wrapper, which
 * also produces the ".crc" sidecar checksum file next to the target.
 *
 * Fix: the original leaked the output stream if write()/flush() threw;
 * the close now runs in a finally block.
 *
 * @throws IOException if the file cannot be created or written
 */
public void localCreateChecksum() throws IOException {
    String url = "file:///C:/zzzz/abc.txt";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    ChecksumFileSystem cfs = new LocalFileSystem(fs);
    FSDataOutputStream fsdos = cfs.create(new Path(url));
    try {
        // NOTE(review): getBytes() uses the platform default charset — fine
        // for this ASCII literal, but specify a charset for real data.
        fsdos.write("hehe".getBytes());
        fsdos.flush();
    } finally {
        fsdos.close(); // original leaked the stream on failure
    }
}

/**
 * Writes a small file through a checksumming wrapper over an hdfs:// URI.
 *
 * Fix: the original leaked the output stream if write()/flush() threw;
 * the close now runs in a finally block.
 *
 * NOTE(review): wrapping a DistributedFileSystem in LocalFileSystem looks
 * suspect — LocalFileSystem is the checksummed wrapper for the LOCAL raw
 * filesystem, and HDFS already checksums its own blocks. Confirm this is
 * intentional before reusing.
 *
 * @throws IOException if the file cannot be created or written
 */
public void distributeCreateChecksum() throws IOException {
    String url = "hdfs://IP:8020/test/abc.txt";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    ChecksumFileSystem cfs = new LocalFileSystem(fs);
    FSDataOutputStream fsdos = cfs.create(new Path(url));
    try {
        fsdos.write("hehe~".getBytes());
        fsdos.flush();
    } finally {
        fsdos.close(); // original leaked the stream on failure
    }
}

压缩和解压缩,压缩池
/**
 * Reads a local file, echoes its content, then writes it gzip-compressed to
 * HDFS via GzipCodec.
 *
 * Fixes:
 *  - the FileInputStream was never closed (now closed in a finally block);
 *  - if the source file is empty, read() returns -1 and the original
 *    new String(buf, 0, -1) threw IndexOutOfBoundsException; clamp to 0.
 *
 * NOTE(review): a single read() may return fewer bytes than the file holds
 * (and at most 10240); files larger than one read are silently truncated.
 * Loop over read() or use IOUtils for real data.
 *
 * @throws IOException if either side of the transfer fails
 */
public void compress() throws IOException {
    FileInputStream fis = new FileInputStream("C:\\zzzz\\xx.txt");
    try {
        GzipCodec gc = new GzipCodec();
        String url = "hdfs://IP:8020/test/compress.txt";
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(url), config);
        OutputStream fdos = fs.create(new Path(url));
        byte[] buf = new byte[10240];
        int len = fis.read(buf);
        if (len < 0) {
            len = 0; // empty source file: read() returned -1
        }
        System.out.println("content:");
        System.out.println(new String(buf, 0, len));
        CompressionOutputStream cos = gc.createOutputStream(fdos);
        cos.write(buf, 0, len);
        cos.flush();
        cos.close(); // also finalizes the gzip stream
    } finally {
        fis.close(); // original leaked the local input stream
    }
}
/**
 * Opens a gzip file stored in HDFS, decompresses it through GzipCodec and
 * copies the plain text to stdout.
 *
 * Fix: the original closed neither stream; closing the decompression stream
 * in a finally block also closes the underlying HDFS stream.
 *
 * NOTE(review): IOUtils.copy(in, out) is the commons-io signature — Hadoop's
 * own IOUtils uses copyBytes. Confirm which IOUtils this file imports.
 *
 * @throws IOException if the file cannot be opened or decompressed
 */
public void decompress() throws IOException {
    String url = "hdfs://IP:8020/test/compress.gz";
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(url), config);
    GzipCodec gc = new GzipCodec();
    FSDataInputStream fdis = fs.open(new Path(url));
    CompressionInputStream cis = gc.createInputStream(fdis);
    try {
        IOUtils.copy(cis, System.out);
    } finally {
        cis.close(); // closes fdis too; original leaked both streams
    }
}
/**
 * Gzip-compresses a local file to another local file, borrowing a Compressor
 * from Hadoop's CodecPool instead of allocating a new one.
 *
 * Fixes:
 *  - the CompressionOutputStream was never closed, so the gzip trailer was
 *    never written and the output file was truncated/corrupt;
 *  - the compressor is now returned to the pool even on failure (finally);
 *  - the FileInputStream is now closed.
 *
 * (Method name keeps the original "comprssPool" spelling to preserve the
 * public interface for existing callers.)
 *
 * @throws IOException if the copy fails
 */
public void comprssPool() throws IOException {
    FileInputStream fis = new FileInputStream("C:\\zzzz\\abc.txt");
    GzipCodec gc = new GzipCodec();
    Compressor compressor = CodecPool.getCompressor(gc);
    try {
        FileOutputStream fdos = new FileOutputStream("C:/zzzz/pool.txt");
        CompressionOutputStream cos = gc.createOutputStream(fdos, compressor);
        IOUtils.copy(fis, cos);
        cos.close(); // writes the gzip trailer; without it the output is corrupt
    } finally {
        CodecPool.returnCompressor(compressor); // always hand the compressor back
        fis.close();
    }
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-310153-1-1.html 上篇帖子: hadoop SecondNamenode详解 下篇帖子: Hadoop学习资料及网站汇总
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表