教你使用JAVA语言API操作hdfs(环境构建+操作使用)
小标 2018-09-10 来源 : 阅读 651 评论 0

摘要:本文主要向大家介绍了教你使用JAVA语言API操作hdfs(环境构建+操作使用),通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了教你使用JAVA语言API操作hdfs(环境构建+操作使用),通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

一.构建环境

        在hadoop的安装包中的share目录中有hadoop所有你能想象到的内容。
        进入安装包下的share文件夹你会看到doc和hadoop文件夹。其中doc中是hadoop的整个document。而hadoop文件夹中则存放着所有开发hadoop所有用到的jar包,其依赖放到相应的lib文件夹中。
       我们这次用到的是hadoop文件夹中的common以及hdfs文件夹中的内容。在创建工程时,继续将这两个文件夹中的jar包添加到相应的工程中,然后将文件夹中的lib文件夹的所有jar包作为依赖也添加到工程文件中(common和hdfs文件的lib文件中的jar包可能存在重复,注意去重。);其中source文件夹是该模块的源码包,如果想在IDEA中看到源码,需要将这些内容也添加到工程中。

二.操作HDFS流程

获取HDFS的配置,根据HDFS的配置获取整个HDFS操作系统的内容

打开HDFS操作系统,进行操作

文件夹的操作:增删改查

文件的上传下载

文件的IO操作——hdfs之间的复制

三.具体的操作命令

根据配置获取HDFS文件操作系统(共有三种方式)

方法一:直接获取配置文件方法
通常情况下该方法用于本地有hadoop系统,可以直接进行访问。此时仅需在配置文件中指定要操作的文件系统为hdfs即可。这里的conf的配置文件可以设置hdfs的各种参数,并且优先级比配置文件要搞

方法二:指定URI路径,进而获取配置文件创建操作系统
通常该方法用于本地没有hadoop系统,但是可以通过URI的方式进行访问。此时要给给定hadoop的NN节点的访问路径,hadoop的用户名,以及配置文件信息(此时会自动访问远程hadoop的配置文件)


               /**
                * 根据配置文件获取HDFS操作对象
                * 有两种方法:
                *  1.使用conf直接从本地获取配置文件创建HDFS对象
                *  2.多用于本地没有hadoop系统,但是可以远程访问。使用给定的URI和用户名,访问远程的配置文件,然后创建HDFS对象。
                * @return FileSystem                 */
               public FileSystem getHadoopFileSystem() {
           
           
                   FileSystem fs = null;
                   Configuration conf = null;            
                   // 方法一,本地有配置文件,直接获取配置文件(core-site.xml,hdfs-site.xml)                    // 根据配置文件创建HDFS对象                    // 此时必须指定hdsf的访问路径。
                   conf = new Configuration();                    // 文件系统为必须设置的内容。其他配置参数可以自行设置,且优先级最高
                   conf.set("fs.defaultFS", "hdfs://huabingood01:9000");            
                   try {                        // 根据配置文件创建HDFS对象
                       fs = FileSystem.get(conf);
                   } catch (IOException e) {
                       e.printStackTrace();
                       logger.error("",e);
                   }            
                   // 方法二:本地没有hadoop系统,但是可以远程访问。根据给定的URI和用户名,访问hdfs的配置参数                    // 此时的conf不需任何设置,只需读取远程的配置文件即可。
                   /*conf = new Configuration();
                   // Hadoop的用户名
                   String hdfsUserName = "huabingood";
           
                   URI hdfsUri = null;
                   try {
                       // HDFS的访问路径
                       hdfsUri = new URI("hdfs://huabingood01:9000");
                   } catch (URISyntaxException e) {
                       e.printStackTrace();
                       logger.error(e);
                   }
           
                   try {
                       // 根据远程的NN节点,获取配置信息,创建HDFS对象
                       fs = FileSystem.get(hdfsUri,conf,hdfsUserName);
                   } catch (IOException e) {
                       e.printStackTrace();
                       logger.error(e);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                       logger.error(e);
                   }*/
           
                   // 方法三,反正我们没有搞懂。
                   /*conf  = new Configuration();
                   conf.addResource("/opt/huabingood/pseudoDistributeHadoop/hadoop-2.6.0-cdh5.10.0/etc/hadoop/core-site.xml");
                   conf.addResource("/opt/huabingood/pseudoDistributeHadoop/hadoop-2.6.0-cdh5.10.0/etc/hadoop/hdfs-site.xml");
           
                   try {
                       fs = FileSystem.get(conf);
                   } catch (IOException e) {
                       e.printStackTrace();
                       logger.error(e);
                   }*/
           
                   return fs;
               }


  2.添加文件夹


