Hadoop02-HDFS概述、shell&客户端操作

NiuMT 2020-06-03 20:58:30
Hadoop

HDFS概述

HDFS(Hadoop Distributed File System)是一种分布式文件管理系统。通过目录树定位文件;其次有很多服务器联合起来实现其功能,集群中的服务器有各自的角色。

应用场景:适合一次写入,多次读出的场景,且不支持文件的修改,适合用来做数据分析,不适合用来做网盘应用。

优缺点

优点:

  1. 高容错性
    • 数据自动保存多个副本,通过增加副本的形式,提高容错性
    • 某一个副本丢失后,他可以自动恢复
  2. 适合处理大数据
    • 数据规模:能够处理数据规模达到GB,TB,甚至PB级别的数据
    • 文件规模:能够处理百万规模以上的文件数量
  3. 可构建在廉价的机器上,通过多副本的机制,提高可靠性

缺点:

  1. 不适合低延时数据访问,比如毫秒级别的存储数据
  2. 无法高效地对大量小文件进行存储
    • 存储大量小文件的话,会占用 NameNode 大量的内存来存储文件目录和块信息
    • 小文件的存储的寻址时间会超过读取时间,违反了HDFS的设计目标
  3. 不支持文件并发写入,随机修改
    • 一个文件只能有一个写,不允许多个线程同时写
    • 仅支持数据 append,不支持文件的随机修改

组成架构

image-20201119172611242image-20201119172658750

文件块大小(面试重点)

HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数(dfs.blocksize)规定,yarn集群的默认大小在Hadoop2.x/3.x中是128M,Hadoop1.x 是64M,在本地运行时32M。

使用存储块的好处

假如上传的一个文件非常大,没有任何一块磁盘能够存储,这样这个文件就没法上传了,如果使用块的概念,会把文件分割成许多块,这样这个文件可以使用集群中的任意节点进行存储。

数据存储要考虑容灾备份,以块为单位非常有利于进行备份,HDFS默认每个块备份3份,这样如果这个块上或这个节点坏掉,可以直接找其他节点上的备份块。还有就是,有的时候需要将备份数量提高,

这样能够分散机群的读取负载,因为可以在多个节点中寻找到目标数据,减少单个节点读取。

image-20201119173254988

问:为什么块的大小不能太大,也不能太小?

  1. 如果块设置过大,

    1. 从磁盘传输数据的时间会明显大于寻址时间,导致程序在处理这块数据时,变得非常慢;
  2. mapreduce中的map任务通常一次只处理一个块中的数据,如果块过大运行速度也会很慢。
    1. 在数据读写计算的时候,需要进行网络传输。如果block过大会导致网络传输时间增长,程序卡顿/超时/无响应. 任务执行的过程中拉取其他节点的block或者失败重试的成本会过高.
  3. namenode监管容易判断数据节点死亡.导致集群频繁产生/移除副本, 占用cpu,网络,内存资源.

  4. 如果块设置过小,

    1. 存放大量小文件会占用NameNode中大量内存来存储元数据,而NameNode的物理内存是有限的;
  5. 文件块过小,寻址时间增大,导致程序一直在找block的开始位置。
    1. 操作系统对目录中的小文件处理存在性能问题.比如同一个目录下文件数量操作100万,执行”fs -l “之类的命令会卡死.
  6. 则会频繁的进行文件传输,对严重占用网络/CPU资源.

主要取决于磁盘/网络的传输速率。[其实就是CPU,磁盘,网卡之间的协同效率 即 跨物理机/机架之间文件传输速率]

为什么分片大小需要与HDFS数据块(分块)大小一致

hadoop将mapReduce的输入数据划分为等长的小数据块,称为输入分片或者分片,hadoop为每个分片构建一个map任务。

hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得高性能,这就是所谓的数据本地化。所以最佳分片的大小应该与HDFS上的块大小一样,因为如果分片跨越2个数据块,对于任何一个HDFS节点(基本不肯能同时存储这2个数据块),分片中的另外一块数据就需要通过网络传输到map任务节点,与使用本地数据运行map任务相比,效率则更低!

