Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 115 additions & 96 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,105 +1223,21 @@ func demystifyPendingHelper(status v1.PodStatus, info pluginsCore.TaskInfo) (plu
// waiting:
// message: Back-off pulling image "blah"
// reason: ImagePullBackOff

// Init containers run before regular containers; a stuck init container is
// the causal signal when main containers are merely reporting PodInitializing.
// Inspect init container statuses first so failures like ImagePullBackOff on
// an init container surface with the correct reason/classification rather
// than being masked by the downstream PodInitializing wait on the main container.
for _, containerStatus := range status.InitContainerStatuses {
if containerStatus.State.Waiting != nil {
return classifyWaitingContainer(containerStatus.State.Waiting, c, t)
}
}
for _, containerStatus := range status.ContainerStatuses {
if !containerStatus.Ready {
if containerStatus.State.Waiting != nil {
// There are a variety of reasons that can cause a pod to be in this waiting state.
// Waiting state may be legitimate when the container is being downloaded, started or init containers are running
reason := containerStatus.State.Waiting.Reason
finalReason := fmt.Sprintf("%s|%s", c.Reason, reason)
finalMessage := fmt.Sprintf("%s|%s", c.Message, containerStatus.State.Waiting.Message)
switch reason {
case "ErrImagePull", "ContainerCreating", "PodInitializing":
// But, there are only two "reasons" when a pod is successfully being created and hence it is in
// waiting state
// Refer to https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet_pods.go
// and look for the default waiting states
// We also want to allow Image pulls to be retried, so ErrImagePull will be ignored
// as it eventually enters into ImagePullBackOff
// ErrImagePull -> Transitionary phase to ImagePullBackOff
// ContainerCreating -> Image is being downloaded
// PodInitializing -> Init containers are running
return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t

case "CreateContainerError":
// This may consist of:
// 1. Transient errors: e.g. failed to reserve
// container name, container name [...] already in use
// by container
// 2. Permanent errors: e.g. no command specified
// To handle both types of errors gracefully without
// arbitrary pattern matching in the message, we simply
// allow a grace period for kubelet to resolve
// transient issues with the container runtime. If the
// error persists beyond this time, the corresponding
// task is marked as failed.
// NOTE: The current implementation checks for a timeout
// by comparing the condition's LastTransitionTime
// based on the corresponding kubelet's clock with the
// current time based on FlytePropeller's clock. This
// is not ideal given that these 2 clocks are NOT
// synced, and therefore, only provides an
// approximation of the elapsed time since the last
// transition.

gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), t

case "CreateContainerConfigError":
gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), t

case "InvalidImageName":
return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t

case "ImagePullBackOff":
gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}

return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), t

default:
// Since we are not checking for all error states, we may end up perpetually
// in the queued state returned at the bottom of this function, until the Pod is reaped
// by K8s and we get elusive 'pod not found' errors
// So be default if the container is not waiting with the PodInitializing/ContainerCreating
// reasons, then we will assume a failure reason, and fail instantly
return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
return classifyWaitingContainer(containerStatus.State.Waiting, c, t)
}
}
}
Expand All @@ -1332,6 +1248,109 @@ func demystifyPendingHelper(status v1.PodStatus, info pluginsCore.TaskInfo) (plu
return phaseInfo, t
}

// classifyWaitingContainer translates a container's Waiting state (taken from either
// ContainerStatuses or InitContainerStatuses) into a Flyte PhaseInfo, applying
// per-reason grace periods and failure classifications. The supplied t is the pod
// condition's LastTransitionTime and serves as the reference clock for grace-period
// checks, matching the pre-existing semantics for regular containers.
//
// NOTE: grace periods compare kubelet's LastTransitionTime against FlytePropeller's
// local clock; the two are NOT synchronized, so elapsed time is only an approximation.
func classifyWaitingContainer(waiting *v1.ContainerStateWaiting, c v1.PodCondition, t time.Time) (pluginsCore.PhaseInfo, time.Time) {
	// A pod can sit in a Waiting state for many reasons; some are part of normal
	// startup (image download, init containers running) and some indicate trouble.
	reason := waiting.Reason
	combinedReason := fmt.Sprintf("%s|%s", c.Reason, reason)
	combinedMessage := fmt.Sprintf("%s|%s", c.Message, waiting.Message)

	// stillInitializing reports the task as Initializing while a (potentially)
	// transient condition is waited out.
	stillInitializing := func() (pluginsCore.PhaseInfo, time.Time) {
		return pluginsCore.PhaseInfoInitializing(
			t,
			pluginsCore.DefaultPhaseVersion,
			fmt.Sprintf("[%s]: %s", combinedReason, combinedMessage),
			&pluginsCore.TaskInfo{OccurredAt: &t},
		), t
	}

	switch reason {
	case "ErrImagePull", "ContainerCreating", "PodInitializing":
		// These reasons occur while a pod is being created successfully; refer to
		// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet_pods.go
		// and look for the default waiting states.
		//   ErrImagePull      -> transitionary phase that eventually becomes
		//                        ImagePullBackOff, so image pulls get a chance to retry
		//   ContainerCreating -> the image is being downloaded
		//   PodInitializing   -> init containers are running
		return stillInitializing()

	case "CreateContainerError":
		// CreateContainerError covers both:
		//   1. Transient errors, e.g. "failed to reserve container name",
		//      "container name [...] already in use by container"
		//   2. Permanent errors, e.g. "no command specified"
		// Rather than pattern-matching the message, allow kubelet a grace period to
		// clear transient container-runtime issues; if the error outlives the grace
		// period, fail the task.
		gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration
		if time.Since(t) < gracePeriod {
			return stillInitializing()
		}
		return pluginsCore.PhaseInfoFailureWithCleanup(combinedReason, GetMessageAfterGracePeriod(combinedMessage, gracePeriod), &pluginsCore.TaskInfo{
			OccurredAt: &t,
		}), t

	case "CreateContainerConfigError":
		// Same grace-period treatment as CreateContainerError, with its own
		// configurable window.
		gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration
		if time.Since(t) < gracePeriod {
			return stillInitializing()
		}
		return pluginsCore.PhaseInfoFailureWithCleanup(combinedReason, GetMessageAfterGracePeriod(combinedMessage, gracePeriod), &pluginsCore.TaskInfo{
			OccurredAt: &t,
		}), t

	case "InvalidImageName":
		// A malformed image reference can never succeed; fail immediately.
		return pluginsCore.PhaseInfoFailureWithCleanup(combinedReason, combinedMessage, &pluginsCore.TaskInfo{
			OccurredAt: &t,
		}), t

	case "ImagePullBackOff":
		// Give the image pull a grace period to recover (e.g. registry blips);
		// past that, surface a retryable failure.
		gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration
		if time.Since(t) < gracePeriod {
			return stillInitializing()
		}
		return pluginsCore.PhaseInfoRetryableFailureWithCleanup(combinedReason, GetMessageAfterGracePeriod(combinedMessage, gracePeriod), &pluginsCore.TaskInfo{
			OccurredAt: &t,
		}), t

	default:
		// Not every error state is enumerated above. Anything unrecognized is
		// treated as a system-retryable failure right away; otherwise the task
		// could linger perpetually in the queued state until the pod is reaped
		// by K8s and we get elusive 'pod not found' errors.
		return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(combinedReason, combinedMessage, &pluginsCore.TaskInfo{
			OccurredAt: &t,
		}), t
	}
}

func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string {
return fmt.Sprintf("Grace period [%s] exceeded|%s", gracePeriod, message)
}
Expand Down
Loading
Loading