Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -1837,6 +1839,15 @@ private News createAppMessage(int sequence) {
return news;
}

private quickfix.fix42.News createFix42AppMessage(int sequence) {
final quickfix.fix42.News news = new quickfix.fix42.News(new Headline("Headline"));
news.getHeader().setString(SenderCompID.FIELD, "TARGET");
news.getHeader().setString(TargetCompID.FIELD, "SENDER");
news.getHeader().setInt(MsgSeqNum.FIELD, sequence);
news.getHeader().setUtcTimeStamp(SendingTime.FIELD, LocalDateTime.now(ZoneOffset.UTC));
return news;
}

private SessionState getSessionState(Session session)
throws NoSuchFieldException, IllegalAccessException {
final Field stateField = session.getClass().getDeclaredField("state");
Expand Down Expand Up @@ -3239,4 +3250,120 @@ public void testResendAbortsWhenSendReturnsFalse() throws Exception {
assertEquals("Only 2 messages should succeed", 2, responder.sentMessages.size());
}
}

@Test
public void testChunkedResendRequestBoundariesAndQueueDrain_FIX42_chunk2500_fromAppCalledForQueuedAppMessages()
throws Exception {
final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "SENDER", "TARGET");
final CountingApplication application = new CountingApplication();
final boolean isInitiator = false, resetOnLogon = false, validateSequenceNumbers = true;
final int resendRequestChunkSize = 2500;

try (Session session = new Session(application,
new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, null,
new SLF4JLogFactory(new SessionSettings()),
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30,
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, false, false, false, true, false, true, false, null, true,
resendRequestChunkSize, false, false, true, new ArrayList<>(),
Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
final SessionState state = getSessionState(session);
final InMemoryMessageQueue queue = (InMemoryMessageQueue) state.getMessageQueue();

// Logon at seqnum 1 using FIX42 Logon (session expects seqnum 1)
quickfix.fix42.Logon logon42 = new quickfix.fix42.Logon();
setUpHeader(session.getSessionID(), logon42, true, 1);
logon42.setInt(HeartBtInt.FIELD, 30);
logon42.setInt(EncryptMethod.FIELD, 0);
logon42.toString();
session.next(logon42);
assertTrue(session.isLoggedOn());

// Simulate session state being far ahead
session.setNextTargetMsgSeqNum(57989);

// Capture resend request chunk boundaries via a SessionStateListener
final List<int[]> resendEvents = new ArrayList<>();
session.addStateListener(new SessionStateListener() {
@Override
public void onResendRequestSent(SessionID sid, int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
resendEvents.add(new int[]{beginSeqNo, endSeqNo, currentEndSeqNo});
}
});

// Receive queued application messages far ahead of expected target seqnum
processMessage(session, createFix42AppMessage(61989));
processMessage(session, createFix42AppMessage(61990));
processMessage(session, createFix42AppMessage(61991));
for (int i = 61992; i <= 62001; i++) {
processMessage(session, createFix42AppMessage(i));
}

// Before the gap is filled, none of the queued messages should have been delivered
assertEquals(0, application.fromAppCount);

// Assert first resend request chunk: begin=57989, currentEnd=60488
assertEquals(57989, resendEvents.get(0)[0]);
assertEquals(60488, resendEvents.get(0)[2]);

// Process first SequenceReset GapFill covering 57989..60488 (admin gap)
Message sr1 = createSequenceReset(57989, 60489, true);
sr1.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX42);
processMessage(session, sr1);

// After first GapFill, a second resend request chunk should have been sent
assertEquals(60489, resendEvents.get(1)[0]);
assertEquals(61988, resendEvents.get(1)[2]);

// Process final SequenceReset GapFill: NewSeqNo=61989 (first queued app message),
// so queued messages 61989..62001 remain in the queue and are drained by nextQueued()
Message sr2 = createSequenceReset(60489, 61989, true);
sr2.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX42);
processMessage(session, sr2);

// After draining, the in-memory queue must be empty
assertTrue(queue.getBackingMap().isEmpty());

// fromApp() must have been called for every queued message seqnum 61989..62001
for (int seq = 61989; seq <= 62001; seq++) {
assertTrue("fromApp not called for seqnum " + seq,
application.fromAppSeqNums.contains(seq));
}
}
}

private static class CountingApplication implements Application {
int fromAppCount = 0;
final Set<Integer> fromAppSeqNums = new HashSet<>();

@Override
public void fromApp(Message message, SessionID sessionId)
throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
fromAppCount++;
fromAppSeqNums.add(message.getHeader().getInt(MsgSeqNum.FIELD));
}

@Override
public void toApp(Message message, SessionID sessionId) throws DoNotSend {}

@Override
public void fromAdmin(Message message, SessionID sessionId)
throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {}

@Override
public void toAdmin(Message message, SessionID sessionId) {}

@Override
public void onCreate(SessionID sessionId) {}

@Override
public void onLogon(SessionID sessionId) {}

@Override
public void onLogout(SessionID sessionId) {}
}
}
Loading