Hadoop分布式离线计算

2021/02/03 Hadoop

Hadoop分布式离线计算框架

Hadoop中有两个重要的组件:一个是HDFS,另一个是MapReduce,HDFS用来存储大批量的数据,而MapReduce则是通过计算来发现数据中有价值的内容。2004年,Google发表了一篇论文,向全世界的人们介绍了MapReduce。MapReduce被广泛地应用于日志分析、海量数据排序、在海量数据中查找特定模式等场景中。

MapReduce概述

Hadoop作为开源组织下最重要的项目之一,自推出后得到了全球学术界和工业界的广泛关注、推广和普及。它是开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting于2004年推出的。当时Doug Cutting发现MapReduce正是其所需要解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。

MapReduce的特点

MapReduce适合处理离线的海量数据,这里的“离线”可以理解为存在本地,非实时处理。MapReduce往往处理大批量数据,比如PB级别或者ZB级别。 MapReduce有以下特点

  • 易于编程 如果要编写分布式程序,只需要实现一些简单接口,与编写普通程序类似,避免了复杂的过程。同时,编写的这个分布式程序可以部署到大批量廉价的普通机器上运行。
  • 具有良好的扩展性 是指当一台机器的计算资源不能满足存储或者计算的时候,可以通过增加机器来扩展存储和计算能力。
  • 具有高容错性 MapReduce设计的初衷是可以使程序部署运行在廉价的机器上,廉价的机器坏的概率相对较高,这就要求其具有良好的容错性。当一台机器“挂掉”以后,相应数据的存储和计算能力会被移植到另外一台机器上,从而实现容错性。

MapReduce的应用场景

MapReduce的应用场景主要表现在从大规模数据中进行计算,不要求即时返回结果的场景,比如以下典型应用:

  • 单词统计。
  • 简单的数据统计,比如网站PV和UV统计。
  • 搜索引擎建立索引。
  • 搜索引擎中,统计最流行的K个搜索词。
  • 复杂数据分析算法实现。

下面介绍MapReduce不适用的方面。

  • 实时计算,MapReduce不合适在毫秒级或者秒级内返回结果。
  • 流式计算,MapReduce的输入数据集是静态的,不能动态变化,所以不适合流式计算。
  • DAG计算,如果多个应用程序存在依赖关系,并且后一个应用程序的输入为前一个的输出,在这种情况下也不适合MapReduce。

MapReduce的核心思想

MapReduce是一种并行编程模型,是Hadoop生态系统的核心组件之一,“分而治之”是MapReduce的核心思想,它表示把一个大规模的数据集切分成很多小的单独的数据集,然后放在多个机器上同时处理。

MapReduce的核心函数

MapReduce把整个并行运算过程高度抽象到两个函数上,一个是map另一个是reduce。Map函数就是分而治之中的“分”,reduce函数就是分而治之中的“治”。

MapReduce把一个存储在分布式文件系统中的大规模数据集切分成许多独立的小的数据集,然后分配给多个map任务处理。然后map任务的输出结果会进一步处理成reduce任务的输入,最后由reduce任务进行汇总,然后上传到分布式文件系统中。

Map函数:map函数会将小的数据集转换为适合输入的<key,value>键值对的形式,然后处理成一系列具有相同key的<key,value>作为输出,我们可以把输出看做list(<key,value>)

Reduce函数:reduce函数会把map函数的输出作为输入,然后提取具有相同key的元素,并进行操作,最后的输出结果也是<key,value>键值对的形式,并合并成一个文件。

单词统计实例

MapReduce的执行过程比较复杂,我们先从一个WordCount实例着手,从总体上理解MapReduce的执行过程。 假设有一个非常大的文件,需求是统计文件中每个单词出现的次数。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  //Part1:Mapper模块
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{    //继承Mapper,同时设置输入输出键值对格式
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    //【核心】与实际业务逻辑挂钩,由开发者自行编写
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);    //输出键值对<word,1>
      }
    }
  }
  
  //Part2:Reducer模块
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {		//继承Reducer,同时设置输入输出键值对格式
    private IntWritable result = new IntWritable();

    //【核心】与实际业务逻辑挂钩,由开发者自行编写
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {  
        sum += val.get();       //键相同的,把其列表值累加起来
      }
      result.set(sum);
      context.write(key, result);   //输出键值对<key,result>
    }
  }

  //Part3:应用程序Driver
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   	 //初始化相关Hadoop配置
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    
    Job job = new Job(conf, "word count");    		//新建job并设置主类,“word count”为MapReduce任务名
    job.setJarByClass(WordCount.class);
    
    //设置Mapper、Combiner、Reducer
    job.setMapperClass(TokenizerMapper.class);		//必选
    job.setCombinerClass(IntSumReducer.class);   	//可选
    job.setReducerClass(IntSumReducer.class);		//必选
    
    //设置输出键值对格式
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    //设置输入和输出路径
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    
    //提交MapReduce任务运行,并等待运行结束
    System.exit(job.waitForCompletion(true) ? 0 : 1);    //固定写法
  }
}