/**
    * 这里的创建文件夹同shell中的mkdir -p 语序前面的文件夹不存在
    * 跟java中的IO操作一样,也只能对path对象做操作;但是这里的Path对象是hdfs中的
    * @param fs
    * @return
    */
   public boolean myCreatePath(FileSystem fs){        boolean b = false;

       Path path = new Path("/hyw/test/huabingood/hyw");        try {            // even the path exist,it can also create the path.
           b = fs.mkdirs(path);
       } catch (IOException e) {
           e.printStackTrace();
           logger.error(e);
       } finally {            try {
               fs.close();
           } catch (IOException e) {
               e.printStackTrace();
               logger.error(e);
           }
       }        return b;
   }


 

  3.删除文件夹


/**
    * 删除文件,实际上删除的是给定path路径的最后一个
    * 跟java中一样,也需要path对象,不过是hadoop.fs包中的。
    * 实际上delete(Path p)已经过时了,更多使用delete(Path p,boolean recursive)
    * 后面的布尔值实际上是对文件的删除,相当于rm -r
    * @param fs
    * @return
    */
   public boolean myDropHdfsPath(FileSystem fs){        boolean b = false;        // drop the last path
       Path path = new Path("/huabingood/hadoop.tar.gz");        try {
           b = fs.delete(path,true);
       } catch (IOException e) {
           e.printStackTrace();
           logger.error(e);
       } finally {            try {
               fs.close();
           } catch (IOException e) {
               e.printStackTrace();
               logger.error(e);
           }
       }        return b;
   }


 

  4.重命名


/**
    * 重命名文件夹
    * @param hdfs
    * @return
    */
   public boolean myRename(FileSystem hdfs){        boolean b = false;
       Path oldPath = new Path("/hyw/test/huabingood");
       Path newPath = new Path("/hyw/test/huabing");        try {
           b = hdfs.rename(oldPath,newPath);
       } catch (IOException e) {
           e.printStackTrace();
           logger.error(e);
       }finally {            try {
               hdfs.close();
           } catch (IOException e) {
               e.printStackTrace();
               logger.error(e);
           }
       }        return b;
   }


 

  5.文件夹的递归遍历


        /**
            * 遍历文件夹
            * public FileStatus[] listStatus(Path p)
            * 通常使用HDFS文件系统的listStatus(path)来获取改定路径的子路径。然后逐个判断
            * 值得注意的是:
            *  1.并不是总有文件夹中有文件,有些文件夹是空的,如果仅仅做是否为文件的判断会有问题,必须加文件的长度是否为0的判断
            *  2.使用getPath()方法获取的是FileStatus对象是带URL路径的。使用FileStatus.getPath().toUri().getPath()获取的路径才是不带url的路径
            * @param hdfs
            * @param listPath 传入的HDFS开始遍历的路径
            * @return
            */
           public SetrecursiveHdfsPath(FileSystem hdfs,Path listPath){        
               /*FileStatus[] files = null;
               try {
                   files = hdfs.listStatus(listPath);
                   Path[] paths = FileUtil.stat2Paths(files);
                   for(int i=0;i<files.length;i++){
                       if(files[i].isFile()){
                           // set.add(paths[i].toString());
                           set.add(paths[i].getName());
                       }else {
                           recursiveHdfsPath(hdfs,paths[i]);
                       }
                   }
               } catch (IOException e) {
                   e.printStackTrace();
                   logger.error(e);
               }*/
       
               FileStatus[] files = null;        
               try {
                   files = hdfs.listStatus(listPath);                    // 实际上并不是每个文件夹都会有文件的。
                   if(files.length == 0){                        // 如果不使用toUri(),获取的路径带URL。                        set.add(listPath.toUri().getPath());
                   }else {                        // 判断是否为文件
                       for (FileStatus f : files) {                            if (files.length == 0 || f.isFile()) {
                               set.add(f.getPath().toUri().getPath());
                           } else {                                // 是文件夹,且非空,就继续遍历                                recursiveHdfsPath(hdfs, f.getPath());
                           }
                       }
                   }
               } catch (IOException e) {
                   e.printStackTrace();
                   logger.error(e);
               }                return set;
           }


 

  6.文件的判断


