Hadoop分布式文件系统

2021/02/08 Hadoop HDFS

Hadoop分布式文件系统

HDFS(Hadoop Distributed File System)是Hadoop项目的核心子项目,是Hadoop主要应用的一个分布式文件系统。实际上,Hadoop中有一个综合性的文件系统抽象,它提供了文件系统实现的各类接口,HDFS只是这个抽象文件系统的一个实例。

学习HDFS主要包含以下内容

  • 包括HDFS的特点、基本操作、常用API及读/写数据流等。

Hadoop的文件系统

HDFS的设计理念源于非常朴素的思想:当数据集的大小超过单台计算机的存储能力时,就有必要将其进行分区(partition)并存储到若干台单独的计算机上,而管理网络中跨多台计算机存储的文件系统称为分布式文件系统(distribute filesystem)。该系统架构于网络之上,势必会引入网络编程的复杂性,因此分布式文件系统比普通文件系统更为复杂,例如,使文件系统能够容忍节点故障且不丢失任何数据,就是一个极大的挑战。 准确地说,Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。Hadoop文件系统接口由Java抽象类org.apache.hadoop.fs.FileSystem类定义,该类同时还继承了org.apache.hadoop.conf并且实现了Java的java.io.Closeable接口。

文件系统 URI方案 Java实现 定  义
Local file fs.LocalFileSystem 使用了客户端校验和的本地文件系统。没有使用校验和的本地磁盘文件系统由RawLocalFileSystem实现
HDFS hdfs hdfs.DistributedFileSystem Hadoop分布式文件系统
S3(原生) s3n fs.s3native.NativeS3FileSystem 基于Amazon S3的文件系统

Hadoop提供了许多文件系统的接口,用户可以选取合适的URI方案来实现对特定的文件系统的交互。例如,如果想访问本地文件系统,执行以下shell命令即可:

hadoop dfs -ls file:///      (最后一个/表示本地文件系统的根目录)

HDFS的设计理念

作为Hadoop生态圈的基础,HDFS非常适合运行在廉价硬件集群之上,以流式数据访问模式来存储超大文件。简单的一句话,已经勾勒出HDFS的特点。

  • 适合存储超大文件:存储在HDFS的文件大多在GB甚至TB级别,目前阿里巴巴的集群存储的数据已经达到了60 PB。
  • 运行于廉价硬件之上:HDFS在设计的时候,就已经认为在集群规模足够大的时候,节点故障并不是小概率事件,而可以认为是一种常态。例如,一个节点故障的概率如果是千分之一,那么当集群规模是1 000台时,正常情况每天都会有节点故障。当节点发生故障时,HDFS能够继续运行并且不让用户察觉到明显的中断,所以HDFS并不需要运行在高可靠且昂贵的服务器上,普通的PC Server即可。
  • 流式数据访问:HDFS认为,一次写入,多次读取是最高效的访问模式。HDFS存储的数据集作为Hadoop的分析对象,在数据集生成后,会长时间在此数据集上进行各种分析。每次分析都将涉及该数据集的大部分数据甚至全部数据,因此读取整个数据集的时间延迟比读取第一条记录的时间延迟更重要。 除了上面3点,HDFS也有一些短板。
  • 实时的数据访问弱:如果应用要求数据访问的时间在秒或是毫秒级别,那么HDFS是做不到的。由于HDFS针对高数据吞吐量做了优化,因而牺牲了读取数据的速度,对于响应时间是秒或是毫秒的数据访问,可以考虑使用HBase。
  • 大量的小文件:当Hadoop启动时,NameNode会将所有元数据读到内存,以此构建目录树。一般来说,一个HDFS上的文件、目录和数据块的存储信息大约在150字节左右,那么可以推算出,如果NameNode的内存为16 GB的话,大概只能存放480万个文件,对于一个超大规模的集群,这个数字很快就可以达到。
  • 多用户写入,任意修改文件:HDFS中的文件只能有一个写入者,并且写数据操作总是在文件末。它不支持多个写入者,也不支持在数据写入后,在文件的任意位置进行修改。事实上,如果不将hdfs-site.xml中的dfs.support.append设置为true的话,HDFS也不支持对文件进行追加操作。

HDFS的架构

