ControllerProcessor save→load round-trip through CheckpointState throws Kryo NPE on StatisticsManager.inputStatistics #4686

@Yicong-Huang

What happened?

ControllerProcessor save → load round-trip through CheckpointState throws KryoException: NullPointerException on StatisticsManager.inputStatistics for a default-constructed instance.

Controller.loadFromCheckpoint relies on exactly this round-trip: it does cpState: ControllerProcessor = chkpt.load(SerializedState.CP_STATE_KEY), then re-attaches runtime services and revives workers. If load() is called against a CheckpointState whose serialized ControllerProcessor has any uninitialized StatisticsManager map, the rehydrate path fails with this NPE before the controller can resume.

The existing CheckpointSpec only asserts that chkpt.save(CP_STATE_KEY, cp) succeeds; it never validates that chkpt.load(CP_STATE_KEY) works on the same object. As a result, this gap has gone unnoticed.

How to reproduce?

Add a round-trip test to CheckpointSpec, next to the "Default controller state should be serializable" test, that calls load() after save():

"ControllerProcessor" should "round-trip through CheckpointState" in {
  val cp = new ControllerProcessor(
    workflow.context,
    ControllerConfig.default,
    CONTROLLER,
    msg => {}
  )
  val chkpt = new CheckpointState()
  chkpt.save(CP_STATE_KEY, cp)
  val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY)  // throws NPE
  assert(restored.actorId == cp.actorId)
}

Run: sbt 'WorkflowExecutionService / Test / testOnly org.apache.texera.amber.engine.faulttolerance.CheckpointSpec'

Observed: the round-trip test fails; the existing "should be serializable" tests still pass (because they never call load).

Branch

main

Commit Hash (Optional)

Relevant log output

[info] - should round-trip through CheckpointState *** FAILED ***
[info]   com.esotericsoftware.kryo.kryo5.KryoException: java.lang.NullPointerException
[info]   Serialization trace:
[info]   inputStatistics (org.apache.texera.amber.engine.architecture.worker.managers.StatisticsManager)
[info]   statisticsManager (org.apache.texera.amber.engine.architecture.controller.ControllerProcessor)
[info]   at com.esotericsoftware.kryo.kryo5.serializers.ReflectField.read(ReflectField.java:146)
[info]   at com.esotericsoftware.kryo.kryo5.serializers.FieldSerializer.read(FieldSerializer.java:130)
[info]   at com.esotericsoftware.kryo.kryo5.Kryo.readObject(Kryo.java:796)
[info]   ...

Likely fix direction

  • Either give StatisticsManager.inputStatistics (and any other null-default fields) a non-null default value that survives Kryo round-trip, or
  • Mark fields that must not be serialized with @transient and re-initialize them in loadFromCheckpoint's attachRuntimeServicesToCPState.
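
The second option can be sketched in isolation as follows. This is a minimal illustration, not Texera code: StatsSketch, initAfterLoad, and RoundTripSketch are hypothetical names, and the round-trip uses plain Java serialization instead of Kryo so that the block has no dependencies. Kryo's FieldSerializer also skips transient fields by default, so a similar null-after-load state would be expected there, but that has not been verified against CheckpointState.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for StatisticsManager; NOT the real Texera class.
class StatsSketch extends Serializable {
  // @transient keeps the map out of the serialized form, so it is null after load.
  @transient private var inputStatistics: mutable.Map[String, Long] =
    mutable.HashMap.empty[String, Long]

  // Hypothetical hook, playing the role of the re-initialization that
  // attachRuntimeServicesToCPState would perform after load.
  def initAfterLoad(): Unit =
    if (inputStatistics == null) inputStatistics = mutable.HashMap.empty[String, Long]

  def record(port: String, n: Long): Unit =
    inputStatistics(port) = inputStatistics.getOrElse(port, 0L) + n

  def count(port: String): Long = inputStatistics.getOrElse(port, 0L)

  def isInitialized: Boolean = inputStatistics != null
}

object RoundTripSketch {
  // Serialize to bytes and read back (Java serialization, swapped in for Kryo).
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value)
    finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }
}
```

With this approach the restored object carries no statistics until initAfterLoad() runs, so any counts recorded before the checkpoint are dropped. If those counts must survive a checkpoint, the first option (a default value that round-trips) is the one to pursue.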

The two existing "should be serializable" tests in CheckpointSpec should be replaced by (or paired with) full round-trip tests so that this class of regression is caught before checkpoint/replay is exercised on a real workflow.

Out of scope

  • The actual Controller.loadFromCheckpoint / take-checkpoint-then-resume e2e test is larger in scope and already exists as a commented-out test in CheckpointSpec. Once this NPE is fixed, restoring that e2e test is the natural follow-up.
