3、JSON数据的处理

3.1 介绍

JSON数据

  • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

    Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

  • This conversion can be done using SparkSession.read.json on a JSON file

    读取一个JSON文件可以用SparkSession.read.json方法

从JSON到DataFrame

  • 指定DataFrame的schema

    1,通过反射自动推断,适合静态数据

    2,程序指定,适合程序运行中动态生成的数据

加载json数据

#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')

#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')

嵌套结构的JSON

  • 重要的方法

    1,get_json_object

    2,get_json

    3,explode

3.2 实践

3.1 静态json数据的读取和操作

无嵌套结构的json数据

from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

# ==========================================
#                无嵌套结构的json
# ==========================================
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

从json字符串数组得到DataFrame

# 从json字符串数组得到rdd有两种方法
# 1. 转换为rdd,再从rdd到DataFrame
# 2. 直接利用spark.createDataFrame(),见后面例子

jsonRDD = sc.parallelize(jsonString)   # stringJSONRDD
jsonDF =  spark.read.json(jsonRDD)  # convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()

直接从文件生成DataFrame

# -- 直接从文件生成DataFrame
#只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")
# or
# jsonDF = spark.read.format('json').load('xxx.json')

jsonDF.printSchema()
jsonDF.show()

jsonDF.filter(jsonDF.pop>4000).show(10)
#依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")

resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)

3.2 动态json数据的读取和操作

指定DataFrame的Schema

3.1节中的例子为通过反射自动推断schema,适合静态数据

下面我们来讲解如何进行程序指定schema

没有嵌套结构的json

jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

jsonRDD = sc.parallelize(jsonString)

from pyspark.sql.types import *

#定义结构类型
#StructType:schema的整体结构,表示JSON的对象结构
#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
  .add("id", StringType(),True) \
  .add("city", StringType()) \
  .add("pop" , LongType()) \
  .add("state",StringType())

jsonSchema = StructType() \
  .add("id", LongType(),True) \
  .add("city", StringType()) \
  .add("pop" , DoubleType()) \
  .add("state",StringType())

reader = spark.read.schema(jsonSchema)

jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()

带有嵌套结构的json

from pyspark.sql.types import *
jsonSchema = StructType([
    StructField("id", StringType(), True),
    StructField("city", StringType(), True),
    StructField("loc" , ArrayType(DoubleType())),
    StructField("pop", LongType(), True),
    StructField("state", StringType(), True)
])

reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)

3.3 复杂结构的处理

eventArray = [(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),

 (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),

 (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }""")]

from pyspark.sql.functions import *
from pyspark.sql.types import *

eventJSONDF = spark.createDataFrame(eventArray,('key','json'))

'''
get_json_object:
spark 从1.6开始提供了get_json_object方法,可以把一个json字符串解析成json对象,进而提取其中的字段数据,但这个方法每次提取一个字段,
如果有多个字段要提取,就需要写多个get_json_object方法,如下:
'''
eventDF = eventJSONDF \
    .select(eventJSONDF.key,
            get_json_object(eventJSONDF.json, '$.device_id').alias("device_id"),
            get_json_object(eventJSONDF.json, '$.device_type').alias("device_type"),
            get_json_object(eventJSONDF.json, '$.ip').alias("ip")
            )

eventDF.select(["device_id","ip"]).show()
eventDF.filter(eventDF.key>1).show()

'''
from_json可以看作是get_json_object的孪生兄弟,它是利用Schema来提取信息的。
使用from_json,我们就避免多次写get_json_object了,而是构造了一个json对象作为DataFrame中的一列,还可以把整个json对象当作一个整体,然后用*这种方式使用。
这个方法在2.0.2中没有,从2.1开始出现
'''

jsonSchema = StructType() \
    .add("device_type", StringType()) \
    .add("device_id", LongType()) \
    .add("ip", StringType()) \
    .add("cca3",StringType()) \
    .add("cn", StringType()) \
    .add("temp", LongType()) \
    .add("signal", LongType()) \
    .add("battery_level", LongType()) \
    .add("c02_level", LongType()) \
    .add("timestamp", TimestampType())


eventJSONDF = spark.createDataFrame(eventArray,('key','json'))
eventDF = eventJSONDF \
    .select(eventJSONDF.key,
            from_json(eventJSONDF.json,jsonSchema).alias('device')
            )
eventDF.select("device.*").filter("device.temp > 10 and device.signal > 15")

results matching ""

    No results matching ""