4、RDD三类算子

Transformation

  • 通过已有的RDD生成一个新的RDD
  • Lazy计算:Transformation只会记录RDD转化关系,并不会触发计算
  • 举例:map,filter,groupBy,reduceBy
  • 优点:可以中间插入优化过程

Action

  • 通过RDD计算得到一个或者一组值
  • Action是立即执行的
  • 举例:count,reduce,saveAsTextFile
  • 缺点:不能插入优化过程

Persistence

  • cache:缓存到内存
  • Persist:更灵活的缓存策略
  • cache()方法调用的也是persist方法,缓存策略均为MEMORY_ONLY
  • 可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别
  • cache或者persist并不是action

s4

s5

s6

s7

  • map:通过函数FUNC传递源的每个元素,返回一个新的分布式的数据集
  • filter:通过选择FUNC返回true的源的元素返回一个新的数据集
  • flatmap:与map类似,但是每个输入项可以映射到0或更多个输出项(因此func应该返回Seq而不是单个项)。
  • groupby:当调用(k,v)对的数据集时,返回一个数据集(k,可迭代)对。 注意:如果分组是为了对每个键执行聚合(例如求和或平均值),则使用reduceByKey或.gateByKey将产生更好的性能。 注意:默认情况下,输出中的并行级别取决于父RDD分区的数量。可以通过可选的Nuffice参数设置不同数量的任务。
  • reduceByKey:在对(K,V)对的数据集进行调用时,返回(K,V)对的数据集,其中每个键的值都使用给定的reduce函数func进行聚合,该函数必须是类型(V,V)=>V。与groupByKey中一样,reduce任务的数量可以通过可选的第二个参数进行配置。
  • union:返回包含源数据集和参数中元素的合并后的新数据集
  • distinct:返回包含源数据集的不重复元素的新数据集
  • join:当调用类型(K,V)和(K,W)的数据集时,返回(K,(V,W))对的数据集包含每个键的所有元素对。外部连接通过leftOuterJoin、rightOuterJoin和fullOuterJoin进行支持。
  • foreach:在数据集的每个元素上运行函数FUNC。这通常是用于副作用,例如更新累加器或与外部存储系统交互。
  • collect:在驱动器程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他操作返回有用的数据子集后非常有用。
  • count:返回数据集中的元素数量。
  • first:返回数据集的第一个元素
  • take:返回具有数据集的第一个n个元素的数组

练习

案例1:map filter

map:所有数据按照map中的参数对应的函数来处理并产生一个新的RDD

filter:所有数据按照map中的参数对应的函数(必须存在返回boolean的函数)来处理,并产生一个新的RDD

rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = rdd1.map(lambda x:x*2)
rdd3 = rdd2.filter(lambda x:x>3)
rdd3.collect()

案例2:flatMap

flatMap:和map类似,但是传入一个数据,返回多个数据

rdd1 = sc.parallelize(["a b c","d e f","h i j"])
rdd2 = rdd1.flatMap(lambda x:x.split(" "))
rdd2.collect()

案例3:交集、并集

union:对两个RDD求并集

intersection:对两个RDD求交集

rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd4 = rdd1.intersection(rdd2)
rdd4.collect()

案例4:groupByKey

groupByKey:以元组中的第0个元素作为key,进行分组,返回一个新的RDD

rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd4 = rdd3.groupByKey()#返回的元组中第1个元素是可迭代对象,需要进一步遍历才能得到结果
rdd4.collect()

案例5:reduce

reduce:所有数据都需要针对当前函数参数对应的函数进行处理,直接得到一个结果

rdd1 = sc.parallelize([1,2,3,4,5])
rdd1.reduce(lambda a,b:a+b)

案例6:reduceByKey、sortByKey

reduceByKey:所有数据都需要针对当前函数参数对应的函数进行处理,直接得到一个结果(需要有一个key作为基准,求value的和)

sortByKey:所有数据都需要针对当前函数参数对应的函数进行排序处理,直接得到一个结果(需要有一个key作为基准,求value的和)

rdd1 = sc.parallelize([("a",1),("b",2)])
rdd2 = sc.parallelize([("c",1),("b",3)])
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
rdd4.collect()
rdd5 = rdd4.sortByKey(ascending=False)
rdd5.collect()
rdd6 = rdd4.map(lambda x:(x[1],x[0])).sortByKey(ascending=False).map(lambda x:(x[1],x[0]))
rdd6.collect()

results matching ""

    No results matching ""