6、利用spark进行其他常见分析
6.1 通过spark实现点击流日志分析
在新闻类网站中,经常要衡量一条网络新闻的页面访问量,最常见的就是uv和pv,如果在所有新闻中找到访问最多的前几条新闻,topN是最常见的指标。
访问的pv
pv:网站的总访问量
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("pv").getOrCreate() sc = spark.sparkContext rdd1 = sc.textFile("file:///root/bigdata/data/access.log") #把每一行数据记为("pv",1) rdd2 = rdd1.map(lambda x:("pv",1)).reduceByKey(lambda a,b:a+b) rdd2.collect() sc.stop()
访问的uv
uv:网站的独立用户访问量
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("pv").getOrCreate() sc = spark.sparkContext rdd1 = sc.textFile("file:///root/bigdata/data/access.log") #对每一行按照空格拆分,将ip地址取出 rdd2 = rdd1.map(lambda x:x.split(" ")).map(lambda x:x[0]) #把每个ur记为1 rdd3 = rdd2.distinct().map(lambda x:("uv",1)) rdd4 = rdd3.reduceByKey(lambda a,b:a+b) rdd4.saveAsTextFile("hdfs:///uv/result") sc.stop()
访问的topN
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("topN").getOrCreate() sc = spark.sparkContext rdd1 = sc.textFile("file:///root/bigdata/data/access.log") #对每一行按照空格拆分,将url数据取出,把每个url记为1 rdd2 = rdd1.map(lambda x:x.split(" ")).filter(lambda x:len(x)>10).map(lambda x:(x[10],1)) #对数据进行累加,按照url出现次数的降序排列 rdd3 = rdd2.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],ascending=False) #取出序列数据中的前n个 rdd4 = rdd3.take(5) rdd4.collect() sc.stop()
6.2 通过spark实现ip地址查询
需求
在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。
因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。
ip日志信息
在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
思路
1、 加载城市ip段信息,获取ip起始数字和结束数字,经度,纬度
2、 加载日志数据,获取ip信息,然后转换为数字,和ip段比较
3、 比较的时候采用二分法查找,找到对应的经度和纬度
4,对相同的经度和纬度做累计求和
from pyspark.sql import SparkSession
#将ip转换为特殊的数字形式
def ip_transform(ip):
ips = ip.split(".")
ip_num = 0
for i in ips:
ip_num = int(i) | ip_num << 8
return ip_num
#二分法查找ip对应的行的索引
def binary_search(ip_num, broadcast_value):
start = 0
end = len(broadcast_value) - 1
while (start <= end):
mid = int((start + end) / 2)
if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]):
return mid
if ip_num < int(broadcast_value[mid][0]):
end = mid
if ip_num > int(broadcast_value[mid][1]):
start = mid
def main():
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
city_id_rdd = sc.textFile("file:///root/bigdata/data/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14]))
#创建一个广播变量
city_broadcast = sc.broadcast(city_id_rdd.collect())
dest_data = sc.textFile("file:///root/bigdata/data/20090121000132.394251.http.format").map(
lambda x: x.split("|")[1])
#根据取出对应的位置信息
def get_pos(x):
city_broadcast_value = city_broadcast.value
#根据单个ip获取对应经纬度信息
def get_result(ip):
ip_num = ip_transform(ip)
index = binary_search(ip_num, city_broadcast_value)
return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1)
x = map(tuple,[get_result(ip) for ip in x])
return x
dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x))
result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b)
print(result_rdd.collect())
sc.stop()
if __name__ == '__main__':
main()
广播变量的使用
需求:如果spark需要计算一个3g大小的文件,需要启动1000个task,而每一个task都需要加载3g的文件,此时,需要1000*3g的内存才能进行计算,相当消耗内存,此时我们可以使用广播变量来处理。
广播变量可以将数据在节点中共享,节点中的task任务实现对广播变量的共享,可以大大降低内存消耗。