4、物联网项目实战

4.1 数据解析-1

在物联网中,上传设备列表到服务器的数据格式为json,而此时的json往往嵌套层数非常多,非常复杂,我们需要掌握这种复杂json数据的处理

案例中涉及多个设备的数据,每个设备有多个传感器,下面的json就是就是一个设备的数据内容

'''
dc_id:为设备名称
source:为传感器列表名称
iot event
{
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
            "id": 10,
            "ip": "68.28.91.22",
            "description": "Sensor attached to the container ceilings",
            "temp": 35,
            "c02_level": 1475,
            "geo": {
                "lat": 38,
                "long": 97
            }
        },
        "sensor-ipad": {
            "id": 13,
            "ip": "67.185.72.1",
            "description": "Sensor ipad attached to carbon cylinders",
            "temp": 34,
            "c02_level": 1370,
            "geo": {
                "lat": 47.41,
                "long": -122
            }
        },
        "sensor-inest": {
            "id": 8,
            "ip": "208.109.163.218",
            "description": "Sensor attached to the factory ceilings",
            "temp": 40,
            "c02_level": 1346,
            "geo": {
                "lat": 33.61,
                "long": -111.89
            }
        },
        "sensor-istick": {
            "id": 5,
            "ip": "204.116.105.67",
            "description": "Sensor embedded in exhaust pipes in the ceilings",
            "temp": 40,
            "c02_level": 1574,
            "geo": {
                "lat": 35.93,
                "long": -85.46
            }
        }
    }
}
'''

读取json数据,自设定schema

# -- drop table if exists iot_event;
# --
# -- CREATE TABLE iot_event USING json
# -- OPTIONS
# -- (path "/tmp/zxm/iot_event.json")
# -- ;
# -- select dc_id,source["sensor-igauge"]["description"]  from iot_event;

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext
schema = StructType() \
    .add("dc_id", StringType()) \
    .add("source", MapType(StringType(),
                           StructType() \
                           .add("description", StringType()) \
                           .add("ip", StringType()) \
                           .add("id", LongType()) \
                           .add("temp", LongType()) \
                           .add("c02_level", LongType()) \
                           .add("geo", StructType() \
                                .add("lat", DoubleType()) \
                                .add("long", DoubleType())
                                )
                           )
         )
# jsonDF = spark.read.json("/xxx/iot_event.json")
# 这样读出来后,source被看作是struct,我们希望它是map.于是加上schema
jsonDF = spark.read.schema(schema).json("file:///root/bigdata/data/iot_event.json")

jsonDF.printSchema()
jsonDF.show()
#+------+--------------------+
#| dc_id|              source|
#+------+--------------------+
#|dc-101|[sensor-igauge ->...|
#+------+--------------------+

利用explode进一步解析数据

# 对于这种复杂的结构我们可以通过explode方法,将复杂的json对象拆解成key-value形式
# explode("source"):将source拆解为key-value形式,但是结果也带有dc_id
explodedDF = jsonDF.select("dc_id",explode("source"))
explodedDF.count()
explodedDF.show()
# +------+-------------+--------------------+
# | dc_id|          key|               value|
# +------+-------------+--------------------+
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101|  sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101|  sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101|  sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101|  sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# |dc-101|sensor-igauge|[Sensor attached ...|
# |dc-101|  sensor-ipad|[Sensor ipad atta...|
# |dc-101| sensor-inest|[Sensor attached ...|
# |dc-101|sensor-istick|[Sensor embedded ...|
# +------+-------------+--------------------+


explodedDF.printSchema()
# root
#  |-- dc_id: string (nullable = true)
#  |-- key: string (nullable = false)
#  |-- value: struct (nullable = true)
#  |    |-- description: string (nullable = true)
#  |    |-- ip: string (nullable = true)
#  |    |-- id: long (nullable = true)
#  |    |-- temp: long (nullable = true)
#  |    |-- c02_level: long (nullable = true)
#  |    |-- geo: struct (nullable = true)
#  |    |    |-- lat: double (nullable = true)
#  |    |    |-- long: double (nullable = true)

通过getItem将数据进一步提取

#获取id数据
notifydevicesDS = explodedDF \
    .select( "dc_id", "key",explodedDF.value.getItem("id"))
notifydevicesDS.printSchema()