为什么不能远大于64MB或者128MB或256MB?

这里主要从上层的MapReduce框架来讨论

(1)Map崩溃问题:系统需要重新启动,启动过程需要重新加载数据,数据块越大,数据加载时间越长,系统恢复过程越长。

(2)监管时间问题:主节点监管其他节点的情况,每个节点会周期性的把完成的工作和状态的更新报告回来。如果一个节点保持沉默超过一个预设的时间间隔,主节点记录下这个节点状态为死亡,并把分配给这个节点的数据发到别的节点。对于这个“预设的时间间隔”,这是从数据块的角度大概估算的。假如是对于64MB的数据块,我可以假设你10分钟之内无论如何也能解决了吧,

超过10分钟也没反应,那就是死了。可对于640MB或是1G以上的数据,我应该要估算个多长的时间呢?估算的时间短了,那就误判死亡了,更坏的情况是所有节点都会被判死亡。

估算的时间长了,那等待的时间就过长了。所以对于过大的数据块,这个“预设的时间间隔”不好估算。

(3)Map任务上:因为MapReducer中一般一个map处理一个块上的数据,如果块很大,任务数会很少(少于集群中的节点个数)这样执行效率会明显降低。

HDFS的Shell操作

bin/hadoop fs XXXX

bin/hdfs dfs XXXX

dfs是fs的实现类

全部命令

[atguigu@hadoop102 hadoop-2.7.2]$ bin/hadoop fs

**      [-appendToFile <localsrc> ... <dst>] #追加一个文件到已经存在的文件末尾
**      [-cat [-ignoreCrc] <src> ...] #显示文件内容
        [-checksum <src> ...]
