设为首页 收藏本站
查看: 859|回复: 0

[经验分享] 使用 Python 编写 Hadoop MapReduce 程序

[复制链接]

尚未签到

发表于 2016-12-9 06:36:26 | 显示全部楼层 |阅读模式
使用 Python 编写 Hadoop MapReduce 程序

 

以前写 HadoopMapReduce 程序时,使用的是 Java ,利用 Java 写起来是轻车熟路,没有问题,但是使用 Java 很明显的一个弊端就是每次都要编码、打包、上传、执行,还真心是麻烦,想要更加简单的使用 Hadoop 的运算能力,想要写 MapReduce 程序不那么复杂。还真是个问题。
仔细考虑了下,熟悉的 Python 又得拿起来了,随便搜了下 Python 编写 MapReduce 程序,看了个教程,发现用起来真是方便,遂记录之。
 
 
Hadoop 框架使用 Java 开发的,对 Java 进行了原生的支持,不过对于其它语言也提供了 API 支持,如 Python 、 C++ 、 Perl 、 Ruby 等。这个工具就是 Hadoop Streaming ,顾名思义, Streaming 就是 Pipe 操作,说起 pipe ,大家肯定不陌生。最原生的 Python 支持是需要 Jython 支持的,不过这里有额外的方法来实现,大家如果只是使用的话,不用纠结 Jython 转换的问题。
 
前置条件:
Python 环境
Hadoop 环境( single or cluster
最容易的 Hadoop 编程模型就是 Mapper 和 Reducer 的编写,这种编程模型大大降低了我们对于并发、同步、容错、一致性的要求,你只要编写好自己的业务逻辑,就可以提交任务。然后喝杯茶,结果就出来了,前提是你的业务逻辑没有错误。
使用 Hadoop Streaming ,能够利用 Pipe 模型,而使用 Python 的巧妙之处在于处理输入输出的数据使用的是 STDIN 和 STDOUT ,然后 Hadoop Streaming 会接管一切,转化成 MapReduce 模型。
  我们还是使用 wordcount 例子,具体内容不再详细解释,如果有不理解的可以自行度之。下面我们先看下 mapper 的代码:
 

#!/usr/bin/env python
import sys
#input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)

简单解释一下,输入从 sys.stdin 进入,然后进行分割操作,对于每行的分割结果,打印出 word 和 count=1 , Mapper 就这么简单。
大家看完 Mapper 之后,会产生疑问,这个怎么能够实现 mapper 功能?我们跳出这个 sys.stdin 模型,再回顾下 MapReduce 的程序。在 Mapper 中,程序不关心你怎么输入,只关心你的输出,这个 Mapper 代码会被放到各个 slave 机器上,去执行 Mapper 过程,其实可以理解为过滤、处理。
  在示例中,程序的输入会被进行一系列的处理过程,得到 word 和 count ,这个就是 slave 机器上的数据处理之后的内容。仔细理解下这个过程,对于开发程序还是相当有帮助的。
下面我们来看下 Reduce 程序, wordcount 的 reduce 程序就是统计相同 word 的 count 数目,然后再输出。我们还是直接上代码吧:

#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)

看完这个reduce代码,执行一下,完全没有问题,但是未必真正能理解这个reduce的内容,我来解释一下,明确知道执行流程的可以跳过。
reduce的代码页不复杂,利用Reduce程序,可以得出count数目。如果当前的词和分出来的词一致的话,count相加,如果不一致的话,就打印出来,同时更新输入的wordcount。最后的if是打印出最后一次统计结果。
reduce的执行依赖了MapReduce模型一个要点,在Shuffle过程中,同一个key会放到同一个reduce任务中,这样处理的是一系列连续的相同的key值,当key不一样的时候,就是说开始统计下一个word了。

<!--EndFragment-->利用PythonMapReduce程序就这么多内容,更细节的内容和自己处理的业务相关。
下面测试下结果:

<!--EndFragment-->
echo "foo foo quux labs foo bar quux" | python ./mapper.py
  foo     1
  foo     1
  quux    1
  labs    1
  foo     1
  bar     1
  quux    1
  进一步可以看到

echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py
 bar     1
  foo     3
  labs    1
  quux    2
下面就是执行Hadoop命令了,在使用Hadoop Streaming时,要使用一定的格式操作才能提交任务。
  <!--EndFragment--> 

   hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar –mapper mapperfile –file mapper_file_path –reducer reducefile –file reducer_file_path –input input_path –output output_path
将自己的mapperreducer代码代入上面命令中,执行一下看结果是否正确。

<!--EndFragment-->本文的最后列一下Hadoop Streaming操作的参数,以作备忘。
  <!--EndFragment-->

Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]
 Options:
   -input    <path>                   DFS input file(s) for the Map step
   -output   <path>                   DFS output directory for the Reduce step
   -mapper   <cmd|JavaClassName>      The streaming command to run
   -combiner <JavaClassName>          Combiner has to be a Java class
   -reducer  <cmd|JavaClassName>      The streaming command to run
   -file     <file>                   File/dir to be shipped in the Job jar file
   -dfs    <h:p>|local                Optional. Override DFS configuration
   -jt     <h:p>|local                Optional. Override JobTracker configuration
   -additionalconfspec specfile       Optional.
   -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
   -outputformat TextOutputFormat(default)|JavaClassName  Optional.
   -partitioner JavaClassName         Optional.
   -numReduceTasks <num>              Optional.
   -inputreader <spec>                Optional.
   -jobconf  <n>=<v>                  Optional. Add or override a JobConf property
   -cmdenv   <n>=<v>                  Optional. Pass env.var to streaming commands
   -cacheFile fileNameURI
   -cacheArchive fileNameURI
   -verbose
  下面简单说下参数的意思:
-input:DFS输入,可以有多个input输入,不过我一般喜欢把输入用逗号{,}分割。
-output:DFS输入,实际上就是Reducer输出
-mapper:MapReduce中的Mapper,看清楚了,也可以是cmd shell命令
-combiner:这个必须是Java
-reducer:MapReducer中的Reducer,也可以是shell命令
-file:这个file参数是用来提交本地的文件,如本地的mapper或者reducer
-dfs:这个是可选的,用来覆盖DFS设定。
-jt:用来覆盖jobtracker的设定
-inputformat:输入格式设定
-outputformat:输出文件的格式设定
 
  
上面的这些参数已经足够平时的应用了,如果有更为细节的需求,就要考虑Streaming是否合适,是否适应自己的业务逻辑。
 
最后再说一句:按照Hadoop Streaming的执行流程,这些参数应该足够了,但是如果我有更复杂的需求:如根据key值分离文件;根据key值重命名文件;读取HDFS上文件配置数据;从多个数据源中读取mapper数据,如HDFS、DataBase、HBase、Nosql等,这些比较灵活的应用使用Python Streaming都有限制,或者是我暂时还没有看到这块。但是目前来说,使用Hadoop Streaming操作能够大量减少代码和流程,比使用Java要方便许多,特别是对于日常的、临时的统计工作。
 
只有更复杂的统计工作和Hadoop Streaming特性,留待以后再行发掘。



<!--EndFragment-->

  <!--EndFragment--> 
 
 
 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-311522-1-1.html 上篇帖子: Hadoop wordcount程序的配置运行 下篇帖子: Centos5下安装hadoop-伪分布式模式
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表