Hadoop Streaming

Hadoop Streaming is a utility that ships with the Hadoop distribution. With the help of Hadoop Streaming, you can create and run MapReduce jobs with any executable or script acting as the mapper and/or the reducer.

Let’s take an example of the word-count problem:

A Hadoop job has a mapper and a reducer phase. The code shown below is written as Python scripts and can be run in Hadoop easily. You can also write the code in other languages such as Perl or Ruby.

Mapper Code (mapper.py):

#!/usr/bin/python

import sys

# Input is read from standard input, line by line
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Break the line into words
    words = myline.split()
    # Iterate over the words list
    for myword in words:
        # Write the results to standard output
        print('%s\t%s' % (myword, 1))
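
Because the mapper only reads stdin and writes stdout, you can test it on its own before involving Hadoop at all. For example:

$ echo "deer bear river deer" | python mapper.py
deer	1
bear	1
river	1
deer	1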

Reducer Code (reducer.py):

#!/usr/bin/python

import sys

current_word = ""
current_count = 0
word = ""

# Input is read from standard input, line by line
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Split the input we got from mapper.py
    word, count = myline.split('\t', 1)
    # Convert the count variable to an integer
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently skip this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Write the result to standard output
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))

These scripts can be saved in files called mapper.py and reducer.py and executed in Hadoop with the command below.

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
-input input_dirs \
-output output_dir \
-mapper <path>/mapper.py \
-reducer <path>/reducer.py
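
You can also simulate the whole streaming pipeline locally before submitting the job, with sort standing in for Hadoop's shuffle-and-sort phase. Assuming a sample file named input.txt:

$ cat input.txt | python mapper.py | sort -k1,1 | python reducer.py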

How Streaming Works

In the word-count example shown above, the two scripts, mapper.py and reducer.py, read their input from stdin and write their output to stdout. The streaming utility creates a MapReduce job, submits it to the cluster, monitors its progress, and retrieves the output once the job has finished.

Each executable is launched as a separate process when the mapper is initialized. When the mapper task runs, its input is converted into lines and fed to the process's stdin. The mapper collects the line-oriented outputs from the process's stdout and converts each line into a key-value pair, which becomes the output of the mapper task. By default, the prefix of a line up to the first tab character is the key, and the rest of the line (excluding the tab) is the value. If there is no tab character in the line, the entire line becomes the key and the value is null. This default way of picking the key and value from a line of output can be customized.
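
This default rule can be expressed in a few lines of Python. The sketch below is only an illustration of the convention just described (split_key_value is a hypothetical helper, not code from the streaming utility itself):

def split_key_value(line):
    # Default streaming rule: key = text up to the first tab,
    # value = the rest of the line (excluding the tab).
    if '\t' in line:
        key, value = line.split('\t', 1)
        return key, value
    # No tab: the whole line is the key and the value is null/empty.
    return line, None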

The reducer task works in a similar way. Each reducer executable is launched as a separate process, and the input key-value pairs are converted into lines and fed to the process's stdin. The reducer collects the line-oriented outputs from the process's stdout and converts each line into a key-value pair, which becomes the output of the reducer task. By default, the prefix of a line up to the first tab character is the key, and the rest of the line (excluding the tab) is the corresponding value. This default can also be customized.

Streaming Command Options

The syntax for the various streaming command options is shown below; an example combining several of them follows the list:

  1. -input directoryname: Defines the input location for the mapper.

  2. -output directoryname: Defines the output location for the reducer.

  3. -mapper executable or JavaClassName: Runs the mapper executable.

  4. -reducer executable or JavaClassName: Runs the reducer executable.

  5. -file filename: Makes the mapper or reducer executable available locally on the compute nodes.

  6. -inputformat JavaClassName: The class supplied should return key-value pairs of Text class. TextInputFormat is the default.

  7. -outputformat JavaClassName: The class supplied should take key-value pairs of Text class. TextOutputFormat is the default.

  8. -partitioner JavaClassName: Determines which reducer a key is sent to.

  9. -combiner streamingCommand or JavaClassName: Combiner executable for the map output.

  10. -cmdenv name=value: Passes the environment variable to the streaming commands.

  11. -inputreader: Specifies a record reader class instead of an input format class.

  12. -verbose: Enables verbose output.

  13. -lazyOutput: Creates output lazily.

  14. -numReduceTasks: Specifies the number of reducers.

  15. -mapdebug: Script to call when a map task fails.

  16. -reducedebug: Script to call when a reduce task fails.
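
As an illustration, the hypothetical invocation below combines several of these options with the word-count scripts from earlier (the input and output paths and the environment variable are placeholders):

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py \
-numReduceTasks 2 \
-cmdenv EXAMPLE_VAR=example_value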

Specifying a Java Class as the Mapper/Reducer

A Java class can be supplied as the mapper or the reducer with the help of the command below:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc

Packaging Files With Job Submissions

Any executable file can be supplied as the mapper or the reducer. If the executable is not already present on the cluster nodes, you can use the "-file" option to supply the executable file's path. For example, the -file option below ensures that the file "myPythonScript.py" is shipped to the cluster as part of the job submission.

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py

Any other files required to run the job can also be shipped to the cluster for use by the mapper or reducer, using the command below.

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py \
-file myDictionary.txt
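
Files shipped with -file are placed in the task's current working directory, so a script can open them by name. Below is a minimal, hypothetical sketch of a mapper that uses the shipped myDictionary.txt (assumed here to hold one allowed word per line) to filter the words it counts; it is an illustration, not part of the example above:

#!/usr/bin/python

import sys

# myDictionary.txt was shipped with -file, so it is available
# in the task's current working directory.
allowed = set(line.strip() for line in open('myDictionary.txt'))

for myline in sys.stdin:
    for myword in myline.strip().split():
        # Only count words that appear in the dictionary
        if myword in allowed:
            print('%s\t%s' % (myword, 1))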

Conclusion

You now have a basic understanding of Hadoop Streaming. The details above should be enough for you to get started with Hadoop Streaming and its applications.