diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 5976d46c2fe..c61c5b2fb8f 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -1223,105 +1223,21 @@ func demystifyPendingHelper(status v1.PodStatus, info pluginsCore.TaskInfo) (plu // waiting: // message: Back-off pulling image "blah" // reason: ImagePullBackOff + + // Init containers run before regular containers; a stuck init container is + // the causal signal when main containers are merely reporting PodInitializing. + // Inspect init container statuses first so failures like ImagePullBackOff on + // an init container surface with the correct reason/classification rather + // than being masked by the downstream PodInitializing wait on the main container. + for _, containerStatus := range status.InitContainerStatuses { + if containerStatus.State.Waiting != nil { + return classifyWaitingContainer(containerStatus.State.Waiting, c, t) + } + } for _, containerStatus := range status.ContainerStatuses { if !containerStatus.Ready { if containerStatus.State.Waiting != nil { - // There are a variety of reasons that can cause a pod to be in this waiting state. 
- // Waiting state may be legitimate when the container is being downloaded, started or init containers are running - reason := containerStatus.State.Waiting.Reason - finalReason := fmt.Sprintf("%s|%s", c.Reason, reason) - finalMessage := fmt.Sprintf("%s|%s", c.Message, containerStatus.State.Waiting.Message) - switch reason { - case "ErrImagePull", "ContainerCreating", "PodInitializing": - // But, there are only two "reasons" when a pod is successfully being created and hence it is in - // waiting state - // Refer to https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet_pods.go - // and look for the default waiting states - // We also want to allow Image pulls to be retried, so ErrImagePull will be ignored - // as it eventually enters into ImagePullBackOff - // ErrImagePull -> Transitionary phase to ImagePullBackOff - // ContainerCreating -> Image is being downloaded - // PodInitializing -> Init containers are running - return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t - - case "CreateContainerError": - // This may consist of: - // 1. Transient errors: e.g. failed to reserve - // container name, container name [...] already in use - // by container - // 2. Permanent errors: e.g. no command specified - // To handle both types of errors gracefully without - // arbitrary pattern matching in the message, we simply - // allow a grace period for kubelet to resolve - // transient issues with the container runtime. If the - // error persists beyond this time, the corresponding - // task is marked as failed. - // NOTE: The current implementation checks for a timeout - // by comparing the condition's LastTransitionTime - // based on the corresponding kubelet's clock with the - // current time based on FlytePropeller's clock. 
This - // is not ideal given that these 2 clocks are NOT - // synced, and therefore, only provides an - // approximation of the elapsed time since the last - // transition. - - gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration - if time.Since(t) >= gracePeriod { - return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ - OccurredAt: &t, - }), t - } - return pluginsCore.PhaseInfoInitializing( - t, - pluginsCore.DefaultPhaseVersion, - fmt.Sprintf("[%s]: %s", finalReason, finalMessage), - &pluginsCore.TaskInfo{OccurredAt: &t}, - ), t - - case "CreateContainerConfigError": - gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration - if time.Since(t) >= gracePeriod { - return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ - OccurredAt: &t, - }), t - } - return pluginsCore.PhaseInfoInitializing( - t, - pluginsCore.DefaultPhaseVersion, - fmt.Sprintf("[%s]: %s", finalReason, finalMessage), - &pluginsCore.TaskInfo{OccurredAt: &t}, - ), t - - case "InvalidImageName": - return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ - OccurredAt: &t, - }), t - - case "ImagePullBackOff": - gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration - if time.Since(t) >= gracePeriod { - return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ - OccurredAt: &t, - }), t - } - - return pluginsCore.PhaseInfoInitializing( - t, - pluginsCore.DefaultPhaseVersion, - fmt.Sprintf("[%s]: %s", finalReason, finalMessage), - &pluginsCore.TaskInfo{OccurredAt: &t}, - ), t - - default: - // Since we are not checking for all error states, we may end up perpetually - // in the queued state returned at the bottom of this 
function, until the Pod is reaped - // by K8s and we get elusive 'pod not found' errors - // So be default if the container is not waiting with the PodInitializing/ContainerCreating - // reasons, then we will assume a failure reason, and fail instantly - return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ - OccurredAt: &t, - }), t - } + return classifyWaitingContainer(containerStatus.State.Waiting, c, t) } } } @@ -1332,6 +1248,109 @@ func demystifyPendingHelper(status v1.PodStatus, info pluginsCore.TaskInfo) (plu return phaseInfo, t } +// classifyWaitingContainer maps a container's Waiting state (from either ContainerStatuses or +// InitContainerStatuses) to a Flyte PhaseInfo, applying per-reason grace periods and failure +// classifications. t is the condition's LastTransitionTime used as the grace-period clock — +// matching the pre-existing semantics for regular containers. +func classifyWaitingContainer(waiting *v1.ContainerStateWaiting, c v1.PodCondition, t time.Time) (pluginsCore.PhaseInfo, time.Time) { + // There are a variety of reasons that can cause a pod to be in this waiting state. 
+ // Waiting state may be legitimate when the container is being downloaded, started or init containers are running + reason := waiting.Reason + finalReason := fmt.Sprintf("%s|%s", c.Reason, reason) + finalMessage := fmt.Sprintf("%s|%s", c.Message, waiting.Message) + switch reason { + case "ErrImagePull", "ContainerCreating", "PodInitializing": + // But, there are only two "reasons" when a pod is successfully being created and hence it is in + // waiting state + // Refer to https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet_pods.go + // and look for the default waiting states + // We also want to allow Image pulls to be retried, so ErrImagePull will be ignored + // as it eventually enters into ImagePullBackOff + // ErrImagePull -> Transitionary phase to ImagePullBackOff + // ContainerCreating -> Image is being downloaded + // PodInitializing -> Init containers are running + return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t + + case "CreateContainerError": + // This may consist of: + // 1. Transient errors: e.g. failed to reserve + // container name, container name [...] already in use + // by container + // 2. Permanent errors: e.g. no command specified + // To handle both types of errors gracefully without + // arbitrary pattern matching in the message, we simply + // allow a grace period for kubelet to resolve + // transient issues with the container runtime. If the + // error persists beyond this time, the corresponding + // task is marked as failed. + // NOTE: The current implementation checks for a timeout + // by comparing the condition's LastTransitionTime + // based on the corresponding kubelet's clock with the + // current time based on FlytePropeller's clock. 
This + // is not ideal given that these 2 clocks are NOT + // synced, and therefore, only provides an + // approximation of the elapsed time since the last + // transition. + + gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration + if time.Since(t) >= gracePeriod { + return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), t + } + return pluginsCore.PhaseInfoInitializing( + t, + pluginsCore.DefaultPhaseVersion, + fmt.Sprintf("[%s]: %s", finalReason, finalMessage), + &pluginsCore.TaskInfo{OccurredAt: &t}, + ), t + + case "CreateContainerConfigError": + gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration + if time.Since(t) >= gracePeriod { + return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), t + } + return pluginsCore.PhaseInfoInitializing( + t, + pluginsCore.DefaultPhaseVersion, + fmt.Sprintf("[%s]: %s", finalReason, finalMessage), + &pluginsCore.TaskInfo{OccurredAt: &t}, + ), t + + case "InvalidImageName": + return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), t + + case "ImagePullBackOff": + gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration + if time.Since(t) >= gracePeriod { + return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), t + } + + return pluginsCore.PhaseInfoInitializing( + t, + pluginsCore.DefaultPhaseVersion, + fmt.Sprintf("[%s]: %s", finalReason, finalMessage), + &pluginsCore.TaskInfo{OccurredAt: &t}, + ), t + + default: + // Since we are not checking for all error states, we may end up perpetually + // in the queued state returned at the bottom of this 
function, until the Pod is reaped + // by K8s and we get elusive 'pod not found' errors + // So by default if the container is not waiting with the PodInitializing/ContainerCreating + // reasons, then we will assume a failure reason, and fail instantly + return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), t + } +} + func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string { return fmt.Sprintf("Grace period [%s] exceeded|%s", gracePeriod, message) } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index f30a6475325..e7c6087508b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -2706,6 +2706,211 @@ func TestDemystifyPending(t *testing.T) { assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) assert.True(t, taskStatus.CleanupOnFailure()) }) + + // Init container scenarios: a stuck init container must win over the downstream + // PodInitializing wait that K8s surfaces on the main container while init is blocked. 
+ mainContainerPodInitializing := []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "PodInitializing", + Message: "waiting for init containers", + }, + }, + }, + } + + t.Run("InitContainer_ErrImagePull", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "ErrImagePull", + Message: "cannot pull init image", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseInitializing, taskStatus.Phase()) + }) + + t.Run("InitContainer_ImagePullBackOffWithinGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime = metav1.Now() + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + Message: "Back-off pulling image \"bad-init-image\"", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseInitializing, taskStatus.Phase()) + }) + + t.Run("InitContainer_ImagePullBackOffOutsideGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration) + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + Message: "Back-off pulling image \"bad-init-image\"", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + 
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) + assert.Contains(t, taskStatus.Err().Code, "ImagePullBackOff") + assert.Contains(t, taskStatus.Err().Message, "bad-init-image") + }) + + t.Run("InitContainer_InvalidImageName", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "InvalidImageName", + Message: "couldn't parse image reference", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) + }) + + t.Run("InitContainer_CreateContainerConfigErrorOutsideGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration) + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "CreateContainerConfigError", + Message: "secret \"my-secret\" not found", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) + }) + + t.Run("InitContainer_CreateContainerErrorOutsideGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration) + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: 
"CreateContainerError", + Message: "no command specified", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) + }) + + t.Run("InitContainer_DefaultUnknownReason", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "RandomInitError", + Message: "something went wrong in init", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) + }) + + t.Run("InitContainer_SucceededFallsThroughToMainContainer", func(t *testing.T) { + // When init containers have completed (State.Waiting == nil) we should fall + // through to regular-container inspection and surface the main container's wait. + s2 := *s.DeepCopy() + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: true, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ExitCode: 0}, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseInitializing, taskStatus.Phase()) + }) + + t.Run("InitContainer_FirstSucceededSecondFailingPicksFailing", func(t *testing.T) { + // A prior init container succeeded; the next is stuck on ImagePullBackOff past grace. 
+ s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration) + s2.InitContainerStatuses = []v1.ContainerStatus{ + { + Ready: true, + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ExitCode: 0}, + }, + }, + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + Message: "Back-off pulling image \"second-init\"", + }, + }, + }, + } + s2.ContainerStatuses = mainContainerPodInitializing + taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.True(t, taskStatus.CleanupOnFailure()) + assert.Contains(t, taskStatus.Err().Message, "second-init") + }) } func TestDemystifyPendingTimeout(t *testing.T) {