
Writing An Hadoop MapReduce Program In Python

  from: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
  

In this tutorial, I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.


Table of Contents:

  • Motivation
  • What we want to do
  • Prerequisites
  • Python MapReduce Code
  • Map: mapper.py
  • Reduce: reducer.py
  • Test your code (cat data | map | sort | reduce)
  • Running the Python Code on Hadoop
  • Download example input data
  • Copy local example data to HDFS
  • Run the MapReduce job
  • Improved Mapper and Reducer code: using Python iterators and generators
  • mapper.py
  • reducer.py
  • Related Links



 


Motivation

Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be developed in other languages like Python or C++ (the latter since version 0.14.1). However, the documentation and the most prominent Python example on the Hadoop home page could make you think that you must translate your Python code into a Java jar file using Jython. Obviously, this is not very convenient and can even be problematic if you depend on Python features not provided by Jython. Another issue of the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop – just have a look at the example in <HADOOP_INSTALL>/src/examples/python/WordCount.py and you see what I mean. I still recommend having at least a look at the Jython approach and maybe even at the new C++ MapReduce API called Pipes; it's really interesting.

That said, the ground is prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more Pythonic way, i.e. in a way you should be familiar with.


What we want to do

We will write a simple MapReduce program (see also Wikipedia) for Hadoop in Python but without using Jython to translate our code to Java jar files.

Our program will mimic the WordCount example, i.e. it reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.
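For reference, here is what the same word count looks like as a plain, non-distributed Python 2 script – a minimal sketch using collections.Counter that is not part of the MapReduce job itself, included only to make the expected tab-separated output format concrete.


#!/usr/bin/env python
# Reference only: count words from STDIN in a single process and print
# "<word><TAB><count>" lines, i.e. the same format the MapReduce job emits.
import collections
import sys

counts = collections.Counter()
for line in sys.stdin:
    counts.update(line.split())

for word in sorted(counts):
    print '%s\t%s' % (word, counts[word])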



Note: You can also use programming languages other than Python such as Perl or Ruby with the “technique” described in this tutorial. I wrote some words about what happens behind the scenes. Feel free to correct me if I’m wrong.


Prerequisites

You should have an Hadoop cluster up and running because we will get our hands dirty. If you don’t have a cluster yet, my following tutorials might help you build one. The tutorials are tailored to Ubuntu Linux, but the information also applies to other Linux/Unix variants.




  • Running Hadoop On Ubuntu Linux (Single-Node Cluster)
    How to set up a single-node Hadoop cluster using the Hadoop Distributed File System (HDFS) on Ubuntu Linux



  • Running Hadoop On Ubuntu Linux (Multi-Node Cluster)
    How to set up a multi-node Hadoop cluster using the Hadoop Distributed File System (HDFS) on Ubuntu Linux

Python MapReduce Code

The “trick” behind the following Python code is that we will use Hadoop Streaming (see also the wiki entry) to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We will simply use Python’s sys.stdin to read input data and print our own output to sys.stdout. That’s all we need to do because Hadoop Streaming will take care of everything else! Amazing, isn’t it? Well, at least I had a “wow” experience…
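To get a feel for the Streaming contract, consider this bare-bones sketch (my own illustration, not part of the tutorial's job): a script that merely copies STDIN to STDOUT would already be a valid Streaming mapper or reducer, because lines of text on standard input and standard output are all Hadoop Streaming ever sees.


#!/usr/bin/env python
# Illustration only: an "identity" Streaming script. Hadoop Streaming feeds
# records to us on STDIN, one line at a time, and collects whatever we write
# to STDOUT as our output records.
import sys

for line in sys.stdin:
    sys.stdout.write(line)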


Map: mapper.py

Save the following code in the file /home/hduser/mapper.py. It will read data from STDIN (standard input), split it into words, and output a list of lines mapping words to their (intermediate) counts to STDOUT (standard output). The Map script will not compute an (intermediate) sum of a word’s occurrences. Instead, it will output “<word> 1” immediately – even though the <word> might occur multiple times in the input – and just let the subsequent Reduce step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons :-)

Make sure the file has execution permission (chmod +x /home/hduser/mapper.py should do the trick) or you will run into problems.


#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)







Reduce: reducer.py

Save the following code in the file /home/hduser/reducer.py. It will read the results of mapper.py from STDIN (standard input), sum the occurrences of each word to a final count, and output its results to STDOUT (standard output).

Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.


#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)







Test your code (cat data | map | sort | reduce)

I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise your jobs might complete successfully but produce no job result data at all, or not the results you would have expected. If that happens, most likely it was you (or me) who screwed up.

Here are some ideas on how to test the functionality of the Map and Reduce scripts.


# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
The     1
Project 1
Gutenberg       1
EBook   1
of      1
[...]
(you get the idea)

Running the Python Code on Hadoop

Download example input data

We will use three ebooks from Project Gutenberg for this example:



  • The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
  • The Notebooks of Leonardo Da Vinci
  • Ulysses by James Joyce

Download each ebook as a text file in Plain Text UTF-8 encoding and store the files in a temporary directory of your choice, for example /tmp/gutenberg.


hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Copy local example data to HDFS

Before we run the actual MapReduce job, we first have to copy the files from our local file system to Hadoop’s HDFS.


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above, we use Hadoop Streaming to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

If you want to modify some Hadoop settings on the fly, like increasing the number of Reduce tasks, you can use the -D option:


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...

An important note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user-specified mapred.reduce.tasks and doesn’t manipulate that. You cannot force mapred.map.tasks but you can specify mapred.reduce.tasks.


The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output. In general Hadoop will create one output file per reducer; in our case, however, it will only create a single file because the input files are very small.

Example output of the previous command in the console:


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob:  map 0%  reduce 0%
[...] INFO streaming.StreamJob:  map 43%  reduce 0%
[...] INFO streaming.StreamJob:  map 86%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 33%
[...] INFO streaming.StreamJob:  map 100%  reduce 70%
[...] INFO streaming.StreamJob:  map 100%  reduce 77%
[...] INFO streaming.StreamJob:  map 100%  reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$
As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, go to http://localhost:50030/ and browse around. Here’s a screenshot of the Hadoop web interface for the job we just ran.


A screenshot of Hadoop's web interface, showing the details of the MapReduce job we just ran.



Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$
You can then inspect the contents of the file with the dfs -cat command:


hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        2
"A_     1
"Absoluti       1
[...]
hduser@ubuntu:/usr/local/hadoop$
Note that in this specific output above the quote signs (“) enclosing the words have not been inserted by Hadoop. They are the result of how our Python code splits words, and in this case it matched the beginning of a quote in the ebook texts. Just inspect the part-00000 file further to see it for yourself.
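
If those leading quote signs bother you, one option is to change the mapper's tokenization. The following is only a sketch of such a variant – it uses a regular expression instead of line.split(), so it produces different counts than the mapper.py shown above and is not part of the tutorial's job:


#!/usr/bin/env python
# Sketch of an alternative mapper tokenization: keep only runs of letters,
# digits and apostrophes, so surrounding quotes and punctuation are dropped.
import re
import sys

for line in sys.stdin:
    for word in re.findall(r"[A-Za-z0-9']+", line):
        print '%s\t%s' % (word, 1)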


Improved Mapper and Reducer code: using Python iterators and generators

The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language. In a real-world application, however, you might want to optimize your code by using Python iterators and generators (an even better introduction in PDF), as some readers have pointed out.

Generally speaking, iterators and generators (functions that create iterators, for example with Python’s yield statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help a lot in terms of computational cost or memory consumption, depending on the task at hand.
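To make the laziness concrete, here is a tiny standalone illustration (my own example in Python 2, like the rest of the tutorial; the numbers are made up): the list comprehension materializes every element immediately, while the generator expression produces elements only as they are consumed.


# list comprehension: builds the full list of a million squares right now
squares_list = [x * x for x in xrange(1000000)]

# generator expression: builds nothing yet; squares are produced one by one
# only while sum() iterates over the generator
squares_gen = (x * x for x in xrange(1000000))
print sum(squares_gen)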



Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.


More precisely, we only compute the sum of a word’s occurrences, e.g. (“foo”, 4), if by chance the same word (“foo”) appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs between the Map and the Reduce step, because Hadoop is more efficient in this regard than our simple Python scripts.
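
The following standalone snippet (my own illustration, not from the original tutorial) shows why the improved reducer depends on Hadoop's shuffle-and-sort phase: itertools.groupby only groups consecutive identical keys, so unsorted mapper output would produce several partial groups for the same word.


from itertools import groupby
from operator import itemgetter

sorted_pairs = [('bar', 1), ('foo', 1), ('foo', 1), ('foo', 1)]
unsorted_pairs = [('foo', 1), ('bar', 1), ('foo', 1), ('foo', 1)]

# Sorted input: one group per word, as the reducer expects.
for word, group in groupby(sorted_pairs, itemgetter(0)):
    print word, sum(count for _, count in group)    # bar 1, then foo 3

# Unsorted input: 'foo' is split across two partial groups.
for word, group in groupby(unsorted_pairs, itemgetter(0)):
    print word, sum(count for _, count in group)    # foo 1, bar 1, foo 2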


mapper.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()







reducer.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()







Related Links

From yours truly:



  • Running Hadoop On Ubuntu Linux (Single-Node Cluster)
  • Running Hadoop On Ubuntu Linux (Multi-Node Cluster)
