Starting with Spark 2.3, Kubernetes is officially supported as a resource scheduling mode (alongside Standalone, Mesos, and YARN). Running Spark on Kubernetes lowers cluster operations cost and allows mixed deployment with other applications to raise machine utilization, so more and more users are trying to run Spark jobs on Kubernetes.
Until Spark 3.1, however, Spark on Kubernetes was still officially labeled experimental, a sign that it was not yet mature. Unlike the other modes there are no long-running node resources, so users need new approaches for dependency management, environment configuration, logging, and monitoring. Today only a small share of production workloads run Spark on Kubernetes, but with Kubernetes as the cornerstone of cloud native infrastructure, Spark on Kubernetes is clearly the direction things are heading.
This article takes a WordCount job (org.apache.spark.examples.JavaWordCount) as an example and analyzes how a Spark job executes on Kubernetes. The code references Spark 2.4.5.
The walkthrough quotes a fair amount of source code and may feel a bit dry; it is best read together with the diagram I drew below for an overall picture.
The Spark project itself supports submitting Kubernetes jobs through spark-submit [1]. In practice, though, many people choose Google's open-source spark-on-k8s-operator [2], which wraps the official submission method in a Kubernetes CRD-based submission model. Both are introduced below.
The submission method described in the official Spark on Kubernetes documentation uses a client with a local Spark installation that runs bin/spark-submit. This client can sit outside the Kubernetes cluster or be a pod inside it, and it is usually kept as a dedicated gateway for job submission. The approach looks much like the other scheduling modes, except that the parameters must include a series of Kubernetes-specific settings such as the apiserver address and the image address.
For example, we submit a WordCount job as follows (replace the placeholders with the appropriate addresses), executed on the client node:
$ bin/spark-submit \
  --master k8s://https://{k8s-apiserver-host}:6443 \
  --deploy-mode cluster \
  --name spark-wordcount-example \
  --class org.apache.spark.examples.JavaWordCount \
  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar \
  oss://{wordcount-file-oss-bucket}/
After execution, a Java process starts on the client node with org.apache.spark.deploy.SparkSubmit as its main class:
$ ps -ef
UID   PID  PPID  C STIME TTY   STAT TIME CMD
root  216     7 53 16:24 pts/0 Sl+  0:03 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0/jre/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* org.apache.spark.deploy.SparkSubmit --master k8s://https://{k8s-apiserver-host}:6443 --deploy-mode cluster --class org.apache.spark.examples.JavaWordCount --name spark-wordcount-example local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar oss://{wordcount-file-oss-bucket}/
At this point we can already see the pods running in the Kubernetes cluster:
$ kubectl get pod
NAME                                           READY   STATUS    RESTARTS   AGE
spark-wordcount-example-1628927658927-driver   1/1     Running   0          7s
spark-wordcount-example-1628927658927-exec-1   1/1     Running   0          2s
spark-wordcount-example-1628927658927-exec-2   1/1     Running   0          2s
The Driver Pod here is started by the SparkSubmit process above. To dig into the startup details in the source, we can run jstack on that process and look at the call stack:
"main" #1 prio=5 os_prio=0 tid=0x00007f0e28051800 nid=0x8c waiting on condition [0x00007f0e2ee81000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000b2282088> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.apache.spark.deploy.k8s.submit.LoggingPodStatusWatcherImpl.awaitCompletion(LoggingPodStatusWatcher.scala:138) at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:155) at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:140) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2545) at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:140) at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:250) at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:241) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2545) at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:241) at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:204) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:856) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:931) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:940) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Reading the call stack from the bottom up, the key classes to look at are org.apache.spark.deploy.SparkSubmit, org.apache.spark.deploy.k8s.submit.KubernetesClientApplication, its inner Client, and LoggingPodStatusWatcherImpl: SparkSubmit resolves the cluster-mode k8s master and hands over to KubernetesClientApplication, Client.run builds the Driver Pod spec and creates it through the Kubernetes API, and LoggingPodStatusWatcherImpl watches the pod and blocks (the CountDownLatch.await at the top of the stack) until the job finishes.
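To make that flow concrete, here is a minimal, hand-written sketch (not Spark's actual code) of what Client.run boils down to, using the fabric8 Kubernetes client that Spark bundles: build a Pod object, create it through the apiserver, then watch it until it reaches a terminal state. The pod name, image, and namespace are illustrative placeholders, and the exact fabric8 API may differ slightly between versions.

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;

public class DriverPodSketch {
    public static void main(String[] args) throws Exception {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            // Assemble a driver pod spec; in Spark this is built by KubernetesDriverBuilder
            // from the spark-submit configuration.
            Pod driverPod = new PodBuilder()
                .withNewMetadata()
                    .withName("spark-wordcount-example-driver")   // illustrative name
                    .addToLabels("spark-role", "driver")
                .endMetadata()
                .withNewSpec()
                    .addNewContainer()
                        .withName("spark-kubernetes-driver")
                        .withImage("{spark-image}")
                        .withArgs("driver", "--class",
                                  "org.apache.spark.examples.JavaWordCount", "spark-internal")
                    .endContainer()
                .endSpec()
                .build();

            // Create the pod through the apiserver (the core of Client.run).
            client.pods().inNamespace("default").create(driverPod);

            // Watch the pod until it completes, the role LoggingPodStatusWatcherImpl plays
            // while the main thread waits on its CountDownLatch.
            client.pods().inNamespace("default")
                  .withName("spark-wordcount-example-driver")
                  .watch(new Watcher<Pod>() {
                      @Override
                      public void eventReceived(Action action, Pod pod) {
                          System.out.println(action + ": " + pod.getStatus().getPhase());
                      }
                      @Override
                      public void onClose(KubernetesClientException cause) { }
                  });
        }
    }
}

In the real implementation, Client.run additionally creates a config map holding spark.properties and mounts it into the driver pod, which is why a spark-conf-volume appears in the pod description below.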
Finally, here is the complete description of the Driver Pod:
apiVersion: v1
kind: Pod
metadata:
  labels:
    spark-app-selector: spark-4977cde55f33498d8cc34dcf80589ba0
    spark-role: driver
  name: spark-wordcount-example-1628999880800-driver
spec:
  containers:
    - args:
        - driver
        - '--properties-file'
        - /opt/spark/conf/spark.properties
        - '--class'
        - org.apache.spark.examples.JavaWordCount
        - spark-internal
        - 'oss://{wordcount-file-oss-bucket}/'
      env:
        - name: SPARK_DRIVER_BIND_ADDRESS
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: SPARK_CONF_DIR
          value: /opt/spark/conf
      image: '{spark-image}'
      imagePullPolicy: IfNotPresent
      name: spark-kubernetes-driver
      ports:
        - containerPort: 7078
          name: driver-rpc-port
          protocol: TCP
        - containerPort: 7079
          name: blockmanager
          protocol: TCP
        - containerPort: 4040
          name: spark-ui
          protocol: TCP
      resources:
        limits:
          memory: 1408Mi
        requests:
          cpu: '1'
          memory: 1408Mi
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: spark-token-nq9p4
          readOnly: true
  serviceAccount: spark
  serviceAccountName: spark
  volumes:
    - configMap:
        defaultMode: 420
        name: spark-wordcount-example-1628999880800-driver-conf-map
      name: spark-conf-volume
    - name: spark-token-nq9p4
      secret:
        defaultMode: 420
        secretName: spark-token-nq9p4
spark-on-k8s-operator [2] lets users submit and manage Spark jobs as a CRD (CustomResourceDefinition) [4]. This approach makes better use of native Kubernetes capabilities and is more extensible, and on top of it the operator adds features such as scheduled jobs, retries, and monitoring. The full feature set is described in the project's documentation on GitHub.
This operator pattern [5] is also a deployment model that Kubernetes itself recommends for building and managing complex applications: each application defines its own CRD and implements a custom controller that watches the CRD for changes and carries out the application-specific deployment logic.
To use spark-on-k8s-operator, it must first be installed into the Kubernetes cluster, which starts a pod named sparkoperator. Submitting a job then no longer requires a client with a Spark installation; jobs can be submitted directly through kubectl or the Kubernetes API. spark-on-k8s-operator defines two CRDs, SparkApplication and ScheduledSparkApplication, corresponding to Spark jobs and scheduled jobs respectively. Submitting, inspecting, and deleting a job thus becomes an apply, get, or delete operation on the CRD.
For example, to submit the same WordCount job as above, prepare a wordcount.yaml:
apiVersion: "sparkoperator.k8s.io/v1beta2"kind: SparkApplicationmetadata: name: spark-wordcount-example namespace: defaultspec: type: Java sparkVersion: 2.4.5 mainClass: org.apache.spark.examples.JavaWordCount image: {Spark镜像地址} mainApplicationFile: "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar" arguments: - "oss://{wordcount-file-oss-bucket}/" driver: cores: 1 coreLimit: 1000m memory: 4g executor: cores: 1 coreLimit: 1000m memory: 4g memoryOverhead: 1g instances: 2
Then apply it with kubectl:
$ kubectl apply -f wordcount.yaml
After this, spark-on-k8s-operator notices the newly created SparkApplication, triggers spark-submit to create the Driver Pod, and the job starts running.
To see the operator's internal flow clearly, we add a PrintStack call at the point where the operator invokes spark-submit (the operator is written in Go, so this looks slightly different from Java):
goroutine 113 [running]:
runtime/debug.Stack(0x15, 0x0, 0x0)
	/usr/local/go/src/runtime/debug/stack.go:24 +0x9d
runtime/debug.PrintStack()
	/usr/local/go/src/runtime/debug/stack.go:16 +0x22
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.runSparkSubmit()
	/workspace/pkg/controller/sparkapplication/submission.go:69 +0x25e
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).submitSparkApplication()
	/workspace/pkg/controller/sparkapplication/controller.go:700 +0xc3f
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).syncSparkApplication()
	/workspace/pkg/controller/sparkapplication/controller.go:555 +0xa2c
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).processNextItem()
	/workspace/pkg/controller/sparkapplication/controller.go:264 +0x1e8
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).runWorker()
	/workspace/pkg/controller/sparkapplication/controller.go:248 +0x63
k8s.io/apimachinery/pkg/util/wait.JitterUntil.func1()
	/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:152 +0x5f
k8s.io/apimachinery/pkg/util/wait.JitterUntil()
	/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:153 +0xf8
k8s.io/apimachinery/pkg/util/wait.Until()
	/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:88 +0x4d
created by github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).Start
	/workspace/pkg/controller/sparkapplication/controller.go:166 +0xfe
Reading the call stack bottom-up, the submission flow is: the controller started by Controller.Start runs worker loops (runWorker / processNextItem) that consume the work queue; syncSparkApplication picks up the newly created SparkApplication and calls submitSparkApplication, which finally invokes runSparkSubmit to shell out to spark-submit. From that point on, everything proceeds exactly as in the spark-submit flow described earlier.
One more feature is worth noting. The SparkApplication yaml defines many Kubernetes-related specs, such as env, volumeMounts, podAntiAffinity, and tolerations, which the native spark-submit submission path cannot express (especially in Spark 2.x). For this reason the spark operator also implements a Mutating Admission Webhook, one form of Kubernetes Dynamic Admission Control [7]: it intercepts Kubernetes API requests and calls back to a configured HTTP endpoint, and inside that mutating webhook callback the resource object being processed can be modified at will, which makes custom configuration of the Driver and Executor Pods possible.
To implement this webhook, the operator's helm installation exposes the operator as a service and configures a MutatingWebhookConfiguration pointing at its /webhook path. Kubernetes resource-object requests are then called back to this endpoint, where the operator patches the pods as needed.
In the sections above we worked out the yaml that starts the Driver Pod; what happens once the pod is up? When a Docker image runs, it executes the ENTRYPOINT or CMD command as the container's main process. The Spark image's ENTRYPOINT is /opt/entrypoint.sh, and in driver mode its content essentially passes the container args to /bin/spark-submit, starting another SparkSubmit process, this time in client mode.
## ps -ef inside the Driver Pod shows the Java process started with:
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0/jre/bin/java -cp /opt/spark/jars/* -Xmx4g org.apache.spark.deploy.SparkSubmit --deploy-mode client --conf spark.driver.bindAddress=xx.xx.xx.xx --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.examples.JavaWordCount local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar oss://{wordcount-file-oss-bucket}/
The SparkSubmit startup path is the same as analyzed above, except that this time the submission is in client mode, so it no longer goes through org.apache.spark.deploy.k8s.submit.KubernetesClientApplication; instead it directly invokes the main method of the class given by --class, which in our example means executing org.apache.spark.examples.JavaWordCount directly.
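For context, the JavaWordCount program invoked here is essentially the following (abridged from the Spark examples module, comments added). The SparkSession.builder().getOrCreate() call is what constructs the SparkContext, and with it the Kubernetes scheduler backend that requests executor pods, which is exactly where the next section picks up:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }

    // getOrCreate() builds the SparkContext; with a k8s:// master this is the point
    // where the driver starts creating executor pods through the apiserver.
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();

    // The classic word count: split lines into words, map to (word, 1), reduce by key.
    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
    JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?, ?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    spark.stop();
  }
}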
The Executors are created when the SparkContext is initialized. Let's again use the call stack to see the source path along which the Driver triggers Executor creation:
"main" #1 prio=5 os_prio=0 tid=0x00007f9258051800 nid=0x376 in Object.wait() [0x00007f925e88f000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00000000d5b23770> (a org.apache.spark.scheduler.TaskSchedulerImpl) at org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:821) - locked <0x00000000d5b23770> (a org.apache.spark.scheduler.TaskSchedulerImpl) at org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:196) at org.apache.spark.SparkContext.<init>(SparkContext.scala:562) at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2537) - locked <0x00000000d5950988> (a java.lang.Object) at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:959) at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:950) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:950) - locked <0x00000000d59509e8> (a org.apache.spark.sql.SparkSession$) - locked <0x00000000d5950a08> (a org.apache.spark.sql.SparkSession$Builder) at org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:856) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:931) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:940) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Now let's look at the Executor Pod's yaml and the JVM process it launches:
apiVersion: v1
kind: Pod
metadata:
  labels:
    spark-app-selector: spark-2480ba0e4b544209bafa0ddd553700f3
    spark-exec-id: '1'
    spark-role: executor
  name: javawordcount-1629452540505-exec-1
  namespace: c-18eecfc5d78446c7
  ownerReferences:
    - apiVersion: v1
      controller: true
      kind: Pod
      name: spark-word-count-driver
spec:
  containers:
    - args:
        - executor
      env:
        - name: SPARK_DRIVER_URL
          value: >-
            spark://CoarseGrainedScheduler@spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc:7078
        - name: SPARK_EXECUTOR_CORES
          value: '1'
        - name: SPARK_EXECUTOR_MEMORY
          value: 8g
        - name: SPARK_APPLICATION_ID
          value: spark-2480ba0e4b544209bafa0ddd553700f3
        - name: SPARK_EXECUTOR_ID
          value: '1'
        - name: SPARK_EXECUTOR_POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
      image: '{spark-image}'
      imagePullPolicy: IfNotPresent
      name: executor
      ports:
        - containerPort: 7079
          name: blockmanager
          protocol: TCP
      resources:
        limits:
          cpu: '1'
          memory: 9Gi
        requests:
          cpu: '1'
          memory: 9Gi
      volumeMounts:
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: default-token-bvgh7
          readOnly: true
  serviceAccount: default
  serviceAccountName: default
  volumes:
    - name: default-token-bvgh7
      secret:
        defaultMode: 420
        secretName: default-token-bvgh7
$ ps -ef
UID   PID  PPID  C STIME TTY STAT TIME CMD
root   15     1 53 17:51 ?   Sl   0:07 /bin/java -Xms8g -Xmx8g -cp :/opt/spark/jars/* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc:7078 --executor-id 2 --cores 1 --app-id spark-d79090c58b514cc2b7112ea319983b5c --hostname 10.100.3.67
The entrypoint script shows that in executor mode the main class launched is org.apache.spark.executor.CoarseGrainedExecutorBackend (the same as in Standalone/YARN mode). Its startup logic is to connect back to the driver's RPC endpoint given by --driver-url, register itself as an executor, and then launch tasks as the driver assigns them.
To see the Executor's network connections directly, we run netstat inside the Executor Pod while the job is executing. In this example 7078 is the Driver's RPC port, 7079 is the Driver's blockManager port, and the Executor's own server, having no port specified, was assigned 39311.
$ netstat -at -W
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address                              Foreign Address                                                                                      State
tcp6       0      0 javawordcount-1629538339101-exec-1:39311   [::]:*                                                                                               LISTEN
tcp6       0      0 javawordcount-1629538339101-exec-1:51370   10-100-3-70.spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc.cluster.local:7078   TIME_WAIT
tcp6       0      0 javawordcount-1629538339101-exec-1:47044   10-100-3-70.spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc.cluster.local:7079   ESTABLISHED
tcp6       0      0 javawordcount-1629538339101-exec-1:51374   10-100-3-70.spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc.cluster.local:7078   ESTABLISHED
When a Spark job finishes, the Driver process exits normally and the Driver Pod's status is set to Succeeded; if it exits abnormally (exitCode > 0), the Driver Pod's status becomes Failed. In either state the Driver Pod is not actually deleted, so its status and logs remain visible while it no longer occupies cluster resources. The Executors, by contrast, are actively deleted by the Driver when the SparkContext shuts down, so to keep Executor logs by preserving the pods, set spark.kubernetes.executor.deleteOnTermination=false. Additional resources such as the Driver's service and configmap are bound to the Driver Pod through an ownerReference at creation time, so they are cleaned up only when the Driver Pod itself is really deleted.
To terminate a running job, one option is to delete the Driver Pod directly through Kubernetes; the other is spark-submit --kill (supported only from Spark 3). If the Driver Pod is deleted directly, all of the job's resources, including the Executor Pods, are reclaimed, because they all carry the ownerReference.
[1] Running Spark on Kubernetes - Spark 3.1.2 Documentation
[2] GoogleCloudPlatform/spark-on-k8s-operator: Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
[3] Using Watch with the Kubernetes API | Baeldung
[4] Custom Resources | Kubernetes
[5] Operator pattern | Kubernetes
[6] Bitnami Engineering: A deep dive into Kubernetes controllers
[7] Dynamic Admission Control | Kubernetes