#!/usr/bin/env python
"""mapper.py"""
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # emit one record per word
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
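Because mapper.py does all of its work at module level, it can only be exercised through a shell pipe. Below is a minimal sketch of the same logic as a plain function, which makes it easy to check from Python. The name map_lines is hypothetical and not part of the original script.

```python
import io


def map_lines(stream):
    """Yield one tab-delimited "word<TAB>1" record per word, like mapper.py."""
    for line in stream:
        # str.split() with no argument splits on any run of whitespace
        # and drops empty strings, so blank lines produce no records
        for word in line.strip().split():
            yield '%s\t%s' % (word, 1)


sample = io.StringIO(u'foo foo quux\n\nlabs  foo\n')
for record in map_lines(sample):
    print(record)
```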
Reduce step: reducer.py
Save the following code in the file /home/hduser/reducer.py. It will read the results of mapper.py (so the output format of mapper.py and the expected input format of reducer.py must match), sum the occurrences of each word to a final count, and then output its results to STDOUT.
Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.
#!/usr/bin/env python
"""reducer.py"""
import sys

current_word = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    try:
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # the line had no tab or count was not a number,
        # so silently ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word:
    print('%s\t%s' % (current_word, current_count))
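The comment in reducer.py about Hadoop sorting map output by key is the crux of this design. The sketch below shows what happens with and without that sort. It uses a hypothetical reduce_lines function that mirrors reducer.py's running-total logic on a list of lines.

```python
def reduce_lines(lines):
    """Sum counts of consecutive identical words, mirroring reducer.py.

    Returns a list of (word, total) pairs in the order they are emitted.
    """
    results = []
    current_word, current_count = None, 0
    for line in lines:
        try:
            word, count = line.strip().split('\t', 1)
            count = int(count)
        except ValueError:
            continue  # malformed line: no tab, or count is not a number
        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    # emit the last word, if any valid input was seen
    if current_word is not None:
        results.append((current_word, current_count))
    return results


mapped = ['foo\t1', 'foo\t1', 'quux\t1', 'labs\t1', 'foo\t1', 'bar\t1', 'quux\t1']
# sort by the first field only, like `sort -k1,1`
by_word = sorted(mapped, key=lambda rec: rec.split('\t', 1)[0])
print(reduce_lines(by_word))  # [('bar', 1), ('foo', 3), ('labs', 1), ('quux', 2)]
print(reduce_lines(mapped))   # [('foo', 2), ('quux', 1), ('labs', 1), ('foo', 1), ('bar', 1), ('quux', 1)]
```

Without sorting, foo is reported twice with partial counts. The sort -k1,1 step in a local test, and Hadoop's sort phase on the cluster, prevent exactly this.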
Test your code (cat data | map | sort | reduce)
I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job.
Otherwise your jobs might complete successfully but produce no result data at all, or not the results
you would have expected. If that happens, most likely it was you (or me) who screwed up.
Here are some ideas on how to test the functionality of the Map and Reduce scripts.
# Test mapper.py and reducer.py locally first
# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar 1
foo 3
labs 1
quux 2
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/pg20417.txt | /home/hduser/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
(you get the idea)
Running the Python Code on Hadoop
We will use three ebooks from Project Gutenberg for this example:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
The Notebooks of Leonardo Da Vinci
Ulysses by James Joyce
Download each ebook as a text file in Plain Text UTF-8 encoding and store the files in a local temporary directory of your choice, for example /tmp/gutenberg.
hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop 674566 Feb 3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb 3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb 3 10:18 pg5000.txt
Copy local example data to HDFS
Before we run the actual MapReduce job, we must first copy the files
from our local file system to Hadoop’s HDFS.
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x - hduser supergroup 0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r-- 3 hduser supergroup 674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r-- 3 hduser supergroup 1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r-- 3 hduser supergroup 1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
Run the MapReduce job
Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above,
we leverage the Hadoop Streaming API to help us pass data between our Map and Reduce code via STDIN and STDOUT.
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file /home/hduser/mapper.py -mapper /home/hduser/mapper.py \
-file /home/hduser/reducer.py -reducer /home/hduser/reducer.py \
-input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
If you want to modify some Hadoop settings on the fly, like increasing the number of Reduce tasks, you can use the -D option:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...
Note about mapred.map.tasks: Hadoop treats mapred.map.tasks only as a hint, but it honors a user-specified mapred.reduce.tasks as is. In other words, you cannot force the number of map tasks, but you can set the number of reduce tasks.
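If you prefer to launch the job from a Python script, here is a minimal sketch that builds the same argument list. The helper name streaming_command and the jar path are hypothetical placeholders. The shell glob hadoop-*streaming*.jar is not expanded by Python, so resolve the real jar name first (for example with glob.glob). Generic options such as -D are placed before the streaming options, which is where Hadoop Streaming expects them.

```python
def streaming_command(jar, mapper, reducer, input_path, output_path, conf=None):
    """Return the argv list for a Hadoop Streaming word count job.

    conf maps Hadoop property names to values; each entry becomes a
    generic "-D name=value" option placed before the streaming options.
    """
    cmd = ['bin/hadoop', 'jar', jar]
    for name, value in sorted((conf or {}).items()):
        cmd += ['-D', '%s=%s' % (name, value)]
    cmd += ['-file', mapper, '-mapper', mapper,
            '-file', reducer, '-reducer', reducer,
            '-input', input_path, '-output', output_path]
    return cmd


# Placeholder jar path (hypothetical); resolve the real one, e.g. with
# glob.glob('contrib/streaming/hadoop-*streaming*.jar'), from /usr/local/hadoop.
cmd = streaming_command('contrib/streaming/hadoop-streaming.jar',
                        '/home/hduser/mapper.py', '/home/hduser/reducer.py',
                        '/user/hduser/gutenberg/*', '/user/hduser/gutenberg-output',
                        conf={'mapred.reduce.tasks': 16})
print(' '.join(cmd))
# To actually launch it:
#   import subprocess; subprocess.check_call(cmd, cwd='/usr/local/hadoop')
```

Passing a list rather than a single string also means the HDFS input glob reaches Hadoop unmodified instead of being interpreted by a local shell.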
The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in
the HDFS directory /user/hduser/gutenberg-output. In general Hadoop will create one output file per reduce task; in
our case, however, it will create only a single file because the job runs with a single reduce task (the default value of mapred.reduce.tasks).
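Each key is assigned to exactly one reduce task, and each reduce task writes its own part-NNNNN file. Hadoop's default partitioner does this by hashing the key modulo the number of reduce tasks. The sketch below imitates that idea. It uses zlib.crc32 purely as a stand-in hash, which is an assumption and not Hadoop's actual hash function, so the concrete file a word lands in will differ on a real cluster. The helpers partition and split_into_parts are hypothetical.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    """Map a key to a reduce task index in [0, num_reducers).

    zlib.crc32 is a stand-in for Hadoop's own key hash; unlike Python 3's
    built-in hash() for str, it gives the same result on every run.
    """
    return (zlib.crc32(key.encode('utf-8')) & 0x7FFFFFFF) % num_reducers


def split_into_parts(keys, num_reducers):
    """Group keys by the part-NNNNN file their reduce task would write."""
    parts = defaultdict(set)
    for key in keys:
        parts['part-%05d' % partition(key, num_reducers)].add(key)
    return dict(parts)


words = ['foo', 'foo', 'quux', 'labs', 'bar']
print(split_into_parts(words, 1))  # a single reduce task: every word is in part-00000
print(split_into_parts(words, 4))  # each word appears in exactly one part file
```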
Example output of running the streaming job in the console:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...] INFO streaming.StreamJob: map 0% reduce 0%
[...] INFO streaming.StreamJob: map 43% reduce 0%
[...] INFO streaming.StreamJob: map 86% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 0%
[...] INFO streaming.StreamJob: map 100% reduce 33%
[...] INFO streaming.StreamJob: map 100% reduce 70%
[...] INFO streaming.StreamJob: map 100% reduce 77%
[...] INFO streaming.StreamJob: map 100% reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
Hadoop also provides a basic web interface for statistics and information. When
the Hadoop cluster is running, open http://localhost:50030/ in a browser and have a look
around. Here’s a screenshot of the Hadoop web interface for the job we just ran.
Figure 1: A screenshot of Hadoop's JobTracker web interface, showing the details of the MapReduce job we just ran
Check whether the result was successfully stored in the HDFS directory /user/hduser/gutenberg-output:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000 <r 1> 903193 2007-09-21 13:00
You can then inspect the contents of the file with the dfs -cat command:
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra" 1
"1490 1
"1498," 1
"35" 1
"40," 1
"A 2
"AS-IS". 2
"A_ 1
"Absoluti 1
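Because the result is plain tab-delimited text, post-processing it is straightforward. Here is a minimal sketch that parses word/count records and returns the most frequent words. It assumes you feed it the dfs -cat output, and top_words is a hypothetical helper.

```python
import heapq
import io


def top_words(lines, n=10):
    """Return the n most frequent (word, count) pairs from reducer output lines."""
    counts = []
    for line in lines:
        word, sep, count = line.rstrip('\n').rpartition('\t')
        if not sep:
            continue  # no tab: not a word/count record
        try:
            counts.append((word, int(count)))
        except ValueError:
            continue  # count is not a number
    return heapq.nlargest(n, counts, key=lambda pair: pair[1])


sample = io.StringIO(u'"A\t2\n"AS-IS".\t2\n"A_\t1\nfoo\t3\n')
print(top_words(sample, n=1))  # [('foo', 3)]
```

On the cluster you could call top_words(sys.stdin) from a small script and pipe bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000 into it.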
Note that in the dfs -cat output above, the quote signs (") enclosing the words have not been inserted by Hadoop.
They are the result of how our Python code splits words, and in this case it matched the beginning of a quote in the
ebook texts. Just inspect the part-00000 file further to see it for yourself.
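If you would rather not count tokens like "(Lo)cra" or "AS-IS". together with their punctuation, one option is to extract words with a regular expression instead of str.split(). This is not part of the original scripts. The pattern below encodes an assumption about what counts as a word (letters, digits, underscores, apostrophes and hyphens), and tokenize is a hypothetical helper.

```python
import re

# Hypothetical word pattern: runs of word characters, apostrophes or hyphens.
WORD_RE = re.compile(r"[\w'-]+")


def tokenize(line):
    """Return the words in line, dropping surrounding quotes and punctuation."""
    return WORD_RE.findall(line)


text = '"(Lo)cra" "AS-IS". foo'
print(text.split())    # ['"(Lo)cra"', '"AS-IS".', 'foo']
print(tokenize(text))  # ['Lo', 'cra', 'AS-IS', 'foo']
```

Note the trade-off: "(Lo)cra" now becomes two tokens, Lo and cra. Whatever rule you choose, only the mapper needs to change.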
Improved Mapper and Reducer code: using Python iterators and generators
The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application.
The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language.
In a real-world application, however, you might want to optimize your code by using
Python iterators and generators (an even
better introduction in PDF).
Generally speaking, iterators and generators (functions that create iterators, for example with Python’s yield
statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help
a lot in terms of computational cost or memory consumption, depending on the task at hand.
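The following small sketch, which is not part of the word count scripts, makes this laziness concrete with an infinite generator. Values are computed only as they are requested, so the generator never tries to build the whole sequence in memory.

```python
produced = []  # records which inputs the generator has actually processed


def squares():
    """Infinite generator of square numbers, computed on demand."""
    i = 0
    while True:
        produced.append(i)  # runs only when the consumer asks for a value
        yield i * i
        i += 1


gen = squares()           # creating the generator computes nothing yet
print(produced)           # []
first_three = [next(gen) for _ in range(3)]
print(first_three)        # [0, 1, 4]
print(produced)           # [0, 1, 2]
```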
Note: The following Map and Reduce scripts will only work "correctly" when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command "cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py" will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.
Precisely, we compute the sum of a word’s occurrences, e.g. ("foo", 4), only if by chance the same word (foo)
appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs
between the Map and the Reduce step because Hadoop is more efficient in this regard than our simple Python scripts.
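The behavior described above follows from itertools.groupby, which the improved reducer.py uses. groupby merges only adjacent items with equal keys. Here is a small sketch, where sum_groups is a hypothetical helper that does the same summing as the improved reducer.

```python
from itertools import groupby
from operator import itemgetter


def sum_groups(pairs):
    """Sum the counts of each run of adjacent pairs sharing the same word."""
    return [(word, sum(count for _word, count in group))
            for word, group in groupby(pairs, itemgetter(0))]


pairs = [('foo', 1), ('foo', 1), ('quux', 1), ('foo', 1)]
print(sum_groups(pairs))          # [('foo', 2), ('quux', 1), ('foo', 1)]
print(sum_groups(sorted(pairs)))  # [('foo', 3), ('quux', 1)]
```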
Map step: mapper.py
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys


def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()


def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print('%s%s%d' % (word, separator, 1))


if __name__ == "__main__":
    main()
Reduce step: reducer.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys


def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)


def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _word, count in group)
            print("%s%s%d" % (current_word, separator, total_count))
        except ValueError:
            # count was not a number (or the line had no separator),
            # so silently discard this item
            pass


if __name__ == "__main__":
    main()
Computer science PhD. Writer. Open source software committer.
Currently focusing on product & technology strategy and competitive analysis
in the Office of the CTO at Confluent.