Hadoop实战

2021/02/08 Hadoop

Hadoop实战

Hadoop之本地运行WordCount

编写WordCount

  • 输入文件格式如下:
    hello java
    hello hadoop
    
  • 输出如下:
    hello 2
    hadoop 1
    java 1
    

引入Maven依赖


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

加入log4j.properties配置文件

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.rootLogger=INFO, console

编写Mapper

读取输入文本中的每一行,并切分单词,记录单词的数量并输出,输出类型为Text,IntWritable 例如:java,1


    // Mapper模块 继承Mapper,同时设置输入输出键值对格式
    public static class TokenizerMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        Logger logger = LoggerFactory.getLogger(TokenizerMapper.class);

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            logger.info("key:{};value:{}",key,value);
            //key 代表的是每一行数据的行首偏移量  value代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = StringUtils.split(value.toString(), ' ');
            for (String w : words) {
                context.write(new Text(w), new IntWritable(1));
            }
        }
    }

编写Reducer

接收Mapper的输出结果进行累加并输出结果,接收类型为Mapper的输出类型Text,Iterable 例如:java (1,1),输出类型为 Text,intWritable 例如:java 2


    //Reducer模块,继承Reducer,同时设置输入输出键值对格式
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();       //键相同的,把其列表值累加起来
            }
            context.write(key, new IntWritable(sum));   //输出键值对<key,result>
        }
    }

编写Job

将Mapper和Reducer组装起来封装成功一个Job,作为一个执行单元。计算WordCount就是一个Job。

    //Job
    public static void main(String[] args) throws Exception {
        // 设置输入路径
        String inputPath = "D:/demo/in/hello.txt";
        // 设置输出路径
        String outputPath = "D:/demo/out/hello";
        Configuration conf = new Configuration();     //初始化相关Hadoop配置

        Job job = new Job(conf, "word count");            //新建job并设置主类,“word count”为MapReduce任务名
        job.setJarByClass(WordCount.class);

        //设置Mapper、Combiner、Reducer
        job.setMapperClass(TokenizerMapper.class);        //必选
        job.setCombinerClass(IntSumReducer.class);    //可选
        job.setReducerClass(IntSumReducer.class);        //必选

        //设置输出键值对格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入和输出路径
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job,
                new Path(outputPath));

        //提交MapReduce任务运行,并等待运行结束
        System.exit(job.waitForCompletion(true) ? 0 : 1);    //固定写法
    }

在本地文件夹D:\hadoop\input下新建 words.txt,内容为上面给出的输入内容作为输入 同样输出文件夹为output,那么直接运行程序:

可能出现的错误:

  • java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 原因: 没有拷贝winutils拷贝到hadoop-2.7.3/bin目录下或者没有配置HADOOP_HOME环境变量或者配置HADOOP_HOME环境变量没生效 解决:
  • 下载winutils拷贝到hadoop-2.7.3/bin目录下
  • 检查环境变量是否配置
  • 如果已经配置好环境变量,重启idea或这电脑,有可能是环境变量没生效

运行结果

允许如果出现一下信息就表示已经正确执行了。

