3.5 join 文件合并
需求描述
将两个文件按 user_id 关联合并,类似于数据库中两张表的 join 操作。
文件1 user_info.txt
user_id user_name user_loc
1 user1 BJ
2 user2 TJ
3 user3 BJ
(字段之间以制表符 Tab 分隔,order_info.txt 同理)
文件2 order_info.txt
order_id user_id product_id price
1 1 1 200
2 1 2 300
3 1 2 200
4 2 3 100
5 1 3 300
Hadoop Streaming 实现
- mapper.py
"""
Hadoop Streaming mapper for the user/order join.

Re-keys every input record by user_id and tags it with its source table:

* user_info rows (3 tab-separated fields: user_id, user_name, user_loc)
  are emitted as  user_id<TAB>A:user_loc
* order_info rows (4 tab-separated fields: order_id, user_id, product_id,
  price) are emitted as  user_id<TAB>B:order_id:price

Blank lines and lines with any other number of fields are skipped.

Sample output (key and value are separated by a tab):
cat data/*|python mapper.py |sort
1 A:BJ
1 B:1:200
1 B:2:300
1 B:3:200
1 B:5:300
2 A:TJ
2 B:4:100
3 A:BJ
"""
import sys


def map_line(line):
    """Return the tagged 'key<TAB>value' string for one input line, or None to skip it."""
    fields = line.strip().split('\t')
    if len(fields) == 3:
        # user data: user_id, user_name, user_loc
        user_id, _, user_loc = fields
        return '{0}\t{1}:{2}'.format(user_id, 'A', user_loc)
    if len(fields) == 4:
        # order data: order_id, user_id, product_id, price (product_id unused)
        order_id, user_id, _, price = fields
        return '{0}\t{1}:{2}:{3}'.format(user_id, 'B', order_id, price)
    # blank line or unexpected field count
    return None


def main():
    """Read records from stdin and print the tagged output lines."""
    for line in sys.stdin:
        out = map_line(line)
        if out is not None:
            print(out)


if __name__ == '__main__':
    main()
- reducer.py
"""
Hadoop Streaming reducer for the user/order join.

Reads 'user_id<TAB>tag:...' records produced by mapper.py. Records with the
same user_id must be adjacent (Hadoop's shuffle does this; use `sort`
locally). Prints one line per user_id:  user_id<TAB>order_cnt:price_sum

Records are classified by their tag, not by their position in the group.
'B:order_id:price' records are counted and summed; 'A:...' user records are
ignored. Hadoop only groups keys and does not guarantee the order of values
inside a group. The tag check keeps a user record from being parsed as an
order, and keeps the first order of a group from being dropped.

Sample output (key and value are separated by a tab):
cat data/*|python mapper.py |sort| python reducer.py
1 4:1000
2 1:100
3 0:0
"""
import sys


def reduce_lines(lines):
    """Yield 'user_id<TAB>order_cnt:price_sum' for each run of equal user_ids in *lines*."""
    last_user_id = None
    order_cnt = 0
    price_sum = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        user_id, info = line.split('\t', 1)
        fields = info.split(':')
        if user_id != last_user_id:
            # A new key starts: flush the previous one (if any) and reset.
            if last_user_id is not None:
                yield '{0}\t{1}:{2}'.format(last_user_id, order_cnt, price_sum)
            last_user_id = user_id
            order_cnt = 0
            price_sum = 0
        if fields[0] == 'B':
            # order record: B:order_id:price
            order_cnt += 1
            price_sum += int(fields[2])
    # Flush the last key; nothing is printed for empty input.
    if last_user_id is not None:
        yield '{0}\t{1}:{2}'.format(last_user_id, order_cnt, price_sum)


def main():
    """Reduce the grouped records on stdin and print one line per user."""
    for out in reduce_lines(sys.stdin):
        print(out)


if __name__ == '__main__':
    main()
run.sh
# Submit the user/order join as a Hadoop Streaming job.
HADOOP_CMD="/hadoop_home/bin/hadoop"  # Hadoop installation path
STREAM_JAR_PATH="/hadoop_home/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.7.0.jar"  # location of the hadoop-streaming jar
INPUT_FILE_PATH="/xxx.txt"  # HDFS input path; for the join it must cover both user_info.txt and order_info.txt (e.g. their directory)
OUTPUT_PATH="/output"  # HDFS directory where the job writes its result
TAB="$(printf '\t')"  # a real tab character; an unquoted \t in the shell collapses to the letter "t"