一个完整的HDFS运行在一些节点之上,这些节点运行着不同类型的守护进程,如NameNode、DataNode、SecondaryNameNode,不同类型的节点相互配合,相互协作,在集群中扮演了不同的角色,一起构成了HDFS。 一个典型的HDFS集群中,有一个NameNode,一个SecondaryNode和至少一个DataNode,而HDFS客户端数量并没有限制。所有的数据均存放在运行DataNode进程的节点的块(block)里。

块(Block)

每个磁盘都有默认的数据块大小,这是磁盘进行数据读/写的最小单位,而文件系统也有文件块的概念,如ext3、ext2等。文件系统的块大小只能是磁盘块大小的整数倍,磁盘块大小一般是512字节,文件系统块大小一般为几千字节,如ext3的文件块大小为4 096字节,Windows的文件块大小为4 096字节。用户在使用文件系统对文件进行读取或写入时,完全不需要知道块的细节,这些对于用户都是透明的。

HDFS同样也有块(block)的概念,但是HDFS的块比一般文件系统的块大得多,默认为64 MB,并且可以随着实际需要而变化,配置项为hdfs-site.xml文件中的dfs.block.size项。与单一文件系统相似,HDFS上的文件也被划分为块大小的多个分块,它是HDFS存储处理的最小单元。

例如: 某个文件data.txt,大小为150 MB,如果此时HDFS的块大小没有经过配置,默认为64 MB,那么该文件实际在HDFS中存储的情况如下。 该文件的第一个块,大小为64 MB,第二个块,大小为64 MB,文件的第三个块,大小为22 MB,这里特别指出的,与其他文件系统不同的是,HDFS小于一个块大小的文件不会占据整个块的空间,所以第三块的大小为22 MB而不是64 MB。

HDFS中的块如此之大的原因是为了最小化寻址开销。如果块设置的足够大,从磁盘传输数据的时间可以明显大于定位这个块开始位置所需的时间。这样,传输一个由多个块组成的文件的时间取决于磁盘传输的效率。得益于磁盘传输速率的提升,块的大小可以被设为128 MB甚至更大。

在hdfs-site.xml文件中,还有一项配置为dfs.relication,该项配置为每个HDFS的块在Hadoop集群中保存的份数,值越高,冗余性越好,占用存储也越多,默认为3,即有2份冗余。

使用块的好处是非常明显的。

  • 可以保存比存储节点单一磁盘大的文件:块的设计实际上就是对文件进行分片,分片可以保存在集群的任意节点,从而使文件存储跨越了磁盘甚至机器的限制,如data.txt文件被切分为3个块,并存放在3个DataNode之中。
  • 简化存储子系统:将存储子系统控制单元设置为块,可简化存储管理,并且也实现了元数据和数据的分开管理和存储。
  • 容错性高:这是块非常重要的一点,如果将dfs.relication设置为2,那么任意一个块损坏,都不会影响数据的完整性,用户在读取文件时,并不会察觉到异常。之后集群会将损坏的块的副本从其他候选节点复制到集群中能正常工作的节点,从而使副本数回到配置的水平。

NameNode和SecondaryNameNode

NameNode也被称为名字节点,是HDFS的主从(master/slave)架构的主角色的扮演者。NameNode是HDFS的大脑,它维护着整个文件系统的目录树,以及目录树里所有的文件和目录,这些信息以两种文件存储在本地文件中:一种是命名空间镜像(也称为文件系统镜像,File System Image,FSImage),即HDFS元数据的完整快照,每次NameNode启动的时候,默认会加载最新的命名空间镜像,另一种是命名空间镜像的编辑日志(edit log)

SecondaryNameNode,也被称为第二名字节点,是用于定期合并命名空间镜像和命名空间镜像的编辑日志的辅助守护进程。每个HDFS集群都有一个SecondaryNameNode,在生产环境下,一般SecondaryNameNode也会单独运行在一台服务器上。

FSImage文件其实是文件系统元数据的一个永久性检查点,但并非每一个写操作都会更新这个文件,因为FSImage是一个大型文件,如果频繁地执行写操作,会使系统运行极为缓慢。解决方案是NameNode只将改动内容预写日志(WAL),即写入命名空间镜像的编辑日志。随着时间的推移,编辑日志会变得越来越大,那么一旦发生故障,将会花费非常多的时间来回滚操作,所以就像传统的关系型数据库一样,需要定期地合并FSImage和编辑日志。如果由NameNode来做合并的操作,那么NameNode在为集群提供服务时可能无法提供足够的资源,为了彻底解决这一问题,SecondaryNameNode应运而生。

