diff --git a/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go b/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go index a1544c0a53..0346414570 100644 --- a/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go +++ b/pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go @@ -345,11 +345,13 @@ type ImageAndServiceModule struct { type JobTaskFreestyleSpec struct { Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"` Steps []*StepTask `bson:"steps" json:"steps" yaml:"steps"` + Events *Events `bson:"events" json:"events" yaml:"events"` } type JobTaskPluginSpec struct { Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"` Plugin *PluginTemplate `bson:"plugin" json:"plugin" yaml:"plugin"` + Events *Events `bson:"events" json:"events" yaml:"events"` } type JobTaskBlueGreenDeploySpec struct { diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go index 73c5526221..17ab31a2f6 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_freestyle.go @@ -97,6 +97,9 @@ func NewFreestyleJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Wor if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil { logger.Error(err) } + if jobTaskSpec.Events == nil { + jobTaskSpec.Events = &commonmodels.Events{} + } job.Spec = jobTaskSpec return &FreestyleJobCtl{ job: job, @@ -1164,7 +1167,10 @@ func (c *FreestyleJobCtl) cleanupHelperPod(ctx context.Context, client crClient. func (c *FreestyleJobCtl) wait(ctx context.Context) { var err error taskTimeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute) - c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger) + c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger, func(events *commonmodels.Events) { + c.jobTaskSpec.Events = events + c.ack() + }) if err != nil { c.job.Error = err.Error() } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go index 3e5bcf2ff1..4ac3db2018 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go @@ -48,6 +48,9 @@ func NewPluginsJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Workf if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil { logger.Error(err) } + if jobTaskSpec.Events == nil { + jobTaskSpec.Events = &commonmodels.Events{} + } job.Spec = jobTaskSpec return &PluginJobCtl{ job: job, @@ -150,7 +153,10 @@ func (c *PluginJobCtl) run(ctx context.Context) error { func (c *PluginJobCtl) wait(ctx context.Context) { var err error timeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute) - c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger) + c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger, func(events *commonmodels.Events) { + c.jobTaskSpec.Events = events + c.ack() + }) if err != nil { c.logger.Errorf("wait job start error: %v", err) } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go index c18fd03d27..3677e56439 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go @@ -896,7 +896,7 @@ func int64Ptr(i int64) *int64 { return &i } func WaitPlainJobEnd(ctx context.Context, taskTimeout int, namespace, jobName string, kubeClient crClient.Client, apiServer crClient.Reader, xl *zap.SugaredLogger) config.Status { timeout := time.After(time.Duration(taskTimeout) * time.Minute) - status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl) + status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl, nil) if err != nil { xl.Errorf("wait job start error: %v", err) } @@ -936,12 +936,67 @@ func waitPlainJobEnd(ctx context.Context, taskTimeout int, timeout <-chan time.T } } -func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger) (config.Status, error) { +func appendPendingPodEvents(podName, namespace string, apiReader client.Reader, events *commonmodels.Events, reported map[string]struct{}, onUpdate func(*commonmodels.Events), xl *zap.SugaredLogger) { + if events == nil { + return + } + + selector := fields.Set{"involvedObject.name": podName, "involvedObject.kind": setting.Pod}.AsSelector() + kubeEvents, err := getter.ListEvents(namespace, selector, apiReader) + if err != nil { + xl.Errorf("list events failed for pod %s/%s: %s", namespace, podName, err) + return + } + + sort.SliceStable(kubeEvents, func(i, j int) bool { + return kubeEvents[i].CreationTimestamp.Unix() < kubeEvents[j].CreationTimestamp.Unix() + }) + + changed := false + for _, kubeEvent := range kubeEvents { + eventKey := fmt.Sprintf("%s|%s|%s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message) + if _, ok := reported[eventKey]; ok { + continue + } + reported[eventKey] = struct{}{} + + eventType := "info" + if kubeEvent.Type == corev1.EventTypeWarning { + eventType = "error" + } + + eventTime := kubeEvent.LastTimestamp + if eventTime.IsZero() { + eventTime = kubeEvent.FirstTimestamp + } + if eventTime.IsZero() && !kubeEvent.EventTime.IsZero() { + eventTime = metav1.NewTime(kubeEvent.EventTime.Time) + } + if eventTime.IsZero() { + eventTime = metav1.NewTime(time.Now()) + } + + *events = append(*events, &commonmodels.Event{ + EventType: eventType, + Time: eventTime.Format("2006-01-02 15:04:05"), + Message: fmt.Sprintf("Pod event [%s/%s]: %s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message), + }) + changed = true + } + + if changed && onUpdate != nil { + onUpdate(events) + } +} + +func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger, onEventsUpdate func(*commonmodels.Events)) (config.Status, error) { xl.Infof("wait job to start: %s/%s", namespace, jobName) xl.Infof("Timeout of preparing Pod: %s.", 120*time.Second) waitPodReadyTimeout := time.After(120 * time.Second) var podReadyTimeout bool + reportedEvents := make(map[string]struct{}) + events := &commonmodels.Events{} for { select { case <-ctx.Done(): @@ -966,6 +1021,8 @@ func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crC continue } for _, pod := range podList { + appendPendingPodEvents(pod.Name, namespace, apiReader, events, reportedEvents, onEventsUpdate, xl) + if pod.Status.Phase == corev1.PodFailed { msg := "" for _, condition := range pod.Status.Conditions { diff --git a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go index 5e95e09336..cdbc92eb9c 100644 --- a/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go +++ b/pkg/microservice/aslan/core/workflow/service/workflow/workflow_task_v4.go @@ -135,6 +135,7 @@ type JobTaskPreview struct { ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"` ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"` RetryCount int `bson:"retry_count" yaml:"retry_count" json:"retry_count"` + Events *commonmodels.Events `bson:"events" json:"events"` // JobInfo contains the fields that make up the job task name, for frontend display JobInfo interface{} `bson:"job_info" json:"job_info"` } @@ -2479,6 +2480,23 @@ func HandleJobError(workflowName, jobName, userID, username string, taskID int64 return nil } +func extractRuntimeJobEvents(job *commonmodels.JobTask) *commonmodels.Events { + switch job.JobType { + case string(config.JobFreestyle), string(config.JobZadigBuild), string(config.JobZadigTesting), string(config.JobZadigScanning), string(config.JobZadigDistributeImage): + taskJobSpec := &commonmodels.JobTaskFreestyleSpec{} + if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil { + return taskJobSpec.Events + } + case string(config.JobPlugin): + taskJobSpec := &commonmodels.JobTaskPluginSpec{} + if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil { + return taskJobSpec.Events + } + } + + return nil +} + func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, now int64, projectName string) []*JobTaskPreview { envMap := make(map[string]*commonmodels.Product) resp := []*JobTaskPreview{} @@ -2510,6 +2528,7 @@ func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, ErrorHandlerUserID: job.ErrorHandlerUserID, ErrorHandlerUserName: job.ErrorHandlerUserName, RetryCount: job.RetryCount, + Events: extractRuntimeJobEvents(job), } switch job.JobType { case string(config.JobFreestyle):