Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,13 @@ public void next() throws IOException {
} else {
getLog().onWarnEvent("Heartbeat failure detected but deactivated");
}
} else {
} else if (isLoggedOn()) {
// QFJ-902: Only generate heartbeats/test requests when the session is
// fully established (Logon sent AND received). Generating a heartbeat
// in the window between setLogonReceived(true) and the acceptor's own
// outgoing Logon being sent would silently consume a sequence number
// via persist() without transmitting the message, causing the Logon
// response to carry the wrong MsgSeqNum.
if (state.isTestRequestNeeded()) {
generateTestRequest("TEST");
getLog().onEvent("Sent test request TEST");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,373 @@
/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/

package quickfix;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import quickfix.field.BeginString;
import quickfix.field.EncryptMethod;
import quickfix.field.HeartBtInt;
import quickfix.field.MsgSeqNum;
import quickfix.field.ResetSeqNumFlag;
import quickfix.field.SenderCompID;
import quickfix.field.TargetCompID;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Regression test for GitHub issue #902.
*
* <p><b>Bug description:</b> A QuickFIX/J acceptor responds to a client Logon
* with {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum(34)=1} with a Logon
* response carrying {@code MsgSeqNum=2} instead of 1.
*
* <p><b>Root cause:</b> Inside {@code Session.sendRaw()}, every outbound
* message goes through {@code persist()}, which increments the
* next-sender-sequence-number <em>before</em> the {@code send()} call. The
* condition that gates the actual {@code send()} call is:
* <pre>
* if (MsgType.LOGON.equals(msgType) || MsgType.LOGOUT.equals(msgType)
* || MsgType.RESEND_REQUEST.equals(msgType)
* || MsgType.SEQUENCE_RESET.equals(msgType) || isLoggedOn()) {
* result = send(messageString);
* }
* </pre>
* A Heartbeat message does NOT match any of those types, and at the moment
* it is triggered the session is not yet fully logged on ({@code isLogonSent()}
* is still {@code false}). Therefore the heartbeat's call to {@code persist()}
* bumps the sequence counter from 1 to 2, but the heartbeat is silently
* dropped. When the acceptor's own Logon response is subsequently serialised
* and sent, it picks up sequence number 2.
*
* <p><b>How the race arises in production:</b> In {@code nextLogon()},
* {@code state.setLogonReceived(true)} is called <em>before</em>
* {@code generateLogon()} is called. Between those two operations, a
* timer/heartbeat thread calling {@code next()} sees
* {@code state.isLogonReceived()==true} and {@code isHeartBeatNeeded()==true}
* (because {@code lastSentTime} is stale from the previous session), so it
* invokes {@code generateHeartbeat()}, consuming seq=1 without sending the
* heartbeat. The subsequent {@code generateLogon()} then uses seq=2.
*
* <p><b>Test strategy:</b> We inject the race deterministically using a
* custom {@link Log} implementation that intercepts the {@code "Received logon"}
* event (which fires between {@code setLogonReceived(true)} and
* {@code generateLogon()} inside {@code nextLogon()}) and calls
* {@link Session#next()} from within that event handler. We use
* {@link MockSystemTimeSource} to ensure {@code isHeartBeatNeeded()} returns
* {@code true} at exactly that moment.
*/
public class SessionLogonSeqNumIssue902Test {

private static final String RECEIVED_LOGON_EVENT = "Received logon";

private MockSystemTimeSource timeSource;
private Session acceptorSession;
private RecordingResponder responder;

/** Holds the session so the log hook can reach it after construction. */
private final AtomicReference<Session> sessionRef = new AtomicReference<>();

/**
* Guard that prevents re-entrant injection: the hook must fire at most
* once per test run so that the simulated timer-thread interleaving is
* applied exactly once.
*/
private final AtomicBoolean heartbeatInjected = new AtomicBoolean(false);

@Before
public void setUp() {
timeSource = new MockSystemTimeSource(1_000_000L);
SystemTime.setTimeSource(timeSource);
}

@After
public void tearDown() throws Exception {
SystemTime.setTimeSource(null);
if (acceptorSession != null) {
acceptorSession.close();
}
}

/**
* Verifies that the acceptor's Logon response carries {@code MsgSeqNum=1}
* when it receives a Logon with {@code ResetSeqNumFlag=Y} and
* {@code MsgSeqNum=1}.
*
* <p>The test deliberately injects a simulated timer-thread interleaving
* (via the session log hook described in the class Javadoc) so that
* {@code generateHeartbeat()} runs between {@code setLogonReceived(true)}
* and {@code generateLogon()} inside {@code nextLogon()}. This reproduces
* the race condition described in issue #902.
*
* <p>With the current (buggy) code the heartbeat's {@code persist()} call
* bumps the sequence counter from 1 to 2 without sending the heartbeat,
* so the Logon response ends up with {@code MsgSeqNum=2}. The assertion
* at the end therefore <b>fails</b>, proving that the bug exists.
*/
@Test
public void testAcceptorLogonResponseSeqNumIsOneWhenResetSeqNumFlagReceived()
throws Exception {

// -----------------------------------------------------------------
// 1. Build the acceptor session.
//
// The custom LogFactory creates a Log whose onEvent() method
// fires session.next() when "Received logon" is logged. That
// log event is emitted by nextLogon() AFTER setLogonReceived(true)
// but BEFORE generateLogon(), which is exactly the window where
// the production race condition occurs.
// -----------------------------------------------------------------
SessionID sessionID = new SessionID(
FixVersions.BEGINSTRING_FIX44, "ACCEPTOR", "CLIENT");

LogFactory injectingLogFactory = id -> new Log() {
@Override public void clear() {}
@Override public void onIncoming(String message) {}
@Override public void onOutgoing(String message) {}
@Override public void onErrorEvent(String text) {}

@Override
public void onEvent(String text) {
/*
* "Received logon" is the log event emitted at line ~2262
* of Session.java, between:
* state.setLogonReceived(true) [line ~2229]
* generateLogon(...) [line ~2275]
*
* Calling session.next() here models a timer thread that
* woke up at exactly this critical moment.
*
* The AtomicBoolean guard ensures we inject at most once,
* even if re-entrant log calls occur.
*/
if (RECEIVED_LOGON_EVENT.equals(text)
&& heartbeatInjected.compareAndSet(false, true)) {
Session s = sessionRef.get();
if (s != null) {
try {
// This triggers generateHeartbeat() which calls
// persist() and bumps the seq counter 1 → 2
// without sending the heartbeat (isLogonSent()
// is still false at this point).
s.next();
} catch (IOException e) {
throw new RuntimeException(
"Unexpected IOException in injected next() call", e);
}
}
}
}
};

/*
* Build the acceptor session with heartbeatInterval=0 (the default for
* acceptors). This ensures that SessionState.isInitiator() == false,
* which is required for nextLogon() to call generateLogon() and thus
* produce the Logon response.
*
* NOTE: heartbeatInterval=0 causes the Session constructor to set
* state.initiator = (0 != 0) = false. If we passed 30 here,
* state.initiator would be true and generateLogon() would be
* skipped entirely, making the test scenario impossible to set up.
*/
acceptorSession = new SessionFactoryTestSupport.Builder()
.setSessionId(sessionID)
.setApplication(new UnitTestApplication())
.setIsInitiator(false)
.setCheckLatency(false)
.setCheckCompID(true)
.setPersistMessages(true)
.setLogFactory(injectingLogFactory)
.build();

/*
* Now update the heartbeat interval to 30 s WITHOUT changing
* state.isInitiator() (which is final and was set to false above).
* This makes next() reach the isHeartBeatNeeded() check instead of
* returning early at the "getHeartBeatInterval() == 0" guard, which
* is the prerequisite for the bug to manifest.
*/
acceptorSession.setHeartBeatInterval(30);

sessionRef.set(acceptorSession);

responder = new RecordingResponder();
acceptorSession.setResponder(responder);

// -----------------------------------------------------------------
// 2. Simulate a previous session by setting seq numbers to 5
// and recording lastSentTime at the INITIAL clock value (t=1 000 000).
// This makes lastSentTime "stale" once we advance the clock.
// -----------------------------------------------------------------
SessionState state = getSessionState(acceptorSession);
state.getMessageStore().setNextSenderMsgSeqNum(5);
state.getMessageStore().setNextTargetMsgSeqNum(5);
state.setLastSentTime(SystemTime.currentTimeMillis()); // = 1_000_000

// -----------------------------------------------------------------
// 3. Advance the mock clock by 31 s (> the 30 s heartbeat interval)
// so that isHeartBeatNeeded() returns true when next() is called
// from inside the log hook.
// -----------------------------------------------------------------
timeSource.increment(31_000L); // clock = 1_031_000

// -----------------------------------------------------------------
// 4. Deliver the incoming Logon with ResetSeqNumFlag=Y, MsgSeqNum=1.
//
// Execution path (with the injected interleaving):
// nextLogon()
// → resetState() : seq counter reset to 1
// → setLogonReceived(true) : session now appears "logon received"
// → log("Received logon") : our hook fires session.next()
// → isHeartBeatNeeded() = true (31 s > 30 s, stale time)
// → generateHeartbeat()
// → sendRaw(heartbeat):
// persist() bumps seq 1 → 2
// send() NOT called (isLogonSent() still false)
// → next() returns
// → generateLogon() : initializeHeader sets MsgSeqNum=2 ← BUG
// → sendRaw(logon): sends the Logon response with "34=2"
// -----------------------------------------------------------------
acceptorSession.next(buildIncomingLogon(sessionID));

// -----------------------------------------------------------------
// 5. Sanity-check: the log hook must have fired to confirm the test
// scenario was actually triggered.
// -----------------------------------------------------------------
assertTrue("Log hook for 'Received logon' must have fired during test",
heartbeatInjected.get());

// -----------------------------------------------------------------
// 6. Assert: the Logon response MUST carry MsgSeqNum=1.
//
// With the current buggy code this assertion FAILS because the
// heartbeat consumed seq=1 and the Logon response was sent with
// MsgSeqNum=2.
// -----------------------------------------------------------------
String logonResponse = findLogonResponse(responder.sentMessages);
int seqNum = extractMsgSeqNum(logonResponse);

assertEquals(
"Acceptor Logon response must carry MsgSeqNum=1 when responding to "
+ "a Logon with ResetSeqNumFlag=Y and MsgSeqNum=1, but got MsgSeqNum="
+ seqNum,
1,
seqNum);
}

// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------

/**
* Builds an incoming FIX 4.4 Logon message addressed to {@code sessionID}'s
* acceptor side, carrying {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum=1}.
*/
private static quickfix.fix44.Logon buildIncomingLogon(SessionID sessionID)
throws FieldNotFound {
quickfix.fix44.Logon logon = new quickfix.fix44.Logon();
logon.getHeader().setString(BeginString.FIELD,
sessionID.getBeginString());
logon.getHeader().setString(SenderCompID.FIELD,
sessionID.getTargetCompID()); // from the remote peer's perspective
logon.getHeader().setString(TargetCompID.FIELD,
sessionID.getSenderCompID());
logon.getHeader().setInt(MsgSeqNum.FIELD, 1);
logon.setInt(EncryptMethod.FIELD, EncryptMethod.NONE_OTHER);
logon.setInt(HeartBtInt.FIELD, 30);
logon.setBoolean(ResetSeqNumFlag.FIELD, true);
return logon;
}

/**
* Returns the last outbound Logon message (35=A) recorded by the responder.
*
* @throws AssertionError if no Logon response was sent
*/
private static String findLogonResponse(List<String> sent) {
for (int i = sent.size() - 1; i >= 0; i--) {
if (sent.get(i).contains("\00135=A\001")) {
return sent.get(i);
}
}
throw new AssertionError(
"No Logon response (35=A) found in sent messages: " + sent);
}

/**
* Parses a raw FIX message string and returns the integer value of tag 34
* (MsgSeqNum).
*/
private static int extractMsgSeqNum(String fixMessage) {
for (String field : fixMessage.split("\001")) {
if (field.startsWith("34=")) {
return Integer.parseInt(field.substring(3));
}
}
throw new AssertionError("Tag 34 (MsgSeqNum) not found in message: " + fixMessage);
}

/**
* Obtains the private {@code SessionState} from a {@link Session} via
* reflection.
*/
private static SessionState getSessionState(Session session) throws Exception {
Field stateField = Session.class.getDeclaredField("state");
stateField.setAccessible(true);
return (SessionState) stateField.get(session);
}

// -------------------------------------------------------------------------
// Inner types
// -------------------------------------------------------------------------

/**
* A minimal {@link Responder} that records every raw FIX string passed to
* {@link #send(String)}.
*/
private static final class RecordingResponder implements Responder {

final List<String> sentMessages = new CopyOnWriteArrayList<>();

@Override
public boolean send(String data) {
sentMessages.add(data);
return true;
}

@Override
public void disconnect() {
}

@Override
public String getRemoteAddress() {
return "127.0.0.1:54321";
}
}
}
Loading