diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 19bede7b8..0d98e6334 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2186,151 +2186,149 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre } } } + state.lockSenderMsgSeqNum(); + try { + // QFJ-926 - reset session before accepting Logon + resetIfSessionNotCurrent(sessionID, SystemTime.currentTimeMillis()); - // QFJ-926 - reset session before accepting Logon - resetIfSessionNotCurrent(sessionID, SystemTime.currentTimeMillis()); - - if (refreshOnLogon) { - refreshState(); - } - - if (logon.isSetField(ResetSeqNumFlag.FIELD)) { - state.setResetReceived(logon.getBoolean(ResetSeqNumFlag.FIELD)); - } else if (state.isResetSent() && logon.getHeader().getInt(MsgSeqNum.FIELD) == 1) { // QFJ-383 - getLog().onEvent( - "Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request"); - state.setResetReceived(true); - } + if (refreshOnLogon) { + refreshState(); + } - if (!verify(logon, false, validateSequenceNumbers)) { - return; - } + if (logon.isSetField(ResetSeqNumFlag.FIELD)) { + state.setResetReceived(logon.getBoolean(ResetSeqNumFlag.FIELD)); + } else if (state.isResetSent() && logon.getHeader().getInt(MsgSeqNum.FIELD) == 1) { // QFJ-383 + getLog().onEvent( + "Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request"); + state.setResetReceived(true); + } - if (state.isResetReceived()) { - getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); - if (!state.isResetSent()) { - resetState(); + if (!verify(logon, false, validateSequenceNumbers)) { + return; } - } - if (state.isLogonSendNeeded() && !state.isResetReceived()) { - disconnect("Received logon response before sending request", true); - return; - } + if (state.isResetReceived()) { + getLog().onEvent("Logon contains 
ResetSeqNumFlag=Y, resetting sequence numbers to 1"); + if (!state.isResetSent()) { + resetState(); + } + } - if (!state.isInitiator() && resetOnLogon) { - resetState(); - } + if (state.isLogonSendNeeded() && !state.isResetReceived()) { + disconnect("Received logon response before sending request", true); + return; + } + if (!state.isInitiator() && resetOnLogon) { + resetState(); + } - // reset logout messages - state.setLogoutReceived(false); - state.setLogoutSent(false); - state.setLogonReceived(true); - // remember the expected sender sequence number of any logon response for future use - final int nextSenderMsgNumAtLogonReceived = state.getMessageStore().getNextSenderMsgSeqNum(); - final int sequence = logon.getHeader().getInt(MsgSeqNum.FIELD); + // reset logout messages + state.setLogoutReceived(false); + state.setLogoutSent(false); + state.setLogonReceived(true); - /* - * We test here that it's not too high (which would result in a resend) and that we are not - * resetting on logon 34=1 - */ - final boolean isLogonInNormalSequence = !(isTargetTooHigh(sequence) && !resetOnLogon); - // if we have a tag 789 sent to us... - if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + // remember the expected sender sequence number of any logon response for future use + final int nextSenderMsgNumAtLogonReceived = state.getMessageStore().getNextSenderMsgSeqNum(); + final int sequence = logon.getHeader().getInt(MsgSeqNum.FIELD); - final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); - state.lockSenderMsgSeqNum(); - final int actualNextNum; - try { - actualNextNum = state.getNextSenderMsgSeqNum(); - } finally { - state.unlockSenderMsgSeqNum(); - } - // Is the 789 we received too high ?? - if (targetWantsNextSeqNumToBe > actualNextNum) { - // barf! we can't resend what we never sent! something unrecoverable has happened. 
- final String err = "Tag " + NextExpectedMsgSeqNum.FIELD - + " (NextExpectedMsgSeqNum) is higher than expected. Expected " - + actualNextNum + ", Received " + targetWantsNextSeqNumToBe; - generateLogout(err); - disconnect(err, true); - return; - } - } - getLog().onEvent("Received logon"); - if (!state.isInitiator()) { /* - * If we got one too high they need messages resent use the first message they missed (as we gap fill with that). - * If we reset on logon, the current value will be 1 and we always send 2 (we haven't inc'd for current message yet +1) - * If happy path (we haven't inc'd for current message yet so its current +1) + * We test here that it's not too high (which would result in a resend) and that we are not + * resetting on logon 34=1 */ - int nextExpectedTargetNum = state.getMessageStore().getNextTargetMsgSeqNum(); - // we increment for the logon later (after Logon response sent) in this method if and only if in sequence - if (isLogonInNormalSequence) { - // logon was fine take account of it in 789 - nextExpectedTargetNum++; + final boolean isLogonInNormalSequence = !(isTargetTooHigh(sequence) && !resetOnLogon); + // if we have a tag 789 sent to us... + if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + + final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); + final int actualNextNum = state.getNextSenderMsgSeqNum(); + // Is the 789 we received too high ?? + if (targetWantsNextSeqNumToBe > actualNextNum) { + // barf! we can't resend what we never sent! something unrecoverable has happened. + final String err = "Tag " + NextExpectedMsgSeqNum.FIELD + + " (NextExpectedMsgSeqNum) is higher than expected. 
Expected " + + actualNextNum + ", Received " + targetWantsNextSeqNumToBe; + generateLogout(err); + disconnect(err, true); + return; + } + } + getLog().onEvent("Received logon"); + if (!state.isInitiator()) { + /* + * If we got one too high they need messages resent use the first message they missed (as we gap fill with that). + * If we reset on logon, the current value will be 1 and we always send 2 (we haven't inc'd for current message yet +1) + * If happy path (we haven't inc'd for current message yet so its current +1) + */ + int nextExpectedTargetNum = state.getMessageStore().getNextTargetMsgSeqNum(); + // we increment for the logon later (after Logon response sent) in this method if and only if in sequence + if (isLogonInNormalSequence) { + // logon was fine take account of it in 789 + nextExpectedTargetNum++; + } + generateLogon(logon, nextExpectedTargetNum); } - generateLogon(logon, nextExpectedTargetNum); - } - // Check for proper sequence reset response - if (state.isResetSent() && !state.isResetReceived()) { - disconnect("Expected Logon response to have reset sequence numbers in response to ResetSeqNumFlag", true); - return; - } + // Check for proper sequence reset response + if (state.isResetSent() && !state.isResetReceived()) { + disconnect("Expected Logon response to have reset sequence numbers in response to ResetSeqNumFlag", true); + return; + } - state.setResetSent(false); - state.setResetReceived(false); + state.setResetSent(false); + state.setResetReceived(false); - // Looking at the sequence number of the incoming Logon, is it too high indicating possible missed messages ? .. - if (!isLogonInNormalSequence) { - // if 789 was sent then we effectively have already sent a resend request - if (state.isExpectedLogonNextSeqNumSent()) { - // Mark state as if we have already sent a resend request from the logon's 789 (we sent) to infinity. - // This will supress the resend request in doTargetTooHigh ... 
- state.setResetRangeFromLastExpectedLogonNextSeqNumLogon(); - getLog().onEvent("Required resend will be suppressed as we are setting tag 789"); - } - if (validateSequenceNumbers) { - doTargetTooHigh(logon); + // Looking at the sequence number of the incoming Logon, is it too high indicating possible missed messages ? .. + if (!isLogonInNormalSequence) { + // if 789 was sent then we effectively have already sent a resend request + if (state.isExpectedLogonNextSeqNumSent()) { + // Mark state as if we have already sent a resend request from the logon's 789 (we sent) to infinity. + // This will supress the resend request in doTargetTooHigh ... + state.setResetRangeFromLastExpectedLogonNextSeqNumLogon(); + getLog().onEvent("Required resend will be suppressed as we are setting tag 789"); + } + if (validateSequenceNumbers) { + doTargetTooHigh(logon); + } + } else { + state.incrNextTargetMsgSeqNum(); + nextQueued(); } - } else { - state.incrNextTargetMsgSeqNum(); - nextQueued(); - } - // Do we have a 789 - if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { - final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); + // Do we have a 789 + if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); - // is the 789 lower (we checked for higher previously) than our next message after receiving the logon - if (targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived) { - int endSeqNo = nextSenderMsgNumAtLogonReceived; + // is the 789 lower (we checked for higher previously) than our next message after receiving the logon + if (targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived) { + int endSeqNo = nextSenderMsgNumAtLogonReceived; - // Just do a gap fill when messages aren't persisted - if (!persistMessages) { - endSeqNo += 1; - final int next = state.getNextSenderMsgSeqNum(); - if (endSeqNo > next) { - 
endSeqNo = next; + // Just do a gap fill when messages aren't persisted + if (!persistMessages) { + endSeqNo += 1; + final int next = state.getNextSenderMsgSeqNum(); + if (endSeqNo > next) { + endSeqNo = next; + } + getLog().onEvent( + "Received implicit ResendRequest via Logon FROM: " + + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived + + " will be reset"); + generateSequenceReset(logon, targetWantsNextSeqNumToBe, // 34= + endSeqNo); // (NewSeqNo 36=) + } else { + // resend missed messages + getLog().onEvent( + "Received implicit ResendRequest via Logon FROM: " + + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived + + " will be resent"); + resendMessages(logon, targetWantsNextSeqNumToBe, endSeqNo); } - getLog().onEvent( - "Received implicit ResendRequest via Logon FROM: " - + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived - + " will be reset"); - generateSequenceReset(logon, targetWantsNextSeqNumToBe, // 34= - endSeqNo); // (NewSeqNo 36=) - } else { - // resend missed messages - getLog().onEvent( - "Received implicit ResendRequest via Logon FROM: " - + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived - + " will be resent"); - resendMessages(logon, targetWantsNextSeqNumToBe, endSeqNo); } } + } finally { + state.unlockSenderMsgSeqNum(); } if (isLoggedOn()) { try { diff --git a/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java b/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java new file mode 100644 index 000000000..e8db79ab4 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java @@ -0,0 +1,373 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. 
+ * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import quickfix.field.BeginString; +import quickfix.field.EncryptMethod; +import quickfix.field.HeartBtInt; +import quickfix.field.MsgSeqNum; +import quickfix.field.ResetSeqNumFlag; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Regression test for GitHub issue #902. + * + *
Bug description: A QuickFIX/J acceptor responds to a client Logon + * with {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum(34)=1} with a Logon + * response carrying {@code MsgSeqNum=2} instead of 1. + * + *
Root cause: Inside {@code Session.sendRaw()}, every outbound + * message goes through {@code persist()}, which increments the + * next-sender-sequence-number before the {@code send()} call. The + * condition that gates the actual {@code send()} call is: + *
+ * if (MsgType.LOGON.equals(msgType) || MsgType.LOGOUT.equals(msgType)
+ * || MsgType.RESEND_REQUEST.equals(msgType)
+ * || MsgType.SEQUENCE_RESET.equals(msgType) || isLoggedOn()) {
+ * result = send(messageString);
+ * }
+ *
+ * A Heartbeat message does NOT match any of those types, and at the moment
+ * it is triggered the session is not yet fully logged on ({@code isLogonSent()}
+ * is still {@code false}). Therefore the heartbeat's call to {@code persist()}
+ * bumps the sequence counter from 1 to 2, but the heartbeat is silently
+ * dropped. When the acceptor's own Logon response is subsequently serialised
+ * and sent, it picks up sequence number 2.
+ *
+ * How the race arises in production: In {@code nextLogon()}, + * {@code state.setLogonReceived(true)} is called before + * {@code generateLogon()} is called. Between those two operations, a + * timer/heartbeat thread calling {@code next()} sees + * {@code state.isLogonReceived()==true} and {@code isHeartBeatNeeded()==true} + * (because {@code lastSentTime} is stale from the previous session), so it + * invokes {@code generateHeartbeat()}, consuming seq=1 without sending the + * heartbeat. The subsequent {@code generateLogon()} then uses seq=2. + * + *
Test strategy: We inject the race deterministically using a
+ * custom {@link Log} implementation that intercepts the {@code "Received logon"}
+ * event (which fires between {@code setLogonReceived(true)} and
+ * {@code generateLogon()} inside {@code nextLogon()}) and calls
+ * {@link Session#next()} from within that event handler. We use
+ * {@link MockSystemTimeSource} to ensure {@code isHeartBeatNeeded()} returns
+ * {@code true} at exactly that moment.
+ */
+public class SessionLogonSeqNumIssue902Test {
+
+ private static final String RECEIVED_LOGON_EVENT = "Received logon";
+
+ private MockSystemTimeSource timeSource;
+ private Session acceptorSession;
+ private RecordingResponder responder;
+
+ /** Holds the session so the log hook can reach it after construction. */
+ private final AtomicReference The test deliberately injects a simulated timer-thread interleaving
+ * (via the session log hook described in the class Javadoc) so that
+ * {@code generateHeartbeat()} runs between {@code setLogonReceived(true)}
+ * and {@code generateLogon()} inside {@code nextLogon()}. This reproduces
+ * the race condition described in issue #902.
+ *
+ * Without a guard against this interleaving, the heartbeat's {@code persist()}
+ * call bumps the sequence counter from 1 to 2 without sending the heartbeat,
+ * so the Logon response ends up with {@code MsgSeqNum=2} and the assertion
+ * at the end fails; the test passes only when the response carries 1.
+ */
+ @Test
+ public void testAcceptorLogonResponseSeqNumIsOneWhenResetSeqNumFlagReceived()
+ throws Exception {
+
+ // -----------------------------------------------------------------
+ // 1. Build the acceptor session.
+ //
+ //    The custom LogFactory creates a Log whose onEvent() method
+ //    calls session.next() when "Received logon" is logged. That
+ //    event is emitted by nextLogon() AFTER setLogonReceived(true)
+ //    but BEFORE generateLogon(), i.e. inside the window in which
+ //    the race described in the class Javadoc can occur.
+ // -----------------------------------------------------------------
+ SessionID sessionID = new SessionID(
+ FixVersions.BEGINSTRING_FIX44, "ACCEPTOR", "CLIENT");
+
+ LogFactory injectingLogFactory = id -> new Log() {
+ @Override public void clear() {}
+ @Override public void onIncoming(String message) {}
+ @Override public void onOutgoing(String message) {}
+ @Override public void onErrorEvent(String text) {}
+
+ @Override
+ public void onEvent(String text) {
+ /*
+ * Session.nextLogon() logs "Received logon" after
+ * state.setLogonReceived(true) and before generateLogon(...),
+ * so this callback runs while the logon is only half
+ * processed.
+ *
+ * Calling session.next() here stands in for a timer thread
+ * that wakes up at exactly this critical moment.
+ *
+ * The AtomicBoolean guard ensures we inject at most once,
+ * even if this handler is re-entered by nested log calls.
+ */
+ if (RECEIVED_LOGON_EVENT.equals(text)
+ && heartbeatInjected.compareAndSet(false, true)) {
+ Session s = sessionRef.get();
+ if (s != null) {
+ try {
+ // Intended to reach generateHeartbeat(); per the class
+ // Javadoc its persist() bumps the seq counter 1 → 2
+ // without sending. NOTE(review): this runs on the thread
+ // already inside nextLogon() - confirm a lock can stop it.
+ s.next();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unexpected IOException in injected next() call", e);
+ }
+ }
+ }
+ }
+ };
+
+ /*
+ * Build the session as an acceptor. No heartbeat interval is passed to
+ * the builder, so its default applies (the original author states it
+ * is 0). nextLogon() only calls generateLogon() when
+ * state.isInitiator() is false, so the session must be an acceptor.
+ *
+ * NOTE(review): the original comment says the Session constructor
+ * derives state.initiator from (heartbeatInterval != 0), which is why
+ * the interval is raised to 30 only after build(). That constructor
+ * is not part of this test file - confirm before relying on it.
+ */
+ acceptorSession = new SessionFactoryTestSupport.Builder()
+ .setSessionId(sessionID)
+ .setApplication(new UnitTestApplication())
+ .setIsInitiator(false)
+ .setCheckLatency(false)
+ .setCheckCompID(true)
+ .setPersistMessages(true)
+ .setLogFactory(injectingLogFactory)
+ .build();
+
+ /*
+ * Now raise the heartbeat interval to 30 s after construction, which
+ * is meant to leave state.isInitiator() untouched (false, see above).
+ * With a non-zero interval next() is expected to reach the
+ * isHeartBeatNeeded() check instead of returning early at its
+ * "getHeartBeatInterval() == 0" guard - presumably; verify in Session.
+ */
+ acceptorSession.setHeartBeatInterval(30);
+
+ sessionRef.set(acceptorSession);
+
+ responder = new RecordingResponder();
+ acceptorSession.setResponder(responder);
+
+ // -----------------------------------------------------------------
+ // 2. Simulate a previous session by setting both seq numbers to 5
+ //    and recording lastSentTime at the current (mock) clock value.
+ //    This makes lastSentTime "stale" once we advance the clock.
+ // -----------------------------------------------------------------
+ SessionState state = getSessionState(acceptorSession);
+ state.getMessageStore().setNextSenderMsgSeqNum(5);
+ state.getMessageStore().setNextTargetMsgSeqNum(5);
+ state.setLastSentTime(SystemTime.currentTimeMillis()); // expected 1_000_000 (mock clock start - set up outside this method)
+
+ // -----------------------------------------------------------------
+ // 3. Advance the mock clock by 31 s (> the 30 s heartbeat interval)
+ //    so that isHeartBeatNeeded() is expected to return true when
+ //    next() is called from inside the log hook.
+ // -----------------------------------------------------------------
+ timeSource.increment(31_000L); // clock = start + 31 s
+
+ // -----------------------------------------------------------------
+ // 4. Deliver the incoming Logon with ResetSeqNumFlag=Y, MsgSeqNum=1.
+ //
+ //    Failure path this test guards against (injected interleaving):
+ //      nextLogon()
+ //        → resetState()             : seq counter reset to 1
+ //        → setLogonReceived(true)   : session now appears "logon received"
+ //        → log("Received logon")    : our hook fires session.next()
+ //            → isHeartBeatNeeded() = true (31 s > 30 s, stale time)
+ //            → generateHeartbeat()
+ //                → sendRaw(heartbeat):
+ //                    persist() bumps seq 1 → 2
+ //                    send() NOT called (isLogonSent() still false)
+ //            → next() returns
+ //        → generateLogon()          : header would get MsgSeqNum=2 (wrong)
+ //            → sendRaw(logon)       : Logon response goes out with "34=2"
+ // -----------------------------------------------------------------
+ acceptorSession.next(buildIncomingLogon(sessionID));
+
+ // -----------------------------------------------------------------
+ // 5. Sanity-check: the log hook must have fired, otherwise the
+ //    interleaving was never injected and the test proves nothing.
+ // -----------------------------------------------------------------
+ assertTrue("Log hook for 'Received logon' must have fired during test",
+ heartbeatInjected.get());
+
+ // -----------------------------------------------------------------
+ // 6. Assert: the Logon response MUST carry MsgSeqNum=1.
+ //
+ //    If the injected heartbeat was allowed to consume seq=1, the
+ //    Logon response carries MsgSeqNum=2 and this assertion fails
+ //    (the behaviour reported in issue #902).
+ // -----------------------------------------------------------------
+ String logonResponse = findLogonResponse(responder.sentMessages);
+ int seqNum = extractMsgSeqNum(logonResponse);
+
+ assertEquals(
+ "Acceptor Logon response must carry MsgSeqNum=1 when responding to "
+ + "a Logon with ResetSeqNumFlag=Y and MsgSeqNum=1, but got MsgSeqNum="
+ + seqNum,
+ 1,
+ seqNum);
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ /**
+ * Builds an incoming FIX 4.4 Logon message addressed to {@code sessionID}'s
+ * acceptor side, carrying {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum=1}.
+ */
+ private static quickfix.fix44.Logon buildIncomingLogon(SessionID sessionID)
+ throws FieldNotFound {
+ quickfix.fix44.Logon logon = new quickfix.fix44.Logon();
+ logon.getHeader().setString(BeginString.FIELD,
+ sessionID.getBeginString());
+ logon.getHeader().setString(SenderCompID.FIELD,
+ sessionID.getTargetCompID()); // from the remote peer's perspective
+ logon.getHeader().setString(TargetCompID.FIELD,
+ sessionID.getSenderCompID());
+ logon.getHeader().setInt(MsgSeqNum.FIELD, 1);
+ logon.setInt(EncryptMethod.FIELD, EncryptMethod.NONE_OTHER);
+ logon.setInt(HeartBtInt.FIELD, 30);
+ logon.setBoolean(ResetSeqNumFlag.FIELD, true);
+ return logon;
+ }
+
+ /**
+ * Returns the last outbound Logon message (35=A) recorded by the responder.
+ *
+ * @throws AssertionError if no Logon response was sent
+ */
+ private static String findLogonResponse(List