diff --git a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java index 745f826121..646f17ffbd 100755 --- a/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java +++ b/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.java @@ -159,7 +159,14 @@ public FlowSlot() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { - checkFlow(resourceWrapper, context, node, count, prioritized); + try { + checkFlow(resourceWrapper, context, node, count, prioritized); + } catch (PriorityWaitException ex) { + // When a prioritized request passes flow control by waiting, subsequent slots + // (e.g. circuit breaker / degrade) should still be executed. + fireEntry(context, resourceWrapper, node, count, prioritized, args); + throw ex; + } fireEntry(context, resourceWrapper, node, count, prioritized, args); } diff --git a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlotTest.java b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlotTest.java index 1a29e27383..ac698400cc 100644 --- a/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlotTest.java +++ b/sentinel-core/src/test/java/com/alibaba/csp/sentinel/slots/block/flow/FlowSlotTest.java @@ -16,11 +16,13 @@ package com.alibaba.csp.sentinel.slots.block.flow; import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.context.Context; import com.alibaba.csp.sentinel.context.ContextTestUtil; import com.alibaba.csp.sentinel.node.DefaultNode; +import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot; import com.alibaba.csp.sentinel.slotchain.ResourceWrapper; import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper; import com.alibaba.csp.sentinel.util.function.Function; @@ -29,6 +31,7 @@ import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** @@ -93,4 +96,85 @@ public void testCheckFlowBlock() throws Exception { flowSlot.checkFlow(new StringResourceWrapper(resA, EntryType.IN), context, node, 1, false); } + + @Test + @SuppressWarnings("unchecked") + public void testPriorityWaitExceptionStillFiresSubsequentSlots() throws Throwable { + FlowRuleChecker checker = mock(FlowRuleChecker.class); + FlowSlot flowSlot = new FlowSlot(checker); + Context context = mock(Context.class); + DefaultNode node = mock(DefaultNode.class); + ResourceWrapper resource = new StringResourceWrapper("testRes", EntryType.IN); + + // Make checker throw PriorityWaitException + doThrow(new PriorityWaitException(100)).when(checker).checkFlow( + any(Function.class), any(ResourceWrapper.class), any(Context.class), + any(DefaultNode.class), anyInt(), anyBoolean()); + + // Track whether the next slot in the chain is called + final AtomicBoolean nextSlotCalled = new AtomicBoolean(false); + AbstractLinkedProcessorSlot nextSlot = new AbstractLinkedProcessorSlot() { + @Override + public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, + int count, boolean prioritized, Object... args) { + nextSlotCalled.set(true); + } + + @Override + public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { + } + }; + flowSlot.setNext(nextSlot); + + // entry() should still throw PriorityWaitException + try { + flowSlot.entry(context, resource, node, 1, true); + fail("Should throw PriorityWaitException"); + } catch (PriorityWaitException ex) { + assertEquals(100, ex.getWaitInMs()); + } + + // Subsequent slot should have been executed + assertTrue("Next slot should be called even when PriorityWaitException is thrown", + nextSlotCalled.get()); + } + + @Test + @SuppressWarnings("unchecked") + public void testPriorityWaitWithDownstreamBlockPropagatesBlockException() throws Throwable { + FlowRuleChecker checker = mock(FlowRuleChecker.class); + FlowSlot flowSlot = new FlowSlot(checker); + Context context = mock(Context.class); + DefaultNode node = mock(DefaultNode.class); + ResourceWrapper resource = new StringResourceWrapper("testRes", EntryType.IN); + + // Make checker throw PriorityWaitException + doThrow(new PriorityWaitException(100)).when(checker).checkFlow( + any(Function.class), any(ResourceWrapper.class), any(Context.class), + any(DefaultNode.class), anyInt(), anyBoolean()); + + // Simulate a downstream slot (e.g. circuit breaker) that blocks + AbstractLinkedProcessorSlot blockingSlot = new AbstractLinkedProcessorSlot() { + @Override + public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, + int count, boolean prioritized, Object... args) throws Throwable { + throw new FlowException("blocked by downstream"); + } + + @Override + public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { + } + }; + flowSlot.setNext(blockingSlot); + + // BlockException from downstream should propagate instead of PriorityWaitException + try { + flowSlot.entry(context, resource, node, 1, true); + fail("Should throw BlockException"); + } catch (FlowException ex) { + // BlockException from downstream slot should propagate + } catch (PriorityWaitException ex) { + fail("BlockException from downstream should take precedence"); + } + } }