DataNode

DataNode被称为数据节点,它是HDFS的主从架构的从角色的扮演者,它在NameNode的指导下完成I/O任务。如前文所述,存放在HDFS的文件都是由HDFS的块组成,所有的块都存放于DataNode节点。实际上,对于DataNode所在的节点来说,块就是一个普通的文件,我们可以去DataNode存放块的目录下观察(默认是$(dfs.data.dir)/current),块的文件名为blk_blkID。

DataNode会不断地向NameNode报告。初始化时,每个DataNode将当前存储的块告知NameNode,在集群正常工作时,DataNode仍然会不断地更新NameNode,为之提供本地修改的相关信息,同时接受来自NameNode的指令,创建、移动或者删除本地磁盘上的数据块。

HDFS客户端

HDFS客户端是指用户和HDFS交互的手段,HDFS提供了非常多的客户端,包括命令行接口、Java API、Thrift接口、C语言库、用户空间文件系统

HDFS容错

如何使文件系统能够容忍节点故障且不丢失任何数据,也就是HDFS的容错机制。

心跳机制

在NameNode和DataNode之间维持心跳检测,当由于网络故障之类的原因,导致DataNode发出的心跳包没有被NameNode正常收到的时候,NameNode就不会将任何新的I/O操作派发给那个DataNode,该DataNode上的数据被认为是无效的,因此NameNode会检测是否有文件块的副本数目小于设置值,如果小于就自动开始复制新的副本并分发到其他DataNode节点。

检测文件块的完整性

HDFS会记录每个新创建文件的所有块的校验和。当以后检索这些文件时或者从某个节点获取块时,会首先确认校验和是否一致,如果不一致,会从其他DataNode节点上获取该块的副本。

集群的负载均衡

由于节点的失效或者增加,可能导致数据分布不均匀,当某个DataNode节点的空闲空间大于一个临界值的时候,HDFS会自动从其他DataNode迁移数据过来。

NameNode上的FSImage和编辑日志文件

NameNode上的FSImage和编辑日志文件是HDFS的核心数据结构,如果这些文件损坏了,HDFS将失效。因而,NameNode由Secondary NameNode定期备份FSImage和编辑日志文件,NameNode在Hadoop中确实存在单点故障的可能,当NameNode出现机器故障,手工干预是必须的。

文件的删除

删除并不是马上从NameNode移出命名空间,而是存放在/trash目录随时可恢复,直到超过设置时间才被正式移除。设置的时间由hdfs-site.xml文件的配置项fs.trash.interval决定,单位为秒。

HDFS读取文件和写入文件

在HDFS中,NameNode作为集群的大脑,保存着整个文件系统的元数据,而真正数据是存储在DataNode的块中。HDFS如何读取和写入文件,组成同一文件的块在HDFS的分布情况如何影响HDFS读取和写入速度。

块的分布

HDFS会将文件切片成块并存储至各个DataNode中,文件数据块在HDFS的布局情况由NameNode和hdfs-site.xml中的配置dfs.replication共同决定。dfs.replication表示该文件在HDFS中的副本数,默认为3,即有两份冗余。

NameNode如何选择在哪个DataNode存储副本?这里需要在可靠性、写入速度和读取速度之间进行一些取舍。如果将所有副本都存储在一个节点之中,那么写入的带宽损失将是最小的,因为复制管道都是在单一节点上运行。但这样无论副本数设为多少,HDFS都不提供真实的冗余,因为该文件的所有数据都在一个节点之上,那么如果该节点发生故障的话,该文件的数据将会丢失。如果将副本放在不同数据中心,可以最大限度地提高冗余,但是对带宽的损耗非常大。即使在同一数据中心,也有不同的副本分布策略。其实,在发布的Hadoop 0.17.0版本中改变了数据布局策略来辅助保持数据块在集群内分布相对均匀。从0.21.0版本开始,可即时选择块的分布策略。

Hadoop的默认布局是在HDFS客户端节点上放第一个副本,但是由于HDFS客户端有可能运行于集群之外,就随机选择一个节点,不过Hadoop会尽量避免选择那些存储太满或者太忙的节点。第二个副本放在与第一个不同且随机另外选择的机架中的节点上。第三个副本与第二个副本放在相同的机架,且随机选择另外一个节点。其他副本(如果dfs.replication大于3)放在集群随机选择的节点上,Hadoop也会尽量避免在相同的机架上放太多副本。当NameNode按照上面的策略选定副本存储的位置后,就会根据集群网络拓扑图创建一个管道。

