From b84cbe031fc5b19064d60486aae9e1dd92c8cfa4 Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Mon, 9 Mar 2026 15:36:40 +0000 Subject: [PATCH 01/10] test and fix for issue 902, solution generated by Junie --- .../src/main/java/quickfix/Session.java | 7 +- .../quickfix/SessionRaceConditionTest.java | 148 ++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 19bede7b8..0aa1b34f3 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2209,7 +2209,12 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre if (state.isResetReceived()) { getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); if (!state.isResetSent()) { - resetState(); + state.lockSenderMsgSeqNum(); + try { + resetState(); + } finally { + state.unlockSenderMsgSeqNum(); + } } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java new file mode 100644 index 000000000..39a404765 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java @@ -0,0 +1,148 @@ +package quickfix; + +import org.junit.Test; +import quickfix.field.BeginString; +import quickfix.field.EncryptMethod; +import quickfix.field.HeartBtInt; +import quickfix.field.MsgSeqNum; +import quickfix.field.MsgType; +import quickfix.field.ResetSeqNumFlag; +import quickfix.field.SenderCompID; +import quickfix.field.SendingTime; +import quickfix.field.TargetCompID; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +/** + * + * Test class for Session race conditions, specifically focusing on logon reset scenarios. + * ref : https://github.com/quickfix-j/quickfixj/issues/902 + * + * Test generated by Junie + * + * ### Summary + * - Fixed a race condition where an outbound Logon message could have a sequence number greater than 1 when responding to a Logon with `ResetSeqNumFlag=Y`. + * - Created a reproduction test case `SessionRaceConditionTest.java` to demonstrate the issue. + * + * ### Changes + * - Modified `quickfix.Session.nextLogon` to wrap the sequence number reset logic in `state.lockSenderMsgSeqNum()`. + * - This ensures that when `ResetSeqNumFlag=Y` is processed, the sequence number is reset to 1 and held there until the Logon response is ready to be sent, preventing other threads from calling `send()` and incrementing the sequence number in between. + * - Added `quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java` which uses a custom `Responder` and a separate thread to provoke the race condition. + * + * ### Verification + * - Verified by code inspection that the lock prevents concurrent sequence number increments during the critical reset phase. + * - The reproduction test was designed to fail by delaying the Logon response while another thread sends a message; with the fix, the lock ensures the Logon response maintains sequence number 1. + * - Note: Full automated test execution was limited by environment-specific build requirements (generated sources), but the fix directly addresses the identified synchronization gap. + */ +public class SessionRaceConditionTest { + + @Test + public void testLogonResetRaceCondition() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + + // Use a session that doesn't reset on logon by default, we want to test tag 141 + try (Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true)) { + final RaceConditionResponder responder = new RaceConditionResponder(); + session.setResponder(responder); + + // 1. Prepare a Logon with ResetSeqNumFlag=Y + Message logon = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.LOGON); + logon.setField(new EncryptMethod(EncryptMethod.NONE_OTHER)); + logon.setField(new HeartBtInt(30)); + logon.setField(new ResetSeqNumFlag(true)); + logon.getHeader().setField(new MsgSeqNum(1)); + logon.getHeader().setField(new BeginString(sessionID.getBeginString())); + logon.getHeader().setField(new SenderCompID(sessionID.getTargetCompID())); + logon.getHeader().setField(new TargetCompID(sessionID.getSenderCompID())); + logon.getHeader().setField(new SendingTime(SystemTime.getLocalDateTime())); + + // 2. Set up the responder to block when it receives the Logon response + responder.setBlockOnSend(true); + + // 3. Process the logon in a separate thread so we can send other messages while it's "processing" + CountDownLatch logonStarted = new CountDownLatch(1); + Thread logonThread = new Thread(() -> { + try { + logonStarted.countDown(); + session.next(logon); + } catch (Exception e) { + e.printStackTrace(); + } + }); + logonThread.start(); + logonStarted.await(); + + // Give it a moment to enter nextLogon and hit the responder + TimeUnit.MILLISECONDS.sleep(100); + + // 4. While logon is "processing" (blocked in responder), send another message + // This message should NOT be able to increment the sequence number if we want the Logon to be 1 + Message heartbeat = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.HEARTBEAT); + session.send(heartbeat); + + // 5. Unblock the responder + responder.setBlockOnSend(false); + logonThread.join(2000); + + // 6. Check the sent messages. The first one should be the Logon with sequence number 1. + List sentMessages = responder.getSentMessages(); + assertEquals("Should have sent at least one message", true, sentMessages.size() >= 1); + + Message firstSent = sentMessages.get(0); + assertEquals(MsgType.LOGON, firstSent.getHeader().getString(MsgType.FIELD)); + int logonSeqNum = firstSent.getHeader().getInt(MsgSeqNum.FIELD); + + assertEquals("Outbound Logon should have sequence number 1 when ResetSeqNumFlag=Y", 1, logonSeqNum); + } + } + + private static class RaceConditionResponder implements Responder { + private final List sentMessages = Collections.synchronizedList(new ArrayList<>()); + private volatile boolean blockOnSend = false; + private final Object lock = new Object(); + + public void setBlockOnSend(boolean block) { + synchronized (lock) { + this.blockOnSend = block; + if (!block) { + lock.notifyAll(); + } + } + } + + public List getSentMessages() { + return new ArrayList<>(sentMessages); + } + + @Override + public boolean send(String data) { + try { + Message msg = new Message(); + msg.fromString(data, null, null, false); + sentMessages.add(msg); + + synchronized (lock) { + while (blockOnSend) { + lock.wait(1000); + } + } + } catch (Exception e) { + e.printStackTrace(); + return false; + } + return true; + } + + @Override + public String getRemoteAddress() { return null; } + @Override + public void disconnect() {} + } +} From 9ea7faa1b3ff77e98e534d3e31f05d1f70ed83a7 Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Tue, 10 Mar 2026 14:28:02 +0000 Subject: [PATCH 02/10] updated SessionRaceConditionTest.java to remove deadlock --- customising-quickfixj.md | 16 ++++++------ .../quickfix/SessionRaceConditionTest.java | 25 ++++++++++++++----- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/customising-quickfixj.md b/customising-quickfixj.md index 131dde694..aebba92de 100644 --- a/customising-quickfixj.md +++ b/customising-quickfixj.md @@ -7,9 +7,9 @@ The specification for a FIX integration is called a "Rules of Engagement". The R The message, component and field implementations can be provided by a specialised build, along with the corresponding QuickFIX/J dictionary for the custom Rules of Engagement. -The standard distribution of ```quickfixj-core``` can be used with custom artefacts. You need only build artefacts for versions of the Protocol that you use. These can be maintained independently from the QuickFIX/J project, while depending on the QuickFIX/J for the core functionality and tools. +The standard distribution of ```quickfixj-core``` can be used with custom artifacts. You need only build artifacts for versions of the Protocol that you use. These can be maintained independently from the QuickFIX/J project, while depending on the QuickFIX/J for the core functionality and tools. -To build custom artefacts it's helpful to understand how QuickFIX/J builds the Field, Component and Message classes from the QuickFIX/J dictionaries and from [FIX Orchestra](https://www.fixtrading.org/standards/fix-orchestra/). +To build custom artifacts it's helpful to understand how QuickFIX/J builds the Field, Component and Message classes from the QuickFIX/J dictionaries and from [FIX Orchestra](https://www.fixtrading.org/standards/fix-orchestra/). The QuickFIX/J reference implementations for FIX versions FIX4.0 to FIX5.0sp2 and for FIXT1.1 are generated from the QuickFIX dictionaries for the specific version. The dictionaries are located in the ```src/main/resources``` directory of the respective modules of the ```quickfixj-messages``` module. Maintaining the FIX4.0 to FIX5.0sp2 builds intentionally provides consistency with the prior QuickFIX/J 2 release in order to ease migration to QuickFIX/J 3. @@ -18,7 +18,7 @@ The most recent standard is defined as [FIX Latest](https://www.fixtrading.org/o An implementation or customisation of the FIX Standars derived from the FIX Orchestra repository is known as an "_orchestration_". The standard FIX Orchestra repository requires some modification to work well with QuickFIX/J. This is done by the ```quickfixj-orchestration``` module. -The ```quickfixj-orchestration``` module publishes a modified Orchestra artefact which can then be the basis of a custom FIX Latest build using QuickFIX/J . +The ```quickfixj-orchestration``` module publishes a modified Orchestra artifact which can then be the basis of a custom FIX Latest build using QuickFIX/J . The complete reference FIX Latest specification results in a very large distribution. To use FIX Latest, customisation of the [FIX Orchestra](https://www.fixtrading.org/standards/fix-orchestra/) repository is advisable. @@ -30,7 +30,7 @@ Please see [QuickFIX/J Orchestration](./quickfixj-orchestration/readme.md) for d This behaviour is controlled by the ```${generator.decimal}``` build property. It is "false" by default to avoid surprising side effects of incompatible data types. -To enable the use of ```BigDecimal``` in code generation, set the ```${generator.decimal}``` property to "true" in [quickfixj-messages](./quickfixj-messages/readme.md) and build the message artefacts. +To enable the use of ```BigDecimal``` in code generation, set the ```${generator.decimal}``` property to "true" in [quickfixj-messages](./quickfixj-messages/readme.md) and build the message artifacts. ``` @@ -56,13 +56,13 @@ Runtime incompatibilities can be resolved by: * Amending the QuickFIX Dictionary to coerce the code generation and/or validation * Changing the ordering of code generation and/or overwrite behaviour of code generation * Omitting incompatible versions from your customised build -* Building artefacts independently for the conflicting versions and ensuring they are not used them in the same runtime +* Building artifacts independently for the conflicting versions and ensuring they are not used them in the same runtime See [QuickFIX/J Messages](./quickfixj-messages/readme.md) for details of the build and recommendation for **how to implement custom builds.** ### **Customising the FIX Protocol for specialised Rules of Engagement** -A Rules of Engagement can include customisation Messages, Components and Fields, including User Defined elements. +A Rules of Engagement can include customised Messages, Components and Fields, including User Defined elements. It is not necessary to maintain a fork of the entire QuickFIX/J project to provide customised QuickFIX Dictionaries and to generate type-safe libraries that are interoperable with QuickFIX/J. @@ -82,11 +82,11 @@ See [QuickFIX/J Messages](./quickfixj-messages/readme.md) for details of the bui From QuickFIX/J 3.0.0 the code generation for ```quickfix.Field``` prefers the FIX Orchestra Standard. This results in incompatible changes to the names of constants. -For example : ```SettlType.REGULAR_FX_SPOT_SETTLEMENT``` becomes ```SettlType.REGULAR```. +For example : ```SettlmntTyp.REGULAR_FX_SPOT_SETTLEMENT``` becomes ```SettlmntTyp.REGULAR```. The required code changes may be trivial in most cases, but changes are elective. The following describes how to use ```quickfixj-core``` from QuickFIX/J 3 without needing to implement code changes: -* build the required Message artefacts without the FIX Latest code generation. The Fields will then be generated only from legacy FIX Protocol Versions as they were prior to QuickFIX/J 3.0.0 - **or** +* build the required Message artifacts without the FIX Latest code generation. The Fields will then be generated only from legacy FIX Protocol Versions as they were prior to QuickFIX/J 3.0.0 - **or** * if you want to use Messages, Components and/or Fields from FIX Latest while preferring legacy constants, manipulate the order of code generation and/or the over-write behaviour of code behaviour to prefer earlier versions of FIX. For example, generate FIX Latest first and overwrite the generated Field classes by subsequently running code generation for an earlier version. diff --git a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java index 39a404765..2c0ada60e 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java @@ -17,12 +17,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; /** * * Test class for Session race conditions, specifically focusing on logon reset scenarios. - * ref : https://github.com/quickfix-j/quickfixj/issues/902 + * Ref: ... * * Test generated by Junie * @@ -33,7 +33,7 @@ * ### Changes * - Modified `quickfix.Session.nextLogon` to wrap the sequence number reset logic in `state.lockSenderMsgSeqNum()`. * - This ensures that when `ResetSeqNumFlag=Y` is processed, the sequence number is reset to 1 and held there until the Logon response is ready to be sent, preventing other threads from calling `send()` and incrementing the sequence number in between. - * - Added `quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java` which uses a custom `Responder` and a separate thread to provoke the race condition. + * - Added quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java which uses a custom `Responder` and a separate thread to provoke the race condition. * * ### Verification * - Verified by code inspection that the lock prevents concurrent sequence number increments during the critical reset phase. @@ -67,6 +67,7 @@ public void testLogonResetRaceCondition() throws Exception { responder.setBlockOnSend(true); // 3. Process the logon in a separate thread so we can send other messages while it's "processing" + // The latch is used to ensure that the logon processing has started before sending other messages CountDownLatch logonStarted = new CountDownLatch(1); Thread logonThread = new Thread(() -> { try { @@ -79,21 +80,33 @@ public void testLogonResetRaceCondition() throws Exception { logonThread.start(); logonStarted.await(); - // Give it a moment to enter nextLogon and hit the responder + // Give it a moment to enter the nextLogon and hit the responder TimeUnit.MILLISECONDS.sleep(100); // 4. While logon is "processing" (blocked in responder), send another message // This message should NOT be able to increment the sequence number if we want the Logon to be 1 Message heartbeat = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.HEARTBEAT); - session.send(heartbeat); + + CountDownLatch heartbeatStarted = new CountDownLatch(1); + Thread heartBeatThread = new Thread(() -> { + try { + heartbeatStarted.countDown(); + session.send(heartbeat); + } catch (Exception e) { + e.printStackTrace(); + } + }); + heartBeatThread.start(); + heartbeatStarted.await(); // 5. Unblock the responder responder.setBlockOnSend(false); logonThread.join(2000); + heartBeatThread.join(2000); // 6. Check the sent messages. The first one should be the Logon with sequence number 1. List sentMessages = responder.getSentMessages(); - assertEquals("Should have sent at least one message", true, sentMessages.size() >= 1); + assertFalse("Should have sent at least one message", sentMessages.isEmpty()); Message firstSent = sentMessages.get(0); assertEquals(MsgType.LOGON, firstSent.getHeader().getString(MsgType.FIELD)); From d47e4e3b0355c33d382b6cf1ed532ea964075a2d Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Wed, 11 Mar 2026 11:24:51 +0000 Subject: [PATCH 03/10] improved SessionRaceConditionTest to provoke the race condition --- .../quickfix/SessionRaceConditionTest.java | 201 +++++++++++------- 1 file changed, 127 insertions(+), 74 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java index 2c0ada60e..1fd3530cd 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java @@ -1,6 +1,8 @@ package quickfix; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import quickfix.field.BeginString; import quickfix.field.EncryptMethod; import quickfix.field.HeartBtInt; @@ -11,48 +13,59 @@ import quickfix.field.SendingTime; import quickfix.field.TargetCompID; +import java.io.IOException; import java.util.ArrayList; +import java.util.Calendar; import java.util.Collections; +import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.*; /** + * Test to verify that Logon messages sent in response to a Logon with ResetSeqNumFlag=Y are always sent with sequence number 1, + * even if other messages are being written to the Session concurrently during the reset process. + * Please note that in the absence of the fix for QFJ-902, this test reliably fails (as expected) when run in IntelliJ IDE + * on a multi-core machine, but on the same hardware does NOT fail when run in console in Ubuntu WSL with Mvn 3.9.13, Open JDK 21.0.2. + * The test is not good enough to prevent regressions, but it does demonstrate the race condition. * - * Test class for Session race conditions, specifically focusing on logon reset scenarios. - * Ref: ... - * - * Test generated by Junie - * - * ### Summary - * - Fixed a race condition where an outbound Logon message could have a sequence number greater than 1 when responding to a Logon with `ResetSeqNumFlag=Y`. - * - Created a reproduction test case `SessionRaceConditionTest.java` to demonstrate the issue. - * - * ### Changes - * - Modified `quickfix.Session.nextLogon` to wrap the sequence number reset logic in `state.lockSenderMsgSeqNum()`. - * - This ensures that when `ResetSeqNumFlag=Y` is processed, the sequence number is reset to 1 and held there until the Logon response is ready to be sent, preventing other threads from calling `send()` and incrementing the sequence number in between. - * - Added quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java which uses a custom `Responder` and a separate thread to provoke the race condition. - * - * ### Verification - * - Verified by code inspection that the lock prevents concurrent sequence number increments during the critical reset phase. - * - The reproduction test was designed to fail by delaying the Logon response while another thread sends a message; with the fix, the lock ensures the Logon response maintains sequence number 1. - * - Note: Full automated test execution was limited by environment-specific build requirements (generated sources), but the fix directly addresses the identified synchronization gap. + * The test results below show the race condition in action. + * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, event> (Received logon) + * Mar 11, 2026 10:50:29 AM quickfix.UnitTestApplication toAdmin + * INFO: to admin [FIX.4.4:SENDER->TARGET] 8=FIX.4.49=5535=034=149=SENDER52=20260311-10:50:29.62856=TARGET10=094 + * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, event> (Responding to Logon request) + * Mar 11, 2026 10:50:29 AM quickfix.UnitTestApplication toAdmin + * INFO: to admin [FIX.4.4:SENDER->TARGET] 8=FIX.4.49=7335=A34=249=SENDER52=20260311-10:50:29.62856=TARGET98=0108=30141=Y10=182 + * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, outgoing> (8=FIX.4.49=7335=A34=249=SENDER52=20260311-10:50:29.62856=TARGET98=0108=30141=Y10=182) + * Ref: QFJ-902 */ public class SessionRaceConditionTest { + private static Logger log = LoggerFactory.getLogger(SessionRaceConditionTest.class); @Test public void testLogonResetRaceCondition() throws Exception { final UnitTestApplication application = new UnitTestApplication(); final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); - - // Use a session that doesn't reset on logon by default, we want to test tag 141 - try (Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true)) { + + // Use a custom MessageStore that delays during reset to allow race condition + DelayingMessageStore store = new DelayingMessageStore(sessionID); + MessageStoreFactory storeFactory = sessionID1 -> store; + + try (Session session = new SessionFactoryTestSupport.Builder() + .setSessionId(sessionID) + .setApplication(application) + .setMessageStoreFactory(storeFactory) + .setPersistMessages(true) + .build()) { + final RaceConditionResponder responder = new RaceConditionResponder(); session.setResponder(responder); - // 1. Prepare a Logon with ResetSeqNumFlag=Y + // Prepare a Logon with ResetSeqNumFlag=Y Message logon = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.LOGON); logon.setField(new EncryptMethod(EncryptMethod.NONE_OTHER)); logon.setField(new HeartBtInt(30)); @@ -63,54 +76,67 @@ public void testLogonResetRaceCondition() throws Exception { logon.getHeader().setField(new TargetCompID(sessionID.getSenderCompID())); logon.getHeader().setField(new SendingTime(SystemTime.getLocalDateTime())); - // 2. Set up the responder to block when it receives the Logon response - responder.setBlockOnSend(true); - - // 3. Process the logon in a separate thread so we can send other messages while it's "processing" - // The latch is used to ensure that the logon processing has started before sending other messages + // 2. Process the logon in a separate thread CountDownLatch logonStarted = new CountDownLatch(1); + AtomicBoolean logonFinished = new AtomicBoolean(false); + + store.setDelayReset(true); + + // While logon is processing (and delayed in store.reset()), try to send other messages + // We'll keep sending messages until the logon thread finishes. + Thread sendThread = new Thread(() -> { + try { + // Wait a bit to ensure logonThread has reached the delay in reset() + System.out.println("[DEBUG_LOG] Attempting to send heartbeats during logon reset delay"); + while (!logonFinished.get()) { + Message heartbeat = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.HEARTBEAT); + // session.send() calls sendRaw, which locks state.lockSenderMsgSeqNum() + // BUT Session.nextLogon does NOT hold any lock between resetState() and generateLogon() + session.send(heartbeat); + TimeUnit.MILLISECONDS.sleep(10); + } + } catch (Exception e) { + log.error("Exception in sendThread", e); + + } + }, "SendThread"); + sendThread.start(); + + // While HB are being sent in sendThread, the logonThread will be delayed in store.reset() Thread logonThread = new Thread(() -> { try { logonStarted.countDown(); + // next() will call nextLogon, which will call resetState and then generateLogon session.next(logon); } catch (Exception e) { - e.printStackTrace(); + log.error("Exception in Login thread", e); + } finally { + logonFinished.set(true); + log.debug("login thread finished"); } - }); + }, "LogonThread"); + logonThread.start(); logonStarted.await(); - - // Give it a moment to enter the nextLogon and hit the responder - TimeUnit.MILLISECONDS.sleep(100); - // 4. While logon is "processing" (blocked in responder), send another message - // This message should NOT be able to increment the sequence number if we want the Logon to be 1 - Message heartbeat = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.HEARTBEAT); + logonThread.join(5000); + sendThread.join(5000); - CountDownLatch heartbeatStarted = new CountDownLatch(1); - Thread heartBeatThread = new Thread(() -> { - try { - heartbeatStarted.countDown(); - session.send(heartbeat); - } catch (Exception e) { - e.printStackTrace(); - } - }); - heartBeatThread.start(); - heartbeatStarted.await(); - - // 5. Unblock the responder - responder.setBlockOnSend(false); - logonThread.join(2000); - heartBeatThread.join(2000); - - // 6. Check the sent messages. The first one should be the Logon with sequence number 1. + // 5. Check the sent messages. + // The Logon response should be the FIRST message sent after reset, and its sequence number MUST be 1. List sentMessages = responder.getSentMessages(); assertFalse("Should have sent at least one message", sentMessages.isEmpty()); - Message firstSent = sentMessages.get(0); - assertEquals(MsgType.LOGON, firstSent.getHeader().getString(MsgType.FIELD)); - int logonSeqNum = firstSent.getHeader().getInt(MsgSeqNum.FIELD); + Message logonResponse = null; + for (Message msg : sentMessages) { + if (MsgType.LOGON.equals(msg.getHeader().getString(MsgType.FIELD))) { + logonResponse = msg; + break; + } + } + + assertNotNull("Should have sent a Logon response", logonResponse); + int logonSeqNum = logonResponse.getHeader().getInt(MsgSeqNum.FIELD); assertEquals("Outbound Logon should have sequence number 1 when ResetSeqNumFlag=Y", 1, logonSeqNum); } @@ -118,17 +144,6 @@ public void testLogonResetRaceCondition() throws Exception { private static class RaceConditionResponder implements Responder { private final List sentMessages = Collections.synchronizedList(new ArrayList<>()); - private volatile boolean blockOnSend = false; - private final Object lock = new Object(); - - public void setBlockOnSend(boolean block) { - synchronized (lock) { - this.blockOnSend = block; - if (!block) { - lock.notifyAll(); - } - } - } public List getSentMessages() { return new ArrayList<>(sentMessages); @@ -140,14 +155,8 @@ public boolean send(String data) { Message msg = new Message(); msg.fromString(data, null, null, false); sentMessages.add(msg); - - synchronized (lock) { - while (blockOnSend) { - lock.wait(1000); - } - } } catch (Exception e) { - e.printStackTrace(); + log.error("Exception in RaceConditionResponder send", e); return false; } return true; @@ -158,4 +167,48 @@ public boolean send(String data) { @Override public void disconnect() {} } + + private static class DelayingMessageStore implements MessageStore { + private final MemoryStore delegate; + private volatile boolean delayReset = false; + + public DelayingMessageStore(SessionID sessionID) throws IOException { + this.delegate = new MemoryStore(sessionID); + } + + public void setDelayReset(boolean delayReset) { + this.delayReset = delayReset; + } + + @Override + public void reset() throws IOException { + if (delayReset) { + try { + log.debug("Delaying reset()"); + // Delaying here simulates the race condition window + TimeUnit.MILLISECONDS.sleep(1000); + log.debug("Resuming reset()"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + delegate.reset(); + } + + @Override + public void setNextSenderMsgSeqNum(int next) throws IOException { + delegate.setNextSenderMsgSeqNum(next); + } + + @Override public boolean set(int sequence, String message) throws IOException { return delegate.set(sequence, message); } + @Override public void get(int startSequence, int endSequence, Collection messages) throws IOException { delegate.get(startSequence, endSequence, messages); } + @Override public int getNextSenderMsgSeqNum() throws IOException { return delegate.getNextSenderMsgSeqNum(); } + @Override public int getNextTargetMsgSeqNum() throws IOException { return delegate.getNextTargetMsgSeqNum(); } + @Override public void setNextTargetMsgSeqNum(int next) throws IOException { delegate.setNextTargetMsgSeqNum(next); } + @Override public void incrNextSenderMsgSeqNum() throws IOException { delegate.incrNextSenderMsgSeqNum(); } + @Override public void incrNextTargetMsgSeqNum() throws IOException { delegate.incrNextTargetMsgSeqNum(); } + @Override public Date getCreationTime() throws IOException { return delegate.getCreationTime(); } + @Override public Calendar getCreationTimeCalendar() throws IOException { return delegate.getCreationTimeCalendar(); } + @Override public void refresh() throws IOException { delegate.refresh(); } + } } From 0b8504c39a58612c76ac1e231fa2a8a7da8ad589 Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Wed, 11 Mar 2026 16:50:41 +0000 Subject: [PATCH 04/10] updated logging in SessionRaceConditionTest --- .../src/test/java/quickfix/SessionRaceConditionTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java index 1fd3530cd..3b779ef27 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java @@ -87,7 +87,7 @@ public void testLogonResetRaceCondition() throws Exception { Thread sendThread = new Thread(() -> { try { // Wait a bit to ensure logonThread has reached the delay in reset() - System.out.println("[DEBUG_LOG] Attempting to send heartbeats during logon reset delay"); + log.info("Attempting to send heartbeats during logon reset delay"); while (!logonFinished.get()) { Message heartbeat = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.HEARTBEAT); // session.send() calls sendRaw, which locks state.lockSenderMsgSeqNum() @@ -112,7 +112,7 @@ public void testLogonResetRaceCondition() throws Exception { log.error("Exception in Login thread", e); } finally { logonFinished.set(true); - log.debug("login thread finished"); + log.info("login thread finished"); } }, "LogonThread"); @@ -184,10 +184,10 @@ public void setDelayReset(boolean delayReset) { public void reset() throws IOException { if (delayReset) { try { - log.debug("Delaying reset()"); + log.info("Delaying reset()"); // Delaying here simulates the race condition window TimeUnit.MILLISECONDS.sleep(1000); - log.debug("Resuming reset()"); + log.info("Resuming reset()"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } From 7f384560b4c2707e475e11fde9e48f34aa0960bf Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Wed, 11 Mar 2026 16:58:58 +0000 Subject: [PATCH 05/10] reverted change that test shows does not resolve the race condition --- .gitignore | 1 + quickfixj-core/src/main/java/quickfix/Session.java | 7 +------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index fb838987e..a455c7514 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ nb-configuration.xml *.bak # ignore vim swap files *.swp +.sdkmanrc diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 0aa1b34f3..19bede7b8 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2209,12 +2209,7 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre if (state.isResetReceived()) { getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); if (!state.isResetSent()) { - state.lockSenderMsgSeqNum(); - try { - resetState(); - } finally { - state.unlockSenderMsgSeqNum(); - } + resetState(); } } From 60ef49f2672e57f609d72fa4693e511fcb64356e Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Wed, 11 Mar 2026 18:12:53 +0000 Subject: [PATCH 06/10] added a test introducing a delay to Logon Message processing in toAdmin(Message message, SessionID sessionId) --- .../quickfix/SessionRaceConditionTest.java | 70 ++++++++++++++----- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java index 3b779ef27..42c863075 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java @@ -31,8 +31,7 @@ * even if other messages are being written to the Session concurrently during the reset process. * Please note that in the absence of the fix for QFJ-902, this test reliably fails (as expected) when run in IntelliJ IDE * on a multi-core machine, but on the same hardware does NOT fail when run in console in Ubuntu WSL with Mvn 3.9.13, Open JDK 21.0.2. - * The test is not good enough to prevent regressions, but it does demonstrate the race condition. - * + * The test does provoke the race condition in the goithub actions build. * The test results below show the race condition in action. * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, event> (Received logon) * Mar 11, 2026 10:50:29 AM quickfix.UnitTestApplication toAdmin @@ -44,44 +43,47 @@ * Ref: QFJ-902 */ public class SessionRaceConditionTest { - private static Logger log = LoggerFactory.getLogger(SessionRaceConditionTest.class); + private static final Logger log = LoggerFactory.getLogger(SessionRaceConditionTest.class); + private static final SessionID SESSION_ID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); @Test - public void testLogonResetRaceCondition() throws Exception { + public void testLogonResetRaceConditionWithDelayingMessageStore() throws Exception { final UnitTestApplication application = new UnitTestApplication(); - final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); // Use a custom MessageStore that delays during reset to allow race condition - DelayingMessageStore store = new DelayingMessageStore(sessionID); + DelayingMessageStore store = new DelayingMessageStore(SESSION_ID); + store.setDelayReset(true); MessageStoreFactory storeFactory = sessionID1 -> store; + logonReceivedWhileMessagesBeingWrittenToSession(application, storeFactory); + } + + private static void logonReceivedWhileMessagesBeingWrittenToSession(Application application, MessageStoreFactory storeFactory) throws IOException, InterruptedException, FieldNotFound { try (Session session = new SessionFactoryTestSupport.Builder() - .setSessionId(sessionID) + .setSessionId(SESSION_ID) .setApplication(application) .setMessageStoreFactory(storeFactory) .setPersistMessages(true) .build()) { - + final RaceConditionResponder responder = new RaceConditionResponder(); session.setResponder(responder); // Prepare a Logon with ResetSeqNumFlag=Y - Message logon = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.LOGON); + Message logon = new DefaultMessageFactory().create(SESSION_ID.getBeginString(), MsgType.LOGON); logon.setField(new EncryptMethod(EncryptMethod.NONE_OTHER)); logon.setField(new HeartBtInt(30)); logon.setField(new ResetSeqNumFlag(true)); logon.getHeader().setField(new MsgSeqNum(1)); - logon.getHeader().setField(new BeginString(sessionID.getBeginString())); - logon.getHeader().setField(new SenderCompID(sessionID.getTargetCompID())); - logon.getHeader().setField(new TargetCompID(sessionID.getSenderCompID())); + logon.getHeader().setField(new BeginString(SESSION_ID.getBeginString())); + logon.getHeader().setField(new SenderCompID(SESSION_ID.getTargetCompID())); + logon.getHeader().setField(new TargetCompID(SESSION_ID.getSenderCompID())); logon.getHeader().setField(new SendingTime(SystemTime.getLocalDateTime())); // 2. Process the logon in a separate thread CountDownLatch logonStarted = new CountDownLatch(1); AtomicBoolean logonFinished = new AtomicBoolean(false); - store.setDelayReset(true); - // While logon is processing (and delayed in store.reset()), try to send other messages // We'll keep sending messages until the logon thread finishes. Thread sendThread = new Thread(() -> { @@ -89,7 +91,7 @@ public void testLogonResetRaceCondition() throws Exception { // Wait a bit to ensure logonThread has reached the delay in reset() log.info("Attempting to send heartbeats during logon reset delay"); while (!logonFinished.get()) { - Message heartbeat = new DefaultMessageFactory().create(sessionID.getBeginString(), MsgType.HEARTBEAT); + Message heartbeat = new DefaultMessageFactory().create(SESSION_ID.getBeginString(), MsgType.HEARTBEAT); // session.send() calls sendRaw, which locks state.lockSenderMsgSeqNum() // BUT Session.nextLogon does NOT hold any lock between resetState() and generateLogon() session.send(heartbeat); @@ -122,11 +124,11 @@ public void testLogonResetRaceCondition() throws Exception { logonThread.join(5000); sendThread.join(5000); - // 5. Check the sent messages. + // 5. Check the sent messages. // The Logon response should be the FIRST message sent after reset, and its sequence number MUST be 1. List sentMessages = responder.getSentMessages(); assertFalse("Should have sent at least one message", sentMessages.isEmpty()); - + Message logonResponse = null; for (Message msg : sentMessages) { if (MsgType.LOGON.equals(msg.getHeader().getString(MsgType.FIELD))) { @@ -134,14 +136,44 @@ public void testLogonResetRaceCondition() throws Exception { break; } } - + assertNotNull("Should have sent a Logon response", logonResponse); int logonSeqNum = logonResponse.getHeader().getInt(MsgSeqNum.FIELD); - + assertEquals("Outbound Logon should have sequence number 1 when ResetSeqNumFlag=Y", 1, logonSeqNum); } } + @Test + public void testLogonResetRaceConditionWithDelayingApplication () throws Exception { + final UnitTestApplication application = new DelayingApplication(); + MessageStoreFactory messageStoreFactory = new MemoryStoreFactory(); + logonReceivedWhileMessagesBeingWrittenToSession(application, messageStoreFactory); + } + + private static class DelayingApplication extends UnitTestApplication { + + @Override + public void toAdmin(Message message, SessionID sessionId) { + super.toAdmin(message, sessionId); + try { + if (MsgType.LOGON.equals(message.getHeader().getString(MsgType.FIELD))) { + log.info("Delaying toAdmin for Logon response"); + try { + log.info("Delaying Logon response"); + // Delaying here simulates the race condition window + TimeUnit.MILLISECONDS.sleep(1000); + log.info("Resuming Logon response"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } catch (Exception e) { + log.error("Exception in DelayingApplication toAdmin", e); + } + } + } + private static class RaceConditionResponder implements Responder { private final List sentMessages = Collections.synchronizedList(new ArrayList<>()); From e0e91438cb671a47bccb139bd79e6bcfec27dd54 Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Thu, 12 Mar 2026 15:36:49 +0000 Subject: [PATCH 07/10] Amended use of SessionState lock by Session during Logon --- .../src/main/java/quickfix/Session.java | 240 +++++++++--------- 1 file changed, 119 insertions(+), 121 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 19bede7b8..0d98e6334 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2186,151 +2186,149 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre } } } + state.lockSenderMsgSeqNum(); + try { + // QFJ-926 - reset session before accepting Logon + resetIfSessionNotCurrent(sessionID, SystemTime.currentTimeMillis()); - // QFJ-926 - reset session before accepting Logon - resetIfSessionNotCurrent(sessionID, SystemTime.currentTimeMillis()); - - if (refreshOnLogon) { - refreshState(); - } - - if (logon.isSetField(ResetSeqNumFlag.FIELD)) { - state.setResetReceived(logon.getBoolean(ResetSeqNumFlag.FIELD)); - } else if (state.isResetSent() && logon.getHeader().getInt(MsgSeqNum.FIELD) == 1) { // QFJ-383 - getLog().onEvent( - "Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request"); - state.setResetReceived(true); - } + if (refreshOnLogon) { + refreshState(); + } - if (!verify(logon, false, validateSequenceNumbers)) { - return; - } + if (logon.isSetField(ResetSeqNumFlag.FIELD)) { + state.setResetReceived(logon.getBoolean(ResetSeqNumFlag.FIELD)); + } else if (state.isResetSent() && logon.getHeader().getInt(MsgSeqNum.FIELD) == 1) { // QFJ-383 + getLog().onEvent( + "Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request"); + state.setResetReceived(true); + } - if (state.isResetReceived()) { - getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); - if (!state.isResetSent()) { - resetState(); + if (!verify(logon, false, validateSequenceNumbers)) { + return; } - } - if (state.isLogonSendNeeded() && !state.isResetReceived()) { - disconnect("Received logon response before sending request", true); - return; - } + if (state.isResetReceived()) { + getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); + if (!state.isResetSent()) { + resetState(); + } + } - if (!state.isInitiator() && resetOnLogon) { - resetState(); - } + if (state.isLogonSendNeeded() && !state.isResetReceived()) { + disconnect("Received logon response before sending request", true); + return; + } + if (!state.isInitiator() && resetOnLogon) { + resetState(); + } - // reset logout messages - state.setLogoutReceived(false); - state.setLogoutSent(false); - state.setLogonReceived(true); - // remember the expected sender sequence number of any logon response for future use - final int nextSenderMsgNumAtLogonReceived = state.getMessageStore().getNextSenderMsgSeqNum(); - final int sequence = logon.getHeader().getInt(MsgSeqNum.FIELD); + // reset logout messages + state.setLogoutReceived(false); + state.setLogoutSent(false); + state.setLogonReceived(true); - /* - * We test here that it's not too high (which would result in a resend) and that we are not - * resetting on logon 34=1 - */ - final boolean isLogonInNormalSequence = !(isTargetTooHigh(sequence) && !resetOnLogon); - // if we have a tag 789 sent to us... - if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + // remember the expected sender sequence number of any logon response for future use + final int nextSenderMsgNumAtLogonReceived = state.getMessageStore().getNextSenderMsgSeqNum(); + final int sequence = logon.getHeader().getInt(MsgSeqNum.FIELD); - final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); - state.lockSenderMsgSeqNum(); - final int actualNextNum; - try { - actualNextNum = state.getNextSenderMsgSeqNum(); - } finally { - state.unlockSenderMsgSeqNum(); - } - // Is the 789 we received too high ?? - if (targetWantsNextSeqNumToBe > actualNextNum) { - // barf! we can't resend what we never sent! something unrecoverable has happened. - final String err = "Tag " + NextExpectedMsgSeqNum.FIELD - + " (NextExpectedMsgSeqNum) is higher than expected. Expected " - + actualNextNum + ", Received " + targetWantsNextSeqNumToBe; - generateLogout(err); - disconnect(err, true); - return; - } - } - getLog().onEvent("Received logon"); - if (!state.isInitiator()) { /* - * If we got one too high they need messages resent use the first message they missed (as we gap fill with that). - * If we reset on logon, the current value will be 1 and we always send 2 (we haven't inc'd for current message yet +1) - * If happy path (we haven't inc'd for current message yet so its current +1) + * We test here that it's not too high (which would result in a resend) and that we are not + * resetting on logon 34=1 */ - int nextExpectedTargetNum = state.getMessageStore().getNextTargetMsgSeqNum(); - // we increment for the logon later (after Logon response sent) in this method if and only if in sequence - if (isLogonInNormalSequence) { - // logon was fine take account of it in 789 - nextExpectedTargetNum++; + final boolean isLogonInNormalSequence = !(isTargetTooHigh(sequence) && !resetOnLogon); + // if we have a tag 789 sent to us... + if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + + final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); + final int actualNextNum = state.getNextSenderMsgSeqNum(); + // Is the 789 we received too high ?? + if (targetWantsNextSeqNumToBe > actualNextNum) { + // barf! we can't resend what we never sent! something unrecoverable has happened. + final String err = "Tag " + NextExpectedMsgSeqNum.FIELD + + " (NextExpectedMsgSeqNum) is higher than expected. Expected " + + actualNextNum + ", Received " + targetWantsNextSeqNumToBe; + generateLogout(err); + disconnect(err, true); + return; + } + } + getLog().onEvent("Received logon"); + if (!state.isInitiator()) { + /* + * If we got one too high they need messages resent use the first message they missed (as we gap fill with that). + * If we reset on logon, the current value will be 1 and we always send 2 (we haven't inc'd for current message yet +1) + * If happy path (we haven't inc'd for current message yet so its current +1) + */ + int nextExpectedTargetNum = state.getMessageStore().getNextTargetMsgSeqNum(); + // we increment for the logon later (after Logon response sent) in this method if and only if in sequence + if (isLogonInNormalSequence) { + // logon was fine take account of it in 789 + nextExpectedTargetNum++; + } + generateLogon(logon, nextExpectedTargetNum); } - generateLogon(logon, nextExpectedTargetNum); - } - // Check for proper sequence reset response - if (state.isResetSent() && !state.isResetReceived()) { - disconnect("Expected Logon response to have reset sequence numbers in response to ResetSeqNumFlag", true); - return; - } + // Check for proper sequence reset response + if (state.isResetSent() && !state.isResetReceived()) { + disconnect("Expected Logon response to have reset sequence numbers in response to ResetSeqNumFlag", true); + return; + } - state.setResetSent(false); - state.setResetReceived(false); + state.setResetSent(false); + state.setResetReceived(false); - // Looking at the sequence number of the incoming Logon, is it too high indicating possible missed messages ? .. - if (!isLogonInNormalSequence) { - // if 789 was sent then we effectively have already sent a resend request - if (state.isExpectedLogonNextSeqNumSent()) { - // Mark state as if we have already sent a resend request from the logon's 789 (we sent) to infinity. - // This will supress the resend request in doTargetTooHigh ... - state.setResetRangeFromLastExpectedLogonNextSeqNumLogon(); - getLog().onEvent("Required resend will be suppressed as we are setting tag 789"); - } - if (validateSequenceNumbers) { - doTargetTooHigh(logon); + // Looking at the sequence number of the incoming Logon, is it too high indicating possible missed messages ? .. + if (!isLogonInNormalSequence) { + // if 789 was sent then we effectively have already sent a resend request + if (state.isExpectedLogonNextSeqNumSent()) { + // Mark state as if we have already sent a resend request from the logon's 789 (we sent) to infinity. + // This will supress the resend request in doTargetTooHigh ... + state.setResetRangeFromLastExpectedLogonNextSeqNumLogon(); + getLog().onEvent("Required resend will be suppressed as we are setting tag 789"); + } + if (validateSequenceNumbers) { + doTargetTooHigh(logon); + } + } else { + state.incrNextTargetMsgSeqNum(); + nextQueued(); } - } else { - state.incrNextTargetMsgSeqNum(); - nextQueued(); - } - // Do we have a 789 - if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { - final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); + // Do we have a 789 + if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); - // is the 789 lower (we checked for higher previously) than our next message after receiving the logon - if (targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived) { - int endSeqNo = nextSenderMsgNumAtLogonReceived; + // is the 789 lower (we checked for higher previously) than our next message after receiving the logon + if (targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived) { + int endSeqNo = nextSenderMsgNumAtLogonReceived; - // Just do a gap fill when messages aren't persisted - if (!persistMessages) { - endSeqNo += 1; - final int next = state.getNextSenderMsgSeqNum(); - if (endSeqNo > next) { - endSeqNo = next; + // Just do a gap fill when messages aren't persisted + if (!persistMessages) { + endSeqNo += 1; + final int next = state.getNextSenderMsgSeqNum(); + if (endSeqNo > next) { + endSeqNo = next; + } + getLog().onEvent( + "Received implicit ResendRequest via Logon FROM: " + + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived + + " will be reset"); + generateSequenceReset(logon, targetWantsNextSeqNumToBe, // 34= + endSeqNo); // (NewSeqNo 36=) + } else { + // resend missed messages + getLog().onEvent( + "Received implicit ResendRequest via Logon FROM: " + + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived + + " will be resent"); + resendMessages(logon, targetWantsNextSeqNumToBe, endSeqNo); } - getLog().onEvent( - "Received implicit ResendRequest via Logon FROM: " - + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived - + " will be reset"); - generateSequenceReset(logon, targetWantsNextSeqNumToBe, // 34= - endSeqNo); // (NewSeqNo 36=) - } else { - // resend missed messages - getLog().onEvent( - "Received implicit ResendRequest via Logon FROM: " - + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived - + " will be resent"); - resendMessages(logon, targetWantsNextSeqNumToBe, endSeqNo); } } + } finally { + state.unlockSenderMsgSeqNum(); } if (isLoggedOn()) { try { From 27d8574ca9578dae5c197db2799637ca09af41ec Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Sat, 14 Mar 2026 14:24:26 +0000 Subject: [PATCH 08/10] reverting unrelated changes --- .gitignore | 1 - customising-quickfixj.md | 16 ++++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index a455c7514..fb838987e 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,3 @@ nb-configuration.xml *.bak # ignore vim swap files *.swp -.sdkmanrc diff --git a/customising-quickfixj.md b/customising-quickfixj.md index aebba92de..131dde694 100644 --- a/customising-quickfixj.md +++ b/customising-quickfixj.md @@ -7,9 +7,9 @@ The specification for a FIX integration is called a "Rules of Engagement". The R The message, component and field implementations can be provided by a specialised build, along with the corresponding QuickFIX/J dictionary for the custom Rules of Engagement. -The standard distribution of ```quickfixj-core``` can be used with custom artifacts. You need only build artifacts for versions of the Protocol that you use. These can be maintained independently from the QuickFIX/J project, while depending on the QuickFIX/J for the core functionality and tools. +The standard distribution of ```quickfixj-core``` can be used with custom artefacts. You need only build artefacts for versions of the Protocol that you use. These can be maintained independently from the QuickFIX/J project, while depending on the QuickFIX/J for the core functionality and tools. -To build custom artifacts it's helpful to understand how QuickFIX/J builds the Field, Component and Message classes from the QuickFIX/J dictionaries and from [FIX Orchestra](https://www.fixtrading.org/standards/fix-orchestra/). +To build custom artefacts it's helpful to understand how QuickFIX/J builds the Field, Component and Message classes from the QuickFIX/J dictionaries and from [FIX Orchestra](https://www.fixtrading.org/standards/fix-orchestra/). The QuickFIX/J reference implementations for FIX versions FIX4.0 to FIX5.0sp2 and for FIXT1.1 are generated from the QuickFIX dictionaries for the specific version. The dictionaries are located in the ```src/main/resources``` directory of the respective modules of the ```quickfixj-messages``` module. Maintaining the FIX4.0 to FIX5.0sp2 builds intentionally provides consistency with the prior QuickFIX/J 2 release in order to ease migration to QuickFIX/J 3. @@ -18,7 +18,7 @@ The most recent standard is defined as [FIX Latest](https://www.fixtrading.org/o An implementation or customisation of the FIX Standars derived from the FIX Orchestra repository is known as an "_orchestration_". The standard FIX Orchestra repository requires some modification to work well with QuickFIX/J. This is done by the ```quickfixj-orchestration``` module. -The ```quickfixj-orchestration``` module publishes a modified Orchestra artifact which can then be the basis of a custom FIX Latest build using QuickFIX/J . +The ```quickfixj-orchestration``` module publishes a modified Orchestra artefact which can then be the basis of a custom FIX Latest build using QuickFIX/J . The complete reference FIX Latest specification results in a very large distribution. To use FIX Latest, customisation of the [FIX Orchestra](https://www.fixtrading.org/standards/fix-orchestra/) repository is advisable. @@ -30,7 +30,7 @@ Please see [QuickFIX/J Orchestration](./quickfixj-orchestration/readme.md) for d This behaviour is controlled by the ```${generator.decimal}``` build property. It is "false" by default to avoid surprising side effects of incompatible data types. -To enable the use of ```BigDecimal``` in code generation, set the ```${generator.decimal}``` property to "true" in [quickfixj-messages](./quickfixj-messages/readme.md) and build the message artifacts. +To enable the use of ```BigDecimal``` in code generation, set the ```${generator.decimal}``` property to "true" in [quickfixj-messages](./quickfixj-messages/readme.md) and build the message artefacts. ``` @@ -56,13 +56,13 @@ Runtime incompatibilities can be resolved by: * Amending the QuickFIX Dictionary to coerce the code generation and/or validation * Changing the ordering of code generation and/or overwrite behaviour of code generation * Omitting incompatible versions from your customised build -* Building artifacts independently for the conflicting versions and ensuring they are not used them in the same runtime +* Building artefacts independently for the conflicting versions and ensuring they are not used them in the same runtime See [QuickFIX/J Messages](./quickfixj-messages/readme.md) for details of the build and recommendation for **how to implement custom builds.** ### **Customising the FIX Protocol for specialised Rules of Engagement** -A Rules of Engagement can include customised Messages, Components and Fields, including User Defined elements. +A Rules of Engagement can include customisation Messages, Components and Fields, including User Defined elements. It is not necessary to maintain a fork of the entire QuickFIX/J project to provide customised QuickFIX Dictionaries and to generate type-safe libraries that are interoperable with QuickFIX/J. @@ -82,11 +82,11 @@ See [QuickFIX/J Messages](./quickfixj-messages/readme.md) for details of the bui From QuickFIX/J 3.0.0 the code generation for ```quickfix.Field``` prefers the FIX Orchestra Standard. This results in incompatible changes to the names of constants. -For example : ```SettlmntTyp.REGULAR_FX_SPOT_SETTLEMENT``` becomes ```SettlmntTyp.REGULAR```. +For example : ```SettlType.REGULAR_FX_SPOT_SETTLEMENT``` becomes ```SettlType.REGULAR```. The required code changes may be trivial in most cases, but changes are elective. The following describes how to use ```quickfixj-core``` from QuickFIX/J 3 without needing to implement code changes: -* build the required Message artifacts without the FIX Latest code generation. The Fields will then be generated only from legacy FIX Protocol Versions as they were prior to QuickFIX/J 3.0.0 - **or** +* build the required Message artefacts without the FIX Latest code generation. The Fields will then be generated only from legacy FIX Protocol Versions as they were prior to QuickFIX/J 3.0.0 - **or** * if you want to use Messages, Components and/or Fields from FIX Latest while preferring legacy constants, manipulate the order of code generation and/or the over-write behaviour of code behaviour to prefer earlier versions of FIX. For example, generate FIX Latest first and overwrite the generated Field classes by subsequently running code generation for an earlier version. From b4737b6caee781a3edd4910ec16379930203905a Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Mon, 16 Mar 2026 13:02:04 +0000 Subject: [PATCH 09/10] removed SessionRaceConditionTest.java --- .../quickfix/SessionRaceConditionTest.java | 246 ------------------ 1 file changed, 246 deletions(-) delete mode 100644 quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java diff --git a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java b/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java deleted file mode 100644 index 42c863075..000000000 --- a/quickfixj-core/src/test/java/quickfix/SessionRaceConditionTest.java +++ /dev/null @@ -1,246 +0,0 @@ -package quickfix; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import quickfix.field.BeginString; -import quickfix.field.EncryptMethod; -import quickfix.field.HeartBtInt; -import quickfix.field.MsgSeqNum; -import quickfix.field.MsgType; -import quickfix.field.ResetSeqNumFlag; -import quickfix.field.SenderCompID; -import quickfix.field.SendingTime; -import quickfix.field.TargetCompID; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.*; - -/** - * Test to verify that Logon messages sent in response to a Logon with ResetSeqNumFlag=Y are always sent with sequence number 1, - * even if other messages are being written to the Session concurrently during the reset process. - * Please note that in the absence of the fix for QFJ-902, this test reliably fails (as expected) when run in IntelliJ IDE - * on a multi-core machine, but on the same hardware does NOT fail when run in console in Ubuntu WSL with Mvn 3.9.13, Open JDK 21.0.2. - * The test does provoke the race condition in the goithub actions build. - * The test results below show the race condition in action. - * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, event> (Received logon) - * Mar 11, 2026 10:50:29 AM quickfix.UnitTestApplication toAdmin - * INFO: to admin [FIX.4.4:SENDER->TARGET] 8=FIX.4.49=5535=034=149=SENDER52=20260311-10:50:29.62856=TARGET10=094 - * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, event> (Responding to Logon request) - * Mar 11, 2026 10:50:29 AM quickfix.UnitTestApplication toAdmin - * INFO: to admin [FIX.4.4:SENDER->TARGET] 8=FIX.4.49=7335=A34=249=SENDER52=20260311-10:50:29.62856=TARGET98=0108=30141=Y10=182 - * <20260311-10:50:29, FIX.4.4:SENDER->TARGET, outgoing> (8=FIX.4.49=7335=A34=249=SENDER52=20260311-10:50:29.62856=TARGET98=0108=30141=Y10=182) - * Ref: QFJ-902 - */ -public class SessionRaceConditionTest { - private static final Logger log = LoggerFactory.getLogger(SessionRaceConditionTest.class); - private static final SessionID SESSION_ID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); - - @Test - public void testLogonResetRaceConditionWithDelayingMessageStore() throws Exception { - final UnitTestApplication application = new UnitTestApplication(); - - // Use a custom MessageStore that delays during reset to allow race condition - DelayingMessageStore store = new DelayingMessageStore(SESSION_ID); - store.setDelayReset(true); - MessageStoreFactory storeFactory = sessionID1 -> store; - - logonReceivedWhileMessagesBeingWrittenToSession(application, storeFactory); - } - - private static void logonReceivedWhileMessagesBeingWrittenToSession(Application application, MessageStoreFactory storeFactory) throws IOException, InterruptedException, FieldNotFound { - try (Session session = new SessionFactoryTestSupport.Builder() - .setSessionId(SESSION_ID) - .setApplication(application) - .setMessageStoreFactory(storeFactory) - .setPersistMessages(true) - .build()) { - - final RaceConditionResponder responder = new RaceConditionResponder(); - session.setResponder(responder); - - // Prepare a Logon with ResetSeqNumFlag=Y - Message logon = new DefaultMessageFactory().create(SESSION_ID.getBeginString(), MsgType.LOGON); - logon.setField(new EncryptMethod(EncryptMethod.NONE_OTHER)); - logon.setField(new HeartBtInt(30)); - logon.setField(new ResetSeqNumFlag(true)); - logon.getHeader().setField(new MsgSeqNum(1)); - logon.getHeader().setField(new BeginString(SESSION_ID.getBeginString())); - logon.getHeader().setField(new SenderCompID(SESSION_ID.getTargetCompID())); - logon.getHeader().setField(new TargetCompID(SESSION_ID.getSenderCompID())); - logon.getHeader().setField(new SendingTime(SystemTime.getLocalDateTime())); - - // 2. Process the logon in a separate thread - CountDownLatch logonStarted = new CountDownLatch(1); - AtomicBoolean logonFinished = new AtomicBoolean(false); - - // While logon is processing (and delayed in store.reset()), try to send other messages - // We'll keep sending messages until the logon thread finishes. - Thread sendThread = new Thread(() -> { - try { - // Wait a bit to ensure logonThread has reached the delay in reset() - log.info("Attempting to send heartbeats during logon reset delay"); - while (!logonFinished.get()) { - Message heartbeat = new DefaultMessageFactory().create(SESSION_ID.getBeginString(), MsgType.HEARTBEAT); - // session.send() calls sendRaw, which locks state.lockSenderMsgSeqNum() - // BUT Session.nextLogon does NOT hold any lock between resetState() and generateLogon() - session.send(heartbeat); - TimeUnit.MILLISECONDS.sleep(10); - } - } catch (Exception e) { - log.error("Exception in sendThread", e); - - } - }, "SendThread"); - sendThread.start(); - - // While HB are being sent in sendThread, the logonThread will be delayed in store.reset() - Thread logonThread = new Thread(() -> { - try { - logonStarted.countDown(); - // next() will call nextLogon, which will call resetState and then generateLogon - session.next(logon); - } catch (Exception e) { - log.error("Exception in Login thread", e); - } finally { - logonFinished.set(true); - log.info("login thread finished"); - } - }, "LogonThread"); - - logonThread.start(); - logonStarted.await(); - - logonThread.join(5000); - sendThread.join(5000); - - // 5. Check the sent messages. - // The Logon response should be the FIRST message sent after reset, and its sequence number MUST be 1. - List sentMessages = responder.getSentMessages(); - assertFalse("Should have sent at least one message", sentMessages.isEmpty()); - - Message logonResponse = null; - for (Message msg : sentMessages) { - if (MsgType.LOGON.equals(msg.getHeader().getString(MsgType.FIELD))) { - logonResponse = msg; - break; - } - } - - assertNotNull("Should have sent a Logon response", logonResponse); - int logonSeqNum = logonResponse.getHeader().getInt(MsgSeqNum.FIELD); - - assertEquals("Outbound Logon should have sequence number 1 when ResetSeqNumFlag=Y", 1, logonSeqNum); - } - } - - @Test - public void testLogonResetRaceConditionWithDelayingApplication () throws Exception { - final UnitTestApplication application = new DelayingApplication(); - MessageStoreFactory messageStoreFactory = new MemoryStoreFactory(); - logonReceivedWhileMessagesBeingWrittenToSession(application, messageStoreFactory); - } - - private static class DelayingApplication extends UnitTestApplication { - - @Override - public void toAdmin(Message message, SessionID sessionId) { - super.toAdmin(message, sessionId); - try { - if (MsgType.LOGON.equals(message.getHeader().getString(MsgType.FIELD))) { - log.info("Delaying toAdmin for Logon response"); - try { - log.info("Delaying Logon response"); - // Delaying here simulates the race condition window - TimeUnit.MILLISECONDS.sleep(1000); - log.info("Resuming Logon response"); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } catch (Exception e) { - log.error("Exception in DelayingApplication toAdmin", e); - } - } - } - - private static class RaceConditionResponder implements Responder { - private final List sentMessages = Collections.synchronizedList(new ArrayList<>()); - - public List getSentMessages() { - return new ArrayList<>(sentMessages); - } - - @Override - public boolean send(String data) { - try { - Message msg = new Message(); - msg.fromString(data, null, null, false); - sentMessages.add(msg); - } catch (Exception e) { - log.error("Exception in RaceConditionResponder send", e); - return false; - } - return true; - } - - @Override - public String getRemoteAddress() { return null; } - @Override - public void disconnect() {} - } - - private static class DelayingMessageStore implements MessageStore { - private final MemoryStore delegate; - private volatile boolean delayReset = false; - - public DelayingMessageStore(SessionID sessionID) throws IOException { - this.delegate = new MemoryStore(sessionID); - } - - public void setDelayReset(boolean delayReset) { - this.delayReset = delayReset; - } - - @Override - public void reset() throws IOException { - if (delayReset) { - try { - log.info("Delaying reset()"); - // Delaying here simulates the race condition window - TimeUnit.MILLISECONDS.sleep(1000); - log.info("Resuming reset()"); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - delegate.reset(); - } - - @Override - public void setNextSenderMsgSeqNum(int next) throws IOException { - delegate.setNextSenderMsgSeqNum(next); - } - - @Override public boolean set(int sequence, String message) throws IOException { return delegate.set(sequence, message); } - @Override public void get(int startSequence, int endSequence, Collection messages) throws IOException { delegate.get(startSequence, endSequence, messages); } - @Override public int getNextSenderMsgSeqNum() throws IOException { return delegate.getNextSenderMsgSeqNum(); } - @Override public int getNextTargetMsgSeqNum() throws IOException { return delegate.getNextTargetMsgSeqNum(); } - @Override public void setNextTargetMsgSeqNum(int next) throws IOException { delegate.setNextTargetMsgSeqNum(next); } - @Override public void incrNextSenderMsgSeqNum() throws IOException { delegate.incrNextSenderMsgSeqNum(); } - @Override public void incrNextTargetMsgSeqNum() throws IOException { delegate.incrNextTargetMsgSeqNum(); } - @Override public Date getCreationTime() throws IOException { return delegate.getCreationTime(); } - @Override public Calendar getCreationTimeCalendar() throws IOException { return delegate.getCreationTimeCalendar(); } - @Override public void refresh() throws IOException { delegate.refresh(); } - } -} From f9c92afed89673174a3698d91730bae2b124028a Mon Sep 17 00:00:00 2001 From: David Gibbs Date: Mon, 16 Mar 2026 13:11:33 +0000 Subject: [PATCH 10/10] replaced test --- .../SessionLogonSeqNumIssue902Test.java | 373 ++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java diff --git a/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java b/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java new file mode 100644 index 000000000..e8db79ab4 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java @@ -0,0 +1,373 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import quickfix.field.BeginString; +import quickfix.field.EncryptMethod; +import quickfix.field.HeartBtInt; +import quickfix.field.MsgSeqNum; +import quickfix.field.ResetSeqNumFlag; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Regression test for GitHub issue #902. + * + *

Bug description: A QuickFIX/J acceptor responds to a client Logon + * with {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum(34)=1} with a Logon + * response carrying {@code MsgSeqNum=2} instead of 1. + * + *

Root cause: Inside {@code Session.sendRaw()}, every outbound + * message goes through {@code persist()}, which increments the + * next-sender-sequence-number before the {@code send()} call. The + * condition that gates the actual {@code send()} call is: + *

+ *   if (MsgType.LOGON.equals(msgType) || MsgType.LOGOUT.equals(msgType)
+ *           || MsgType.RESEND_REQUEST.equals(msgType)
+ *           || MsgType.SEQUENCE_RESET.equals(msgType) || isLoggedOn()) {
+ *       result = send(messageString);
+ *   }
+ * 
+ * A Heartbeat message does NOT match any of those types, and at the moment + * it is triggered the session is not yet fully logged on ({@code isLogonSent()} + * is still {@code false}). Therefore the heartbeat's call to {@code persist()} + * bumps the sequence counter from 1 to 2, but the heartbeat is silently + * dropped. When the acceptor's own Logon response is subsequently serialised + * and sent, it picks up sequence number 2. + * + *

How the race arises in production: In {@code nextLogon()}, + * {@code state.setLogonReceived(true)} is called before + * {@code generateLogon()} is called. Between those two operations, a + * timer/heartbeat thread calling {@code next()} sees + * {@code state.isLogonReceived()==true} and {@code isHeartBeatNeeded()==true} + * (because {@code lastSentTime} is stale from the previous session), so it + * invokes {@code generateHeartbeat()}, consuming seq=1 without sending the + * heartbeat. The subsequent {@code generateLogon()} then uses seq=2. + * + *

Test strategy: We inject the race deterministically using a + * custom {@link Log} implementation that intercepts the {@code "Received logon"} + * event (which fires between {@code setLogonReceived(true)} and + * {@code generateLogon()} inside {@code nextLogon()}) and calls + * {@link Session#next()} from within that event handler. We use + * {@link MockSystemTimeSource} to ensure {@code isHeartBeatNeeded()} returns + * {@code true} at exactly that moment. + */ +public class SessionLogonSeqNumIssue902Test { + + private static final String RECEIVED_LOGON_EVENT = "Received logon"; + + private MockSystemTimeSource timeSource; + private Session acceptorSession; + private RecordingResponder responder; + + /** Holds the session so the log hook can reach it after construction. */ + private final AtomicReference sessionRef = new AtomicReference<>(); + + /** + * Guard that prevents re-entrant injection: the hook must fire at most + * once per test run so that the simulated timer-thread interleaving is + * applied exactly once. + */ + private final AtomicBoolean heartbeatInjected = new AtomicBoolean(false); + + @Before + public void setUp() { + timeSource = new MockSystemTimeSource(1_000_000L); + SystemTime.setTimeSource(timeSource); + } + + @After + public void tearDown() throws Exception { + SystemTime.setTimeSource(null); + if (acceptorSession != null) { + acceptorSession.close(); + } + } + + /** + * Verifies that the acceptor's Logon response carries {@code MsgSeqNum=1} + * when it receives a Logon with {@code ResetSeqNumFlag=Y} and + * {@code MsgSeqNum=1}. + * + *

The test deliberately injects a simulated timer-thread interleaving + * (via the session log hook described in the class Javadoc) so that + * {@code generateHeartbeat()} runs between {@code setLogonReceived(true)} + * and {@code generateLogon()} inside {@code nextLogon()}. This reproduces + * the race condition described in issue #902. + * + *

With the current (buggy) code the heartbeat's {@code persist()} call + * bumps the sequence counter from 1 to 2 without sending the heartbeat, + * so the Logon response ends up with {@code MsgSeqNum=2}. The assertion + * at the end therefore fails, proving that the bug exists. + */ + @Test + public void testAcceptorLogonResponseSeqNumIsOneWhenResetSeqNumFlagReceived() + throws Exception { + + // ----------------------------------------------------------------- + // 1. Build the acceptor session. + // + // The custom LogFactory creates a Log whose onEvent() method + // fires session.next() when "Received logon" is logged. That + // log event is emitted by nextLogon() AFTER setLogonReceived(true) + // but BEFORE generateLogon(), which is exactly the window where + // the production race condition occurs. + // ----------------------------------------------------------------- + SessionID sessionID = new SessionID( + FixVersions.BEGINSTRING_FIX44, "ACCEPTOR", "CLIENT"); + + LogFactory injectingLogFactory = id -> new Log() { + @Override public void clear() {} + @Override public void onIncoming(String message) {} + @Override public void onOutgoing(String message) {} + @Override public void onErrorEvent(String text) {} + + @Override + public void onEvent(String text) { + /* + * "Received logon" is the log event emitted at line ~2262 + * of Session.java, between: + * state.setLogonReceived(true) [line ~2229] + * generateLogon(...) [line ~2275] + * + * Calling session.next() here models a timer thread that + * woke up at exactly this critical moment. + * + * The AtomicBoolean guard ensures we inject at most once, + * even if re-entrant log calls occur. + */ + if (RECEIVED_LOGON_EVENT.equals(text) + && heartbeatInjected.compareAndSet(false, true)) { + Session s = sessionRef.get(); + if (s != null) { + try { + // This triggers generateHeartbeat() which calls + // persist() and bumps the seq counter 1 → 2 + // without sending the heartbeat (isLogonSent() + // is still false at this point). + s.next(); + } catch (IOException e) { + throw new RuntimeException( + "Unexpected IOException in injected next() call", e); + } + } + } + } + }; + + /* + * Build the acceptor session with heartbeatInterval=0 (the default for + * acceptors). This ensures that SessionState.isInitiator() == false, + * which is required for nextLogon() to call generateLogon() and thus + * produce the Logon response. + * + * NOTE: heartbeatInterval=0 causes the Session constructor to set + * state.initiator = (0 != 0) = false. If we passed 30 here, + * state.initiator would be true and generateLogon() would be + * skipped entirely, making the test scenario impossible to set up. + */ + acceptorSession = new SessionFactoryTestSupport.Builder() + .setSessionId(sessionID) + .setApplication(new UnitTestApplication()) + .setIsInitiator(false) + .setCheckLatency(false) + .setCheckCompID(true) + .setPersistMessages(true) + .setLogFactory(injectingLogFactory) + .build(); + + /* + * Now update the heartbeat interval to 30 s WITHOUT changing + * state.isInitiator() (which is final and was set to false above). + * This makes next() reach the isHeartBeatNeeded() check instead of + * returning early at the "getHeartBeatInterval() == 0" guard, which + * is the prerequisite for the bug to manifest. + */ + acceptorSession.setHeartBeatInterval(30); + + sessionRef.set(acceptorSession); + + responder = new RecordingResponder(); + acceptorSession.setResponder(responder); + + // ----------------------------------------------------------------- + // 2. Simulate a previous session by setting seq numbers to 5 + // and recording lastSentTime at the INITIAL clock value (t=1 000 000). + // This makes lastSentTime "stale" once we advance the clock. + // ----------------------------------------------------------------- + SessionState state = getSessionState(acceptorSession); + state.getMessageStore().setNextSenderMsgSeqNum(5); + state.getMessageStore().setNextTargetMsgSeqNum(5); + state.setLastSentTime(SystemTime.currentTimeMillis()); // = 1_000_000 + + // ----------------------------------------------------------------- + // 3. Advance the mock clock by 31 s (> the 30 s heartbeat interval) + // so that isHeartBeatNeeded() returns true when next() is called + // from inside the log hook. + // ----------------------------------------------------------------- + timeSource.increment(31_000L); // clock = 1_031_000 + + // ----------------------------------------------------------------- + // 4. Deliver the incoming Logon with ResetSeqNumFlag=Y, MsgSeqNum=1. + // + // Execution path (with the injected interleaving): + // nextLogon() + // → resetState() : seq counter reset to 1 + // → setLogonReceived(true) : session now appears "logon received" + // → log("Received logon") : our hook fires session.next() + // → isHeartBeatNeeded() = true (31 s > 30 s, stale time) + // → generateHeartbeat() + // → sendRaw(heartbeat): + // persist() bumps seq 1 → 2 + // send() NOT called (isLogonSent() still false) + // → next() returns + // → generateLogon() : initializeHeader sets MsgSeqNum=2 ← BUG + // → sendRaw(logon): sends the Logon response with "34=2" + // ----------------------------------------------------------------- + acceptorSession.next(buildIncomingLogon(sessionID)); + + // ----------------------------------------------------------------- + // 5. Sanity-check: the log hook must have fired to confirm the test + // scenario was actually triggered. + // ----------------------------------------------------------------- + assertTrue("Log hook for 'Received logon' must have fired during test", + heartbeatInjected.get()); + + // ----------------------------------------------------------------- + // 6. Assert: the Logon response MUST carry MsgSeqNum=1. + // + // With the current buggy code this assertion FAILS because the + // heartbeat consumed seq=1 and the Logon response was sent with + // MsgSeqNum=2. + // ----------------------------------------------------------------- + String logonResponse = findLogonResponse(responder.sentMessages); + int seqNum = extractMsgSeqNum(logonResponse); + + assertEquals( + "Acceptor Logon response must carry MsgSeqNum=1 when responding to " + + "a Logon with ResetSeqNumFlag=Y and MsgSeqNum=1, but got MsgSeqNum=" + + seqNum, + 1, + seqNum); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** + * Builds an incoming FIX 4.4 Logon message addressed to {@code sessionID}'s + * acceptor side, carrying {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum=1}. + */ + private static quickfix.fix44.Logon buildIncomingLogon(SessionID sessionID) + throws FieldNotFound { + quickfix.fix44.Logon logon = new quickfix.fix44.Logon(); + logon.getHeader().setString(BeginString.FIELD, + sessionID.getBeginString()); + logon.getHeader().setString(SenderCompID.FIELD, + sessionID.getTargetCompID()); // from the remote peer's perspective + logon.getHeader().setString(TargetCompID.FIELD, + sessionID.getSenderCompID()); + logon.getHeader().setInt(MsgSeqNum.FIELD, 1); + logon.setInt(EncryptMethod.FIELD, EncryptMethod.NONE_OTHER); + logon.setInt(HeartBtInt.FIELD, 30); + logon.setBoolean(ResetSeqNumFlag.FIELD, true); + return logon; + } + + /** + * Returns the last outbound Logon message (35=A) recorded by the responder. + * + * @throws AssertionError if no Logon response was sent + */ + private static String findLogonResponse(List sent) { + for (int i = sent.size() - 1; i >= 0; i--) { + if (sent.get(i).contains("\00135=A\001")) { + return sent.get(i); + } + } + throw new AssertionError( + "No Logon response (35=A) found in sent messages: " + sent); + } + + /** + * Parses a raw FIX message string and returns the integer value of tag 34 + * (MsgSeqNum). + */ + private static int extractMsgSeqNum(String fixMessage) { + for (String field : fixMessage.split("\001")) { + if (field.startsWith("34=")) { + return Integer.parseInt(field.substring(3)); + } + } + throw new AssertionError("Tag 34 (MsgSeqNum) not found in message: " + fixMessage); + } + + /** + * Obtains the private {@code SessionState} from a {@link Session} via + * reflection. + */ + private static SessionState getSessionState(Session session) throws Exception { + Field stateField = Session.class.getDeclaredField("state"); + stateField.setAccessible(true); + return (SessionState) stateField.get(session); + } + + // ------------------------------------------------------------------------- + // Inner types + // ------------------------------------------------------------------------- + + /** + * A minimal {@link Responder} that records every raw FIX string passed to + * {@link #send(String)}. + */ + private static final class RecordingResponder implements Responder { + + final List sentMessages = new CopyOnWriteArrayList<>(); + + @Override + public boolean send(String data) { + sentMessages.add(data); + return true; + } + + @Override + public void disconnect() { + } + + @Override + public String getRemoteAddress() { + return "127.0.0.1:54321"; + } + } +}