# Remove a previous output directory first, otherwise the job refuses to start
# because the output path already exists. -f keeps this quiet when it is absent.
"$HADOOP_CMD" fs -rm -f -r -skipTrash "$OUTPUT_PATH"

# Run hadoop-streaming. A comment may not follow a line-continuation
# backslash (it would end the command), so every option line below ends with
# " \" and nothing else.
# stream.map.output.field.separator: split each map output line into
# key/value at the tab; the framework then sorts and groups by that key.
"$HADOOP_CMD" jar "$STREAM_JAR_PATH" \
    -D "stream.map.output.field.separator=$TAB" \
    -input "$INPUT_FILE_PATH" \
    -output "$OUTPUT_PATH" \
    -mapper "/usr/local/python3/bin/python3 mapper.py" \
    -reducer "/usr/local/python3/bin/python3 reducer.py" \
    -file "mapper.py" \
    -file "reducer.py"
join文件合并代码优化
mapper_opt.py
由于逻辑简单,可优化的点基本没有
"""
@author:
@contact:
@time:

Optimized Hadoop Streaming mapper for the user/order join.

The logic matches the plain mapper. It is wrapped in functions so it can be
imported and tested. Every record is re-keyed by user_id and tagged with its
source table:

* user_info rows (user_id, user_name, user_loc)          -> user_id<TAB>A:user_loc
* order_info rows (order_id, user_id, product_id, price) -> user_id<TAB>B:order_id:price

Blank lines and lines with any other field count are skipped.

Sample output (key and value are separated by a tab):
cat data/*|python mapper.py |sort
1 A:BJ
1 B:1:200
1 B:2:300
1 B:3:200
1 B:5:300
2 A:TJ
2 B:4:100
3 A:BJ
"""
import sys


def map_lines(lines):
    """Yield one tagged 'key<TAB>value' string per usable input line."""
    for line in lines:
        fields = line.strip().split('\t')
        if len(fields) == 3:
            # user data
            user_id, _, user_loc = fields
            yield '{0}\t{1}:{2}'.format(user_id, 'A', user_loc)
        elif len(fields) == 4:
            # order data; product_id is not needed downstream
            order_id, user_id, _, price = fields
            yield '{0}\t{1}:{2}:{3}'.format(user_id, 'B', order_id, price)


def main():
    """Map every stdin line and print the tagged records."""
    for record in map_lines(sys.stdin):
        print(record)


if __name__ == '__main__':
    main()
- reducer_opt.py
"""
Optimized Hadoop Streaming reducer for the user/order join, using
itertools.groupby and operator.itemgetter.

Input: 'user_id<TAB>value' lines from the mapper, with equal user_ids
adjacent (Hadoop's shuffle, or `sort` locally).
Output: one 'user_id<TAB>order_cnt:order_sum' line per user_id. Only
'B:order_id:price' order records are counted.
"""
from itertools import groupby
from operator import itemgetter
import sys


def read_line(file):
    """Yield the tab-split field list of each non-blank line of *file*.

    Lines without a tab (no value column) are skipped, so they cannot crash
    the reducer with an IndexError.
    """
    for line in file:
        line = line.strip()
        if line == '':
            continue
        fields = line.split('\t')  # key tab value
        if len(fields) < 2:
            continue
        yield fields


def reduce_records(records):
    """Yield 'user_id<TAB>order_cnt:order_sum' for each run of equal keys.

    *records* is an iterable of field lists as produced by read_line.
    groupby only merges adjacent equal keys, so the input must already be
    grouped by key.
    """
    for user_id, group in groupby(records, itemgetter(0)):
        order_cnt = 0
        order_sum = 0
        for fields in group:
            value = fields[1].split(':')
            # Classify by tag rather than by field count. Only B:order_id:price
            # is an order; a user value that happens to contain ':' is not.
            if value[0] == 'B' and len(value) == 3:
                order_cnt += 1
                order_sum += int(value[2])
        yield '{0}\t{1}:{2}'.format(user_id, order_cnt, order_sum)


