在Spark中主要分为两种属性:
部署相关属性:这类属性根据不同的集群和部署方式效果可能不一样,因此推荐放在启动脚本上;如“spark.driver.memory”、“spark.executor.instances”,这种属性
在运行时通过程序设置可能不会受到影响
应该设置到配置文件或者脚本上。
运行时相关的参数:比如“spark.task.maxFailures”,这种属性可以任意设置,但出于灵活性一般配置在启动脚本上。
所有的参数都可以通过 Spark UI 在环境变量页面中看到,因此如果怀疑代码环境变量有问题,可以好好检查这部分。
一、在代码中配置
Spark可以通过SparkConf对象或者set()方法在代码中进行参数配置。
# 使用SparkConf
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
# 使用SparkSession配置
val spark = SparkSession.builder()
.config("xx","vv")
.getOrCreate()
二、启动脚本配置
在 spark-submit
、spark-sql
和 spark-shell
启动脚本上也可以直接添加参数,如:
./bin/spark-submit
--name "My app"
--master local[4]
--conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
使用 --conf/-c key=value
配置参数
三、配置文件spark-defaults.conf
Spark应用提交时会自动读取该文件中的配置,因此可以把一些全局统一的配置放在这个配置文件中,如:
spark.master spark://master:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode:8021/directory
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 5g
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark默认加载的是客户端(SPARK_HOME)conf/spark-defaults.conf
,可以通过在代码或者脚本里配置参数--properties-file
指定读取其他配置文件
四、环境变量配置
有一些参数也可以通过环境变量传入,此时可以把对应的环境变量放在 conf/spark-env.sh 中,如:
SPARK_CONF_DIR=""
HADOOP_CONF_DIR=""
YARN_CONF_DIR=""
SPARK_CONF_DIR
:
Spark 将使用该目录中的配置文件(spark-defaults.conf、spark-env.sh、log4j.properties 等)
HADOOP_CONF_DIR
:
用于spark读写HDFS
Spark读写HDFS配置
使用 Spark 从 HDFS 读取和写入,有几种方式:
Spark 的类路径中(resources 目录下)应包含两个 Hadoop 配置文件:
hdfs-site.xml
,它为 HDFS 客户端提供默认行为。
core-site.xml
,它设置默认的文件系统名称。
将上述两个配置文件拷贝到spark/conf目录
或者在spark客户端conf/spark-env.sh 中配置HADOOP_CONF_DIR
属性为hadoop集群中该这两个配置文件存放目录。
本地调试配置参考
适用于本地调试远程读取测试集群上HDFS文件
只需要将 hdfs-site.xml
、 core-site.xml
拷贝到Spark 程序的类路径中(resources 目录下)即可
class 远程读取hdfs测试 {
val spark = SparkSession
.builder()
.master("local[1]")
.appName("hive-test")
.getOrCreate()
@After
def after(): Unit ={
spark.stop()
@Test
def readHDFS(): Unit ={
spark.read.text("/test/input").show()
配置上述文件后,如果再测试读写本地文件,需要在文件路径前加上file:///
前缀
Spark读写Hive配置
Spark 的类路径中(resources 目录下)包含hive-site.xml
将hive-site.xml 复制到$SPARK_HOME/conf
目录下
当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。
引入hive依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
本地调试需要包含hive-site文件
将hive-site.xml 复制到Spark 程序的类路径中(resources 目录下)
开启hive支持
使用 .enableHiveSupport()
开启支持hive读写。
class 远程读取hive测试 {
val spark = SparkSession
.builder()
.master("local[1]")
.appName("hive-test")
.enableHiveSupport() //支持hive读取
.getOrCreate()
@After
def after(): Unit ={
spark.stop()
@Test
def showDataBases(): Unit ={
spark.sql("show databases;").show()
spark.sql("use test;")
spark.sql("show tables;").show()
Spark自定义Hadoop和Hive参数配置
方式1:修改类路径下配置文件
在 Spark 的类路径中为每个应用程序复制和修改hdfs-site.xml
、core-site.xml
、hive-site.xml
方式2:提交任务脚本参数配置
配置hadoop参数: spark.hadoop.key=value
配置hive参数:spark.hive.key=value
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jar
Spark提交基础配置参考
#!/bin/bash
source /client/bigdata.env
spark-submit --master yarn --deploy-mode cluster \
--queue default \
--principal user1 \
--keytab user1.keytab \
--driver-cores 4 \
--driver-momory 16G \
--executor-cores 8 \
--executor-memory 16G \
--num-executors 100 \
--name wordcount \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps \
--conf spark.driver.maxResultSize=4G \
--files 读取额外file路径 \
--jars 加载额外jar包路径 \
--class org.apache.spark.examples.SparkPi \
jar包全路径 \
主类main方法读取的args 用空格隔开
常用参数总结
参数名 | 参数说明 |
---|
--master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local |
--deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
--driver-memory | Driver内存,默认 1G |
--driver-java-options | 传给 driver 的额外的 Java 选项 |
--driver-library-path | 传给 driver 的额外的库路径 |
--driver-class-path | 传给 driver 的额外的类路径 |
--driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
--executor-memory | 每个 executor 的内存,默认是1G |
--num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
--executor-cores | 每个 executor 的核数。在yarn或者standalone下使用 |
--name | 应用程序的名称 |
--file | 用逗号分隔的本地文件,设置后,这些文件将包含在 driver 和 executor |
--jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
--conf PROP=VALUE | 指定 spark 配置属性的值如:spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
--properties-file | 加载的配置文件,默认为 {SPARK_HOME}/conf/spark-defaults.conf |
--class | 应用程序的主类,仅针对 java 或 scala 应用 |
spark-submit \
--master local[8] \
--driver-cores 2 \
--driver-memory 16g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
Spark加载外部jar和文件
有时候我们不希望Spark应用jar过于臃肿,这时候可以只编译应用程序代码包,将依赖的第三方jar包通过--jar命令传递。
同样的有些场景我们需要将众多的配置(比如属性列表、Json等)放到外部文件进行存放,这样配置外部化有两点好处:
可以实现对某个属性灵活变更,而不用去改jar包
使脚本专注于Spark和少数应用参数配置提升可读性
--files和--jars基本相同,以--files为例:
当使用spark-submit --files时,会将--files后面的文件路径记录下来传给driver进程,然后当启动driver进程时,会调用SparkFiles.addFile(file_path),并复制文件到driver的临时文件目录中。之后executor启动之后,将从driver这里fetch文件到自己的工作目录。
通过脚本传递
--file file_paths
file和提交脚本在同一节点,一般是提交机。
通过HDFS传递
这里配置到了代码里
spark.sparkContext.addFile("hdfs:///test/input/words.txt")
其中file_paths可为多种方式:file://,hdfs://,http://,ftp://,local:,多个路径用逗号隔开
获取文件路径:
filePath = SparkFiles.get(fileName)
SparkFiles.get(fileName)
所得的路径
对于driver就是SparkEnv.get.driverTmpDir+fileName
对于executor就是workDir+fileName
。
获取文件数据流:
# executor读取
inputStream = new FileInputStream(fileName)
# driver读取(main方法)
inputStream = new FileInputStream(SparkFiles.get(fileName))
在 yarn cluster提交模式时 ,--files必须使用全局可视的地址(比如hdfs),否则driver将无法找到文件,出现FileNotFoundException。这是因为driver会在集群中任意一台work节点上运行,使用本地地址无法找到文件。
FileNotFoundException异常出现在SparkSession的getOrCreate()初始化方法中,因为此方法会调用addFile(),但是确找不到文件,导致SparkSession初始化失败。注意:–jars原理相同,但是getOrCreate()中调用addJars出现异常,但是并不会导SparkSession初始化失败,程序会继续运行。
在driver中执行的SparkFiles.get(fileName)和executor中执行SparkFiles.get(fileName)结果不同。原理上面已经陈述,driver上和executor上获取的路径并不相同。所以当在driver中使用SparkFiles.get(fileName)获取到文件路径之后,再使用Sparkcontext或者sparksession的DataSource API读取文件,则会出现异常。因为读取动作会在每一个executor上执行,但是读取路径是driver上获取得到的,所以找不到文件。