6、利用spark进行其他常见分析

6.1 通过spark实现点击流日志分析

在新闻类网站中,经常要衡量一条网络新闻的页面访问量,最常见的就是uv和pv,如果在所有新闻中找到访问最多的前几条新闻,topN是最常见的指标。

  • 访问的pv

    pv:网站的总访问量

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("pv").getOrCreate()
    sc = spark.sparkContext
    rdd1 = sc.textFile("file:///root/bigdata/data/access.log")
    #把每一行数据记为("pv",1)
    rdd2 = rdd1.map(lambda x:("pv",1)).reduceByKey(lambda a,b:a+b)
    rdd2.collect()
    sc.stop()
    
  • 访问的uv

    uv:网站的独立用户访问量

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("pv").getOrCreate()
    sc = spark.sparkContext
    rdd1 = sc.textFile("file:///root/bigdata/data/access.log")
    #对每一行按照空格拆分,将ip地址取出
    rdd2 = rdd1.map(lambda x:x.split(" ")).map(lambda x:x[0])
    #把每个ur记为1
    rdd3 = rdd2.distinct().map(lambda x:("uv",1))
    rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
    rdd4.saveAsTextFile("hdfs:///uv/result")
    sc.stop()
    
  • 访问的topN

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("topN").getOrCreate()
    sc = spark.sparkContext
    rdd1 = sc.textFile("file:///root/bigdata/data/access.log")
    #对每一行按照空格拆分,将url数据取出,把每个url记为1
    rdd2 = rdd1.map(lambda x:x.split(" ")).filter(lambda x:len(x)>10).map(lambda x:(x[10],1))
    #对数据进行累加,按照url出现次数的降序排列
    rdd3 = rdd2.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],ascending=False)
    #取出序列数据中的前n个
    rdd4 = rdd3.take(5)
    rdd4.collect()
    sc.stop()
    

6.2 通过spark实现ip地址查询

需求

在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。

因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。

ip日志信息

在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍

思路

1、 加载城市ip段信息,获取ip起始数字和结束数字,经度,纬度

2、 加载日志数据,获取ip信息,然后转换为数字,和ip段比较

3、 比较的时候采用二分法查找,找到对应的经度和纬度

4,对相同的经度和纬度做累计求和

from pyspark.sql import SparkSession

#将ip转换为特殊的数字形式
def ip_transform(ip):
    ips = ip.split(".")
    ip_num = 0
    for i in ips:
        ip_num = int(i) | ip_num << 8
    return ip_num

#二分法查找ip对应的行的索引
def binary_search(ip_num, broadcast_value):
    start = 0
    end = len(broadcast_value) - 1
    while (start <= end):
        mid = int((start + end) / 2)
        if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]):
            return mid
        if ip_num < int(broadcast_value[mid][0]):
            end = mid
        if ip_num > int(broadcast_value[mid][1]):
            start = mid

def main():
    spark = SparkSession.builder.appName("test").getOrCreate()
    sc = spark.sparkContext
    city_id_rdd = sc.textFile("file:///root/bigdata/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
    #创建一个广播变量
    city_broadcast = sc.broadcast(city_id_rdd.collect())
    dest_data = sc.textFile("file:///root/bigdata/data/20090121000132.394251.http.format").map(
        lambda x: x.split("|")[1])
    #根据取出对应的位置信息
    def get_pos(x):
        city_broadcast_value = city_broadcast.value
        #根据单个ip获取对应经纬度信息
        def get_result(ip):
            ip_num = ip_transform(ip)
            index = binary_search(ip_num, city_broadcast_value)
            return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1)

        x = map(tuple,[get_result(ip) for ip in x])
        return x

    dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x))
    result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
    print(result_rdd.collect())
    sc.stop()

if __name__ == '__main__':
    main()

广播变量的使用

需求:如果spark需要计算一个3g大小的文件,需要启动1000个task,而每一个task都需要加载3g的文件,此时,需要1000*3g的内存才能进行计算,相当消耗内存,此时我们可以使用广播变量来处理。

广播变量可以将数据在节点中共享,节点中的task任务实现对广播变量的共享,可以大大降低内存消耗。

results matching ""

    No results matching ""