
1 Background

Starting with Spark 2.3, Spark officially supports Kubernetes as a resource-scheduling mode (alongside Standalone, Mesos, and YARN). Running Spark on Kubernetes lowers cluster operations cost, and jobs can be co-deployed with other applications to improve machine utilization, so more and more users are trying to run Spark jobs on Kubernetes.

Until Spark 3.1, Spark on Kubernetes was still officially labeled experimental, an indication that it was not yet mature. Unlike the other modes, there are no long-running node resources, so users need new approaches to dependency management, environment configuration, logging, and monitoring. Today only a small fraction of production Spark workloads run on Kubernetes, but with Kubernetes as the cornerstone of cloud native, running Spark on it is clearly where things are headed.

This article uses a wordCount job (org.apache.spark.examples.JavaWordCount) as an example to analyze how a Spark job executes on Kubernetes. The source code referenced is from Spark 2.4.5.

The walkthrough quotes a fair amount of source code and other details, which can be dry; I suggest reading it alongside the diagram I drew below to understand the overall picture.

2 Job Submission

Spark officially supports submitting Kubernetes jobs via spark-submit [1]. In practice, however, many people choose Google's open-source spark-on-k8s-operator [2], which wraps the official submission mechanism in a Kubernetes CRD-based submission model. Both approaches are introduced below.
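To make the CRD-based model concrete, here is a minimal sketch of submitting the same wordCount job through the operator. It assumes spark-on-k8s-operator is already installed in the cluster with its v1beta2 SparkApplication CRD; the image name and service account below are placeholders, not values from this article.

```shell
# Sketch only: assumes spark-on-k8s-operator (v1beta2 CRD) is installed;
# {spark-image} and the "spark" service account are placeholders.
kubectl apply -f - <<'EOF'
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-wordcount-example
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: {spark-image}
  mainClass: org.apache.spark.examples.JavaWordCount
  mainApplicationFile: "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar"
  sparkVersion: "2.4.5"
  driver:
    cores: 1
    memory: "1g"
    serviceAccount: spark
  executor:
    instances: 2
    cores: 1
    memory: "1g"
EOF
```

With this model there is no need for a client node with a local Spark environment: the operator watches SparkApplication objects and runs spark-submit on the user's behalf inside the cluster.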

2.1 Submitting a Job the Native Way

The submission method in the official Spark on Kubernetes documentation is to run bin/spark-submit from a client that has a local Spark environment. This client can sit outside the Kubernetes cluster or be a pod inside it; it is usually set up as a dedicated gateway for job submission. This approach looks much like the other scheduling modes, except that the parameters must include a series of Kubernetes-specific settings such as the apiserver address and the image address.

For example, we can submit a wordCount job as follows (replace the placeholders with the appropriate addresses), executed on the client node:

$ bin/spark-submit \
--master k8s://https://{k8s-apiserver-host}:6443 \
--deploy-mode cluster \
--name spark-wordcount-example \
--class org.apache.spark.examples.JavaWordCount \
local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar \
oss://{wordcount-file-oss-bucket}/
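In practice, a submission like the one above typically also carries a few Kubernetes-specific `--conf` settings, most importantly the container image. The flags below are standard `spark.kubernetes.*` properties in Spark 2.4; the image name and service account are placeholder values, not ones taken from this article.

```shell
# Typical extra flags for Spark 2.4 on Kubernetes ({spark-image} is a placeholder):
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image={spark-image} \
--conf spark.kubernetes.namespace=default \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
```

The service account matters because the driver pod itself calls the apiserver to create executor pods, so it needs RBAC permissions to create and watch pods in its namespace.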

After execution, a Java process starts on the client node whose main class is org.apache.spark.deploy.SparkSubmit:

$ ps -ef
UID PID PPID C STIME TTY STAT TIME CMD
root 216 7 53 16:24 pts/0 Sl+ 0:03 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0/jre/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* org.apache.spark.deploy.SparkSubmit --master k8s://https://{k8s-apiserver-host}:6443 --deploy-mode cluster --class org.apache.spark.examples.JavaWordCount --name spark-wordcount-example local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar oss://{wordcount-file-oss-bucket}/

At this point we can see directly that the pods have started in the Kubernetes cluster:

$ kubectl get pod
NAME READY STATUS RESTARTS AGE
spark-wordcount-example-1628927658927-driver 1/1 Running 0 7s
spark-wordcount-example-1628927658927-exec-1 1/1 Running 0 2s
spark-wordcount-example-1628927658927-exec-2 1/1 Running 0 2s
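To follow the job from here, the driver pod can be inspected with standard kubectl commands (the pod name is the one shown by `kubectl get pod` above; output will of course vary by cluster):

```shell
# Stream the driver's stdout/stderr while the job runs:
kubectl logs -f spark-wordcount-example-1628927658927-driver

# After completion, check events and the final container status:
kubectl describe pod spark-wordcount-example-1628927658927-driver
```

Since there are no long-running node resources in this mode, kubectl is effectively the replacement for the per-node log files you would look at under YARN or Standalone.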

The driver pod is the one started by the SparkSubmit process above. To look further into the startup details in the source, we can run jstack at this point and see the call stack:

"main" #1 prio=5 os_prio=0 tid=0x00007f0e28051800 nid=0x8c waiting on condition [0x00007f0e2ee81000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000b2282088> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.spark.deploy.k8s.submit.LoggingPodStatusWatcherImpl.awaitCompletion(LoggingPodStatusWatcher.scala:138)
at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:155)
at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:140)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2545)
at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:140)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:250)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:241)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2545)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:241)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:204)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:856)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:931)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:940)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

From this call stack, let's focus on the source code of the following classes:

  • org.apache.spark.deploy.SparkSubmit
  •