#获取id和temp数据
notifydevicesDS = explodedDF \
    .select( "dc_id", "key",
             explodedDF.value.getItem("id") ,
             explodedDF.value['temp'] )
notifydevicesDS.printSchema()

#为ip,temp等数据取别名
notifydevicesDF = explodedDF \
    .select( "dc_id", "key",
             explodedDF.value.getItem("id") ,
             explodedDF.value['ip'].alias('ip'),
             explodedDF.value['temp'].alias('tmp') ,
             explodedDF.value['geo']['lat'].alias('lat') ,
             explodedDF.value['geo']['long'].alias('long')
             )

notifydevicesDF.printSchema()
notifydevicesDF.show()

封装一个json转换为DataFrame的方法

# Convenience function for turning JSON strings into DataFrames.
# 下面这个函数的功能是把一个json字符串转化维df
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    reader.schema(schema)
  return reader.json(sc.parallelize([json]))

4.2 数据解析-2

from pyspark.sql.types import *

iot_event = [
    """{
    "devices": {
       "thermostats": {
          "peyiJNo0IldT2YlIVtYaGQ": {
            "device_id": "peyiJNo0IldT2YlIVtYaGQ",
            "locale": "en-US",
            "software_version": "4.0",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Hallway Upstairs",
            "last_connection": "2016-10-31T23:59:59.000Z",
            "is_online": true,
            "can_cool": true,
            "can_heat": true,
            "is_using_emergency_heat": true,
            "has_fan": true,
            "fan_timer_active": true,
            "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
            "temperature_scale": "F",
            "target_temperature_f": 72,
            "target_temperature_high_f": 80,
            "target_temperature_low_f": 65,
            "eco_temperature_high_f": 80,
            "eco_temperature_low_f": 65,
            "away_temperature_high_f": 80,
            "away_temperature_low_f": 65,
            "hvac_mode": "heat",
            "humidity": 40,
            "hvac_state": "heating",
            "is_locked": true,
            "locked_temp_min_f": 65,
            "locked_temp_max_f": 80
            }
          },
          "smoke_co_alarms": {
            "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
              "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
              "locale": "en-US",
              "software_version": "1.01",
              "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
              "where_name": "Jane's Room",
              "last_connection": "2016-10-31T23:59:59.000Z",
              "is_online": true,
              "battery_health": "ok",
              "co_alarm_state": "ok",
              "smoke_alarm_state": "ok",
              "is_manual_test_active": true,
              "last_manual_test_time": "2016-10-31T23:59:59.000Z",
              "ui_color_state": "gray"
              }
            },
         "cameras": {
          "awJo6rH0IldT2YlIVtYaGQ": {
            "device_id": "awJo6rH",
            "software_version": "4.0",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Foyer",
            "is_online": true,
            "is_streaming": true,
            "is_audio_input_enabled": true,
            "last_is_online_change": "2016-12-29T18:42:00.000Z",
            "is_video_history_enabled": true,
            "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
            "app_url": "nestmobile://cameras/device_id?auth=access_token",
            "is_public_share_enabled": true,
            "activity_zones": { "name": "Walkway", "id": 244083 },
            "last_event": "2016-10-31T23:59:59.000Z"
            }
          }
        }
       }"""]

