diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 19bede7b8..0d98e6334 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2186,151 +2186,149 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre } } } + state.lockSenderMsgSeqNum(); + try { + // QFJ-926 - reset session before accepting Logon + resetIfSessionNotCurrent(sessionID, SystemTime.currentTimeMillis()); - // QFJ-926 - reset session before accepting Logon - resetIfSessionNotCurrent(sessionID, SystemTime.currentTimeMillis()); - - if (refreshOnLogon) { - refreshState(); - } - - if (logon.isSetField(ResetSeqNumFlag.FIELD)) { - state.setResetReceived(logon.getBoolean(ResetSeqNumFlag.FIELD)); - } else if (state.isResetSent() && logon.getHeader().getInt(MsgSeqNum.FIELD) == 1) { // QFJ-383 - getLog().onEvent( - "Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request"); - state.setResetReceived(true); - } + if (refreshOnLogon) { + refreshState(); + } - if (!verify(logon, false, validateSequenceNumbers)) { - return; - } + if (logon.isSetField(ResetSeqNumFlag.FIELD)) { + state.setResetReceived(logon.getBoolean(ResetSeqNumFlag.FIELD)); + } else if (state.isResetSent() && logon.getHeader().getInt(MsgSeqNum.FIELD) == 1) { // QFJ-383 + getLog().onEvent( + "Inferring ResetSeqNumFlag as sequence number is 1 in response to reset request"); + state.setResetReceived(true); + } - if (state.isResetReceived()) { - getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); - if (!state.isResetSent()) { - resetState(); + if (!verify(logon, false, validateSequenceNumbers)) { + return; } - } - if (state.isLogonSendNeeded() && !state.isResetReceived()) { - disconnect("Received logon response before sending request", true); - return; - } + if (state.isResetReceived()) { + getLog().onEvent("Logon contains ResetSeqNumFlag=Y, resetting sequence numbers to 1"); + if (!state.isResetSent()) { + resetState(); + } + } - if (!state.isInitiator() && resetOnLogon) { - resetState(); - } + if (state.isLogonSendNeeded() && !state.isResetReceived()) { + disconnect("Received logon response before sending request", true); + return; + } + if (!state.isInitiator() && resetOnLogon) { + resetState(); + } - // reset logout messages - state.setLogoutReceived(false); - state.setLogoutSent(false); - state.setLogonReceived(true); - // remember the expected sender sequence number of any logon response for future use - final int nextSenderMsgNumAtLogonReceived = state.getMessageStore().getNextSenderMsgSeqNum(); - final int sequence = logon.getHeader().getInt(MsgSeqNum.FIELD); + // reset logout messages + state.setLogoutReceived(false); + state.setLogoutSent(false); + state.setLogonReceived(true); - /* - * We test here that it's not too high (which would result in a resend) and that we are not - * resetting on logon 34=1 - */ - final boolean isLogonInNormalSequence = !(isTargetTooHigh(sequence) && !resetOnLogon); - // if we have a tag 789 sent to us... - if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + // remember the expected sender sequence number of any logon response for future use + final int nextSenderMsgNumAtLogonReceived = state.getMessageStore().getNextSenderMsgSeqNum(); + final int sequence = logon.getHeader().getInt(MsgSeqNum.FIELD); - final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); - state.lockSenderMsgSeqNum(); - final int actualNextNum; - try { - actualNextNum = state.getNextSenderMsgSeqNum(); - } finally { - state.unlockSenderMsgSeqNum(); - } - // Is the 789 we received too high ?? - if (targetWantsNextSeqNumToBe > actualNextNum) { - // barf! we can't resend what we never sent! something unrecoverable has happened. - final String err = "Tag " + NextExpectedMsgSeqNum.FIELD - + " (NextExpectedMsgSeqNum) is higher than expected. Expected " - + actualNextNum + ", Received " + targetWantsNextSeqNumToBe; - generateLogout(err); - disconnect(err, true); - return; - } - } - getLog().onEvent("Received logon"); - if (!state.isInitiator()) { /* - * If we got one too high they need messages resent use the first message they missed (as we gap fill with that). - * If we reset on logon, the current value will be 1 and we always send 2 (we haven't inc'd for current message yet +1) - * If happy path (we haven't inc'd for current message yet so its current +1) + * We test here that it's not too high (which would result in a resend) and that we are not + * resetting on logon 34=1 */ - int nextExpectedTargetNum = state.getMessageStore().getNextTargetMsgSeqNum(); - // we increment for the logon later (after Logon response sent) in this method if and only if in sequence - if (isLogonInNormalSequence) { - // logon was fine take account of it in 789 - nextExpectedTargetNum++; + final boolean isLogonInNormalSequence = !(isTargetTooHigh(sequence) && !resetOnLogon); + // if we have a tag 789 sent to us... + if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + + final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); + final int actualNextNum = state.getNextSenderMsgSeqNum(); + // Is the 789 we received too high ?? + if (targetWantsNextSeqNumToBe > actualNextNum) { + // barf! we can't resend what we never sent! something unrecoverable has happened. + final String err = "Tag " + NextExpectedMsgSeqNum.FIELD + + " (NextExpectedMsgSeqNum) is higher than expected. Expected " + + actualNextNum + ", Received " + targetWantsNextSeqNumToBe; + generateLogout(err); + disconnect(err, true); + return; + } + } + getLog().onEvent("Received logon"); + if (!state.isInitiator()) { + /* + * If we got one too high they need messages resent use the first message they missed (as we gap fill with that). + * If we reset on logon, the current value will be 1 and we always send 2 (we haven't inc'd for current message yet +1) + * If happy path (we haven't inc'd for current message yet so its current +1) + */ + int nextExpectedTargetNum = state.getMessageStore().getNextTargetMsgSeqNum(); + // we increment for the logon later (after Logon response sent) in this method if and only if in sequence + if (isLogonInNormalSequence) { + // logon was fine take account of it in 789 + nextExpectedTargetNum++; + } + generateLogon(logon, nextExpectedTargetNum); } - generateLogon(logon, nextExpectedTargetNum); - } - // Check for proper sequence reset response - if (state.isResetSent() && !state.isResetReceived()) { - disconnect("Expected Logon response to have reset sequence numbers in response to ResetSeqNumFlag", true); - return; - } + // Check for proper sequence reset response + if (state.isResetSent() && !state.isResetReceived()) { + disconnect("Expected Logon response to have reset sequence numbers in response to ResetSeqNumFlag", true); + return; + } - state.setResetSent(false); - state.setResetReceived(false); + state.setResetSent(false); + state.setResetReceived(false); - // Looking at the sequence number of the incoming Logon, is it too high indicating possible missed messages ? .. - if (!isLogonInNormalSequence) { - // if 789 was sent then we effectively have already sent a resend request - if (state.isExpectedLogonNextSeqNumSent()) { - // Mark state as if we have already sent a resend request from the logon's 789 (we sent) to infinity. - // This will supress the resend request in doTargetTooHigh ... - state.setResetRangeFromLastExpectedLogonNextSeqNumLogon(); - getLog().onEvent("Required resend will be suppressed as we are setting tag 789"); - } - if (validateSequenceNumbers) { - doTargetTooHigh(logon); + // Looking at the sequence number of the incoming Logon, is it too high indicating possible missed messages ? .. + if (!isLogonInNormalSequence) { + // if 789 was sent then we effectively have already sent a resend request + if (state.isExpectedLogonNextSeqNumSent()) { + // Mark state as if we have already sent a resend request from the logon's 789 (we sent) to infinity. + // This will supress the resend request in doTargetTooHigh ... + state.setResetRangeFromLastExpectedLogonNextSeqNumLogon(); + getLog().onEvent("Required resend will be suppressed as we are setting tag 789"); + } + if (validateSequenceNumbers) { + doTargetTooHigh(logon); + } + } else { + state.incrNextTargetMsgSeqNum(); + nextQueued(); } - } else { - state.incrNextTargetMsgSeqNum(); - nextQueued(); - } - // Do we have a 789 - if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { - final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); + // Do we have a 789 + if (logon.isSetField(NextExpectedMsgSeqNum.FIELD) && enableNextExpectedMsgSeqNum) { + final int targetWantsNextSeqNumToBe = logon.getInt(NextExpectedMsgSeqNum.FIELD); - // is the 789 lower (we checked for higher previously) than our next message after receiving the logon - if (targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived) { - int endSeqNo = nextSenderMsgNumAtLogonReceived; + // is the 789 lower (we checked for higher previously) than our next message after receiving the logon + if (targetWantsNextSeqNumToBe != nextSenderMsgNumAtLogonReceived) { + int endSeqNo = nextSenderMsgNumAtLogonReceived; - // Just do a gap fill when messages aren't persisted - if (!persistMessages) { - endSeqNo += 1; - final int next = state.getNextSenderMsgSeqNum(); - if (endSeqNo > next) { - endSeqNo = next; + // Just do a gap fill when messages aren't persisted + if (!persistMessages) { + endSeqNo += 1; + final int next = state.getNextSenderMsgSeqNum(); + if (endSeqNo > next) { + endSeqNo = next; + } + getLog().onEvent( + "Received implicit ResendRequest via Logon FROM: " + + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived + + " will be reset"); + generateSequenceReset(logon, targetWantsNextSeqNumToBe, // 34= + endSeqNo); // (NewSeqNo 36=) + } else { + // resend missed messages + getLog().onEvent( + "Received implicit ResendRequest via Logon FROM: " + + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived + + " will be resent"); + resendMessages(logon, targetWantsNextSeqNumToBe, endSeqNo); } - getLog().onEvent( - "Received implicit ResendRequest via Logon FROM: " - + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived - + " will be reset"); - generateSequenceReset(logon, targetWantsNextSeqNumToBe, // 34= - endSeqNo); // (NewSeqNo 36=) - } else { - // resend missed messages - getLog().onEvent( - "Received implicit ResendRequest via Logon FROM: " - + targetWantsNextSeqNumToBe + " TO: " + nextSenderMsgNumAtLogonReceived - + " will be resent"); - resendMessages(logon, targetWantsNextSeqNumToBe, endSeqNo); } } + } finally { + state.unlockSenderMsgSeqNum(); } if (isLoggedOn()) { try { diff --git a/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java b/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java new file mode 100644 index 000000000..e8db79ab4 --- /dev/null +++ b/quickfixj-core/src/test/java/quickfix/SessionLogonSeqNumIssue902Test.java @@ -0,0 +1,373 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import quickfix.field.BeginString; +import quickfix.field.EncryptMethod; +import quickfix.field.HeartBtInt; +import quickfix.field.MsgSeqNum; +import quickfix.field.ResetSeqNumFlag; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Regression test for GitHub issue #902. + * + *

Bug description: A QuickFIX/J acceptor responds to a client Logon + * with {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum(34)=1} with a Logon + * response carrying {@code MsgSeqNum=2} instead of 1. + * + *

Root cause: Inside {@code Session.sendRaw()}, every outbound + * message goes through {@code persist()}, which increments the + * next-sender-sequence-number before the {@code send()} call. The + * condition that gates the actual {@code send()} call is: + *

+ *   if (MsgType.LOGON.equals(msgType) || MsgType.LOGOUT.equals(msgType)
+ *           || MsgType.RESEND_REQUEST.equals(msgType)
+ *           || MsgType.SEQUENCE_RESET.equals(msgType) || isLoggedOn()) {
+ *       result = send(messageString);
+ *   }
+ * 
+ * A Heartbeat message does NOT match any of those types, and at the moment + * it is triggered the session is not yet fully logged on ({@code isLogonSent()} + * is still {@code false}). Therefore the heartbeat's call to {@code persist()} + * bumps the sequence counter from 1 to 2, but the heartbeat is silently + * dropped. When the acceptor's own Logon response is subsequently serialised + * and sent, it picks up sequence number 2. + * + *

How the race arises in production: In {@code nextLogon()}, + * {@code state.setLogonReceived(true)} is called before + * {@code generateLogon()} is called. Between those two operations, a + * timer/heartbeat thread calling {@code next()} sees + * {@code state.isLogonReceived()==true} and {@code isHeartBeatNeeded()==true} + * (because {@code lastSentTime} is stale from the previous session), so it + * invokes {@code generateHeartbeat()}, consuming seq=1 without sending the + * heartbeat. The subsequent {@code generateLogon()} then uses seq=2. + * + *

Test strategy: We inject the race deterministically using a + * custom {@link Log} implementation that intercepts the {@code "Received logon"} + * event (which fires between {@code setLogonReceived(true)} and + * {@code generateLogon()} inside {@code nextLogon()}) and calls + * {@link Session#next()} from within that event handler. We use + * {@link MockSystemTimeSource} to ensure {@code isHeartBeatNeeded()} returns + * {@code true} at exactly that moment. + */ +public class SessionLogonSeqNumIssue902Test { + + private static final String RECEIVED_LOGON_EVENT = "Received logon"; + + private MockSystemTimeSource timeSource; + private Session acceptorSession; + private RecordingResponder responder; + + /** Holds the session so the log hook can reach it after construction. */ + private final AtomicReference sessionRef = new AtomicReference<>(); + + /** + * Guard that prevents re-entrant injection: the hook must fire at most + * once per test run so that the simulated timer-thread interleaving is + * applied exactly once. + */ + private final AtomicBoolean heartbeatInjected = new AtomicBoolean(false); + + @Before + public void setUp() { + timeSource = new MockSystemTimeSource(1_000_000L); + SystemTime.setTimeSource(timeSource); + } + + @After + public void tearDown() throws Exception { + SystemTime.setTimeSource(null); + if (acceptorSession != null) { + acceptorSession.close(); + } + } + + /** + * Verifies that the acceptor's Logon response carries {@code MsgSeqNum=1} + * when it receives a Logon with {@code ResetSeqNumFlag=Y} and + * {@code MsgSeqNum=1}. + * + *

The test deliberately injects a simulated timer-thread interleaving + * (via the session log hook described in the class Javadoc) so that + * {@code generateHeartbeat()} runs between {@code setLogonReceived(true)} + * and {@code generateLogon()} inside {@code nextLogon()}. This reproduces + * the race condition described in issue #902. + * + *

With the current (buggy) code the heartbeat's {@code persist()} call + * bumps the sequence counter from 1 to 2 without sending the heartbeat, + * so the Logon response ends up with {@code MsgSeqNum=2}. The assertion + * at the end therefore fails, proving that the bug exists. + */ + @Test + public void testAcceptorLogonResponseSeqNumIsOneWhenResetSeqNumFlagReceived() + throws Exception { + + // ----------------------------------------------------------------- + // 1. Build the acceptor session. + // + // The custom LogFactory creates a Log whose onEvent() method + // fires session.next() when "Received logon" is logged. That + // log event is emitted by nextLogon() AFTER setLogonReceived(true) + // but BEFORE generateLogon(), which is exactly the window where + // the production race condition occurs. + // ----------------------------------------------------------------- + SessionID sessionID = new SessionID( + FixVersions.BEGINSTRING_FIX44, "ACCEPTOR", "CLIENT"); + + LogFactory injectingLogFactory = id -> new Log() { + @Override public void clear() {} + @Override public void onIncoming(String message) {} + @Override public void onOutgoing(String message) {} + @Override public void onErrorEvent(String text) {} + + @Override + public void onEvent(String text) { + /* + * "Received logon" is the log event emitted at line ~2262 + * of Session.java, between: + * state.setLogonReceived(true) [line ~2229] + * generateLogon(...) [line ~2275] + * + * Calling session.next() here models a timer thread that + * woke up at exactly this critical moment. + * + * The AtomicBoolean guard ensures we inject at most once, + * even if re-entrant log calls occur. + */ + if (RECEIVED_LOGON_EVENT.equals(text) + && heartbeatInjected.compareAndSet(false, true)) { + Session s = sessionRef.get(); + if (s != null) { + try { + // This triggers generateHeartbeat() which calls + // persist() and bumps the seq counter 1 → 2 + // without sending the heartbeat (isLogonSent() + // is still false at this point). + s.next(); + } catch (IOException e) { + throw new RuntimeException( + "Unexpected IOException in injected next() call", e); + } + } + } + } + }; + + /* + * Build the acceptor session with heartbeatInterval=0 (the default for + * acceptors). This ensures that SessionState.isInitiator() == false, + * which is required for nextLogon() to call generateLogon() and thus + * produce the Logon response. + * + * NOTE: heartbeatInterval=0 causes the Session constructor to set + * state.initiator = (0 != 0) = false. If we passed 30 here, + * state.initiator would be true and generateLogon() would be + * skipped entirely, making the test scenario impossible to set up. + */ + acceptorSession = new SessionFactoryTestSupport.Builder() + .setSessionId(sessionID) + .setApplication(new UnitTestApplication()) + .setIsInitiator(false) + .setCheckLatency(false) + .setCheckCompID(true) + .setPersistMessages(true) + .setLogFactory(injectingLogFactory) + .build(); + + /* + * Now update the heartbeat interval to 30 s WITHOUT changing + * state.isInitiator() (which is final and was set to false above). + * This makes next() reach the isHeartBeatNeeded() check instead of + * returning early at the "getHeartBeatInterval() == 0" guard, which + * is the prerequisite for the bug to manifest. + */ + acceptorSession.setHeartBeatInterval(30); + + sessionRef.set(acceptorSession); + + responder = new RecordingResponder(); + acceptorSession.setResponder(responder); + + // ----------------------------------------------------------------- + // 2. Simulate a previous session by setting seq numbers to 5 + // and recording lastSentTime at the INITIAL clock value (t=1 000 000). + // This makes lastSentTime "stale" once we advance the clock. + // ----------------------------------------------------------------- + SessionState state = getSessionState(acceptorSession); + state.getMessageStore().setNextSenderMsgSeqNum(5); + state.getMessageStore().setNextTargetMsgSeqNum(5); + state.setLastSentTime(SystemTime.currentTimeMillis()); // = 1_000_000 + + // ----------------------------------------------------------------- + // 3. Advance the mock clock by 31 s (> the 30 s heartbeat interval) + // so that isHeartBeatNeeded() returns true when next() is called + // from inside the log hook. + // ----------------------------------------------------------------- + timeSource.increment(31_000L); // clock = 1_031_000 + + // ----------------------------------------------------------------- + // 4. Deliver the incoming Logon with ResetSeqNumFlag=Y, MsgSeqNum=1. + // + // Execution path (with the injected interleaving): + // nextLogon() + // → resetState() : seq counter reset to 1 + // → setLogonReceived(true) : session now appears "logon received" + // → log("Received logon") : our hook fires session.next() + // → isHeartBeatNeeded() = true (31 s > 30 s, stale time) + // → generateHeartbeat() + // → sendRaw(heartbeat): + // persist() bumps seq 1 → 2 + // send() NOT called (isLogonSent() still false) + // → next() returns + // → generateLogon() : initializeHeader sets MsgSeqNum=2 ← BUG + // → sendRaw(logon): sends the Logon response with "34=2" + // ----------------------------------------------------------------- + acceptorSession.next(buildIncomingLogon(sessionID)); + + // ----------------------------------------------------------------- + // 5. Sanity-check: the log hook must have fired to confirm the test + // scenario was actually triggered. + // ----------------------------------------------------------------- + assertTrue("Log hook for 'Received logon' must have fired during test", + heartbeatInjected.get()); + + // ----------------------------------------------------------------- + // 6. Assert: the Logon response MUST carry MsgSeqNum=1. + // + // With the current buggy code this assertion FAILS because the + // heartbeat consumed seq=1 and the Logon response was sent with + // MsgSeqNum=2. + // ----------------------------------------------------------------- + String logonResponse = findLogonResponse(responder.sentMessages); + int seqNum = extractMsgSeqNum(logonResponse); + + assertEquals( + "Acceptor Logon response must carry MsgSeqNum=1 when responding to " + + "a Logon with ResetSeqNumFlag=Y and MsgSeqNum=1, but got MsgSeqNum=" + + seqNum, + 1, + seqNum); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** + * Builds an incoming FIX 4.4 Logon message addressed to {@code sessionID}'s + * acceptor side, carrying {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum=1}. + */ + private static quickfix.fix44.Logon buildIncomingLogon(SessionID sessionID) + throws FieldNotFound { + quickfix.fix44.Logon logon = new quickfix.fix44.Logon(); + logon.getHeader().setString(BeginString.FIELD, + sessionID.getBeginString()); + logon.getHeader().setString(SenderCompID.FIELD, + sessionID.getTargetCompID()); // from the remote peer's perspective + logon.getHeader().setString(TargetCompID.FIELD, + sessionID.getSenderCompID()); + logon.getHeader().setInt(MsgSeqNum.FIELD, 1); + logon.setInt(EncryptMethod.FIELD, EncryptMethod.NONE_OTHER); + logon.setInt(HeartBtInt.FIELD, 30); + logon.setBoolean(ResetSeqNumFlag.FIELD, true); + return logon; + } + + /** + * Returns the last outbound Logon message (35=A) recorded by the responder. + * + * @throws AssertionError if no Logon response was sent + */ + private static String findLogonResponse(List sent) { + for (int i = sent.size() - 1; i >= 0; i--) { + if (sent.get(i).contains("\00135=A\001")) { + return sent.get(i); + } + } + throw new AssertionError( + "No Logon response (35=A) found in sent messages: " + sent); + } + + /** + * Parses a raw FIX message string and returns the integer value of tag 34 + * (MsgSeqNum). + */ + private static int extractMsgSeqNum(String fixMessage) { + for (String field : fixMessage.split("\001")) { + if (field.startsWith("34=")) { + return Integer.parseInt(field.substring(3)); + } + } + throw new AssertionError("Tag 34 (MsgSeqNum) not found in message: " + fixMessage); + } + + /** + * Obtains the private {@code SessionState} from a {@link Session} via + * reflection. + */ + private static SessionState getSessionState(Session session) throws Exception { + Field stateField = Session.class.getDeclaredField("state"); + stateField.setAccessible(true); + return (SessionState) stateField.get(session); + } + + // ------------------------------------------------------------------------- + // Inner types + // ------------------------------------------------------------------------- + + /** + * A minimal {@link Responder} that records every raw FIX string passed to + * {@link #send(String)}. + */ + private static final class RecordingResponder implements Responder { + + final List sentMessages = new CopyOnWriteArrayList<>(); + + @Override + public boolean send(String data) { + sentMessages.add(data); + return true; + } + + @Override + public void disconnect() { + } + + @Override + public String getRemoteAddress() { + return "127.0.0.1:54321"; + } + } +}