4. spark-mllib in practice: model training and model evaluation
'''
In mllib, the basic data structure is the RDD-based LabeledPoint: a label
plus a feature vector.
'''
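'''
A minimal sketch of building one by hand (the values below are made up
purely for illustration):
'''
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

lp = LabeledPoint(1.0, Vectors.dense([0.0, 29.0, 99.0]))  # label, features
print(lp.label)     # 1.0
print(lp.features)  # [0.0,29.0,99.0]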
'''
BIRTH_PLACE is a string, not a simple Y/N/U flag like the other features, so
we hash each place code into a fixed-length vector, which is effectively a
form of one-hot encoding (distinct values may collide in the same bucket):
https://stackoverflow.com/questions/35205865/what-is-the-difference-between-hashingtf-and-countvectorizer-in-spark
'''
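'''
A quick illustration of what HashingTF does; which bucket a given code lands
in depends on the hash, so the position shown below is illustrative:
'''
import pyspark.mllib.feature as ft
print(ft.HashingTF(7).transform(['1']).toArray())
# e.g. [0. 0. 1. 0. 0. 0. 0.] -- a one-hot-like vector of length 7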
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg
import pyspark.mllib.linalg as ln   # was missing; needed for Vectors.dense below

hashing = ft.HashingTF(7)

births_hashed = births_transformed \
    .rdd \
    .map(lambda row: [   # 1) hash BIRTH_PLACE (row[1]); keep other columns
            list(hashing.transform(row[1]).toArray())
                if col == 'BIRTH_PLACE'
                else row[i]
            for i, col
            in enumerate(features_to_keep)]) \
    .map(lambda row: [[e] if type(e) == int else e   # 2) wrap ints in lists
                      for e in row]) \
    .map(lambda row: [item for sublist in row        # 3) flatten each row
                      for item in sublist]) \
    .map(lambda row: reg.LabeledPoint(               # 4) label, dense features
            row[0],
            ln.Vectors.dense(row[1:]))
        )
'''
map1:
[[0, [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], 29, 99, 0, 99, 999, 0, 0, 0, 0, 0],
 [0, [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], 22, 29, 0, 65, 180, 0, 0, 0, 0, 0]]
map2:
[[[0], [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], [29], [99], [0], [99], [999], [0], [0], [0], [0], [0]],
 [[0], [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], [22], [29], [0], [65], [180], [0], [0], [0], [0], [0]]]
map3:
[[0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 29, 99, 0, 99, 999, 0, 0, 0, 0, 0],
 [0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 22, 29, 0, 65, 180, 0, 0, 0, 0, 0]]
map4:
[LabeledPoint(0.0, [0.0,0.0,1.0,0.0,0.0,0.0,0.0,29.0,99.0,0.0,99.0,999.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,0.0,1.0,0.0,0.0,0.0,0.0,22.0,29.0,0.0,65.0,180.0,0.0,0.0,0.0,0.0,0.0])]
'''
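'''
The wrap-and-flatten steps (map2/map3) are plain Python list comprehensions;
a toy sketch with made-up values:
'''
row = [0, [0.0, 1.0], 29]
wrapped = [[e] if type(e) == int else e for e in row]
flat = [item for sublist in wrapped for item in sublist]
print(wrapped)  # [[0], [0.0, 1.0], [29]]
print(flat)     # [0, 0.0, 1.0, 29]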
'''
Split into training and test sets
'''
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])
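'''
Note that randomSplit is non-deterministic by default; for reproducible
experiments it also accepts a seed, e.g.
births_hashed.randomSplit([0.6, 0.4], seed=666).
'''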
'''Training the model is straightforward; note that iterations=10 is
deliberately small (mllib's default for LBFGS is 100)'''
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
LR_Model = LogisticRegressionWithLBFGS.train(births_train, iterations=10)
LR_results = (
    births_test.map(lambda row: row.label) \
        .zip(LR_Model \
             .predict(births_test \
                      .map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))  # cast the int prediction to float
'''Evaluate the model'''
import pyspark.mllib.evaluation as ev
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)
print('Area under ROC: {0:.2f}'.format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()
'''
Area under ROC: 0.62
'''
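'''
BinaryClassificationMetrics also exposes the area under the precision-recall
curve; a minimal sketch along the same lines, using a fresh metrics object:
'''
PR_evaluation = ev.BinaryClassificationMetrics(LR_results)
print('Area under PR: {0:.2f}'.format(PR_evaluation.areaUnderPR))
PR_evaluation.unpersist()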
'''
Using a random forest
'''
from pyspark.mllib.tree import RandomForest
RF_model = RandomForest \
    .trainClassifier(data=births_train,
                     numClasses=2,
                     categoricalFeaturesInfo={},  # all features treated as numeric
                     numTrees=6,
                     featureSubsetStrategy='all', # each tree sees every feature
                     seed=666)
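'''
Optionally inspect the trained ensemble; these accessors are part of the
mllib tree-model API (node counts vary with the data):
'''
print(RF_model.numTrees())       # 6
print(RF_model.totalNumNodes())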
RF_results = (
    births_test.map(lambda row: row.label) \
        .zip(RF_model \
             .predict(births_test \
                      .map(lambda row: row.features)))
)  # tree predictions are already floats, so no cast is needed here
RF_evaluation = ev.BinaryClassificationMetrics(RF_results)
print('Area under ROC: {0:.2f}' \
    .format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()
'''
Area under ROC: 0.62
'''
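'''
Both models can be persisted for later reuse; a minimal sketch (the path is
hypothetical, and sc is the active SparkContext):
'''
from pyspark.mllib.tree import RandomForestModel
RF_model.save(sc, 'file:///tmp/RF_model')  # hypothetical output path
same_model = RandomForestModel.load(sc, 'file:///tmp/RF_model')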