数据读取

HDFS客户端可以通过多种不同的方式(如命令行、Java API等)对HDFS进行读写操作,这些操作都遵循同样的流程。HDFS客户端需要使用到Hadoop库函数,函数库封装了大部分与NameNode和DataNode通信相关的细节,同时也考虑了分布式文件系统在诸多场景的错误处理机制。

假设在HDFS中存储了一个文件/user/test.txt,HDFS客户端要读取该文件,Hadoop客户端程序库是必不可少的。HDFS客户端首先要访问NameNode,并告诉它所要读取的文件,在这之前,HDFS会对客户的身份信息进行验证。验证的方式有两种:一种是通过信任的客户端,由其指定用户名;第二种方式是通过诸如Kerberos等强制验证机制来完成。接下来还需要检查文件的所有者及其设定的访问权限。当文件确实存在,且该用户对其有访问权限,这时NameNode会告诉HDFS客户端这个文件的第一个数据块的标号以及保存有该数据块的DataNode列表。这个列表是DataNode与HDFS客户端间的距离进行了排序。有了数据块标号和DataNode的主机名,HDFS客户端便可以直接访问最合适的DataNode,读取所需要的数据块。这个过程会一直重复直到该文件的所有数据块读取完成或HDFS客户端主动关闭了文件流。特殊的情况,如果该HDFS客户端是集群中的DataNode时,该节点将从本地DataNode中读取数据。

写入数据

首先HDFS客户端通过HDFS相关API发送请求,打开一个要写入的文件,如果该用户有写入文件的权限,那么这一请求将被送达NameNode,并建立该文件的元数据。但此时新建立的文件元数据并未和任何数据块相关联,这时HDFS客户端会收到“打开文件成功”的响应,接着就可以写入数据了。当客户端将数据写入流时,数据会被自动拆分成数据包,并将数据包保存在内存队列中。客户端有一个独立的线程,它从队列中读取数据包,并向NameNode请求一组DataNode列表,以便写入下一个数据块的多个副本。接着,HDFS客户端将直接连接到列表中的第一个DataNode,而该DataNode又连接到第二个DataNode,第二个又连接到第三个,如此就建立了数据块的复制管道。复制管道中的每一个DataNode都会确认所收到的数据包已经成功写入磁盘。HDFS客户端应用程序维护着一个列表,记录着哪些数据包尚未收到确认信息。每收到一个响应,客户端便知道数据已经成功地写入管道中的一个DataNode。当数据块被写入列表中的DataNode中时,HDFS客户端将重新向NameNode申请下一组DataNode。最终,客户端将剩余数据包写入全部磁盘,关闭数据管道并通知NameNode文件写操作已经完成。

如果写入的时候,复制管道中的某一个DataNode无法将数据写入磁盘(如DataNode死机)。发生这种错误时,管道会立即关闭,已发送的但尚未收到确认的数据包会被退回到队列中,以确保管道中错误节点的下游节点可以得到数据包。而在剩下的健康的DataNode中,正在写入的数据块会被分配新的blk_id。这样,当发生故障的数据节点恢复后,冗余的数据块就会因为不属于任何文件而被自动丢弃,由剩余DataNode节点组成的新复制管道会重新开放,写入操作得以继续,写操作将继续直至文件关闭。NameNode如果发现文件的某个数据块正在通过复制管道进行复制,就会异步地创建一个新的复制块,这样,即便HDFS的多个DataNode发生错误,HDFS客户端仍然可以从数据块的副本中恢复数据,前提是满足最少数目要求的数据副本(dfs.replication.min)已经被正确写入(dfs.replication.min配置默认为1)。

数据完整性

Hadoop用户都希望HDFS在读写数据时,数据不会有任何丢失或者损坏。但是在实际情况中,如果系统需要处理的数据量超过HDFS能够处理的极限,那么数据被损坏的概率还是很高的。

检测数据是否损坏的常用措施是,在数据第一次引入系统时计算校验和,并在数据通过一个不可靠的通道进行数据传输时再次计算校验和,如果发现两次校验和不一样,那么可以判定,数据已经损坏。校验和技术并不能修复数据,只能检测出数据是否已经损坏。

