Mastering Hadoop 3
MapReduce workflow in the Hadoop framework

MapReduce execution passes through several steps, and each step offers some scope for optimization. The previous sections covered the components of the MapReduce framework; this section briefly walks through the MapReduce execution flow to show how those components interact with one another. The following diagram gives an overview of the execution flow. We have divided the diagram into smaller parts so that each step is easier to follow. The step numbers appear above the arrow connectors, and the last arrow in each diagram connects to the next diagram in the section:

The steps of the MapReduce internal flow are as follows:

  1. The InputFormat is the starting point of any MapReduce application. It is set in the job configuration in the Driver class of the application, for example, job.setInputFormatClass(TextInputFormat.class). The InputFormat determines the type of input and how to read it. It returns the input splits and a RecordReader, which reads records from each split. The size of an input split depends on the InputFormat; for example, with TextInputFormat the split size equals the HDFS block size by default, which is defined by the property dfs.blocksize. For a non-splittable file format such as gzip (.gz), each file forms a single input split regardless of its size, which means that 10 gzip files produce 10 input splits.
  2. One mapper is launched for every input split. That means that if getSplits() returns five splits, five map tasks are launched to process them. The map function defined in the Mapper class is executed once for each key-value pair that the RecordReader returns from the input split. The RecordReader implementation depends on the InputFormat; for TextInputFormat, every line is treated as a record.
  3. The mapper processes each record. For every record, it may emit zero or more outputs and write them to the context object provided to the map function. The class responsible for collecting the map output is defined by the property mapreduce.job.map.output.collector.class, and the default implementation is the org.apache.hadoop.mapred.MapTask$MapOutputBuffer class. The following diagram is the continuation of the preceding diagram:
  4. The output emitted by the map function goes to the partitioner class, which has the getPartition() method. The method calculates the partition number, which determines the reducer that receives the record. The default implementation is HashPartitioner, and its code looks as follows:
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
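To see why getPartition() masks the hash code before taking the remainder, the following minimal sketch repeats the same arithmetic in plain Java, without any Hadoop dependency. The class name HashPartitionSketch, the sample keys, and the reducer count of 4 are assumptions made only for illustration.

```java
final class HashPartitionSketch {

    // Same arithmetic as getPartition() above: clear the sign bit, then take the remainder.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 4; // hypothetical number of reducers
        String[] keys = {"apple", "banana", "cherry", "apple"};
        for (String key : keys) {
            // Equal keys always land in the same partition, so one reducer sees all their values.
            System.out.println(key + " -> partition " + partitionFor(key, numReduceTasks));
        }

        // Integer.hashCode() returns the value itself, so this key has a negative hash code.
        Integer negativeKey = -7;
        // A plain remainder keeps the sign and would give an invalid partition number.
        System.out.println("plain remainder:  " + (negativeKey.hashCode() % numReduceTasks));
        // Masking with Integer.MAX_VALUE keeps the result in the range 0..numReduceTasks-1.
        System.out.println("masked remainder: " + partitionFor(negativeKey, numReduceTasks));
    }
}
```

The mask matters because Java's % operator keeps the sign of the dividend; without it, keys with a negative hash code would map to a negative partition number that no reducer owns.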
  5. The emitted output, tagged with its partition, is then written to a circular buffer in memory. By default, the size of the circular buffer is 100 MB. It can be changed by assigning a new value to the property mapreduce.task.io.sort.mb. When the buffered output exceeds a configured threshold, the data stored in the circular buffer is spilled to disk.
Spilling: The data is spilled to disk when the buffer fills beyond the fraction specified in mapreduce.map.sort.spill.percent. For example, if mapreduce.map.sort.spill.percent is 0.8 and mapreduce.task.io.sort.mb is 100 MB, spilling starts once the buffer holds 80 MB, which is 80% of 100 MB. The spill runs in a background thread, in parallel with the map function, so the mapper can keep writing to the buffer. The mapper blocks only if it produces output faster than the spill can drain it: the buffer then fills up, and the mapper has to wait until there is space in the buffer for new records.
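The spill threshold described in the note is simple arithmetic, sketched below in plain Java. The class and method names are hypothetical, and the sketch deliberately ignores how the real buffer divides its space between record data and bookkeeping metadata, so treat the result as an approximation of the trigger point rather than the exact internal limit.

```java
final class SpillThresholdSketch {

    // Approximate number of buffered bytes at which a background spill starts.
    // sortMb corresponds to mapreduce.task.io.sort.mb,
    // spillPercent to mapreduce.map.sort.spill.percent.
    static long spillThresholdBytes(int sortMb, double spillPercent) {
        long bufferBytes = (long) sortMb * 1024L * 1024L;
        return (long) (bufferBytes * spillPercent);
    }

    public static void main(String[] args) {
        int sortMb = 100;          // buffer size from the example in the text
        double spillPercent = 0.8; // spill fraction from the example in the text
        long threshold = spillThresholdBytes(sortMb, spillPercent);
        System.out.println("Buffer size:     " + sortMb + " MB");
        System.out.println("Spill starts at: " + threshold / (1024L * 1024L) + " MB");
    }
}
```

Raising the buffer size or the spill fraction delays the first spill and can reduce the number of spill files, at the cost of more heap for each map task.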
  6. Before the data is spilled to disk, it is sorted by partition number, and within each partition by record key. After sorting, the combiner, if one is configured, processes the records to reduce the amount of data written to disk. The combiner is not guaranteed to run: if a single record emitted by the mapper is larger than the buffer, both the sort and the combiner are skipped and the record is spilled directly to disk.
  7. Every spill operation creates a new file on the local disk of the node running the mapper. The directory can be configured using the mapreduce.cluster.local.dir property.

  8. After all the records have been processed and all spills have completed, the spill files are merged into a single map output file. This phase is called the merge phase. The merge process can merge up to 10 files at a time by default (the value shipped in mapred-default.xml). This setting can be changed through mapreduce.task.io.sort.factor.

If the number of spill files is greater than this value, the merge runs in multiple passes until all files are merged into a single file. If a combiner is configured and the number of spill files is at least mapreduce.map.combine.minspills (default 3; formerly min.num.spills.for.combine), the combiner runs again during the final merge. The map output file and its index file are written to the mapper's local disk. The index file contains information such as where each partition starts and ends in the output file. The following diagram is the continuation of the preceding diagram:
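As a rough illustration of the multi-pass merge, the sketch below counts how many passes a simplified merge needs when each pass combines up to a fixed number of files into one. This is a simplified model and an assumption on our part: the real Hadoop merger chooses how many files to combine in each pass differently, so the figures are indicative only. The class name, method name, and the sample merge factor of 10 are hypothetical.

```java
final class MergePassSketch {

    // Number of passes needed to reduce spillFiles to a single file when every pass
    // merges groups of up to sortFactor files (simplified model, not Hadoop's exact algorithm).
    static int mergePasses(int spillFiles, int sortFactor) {
        if (spillFiles < 1 || sortFactor < 2) {
            throw new IllegalArgumentException("need at least 1 file and a merge factor of 2 or more");
        }
        int passes = 0;
        int files = spillFiles;
        while (files > 1) {
            files = (files + sortFactor - 1) / sortFactor; // ceiling division: files left after this pass
            passes++;
        }
        return passes;
    }

    public static void main(String[] args) {
        int sortFactor = 10; // example value for mapreduce.task.io.sort.factor
        for (int spills : new int[] {1, 10, 11, 250}) {
            System.out.println(spills + " spill files -> " + mergePasses(spills, sortFactor) + " merge pass(es)");
        }
    }
}
```

Every extra pass rereads and rewrites intermediate data on the local disk, which is why fewer spill files or a larger merge factor can shorten the map task.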

  9. The next step is the shuffle of data from the mappers to the reducer. The reducer runs an event fetcher thread that polls the application master for the status of the mappers. When a mapper reports completion, the event fetcher passes the mapper's details to a fetcher thread, which copies that mapper's output over HTTP. By default, there are five fetcher threads; this number can be changed by assigning a new value to mapreduce.reduce.shuffle.parallelcopies (formerly mapred.reduce.parallel.copies). The data copied by the fetcher threads is stored in memory, and the size of this memory is 1 GB by default. It can be increased or decreased by changing mapreduce.reduce.memory.totalbytes. When the size of the map output exceeds the percentage of this memory defined by mapreduce.reduce.shuffle.input.buffer.percent (default 0.7, that is, 70%), the fetcher saves the output to the local disk of the reducer.
  10. The merger is the next step after the fetcher. The merger thread can run in parallel with the fetcher threads, and it merges the map outputs that they fetch. The final step in the overall process is running the reduce task. The reducer reads the records from the merged file and processes them in the reduce method. The reduce function is called once for each key, which means that if there are 55 distinct keys in the merged output, the reducer runs the reduce function 55 times.
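The once-per-key behavior of the reduce function can be sketched in plain Java as follows. This is only an illustration of the grouping idea, not Hadoop's implementation: the class ReduceCallSketch, the summing reduce function, and the sample records are all assumptions, and the input list stands in for the merged, key-sorted file that the reducer reads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReduceCallSketch {

    // Hypothetical reduce function: sums all values that share one key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Walks key-sorted records, groups consecutive records with the same key,
    // and calls reduce() exactly once per group.
    static Map<String, Integer> runReducer(List<Map.Entry<String, Integer>> sortedRecords) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedRecords.size()) {
            String key = sortedRecords.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            while (i < sortedRecords.size() && sortedRecords.get(i).getKey().equals(key)) {
                values.add(sortedRecords.get(i).getValue());
                i++;
            }
            output.put(key, reduce(key, values));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> merged = List.of(
                Map.entry("a", 1), Map.entry("a", 2),
                Map.entry("b", 5),
                Map.entry("c", 1), Map.entry("c", 1));
        Map<String, Integer> result = runReducer(merged);
        // Five records but three distinct keys, so reduce() ran three times.
        System.out.println("reduce calls: " + result.size());
        System.out.println(result);
    }
}
```

The grouping relies on the input being sorted by key, which is exactly what the sort and merge steps earlier in the flow guarantee.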