/**
    * 文件简单的判断
    * 是否存在
    * 是否是文件夹
    * 是否是文件
    * @param fs     */
   public void myCheck(FileSystem fs){        boolean isExists = false;        boolean isDirectorys = false;        boolean isFiles = false;

       Path path = new Path("/hyw/test/huabingood01");        try {
           isExists = fs.exists(path);
           isDirectorys = fs.isDirectory(path);
           isFiles = fs.isFile(path);
       } catch (IOException e){
           e.printStackTrace();
           logger.error(e);
       } finally {            try {
               fs.close();
           } catch (IOException e) {
               e.printStackTrace();
               logger.error(e);
           }
       }        if(!isExists){
           System.out.println("lu jing not cun zai.");
       }else{
           System.out.println("lu jing cun zai.");            if(isDirectorys){
               System.out.println("Directory");
           }else if(isFiles){
               System.out.println("Files");
           }
       }


 

  7.文件配置信息的查询


/**
    * 获取配置的所有信息
    * 首先,我们要知道配置文件是哪一个
    * 然后我们将获取的配置文件用迭代器接收
    * 实际上配置中是KV对,我们可以通过java中的Entry来接收
    *     */
   public void showAllConf(){
       Configuration conf = new Configuration();
       conf.set("fs.defaultFS", "hdfs://huabingood01:9000");
       Iterator<Map.Entry> it = conf.iterator();        while(it.hasNext()){
           Map.Entryentry = it.next();
           System.out.println(entry.getKey()+"=" +entry.getValue());
       }
   }


 

  8.文件的上传


   /**
    * 文件下载
    * 注意下载的路径的最后一个地址是下载的文件名
    * copyToLocalFile(Path local,Path hdfs)
    * 下载命令中的参数是没有任何布尔值的,如果添加了布尔是,意味着这是moveToLocalFile()
    * @param fs     */
   public void getFileFromHDFS(FileSystem fs){
       Path HDFSPath = new Path("/hyw/test/hadoop-2.6.0-cdh5.10.0.tar.gz");
       Path localPath = new Path("/home/huabingood");        try {
           fs.copyToLocalFile(HDFSPath,localPath);
       } catch (IOException e) {
           e.printStackTrace();
           logger.error("zhe li you cuo wu !" ,e);
       }finally {            try {
               fs.close();
           } catch (IOException e) {
               e.printStackTrace();
               logger.error("zhe li you cuo wu !", e);
           }
       }
   }


 

  9.文件的下载


        /**
            * 文件的下载
            * 注意事项同文件的上传
            * 注意如果上传的路径不存在会自动创建
            * 如果存在同名的文件,会覆盖
            * @param fs             */
           public void myPutFile2HDFS(FileSystem fs){        
               boolean pathExists = false;                // 如果上传的路径不存在会创建                // 如果该路径文件已存在,就会覆盖
               Path localPath = new Path("/home/huabingood/绣春刀.rmbv");
               Path hdfsPath = new Path("/hyw/test/huabingood/abc/efg/绣春刀.rmbv");        
               try {
                   fs.copyFromLocalFile(localPath,hdfsPath);
               } catch (IOException e) {
                   e.printStackTrace();
               }finally {                    try {
                       fs.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
               }
       
           }


 

  10.文件的IO操作——HDFS之间文件的复制


           /**
            * hdfs之间文件的复制
            * 使用FSDataInputStream来打开文件open(Path p)
            * 使用FSDataOutputStream开创建写到的路径create(Path p)
            * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)来进行具体的读写
            * 说明:
            *  1.java中使用缓冲区来加速读取文件,这里也使用了缓冲区,但是只要指定缓冲区大小即可,不必单独设置一个新的数组来接受
            *  2.最后一个布尔值表示是否使用完后关闭读写流。通常是false,如果不手动关会报错的
            * @param hdfs             */
           public void copyFileBetweenHDFS(FileSystem hdfs){
               Path inPath = new Path("/hyw/test/hadoop-2.6.0-cdh5.10.0.tar.gz");
               Path outPath = new Path("/huabin/hadoop.tar.gz");        
               // byte[] ioBuffer = new byte[1024*1024*64];                // int len = 0;        
               FSDataInputStream hdfsIn = null;
               FSDataOutputStream hdfsOut = null;        
               try {
                   hdfsIn = hdfs.open(inPath);
                   hdfsOut = hdfs.create(outPath);
       
                   IOUtils.copyBytes(hdfsIn,hdfsOut,1024*1024*64,false);        
                   /*while((len=hdfsIn.read(ioBuffer))!= -1){
                       IOUtils.copyBytes(hdfsIn,hdfsOut,len,true);
                   }*/
               } catch (IOException e) {
                   e.printStackTrace();
                   logger.error(e);
               }finally {                    try {
                   hdfsOut.close();
                   hdfsIn.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
       
               }
       
           }

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!


本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程