225025 发表于 2018-10-30 09:30:23

Hadoop2.6.0学习笔记(二)HDFS访问

package com.invic.hdfs;  

  
import java.io.IOException;
  
import java.io.OutputStream;
  
import java.net.URI;
  
import java.util.Scanner;
  

  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.fs.FSDataInputStream;
  
import org.apache.hadoop.fs.FSDataOutputStream;
  
import org.apache.hadoop.fs.FileStatus;
  
import org.apache.hadoop.fs.FileSystem;
  
import org.apache.hadoop.fs.FileUtil;
  
import org.apache.hadoop.fs.LocatedFileStatus;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.fs.PathFilter;
  
import org.apache.hadoop.fs.RemoteIterator;
  
import org.apache.hadoop.io.IOUtils;
  
import org.apache.hadoop.util.Progressable;
  

  
/**
  
*
  
* @author lucl
  
* @ 通过FileSystem API来实现
  
*FileSystem get(Configuration)            通过设置配置文件core-site.xml读取类路径来实现,默认本地文件系统
  
*FileSystem get(URI, Configuration)       通过URI来设定要使用的文件系统
  
*FileSystem get(URI, Configuration, user) 作为给定用户来访问文件系统,对安全来说至关重要
  
*/
  
public class MyHdfsOfFS {
  

  
    private static String HOST = "hdfs://nnode";
  
    private static String PORT = "8020";
  

  
    private static String NAMENODE = HOST + ":" + PORT;
  

  
    public static void main(String[] args) throws IOException {
  
      Configuration conf = new Configuration();
  

  
      String path = NAMENODE + "/user/";
  

  
      /**
  
         * 由于这里设计的为hadoop的user目录,默认会查询hdfs的用户家目录下的文件
  
         */
  
      String user = "hadoop";
  
      FileSystem fs = null;
  
      try {
  
            fs = FileSystem.get(URI.create(path), conf, user);
  
      } catch (InterruptedException e) {
  
            e.printStackTrace();
  
      }
  

  
      if (null == fs) {
  
            return;
  
      }
  

  
      /**
  
         * 递归创建目录
  
         */
  
      boolean mkdirs = fs.mkdirs(new Path("invic/test/mvtech"));
  
      if (mkdirs) {
  
            System.out.println("Dir ‘invic/test/mvtech’ create success.");
  
      }
  

  
      /**
  
         * 判断目录是否存在
  
         */
  
      boolean exists = fs.exists(new Path("invic/test/mvtech"));
  
      if (exists) {
  
            System.out.println("Dir ‘invic/test/mvtech’ exists.");
  
      }
  

  
      /**
  
         * FSDataInputStream支持随意位置访问
  
         * 这里的lucl.txt默认查找路径为/user/Administrator/lucl.txt
  
             因为我是windows的eclipse
  
         * 如果我上面的get方法最后指定了user
  
             则查询的路径为/user/get方法指定的user/lucl.txt
  
         */
  
      FSDataInputStream in = fs.open(new Path("lucl.txt"));
  

  
      OutputStream os = System.out;
  

  
      int buffSize = 4098;
  

  
      boolean close = false;
  

  
      IOUtils.copyBytes(in, os, buffSize, close);
  

  
      System.out.println("\r\n跳到文件开始重新读取文件。。。。。。");
  
      in.seek(0);
  
      IOUtils.copyBytes(in, os, buffSize, close);
  

  
      IOUtils.closeStream(in);
  

  
      /**
  
         * 创建文件
  
         */
  
      FSDataOutputStream create = fs.create(new Path("sample.txt"));
  
      create.write("This is my first sample file.".getBytes());
  
      create.flush();
  
      create.close();
  

  
      /**
  
         * 文件拷贝
  
         */
  
      fs.copyFromLocalFile(new Path("F:\\Mvtech\\ftpfile\\cg-10086.com.csv"),
  
      new Path("cg-10086.com.csv"));
  

  
      /**
  
         * 文件追加
  
         */
  
      FSDataOutputStream append = fs.append(new Path("sample.txt"));
  
      append.writeChars("\r\n");
  
      append.writeChars("New day, new World.");
  
      append.writeChars("\r\n");
  

  
      IOUtils.closeStream(append);
  

  
      /**
  
         * progress的使用
  
         */
  
      FSDataOutputStream progress = fs.create(new Path("progress.txt"),
  
      new Progressable() {
  

  
            @Override
  
            public void progress() {
  
                System.out.println("write is in progress......");
  
            }
  
      });
  

  
      // 接收键盘输入到hdfs上
  
      Scanner sc = new Scanner(System.in);
  
      System.out.print("Please type your enter : ");
  
      String name = sc.nextLine();
  
      while (!"quit".equals(name)) {
  
            if (null == name || "".equals(name.trim())) {
  
                continue;
  
            }
  
            progress.writeChars(name);
  

  
            System.out.print("Please type your enter : ");
  
            name = sc.nextLine();
  
      }
  

  
      /**
  
         * 递归列出文件
  
         */
  
      RemoteIterator it = fs.listFiles(new Path(path), true);
  
      while (it.hasNext()) {
  
            LocatedFileStatus loc = it.next();
  
            System.out.println(loc.getPath().getName() + "|" + loc.getLen() + "|"
  
            + loc.getOwner());
  
      }
  

  
      /**
  
         * 文件或目录元数据:文件长度、块大小、复本、修改时间、所有者及权限信息
  
         */
  
      FileStatus status = fs.getFileStatus(new Path("lucl.txt"));
  
      System.out.println(status.getPath().getName() + "|" +
  
      status.getPath().getParent().getName() + "|" + status.getBlockSize() + "|"
  
      + status.getReplication() + "|" + status.getOwner());
  

  
      /**
  
         * 列出目录中文件listStatus,若参数为文件则以数组方式返回长度为1的FileStatus对象
  
         */
  
      fs.listStatus(new Path(path));
  
      fs.listStatus(new Path(path), new PathFilter() {
  

  
            @Override
  
            public boolean accept(Path tmpPath) {
  
                String tmpName = tmpPath.getName();
  
                if (tmpName.endsWith(".txt")) {
  
                  return true;
  
                }
  
                return false;
  
            }
  
      });
  

  
      // 可以传入一组路径,会最终累计合并成一个数组返回
  
      // fs.listStatus(Path [] files);
  
      FileStatus [] mergeStatus = fs.listStatus(new Path[]{new Path("lucl.txt"),
  
         new Path("progress.txt"), new Path("sample.txt")});
  
      Path [] listPaths = FileUtil.stat2Paths(mergeStatus);
  
      for (Path p : listPaths) {
  
            System.out.println(p);
  
      }
  

  
      /**
  
         * 文件模式匹配
  
         */
  
      FileStatus [] patternStatus = fs.globStatus(new Path("*.txt"));
  
      for (FileStatus stat : patternStatus) {
  
            System.out.println(stat.getPath());
  
      }
  

  
      /**
  
         * 删除数据
  
         */
  
      boolean recursive = true;
  
      fs.delete(new Path("demo.txt"), recursive);
  

  
      fs.close();
  
    }
  
}


页: [1]
查看完整版本: Hadoop2.6.0学习笔记(二)HDFS访问