主要分为Split、Map、Shuffle和Reduce阶段,每个阶段在WordCount中的作用如下:

  • Split阶段,首先大文件被切分成多份,假设这里被切分成了3份,每一行代表一份。
  • Map阶段,解析出每个单词,并在后边记上数字1。
  • Shuffle阶段,将每一份中的单词分组到一起,并默认按照字母进行排序。
  • Reduce阶段,将相同的单词进行累加。
  • 输出结果。

MapReduce执行过程

从WordCount实例中,可以基于单词统计大概了解MapReduce的过程,下面用三明治的例子来演示一下MapReduce的工作流程:

  • input阶段相当于准备食材的步骤;
  • split阶段相当于分食材的步骤,数据切片1表示面包、数据切片2表示培根、数据切片3表示西红柿、数据切片4表示生菜;
  • map阶段相当于切面包、煎培根、切西红柿、洗生菜四个步骤同时进行;
  • shuffle阶段相当于把切好的食材分类,存放、汇总;
  • reduce阶段相当于整合组装成三明治;
  • output阶段相当于打包。

接下来我们从理论层面来介绍MapReduce的执行过程。

MapReduce的工作过程,一共分为input、split、map、shuffle、reduce、output六个阶段。具体执行过程如下:

  • input阶段:将数据源输入到MapReduce框架中
  • split阶段:将大规模的数据源切片成许多小的数据集,然后对数据进行预处理,处理成适合map任务输入的<key,value>形式。
  • map阶段:对输入的<key,value>键值对进行处理,然后产生一系列的中间结果。通常一个split分片对应一个map任务,有几个split就有几个map任务。
  • shuffle阶段:对map阶段产生的一系列<key,value>进行分区、排序、归并等操作,然后处理成适合reduce任务输入的键值对形式。Shuffle完成对数据的排序和合并等操作,但是Shuffle不会对输入的数据进行改动,所以还是key2和value2。
  • reduce阶段:提取所有相同的key,并按用户的需求对value进行操作,最后也是以<key,value>的形式输出结果。
  • output阶段:进行一系列验证后,将reduce的输出结果上传到分布式文件系统中。

这就是MapReduce程序自己的处理流程,都是按照这个“套路”运行的。下面对Split阶段、Map和Reduce阶段,以及Shuffle阶段分别展开介绍。

Map Reduce的文件切片——Split

Split的大小默认与block对应,也可以由用户任意控制。MapReduce的Split大小计算公式如下:

max(min.split,min(max.split,block))

其中,max.split=totalSize/numSpilt,totalSize为文件大小,numSplit为用户设定的map task个数,默认为1;min.split=InputSplit的最小值,具体可以在配置文件中配置参数mapred.min.split.size,不配置时默认为1B,block是HDFS中块的大小。

举例来说:把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成3个block块,与之对应产生3个Split,所以最终会产生3个map task。而第3个block块里存的文件大小只有2MB,它的block块大小是128MB,那么它实际占用多大空间呢?通过以上公式可知其占用的是实际的文件大小,而非一个块的大小。

Map过程和Reduce过程

Map的实现逻辑和Reduce的实现逻辑都是由程序员完成的,其中Map的个数和Split的个数对应起来,也就是说一个Split切片对应一个Map任务,关于Reduce的默认数是1,程序员可以自行设置。 另外需要注意的是,一个程序可能只有一个Map任务却没有Reduce任务,也可能是多个MapReduce程序串接起来,比如把第一个MapReduce的输出结果当作第二个MapReduce的输入,第二个MapReduce的输出成为第三个MapReduce的输入,最终才可以完成一个任务。

Shuffle过程

Shuffle又叫“洗牌”,它起到连接Map任务与Reduce任务的作用,在这里需要注意的是,Shuffle不是一个单独的任务,它是MapReduce执行中的步骤。

Shuffle分为两部分,一部分在Map端,另一部分在Reduce端,Map处理后的数据会以key、value的形式存在缓冲区中(buffer in memory),缓冲区大小为128MB。 当该缓冲区快要溢出时(默认80%),会将数据写到磁盘中生成文件,也就是溢写操作(spill to disk)。溢写磁盘的过程是由一个线程来完成,溢写之前包括Partition(分区)和Sort(排序),Partition和Sort都有默认实现,其中Partition分区默认是按照“hash值%reduce数量”进行分区的,分区之后的数据会进入不同的Reduce,而Sort是默认按照字母顺序进行排序的。溢写之后会在磁盘上生成多个文件,多个文件会通过merge线程完成文件的合并,由多个小文件生成一个大文件。

