3、spark-mllib实战-特征筛选
需求:了解数据,对数据进行描述性统计,对数据进行感性认识
进行基本描述性统计
- 计算均值和方差
import pyspark.mllib.stat as st
import numpy as np
# Descriptive statistics (mean, variance, etc.) for the numeric columns.
numeric_cols = ['MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE',
                'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI',
                'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN']
# Turn each Row into a plain list of values so colStats can consume it.
numeric_rdd = births_transformed \
    .select(numeric_cols) \
    .rdd \
    .map(list)
# colStats exposes count/max/mean/min/normL1/normL2/numNonzeros/variance.
mllib_stats = st.Statistics.colStats(numeric_rdd)
# Print each column's mean and standard deviation (sqrt of the variance).
for col_name, col_mean, col_var in zip(numeric_cols,
                                       mllib_stats.mean(),
                                       mllib_stats.variance()):
    print('{0}: \t{1:.2f} \t {2:.2f}'.format(col_name, col_mean, np.sqrt(col_var)))
# Frequency counts for the categorical columns (everything not numeric).
categorical_cols = [e for e in births_transformed.columns
                    if e not in numeric_cols]
categorical_rdd = births_transformed \
    .select(categorical_cols) \
    .rdd \
    .map(lambda row: [e for e in row])
for i, col in enumerate(categorical_cols):
    # FIX: the original put '#' comments after a backslash line-continuation
    # ('\#...'), which is a Python SyntaxError — a backslash must be the last
    # character on its line.  Comments now live on their own lines.
    # Group records by the i-th column's value; 'i=i' binds the current loop
    # index as a default argument so the lambda does not late-bind it.
    agg = categorical_rdd \
        .groupBy(lambda x, i=i: x[i]) \
        .map(lambda row: (row[0], len(row[1])))
    # One (value, count) pair per distinct level, sorted by count descending.
    print(col, sorted(agg.collect(),
                      key=lambda el: el[1],
                      reverse=True))
# Same frequency scan as above, repeated (kept for the tutorial's narrative).
for idx, cat_name in enumerate(categorical_cols):
    # Bucket the records by the idx-th column, then count each bucket.
    level_counts = categorical_rdd \
        .groupBy(lambda record, idx=idx: record[idx]) \
        .map(lambda grp: (grp[0], len(grp[1])))
    print(cat_name, sorted(level_counts.collect(),
                           key=lambda pair: pair[1],
                           reverse=True))
- 数值型变量计算相关系数矩阵
# Correlation matrix for the numeric features.
corrs = st.Statistics.corr(numeric_rdd)
# Report distinct feature pairs whose correlation exceeds 0.5.
for i, el in enumerate(corrs > 0.5):
    # 'el' is a boolean row: True where corrs[i][j] > 0.5.  Test the flag
    # directly — the original's 'e == 1.0' compared a numpy bool to a float,
    # which happens to work but is fragile and unidiomatic.
    correlated = [
        (numeric_cols[j], corrs[i][j])
        for j, e in enumerate(el)
        if e and j != i]
    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                .format(numeric_cols[i], e[0], e[1]))
# Sample driver-console output of the correlation scan above, kept as a
# module-level string so the expected result travels with the tutorial.
'''
CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60
'''
# Drop one feature from each strongly-correlated pair found above and keep
# the remaining columns for modelling.
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_1_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM'
]
# select() accepts the list directly — the original's identity comprehension
# ([e for e in features_to_keep]) was a pointless copy.
births_transformed = births_transformed.select(features_to_keep)
- 类别型变量计算相关性
# Pearson correlation is meaningless for categorical variables; use a
# chi-square test of independence against the outcome instead.
import pyspark.mllib.linalg as ln
# Element 0 is the outcome column INFANT_ALIVE_AT_REPORT itself — skip it.
for cat in categorical_cols[1:]:
    # Contingency table: one row per outcome value, one column per level of
    # the categorical feature, cells = observation counts.
    agg = births_transformed \
        .groupby('INFANT_ALIVE_AT_REPORT') \
        .pivot(cat) \
        .count()
    # Flatten the counts (row[1:] drops the outcome label) and replace nulls
    # — levels never observed for an outcome — with 0.
    # FIX: 'e is None' instead of the original's 'e == None' (PEP 8; '=='
    # can be overridden and is the wrong identity test for None).
    agg_rdd = agg \
        .rdd \
        .map(lambda row: (row[1:])) \
        .flatMap(lambda row: [0 if e is None else e for e in row]) \
        .collect()
    # Number of category levels = pivot columns minus the outcome column.
    # FIX: the original used len(agg.collect()[0]) - 1, which pulled the
    # whole pivot table to the driver a second time just to measure one row.
    row_length = len(agg.columns) - 1
    # Dense matrix of shape (row_length rows, 2 columns), filled
    # column-major from the flattened counts.
    agg = ln.Matrices.dense(row_length, 2, agg_rdd)
    test = st.Statistics.chiSqTest(agg)
    print(cat, test.pValue)
# Sample pivot table and p-value output from the chi-square loop above,
# kept as a module-level string so the expected result travels with the
# tutorial.
'''
agg = births_transformed.groupby('INFANT_ALIVE_AT_REPORT').pivot( 'BIRTH_PLACE').count()
+----------------------+-----+---+---+---+---+---+---+----+
|INFANT_ALIVE_AT_REPORT| 1| 2| 3| 4| 5| 6| 7| 9|
+----------------------+-----+---+---+---+---+---+---+----+
| 1|22995|113|158| 39| 19| 2| 23|null|
| 0|21563| 23| 66|288| 55| 9| 68| 8|
+----------------------+-----+---+---+---+---+---+---+----+
BIRTH_PLACE 0.0
DIABETES_PRE 0.0
DIABETES_GEST 0.0
HYP_TENS_PRE 0.0
HYP_TENS_GEST 0.0
PREV_BIRTH_PRETERM 0.0
'''