Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,14 @@ public FlowSlot() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
try {
checkFlow(resourceWrapper, context, node, count, prioritized);
} catch (PriorityWaitException ex) {
// When a prioritized request passes flow control by waiting, subsequent slots
// (e.g. circuit breaker / degrade) should still be executed.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
throw ex;
}

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.alibaba.csp.sentinel.slots.block.flow;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextTestUtil;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.util.function.Function;
Expand All @@ -29,6 +31,7 @@
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
Expand Down Expand Up @@ -93,4 +96,85 @@ public void testCheckFlowBlock() throws Exception {

flowSlot.checkFlow(new StringResourceWrapper(resA, EntryType.IN), context, node, 1, false);
}

@Test
@SuppressWarnings("unchecked")
public void testPriorityWaitExceptionStillFiresSubsequentSlots() throws Throwable {
FlowRuleChecker checker = mock(FlowRuleChecker.class);
FlowSlot flowSlot = new FlowSlot(checker);
Context context = mock(Context.class);
DefaultNode node = mock(DefaultNode.class);
ResourceWrapper resource = new StringResourceWrapper("testRes", EntryType.IN);

// Make checker throw PriorityWaitException
doThrow(new PriorityWaitException(100)).when(checker).checkFlow(
any(Function.class), any(ResourceWrapper.class), any(Context.class),
any(DefaultNode.class), anyInt(), anyBoolean());

// Track whether the next slot in the chain is called
final AtomicBoolean nextSlotCalled = new AtomicBoolean(false);
AbstractLinkedProcessorSlot<DefaultNode> nextSlot = new AbstractLinkedProcessorSlot<DefaultNode>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) {
nextSlotCalled.set(true);
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
}
};
flowSlot.setNext(nextSlot);

// entry() should still throw PriorityWaitException
try {
flowSlot.entry(context, resource, node, 1, true);
fail("Should throw PriorityWaitException");
} catch (PriorityWaitException ex) {
assertEquals(100, ex.getWaitInMs());
}

// Subsequent slot should have been executed
assertTrue("Next slot should be called even when PriorityWaitException is thrown",
nextSlotCalled.get());
}

@Test
@SuppressWarnings("unchecked")
public void testPriorityWaitWithDownstreamBlockPropagatesBlockException() throws Throwable {
FlowRuleChecker checker = mock(FlowRuleChecker.class);
FlowSlot flowSlot = new FlowSlot(checker);
Context context = mock(Context.class);
DefaultNode node = mock(DefaultNode.class);
ResourceWrapper resource = new StringResourceWrapper("testRes", EntryType.IN);

// Make checker throw PriorityWaitException
doThrow(new PriorityWaitException(100)).when(checker).checkFlow(
any(Function.class), any(ResourceWrapper.class), any(Context.class),
any(DefaultNode.class), anyInt(), anyBoolean());

// Simulate a downstream slot (e.g. circuit breaker) that blocks
AbstractLinkedProcessorSlot<DefaultNode> blockingSlot = new AbstractLinkedProcessorSlot<DefaultNode>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) throws Throwable {
throw new FlowException("blocked by downstream");
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
}
};
flowSlot.setNext(blockingSlot);

// BlockException from downstream should propagate instead of PriorityWaitException
try {
flowSlot.entry(context, resource, node, 1, true);
fail("Should throw BlockException");
} catch (FlowException ex) {
// BlockException from downstream slot should propagate
} catch (PriorityWaitException ex) {
fail("BlockException from downstream should take precedence");
}
}
}