合并完之后的数据(以key和value的形式存在)会基于Partition被发送到不同的Reduce上,Reduce会从不同的Map上取得“属于”自己的数据并写入磁盘,完成merge操作减少文件数量,并调用Reduce程序,最终通过Output完成输出。

MapReduce任务的优化

相信每个程序员在编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”。同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题。 MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化。这其中,又包含六个方面的内容。

  • 任务调度 任务调度是Hadoop中非常重要的一环,这个优化又涉及两个方面的内容。计算方面:Hadoop总会优先将任务分配给空闲的机器,使所有的任务能公平地分享系统资源。I/O方面:Hadoop会尽量将Map任务分配给InputSplit所在的机器,以减少网络I/O的消耗。
  • 数据预处理与InputSplit的大小 MapReduce任务擅长处理少量的大数据,而在处理大量的小数据时,MapReduce的性能就会逊色很多。因此在提交MapReduce任务前可以先对数据进行一次预处理,将数据合并以提高MapReduce任务的执行效率,这个办法往往很有效。如果这还不行,可以参考Map任务的运行时间,当一个Map任务只需要运行几秒就可以结束时,就需要考虑是否应该给它分配更多的数据。通常而言,一个Map任务的运行时间在一分钟左右比较合适,可以通过设置Map的输入数据大小来调节Map的运行时间。在FileInputFormat中(除了CombineFileInputFormat),Hadoop会在处理每个Block后将其作为一个InputSplit,因此合理地设置block块大小是很重要的调节方式。除此之外,也可以通过合理地设置Map任务的数量来调节Map任务的数据输入。
  • Map和Reduce任务的数量 合理地设置Map任务与Reduce任务的数量对提高MapReduce任务的效率是非常重要的。默认的设置往往不能很好地体现出MapReduce任务的需求,不过,设置它们的数量也要有一定的实践经验。 首先要定义两个概念—Map/Reduce任务槽。Map/Reduce任务槽就是这个集群能够同时运行的Map/Reduce任务的最大数量。比如,在一个具有1200台机器的集群中,设置每台机器最多可以同时运行10个Map任务,5个Reduce任务。那么这个集群的Map任务槽就是12000,Reduce任务槽是6000。任务槽可以帮助对任务调度进行设置。 设置MapReduce任务的Map数量主要参考的是Map的运行时间,设置Reduce任务的数量就只需要参考任务槽的设置即可。一般来说,Reduce任务的数量应该是Reduce任务槽的0.95倍或是1.75倍,这是基于不同的考虑来决定的。当Reduce任务的数量是任务槽的0.95倍时,如果一个Reduce任务失败,Hadoop可以很快地找到一台空闲的机器重新执行这个任务。当Reduce任务的数量是任务槽的1.75倍时,执行速度快的机器可以获得更多的Reduce任务,因此可以使负载更加均衡,以提高任务的处理速度。
  • Combine函数 Combine函数是用于本地合并数据的函数。在有些情况下,Map函数产生的中间数据会有很多是重复的,比如在一个简单的WordCount程序中,因为词频是接近与一个zipf分布的,每个Map任务可能会产生成千上万个<the,1>记录,若将这些记录一一传送给Reduce任务是很耗时的。所以,MapReduce框架运行用户写的combine函数用于本地合并,这会大大减少网络I/O操作的消耗。此时就可以利用combine函数先计算出在这个Block中单词the的个数。合理地设计combine函数会有效地减少网络传输的数据量,提高MapReduce的效率。 在MapReduce程序中使用combine很简单,只需在程序中添加如下内容: ```java job.setCombinerClass(combine.class);
在WordCount程序中,可以指定Reduce类为combine函数,具体如下:
```java
job.setCombinerClass(Reduce.class);
  • 压缩 编写MapReduce程序时,可以选择对Map的输出和最终的输出结果进行压缩(同时可以选择压缩方式)。在一些情况下,Map的中间输出可能会很大,对其进行压缩可以有效地减少网络上的数据传输量。对最终结果的压缩虽然会减少数据写HDFS的时间,但是也会对读取产生一定的影响,因此要根据实际情况来选择(第7章中提供了一个小实验来验证压缩的效果)。
  • 自定义comparator 在Hadoop中,可以自定义数据类型以实现更复杂的目的,比如,当读者想实现k-means算法(一个基础的聚类算法)时可以定义k个整数的集合。自定义Hadoop数据类型时,推荐自定义comparator来实现数据的二进制比较,这样可以省去数据序列化和反序列化的时间,提高程序的运行效率

Search

    微信好友

    博士的沙漏

    Table of Contents