from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkSessionBase(object):
    """Base class that initializes the SparkSession for all later Spark programs."""

    SPARK_APP_NAME = None
    SPARK_URL = "yarn"
    SPARK_EXECUTOR_MEMORY = "2g"
    SPARK_EXECUTOR_CORES = 2
    SPARK_EXECUTOR_INSTANCES = 2
    ENABLE_HIVE_SUPPORT = False

    def _create_spark_session(self):
        """Create a SparkSession and return it to the caller.

        :return: SparkSession
        """
        # Build the configuration with SparkConf
        conf = SparkConf()
        config = (
            ("spark.app.name", self.SPARK_APP_NAME),  # app name; if none is provided, Spark generates a random one
            ("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY),  # memory per executor; 2g here
            ("spark.master", self.SPARK_URL),  # address of the Spark master
            ("spark.executor.cores", self.SPARK_EXECUTOR_CORES),  # CPU cores per executor; Spark's default is 1
            ("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES),  # number of executors
        )
        conf.setAll(config)  # apply all configuration entries

        # Create the SparkSession instance
        if self.ENABLE_HIVE_SUPPORT:
            return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
        else:
            return SparkSession.builder.config(conf=conf).getOrCreate()

    def _create_spark_hbase(self):
        """Create a SparkSession that also carries the HBase/ZooKeeper connection settings.

        :return: SparkSession
        """
        conf = SparkConf()  # create the SparkConf object
        config = (
            ("spark.app.name", self.SPARK_APP_NAME),  # app name; if none is provided, Spark generates a random one
            ("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY),  # memory per executor; 2g here
            ("spark.master", self.SPARK_URL),  # address of the Spark master
            ("spark.executor.cores", self.SPARK_EXECUTOR_CORES),  # CPU cores per executor; Spark's default is 1
            ("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES),  # number of executors
            ("hbase.zookeeper.quorum", "192.168.19.137"),  # ZooKeeper quorum that HBase registers with
            ("hbase.zookeeper.property.clientPort", "22181"),  # ZooKeeper client port
        )
        conf.setAll(config)

        # Create the SparkSession from the config object
        if self.ENABLE_HIVE_SUPPORT:
            return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
        else:
            return SparkSession.builder.config(conf=conf).getOrCreate()
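
# A minimal usage sketch: subclass SparkSessionBase, override the class-level
# settings, and build the session in __init__ via the protected factory method.
# The class name, app name, and the Hive table queried below are hypothetical
# placeholders for illustration only; they are not part of the base class.
class DemoApp(SparkSessionBase):
    SPARK_APP_NAME = "demoApp"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark = self._create_spark_session()


if __name__ == "__main__":
    app = DemoApp()
    # Any DataFrame works here; this line assumes a Hive table demo.articles exists.
    df = app.spark.sql("SELECT * FROM demo.articles LIMIT 10")
    df.show()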