iot_event_schema = StructType() \
    .add("devices", StructType() \
         .add("thermostats", MapType(StringType(), StructType() \
                                     .add("device_id", StringType(),True)\
                                     .add("locale", StringType(),True) \
                                     .add("software_version", StringType(),True) \
                                     .add("structure_id", StringType(),True) \
                                     .add("where_name", StringType(),True) \
                                     .add("last_connection", StringType(),True) \
                                     .add("is_online", BooleanType(),True) \
                                     .add("can_cool", BooleanType(),True) \
                                     .add("can_heat", BooleanType(),True) \
                                     .add("is_using_emergency_heat", BooleanType(),True) \
                                     .add("has_fan", BooleanType(),True) \
                                     .add("fan_timer_active", BooleanType(),True) \
                                     .add("fan_timer_timeout", StringType(),True) \
                                     .add("temperature_scale", StringType(),True) \
                                     .add("target_temperature_f", DoubleType(),True) \
                                     .add("target_temperature_high_f", DoubleType(),True) \
                                     .add("target_temperature_low_f", DoubleType(),True) \
                                     .add("eco_temperature_high_f", DoubleType(),True) \
                                     .add("eco_temperature_low_f", DoubleType(),True) \
                                     .add("away_temperature_high_f", DoubleType(),True) \
                                     .add("away_temperature_low_f", DoubleType(),True) \
                                     .add("hvac_mode", StringType(),True) \
                                     .add("humidity", LongType(),True) \
                                     .add("hvac_state", StringType(),True) \
                                     .add("is_locked", StringType(),True) \
                                     .add("locked_temp_min_f", DoubleType()) \
                                     .add("locked_temp_max_f", DoubleType()))) \
         .add("smoke_co_alarms", MapType(StringType() , StructType() \
                                         .add("device_id", StringType()) \
                                         .add("locale", StringType()) \
                                         .add("software_version", StringType()) \
                                         .add("structure_id", StringType()) \
                                         .add("where_name", StringType()) \
                                         .add("last_connection", StringType()) \
                                         .add("is_online", BooleanType()) \
                                         .add("battery_health", StringType()) \
                                         .add("co_alarm_state", StringType()) \
                                         .add("smoke_alarm_state", StringType()) \
                                         .add("is_manual_test_active", BooleanType()) \
                                         .add("last_manual_test_time", StringType()) \
                                         .add("ui_color_state", StringType()))) \
         .add("cameras", MapType(StringType(), StructType() \
                                 .add("device_id", StringType()) \
                                 .add("software_version", StringType()) \
                                 .add("structure_id", StringType()) \
                                 .add("where_name", StringType()) \
                                 .add("is_online", BooleanType()) \
                                 .add("is_streaming", BooleanType()) \
                                 .add("is_audio_input_enabled", BooleanType()) \
                                 .add("last_is_online_change", StringType()) \
                                 .add("is_video_history_enabled", BooleanType()) \
                                 .add("web_url", StringType()) \
                                 .add("app_url", StringType()) \
                                 .add("is_public_share_enabled", BooleanType()) \
                                 .add("activity_zones",StructType() \
                                      .add("name", StringType()) \
                                      .add("id", LongType())) \
                                 .add("last_event", StringType()))))

#上述操作会报错:
# spark.debug.maxToStringFields。。。
#原因是默认的StringFields有限制个数
#可以通过重新初始化SparkSession添加config来解决
spark = SparkSession \
        .builder \
        .config('spark.debug.maxToStringFields', '100')\
        .getOrCreate()

iot_event_rdd=sc.parallelize(iot_event)

iot_event_df = spark.read.schema(iot_event_schema).json(iot_event_rdd)

'''
>>> iot_event_df.printSchema()
root
 |-- devices: struct (nullable = true)
 |    |-- thermostats: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- device_id: string (nullable = true)
 |    |    |    |-- locale: string (nullable = true)
 |    |    |    |-- software_version: string (nullable = true)
 |    |    |    |-- structure_id: string (nullable = true)
 |    |    |    |-- where_name: string (nullable = true)
 |    |    |    |-- last_connection: string (nullable = true)
 |    |    |    |-- is_online: boolean (nullable = true)
 |    |    |    |-- can_cool: boolean (nullable = true)
 |    |    |    |-- can_heat: boolean (nullable = true)
 |    |    |    |-- is_using_emergency_heat: boolean (nullable = true)
 |    |    |    |-- has_fan: boolean (nullable = true)
 |    |    |    |-- fan_timer_active: boolean (nullable = true)
 |    |    |    |-- fan_timer_timeout: string (nullable = true)
 |    |    |    |-- temperature_scale: string (nullable = true)
 |    |    |    |-- target_temperature_f: double (nullable = true)
 |    |    |    |-- target_temperature_high_f: double (nullable = true)
 |    |    |    |-- target_temperature_low_f: double (nullable = true)
 |    |    |    |-- eco_temperature_high_f: double (nullable = true)
 |    |    |    |-- eco_temperature_low_f: double (nullable = true)
 |    |    |    |-- away_temperature_high_f: double (nullable = true)
 |    |    |    |-- away_temperature_low_f: double (nullable = true)
 |    |    |    |-- hvac_mode: string (nullable = true)
 |    |    |    |-- humidity: long (nullable = true)
 |    |    |    |-- hvac_state: string (nullable = true)
 |    |    |    |-- is_locked: string (nullable = true)
 |    |    |    |-- locked_temp_min_f: double (nullable = true)
 |    |    |    |-- locked_temp_max_f: double (nullable = true)
 |    |-- smoke_co_alarms: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- device_id: string (nullable = true)
 |    |    |    |-- locale: string (nullable = true)
 |    |    |    |-- software_version: string (nullable = true)
 |    |    |    |-- structure_id: string (nullable = true)
 |    |    |    |-- where_name: string (nullable = true)
 |    |    |    |-- last_connection: string (nullable = true)
 |    |    |    |-- is_online: boolean (nullable = true)
 |    |    |    |-- battery_health: string (nullable = true)
 |    |    |    |-- co_alarm_state: string (nullable = true)
 |    |    |    |-- smoke_alarm_state: string (nullable = true)
 |    |    |    |-- is_manual_test_active: boolean (nullable = true)
 |    |    |    |-- last_manual_test_time: string (nullable = true)
 |    |    |    |-- ui_color_state: string (nullable = true)
 |    |-- cameras: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- device_id: string (nullable = true)
 |    |    |    |-- software_version: string (nullable = true)
 |    |    |    |-- structure_id: string (nullable = true)
 |    |    |    |-- where_name: string (nullable = true)
 |    |    |    |-- is_online: boolean (nullable = true)
 |    |    |    |-- is_streaming: boolean (nullable = true)
 |    |    |    |-- is_audio_input_enabled: boolean (nullable = true)
 |    |    |    |-- last_is_online_change: string (nullable = true)
 |    |    |    |-- is_video_history_enabled: boolean (nullable = true)
 |    |    |    |-- web_url: string (nullable = true)
 |    |    |    |-- app_url: string (nullable = true)
 |    |    |    |-- is_public_share_enabled: boolean (nullable = true)
 |    |    |    |-- activity_zones: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- last_event: string (nullable = true)
'''

