1、spark概述

  • 1、什么是spark

    • 基于内存的计算引擎,它的计算速度非常快。但是仅仅只涉及到数据的计算,并没有涉及到数据的存储。
  • 2、为什么要学习spark

    MapReduce框架局限性

    • 1,Map结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
    • 2,任务调度和启动开销大
    • 3,无法充分利用内存
    • 4,不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
    • 5,不适合流式处理(点击日志分析)
    • 6,MapReduce编程不够灵活,仅支持Map和Reduce两种操作

    Hadoop生态圈

    • 批处理:MapReduce、Hive、Pig
    • 流式计算:Storm
    • 交互式计算:Impala、presto

    需要一种灵活的框架可同时进行批处理、流式计算、交互式计算

    • 内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销

    • DAG引擎,较少多次计算之间中间结果写到HDFS的开销

      DAG计算:将计算任务在内部分解成为若干个子任务,将这些子任务之间的逻辑关系构建成DAG结构。

    • 使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO

    spark的缺点是:吃内存,不太稳定

  • 3、spark特点

    • 1、速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)
      • spark中的job中间结果可以不落地,可以存放在内存中。
      • mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。
    • 2、易用性(可以通过java/scala/python/R开发spark应用程序)
    • 3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)
    • 4、兼容性(spark程序可以运行在standalone/yarn/mesos)

Spark2.x 新特性

  • SparkSession:新的上下文入口,统一SQLContext和HiveContext

  • dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名

    dataset在spark1.6出现,在spark2.x时,dataframe只是dataset[ROW]的类型别名,但是dataset是一种强类型语言设定,由于python是弱类型语言,只能使用DataFrame

    dataframe本身是scala实现,而spark也是scala语言写的,python只是一个壳子,对于python编写的spark代码性能不变

  • Spark SQL支持sql 2003标准

    • 支持ansi-sql
    • 支持ddl命令
    • 支持子查询:in/not in、exists/not exists
    • 提升catalyst查询优化器的性能
    • code generation技术将spark sql和dataset的性能提升2~10倍(钨丝计划)
    • vectorization技术提升parquet文件的扫描吞吐量
    • spark mllib基于rdd的api转为维护阶段
    • 未来将主要基于dataset api的ml,向量和矩阵使用性能更高的序列化机制

2、spark安装部署

  • 1、下载spark安装包

    http://spark.apache.org/downloads.html

    高版本不存在cdh的编译版本,可以从官网下载源码版本,指定高版本hadoop进行编译

    编译步骤:

    • 1,安装java(JDK 1.7及以上)

      export JAVA_HOME=/xxx
      export JRE_HOME=/xxx
      export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
      export PATH=$JAVA_HOME/bin:$PATH
      
    • 2,安装Maven, 版本为3.3.9或者以上

      下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache//maven/maven-3/3.3.9/binaries

      配置MAVEN_HOME

      export MAVEN_HOME=/xxx
      export PATH=$MAVEN_HOME/bin:$PATH
      
    • 3,下载spark源码

      s1

    • 4,增加cdh的repository

      解压spark的源码包,编辑pom.xml文件, 在repositories节点 加入如下配置:

      <repository>
              <id>cloudera</id>
              <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
      
    • 5,编译

      设置内存:

      export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

      开始编译:

      ./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -DskipTests clean package

      源码编译后,bin目录下的文件可能不存在可执行权限,需要通过chmod指令添加可执行权限

      chmod +x xxx

  • 2、规划spark安装目录

  • 3、解压安装包

  • 4、重命名安装目录

  • 5、修改配置文件

    • spark-env.sh(需要将spark-env.sh.template重命名)

      • 配置hadoop的CONF目录,spark会将hadoop中的配置文件进行加载

        HADOOP_CONF_DIR=/root/bigdata/hadoop-2.6.0-cdh5.7.0/etc/hadoop

      • 配置java环境变量

        export JAVA_HOME=java_home_path

      • 配置PYTHON环境

        PYTHON_HOME=/usr/local/python3/bin

        export PYSPARK_PYTHON=/xx/pythonx_home/bin/python3

      • 配置spark的日志目录(注:需要先在hadoop上创建一个目录(directory,spark不会主动创建这个目录))

        SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node-teach1:8020/spark_directory"

      • 配置master的地址(可选)

        export SPARK_MASTER_HOST=node-teach1

      • 配置master的端口(可选)

        `export SPARK_MASTER_PORT=7077

      • 配置spark-defaults文件

        spark.eventLog.enabled true spark.eventLog.dir hdfs://node-teach1:8020/directory

  • 6、配置spark环境变量

    export SPARK_HOME=/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0
    export PATH=$SPARK_HOME/bin:$PATH
    

3、spark启动和停止

  • 启动pyspark

    • 在节点上执行

      ./pyspark

4、在Spark上编写代码实战

wordcount案例

  • Spark1.6

    sc = SparkContext('local')
    doc = sc.textFile('file:///root/bigdata/data/spark_test.log')
    words = doc.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reducebyKey(lambda x,y:x+y).collect()
    
  • Spark 2.x

    spark = SparkSession.builder.appName('test').getOrCreate()
    sc = spark.sparkContext
    words = sc.textFile('file:///root/bigdata/data/spark_test.log') \
                .flatMap(lambda line: line.split(" ")) \
                .map(lambda x: (x, 1)) \
                .reduceByKey(lambda a, b: a + b).collect()
    

注:以上为样例对比,后面会加上每句代码的解释。

5、利用PyCharm编写spark wordcount程序

  • 环境配置

    将spark目录下的python目录下的pyspark整体拷贝到pycharm使用的python环境下

    将下图中的pyspark

    s2

    拷贝到pycharm使用的:xxx\Python\Python36\Lib\site-packages目录下

  • 代码

import sys

#SparkSession:spark2.0为了创建spark相关的环境(sparksql,spark-mllib,spark-core)而设计的统一的接口类
from pyspark.sql import SparkSession
import os

#注:在pycharm上运行时,会提示SPARK_HOME找不到,需要配置SPARK_HOME环境变量
os.environ["SPARK_HOME"]="/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0"
#由于系统中存在两种不同版本的python,需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3
os.environ["PYSPARK_PYTHON"]="/usr/local/python3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/local/python3/bin/python3"

#构造器模式,builder模式
#在当前类中创建一个Builder类,Builder类会执行一些操作,最终返回SparkSession对象的实例
#appName:指定应用名称
#getOrCreate:获取或创建,如果内存中存在SparkSession实例,则get,否则create
spark = SparkSession.builder.appName("test").getOrCreate()
#通过SparkSession获取SparkContext用于编写Spark-Core代码
sc = spark.sparkContext

#sc.textFile:读取一个文件
#textFile读取数据时,默认会以hdfs为基准,需要指定文件协议
#flatMap:对读取的文件按行进行处理,返回多个结果
#map:针对于数据做操作,每个操作返回单个结果
#reduceByKey:按照相同的key做操作
counts = sc.textFile("file:///root/bigdata/data/test.log").flatMap(lambda line:line.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
#执行计算并输出
print(counts.collect())
#执行完成程序后,要停止当前程序
sc.stop()

注:在pycharm上运行时,会提示SPARK_HOME找不到,需要配置SPARK_HOME环境变量

由于系统中存在两种不同版本的python,需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3

  • 将代码上传到远程cent-os系统上

  • 利用pycharm运行代码,得出最终的计算结果

results matching ""

    No results matching ""