3.4 利用MRJob编写和运行MapReduce代码
mrjob 简介
mrjob是编写在Hadoop上运行的Python程序的最简单方法。如果使用mrjob,您将能够在本地测试代码,而无需安装Hadoop或在自己选择的集群上运行它。
在一个类中保存一个作业的所有MapReduce代码
在运行时很容易上传和安装代码和数据依赖
用一行代码切换输入和输出格式
自动下载和解析Python跟踪的错误日志
在Python代码之前或之后放置命令行过滤器
如果你不想成为Hadoop专家,但是需要MapReduce的计算能力,mrjob可能正是适合你的。
mrjob 安装
使用pip安装
pip install mrjob
运行模式
- 内嵌( -r inline)
- 本地 (-r local)
- Hadoop (-r hadoop)
- Amazon EMR (-r emr)
mrjob实现WordCount
from mrjob.job import MRJob
#定义一个类继承MRJob
class MRWordCounter(MRJob):
#定义两个方法:mapper和reducer
def mapper(self, key, line):
for word in line.split():
yield word, 1
def reducer(self, word, occurrences):
yield word, sum(occurrences)
if __name__ == '__main__':
MRWordCounter.run()
运行MapReduce
1、内嵌(-r inline)方式
特点是调试方便,启动单一进程模拟任务执行状态和结果,默认(-r inline)可以省略,输出文件使用 > output-file 或-o output-file,比如下面两种运行方式是等价的
python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt
2、本地(-r local)方式
用于本地模拟Hadoop调试,与内嵌(inline)方式的区别是启动了多进程执行每一个任务。如:
python word_count.py -r local input.txt > output1.txt
3、Hadoop(-r hadoop)方式
用于hadoop环境,支持Hadoop运行调度控制参数,如:
1)指定Hadoop任务调度优先级(VERY_HIGH|HIGH),如:--jobconf mapreduce.job.priority=VERY_HIGH。
2)Map及Reduce任务个数限制,如:--jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5
python word_count.py -r hadoop hdfs:///test.txt -o hdfs:///output
mrjob 实现 topN统计
统计数据中出现次数最多的前n个数据
import sys
from mrjob.job import MRJob,MRStep
import heapq
class TopNWords(MRJob):
def mapper(self, _, line):
if line.strip() != "":
for word in line.strip().split():
yield word,1
#介于mapper和reducer之间,用于临时的将mapper输出的数据进行统计
def combiner(self, word, counts):
yield word,sum(counts)
def reducer_sum(self, word, counts):
yield None,(sum(counts),word)
#利用heapq将数据进行排序,将最大的2个取出
def top_n_reducer(self,_,word_cnts):
for cnt,word in heapq.nlargest(2,word_cnts):
yield word,cnt
#实现steps方法用于指定自定义的mapper,comnbiner和reducer方法
def steps(self):
return [
MRStep(mapper=self.mapper,
combiner=self.combiner,
reducer=self.reducer_sum),
MRStep(reducer=self.top_n_reducer)
]
def main():
TopNWords.run()
if __name__=='__main__':
main()