HDFS也是采用校验和技术判断数据是否损坏,HDFS会对写入的所有数据计算校验和,并在读取数据的时验证校验和,它针对由core-site.xml文件的io.bytes.per.checksum配置项指定字节的数据计算校验和,默认为512字节。

DataNode负责验证收到的数据的校验和,并且如果该校验和正确,则保存收到的数据。DataNode在收到客户端的数据或复制其他DataNode的数据时执行这个操作。正在写数据的HDFS客户端将数据及其校验和发送到由一系列DataNode组成的复制管道,如图3-9所示,管道中最后一个DataNode负责验证校验和。如果DataNode检测到错误,HDFS客户端便会收到一个校验和异常,可以在应用程序中捕获该异常,并做相应的处理,例如重新尝试写入。

HDFS客户端从DataNode读取数据时,也会验证校验和,将它们与DataNode中存储的校验和进行比较。每个DataNode均保存有一个用于验证的校验和日志,所以它知道每个数据块的最后一次验证时间。客户端成功验证一个数据块后,会告诉这个DataNode,DataNode由此更新日志。

不只是客户端在读取数据和写入数据时会验证校验和,每个DataNode也会在一个后台线程运行一个DataBlockScanner,定期验证存储在这个DataNode上的所有数据块。

由于HDFS存储着每个数据块的副本,因此当一个数据块损坏时,HDFS可以通过复制完好的该数据块副本来修复损坏的数据块,进而得到一个新的、完好无损的副本。大致的步骤是,HDFS客户端在读取数据块时,如果检测到错误,则向NameNode报告已损坏的数据块以及尝试读取该数据块的DataNode,最后才抛出ChecksumException异常。NameNode将这个已损坏的数据块标记为已损坏。之后,它安排这个数据块的一个副本复制到另一个DataNode,如此一来,数据块的副本数又回到了配置的水平。最后,已损坏的数据块副本便会被删除。

如何访问HDFS

HDFS提供给HDFS客户端访问的方式多种多样,用户可以根据不同的情况选择不同的方式。

命令行接口

Hadoop自带一组命令行工具,而其中有关HDFS的命令是其工具集的一个子集。命令行工具虽然是最基础的文件操作方式,但却是最常用的。作为一名合格的Hadoop开发人员和运维人员,熟练掌握是非常有必要的。 执行hadoop dfs命令可以显示基本的使用信息。

