6、Spark Streaming和外部数据源交互

正确的交互方式-foreachRDD

forEachRDD:对流中生成的每个RDD应用函数的最通用的输出运算符。这个函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中或通过网络将其写入数据库。注意,函数func是在运行流应用程序的驱动程序过程中执行的,并且其中通常包含RDD的actions,这将强制进行RDD的计算。

该函数针对于存储到外部的存储系统:类似于mysql,oracle等数据库。

关于存储到mysql数据库的代码编写方法:

常见错误写法:

#错误写法1:
#不能够运行
def sendRecord(rdd):
    #工作在driver节点,无法将其序列化反序列化传递到worker节点,会出现错误
    connection = createNewConnection()  # executed at the driver
    #执行下面的代码,系统会自动跳到worker节点
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)

#错误写法2:
#能够运行,但对于每发送一次记录都会创建一次连接
def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

正确写法:

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()
#Partition:一个机器中的多个RDD存放的位置就是Partition
#针对于每一个Partition,只做一个数据库连接
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

优化正确写法:

#在每个节点中创建一个连接池,执行计算时,观察是否有连接空闲,有就使用,使用完成后,放回连接池
def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

示例:

注:安装pymysql

pip3 install pymysql

from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 3)

def printTemp(iter):
    # 打开数据库连接
    db = pymysql.connect("localhost", "root", "root", "word_count")
    for rec in iter:
        insert_to_db(db,rec[0],rec[1])
    db.close()

import pymysql
def insert_to_db(db,rec0,rec1):
    cursor = db.cursor()
    sql = "INSERT INTO word(NAME,W_COUNT) VALUES (\"%s\",%s)"%(rec0,rec1)
    try:
        cursor.execute(sql)
        db.commit()
    except:
        db.rollback()

lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x,y:x+y)
counts.foreachRDD(lambda rdd: rdd.foreachPartition(printTemp))

ssc.start()
ssc.awaitTermination()

results matching ""

    No results matching ""