spark on k8s执行源码分析

1. submit任务

1.1 spark-submit

image

1
2
3
4
5
6
7
8
$ ./bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar

如上代码是一个典型的使用 spark-submit 提交任务的命令。查看 spark-submit 脚本(代码如下)可见,它实际上执行的是 Spark 的 org.apache.spark.deploy.SparkSubmit 类。

1
2
3
4
5
6
7
8
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

1.2 SparkSubmit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Resolves the real application entry class, loads it, and runs it.
// (Excerpt from org.apache.spark.deploy.SparkSubmit; elided parts marked "...")
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// Key step: for Spark on K8s, the real entry class is resolved from the submit arguments here.
// childMainClass=org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

...

// Add every resolved classpath entry to the submit classloader.
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}

var mainClass: Class[_] = null

try {
mainClass = Utils.classForName(childMainClass)
} catch {
...
}

// If the resolved class implements SparkApplication, instantiate it directly;
// otherwise wrap its static main() in a JavaMainApplication adapter.
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}

...

try {
// Invoke the application's start() entry point
// (KubernetesClientApplication.start() in K8s cluster mode).
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
// Unwrap reflective/invocation wrappers so the caller sees the root cause.
throw findCause(t)
} finally {
...
}
}

prepareSubmitEnvironment的核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Builds the environment for launching the child application: its arguments,
// classpath, effective SparkConf, and the fully-qualified entry class name.
// (Excerpt from org.apache.spark.deploy.SparkSubmit; elided parts marked "...")
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
// Fixed transcription error: the identifier was garbled as "childMain Class"
// (split by a stray space); it must match the uses below and in the return value.
var childMainClass = ""

// Set the cluster manager
val clusterManager: Int = args.maybeMaster match {
case Some(v) =>
assert(args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect"))
v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, k8s, or local")
-1
}
case None => LOCAL // default master or remote mode.
}

// Set the deploy mode; default is client mode
val deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ =>
error("Deploy mode must be either client or cluster")
-1
}

...

val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

...

if (isKubernetesCluster) {
// KUBERNETES_CLUSTER_SUBMIT_CLASS = org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
// Pick the runner matching the primary resource type (Python / R / JVM).
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
}
else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
} else {
childArgs ++= Array("--main-class", args.mainClass)
}
if (args.childArgs != null) {
// Forward each user argument as a separate "--arg <value>" pair.
args.childArgs.foreach { arg =>
childArgs += "--arg" += arg
}
}
// Pass the proxyUser to the k8s app so it is possible to add it to the driver args
if (args.proxyUser != null) {
childArgs += "--proxy-user" += args.proxyUser
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
}

// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sparkConf.remove(DRIVER_HOST_ADDRESS)
}

...

(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}

1.3 KubernetesClientApplication

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Entry point run inside spark-submit for K8s cluster mode: parses the child
// arguments and submits the driver pod to the Kubernetes API server.
private[spark] class KubernetesClientApplication extends SparkApplication {

override def start(args: Array[String], conf: SparkConf): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
run(parsedArguments, conf)
}

// Builds the driver KubernetesConf, creates a submission-side K8s client,
// and delegates driver-resource creation to Client.run().
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
val kubernetesAppId = KubernetesConf.getKubernetesAppId()
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs,
clientArguments.proxyUser)
// Strip the "k8s://" prefix to obtain the raw API server URL.
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)

