6、Spark Streaming和外部数据源交互
正确的交互方式-foreachRDD
forEachRDD:对流中生成的每个RDD应用函数的最通用的输出运算符。这个函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中或通过网络将其写入数据库。注意,函数func是在运行流应用程序的驱动程序过程中执行的,并且其中通常包含RDD的actions,这将强制进行RDD的计算。
该函数针对于存储到外部的存储系统:类似于mysql,oracle等数据库。
关于存储到mysql数据库的代码编写方法:
常见错误写法:
#错误写法1:
#不能够运行
def sendRecord(rdd):
#工作在driver节点,无法将其序列化反序列化传递到worker节点,会出现错误
connection = createNewConnection() # executed at the driver
#执行下面的代码,系统会自动跳到worker节点
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
#错误写法2:
#能够运行,但对于每发送一次记录都会创建一次连接
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
正确写法:
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
#Partition:一个机器中的多个RDD存放的位置就是Partition
#针对于每一个Partition,只做一个数据库连接
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
优化正确写法:
#在每个节点中创建一个连接池,执行计算时,观察是否有连接空闲,有就使用,使用完成后,放回连接池
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
示例:
注:安装pymysql
pip3 install pymysql
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 3)
def printTemp(iter):
# 打开数据库连接
db = pymysql.connect("localhost", "root", "root", "word_count")
for rec in iter:
insert_to_db(db,rec[0],rec[1])
db.close()
import pymysql
def insert_to_db(db,rec0,rec1):
cursor = db.cursor()
sql = "INSERT INTO word(NAME,W_COUNT) VALUES (\"%s\",%s)"%(rec0,rec1)
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x,y:x+y)
counts.foreachRDD(lambda rdd: rdd.foreachPartition(printTemp))
ssc.start()
ssc.awaitTermination()