mapColumnsDF = iot_event_df.select(iot_event_df.devices.getItem("smoke_co_alarms").alias ("smoke_alarms"),iot_event_df.devices.getItem("cameras").alias ("cameras"),iot_event_df.devices.getItem("thermostats").alias ("thermostats"))

from pyspark.sql.functions import *
ex_ThermostatsDF = mapColumnsDF.select(explode("thermostats"))

thermostateDF = ex_ThermostatsDF \
    .select(ex_ThermostatsDF.value.getItem("device_id").alias("device_id"),
           ex_ThermostatsDF.value.getItem("locale").alias("locale"),
           ex_ThermostatsDF.value.getItem("where_name").alias("location"),
           ex_ThermostatsDF.value.getItem("last_connection").alias("last_connected"),
           ex_ThermostatsDF.value.getItem("humidity").alias("humidity"),
ex_ThermostatsDF.value.getItem("target_temperature_f").alias("target_temperature_f"),
           ex_ThermostatsDF.value.getItem("hvac_mode").alias("mode"),
           ex_ThermostatsDF.value.getItem("software_version").alias("version")
           )
'''
>>> thermostateDF.printSchema()
root
 |-- device_id: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- location: string (nullable = true)
 |-- last_connected: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- target_temperature_f: double (nullable = true)
 |-- mode: string (nullable = true)
 |-- version: string (nullable = true)

>>> thermostateDF.show()
'''

ex_CamerasDF = mapColumnsDF.select(explode("cameras"))
cameraDF = ex_CamerasDF \
    .select(ex_CamerasDF.value.getItem("device_id").alias("device_id"),
            ex_CamerasDF.value.getItem("where_name").alias("location"),
            ex_CamerasDF.value.getItem("software_version").alias("version"),
            ex_CamerasDF.value.getItem("activity_zones").getItem("name").alias("name"),
            ex_CamerasDF.value.getItem("activity_zones").getItem("id").alias("id")
    )

//or you could use the original iot_event_df and use the devices.X notation

ex_SmokedAlarmsDF =  iot_event_df.select(explode("devices.smoke_co_alarms"))

smokedAlarmsDF = ex_SmokedAlarmsDF \
    .select(ex_SmokedAlarmsDF.value.getItem("device_id").alias("device_id"),
            ex_SmokedAlarmsDF.value.getItem("where_name").alias("location"),
            ex_SmokedAlarmsDF.value.getItem("software_version").alias("version"),
            ex_SmokedAlarmsDF.value.getItem("last_connection").alias("last_connected"),
            ex_SmokedAlarmsDF.value.getItem("battery_health").alias("battery_health"))

results matching ""

    No results matching ""