spark-operator Source Code Walkthrough

1. Introduction to spark-operator

The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources to specify, run, and surface the status of Spark applications.

[Figure: spark-operator architecture overview]

Concretely, a user creates a SparkApplication object with sparkctl (or kubectl). The SparkApplication controller receives the object through a watch on the API server, builds a submission carrying the spark-submit arguments, and hands it to the submission runner. The submission runner submits the application, which creates the application's driver pod; once the driver pod starts, it creates the executor pods. While the application is running, the Spark pod monitor watches the application's pods and sends pod status updates back to the controller, which updates the application's status accordingly.

2. Code Walkthrough

2.1. Code Structure

  • controller: the SparkApplication controller; watches create, update, and delete events on SparkApplication objects and acts on them.
  • submission runner: runs spark-submit for submissions received from the controller.
  • monitor: watches Spark pods and sends pod status updates back to the controller.
  • Mutating Admission Webhook: applies the operator's customizations to the Spark driver and executor pods.
  • ctl: the sparkctl command-line tool.

2.2. Startup Entry Point


// Create the controllers.
applicationController := sparkapplication.NewController(
    crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService)
scheduledApplicationController := scheduledsparkapplication.NewController(
    crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

...

// Start the worker goroutines.
if err = applicationController.Start(*controllerThreads, stopCh); err != nil {
    glog.Fatal(err)
}
if err = scheduledApplicationController.Start(*controllerThreads, stopCh); err != nil {
    glog.Fatal(err)
}
// Event handlers registered when the controller is created.

// Watch add/update/delete events on the SparkApplication custom resource
// and put the corresponding keys onto the work queue.
crdInformer := crdInformerFactory.Sparkoperator().V1beta2().SparkApplications()
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    controller.onAdd,
    UpdateFunc: controller.onUpdate,
    DeleteFunc: controller.onDelete,
})
controller.applicationLister = crdInformer.Lister()

// Watch add/update/delete events on pods and put the owning application's
// key onto the work queue.
podsInformer := podInformerFactory.Core().V1().Pods()
sparkPodEventHandler := newSparkPodEventHandler(controller.queue.AddRateLimited, controller.applicationLister)
podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    sparkPodEventHandler.onPodAdded,
    UpdateFunc: sparkPodEventHandler.onPodUpdated,
    DeleteFunc: sparkPodEventHandler.onPodDeleted,
})
controller.podLister = podsInformer.Lister()
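
The handlers themselves do little more than resolve the event's object to a namespace/name key and push it onto the rate-limited work queue. A minimal sketch of what such a handler boils down to (simplified; the real onAdd also logs and goes through a small enqueue helper):

// Simplified sketch of an enqueue handler; not the verbatim operator code.
func (c *Controller) onAdd(obj interface{}) {
    // MetaNamespaceKeyFunc produces the "namespace/name" key that
    // syncSparkApplication later splits apart.
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        glog.Errorf("failed to get key for %v: %v", obj, err)
        return
    }
    // AddRateLimited applies the queue's rate limiter, so an object that
    // keeps failing is retried with backoff instead of hot-looping.
    c.queue.AddRateLimited(key)
}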
// Start the worker goroutines.
func (c *Controller) Start(workers int, stopCh <-chan struct{}) error {
    // Wait for all involved caches to be synced, before processing items from the queue is started.
    if !cache.WaitForCacheSync(stopCh, c.cacheSynced) {
        return fmt.Errorf("timed out waiting for cache to sync")
    }

    // Ten workers are started by default; each loops over the work queue,
    // and wait.Until restarts it one second after it returns.
    glog.Info("Starting the workers of the SparkApplication controller")
    for i := 0; i < workers; i++ {
        // runWorker will loop until "something bad" happens. Until will then rekick
        // the worker after one second.
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    return nil
}
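
Each worker pulls one key at a time from the work queue and feeds it to the state machine via syncSparkApplication. A condensed sketch of the standard client-go worker loop (simplified from the operator code; error logging omitted):

// Condensed sketch of the worker loop; not the verbatim operator code.
func (c *Controller) runWorker() {
    defer utilruntime.HandleCrash()
    for c.processNextItem() {
    }
}

func (c *Controller) processNextItem() bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)

    if err := c.syncSparkApplication(key.(string)); err != nil {
        // Requeue with backoff so transient errors are retried.
        c.queue.AddRateLimited(key)
        return true
    }
    // A successful sync resets the rate limiter for this key.
    c.queue.Forget(key)
    return true
}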

2.3. State Machine


const (
    // The SparkApplication was just created and nothing has been done with it yet.
    NewState ApplicationStateType = ""
    // The application has been submitted to Kubernetes but has not started running yet.
    SubmittedState ApplicationStateType = "SUBMITTED"
    // The application is running.
    RunningState ApplicationStateType = "RUNNING"
    // The application finished successfully and exited normally.
    CompletedState ApplicationStateType = "COMPLETED"
    // The application failed.
    FailedState ApplicationStateType = "FAILED"
    // The submission itself failed, so the application never started.
    FailedSubmissionState ApplicationStateType = "SUBMISSION_FAILED"
    // The application is waiting to be rerun, e.g. while a retry after a failure is pending.
    PendingRerunState ApplicationStateType = "PENDING_RERUN"
    InvalidatingState ApplicationStateType = "INVALIDATING"
    // The application is about to complete and is going through its final steps.
    SucceedingState ApplicationStateType = "SUCCEEDING"
    // The application is about to fail and is going through failure handling.
    FailingState ApplicationStateType = "FAILING"
    UnknownState ApplicationStateType = "UNKNOWN"
)

[Figure: SparkApplication state transition diagram]

// Core logic of the state-machine transitions.
func (c *Controller) syncSparkApplication(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return fmt.Errorf("failed to get the namespace and name from key %s: %v", key, err)
    }
    app, err := c.getSparkApplication(namespace, name)
    if err != nil {
        return err
    }
    if app == nil {
        // SparkApplication not found.
        return nil
    }
    if !app.DeletionTimestamp.IsZero() {
        c.handleSparkApplicationDeletion(app)
        return nil
    }

    appCopy := app.DeepCopy()
    // Apply the default values to the copy. Note that the default values applied
    // won't be sent to the API server as we only update the /status subresource.
    v1beta2.SetSparkApplicationDefaults(appCopy)

    // Take action based on application state.
    switch appCopy.Status.AppState.State {
    case v1beta2.NewState:
        c.recordSparkApplicationEvent(appCopy)
        if err := c.validateSparkApplication(appCopy); err != nil {
            appCopy.Status.AppState.State = v1beta2.FailedState
            appCopy.Status.AppState.ErrorMessage = err.Error()
        } else {
            appCopy = c.submitSparkApplication(appCopy)
        }
    case v1beta2.SucceedingState:
        if !shouldRetry(appCopy) {
            appCopy.Status.AppState.State = v1beta2.CompletedState
            c.recordSparkApplicationEvent(appCopy)
        } else {
            if err := c.deleteSparkResources(appCopy); err != nil {
                glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
                    appCopy.Namespace, appCopy.Name, err)
                return err
            }
            appCopy.Status.AppState.State = v1beta2.PendingRerunState
        }
    case v1beta2.FailingState:
        if !shouldRetry(appCopy) {
            appCopy.Status.AppState.State = v1beta2.FailedState
            c.recordSparkApplicationEvent(appCopy)
        } else if isNextRetryDue(appCopy.Spec.RestartPolicy.OnFailureRetryInterval, appCopy.Status.ExecutionAttempts, appCopy.Status.TerminationTime) {
            if err := c.deleteSparkResources(appCopy); err != nil {
                glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
                    appCopy.Namespace, appCopy.Name, err)
                return err
            }
            appCopy.Status.AppState.State = v1beta2.PendingRerunState
        }
    case v1beta2.FailedSubmissionState:
        if !shouldRetry(appCopy) {
            // App will never be retried. Move to terminal FailedState.
            appCopy.Status.AppState.State = v1beta2.FailedState
            c.recordSparkApplicationEvent(appCopy)
        } else if isNextRetryDue(appCopy.Spec.RestartPolicy.OnSubmissionFailureRetryInterval, appCopy.Status.SubmissionAttempts, appCopy.Status.LastSubmissionAttemptTime) {
            if c.validateSparkResourceDeletion(appCopy) {
                c.submitSparkApplication(appCopy)
            } else {
                if err := c.deleteSparkResources(appCopy); err != nil {
                    glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
                        appCopy.Namespace, appCopy.Name, err)
                    return err
                }
            }
        }
    case v1beta2.InvalidatingState:
        // Invalidate the current run and enqueue the SparkApplication for re-execution.
        if err := c.deleteSparkResources(appCopy); err != nil {
            glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
                appCopy.Namespace, appCopy.Name, err)
            return err
        }
        c.clearStatus(&appCopy.Status)
        appCopy.Status.AppState.State = v1beta2.PendingRerunState
    case v1beta2.PendingRerunState:
        glog.V(2).Infof("SparkApplication %s/%s is pending rerun", appCopy.Namespace, appCopy.Name)
        if c.validateSparkResourceDeletion(appCopy) {
            glog.V(2).Infof("Resources for SparkApplication %s/%s successfully deleted", appCopy.Namespace, appCopy.Name)
            c.recordSparkApplicationEvent(appCopy)
            c.clearStatus(&appCopy.Status)
            appCopy = c.submitSparkApplication(appCopy)
        }
    case v1beta2.SubmittedState, v1beta2.RunningState, v1beta2.UnknownState:
        if err := c.getAndUpdateAppState(appCopy); err != nil {
            return err
        }
    case v1beta2.CompletedState, v1beta2.FailedState:
        if c.hasApplicationExpired(app) {
            glog.Infof("Garbage collecting expired SparkApplication %s/%s", app.Namespace, app.Name)
            err := c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Delete(context.TODO(), app.Name, metav1.DeleteOptions{GracePeriodSeconds: int64ptr(0)})
            if err != nil && !errors.IsNotFound(err) {
                return err
            }
            return nil
        }
        if err := c.getAndUpdateExecutorState(appCopy); err != nil {
            return err
        }
    }

    if appCopy != nil {
        err = c.updateStatusAndExportMetrics(app, appCopy)
        if err != nil {
            glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
            return err
        }

        if state := appCopy.Status.AppState.State; state == v1beta2.CompletedState ||
            state == v1beta2.FailedState {
            if err := c.cleanUpOnTermination(app, appCopy); err != nil {
                glog.Errorf("failed to clean up resources for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
                return err
            }
        }
    }

    return nil
}
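
The transitions out of SucceedingState, FailingState, and FailedSubmissionState hinge on shouldRetry, which consults the application's restartPolicy together with the recorded attempt counters. A simplified reconstruction of that decision (the authoritative version lives in the controller source; field names below are from the v1beta2 API):

// Simplified reconstruction of the retry decision; not verbatim operator code.
func shouldRetry(app *v1beta2.SparkApplication) bool {
    policy := app.Spec.RestartPolicy
    switch app.Status.AppState.State {
    case v1beta2.SucceedingState:
        // A successful run is only rerun under restartPolicy type "Always".
        return policy.Type == v1beta2.Always
    case v1beta2.FailingState:
        if policy.Type == v1beta2.Always {
            return true
        }
        if policy.Type == v1beta2.OnFailure && policy.OnFailureRetries != nil {
            return app.Status.ExecutionAttempts <= *policy.OnFailureRetries
        }
    case v1beta2.FailedSubmissionState:
        if policy.Type == v1beta2.Always {
            return true
        }
        if policy.Type == v1beta2.OnFailure && policy.OnSubmissionFailureRetries != nil {
            return app.Status.SubmissionAttempts <= *policy.OnSubmissionFailureRetries
        }
    }
    return false
}

isNextRetryDue then gates the rerun on the configured retry interval having elapsed since the last termination or submission attempt.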

2.4. Submitting the Spark Application

func runSparkSubmit(submission *submission) (bool, error) {
    sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
    if !present {
        glog.Error("SPARK_HOME is not specified")
    }
    // Assemble the spark-submit command that submits the Spark application.
    command := filepath.Join(sparkHome, "/bin/spark-submit")

    cmd := execCommand(command, submission.args...)
    glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
    output, err := cmd.Output()
    glog.V(3).Infof("spark-submit output: %s", string(output))
    if err != nil {
        var errorMsg string
        if exitErr, ok := err.(*exec.ExitError); ok {
            errorMsg = string(exitErr.Stderr)
        }
        // The driver pod of the application already exists.
        if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
            glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
            return false, nil
        }
        if errorMsg != "" {
            return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
        }
        return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
    }

    return true, nil
}
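
submission.args is assembled by the controller from the SparkApplication spec before runSparkSubmit is invoked: CRD fields are translated into standard spark-submit flags and spark.kubernetes.* configuration. An illustrative, deliberately incomplete sketch of that mapping (the operator's real builder also handles dependencies, volumes, driver/executor pod settings, and much more):

// Illustrative sketch only: roughly how spec fields become spark-submit args.
func buildArgsSketch(app *v1beta2.SparkApplication, masterURL string) []string {
    args := []string{
        "--master", "k8s://" + masterURL,
        "--deploy-mode", string(app.Spec.Mode), // "cluster" when run via the operator
        "--conf", "spark.kubernetes.namespace=" + app.Namespace,
    }
    if app.Spec.Image != nil {
        args = append(args, "--conf", "spark.kubernetes.container.image="+*app.Spec.Image)
    }
    if app.Spec.MainClass != nil {
        args = append(args, "--class", *app.Spec.MainClass)
    }
    if app.Spec.MainApplicationFile != nil {
        args = append(args, *app.Spec.MainApplicationFile)
    }
    // Application arguments come last, after the main application file.
    return append(args, app.Spec.Arguments...)
}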

2.5. Scheduler Extension

To address several pain points of the default Kubernetes scheduler, such as resource deadlock between jobs and the lack of queue-based resource isolation, spark-operator supports using volcano as its batch scheduler.
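
Scheduler support is pluggable: the controller looks up the scheduler named in spec.batchScheduler through the batch scheduler manager and, if that scheduler decides the application should be batch-scheduled, lets it mutate the spec before submission. The plug-in point is roughly an interface of the following shape (treat the exact method names and signatures as approximate; the batchscheduler package is authoritative):

// Approximate shape of the batch-scheduler plug-in interface.
type BatchScheduler interface {
    Name() string
    // Whether this application should be handled by the batch scheduler at all.
    ShouldSchedule(app *v1beta2.SparkApplication) bool
    // Mutate the application (e.g. attach PodGroup annotations) before spark-submit runs.
    DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error
    // Remove scheduler-side resources once the application terminates.
    CleanupOnCompletion(app *v1beta2.SparkApplication) error
}

The volcano integration implements this plug-in; its cluster-mode submission path is shown below.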

func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) error {
    //We need both mark Driver and Executor when submitting
    //NOTE: In cluster mode, the initial size of PodGroup is set to 1 in order to schedule driver pod first.
    if _, ok := app.Spec.Driver.Annotations[v1beta1.KubeGroupNameAnnotationKey]; !ok {
        //Both driver and executor resource will be considered.
        totalResource := sumResourceList([]corev1.ResourceList{getExecutorRequestResource(app), getDriverRequestResource(app)})

        if app.Spec.BatchSchedulerOptions != nil && len(app.Spec.BatchSchedulerOptions.Resources) > 0 {
            totalResource = app.Spec.BatchSchedulerOptions.Resources
        }
        if err := v.syncPodGroup(app, 1, totalResource); err == nil {
            app.Spec.Executor.Annotations[v1beta1.KubeGroupNameAnnotationKey] = v.getAppPodGroupName(app)
            app.Spec.Driver.Annotations[v1beta1.KubeGroupNameAnnotationKey] = v.getAppPodGroupName(app)
        } else {
            return err
        }
    }
    return nil
}

func (v *VolcanoBatchScheduler) syncPodGroup(app *v1beta2.SparkApplication, size int32, minResource corev1.ResourceList) error {
    var (
        err error
        pg  *v1beta1.PodGroup
    )
    podGroupName := v.getAppPodGroupName(app)
    if pg, err = v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Get(context.TODO(), podGroupName, metav1.GetOptions{}); err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
        podGroup := v1beta1.PodGroup{
            ObjectMeta: metav1.ObjectMeta{
                Namespace: app.Namespace,
                Name:      podGroupName,
                OwnerReferences: []metav1.OwnerReference{
                    *metav1.NewControllerRef(app, v1beta2.SchemeGroupVersion.WithKind("SparkApplication")),
                },
            },
            Spec: v1beta1.PodGroupSpec{
                MinMember:    size,
                MinResources: &minResource,
            },
            Status: v1beta1.PodGroupStatus{
                Phase: v1beta1.PodGroupPending,
            },
        }

        if app.Spec.BatchSchedulerOptions != nil {
            //Update pod group queue if it's specified in Spark Application
            if app.Spec.BatchSchedulerOptions.Queue != nil {
                podGroup.Spec.Queue = *app.Spec.BatchSchedulerOptions.Queue
            }
            //Update pod group priorityClassName if it's specified in Spark Application
            if app.Spec.BatchSchedulerOptions.PriorityClassName != nil {
                podGroup.Spec.PriorityClassName = *app.Spec.BatchSchedulerOptions.PriorityClassName
            }
        }
        _, err = v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Create(context.TODO(), &podGroup, metav1.CreateOptions{})
    } else {
        if pg.Spec.MinMember != size {
            pg.Spec.MinMember = size
            _, err = v.volcanoClient.SchedulingV1beta1().PodGroups(app.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{})
        }
    }
    if err != nil {
        return fmt.Errorf("failed to sync PodGroup with error: %s. Abandon schedule pods via volcano", err)
    }
    return nil
}
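
The manifest below shows this path end to end: a SparkApplication that sets batchScheduler: "volcano", with the driver and executor annotated to join an explicitly created PodGroup instead of an operator-managed one.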
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-1
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "spark:3.5.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
  sparkVersion: "3.5.0"
  batchScheduler: "volcano"
  sparkUIOptions:
    serviceLabels:
      test-label/v1: 'true'
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.5.0
    annotations:
      scheduling.k8s.io/group-name: spark-pg-1
    serviceAccount: spark-operator-spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.5.0
    annotations:
      scheduling.k8s.io/group-name: spark-pg-1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
---
apiVersion: scheduling.volcano.sh/v1beta1
kind: PodGroup
metadata:
  name: spark-pg-1
  namespace: default
spec:
  minMember: 1
  minResources:
    cpu: "3"
    memory: "1536Mi"
  queue: default
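
Because the driver and executor already carry the scheduling.k8s.io/group-name annotation, syncPodGroupInClusterMode above leaves the user-supplied spark-pg-1 PodGroup alone, and volcano gang-schedules the application's pods against its minMember and minResources; with minMember: 1 the driver can still be scheduled before any executor exists.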