Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,13 @@ type ImageAndServiceModule struct {
type JobTaskFreestyleSpec struct {
Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"`
Steps []*StepTask `bson:"steps" json:"steps" yaml:"steps"`
Events *Events `bson:"events" json:"events" yaml:"events"`
}

type JobTaskPluginSpec struct {
Properties JobProperties `bson:"properties" json:"properties" yaml:"properties"`
Plugin *PluginTemplate `bson:"plugin" json:"plugin" yaml:"plugin"`
Events *Events `bson:"events" json:"events" yaml:"events"`
}

type JobTaskBlueGreenDeploySpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func NewFreestyleJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Wor
if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil {
logger.Error(err)
}
if jobTaskSpec.Events == nil {
jobTaskSpec.Events = &commonmodels.Events{}
}
job.Spec = jobTaskSpec
return &FreestyleJobCtl{
job: job,
Expand Down Expand Up @@ -1164,7 +1167,10 @@ func (c *FreestyleJobCtl) cleanupHelperPod(ctx context.Context, client crClient.
func (c *FreestyleJobCtl) wait(ctx context.Context) {
var err error
taskTimeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, taskTimeout, c.logger, func(events *commonmodels.Events) {
c.jobTaskSpec.Events = events
c.ack()
})
if err != nil {
c.job.Error = err.Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func NewPluginsJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.Workf
if err := commonmodels.IToi(job.Spec, jobTaskSpec); err != nil {
logger.Error(err)
}
if jobTaskSpec.Events == nil {
jobTaskSpec.Events = &commonmodels.Events{}
}
job.Spec = jobTaskSpec
return &PluginJobCtl{
job: job,
Expand Down Expand Up @@ -150,7 +153,10 @@ func (c *PluginJobCtl) run(ctx context.Context) error {
func (c *PluginJobCtl) wait(ctx context.Context) {
var err error
timeout := time.After(time.Duration(c.jobTaskSpec.Properties.Timeout) * time.Minute)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger)
c.job.Status, err = waitJobStart(ctx, c.jobTaskSpec.Properties.Namespace, c.job.K8sJobName, c.kubeclient, c.apiServer, timeout, c.logger, func(events *commonmodels.Events) {
c.jobTaskSpec.Events = events
c.ack()
})
if err != nil {
c.logger.Errorf("wait job start error: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ func int64Ptr(i int64) *int64 { return &i }

func WaitPlainJobEnd(ctx context.Context, taskTimeout int, namespace, jobName string, kubeClient crClient.Client, apiServer crClient.Reader, xl *zap.SugaredLogger) config.Status {
timeout := time.After(time.Duration(taskTimeout) * time.Minute)
status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl)
status, err := waitJobStart(ctx, namespace, jobName, kubeClient, apiServer, timeout, xl, nil)
if err != nil {
xl.Errorf("wait job start error: %v", err)
}
Expand Down Expand Up @@ -936,12 +936,67 @@ func waitPlainJobEnd(ctx context.Context, taskTimeout int, timeout <-chan time.T
}
}

func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger) (config.Status, error) {
func appendPendingPodEvents(podName, namespace string, apiReader client.Reader, events *commonmodels.Events, reported map[string]struct{}, onUpdate func(*commonmodels.Events), xl *zap.SugaredLogger) {
if events == nil {
return
}

selector := fields.Set{"involvedObject.name": podName, "involvedObject.kind": setting.Pod}.AsSelector()
kubeEvents, err := getter.ListEvents(namespace, selector, apiReader)
if err != nil {
xl.Errorf("list events failed for pod %s/%s: %s", namespace, podName, err)
return
}

sort.SliceStable(kubeEvents, func(i, j int) bool {
return kubeEvents[i].CreationTimestamp.Unix() < kubeEvents[j].CreationTimestamp.Unix()
})

changed := false
for _, kubeEvent := range kubeEvents {
eventKey := fmt.Sprintf("%s|%s|%s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message)
if _, ok := reported[eventKey]; ok {
continue
}
reported[eventKey] = struct{}{}

eventType := "info"
if kubeEvent.Type == corev1.EventTypeWarning {
eventType = "error"
}

eventTime := kubeEvent.LastTimestamp
if eventTime.IsZero() {
eventTime = kubeEvent.FirstTimestamp
}
if eventTime.IsZero() && !kubeEvent.EventTime.IsZero() {
eventTime = metav1.NewTime(kubeEvent.EventTime.Time)
}
if eventTime.IsZero() {
eventTime = metav1.NewTime(time.Now())
}

*events = append(*events, &commonmodels.Event{
EventType: eventType,
Time: eventTime.Format("2006-01-02 15:04:05"),
Message: fmt.Sprintf("Pod event [%s/%s]: %s", kubeEvent.Type, kubeEvent.Reason, kubeEvent.Message),
})
changed = true
}

if changed && onUpdate != nil {
onUpdate(events)
}
}

func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crClient.Client, apiReader client.Reader, timeout <-chan time.Time, xl *zap.SugaredLogger, onEventsUpdate func(*commonmodels.Events)) (config.Status, error) {
xl.Infof("wait job to start: %s/%s", namespace, jobName)
xl.Infof("Timeout of preparing Pod: %s.", 120*time.Second)
waitPodReadyTimeout := time.After(120 * time.Second)

var podReadyTimeout bool
reportedEvents := make(map[string]struct{})
events := &commonmodels.Events{}
for {
select {
case <-ctx.Done():
Expand All @@ -966,6 +1021,8 @@ func waitJobStart(ctx context.Context, namespace, jobName string, kubeClient crC
continue
}
for _, pod := range podList {
appendPendingPodEvents(pod.Name, namespace, apiReader, events, reportedEvents, onEventsUpdate, xl)

if pod.Status.Phase == corev1.PodFailed {
msg := ""
for _, condition := range pod.Status.Conditions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type JobTaskPreview struct {
ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"`
ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"`
RetryCount int `bson:"retry_count" yaml:"retry_count" json:"retry_count"`
Events *commonmodels.Events `bson:"events" json:"events"`
// JobInfo contains the fields that make up the job task name, for frontend display
JobInfo interface{} `bson:"job_info" json:"job_info"`
}
Expand Down Expand Up @@ -2479,6 +2480,23 @@ func HandleJobError(workflowName, jobName, userID, username string, taskID int64
return nil
}

func extractRuntimeJobEvents(job *commonmodels.JobTask) *commonmodels.Events {
switch job.JobType {
case string(config.JobFreestyle), string(config.JobZadigBuild), string(config.JobZadigTesting), string(config.JobZadigScanning), string(config.JobZadigDistributeImage):
taskJobSpec := &commonmodels.JobTaskFreestyleSpec{}
if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil {
return taskJobSpec.Events
}
case string(config.JobPlugin):
taskJobSpec := &commonmodels.JobTaskPluginSpec{}
if err := commonmodels.IToi(job.Spec, taskJobSpec); err == nil {
return taskJobSpec.Events
}
}

return nil
}

func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string, now int64, projectName string) []*JobTaskPreview {
envMap := make(map[string]*commonmodels.Product)
resp := []*JobTaskPreview{}
Expand Down Expand Up @@ -2510,6 +2528,7 @@ func jobsToJobPreviews(jobs []*commonmodels.JobTask, context map[string]string,
ErrorHandlerUserID: job.ErrorHandlerUserID,
ErrorHandlerUserName: job.ErrorHandlerUserName,
RetryCount: job.RetryCount,
Events: extractRuntimeJobEvents(job),
}
switch job.JobType {
case string(config.JobFreestyle):
Expand Down
Loading