// tryWithResource ensures the Kubernetes client is closed after submission.
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None)) { kubernetesClient =>
// Build the Client; its run() asks the K8s API server to create the
// Spark driver's resources.
val client = new Client(
kubernetesConf,
// KubernetesDriverBuilder is the core piece: it assembles all of the
// driver-side Kubernetes resources.
new KubernetesDriverBuilder(),
kubernetesClient,
watcher)
client.run()
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

// Assembles the driver pod spec by folding a sequence of feature steps over an
// initial pod. (Excerpt; elided parts marked "...")
class KubernetesDriverBuilder {

def buildFromFeatures(
conf: KubernetesDriverConf,
client: KubernetesClient): KubernetesDriverSpec = {

...

// Each feature step contributes one concern to the driver pod / resources.
val features = Seq(
// Configures the core driver parameters, e.g. CPU, memory, TCP ports.
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
// Creates the Service through which executors reach the driver.
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
// Configures the Spark driver's submission command and arguments.
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures

// Start from an empty spec and let each step accumulate its additions.
val spec = KubernetesDriverSpec(
initialPod,
driverPreKubernetesResources = Seq.empty,
driverKubernetesResources = Seq.empty,
conf.sparkConf.getAll.toMap)

features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
val addedPreResources = feature.getAdditionalPreKubernetesResources()
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesDriverSpec(
configuredPod,
spec.driverPreKubernetesResources ++ addedPreResources,
spec.driverKubernetesResources ++ addedResources,
spec.systemProperties ++ addedSystemProperties)
}
}

}

2. Driver执行

2.1 driver启动参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java \
-Divy.home=/tmp/.ivy \
-Dos.name=Linux \
-XX:-UseCompressedOops \
-Djdk.lang.Process.launchMechanism=vfork \
-cp ':/opt/spark/jars/*:/bin/jars/*' \
-Xmx1g \
-XX:ActiveProcessorCount=4 \
-Dio.netty.availableProcessors=16 \
org.apache.spark.deploy.SparkSubmit \
--conf spark.driver.bindAddress=10.244.0.145 \
--deploy-mode client \
--properties-file /opt/spark/conf/spark.properties \
--class com.demo.JavaSparkInputSql \
local:/opt/spark/examples/jars/spark-demo-3.5.0-jar-with-dependencies.jar \
U0VMRUNUICogRlJPTSBxaXl1MjAyNDA3MjY=

spark.properties配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#Java properties built from Kubernetes config map with name: spark-drv-fa733f9102d7b49f-conf-map
#Tue Jul 30 16:52:56 CST 2024
spark.kubernetes.driverEnv.META_SPACE=1024m
spark.driver.port=7078
spark.executorEnv.SGX_THREAD=2048
spark.kubernetes.resource.type=java
spark.executor.cores=4
spark.kubernetes.executor.podTemplateFile=/opt/spark/pod-template/pod-spec-template.yml
spark.submit.pyFiles=
spark.executor.memory=1g
spark.kubernetes.driverEnv.SGX_KERNEL_HEAP=2GB
spark.kubernetes.authenticate.submission.clientCertFile=/opt/k8s/client.crt
spark.kubernetes.container.image=spark-tee\:0.0.1
spark.kubernetes.file.upload.path=file\:///tmp
spark.master=k8s\://https\://172.25.165.206\:6443
spark.driver.memory=1g
spark.kubernetes.driver.podTemplateFile=/opt/k8s/driver.yaml
spark.executorEnv.SGX_MEM_SIZE=12GB
spark.kubernetes.driver.pod.name=spark-tee-cluster-29c9989102d7b13e-driver
spark.driver.host=spark-tee-cluster-29c9989102d7b13e-driver-svc.default.svc
spark.executorEnv.SGX_KERNEL_HEAP=1GB
spark.executorEnv.SGX_HEAP=1GB
spark.kubernetes.driverEnv.SGX_HEAP=1GB
spark.kubernetes.authenticate.submission.clientKeyFile=/opt/k8s/client.key
spark.submit.deployMode=cluster
spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible\=true
spark.kubernetes.authenticate.driver.serviceAccountName=spark
spark.kubernetes.driverEnv.SGX_MEM_SIZE=10GB
spark.kubernetes.submitInDriver=true
spark.app.submitTime=1722329575709
spark.kubernetes.memoryOverheadFactor=0.1
spark.app.name=spark-tee-cluster
spark.kubernetes.container.image.pullPolicy=IfNotPresent
spark.driver.cores=1
spark.kubernetes.sgx.executor.jvm.mem=7G
spark.kubernetes.authenticate.submission.caCertFile=/opt/k8s/ca.crt
spark.driver.blockManager.port=7079
spark.kubernetes.executor.deleteOnTermination=true
spark.app.id=spark-61f9fb98296548a2b29d78d0ff25988f
spark.kubernetes.driverEnv.DRIVER_MEMORY=1g
spark.kubernetes.driverEnv.SGX_THREAD=2048
spark.network.timeout=300
spark.cores.max=8
spark.kubernetes.sgx.driver.jvm.mem=1G
spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible\=true
spark.executor.instances=2
spark.jars=local\:/opt/spark/examples/jars/spark-demo-3.5.0-jar-with-dependencies.jar

2.2 SparkContext

在K8S集群中拉起Driver Pod后,Driver中的Spark进程先创建SparkContext,然后开始尝试创建Executor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Creates the (SchedulerBackend, TaskScheduler) pair for the given master URL.
// (Excerpt from SparkContext; elided parts marked "...")
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
...

master match {
...
// Key branch: for Spark on K8s the master URL falls through to this default case.
case masterUrl =>
// Looks up the ExternalClusterManager implementation for this master URL
// (KubernetesClusterManager for "k8s://..." URLs).
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
// This builds the scheduler backend, which drives executor resource creation.
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}

2.3 KubernetesClusterManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  
// Wires up the driver-side K8s client, the executor pod allocator, and the
// pod watch/poll event sources, then returns the K8s scheduler backend.
// (Excerpt from KubernetesClusterManager; elided parts marked "...")
override def createSchedulerBackend(
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
...

// Driver-side client used to talk to the Kubernetes API server.
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
apiServerUri,
Some(sc.conf.get(KUBERNETES_NAMESPACE)),
authConfPrefix,
SparkKubernetesClientFactory.ClientType.Driver,
sc.conf,
defaultServiceAccountCaCrt)

...

// Key entry point: the allocator drives executor pod creation.
val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore)

