4、RDD三类算子
Transformation
- 通过已有的RDD生成一个新的RDD
- Lazy计算:Transformation只会记录RDD转化关系,并不会触发计算
- 举例:map,filter,groupBy,reduceBy
- 优点:可以中间插入优化过程
Action
- 通过RDD计算得到一个或者一组值
- Action是立即执行的
- 举例:count,reduce,saveAsTextFile
- 缺点:不能插入优化过程
Persistence
- cache:缓存到内存
- Persist:更灵活的缓存策略
- cache()方法调用的也是persist方法,缓存策略均为MEMORY_ONLY
- 可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别
- cache或者persist并不是action
- map:通过函数FUNC传递源的每个元素,返回一个新的分布式的数据集
- filter:通过选择FUNC返回true的源的元素返回一个新的数据集
- flatmap:与map类似,但是每个输入项可以映射到0或更多个输出项(因此func应该返回Seq而不是单个项)。
- groupby:当调用(k,v)对的数据集时,返回一个数据集(k,可迭代
)对。 注意:如果分组是为了对每个键执行聚合(例如求和或平均值),则使用reduceByKey或.gateByKey将产生更好的性能。 注意:默认情况下,输出中的并行级别取决于父RDD分区的数量。可以通过可选的Nuffice参数设置不同数量的任务。 - reduceByKey:在对(K,V)对的数据集进行调用时,返回(K,V)对的数据集,其中每个键的值都使用给定的reduce函数func进行聚合,该函数必须是类型(V,V)=>V。与groupByKey中一样,reduce任务的数量可以通过可选的第二个参数进行配置。
- union:返回包含源数据集和参数中元素的合并后的新数据集
- distinct:返回包含源数据集的不重复元素的新数据集
- join:当调用类型(K,V)和(K,W)的数据集时,返回(K,(V,W))对的数据集包含每个键的所有元素对。外部连接通过leftOuterJoin、rightOuterJoin和fullOuterJoin进行支持。
- foreach:在数据集的每个元素上运行函数FUNC。这通常是用于副作用,例如更新累加器或与外部存储系统交互。
- collect:在驱动器程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他操作返回有用的数据子集后非常有用。
- count:返回数据集中的元素数量。
- first:返回数据集的第一个元素
- take:返回具有数据集的第一个n个元素的数组
练习
案例1:map filter
map:所有数据按照map中的参数对应的函数来处理并产生一个新的RDD
filter:所有数据按照map中的参数对应的函数(必须存在返回boolean的函数)来处理,并产生一个新的RDD
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = rdd1.map(lambda x:x*2)
rdd3 = rdd2.filter(lambda x:x>3)
rdd3.collect()
案例2:flatMap
flatMap:和map类似,但是传入一个数据,返回多个数据
rdd1 = sc.parallelize(["a b c","d e f","h i j"])
rdd2 = rdd1.flatMap(lambda x:x.split(" "))
rdd2.collect()
案例3:交集、并集
union:对两个RDD求并集
intersection:对两个RDD求交集
rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd4 = rdd1.intersection(rdd2)
rdd4.collect()
案例4:groupByKey
groupByKey:以元组中的第0个元素作为key,进行分组,返回一个新的RDD
rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd4 = rdd3.groupByKey()#返回的元组中第1个元素是可迭代对象,需要进一步遍历才能得到结果
rdd4.collect()
案例5:reduce
reduce:所有数据都需要针对当前函数参数对应的函数进行处理,直接得到一个结果
rdd1 = sc.parallelize([1,2,3,4,5])
rdd1.reduce(lambda a,b:a+b)
案例6:reduceByKey、sortByKey
reduceByKey:所有数据都需要针对当前函数参数对应的函数进行处理,直接得到一个结果(需要有一个key作为基准,求value的和)
sortByKey:所有数据都需要针对当前函数参数对应的函数进行排序处理,直接得到一个结果(需要有一个key作为基准,求value的和)
rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
rdd4.collect()
rdd5 = rdd4.sortByKey(ascending=False)
rdd5.collect()
rdd6 = rdd4.map(lambda x:(x[1],x[0])).sortByKey(ascending=False).map(lambda x:(x[1],x[0]))
rdd6.collect()