[hadoop@master bin]$ hadoop dfs
Usage: java FsShell
      [-ls <path>]
      [-lsr <path>]
      [-df [<path>]]
      [-du <path>]
      [-dus <path>]
      [-count[-q] <path>]
      [-mv <src> <dst>]
      [-cp <src> <dst>]
      [-rm [-skipTrash] <path>]
      [-rmr [-skipTrash] <path>]
      [-expunge]
      [-put <localsrc> ... <dst>]
      [-copyFromLocal <localsrc> ... <dst>]
      [-moveFromLocal <localsrc> ... <dst>]
      [-get [-ignoreCrc] [-crc] <src> <localdst>]
      [-getmerge <src> <localdst> [addnl]]
      [-cat <src>]
      [-text <src>]
      [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
      [-moveToLocal [-crc] <src> <localdst>]
      [-mkdir <path>]
      [-setrep [-R] [-w] <rep> <path/file>]
      [-touchz <path>]
      [-test -[ezd] <path>]
      [-stat [format] <path>]
      [-tail [-f] <file>]
      [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
      [-chown [-R] [OWNER][:[GROUP]] PATH...]
      [-chgrp [-R] GROUP PATH...]
      [-help [cmd]]
  • hadoop dfs –ls 列出文件或目录内容 hadoop dfs –ls /
  • hadoop dfs –lsr 递归地列出目录内容 hadoop dfs –lsr /
  • hadoop dfs –df 查看目录的使用情况 hadoop dfs –df /
  • hadoop dfs –du 显示目录中所有文件及目录的大小 hadoop dfs –du /
  • hadoop dfs –dus 只显示目录的总大小,与-du不同的是,-du会把目录下所有的文件或目录大小都列举出来,而-dus只会将目录的大小列出来 hadoop dfs –dus /
  • hadoop dfs –count [-q] 显示下的目录数及文件数,输出格式为“目录数 文件数 大小 文件名”。如果加上-q还可以查看文件索引的情况 hadoop dfs –count /
  • hadoop dfs –mv 将HDFS上的文件移动到目的文件夹 hadoop dfs–mv/user/hadoop/ a.txt /user/test,将/user/ hadoop文件夹下的文件a.txt移动到/user/test文件夹下
  • Hadoop dfs –rm [-skipTrash] 将HDFS上路径为的文件移动到回收站。如果加上-skipTrash,则直接删除 hadoop dfs –rm /test.txt
  • hadoop dfs –rmr [-skipTrash] 将HDFS上路径为的目录以及目录下的文件移动到回收站。如果加上-skipTrash,则直接删除 hadoop dfs –rmr /test
  • hadoop dfs –expunge 清空回收站 hadoop dfs –expunge
  • hadoop dfs –put 本地文件上传到HDFS的目录下 hadoop dfs –put /home/hadoop/ test.txt / user/hadoop
  • hadoop dfs-copyFromLocal ... 功能类似于put hadoop dfs -copyFromLocal /home/ hadoop/test.txt /user/hadoop
  • hadoop dfs-moveFromLocal ... 本地文件移动到HDFS的目录下 hadoop dfs -moveFromLocal /home/ hadoop/test.txt /user/hadoop
  • hadoop dfs -get [-ignoreCrc] [-crc] 将HDFS上的文件下载到本地的目录,可用-ignorecrc选项复制CRC校验失败的文件,使用-crc选项复制文件以及CRC信息 hadoop dfs -get /user/ hadoop/a.txt /home/hadoop
  • hadoop dfs -getmerge [addnl] 将HDFS上目录下的所有文件按文件名排序并合并成一个文件输出到本地的目录,addnl是可选的,用于指定在每个文件结尾添加一个换行符 hadoop dfs-getmerge/user/ test /home/ hadoop/o
  • hadoop dfs -cat 浏览HDFS路径为的文件的内容 hadoop dfs -cat /user/ hadoop/text.txt
  • hadoop dfs -text 将HDFS路径为的文本文件输出 hadoop dfs -text /user/ test.txt
  • hadoop dfs -copyToLocal [-ignoreCrc] [-crc] 功能类似于get hadoop dfs -copyToLocal /user/ hadoop/a.txt/home/ hadoop
  • hadoop dfs -moveToLocal [-crc] 将HDFS上路径为的文件移动到本地路径下 hadoop dfs -get /user/ hadoop/a.txt /home/hadoop
  • hadoop dfs -mkdir 在HDFS上创建路径为的目录 hadoop dfs -mkdir /user/ test
  • hadoop dfs -setrep [-R] [-w] <path/file> 设置文件的复制因子。该命令可以单独设置文件的复制因子,加上-R可以递归执行该操作 hadoop dfs -setrep 5 -R /user/test
  • hadoop dfs -touchz 创建一个路径为的0字节的HDFS空文件 hadoop dfs -touchz /user/ hadoop/ test
  • hadoop dfs -test -[ezd] 检查HDFS上路径为的文件,-e检查文件是否存在。如果存在则返回0,-z检查文件是否是0字节。如果是则返回0,-d如果路径是个目录,则返回1,否则返回0 hadoop dfs -test -e /user/ test.txt
  • hadoop dfs -stat [format] 显示HDFS上路径为的文件或目录的统计信息。 hadoop fs -stat %b %n %o %r /user/test
  • hadoop dfs -tail [-f] 显示HDFS上路径为的文件的最后1 KB的字节,-f选项会使显示的内容随着文件内容更新而更新 hadoop dfs -tail -f /user/ test.txt
  • hadoop dfs -chmod [-R] <MODE[,MODE]… | OCTALMODE> PATH… 改变HDFS上路径为PATH的文件的权限,-R选项表示递归执行该操作 hadoop dfs -chmod -R +r /user/ test,表示将/user/ test目录下的所有文件赋予读的权限
  • hadoop dfs -chown [-R] [OWNER][:[GROUP]] PATH… 改变HDFS上路径为PATH的文件的所属用户,-R选项表示递归执行该操作 hadoop dfs -chown -R hadoop: hadoop /user/test,表示将/user/test目录下所有文件的所属用户和所属组别改为hadoop
  • hadoop dfs -chgrp [-R] GROUP PATH… 改变HDFS上路径为PATH的文件的所属组别,-R选项表示递归执行该操作 hadoop dfs -chown -R hadoop /user/test,表示将/user/test目录下所有文件的所属组别改为hadoop
  • hadoop dfs -help 显示所有dfs命令的帮助信息 hadoop dfs -help

Java API

本地访问HDFS最主要的方式是HDFS提供的Java应用程序接口,其他的访问方式都建立在这些应用程序接口之上。为了访问HDFS,HDFS客户端必须拥有一份HDFS的配置文件,也就是hdfs-site.xml文件,以获取NameNode的相关信息,每个应用程序也必须能访问Hadoop程序库JAR文件,也就是在$HADOOP_HOME、$HADOOP_HOME/lib下面的jar文件。

Hadoop是由Java编写的,所以通过Java API可以调用所有的HDFS的交互操作接口,最常用的是FileSystem类,它也是命令行hadoop fs的实现

java.net.URL

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class MyCat {

   static{
     URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
   }

   public static void main(String[] args) throws MalformedURLException, IOException{
     InputStream in = null;
     try {
       in = new URL(args[0]).openStream();
       IOUtils.copyBytes(in, System.out, 4096,false);
     } finally{
       IOUtils.closeStream(in);
     }
   }
}

导出为xx.jar文件,执行命令:

hadoop jar xx.jar hdfs://master:9000/user/hadoop/test

执行完成后,屏幕上输出HDFS的文件/user/hadoop/test的文件内容。

该程序是从HDFS读取文件的最简单的方式,即用java.net.URL对象打开数据流。代码中,静态代码块的作用是让Java程序识别Hadoop的HDFS url。

org.apache.hadoop.fs.FileSystem

虽然上面的方式是最简单的方式,但是在实际开发中,访问HDFS最常用的类还是FileSystem类。

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {

   public static void main(String[] args) throws IOException {
     String uri = "hdfs://master:9000/user/hadoop/test";
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(URI.create(uri), conf);

     InputStream in = null;
     try {
       in = fs.open(new Path(uri));
       IOUtils.copyBytes(in, System.out, 4096,false);
     } finally{
       IOUtils.closeStream(in);
     }
   }
}

导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.FileSystemCat

执行完成后控制台会输出文件内容。 FileSystem类的实例获取是通过工厂方法:

public static FileSystem get(URI uri,Configuration conf) throws IOException

其中Configuration对象封装了HDFS客户端或者HDFS集群的配置,该方法通过给定的URI方案和权限来确定要使用的文件系统。得到FileSystem实例之后,调用open()函数获得文件的输入流:

Public FSDataInputStream open(Path f) throws IOException

写入文件

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCopyFromLocal {
    public static void main(String[] args) throws IOException {
     //本地文件路径
     String source = "/home/hadoop/test";
     String destination = "hdfs://master:9000/user/hadoop/test2";
     InputStream in = new BufferedInputStream(new FileInputStream(source));
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(URI.create(destination),conf);
     OutputStream out = fs.create(new Path(destination));
     IOUtils.copyBytes(in, out, 4096,true);
    }
}

导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.FileCopyFromLocal

FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类不同的是,FSDataOutputStream不允许在文件中定位,这是因为HDFS只允许一个已打开的文件顺序写入,或在现有文件的末尾追加数据。

创建HDFS的目录

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateDir {

   public static void main(String[] args){
     String uri = "hdfs://master:9000/user/test";
     Configuration conf = new Configuration();
     try {
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path dfs=new Path("hdfs://master:9000/user/test");
          fs.mkdirs(dfs);
     } catch (IOException e) {
       e.printStackTrace();
     }
  }
}

导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.CreateDir

运行完成后可以用命令hadoop dfs -ls验证目录是否创建成功。

删除HDFS上的文件或目录。

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFile {
   public static void main(String[] args){
    String uri = "hdfs://master:9000/user/hadoop/test";
     Configuration conf = new Configuration();
     try {
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path delef=new Path("hdfs://master:9000/user/hadoop");
        boolean isDeleted=fs.delete(delef,false);
        //是否递归删除文件夹及文件夹下的文件
        //boolean isDeleted=fs.delete(delef,true);
        System.out.println(isDeleted);
     } catch (IOException e) {
       e.printStackTrace();
     }
  }
}

导出为xx.jar文件,上传至集群任意一节点

hadoop jar xx.jar com.hdfsclient.DeleteFile

如果需要递归删除文件夹,则需将fs.delete(arg0, arg1)方法的第二个参数设为true。

查看文件是否存在。

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckFileIsExist {
  public static void main(String[] args){
    String uri = "hdfs://master:9000/user/hadoop/test";
     Configuration conf = new Configuration();

     try {
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path path=new Path(url);
         boolean isExists=fs.exists(path);
          System.out.println(isExists);
     } catch (IOException e) {
       e.printStackTrace();
     }
  }
}

导出为xx.jar文件,上传至集群任意一台节点,执行命令:

hadoop jar xx.jar com.hdfsclient.CheckFileIsExist

列出目录下的文件或目录名称

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFiles {
    public static void main(String[] args){
     String uri = "hdfs://master:9000/user";
     Configuration conf = new Configuration();
     try {
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path =new Path(uri);
        FileStatus stats[]=fs.listStatus(path);
        for(int i = 0; i < stats.length; ++i){
           System.out.println(stats[i].getPath().toString());
       }
        fs.close();
     }  catch (IOException e) {
       e.printStackTrace();
     }
  }
}

导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.ListFiles

运行后,控制台会打印出/user目录下的目录名称或文件名。

文件存储的位置信息。

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocationFile {
  public static void main(String[] args){
    String uri = "hdfs://master:9000/user/test/test";
     Configuration conf = new Configuration();
    try {
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path fpath=new Path(uri);
        FileStatus filestatus = fs.getFileStatus(fpath);
        BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus, 0, 
            filestatus.getLen());
        int blockLen = blkLocations.length;
        for(int i=0;i<blockLen;i++){
          String[] hosts = blkLocations[i].getHosts();
          System.out.println("block_"+i+"_location:"+hosts[0]);
        }
     } catch (IOException e) {
       e.printStackTrace();
     }
  }
}

导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.LocationFile

HDFS的存储由DataNode的块完成,执行成功后,控制台会输出:

block_0_location:slave1
block_1_location:slave2
block_2_location:slave3

表示该文件被分为3个数据块存储,存储的位置分别为slave1、slave2、slave3。

SequenceFile

SequeceFile是HDFS API提供的一种二进制文件支持,这种二进制文件直接将<key, value>对序列化到文件中,所以SequenceFile是不能直接查看的,可以通过hadoop dfs -text命令查看,后面跟要查看的SequenceFile的HDFS路径。 写入SequenceFile。

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriter {

  private static final String[] text = {
     "两行黄鹂鸣翠柳",
     "一行白鹭上青天",
     "窗含西岭千秋雪",
     "门泊东吴万里船",
   };

   public static void main(String[] args) {
     String uri = "hdfs://master:9000/user/hadoop/testseq";
     Configuration conf = new Configuration();
     SequenceFile.Writer writer = null;
     try {
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path path =new Path(uri);
       IntWritable key = new IntWritable();
       Text value = new Text();
       writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), 
              value.getClass());

        for (int i = 0;i<100;i++){
         key.set(100-i);
         value.set(text[i%text.length]);
         writer.append(key, value);
        }

     } catch (IOException e) {
        e.printStackTrace();
     } finally {
        IOUtils.closeStream(writer);
     }
   }
}

可以看到SequenceFile.Writer的构造方法需要指定键值对的类型。如果是日志文件,那么时间戳作为key,日志内容是value是非常合适的。 导出为xx.jar文件,上传至集群任意一节点,执行命令:

hadoop jar xx.jar com.hdfsclient.SequenceFileWriter

读取SequenceFile。

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReader {
   public static void main(String[] args) {
     String uri = "hdfs://master:9000/user/hadoop/testseq";
     Configuration conf = new Configuration();
     SequenceFile.Reader reader = null;

     try {
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       Path path = new Path(uri);
       reader = new SequenceFile.Reader(fs, path, conf);
       Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(),   
                conf);
       Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(),   
                conf);
       long position = reader.getPosition();      
       while(reader.next(key,value)){
         System.out.printf("[%s]\t%s\n",key,value);
         position = reader.getPosition();
       }
     } catch (IOException e) {
       e.printStackTrace();
     } finally {
       IOUtils.closeStream(reader);
     }
   }
}

Search

    微信好友

    博士的沙漏

    Table of Contents