5、SPARK RDD开发实战

需求:通过电商后台产生的订单等信息,为用户构建简单用户画像

s8

  • 提取性别、年龄、星座等信息
  • 用户身份证号码的编码规则
    • 第7至第14位是出生日期码,表示出生的年、月、日,年、月、日分别用4位、2位(不足两位加0)、2(同上)位数字表示。
    • 第15至17位是顺序码:表示在同一地址码表示的范围内,对同年、同月、同日出生的人遍定的顺序号,顺序码的奇数分配给男性,偶数分配给女性。

代码:

import sys
from pyspark.sql import SparkSession
import datetime

#将生日转换为星座
def birth_to_zodiac(birth):
    month = birth.month
    day = birth.day
    s = u"魔羯水瓶双鱼牡羊金牛双子巨蟹狮子处女天秤天蝎射手魔羯"
    # s = "AABBCCDDEEFFGGHHIIGGKKLLAA"
    # s = u"010203040506070809101112"
    arr = [20, 19, 21, 21, 21, 22, 23, 23, 23, 23, 22, 22]
    idx = month * 2 - (2 if day < arr[month - 1] else 0)
    return s[idx:idx + 2]

#通过身份证和订单日期计算用户的年龄和性别,星座
def user_profile(idcard, order_time):
    try:
        #xxxxxx19800101111xxxx
        birth = datetime.datetime.strptime(str(idcard)[6:14], '%Y%m%d')
        sex = u'男' if int(str(idcard)[14:17]) % 2 == 1 else u'女'
        order_time = datetime.datetime.strptime(order_time, '%Y-%m-%d')
        age = order_time.year - birth.year - ((order_time.month, order_time.day) < (birth.month, birth.day))
        zodiac = birth_to_zodiac(birth)
        return sex, age, zodiac
    except:
        return idcard, 0, order_time

#将数据中的内容进行拆分,取出身份证和订单日期
def idcardSplit(x):
    segs = x.split(',')
    idcard, order_date = segs[0], segs[1]
    #将身份证中的有效信息取出,进行脱敏
    idcard = 'xxxxxx' + idcard[6:17] + 'xxxx'
    return idcard, order_date


if __name__ == '__main__':

       #data_path:数据文件路径
    #save_path:结果的存储路径
    help_str = 'Usage : <data_path> <save_path>'
    if len(sys.argv) != 3:
        print(help_str)
        exit(0)

    spark = SparkSession.builder.appName('user_profile').getOrCreate()
    sc = spark.sparkContext
    userRDD = sc.textFile(sys.argv[1])

    # 将数据进行脱敏
    userMaskRDD = userRDD.map(lambda x: idcardSplit(x)).cache()
    userProfileRDD = userMaskRDD.filter(lambda x: len(x[0]) > 18).map(lambda x: user_profile(x[0], x[1]))

    # 脏数据
    dirty_data = userProfileRDD.filter(lambda x: x[1] == 0)
    dirty_data.take(0)

    cleanRDD = userProfileRDD.filter(lambda x: x[1] > 0)
    cleanRDD.saveAsTextFile(sys.argv[2])

    sc.stop()

results matching ""

    No results matching ""