*       [-chgrp [-R] GROUP PATH...] #和Linux文件系统中的用法一样,修改文件所属权限
*       [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
*       [-chown [-R] [OWNER][:[GROUP]] PATH...]
*       [-copyFromLocal [-f] [-p] <localsrc> ... <dst>] #从本地文件系统中 拷贝 文件到 HDFS路径去
*       [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]  #从 HDFS拷贝到本地
        [-count [-q] <path> ...]
**      [-cp [-f] [-p] <src> ... <dst>] #从 HDFS的一个路径拷贝到 HDFS的另一个路径
        [-createSnapshot <snapshotDir> [<snapshotName>]]
        [-deleteSnapshot <snapshotDir> <snapshotName>]
        [-df [-h] [<path> ...]]
**      [-du [-s] [-h] <path> ...] #统计文件夹的大小信息
        [-expunge]
**      [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] #等同于 copyToLocal
        [-getfacl [-R] <path>]
        [-getmerge [-nl] <src> <localdst>]
        [-help [cmd ...]]
**      [-ls [-d] [-h] [-R] [<path> ...]] #显示目录信息
**      [-mkdir [-p] <path> ...] #创建路径
*       [-moveFromLocal <localsrc> ... <dst>] #从本地 剪切 粘贴到 HDFS
        [-moveToLocal <src> <localdst>]
**      [-mv <src> ... <dst>] #在 HDFS目录中移动文件
**      [-put [-f] [-p] <localsrc> ... <dst>] #等同于 copyFromLocal
        [-renameSnapshot <snapshotDir> <oldName> <newName>]
**      [-rm [-f] [-r|-R] [-skipTrash] <src> ...] #删除文件或文件夹
        [-rmdir [--ignore-fail-on-non-empty] <dir> ...]
        [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
*       [-setrep [-R] [-w] <rep> <path> ...] #设置 HDFS中文件的副本数量
        [-stat [format] <path> ...]
*       [-tail [-f] <file>] #显示一个文件的末尾 1kb的数据
        [-test -[defsz] <path>]
        [-text [-ignoreCrc] <src> ...]
        [-touchz <path> ...]
        [-usage [cmd ...]]
hadoop fs -help rm; -help:输出这个命令参数
hadoop fs -ls /; -ls: 显示目录信息
hadoop fs -mkdir -p /sanguo/shuguo; -mkdir:在HDFS上创建目录
hadoop fs  -moveFromLocal  ./kongming.txt  /sanguo/shuguo; -moveFromLocal:从本地剪切粘贴到HDFS
hadoop-2.7.2]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo/kongming.txt; -appendToFile:追加一个文件到已经存在的文件末尾
hadoop fs -cat /sanguo/shuguo/kongming.txt; -cat:显示文件内容
adoop fs  -chmod  666  /sanguo/shuguo/kongming.txt; -chgrp 、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限
hadoop fs -copyFromLocal README.txt /; -copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去
hadoop fs -copyToLocal /sanguo/shuguo/kongming.txt ./; -copyToLocal:从HDFS拷贝到本地
hadoop fs -cp /sanguo/shuguo/kongming.txt /zhuge.txt; -cp :从HDFS的一个路径拷贝到HDFS的另一个路径
hadoop fs -mv /zhuge.txt /sanguo/shuguo/; -mv:在HDFS目录中移动文件
hadoop fs -get /sanguo/shuguo/kongming.txt ./; -get:等同于copyToLocal,就是从HDFS下载文件到本地
hadoop fs -getmerge /user/atguigu/test/* ./zaiyiqi.txt; -getmerge:合并下载多个文件
hadoop fs -put ./zaiyiqi.txt /user/atguigu/test/; -put:等同于copyFromLocal
hadoop fs -tail /sanguo/shuguo/kongming.txt; -tail:显示一个文件的末尾
hadoop fs -rm /user/atguigu/test/jinlian2.txt; -rm:删除文件或文件夹
hadoop fs -rmdir /test; -rmdir:删除空目录
hadoop fs -du -s -h /user/atguigu/test; -du统计文件夹的大小信息
> 2.7 K  /user/atguigu/test
hadoop fs -du -h /user/atguigu/test
> 1.3 K  /user/atguigu/test/README.txt
> 15     /user/atguigu/test/jinlian.txt
> 1.4 K  /user/atguigu/test/zaiyiqi.txt
hadoop fs -setrep 10 /sanguo/shuguo/kongming.txt; -setrep:设置HDFS中文件的副本数量

HDFS的客户端操作

客户端环境准备

配置HADOOP_HOME环境变量;配置Path环境变量

image-20201123230158336

image-20201123230203791

创建一个Maven工程HdfsClientDemo,在pom.xml添加依赖:

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
</dependencies>

注意:如果Eclipse/Idea打印不出日志,在控制台上只显示

1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).  
2.log4j:WARN Please initialize the log4j system properly.  
3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

需要在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

创建HdfsClient类

public class HdfsClient{    
@Test
public void testMkdirs() throws IOException, InterruptedException, URISyntaxException{

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        // 配置在集群上运行
        // configuration.set("fs.defaultFS", "hdfs://hadoop102:9000");
        // FileSystem fs = FileSystem.get(configuration);

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

        // 2 创建目录
        fs.mkdirs(new Path("/1108/daxian/banzhang"));

        // 3 关闭资源
        fs.close();
    }
}

由于需要连接集群操作,所以需要在idea中设置连接集群的账户名:

image-20201123230908988

HDFS的API操作

HDFS文件上传(测试参数优先级)

@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

        // 2 上传文件
        fs.copyFromLocalFile(new Path("e:/banzhang.txt"), new Path("/banzhang.txt"));

        // 3 关闭资源
        fs.close();

        System.out.println("over");
}

将hdfs-site.xml拷贝到项目的根目录下

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

参数优先级

参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的用户自定义配置文件 >(3)然后是服务器的默认配置

HDFS文件下载

@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{

        // 1 获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

        // 2 执行下载操作
        // boolean delSrc 指是否将原文件删除
        // Path src 指要下载的文件路径
        // Path dst 指将文件下载到的路径
        // boolean useRawLocalFileSystem 是否开启文件校验
        fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("e:/banhua.txt"), true);

        // 3 关闭资源
        fs.close();
}

HDFS文件夹删除

@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 执行删除
    fs.delete(new Path("/0508/"), true);

    // 3 关闭资源
    fs.close();
}

HDFS文件名更改

@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu"); 

    // 2 修改文件名称
    fs.rename(new Path("/banzhang.txt"), new Path("/banhua.txt"));

    // 3 关闭资源
    fs.close();
}

HDFS文件详情查看

查看文件名称、权限、长度、块信息

@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException{

    // 1获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu"); 

    // 2 获取文件详情
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while(listFiles.hasNext()){
        LocatedFileStatus status = listFiles.next();

        // 输出详情
        // 文件名称
        System.out.println(status.getPath().getName());
        // 长度
        System.out.println(status.getLen());
        // 权限
        System.out.println(status.getPermission());
        // 分组
        System.out.println(status.getGroup());
         System.out.println(status.getOwner());
         System.out.println(status.getModificationTime());
         System.out.println(status.getReplication());
         System.out.println(status.getBlockSize());

        // 获取存储的块信息
        BlockLocation[] blockLocations = status.getBlockLocations();

        for (BlockLocation blockLocation : blockLocations) {

            // 获取块存储的主机节点
            String[] hosts = blockLocation.getHosts();

            for (String host : hosts) {
                System.out.println(host);
            }
        }

        System.out.println("-----------分割线----------");
    }

// 3 关闭资源
fs.close();
}

HDFS文件和文件夹判断

@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件配置信息
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 判断是文件还是文件夹
    FileStatus[] listStatus = fs.listStatus(new Path("/"));

    for (FileStatus fileStatus : listStatus) {

        // 如果是文件
        if (fileStatus.isFile()) {
                System.out.println("f:"+fileStatus.getPath().getName());
            }else {
                System.out.println("d:"+fileStatus.getPath().getName());
            }
        }

    // 3 关闭资源
    fs.close();
}

HDFS的I/O流操作

HDFS文件上传

需求:把本地e盘上的banhua.txt文件上传到HDFS根目录

@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 创建输入流
    FileInputStream fis = new FileInputStream(new File("e:/banhua.txt"));

    // 3 获取输出流
    FSDataOutputStream fos = fs.create(new Path("/banhua.txt"));

    // 4 流对拷
    IOUtils.copyBytes(fis, fos, configuration);

    // 5 关闭资源
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}

HDFS文件下载

需求:从HDFS上下载banhua.txt文件到本地e盘上

// 文件下载
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 获取输入流
    FSDataInputStream fis = fs.open(new Path("/banhua.txt"));

    // 3 获取输出流
    FileOutputStream fos = new FileOutputStream(new File("e:/banhua.txt"));

    // 4 流的对拷
    IOUtils.copyBytes(fis, fos, configuration);

    // 5 关闭资源
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}

定位文件读取

需求:分块读取HDFS上的大文件,比如根目录下的/hadoop-2.7.2.tar.gz

// 下载第一块
@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 获取输入流
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

    // 3 创建输出流
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));

    // 4 流的拷贝
    byte[] buf = new byte[1024];

    for(int i =0 ; i < 1024 * 128; i++){
        fis.read(buf);
        fos.write(buf);
    }

    // 5关闭资源
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
fs.close();
}


// 下载第二块
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 打开输入流
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

    // 3 定位输入数据位置
    fis.seek(1024*1024*128);

    // 4 创建输出流
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));

    // 5 流的对拷
    IOUtils.copyBytes(fis, fos, configuration);

    // 6 关闭资源
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

在Window命令窗口中进入到目录E:\,然后执行如下命令,对数据进行合并

type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1

合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz。解压发现该tar包非常完整。