1、spark概述
1、什么是spark
- 基于内存的计算引擎,它的计算速度非常快。但是仅仅只涉及到数据的计算,并没有涉及到数据的存储。
2、为什么要学习spark
MapReduce框架局限性
- 1,Map结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
- 2,任务调度和启动开销大
- 3,无法充分利用内存
- 4,不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)
- 5,不适合流式处理(点击日志分析)
- 6,MapReduce编程不够灵活,仅支持Map和Reduce两种操作
Hadoop生态圈
- 批处理:MapReduce、Hive、Pig
- 流式计算:Storm
- 交互式计算:Impala、presto
需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
DAG引擎,较少多次计算之间中间结果写到HDFS的开销
DAG计算:将计算任务在内部分解成为若干个子任务,将这些子任务之间的逻辑关系构建成DAG结构。
使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO
spark的缺点是:吃内存,不太稳定
3、spark特点
- 1、速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)
- spark中的job中间结果可以不落地,可以存放在内存中。
- mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。
- 2、易用性(可以通过java/scala/python/R开发spark应用程序)
- 3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)
- 4、兼容性(spark程序可以运行在standalone/yarn/mesos)
- 1、速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)
Spark2.x 新特性
SparkSession:新的上下文入口,统一SQLContext和HiveContext
dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名
dataset在spark1.6出现,在spark2.x时,dataframe只是dataset[ROW]的类型别名,但是dataset是一种强类型语言设定,由于python是弱类型语言,只能使用DataFrame
dataframe本身是scala实现,而spark也是scala语言写的,python只是一个壳子,对于python编写的spark代码性能不变
Spark SQL支持sql 2003标准
- 支持ansi-sql
- 支持ddl命令
- 支持子查询:in/not in、exists/not exists
- 提升catalyst查询优化器的性能
- code generation技术将spark sql和dataset的性能提升2~10倍(钨丝计划)
- vectorization技术提升parquet文件的扫描吞吐量
- spark mllib基于rdd的api转为维护阶段
- 未来将主要基于dataset api的ml,向量和矩阵使用性能更高的序列化机制
2、spark安装部署
1、下载spark安装包
http://spark.apache.org/downloads.html
高版本不存在cdh的编译版本,可以从官网下载源码版本,指定高版本hadoop进行编译
编译步骤:
1,安装java(JDK 1.7及以上)
export JAVA_HOME=/xxx export JRE_HOME=/xxx export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH export PATH=$JAVA_HOME/bin:$PATH
2,安装Maven, 版本为3.3.9或者以上
下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache//maven/maven-3/3.3.9/binaries
配置MAVEN_HOME
export MAVEN_HOME=/xxx export PATH=$MAVEN_HOME/bin:$PATH
3,下载spark源码
4,增加cdh的repository
解压spark的源码包,编辑pom.xml文件, 在repositories节点 加入如下配置:
<repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
5,编译
设置内存:
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
开始编译:
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0 -DskipTests clean package
源码编译后,bin目录下的文件可能不存在可执行权限,需要通过chmod指令添加可执行权限
chmod +x xxx
2、规划spark安装目录
3、解压安装包
4、重命名安装目录
5、修改配置文件
spark-env.sh(需要将spark-env.sh.template重命名)
配置hadoop的CONF目录,spark会将hadoop中的配置文件进行加载
HADOOP_CONF_DIR=/root/bigdata/hadoop-2.6.0-cdh5.7.0/etc/hadoop
配置java环境变量
export JAVA_HOME=java_home_path
配置PYTHON环境
PYTHON_HOME=/usr/local/python3/bin
export PYSPARK_PYTHON=/xx/pythonx_home/bin/python3
配置spark的日志目录(注:需要先在hadoop上创建一个目录(directory,spark不会主动创建这个目录))
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node-teach1:8020/spark_directory"
配置master的地址(可选)
export SPARK_MASTER_HOST=node-teach1
配置master的端口(可选)
`export SPARK_MASTER_PORT=7077
配置spark-defaults文件
spark.eventLog.enabled true spark.eventLog.dir hdfs://node-teach1:8020/directory
6、配置spark环境变量
export SPARK_HOME=/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0 export PATH=$SPARK_HOME/bin:$PATH
3、spark启动和停止
启动pyspark
在节点上执行
./pyspark
4、在Spark上编写代码实战
wordcount案例
Spark1.6
sc = SparkContext('local') doc = sc.textFile('file:///root/bigdata/data/spark_test.log') words = doc.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reducebyKey(lambda x,y:x+y).collect()
Spark 2.x
spark = SparkSession.builder.appName('test').getOrCreate() sc = spark.sparkContext words = sc.textFile('file:///root/bigdata/data/spark_test.log') \ .flatMap(lambda line: line.split(" ")) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: a + b).collect()
注:以上为样例对比,后面会加上每句代码的解释。
5、利用PyCharm编写spark wordcount程序
环境配置
将spark目录下的python目录下的pyspark整体拷贝到pycharm使用的python环境下
将下图中的pyspark
拷贝到pycharm使用的:xxx\Python\Python36\Lib\site-packages目录下
代码
import sys
#SparkSession:spark2.0为了创建spark相关的环境(sparksql,spark-mllib,spark-core)而设计的统一的接口类
from pyspark.sql import SparkSession
import os
#注:在pycharm上运行时,会提示SPARK_HOME找不到,需要配置SPARK_HOME环境变量
os.environ["SPARK_HOME"]="/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0"
#由于系统中存在两种不同版本的python,需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3
os.environ["PYSPARK_PYTHON"]="/usr/local/python3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/local/python3/bin/python3"
#构造器模式,builder模式
#在当前类中创建一个Builder类,Builder类会执行一些操作,最终返回SparkSession对象的实例
#appName:指定应用名称
#getOrCreate:获取或创建,如果内存中存在SparkSession实例,则get,否则create
spark = SparkSession.builder.appName("test").getOrCreate()
#通过SparkSession获取SparkContext用于编写Spark-Core代码
sc = spark.sparkContext
#sc.textFile:读取一个文件
#textFile读取数据时,默认会以hdfs为基准,需要指定文件协议
#flatMap:对读取的文件按行进行处理,返回多个结果
#map:针对于数据做操作,每个操作返回单个结果
#reduceByKey:按照相同的key做操作
counts = sc.textFile("file:///root/bigdata/data/test.log").flatMap(lambda line:line.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
#执行计算并输出
print(counts.collect())
#执行完成程序后,要停止当前程序
sc.stop()
注:在pycharm上运行时,会提示SPARK_HOME找不到,需要配置SPARK_HOME环境变量
由于系统中存在两种不同版本的python,需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3
将代码上传到远程cent-os系统上
利用pycharm运行代码,得出最终的计算结果