4. IoT Project in Practice
4.1 Parsing the Data - Part 1
In IoT systems, the device list uploaded to the server is in JSON format, and that JSON is usually deeply nested and fairly complex, so we need to know how to handle this kind of structure.
The case involves data from several devices, each carrying multiple sensors; the JSON below is the payload for a single device.
'''
dc_id: the device name
source: the collection of sensors, keyed by sensor name
iot event
{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp": 35,
"c02_level": 1475,
"geo": {
"lat": 38,
"long": 97
}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": 34,
"c02_level": 1370,
"geo": {
"lat": 47.41,
"long": -122
}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": 40,
"c02_level": 1346,
"geo": {
"lat": 33.61,
"long": -111.89
}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp": 40,
"c02_level": 1574,
"geo": {
"lat": 35.93,
"long": -85.46
}
}
}
}
'''
Read the JSON data with a user-defined schema
# For reference, the same data can also be registered and queried with Spark SQL:
# -- drop table if exists iot_event;
# --
# -- CREATE TABLE iot_event USING json
# -- OPTIONS
# -- (path "/tmp/zxm/iot_event.json")
# -- ;
# -- select dc_id,source["sensor-igauge"]["description"] from iot_event;
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext
schema = StructType() \
    .add("dc_id", StringType()) \
    .add("source", MapType(StringType(),
        StructType()
            .add("description", StringType())
            .add("ip", StringType())
            .add("id", LongType())
            .add("temp", LongType())
            .add("c02_level", LongType())
            .add("geo", StructType()
                .add("lat", DoubleType())
                .add("long", DoubleType())
            )
        )
    )
# jsonDF = spark.read.json("/xxx/iot_event.json")
# Reading it that way (schema inference) makes source a struct, but we want a map,
# so we pass the schema defined above (see the comparison sketch after the output below).
jsonDF = spark.read.schema(schema).json("file:///root/bigdata/data/iot_event.json")
jsonDF.printSchema()
jsonDF.show()
#+------+--------------------+
#| dc_id| source|
#+------+--------------------+
#|dc-101|[sensor-igauge ->...|
#+------+--------------------+
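For comparison, a minimal sketch of what schema inference does with the same file (assuming the path above): source comes back as a struct with one field per sensor name, so adding a new sensor would change the schema, whereas the MapType schema keeps it stable.
# Inferred-schema variant, shown only for comparison
inferredDF = spark.read.json("file:///root/bigdata/data/iot_event.json")
inferredDF.printSchema()
# Expected shape (abridged): source is inferred as a struct whose fields are the
# sensor names (sensor-igauge, sensor-inest, sensor-ipad, sensor-istick), each itself a struct.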
Use explode to take the parsing one step further
# For a complex structure like this, explode() breaks the map down into key-value rows
# explode("source"): expands source into key and value columns; dc_id is kept alongside the result
explodedDF = jsonDF.select("dc_id",explode("source"))
explodedDF.count()
explodedDF.show()
# +------+-------------+--------------------+
# | dc_id| key| value|
# +------+-------------+--------------------+
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101| sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101| sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101| sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101| sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101| sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# +------+-------------+--------------------+
explodedDF.printSchema()
# root
# |-- dc_id: string (nullable = true)
# |-- key: string (nullable = false)
# |-- value: struct (nullable = true)
# | |-- description: string (nullable = true)
# | |-- ip: string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- temp: long (nullable = true)
# | |-- c02_level: long (nullable = true)
# | |-- geo: struct (nullable = true)
# | | |-- lat: double (nullable = true)
# | | |-- long: double (nullable = true)
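Besides explode, a single entry of the map can also be looked up directly by key, which mirrors the commented Spark SQL near the top of this section; a small sketch:
# Pull one sensor out of the source map by key, then a nested field out of its struct
jsonDF.select("dc_id", jsonDF.source.getItem("sensor-igauge").getItem("description")).show(truncate=False)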
Use getItem to extract individual fields
# extract the id field
notifydevicesDS = explodedDF \
.select( "dc_id", "key",explodedDF.value.getItem("id"))
notifydevicesDS.printSchema()
# extract the id and temp fields
notifydevicesDS = explodedDF \
.select( "dc_id", "key",
explodedDF.value.getItem("id") ,
explodedDF.value['temp'] )
notifydevicesDS.printSchema()
# give ip, temp, and the other extracted fields aliases
notifydevicesDF = explodedDF \
.select( "dc_id", "key",
explodedDF.value.getItem("id") ,
explodedDF.value['ip'].alias('ip'),
explodedDF.value['temp'].alias('tmp') ,
explodedDF.value['geo']['lat'].alias('lat') ,
explodedDF.value['geo']['long'].alias('long')
)
notifydevicesDF.printSchema()
notifydevicesDF.show()
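The same flattening can also be written with col() and dot paths into the value struct; a sketch equivalent to the select above:
from pyspark.sql.functions import col
notifydevicesDF2 = explodedDF.select(
    "dc_id", "key",
    col("value.id").alias("id"),
    col("value.ip").alias("ip"),
    col("value.temp").alias("tmp"),
    col("value.geo.lat").alias("lat"),
    col("value.geo.long").alias("long"))
notifydevicesDF2.printSchema()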
Wrap JSON-to-DataFrame conversion in a helper function
# Convenience function for turning JSON strings into DataFrames.
# The function below converts a JSON string into a DataFrame.
def jsonToDataFrame(json, schema=None):
    # SparkSessions are available with Spark 2.0+
    reader = spark.read
    if schema:
        reader = reader.schema(schema)
    return reader.json(sc.parallelize([json]))
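A quick usage sketch of the helper; the tiny JSON string and schema below are made up purely for illustration:
sample_json = '{"dc_id": "dc-999", "temp": 21}'   # hypothetical record
sample_schema = StructType() \
    .add("dc_id", StringType()) \
    .add("temp", LongType())
sampleDF = jsonToDataFrame(sample_json, sample_schema)
sampleDF.show()   # expect a single row: dc_id=dc-999, temp=21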
4.2 Parsing the Data - Part 2
from pyspark.sql.types import *
iot_event = [
"""{
"devices": {
"thermostats": {
"peyiJNo0IldT2YlIVtYaGQ": {
"device_id": "peyiJNo0IldT2YlIVtYaGQ",
"locale": "en-US",
"software_version": "4.0",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Hallway Upstairs",
"last_connection": "2016-10-31T23:59:59.000Z",
"is_online": true,
"can_cool": true,
"can_heat": true,
"is_using_emergency_heat": true,
"has_fan": true,
"fan_timer_active": true,
"fan_timer_timeout": "2016-10-31T23:59:59.000Z",
"temperature_scale": "F",
"target_temperature_f": 72,
"target_temperature_high_f": 80,
"target_temperature_low_f": 65,
"eco_temperature_high_f": 80,
"eco_temperature_low_f": 65,
"away_temperature_high_f": 80,
"away_temperature_low_f": 65,
"hvac_mode": "heat",
"humidity": 40,
"hvac_state": "heating",
"is_locked": true,
"locked_temp_min_f": 65,
"locked_temp_max_f": 80
}
},
"smoke_co_alarms": {
"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
"device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
"locale": "en-US",
"software_version": "1.01",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Jane's Room",
"last_connection": "2016-10-31T23:59:59.000Z",
"is_online": true,
"battery_health": "ok",
"co_alarm_state": "ok",
"smoke_alarm_state": "ok",
"is_manual_test_active": true,
"last_manual_test_time": "2016-10-31T23:59:59.000Z",
"ui_color_state": "gray"
}
},
"cameras": {
"awJo6rH0IldT2YlIVtYaGQ": {
"device_id": "awJo6rH",
"software_version": "4.0",
"structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
"where_name": "Foyer",
"is_online": true,
"is_streaming": true,
"is_audio_input_enabled": true,
"last_is_online_change": "2016-12-29T18:42:00.000Z",
"is_video_history_enabled": true,
"web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
"app_url": "nestmobile://cameras/device_id?auth=access_token",
"is_public_share_enabled": true,
"activity_zones": { "name": "Walkway", "id": 244083 },
"last_event": "2016-10-31T23:59:59.000Z"
}
}
}
}"""]
iot_event_schema = StructType() \
.add("devices", StructType() \
.add("thermostats", MapType(StringType(), StructType() \
.add("device_id", StringType(),True)\
.add("locale", StringType(),True) \
.add("software_version", StringType(),True) \
.add("structure_id", StringType(),True) \
.add("where_name", StringType(),True) \
.add("last_connection", StringType(),True) \
.add("is_online", BooleanType(),True) \
.add("can_cool", BooleanType(),True) \
.add("can_heat", BooleanType(),True) \
.add("is_using_emergency_heat", BooleanType(),True) \
.add("has_fan", BooleanType(),True) \
.add("fan_timer_active", BooleanType(),True) \
.add("fan_timer_timeout", StringType(),True) \
.add("temperature_scale", StringType(),True) \
.add("target_temperature_f", DoubleType(),True) \
.add("target_temperature_high_f", DoubleType(),True) \
.add("target_temperature_low_f", DoubleType(),True) \
.add("eco_temperature_high_f", DoubleType(),True) \
.add("eco_temperature_low_f", DoubleType(),True) \
.add("away_temperature_high_f", DoubleType(),True) \
.add("away_temperature_low_f", DoubleType(),True) \
.add("hvac_mode", StringType(),True) \
.add("humidity", LongType(),True) \
.add("hvac_state", StringType(),True) \
.add("is_locked", StringType(),True) \
.add("locked_temp_min_f", DoubleType()) \
.add("locked_temp_max_f", DoubleType()))) \
.add("smoke_co_alarms", MapType(StringType() , StructType() \
.add("device_id", StringType()) \
.add("locale", StringType()) \
.add("software_version", StringType()) \
.add("structure_id", StringType()) \
.add("where_name", StringType()) \
.add("last_connection", StringType()) \
.add("is_online", BooleanType()) \
.add("battery_health", StringType()) \
.add("co_alarm_state", StringType()) \
.add("smoke_alarm_state", StringType()) \
.add("is_manual_test_active", BooleanType()) \
.add("last_manual_test_time", StringType()) \
.add("ui_color_state", StringType()))) \
.add("cameras", MapType(StringType(), StructType() \
.add("device_id", StringType()) \
.add("software_version", StringType()) \
.add("structure_id", StringType()) \
.add("where_name", StringType()) \
.add("is_online", BooleanType()) \
.add("is_streaming", BooleanType()) \
.add("is_audio_input_enabled", BooleanType()) \
.add("last_is_online_change", StringType()) \
.add("is_video_history_enabled", BooleanType()) \
.add("web_url", StringType()) \
.add("app_url", StringType()) \
.add("is_public_share_enabled", BooleanType()) \
.add("activity_zones",StructType() \
.add("name", StringType()) \
.add("id", LongType())) \
.add("last_event", StringType()))))
# The schema definition above may trigger a message about
# spark.debug.maxToStringFields ...
# because the number of fields rendered to a string is limited by default.
# Rebuilding the SparkSession with an extra config option fixes it:
spark = SparkSession \
.builder \
.config('spark.debug.maxToStringFields', '100')\
.getOrCreate()
iot_event_rdd=sc.parallelize(iot_event)
iot_event_df = spark.read.schema(iot_event_schema).json(iot_event_rdd)
'''
>>> iot_event_df.printSchema()
root
|-- devices: struct (nullable = true)
| |-- thermostats: map (nullable = true)
| | |-- key: string
| | |-- value: struct (valueContainsNull = true)
| | | |-- device_id: string (nullable = true)
| | | |-- locale: string (nullable = true)
| | | |-- software_version: string (nullable = true)
| | | |-- structure_id: string (nullable = true)
| | | |-- where_name: string (nullable = true)
| | | |-- last_connection: string (nullable = true)
| | | |-- is_online: boolean (nullable = true)
| | | |-- can_cool: boolean (nullable = true)
| | | |-- can_heat: boolean (nullable = true)
| | | |-- is_using_emergency_heat: boolean (nullable = true)
| | | |-- has_fan: boolean (nullable = true)
| | | |-- fan_timer_active: boolean (nullable = true)
| | | |-- fan_timer_timeout: string (nullable = true)
| | | |-- temperature_scale: string (nullable = true)
| | | |-- target_temperature_f: double (nullable = true)
| | | |-- target_temperature_high_f: double (nullable = true)
| | | |-- target_temperature_low_f: double (nullable = true)
| | | |-- eco_temperature_high_f: double (nullable = true)
| | | |-- eco_temperature_low_f: double (nullable = true)
| | | |-- away_temperature_high_f: double (nullable = true)
| | | |-- away_temperature_low_f: double (nullable = true)
| | | |-- hvac_mode: string (nullable = true)
| | | |-- humidity: long (nullable = true)
| | | |-- hvac_state: string (nullable = true)
| | | |-- is_locked: string (nullable = true)
| | | |-- locked_temp_min_f: double (nullable = true)
| | | |-- locked_temp_max_f: double (nullable = true)
| |-- smoke_co_alarms: map (nullable = true)
| | |-- key: string
| | |-- value: struct (valueContainsNull = true)
| | | |-- device_id: string (nullable = true)
| | | |-- locale: string (nullable = true)
| | | |-- software_version: string (nullable = true)
| | | |-- structure_id: string (nullable = true)
| | | |-- where_name: string (nullable = true)
| | | |-- last_connection: string (nullable = true)
| | | |-- is_online: boolean (nullable = true)
| | | |-- battery_health: string (nullable = true)
| | | |-- co_alarm_state: string (nullable = true)
| | | |-- smoke_alarm_state: string (nullable = true)
| | | |-- is_manual_test_active: boolean (nullable = true)
| | | |-- last_manual_test_time: string (nullable = true)
| | | |-- ui_color_state: string (nullable = true)
| |-- cameras: map (nullable = true)
| | |-- key: string
| | |-- value: struct (valueContainsNull = true)
| | | |-- device_id: string (nullable = true)
| | | |-- software_version: string (nullable = true)
| | | |-- structure_id: string (nullable = true)
| | | |-- where_name: string (nullable = true)
| | | |-- is_online: boolean (nullable = true)
| | | |-- is_streaming: boolean (nullable = true)
| | | |-- is_audio_input_enabled: boolean (nullable = true)
| | | |-- last_is_online_change: string (nullable = true)
| | | |-- is_video_history_enabled: boolean (nullable = true)
| | | |-- web_url: string (nullable = true)
| | | |-- app_url: string (nullable = true)
| | | |-- is_public_share_enabled: boolean (nullable = true)
| | | |-- activity_zones: struct (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- id: long (nullable = true)
| | | |-- last_event: string (nullable = true)
'''
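# Promote the three device maps (thermostats / smoke_co_alarms / cameras) to top-level
# columns so that each one can be exploded and flattened separately below.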
mapColumnsDF = iot_event_df.select(
    iot_event_df.devices.getItem("smoke_co_alarms").alias("smoke_alarms"),
    iot_event_df.devices.getItem("cameras").alias("cameras"),
    iot_event_df.devices.getItem("thermostats").alias("thermostats"))
from pyspark.sql.functions import *
ex_ThermostatsDF = mapColumnsDF.select(explode("thermostats"))
thermostateDF = ex_ThermostatsDF \
.select(ex_ThermostatsDF.value.getItem("device_id").alias("device_id"),
ex_ThermostatsDF.value.getItem("locale").alias("locale"),
ex_ThermostatsDF.value.getItem("where_name").alias("location"),
ex_ThermostatsDF.value.getItem("last_connection").alias("last_connected"),
ex_ThermostatsDF.value.getItem("humidity").alias("humidity"),
ex_ThermostatsDF.value.getItem("target_temperature_f").alias("target_temperature_f"),
ex_ThermostatsDF.value.getItem("hvac_mode").alias("mode"),
ex_ThermostatsDF.value.getItem("software_version").alias("version")
)
'''
>>> thermostateDF.printSchema()
root
|-- device_id: string (nullable = true)
|-- locale: string (nullable = true)
|-- location: string (nullable = true)
|-- last_connected: string (nullable = true)
|-- humidity: long (nullable = true)
|-- target_temperature_f: double (nullable = true)
|-- mode: string (nullable = true)
|-- version: string (nullable = true)
>>> thermostateDF.show()
'''
ex_CamerasDF = mapColumnsDF.select(explode("cameras"))
cameraDF = ex_CamerasDF \
.select(ex_CamerasDF.value.getItem("device_id").alias("device_id"),
ex_CamerasDF.value.getItem("where_name").alias("location"),
ex_CamerasDF.value.getItem("software_version").alias("version"),
ex_CamerasDF.value.getItem("activity_zones").getItem("name").alias("name"),
ex_CamerasDF.value.getItem("activity_zones").getItem("id").alias("id")
)
# or use the original iot_event_df directly with the devices.X dot notation
ex_SmokedAlarmsDF = iot_event_df.select(explode("devices.smoke_co_alarms"))
smokedAlarmsDF = ex_SmokedAlarmsDF \
.select(ex_SmokedAlarmsDF.value.getItem("device_id").alias("device_id"),
ex_SmokedAlarmsDF.value.getItem("where_name").alias("location"),
ex_SmokedAlarmsDF.value.getItem("software_version").alias("version"),
ex_SmokedAlarmsDF.value.getItem("last_connection").alias("last_connected"),
ex_SmokedAlarmsDF.value.getItem("battery_health").alias("battery_health"))
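Each of the three flattened DataFrames can now be inspected or written out as usual; a small sketch (the output path is hypothetical):
thermostateDF.show(truncate=False)
cameraDF.show(truncate=False)
smokedAlarmsDF.show(truncate=False)
# e.g. persist one of them for downstream analysis
# smokedAlarmsDF.write.mode("overwrite").parquet("/tmp/smoke_co_alarms")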