def main():
    """Reduce the grouped records on stdin and print one line per user."""
    data_iter = read_line(sys.stdin)  # lazy iterator; lines are read on demand
    for out in reduce_records(data_iter):
        print(out)


if __name__ == '__main__':
    main()
mrjob 实现
实现对两个数据表进行join操作,显示效果为每个用户的所有订单信息
"01:user1" "01:80,02:90"
"02:user2" "03:82,04:95"
文件1:
user_info.txt
uid uname
01 user1
02 user2
03 user3
04 user4
文件2:
order_info.txt
uid oid price
01 01 80
01 02 90
02 03 82
02 04 95
代码:
from mrjob.job import MRJob
import os
import sys
class UserOrderJoin(MRJob):
    """Join user_info and order_info on uid and list each user's orders.

    One output line per user that has both a user record and at least one
    order (inner join), e.g.
        "01:user1"  "01:80,02:90"
    """

    # Sort values within each key as well (keeps the order list stable).
    SORT_VALUES = True

    def mapper(self, _, line):
        """Tag each row with its table and key it by user id.

        user_info rows (uid, uname)       -> uid, ['A', uname]
        order_info rows (uid, oid, price) -> uid, ['B', oid, price]
        Rows with any other number of tab-separated fields are ignored.
        """
        fields = line.strip().split('\t')
        if len(fields) == 2:
            # user data
            user_id, user_name = fields
            yield user_id, ['A', user_name]
        elif len(fields) == 3:
            # order data
            user_id, order_id, price = fields
            yield user_id, ['B', order_id, price]

    def reducer(self, user_id, values):
        """Emit '"uid:uname"' and the comma-joined 'oid:price' list for one user.

        Records are classified by their 'A'/'B' tag instead of by position.
        A uid that has orders but no user record is skipped, rather than
        taking an order id as the user name and dropping that order.
        Users without orders are skipped as before.
        """
        user_name = None
        order_info = []
        for value in values:
            if value[0] == 'A':
                user_name = value[1]
            elif value[0] == 'B':
                order_info.append(':'.join([value[1], value[2]]))
        if user_name is not None and order_info:
            yield ':'.join([user_id, user_name]), ','.join(order_info)
def main():
    """Entry point: let mrjob parse the command line and run the join job."""
    job_class = UserOrderJoin
    job_class.run()


if __name__ == '__main__':
    main()
实现对两个数据表进行join操作,显示效果为每个用户所下订单的订单总量和累计消费金额
"01:user1" [2, 170]
"02:user2" [2, 177]
from mrjob.job import MRJob
import os
import sys
class UserOrderJoin(MRJob):
    """Join user_info and order_info on uid and total each user's orders.

    One output line per user that has both a user record and at least one
    order (inner join): "uid:uname" -> [order_cnt, order_sum], e.g.
        "01:user1"  [2, 170]
    """

    # Sort values within each key as well.
    SORT_VALUES = True

    def mapper(self, _, line):
        """Tag each row with its table and key it by user id.

        user_info rows (uid, uname)       -> uid, ['A', uname]
        order_info rows (uid, oid, price) -> uid, ['B', oid, price]
        Rows with any other number of tab-separated fields are ignored.
        """
        fields = line.strip().split('\t')
        if len(fields) == 2:
            # user data
            user_id, user_name = fields
            yield user_id, ['A', user_name]
        elif len(fields) == 3:
            # order data
            user_id, order_id, price = fields
            yield user_id, ['B', order_id, price]

    def reducer(self, user_id, values):
        """Emit the order count and total price for one user.

        A uid with orders but no user record used to crash on
        ':'.join([user_id, None]) (TypeError). It is now skipped, as are
        users without orders.
        """
        user_name = None
        order_cnt = 0
        order_sum = 0
        for value in values:
            if value[0] == 'A':
                user_name = value[1]
            elif value[0] == 'B':
                order_cnt += 1
                order_sum += int(value[2])
        if user_name is not None and order_cnt:
            yield ':'.join([user_id, user_name]), (order_cnt, order_sum)
def main():
    """Entry point: run the job via mrjob.

    MRJob.run is a classmethod that builds its own job instance from the
    command line, so calling it on the class avoids constructing a throwaway
    instance first (the original did UserOrderJoin().run()).
    """
    UserOrderJoin.run()


if __name__ == '__main__':
    main()