wangluo010 posted on 2017-12-18 06:21:22

Implementing MapReduce with Python and Shell/Hadoop

  The basic pipeline is:
  cat data | map | sort | reduce
  cat devProbe | ./mapper.py | sort| ./reducer.py
  echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py
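  # -k, --key=POS1[,POS2]: the sort key starts at field POS1 and ends at field POS2, so -k1,1 sorts on the first field (the mapper's key) only
  The scripts can be run directly as ./mapper.py and ./reducer.py once they are executable; if you skip the commands below, prefix each script with python instead (e.g. cat devProbe | python mapper.py | sort | python reducer.py).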
  chmod +x mapper.py
  chmod +x reducer.py
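To see what each stage of cat data | map | sort | reduce contributes, the pipeline can be simulated in a single Python process. This is a minimal sketch, not the author's code: it assumes word counting on the echo test line as the job, and sorted() with a key function stands in for sort -k1,1.

```python
#!/usr/bin/env python3
# Sketch: "cat data | map | sort | reduce" simulated in one process,
# using word count (key = word, value = 1) as the example job.


def mapper(lines):
    """Map step: emit one (word, 1) pair per whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield word, 1


def shuffle_sort(pairs):
    """Stand-in for `sort -k1,1`: order the pairs by key only."""
    return sorted(pairs, key=lambda kv: kv[0])


def reducer(sorted_pairs):
    """Reduce step: sum consecutive values that share a key."""
    current_key, total = None, 0
    for key, value in sorted_pairs:
        if key == current_key:
            total += value
        else:
            if current_key is not None:
                yield current_key, total
            current_key, total = key, value
    if current_key is not None:  # emit the final key
        yield current_key, total


if __name__ == "__main__":
    data = ["foo foo quux labs foo bar quux"]
    for key, count in reducer(shuffle_sort(mapper(data))):
        print("%s\t%s" % (key, count))
```

The reducer only works because the sort step makes equal keys adjacent; skipping the sort would split a key's count across several output lines.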
  In a distributed (Hadoop) environment, use Hadoop Streaming:
  hadoop jar //hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
  -file mapper.py   -mapper mapper.py \
  -file reducer.py    -reducer reducer.py \
  -input     -output
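Between the -mapper and -reducer steps, Hadoop Streaming by default treats the text before the first tab of each mapper output line as the key (a line with no tab is all key), sends every line with the same key to the same reducer, and sorts each reducer's input by key; the local sort stage imitates this. The sketch below models that shuffle in plain Python under a simplifying assumption: zlib.crc32 stands in for the Java hash used by Hadoop's default HashPartitioner, so real reducer assignments will differ. The function name partition is hypothetical.

```python
# Simplified model of the Hadoop Streaming shuffle (not Hadoop's code):
# split at the first tab, bucket by key hash, sort each bucket by key.
# Assumption: zlib.crc32 replaces Java's key.hashCode().
import zlib


def partition(lines, num_reducers):
    buckets = [[] for _ in range(num_reducers)]
    for line in lines:
        # No tab -> whole line is the key and the value is empty.
        key, _, value = line.rstrip("\n").partition("\t")
        index = zlib.crc32(key.encode("utf-8")) % num_reducers
        buckets[index].append((key, value))
    # Each reducer receives its keys in sorted order, so equal keys are adjacent.
    return [sorted(bucket, key=lambda kv: kv[0]) for bucket in buckets]


if __name__ == "__main__":
    mapped = ["foo\t1", "quux\t1", "foo\t1", "bar\t1", "quux\t1"]
    for i, bucket in enumerate(partition(mapped, 2)):
        print("reducer %d: %s" % (i, bucket))
```

Because a key always hashes to the same bucket, no key is ever split between two reducers, which is what lets each reducer.py process produce final totals for its keys.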
  mapper.py
  

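#!/usr/bin/python
# -*- coding: UTF-8 -*-
__author__ = 'Manhua'

import sys

# Each input line holds backtick-separated fields; emit "<key>\t1".
# Assumption: the key is the first two fields, item[0] and item[1], rejoined with a backtick.
for line in sys.stdin:
    line = line.strip()
    item = line.split('`')
    if len(item) < 2:  # skip lines that lack the two key fields
        continue
    print("%s\t%s" % (item[0] + '`' + item[1], 1))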
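The echo test line in the pipeline section is word-count input, while this mapper keys on backtick-separated fields and never splits on spaces, so it will not produce per-word counts for that line. A hypothetical word-count mapper for that test might look like the following; the function name map_words and the sample data are illustrative assumptions. The demo feeds a list so it runs standalone; as a streaming mapper, the loop would iterate over sys.stdin instead.

```python
#!/usr/bin/env python3
# Hypothetical word-count mapper (not the posted mapper): emits "word<TAB>1"
# for every whitespace-separated word.
import sys


def map_words(lines):
    """Yield one "word<TAB>1" record per whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield "%s\t%s" % (word, 1)


if __name__ == "__main__":
    # Sample input for a standalone run; under Hadoop Streaming, pass sys.stdin.
    sample = ["foo foo quux labs foo bar quux"]
    for record in map_words(sample):
        sys.stdout.write(record + "\n")
```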
  

  reducer.py
  

#!/usr/bin/python
# -*- coding: UTF-8 -*-
__author__ = 'Manhua'

import sys

current_word = None
current_count = 0

# Input arrives sorted by key, so all lines for one key are adjacent.
for line in sys.stdin:
    line = line.strip()
    try:
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:  # skip lines without a tab or whose count is not a number
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            print("%s\t%s" % (current_word, current_count))
        current_count = count
        current_word = word

if current_word is not None:  # don't forget to emit the last key
    print("%s\t%s" % (current_word, current_count))
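The reducer's manual bookkeeping with current_word and current_count can also be expressed with itertools.groupby, which groups consecutive items that share a key. This is an alternative sketch, not the posted code, and the names parse and reduce_sorted are hypothetical. Like the original, it is only correct when the input is already sorted by key, because groupby never merges non-adjacent runs; malformed lines are skipped.

```python
import itertools


def parse(lines):
    """Turn "key<TAB>count" lines into (key, int) pairs, skipping malformed ones."""
    for line in lines:
        key, sep, value = line.strip().partition("\t")
        if not sep:
            continue  # no tab: not a key/value record
        try:
            yield key, int(value)
        except ValueError:
            continue  # non-numeric count, ignored as in the reducer above


def reduce_sorted(lines):
    """Sum counts per key; assumes the input is already sorted by key."""
    for key, group in itertools.groupby(parse(lines), key=lambda kv: kv[0]):
        yield key, sum(count for _, count in group)


if __name__ == "__main__":
    sorted_input = ["bar\t1", "foo\t1", "foo\t1", "foo\t1",
                    "labs\t1", "quux\t1", "quux\t1"]
    for key, total in reduce_sorted(sorted_input):
        print("%s\t%s" % (key, total))
```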
  