// Watch-based source of executor pod state changes.
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient,
sc.conf)

// Polling-based source as a complement to the watch, on a dedicated daemon thread.
val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")
val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(
sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)

new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc,
kubernetesClient,
schedulerExecutorService,
snapshotsStore,
executorPodsAllocator,
executorPodsLifecycleEventHandler,
podsWatchEventSource,
podsPollingEventSource)
}



// Reflectively instantiates the configured executor pods allocator.
// (Excerpt; the constructor lookup is elided, marked "...")
private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore) = {

...
cstr.newInstance(
sc.conf,
sc.env.securityManager,
// Core piece: KubernetesExecutorBuilder builds the executor pod specs.
new KubernetesExecutorBuilder(),
kubernetesClient,
snapshotsStore,
new SystemClock())
}

2.4 KubernetesExecutorBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

// Assembles the executor pod spec by folding a sequence of feature steps over
// an initial pod, mirroring the driver-side builder. (Excerpt; elisions marked "...")
def buildFromFeatures(
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
client: KubernetesClient,
resourceProfile: ResourceProfile): KubernetesExecutorSpec = {
...

// Each feature step contributes one concern to the executor pod.
val features = Seq(
// Configures the core executor parameters, e.g. CPU and memory.
new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new HadoopConfExecutorFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures

// Start from an empty spec and let each step accumulate its additions.
val spec = KubernetesExecutorSpec(
initialPod,
executorKubernetesResources = Seq.empty)

features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesExecutorSpec(
configuredPod,
spec.executorKubernetesResources ++ addedResources)
}
}

3. Executor执行

3.1 executor启动参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java \
-Djava.net.preferIPv6Addresses=false \
-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
-Djdk.reflect.useDirectMethodHandle=false \
-Dio.netty.tryReflectionSetAccessible=true \
-Dspark.driver.port=7078 \
-Dspark.network.timeout=300 \
-Dspark.driver.blockManager.port=7079 \
-XX:-UseCompressedOops \
-XX:ActiveProcessorCount=4 \
-Divy.home=/tmp/.ivy \
-Xms10m \
-Xmx1024m \
-Dos.name=Linux \
-Dio.netty.availableProcessors=32 \
-Djdk.lang.Process.launchMechanism=vfork \
-cp ':/opt/spark/jars/*:/bin/jars/*' \
org.apache.spark.executor.CoarseGrainedExecutorBackend \
--driver-url spark://CoarseGrainedScheduler@spark-tee-cluster-29c9989102d7b13e-driver-svc.default.svc:7078 \
--executor-id 1 \
--cores 4 \
--app-id spark-61f9fb98296548a2b29d78d0ff25988f \
--hostname 10.244.0.146