3.5 join: Merging Two Files

Requirement

Merge two files on a shared key, much like joining two tables in a database. The sample inputs are listed below; a small local sketch of the target result follows the listings.

  • File 1: user_info.txt

    user_id user_name user_loc
    
    1    user1    BJ
    2    user2    TJ
    3    user3    BJ
    
  • File 2: order_info.txt

    order_id user_id product_id price
    
    1    1    1    200
    2    1    2    300
    3    1    2    200
    4    2    3    100
    5    1    3    300
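
The streaming implementation below aggregates, for every user, the number of orders placed and the total amount spent. To pin down what the join should produce, here is a minimal in-memory sketch in plain Python; it assumes the two tab-separated files above (user_info.txt, order_info.txt) are small and contain only data rows, the header line being shown above for documentation only.

# minimal local sketch of the join; suitable for small files only
user_stats = {}                                  # user_id -> [order_cnt, price_sum]

with open('user_info.txt') as f:
    for line in f:
        user_id, user_name, user_loc = line.strip().split('\t')
        user_stats[user_id] = [0, 0]

with open('order_info.txt') as f:
    for line in f:
        order_id, user_id, product_id, price = line.strip().split('\t')
        stats = user_stats.setdefault(user_id, [0, 0])
        stats[0] += 1                            # one more order for this user
        stats[1] += int(price)                   # accumulate the price

for user_id, (cnt, total) in sorted(user_stats.items()):
    print('{0}\t{1}:{2}'.format(user_id, cnt, total))
# expected output for the sample data:
# 1    4:1000
# 2    1:100
# 3    0:0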
    

Hadoop Streaming Implementation

  • mapper.py
"""
Output of a local test run:
cat data/* | python mapper.py | sort
1       A:BJ
1       B:1:200
1       B:2:300
1       B:3:200
1       B:5:300
2       A:TJ
2       B:4:100
3       A:BJ
"""
import sys

for line in sys.stdin:
    user_id = None
    product_id = None
    user_loc = None
    order_id = None
    # `source` tags which file the record came from
    source = None
    line = line.strip()
    if line == "":
        continue
    fields = line.split('\t')

    # the number of columns tells the two inputs apart
    if len(fields) == 3:
        # user data
        source = 'A'
        user_id, _, user_loc = fields
        print('{0}\t{1}:{2}'.format(user_id, source, user_loc))

    elif len(fields) == 4:
        # order data
        source = 'B'
        order_id, user_id, product_id, price = fields
        print('{0}\t{1}:{2}:{3}'.format(user_id, source, order_id, price))
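
The single-character source tag does double duty: it records which file a line came from, and, because 'A' sorts before 'B', it also guarantees that each user's location record reaches the reducer ahead of that user's order records.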
  • reducer.py
"""
Output of a local test run:
cat data/* | python mapper.py | sort | python reducer.py
1       4:1000
2       1:100
3       0:0
"""
import sys

last_user_id = None
order_cnt = 0
price_sum = 0

for line in sys.stdin:
    line = line.strip()
    if line == "":
        continue
    user_id, info = line.split('\t')
    fields = info.split(':')
    if user_id != last_user_id:
        # a new user: emit the previous user's totals, then reset the counters
        if last_user_id is not None:
            print('{0}\t{1}:{2}'.format(last_user_id, order_cnt, price_sum))
        last_user_id = user_id
        order_cnt = 0
        price_sum = 0
    if fields[0] == 'B':
        # order record: count it and accumulate the price
        order_cnt += 1
        price_sum += int(fields[2])

# emit the totals for the last user
if last_user_id is not None:
    print('{0}\t{1}:{2}'.format(last_user_id, order_cnt, price_sum))

run.sh

#!/bin/bash
HADOOP_CMD="/hadoop_home/bin/hadoop"   # path to the hadoop binary
STREAM_JAR_PATH="/hadoop_home/share/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.7.0.jar"   # location of the hadoop-streaming jar

INPUT_FILE_PATH="/xxx.txt"   # HDFS path of the input files to be joined
OUTPUT_PATH="/output"        # HDFS path for the job results

# remove the output path if it already exists, otherwise the job fails
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# submit the hadoop-streaming job;
# stream.map.output.field.separator makes the tab separator explicit (tab is already
# the default): the framework splits map output on it and sorts by the key part
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -D stream.map.output.field.separator=$'\t' \
    -input $INPUT_FILE_PATH \
    -output $OUTPUT_PATH \
    -mapper "/usr/local/python3/bin/python3 mapper.py" \
    -reducer "/usr/local/python3/bin/python3 reducer.py" \
    -file "mapper.py" \
    -file "reducer.py"
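
The whole pipeline can be smoke-tested locally with plain pipes before submitting it to the cluster (assuming the two sample files sit in a local data/ directory, as in the docstrings above):

cat data/user_info.txt data/order_info.txt | python3 mapper.py | sort | python3 reducer.py
# expected output for the sample data:
# 1    4:1000
# 2    1:100
# 3    0:0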

Optimizing the Join Code

  • mapper_opt.py

    The mapper logic is so simple that there is little to optimize; it is merely wrapped in a main() function.

"""
Output of a local test run:
cat data/* | python mapper_opt.py | sort
1       A:BJ
1       B:1:200
1       B:2:300
1       B:3:200
1       B:5:300
2       A:TJ
2       B:4:100
3       A:BJ
"""

import sys


def main():
    for line in sys.stdin:
        line = line.strip()
        if line == '':
            continue
        fields = line.split('\t')
        if len(fields) == 3:
            # user data
            source = 'A'
            user_id, _, user_loc = fields
            print('{0}\t{1}:{2}'.format(user_id, source, user_loc))
        elif len(fields) == 4:
            # order data
            source = 'B'
            order_id, user_id, product_id, price = fields
            print('{0}\t{1}:{2}:{3}'.format(user_id, source, order_id, price))


if __name__ == '__main__':
    main()
  • reducer_opt.py
"""
Optimized with itertools.groupby and operator.itemgetter.
"""


from itertools import groupby
from operator import itemgetter
import sys


def read_line(file):
    for line in file:
        line = line.strip()
        if line == '':
            continue
        fields = line.split('\t')  # [key, value]
        yield fields


def main():
    data_iter = read_line(sys.stdin)  # an iterator, not materialised data
    # groupby collects consecutive records that share the same key (column 0);
    # this works because the framework feeds the reducer input sorted by key
    for user_id, kviter in groupby(data_iter, itemgetter(0)):
        order_cnt = 0
        order_sum = 0
        for record in kviter:
            fields = record[1].split(':')
            if len(fields) == 3:
                # order record: B:order_id:price
                order_cnt += 1
                order_sum += int(fields[2])
        print('{0}\t{1}:{2}'.format(user_id, order_cnt, order_sum))


if __name__ == '__main__':
    main()
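
The point to remember about itertools.groupby is that it only groups consecutive items with equal keys, which is exactly why it fits a streaming reducer: the framework has already sorted the reducer's input by key. A tiny standalone illustration using a subset of the sample records:

from itertools import groupby
from operator import itemgetter

rows = [('1', 'A:BJ'), ('1', 'B:1:200'), ('1', 'B:2:300'),
        ('2', 'A:TJ'), ('2', 'B:4:100')]   # already sorted by key

for key, group in groupby(rows, itemgetter(0)):
    print(key, [value for _, value in group])
# 1 ['A:BJ', 'B:1:200', 'B:2:300']
# 2 ['A:TJ', 'B:4:100']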

mrjob Implementation

Join the two tables so that the output lists, for each user, all of that user's orders:

"01:user1"    "01:80,02:90"
"02:user2"    "03:82,04:95"

File 1:

user_info.txt
uid uname
01    user1
02    user2
03    user3
04    user4

File 2:

order_info.txt
uid oid price
01    01    80
01    02    90
02    03    82
02    04    95

Code:

from mrjob.job import MRJob


class UserOrderJoin(MRJob):
    # sort the values of each key so the user record (source 'A') reaches the
    # reducer before that user's order records (source 'B')
    SORT_VALUES = True

    def mapper(self, _, line):
        fields = line.strip().split('\t')
        if len(fields) == 2:
            # user data
            source = 'A'
            user_id, user_name = fields
            yield user_id, [source, user_name]
        elif len(fields) == 3:
            # order data
            source = 'B'
            user_id, order_id, price = fields
            yield user_id, [source, order_id, price]

    def reducer(self, user_id, values):
        '''
        List every order of each user, e.g.:
        "01:user1"    "01:80,02:90"
        "02:user2"    "03:82,04:95"
        '''
        values = list(values)
        if len(values) > 1:
            # values[0] is the user record thanks to SORT_VALUES
            user_name = values[0][1]
            order_info = [':'.join([v[1], v[2]]) for v in values[1:]]
            yield ':'.join([user_id, user_name]), ','.join(order_info)


def main():
    UserOrderJoin.run()

if __name__ == '__main__':
    main()
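
Assuming the job above is saved as user_order_join.py (a file name chosen here for illustration), mrjob runs it in-process by default and on the cluster via the hadoop runner:

python user_order_join.py user_info.txt order_info.txt    # local (inline) run
python user_order_join.py -r hadoop hdfs:///path/to/user_info.txt hdfs:///path/to/order_info.txt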

Join the two tables so that the output shows, for each user, the total number of orders and the cumulative amount spent:

"01:user1"    [2, 170]
"02:user2"    [2, 177]

Code:

from mrjob.job import MRJob


class UserOrderJoin(MRJob):
    SORT_VALUES = True

    def mapper(self, _, line):
        fields = line.strip().split('\t')
        if len(fields) == 2:
            # user data
            source = 'A'
            user_id, user_name = fields
            yield user_id, [source, user_name]
        elif len(fields) == 3:
            # order data
            source = 'B'
            user_id, order_id, price = fields
            yield user_id, [source, order_id, price]

    def reducer(self, user_id, values):
        '''
        Count each user's orders and sum the amount spent, e.g.:
        "01:user1"    [2, 170]
        "02:user2"    [2, 177]
        '''
        values = list(values)
        user_name = None
        order_cnt = 0
        order_sum = 0
        if len(values) > 1:
            for v in values:
                if len(v) == 2:
                    # user record: ['A', user_name]
                    user_name = v[1]
                elif len(v) == 3:
                    # order record: ['B', order_id, price]
                    order_cnt += 1
                    order_sum += int(v[2])
            yield ':'.join([user_id, user_name]), (order_cnt, order_sum)


def main():
    UserOrderJoin().run()

if __name__ == '__main__':
    main()
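
Unlike the first job, this reducer tells user records and order records apart by their length rather than by their position, so it does not actually depend on the ordering that SORT_VALUES provides; the flag is simply harmless here. The job is run the same way as the previous one.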