14:40:01,813  WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14:40:02,058  INFO deprecation:1173 - session.id is deprecated. Instead, use dfs.metrics.session-id
14:40:02,060  INFO JvmMetrics:76 - Initializing JVM Metrics with processName=JobTracker, sessionId=
14:40:02,355  WARN JobResourceUploader:64 - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14:40:02,387  WARN JobResourceUploader:171 - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
14:40:02,422  INFO FileInputFormat:283 - Total input paths to process : 1
14:40:02,685  INFO JobSubmitter:198 - number of splits:1
14:40:02,837  INFO JobSubmitter:287 - Submitting tokens for job: job_local866013445_0001
14:40:03,035  INFO Job:1294 - The url to track the job: http://localhost:8080/
14:40:03,042  INFO Job:1339 - Running job: job_local866013445_0001
14:40:03,044  INFO LocalJobRunner:471 - OutputCommitter set in config null
14:40:03,110  INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1
14:40:03,115  INFO LocalJobRunner:489 - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14:40:03,211  INFO LocalJobRunner:448 - Waiting for map tasks
14:40:03,211  INFO LocalJobRunner:224 - Starting task: attempt_local866013445_0001_m_000000_0
14:40:03,238  INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1
14:40:03,383  INFO ProcfsBasedProcessTree:192 - ProcfsBasedProcessTree currently is supported only on Linux.
14:40:03,439  INFO Task:612 -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@4d11cc8c
14:40:03,445  INFO MapTask:756 - Processing split: file:/D:/hadoop/input/words.txt:0+24
14:40:03,509  INFO MapTask:1205 - (EQUATOR) 0 kvi 26214396(104857584)
14:40:03,509  INFO MapTask:998 - mapreduce.task.io.sort.mb: 100
14:40:03,509  INFO MapTask:999 - soft limit at 83886080
14:40:03,509  INFO MapTask:1000 - bufstart = 0; bufvoid = 104857600
14:40:03,510  INFO MapTask:1001 - kvstart = 26214396; length = 6553600
14:40:03,515  INFO MapTask:403 - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
--->Map-->LocalJobRunner Map Task Executor #0
--->Map-->LocalJobRunner Map Task Executor #0
14:40:03,522  INFO LocalJobRunner:591 - 
14:40:03,522  INFO MapTask:1460 - Starting flush of map output
14:40:03,522  INFO MapTask:1482 - Spilling map output
14:40:03,522  INFO MapTask:1483 - bufstart = 0; bufend = 40; bufvoid = 104857600
14:40:03,522  INFO MapTask:1485 - kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
14:40:03,573  INFO MapTask:1667 - Finished spill 0
14:40:03,583  INFO Task:1038 - Task:attempt_local866013445_0001_m_000000_0 is done. And is in the process of committing
14:40:03,589  INFO LocalJobRunner:591 - map
14:40:03,589  INFO Task:1158 - Task 'attempt_local866013445_0001_m_000000_0' done.
14:40:03,589  INFO LocalJobRunner:249 - Finishing task: attempt_local866013445_0001_m_000000_0
14:40:03,590  INFO LocalJobRunner:456 - map task executor complete.
14:40:03,593  INFO LocalJobRunner:448 - Waiting for reduce tasks
14:40:03,593  INFO LocalJobRunner:302 - Starting task: attempt_local866013445_0001_r_000000_0
14:40:03,597  INFO FileOutputCommitter:108 - File Output Committer Algorithm version is 1
14:40:03,597  INFO ProcfsBasedProcessTree:192 - ProcfsBasedProcessTree currently is supported only on Linux.
14:40:03,627  INFO Task:612 -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@2ae5eb6
14:40:03,658  INFO ReduceTask:362 - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@72ddfb0b
14:40:03,686  INFO MergeManagerImpl:197 - MergerManager: memoryLimit=1314232704, maxSingleShuffleLimit=328558176, mergeThreshold=867393600, ioSortFactor=10, memToMemMergeOutputsThreshold=10
14:40:03,688  INFO EventFetcher:61 - attempt_local866013445_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
14:40:03,720  INFO LocalFetcher:144 - localfetcher#1 about to shuffle output of map attempt_local866013445_0001_m_000000_0 decomp: 50 len: 54 to MEMORY
14:40:03,729  INFO InMemoryMapOutput:100 - Read 50 bytes from map-output for attempt_local866013445_0001_m_000000_0
14:40:03,730  INFO MergeManagerImpl:315 - closeInMemoryFile -> map-output of size: 50, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->50
14:40:03,731  INFO EventFetcher:76 - EventFetcher is interrupted.. Returning
14:40:03,731  INFO LocalJobRunner:591 - 1 / 1 copied.
14:40:03,731  INFO MergeManagerImpl:687 - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
14:40:03,744  INFO Merger:606 - Merging 1 sorted segments
14:40:03,744  INFO Merger:705 - Down to the last merge-pass, with 1 segments left of total size: 41 bytes
14:40:03,746  INFO MergeManagerImpl:754 - Merged 1 segments, 50 bytes to disk to satisfy reduce memory limit
14:40:03,748  INFO MergeManagerImpl:784 - Merging 1 files, 54 bytes from disk
14:40:03,748  INFO MergeManagerImpl:799 - Merging 0 segments, 0 bytes from memory into reduce
14:40:03,748  INFO Merger:606 - Merging 1 sorted segments
14:40:03,749  INFO Merger:705 - Down to the last merge-pass, with 1 segments left of total size: 41 bytes
14:40:03,749  INFO LocalJobRunner:591 - 1 / 1 copied.
14:40:03,847  INFO deprecation:1173 - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
--->Reducer-->pool-3-thread-1
--->Reducer-->pool-3-thread-1
--->Reducer-->pool-3-thread-1
14:40:03,867  INFO Task:1038 - Task:attempt_local866013445_0001_r_000000_0 is done. And is in the process of committing
14:40:03,868  INFO LocalJobRunner:591 - 1 / 1 copied.
14:40:03,868  INFO Task:1199 - Task attempt_local866013445_0001_r_000000_0 is allowed to commit now
14:40:03,873  INFO FileOutputCommitter:535 - Saved output of task 'attempt_local866013445_0001_r_000000_0' to file:/D:/hadoop/output/_temporary/0/task_local866013445_0001_r_000000
14:40:03,877  INFO LocalJobRunner:591 - reduce > reduce
14:40:03,877  INFO Task:1158 - Task 'attempt_local866013445_0001_r_000000_0' done.
14:40:03,877  INFO LocalJobRunner:325 - Finishing task: attempt_local866013445_0001_r_000000_0
14:40:03,877  INFO LocalJobRunner:456 - reduce task executor complete.
14:40:04,044  INFO Job:1360 - Job job_local866013445_0001 running in uber mode : false
14:40:04,045  INFO Job:1367 -  map 100% reduce 100%
14:40:04,045  INFO Job:1378 - Job job_local866013445_0001 completed successfully
14:40:04,050  INFO Job:1385 - Counters: 30
    File System Counters
        FILE: Number of bytes read=488
        FILE: Number of bytes written=566782
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=2
        Map output records=4
        Map output bytes=40
        Map output materialized bytes=54
        Input split bytes=96
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=54
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=7
        Total committed heap usage (bytes)=498073600
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=24
    File Output Format Counters 
        Bytes Written=36

Process finished with exit code 0

会在D:\hadoop\output输出结果如下: part-r-00000中出现最后的结果。其中part-r-00000的内容如下:

hadoop  1
hello   2
java    1

Search

    微信好友

    博士的沙漏

    Table of Contents