From 293b2a14b102e14d307ff97cec68c04b47054f86 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 09:50:04 +0000 Subject: [PATCH 1/6] Initial plan From a40e80061d0dfd271c7f4ecc7d805bcbb63a8a42 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 09:53:33 +0000 Subject: [PATCH 2/6] Add threading model documentation (HTML + Markdown) Co-authored-by: chrjohn <6644028+chrjohn@users.noreply.github.com> Agent-Logs-Url: https://github.com/quickfix-j/quickfixj/sessions/e6f7a3f4-396a-403d-b91a-6501d8d500db --- docs/threading-model.md | 228 ++++++++++++++++ .../src/main/doc/usermanual/index.html | 1 + .../main/doc/usermanual/usage/threading.html | 255 ++++++++++++++++++ 3 files changed, 484 insertions(+) create mode 100644 docs/threading-model.md create mode 100644 quickfixj-core/src/main/doc/usermanual/usage/threading.html diff --git a/docs/threading-model.md b/docs/threading-model.md new file mode 100644 index 0000000000..cadb0d9ac5 --- /dev/null +++ b/docs/threading-model.md @@ -0,0 +1,228 @@ +# QuickFIX/J Threading Model + +## 1. Overview + +QuickFIX/J uses [Apache MINA](http://mina.apache.org/) for non-blocking I/O. The threading model for message processing is controlled by the `EventHandlingStrategy` interface (`quickfix.mina.EventHandlingStrategy`), with two concrete implementations: + +- **`SingleThreadedEventHandlingStrategy`** — one thread processes messages for all sessions (`SocketAcceptor`, `SocketInitiator`) +- **`ThreadPerSessionEventHandlingStrategy`** — one thread per session processes messages (`ThreadedSocketAcceptor`, `ThreadedSocketInitiator`) + +Both strategies co-exist with the **timer thread**, which is always present and always calls `Session.next()` (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use. + +--- + +## 2. Connector Classes and Their Strategy + +| Connector class | Event handling strategy | Thread name(s) | +|---|---|---| +| `SocketAcceptor` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` | +| `SocketInitiator` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` | +| `ThreadedSocketAcceptor` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: ` | +| `ThreadedSocketInitiator` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: ` | + +--- + +## 3. Single-Threaded Model (`SingleThreadedEventHandlingStrategy`) + +**Class:** `quickfix.mina.SingleThreadedEventHandlingStrategy` +**Source:** `quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java` + +- A single `BlockingQueue` holds events from **all** sessions. +- One background thread named **`QFJ Message Processor`** (a daemon thread) drains the queue and calls `session.next(message)` for each event via `SessionMessageEvent.processMessage()`. +- The thread is started via `blockInThread()`, which creates a `ThreadAdapter` wrapping the `block()` loop. +- `onMessage()` wraps incoming messages into a `SessionMessageEvent` and puts them on the shared queue. +- The `block()` loop polls the queue with a timeout (`THREAD_WAIT_FOR_MESSAGE_MS`) so it can periodically check the `isStopped` flag. +- On stop, remaining queued messages are drained and processed before the thread exits. +- The `getQueueSize(SessionID)` method returns the total queue size (single queue for all sessions — there is no per-session view). + +**Key point for application developers:** Because all sessions share a single processing thread, a slow `fromApp()` callback will delay processing for **all** other sessions. + +--- + +## 4. Thread-per-Session Model (`ThreadPerSessionEventHandlingStrategy`) + +**Class:** `quickfix.mina.ThreadPerSessionEventHandlingStrategy` +**Source:** `quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java` + +- A `ConcurrentHashMap` maps each session to its own dispatcher thread. +- On the first `onMessage()` call for a given session, a new `MessageDispatchingThread` is created and started via `startDispatcherThread()`. +- Each `MessageDispatchingThread` has its own `BlockingQueue` (or watermark-tracked queue) and loops calling `session.next(message)`. +- Thread name: **`QF/J Session dispatcher: :/`** +- The `Executor` can be customised via `setExecutor()`. The default is `DedicatedThreadExecutor`, which creates a plain `new Thread(command, name).start()`. +- On stop, `stopDispatcherThreads()` enqueues `END_OF_STREAM` to every dispatcher, sets `stopping=true`, and waits (polling every 100 ms) until all dispatchers report `isStopped`. +- After a dispatcher drains its remaining queue on shutdown, it removes itself from the `dispatchers` map. + +**Key point for application developers:** Since each session has its own thread, a slow `fromApp()` for one session does **not** block others. However, your `Application` implementation **must be thread-safe** if it shares state across sessions. + +--- + +## 5. The Timer Thread and `Session.next()` + +This is a critical part of the threading model that is **orthogonal** to the message-processing strategies above. + +**Class:** `quickfix.mina.SessionConnector` +**Source:** `quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java` + +### 5.1 The `QFJ Timer` Thread + +A single `ScheduledExecutorService` (a shared static instance using a `QFTimerThreadFactory`) runs a `SessionTimerTask` at a fixed rate of **every 1000 ms**. + +```java +// SessionConnector.java +private static class QFTimerThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "QFJ Timer"); + thread.setDaemon(true); + return thread; + } +} +``` + +The timer is started by `startSessionTimer()`: + +```java +protected void startSessionTimer() { + if (checkSessionTimerRunning()) { + return; + } + Runnable timerTask = new SessionTimerTask(); + if (shortLivedExecutor != null) { + timerTask = new DelegatingTask(timerTask, shortLivedExecutor); + } + sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L, + TimeUnit.MILLISECONDS); +} +``` + +Only one timer is ever started per connector. If `startSessionTimer()` is called again while the timer is still running (e.g. during `createDynamicSession()`), the existing timer is reused. + +### 5.2 `SessionTimerTask` Iterates All Sessions and Calls `Session.next()` + +```java +private class SessionTimerTask implements Runnable { + @Override + public void run() { + try { + for (Session session : sessions.values()) { + try { + session.next(); + } catch (IOException e) { + LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e); + } + } + } catch (Throwable e) { + log.error("Error during timer processing", e); + } + } +} +``` + +Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls `session.next()` directly on every session. This is independent of which `EventHandlingStrategy` is in use. + +### 5.3 What Does `Session.next()` (No-arg) Do? + +`Session.next()` is called from the timer, **not** from user code. Its Javadoc states: + +> Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code. + +Its responsibilities (from `Session.java`): + +1. **Checks if the session is enabled.** If disabled and still logged on, it initiates a Logout. +2. **Checks session schedule.** If outside the configured session time window, it may reset sequence numbers or disconnect. This check is throttled to once per second. +3. **Returns early if not connected** (`hasResponder()` is false). +4. **Handles logon state:** If logon has not been received, it may send a Logon (for initiators) or detect a logon timeout. +5. **Checks logout timeout** if a logout has been sent. +6. **Heartbeat management:** + - If `HeartBtInt == 0`: returns (no heartbeat management). + - If timed out waiting for a heartbeat: disconnects (unless `DisableHeartBeatCheck=Y`). + - If a TestRequest is needed: sends a TestRequest (`generateTestRequest("TEST")`). + - If a Heartbeat is needed: sends a Heartbeat (`generateHeartbeat()`). + +The full flow: + +``` +QFJ Timer thread (every 1 second) + └─► SessionTimerTask.run() + └─► for each Session in sessions.values(): + └─► Session.next() + ├─ check enabled + ├─ check session schedule / reset + ├─ check hasResponder() + ├─ check logon state (send Logon if initiator) + ├─ check logout timeout + └─ heartbeat management + ├─ isTimedOut() → disconnect + ├─ isTestRequestNeeded() → send TestRequest + └─ isHeartBeatNeeded() → send Heartbeat +``` + +### 5.4 The Overloaded `Session.next(Message)` — Called by Dispatchers + +The `Session.next(Message message)` overload is what `MessageDispatchingThread` and `SessionMessageEvent` call with an actual FIX message. This processes the received message (validates, dispatches to `fromAdmin` / `fromApp`, handles sequence numbers, etc.). This is **distinct** from the no-arg `Session.next()` used by the timer. + +--- + +## 6. Thread Interaction Summary + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ MINA I/O Threads │ +│ (NIO selector threads, named "NioProcessor-N") │ +│ Receive raw bytes → decode FIX message → call EventHandlingStrategy │ +└──────────────────────────────┬──────────────────────────────────────────┘ + │ onMessage(session, message) + ┌───────────────────┴────────────────────┐ + │ │ + SingleThreaded ThreadPerSession + ────────────── ──────────────── + One shared queue Per-session queue + One "QFJ Message Processor" One "QF/J Session dispatcher: + thread calls " thread per session + session.next(msg) calls session.next(msg) + + Both strategies co-exist with the Timer Thread: + +┌─────────────────────────────────────────────────────────────────────────┐ +│ QFJ Timer Thread (daemon) │ +│ ScheduledExecutorService fires every 1000ms │ +│ SessionTimerTask iterates ALL sessions → calls Session.next() │ +│ (handles heartbeats, logon, session schedule, timeouts) │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 7. Queue Capacity and Back-Pressure + +Both strategies support configurable queue capacity: + +- **Fixed capacity:** `new SingleThreadedEventHandlingStrategy(connector, queueCapacity)` — bounded `LinkedBlockingQueue`. Producers block when full (back-pressure). +- **Watermark-based:** `new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark)` — uses `QueueTrackers.newMultiSessionWatermarkTracker(...)`. Flow control is applied per-session within the shared queue. +- Same two options exist for `ThreadPerSessionEventHandlingStrategy`, with `newSingleSessionWatermarkTracker` per session. + +--- + +## 8. Custom `Executor` Injection + +Both strategies accept a custom `java.util.concurrent.Executor` via `setExecutor(executor)`, called during `start()` from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+): + +```java +// Example: use virtual threads for session dispatchers (Java 21+) +ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...); +acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor) +``` + +The `longLivedExecutor` is provided by `SessionConnector` and can be customised. If no executor is set, `DedicatedThreadExecutor` creates a plain `new Thread(...)` per session/strategy. + +--- + +## 9. Thread Safety Implications for Application Developers + +- **`SocketAcceptor` / `SocketInitiator` (single-threaded):** The `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are called from the single `QFJ Message Processor` thread. No concurrent calls to the same session. However, `Session.next()` (timer) runs concurrently from the `QFJ Timer` thread — it does not call application callbacks but it does send messages on the wire. +- **`ThreadedSocketAcceptor` / `ThreadedSocketInitiator` (thread-per-session):** Each session has its own dispatcher thread. Callbacks for **different sessions** may execute concurrently. Your `Application` implementation **must be thread-safe** if it shares state across sessions. +- In both models, the `QFJ Timer` thread runs concurrently with message-processing threads and calls `Session.next()` (no-arg), which may send heartbeats or disconnect. `Session` internally synchronizes on `this` to protect shared state. + +--- + +*See also: [`quickfixj-core/src/main/doc/usermanual/usage/threading.html`](../quickfixj-core/src/main/doc/usermanual/usage/threading.html) for the HTML version of this document.* diff --git a/quickfixj-core/src/main/doc/usermanual/index.html b/quickfixj-core/src/main/doc/usermanual/index.html index 81e10dee78..4ca130871b 100644 --- a/quickfixj-core/src/main/doc/usermanual/index.html +++ b/quickfixj-core/src/main/doc/usermanual/index.html @@ -61,6 +61,7 @@

Building and Testing QuickFIX/J

Using QuickFIX/J

  • Creating Your Application
  • +
  • Threading Model
  • Configuration
  • Acceptor Failover Support
  • Dynamic Acceptor Session Definition
  • diff --git a/quickfixj-core/src/main/doc/usermanual/usage/threading.html b/quickfixj-core/src/main/doc/usermanual/usage/threading.html new file mode 100644 index 0000000000..995ba1135f --- /dev/null +++ b/quickfixj-core/src/main/doc/usermanual/usage/threading.html @@ -0,0 +1,255 @@ + + + +Threading Model + + + +
    +

    QuickFIX/J User Manual

    +
    + +

    Threading Model

    + +

    QuickFIX/J uses Apache MINA for non-blocking I/O. The +threading model for message processing is controlled by the quickfix.mina.EventHandlingStrategy +interface, with two concrete implementations:

    + +
      +
    • SingleThreadedEventHandlingStrategy — one thread processes messages + for all sessions (SocketAcceptor, SocketInitiator)
    • +
    • ThreadPerSessionEventHandlingStrategy — one thread per session + processes messages (ThreadedSocketAcceptor, ThreadedSocketInitiator)
    • +
    + +

    Both strategies co-exist with the timer thread, which is always present and always calls +Session.next() (no-arg) on a 1-second schedule, regardless of which event-handling +strategy is in use.

    + +

    Connector Classes and Their Strategy

    + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    Connector classEvent handling strategyThread name(s)
    SocketAcceptorSingleThreadedEventHandlingStrategyQFJ Message Processor
    SocketInitiatorSingleThreadedEventHandlingStrategyQFJ Message Processor
    ThreadedSocketAcceptorThreadPerSessionEventHandlingStrategyQF/J Session dispatcher: <sessionID>
    ThreadedSocketInitiatorThreadPerSessionEventHandlingStrategyQF/J Session dispatcher: <sessionID>
    + +

    Single-Threaded Model (SingleThreadedEventHandlingStrategy)

    + +

    Class: quickfix.mina.SingleThreadedEventHandlingStrategy
    +Source: quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java

    + +
      +
    • A single BlockingQueue<SessionMessageEvent> holds events from all sessions.
    • +
    • One background thread named QFJ Message Processor (a daemon thread) drains the + queue and calls session.next(message) for each event via + SessionMessageEvent.processMessage().
    • +
    • The thread is started via blockInThread(), which creates a ThreadAdapter + wrapping the block() loop.
    • +
    • onMessage() wraps incoming messages into a SessionMessageEvent and puts + them on the shared queue.
    • +
    • The block() loop polls the queue with a timeout (THREAD_WAIT_FOR_MESSAGE_MS) + so it can periodically check the isStopped flag.
    • +
    • On stop, remaining queued messages are drained and processed before the thread exits.
    • +
    • The getQueueSize(SessionID) method returns the total queue size (single queue for all + sessions — there is no per-session view).
    • +
    + +

    Key point for application developers: Because all sessions share a single processing thread, +a slow fromApp() callback will delay processing for all other sessions.

    + +

    Thread-per-Session Model (ThreadPerSessionEventHandlingStrategy)

    + +

    Class: quickfix.mina.ThreadPerSessionEventHandlingStrategy
    +Source: quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java

    + +
      +
    • A ConcurrentHashMap<SessionID, MessageDispatchingThread> maps each session to its + own dispatcher thread.
    • +
    • On the first onMessage() call for a given session, a new + MessageDispatchingThread is created and started via startDispatcherThread().
    • +
    • Each MessageDispatchingThread has its own BlockingQueue<Message> (or + watermark-tracked queue) and loops calling session.next(message).
    • +
    • Thread name: QF/J Session dispatcher: <BeginString>:<SenderCompID>/<TargetCompID>
    • +
    • The Executor can be customised via setExecutor(). The default is + DedicatedThreadExecutor, which creates a plain new Thread(command, name).start().
    • +
    • On stop, stopDispatcherThreads() enqueues END_OF_STREAM to every dispatcher, + sets stopping=true, and waits (polling every 100 ms) until all dispatchers report + isStopped.
    • +
    • After a dispatcher drains its remaining queue on shutdown, it removes itself from the + dispatchers map.
    • +
    + +

    Key point for application developers: Since each session has its own thread, a slow +fromApp() for one session does not block others. However, your +Application implementation must be thread-safe if it shares state across sessions.

    + +

    The Timer Thread and Session.next()

    + +

    This is a critical part of the threading model that is orthogonal to the message-processing +strategies above.

    + +

    Class: quickfix.mina.SessionConnector
    +Source: quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

    + +

    The QFJ Timer Thread

    + +

    A single ScheduledExecutorService (a shared static instance using a +QFTimerThreadFactory) runs a SessionTimerTask at a fixed rate of +every 1000 ms. The timer thread is a daemon thread named QFJ Timer.

    + +

    Only one timer is ever started per connector. If startSessionTimer() is called again +while the timer is still running (e.g. during createDynamicSession()), the existing timer +is reused.

    + +

    SessionTimerTask Iterates All Sessions and Calls Session.next()

    + +
    +private class SessionTimerTask implements Runnable {
    +    @Override
    +    public void run() {
    +        try {
    +            for (Session session : sessions.values()) {
    +                try {
    +                    session.next();
    +                } catch (IOException e) {
    +                    LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
    +                }
    +            }
    +        } catch (Throwable e) {
    +            log.error("Error during timer processing", e);
    +        }
    +    }
    +}
    +
    + +

    Even though each session may have its own dispatcher thread (in the thread-per-session model), the +timer thread also calls session.next() directly on every session. This is independent of +which EventHandlingStrategy is in use.

    + +

    What Does Session.next() (No-arg) Do?

    + +

    Session.next() is called from the timer, not from user code. Its responsibilities +(from quickfix.Session) are:

    + +
      +
    1. Checks if the session is enabled. If disabled and still logged on, it initiates a Logout.
    2. +
    3. Checks session schedule. If outside the configured session time window, it may reset + sequence numbers or disconnect. This check is throttled to once per second.
    4. +
    5. Returns early if not connected (hasResponder() is false).
    6. +
    7. Handles logon state: If logon has not been received, it may send a Logon (for initiators) + or detect a logon timeout.
    8. +
    9. Checks logout timeout if a logout has been sent.
    10. +
    11. Heartbeat management: +
        +
      • If HeartBtInt == 0: returns (no heartbeat management).
      • +
      • If timed out waiting for a heartbeat: disconnects (unless DisableHeartBeatCheck=Y).
      • +
      • If a TestRequest is needed: sends a TestRequest (generateTestRequest("TEST")).
      • +
      • If a Heartbeat is needed: sends a Heartbeat (generateHeartbeat()).
      • +
      +
    12. +
    + +

    The Overloaded Session.next(Message) — Called by Dispatchers

    + +

    The Session.next(Message message) overload is what MessageDispatchingThread +and SessionMessageEvent call with an actual FIX message. This processes the received message +(validates, dispatches to fromAdmin / fromApp, handles sequence numbers, +etc.). This is distinct from the no-arg Session.next() used by the timer.

    + +

    Thread Interaction Summary

    + +
    ++-------------------------------------------------------------------------+
    +|                         MINA I/O Threads                                |
    +|       (NIO selector threads, named "NioProcessor-N")                    |
    +|  Receive raw bytes -> decode FIX message -> call EventHandlingStrategy  |
    ++------------------------------+------------------------------------------+
    +                               |  onMessage(session, message)
    +           +-------------------+--------------------+
    +           |                                        |
    +    SingleThreaded                          ThreadPerSession
    +    --------------                          ----------------
    +  One shared queue                       Per-session queue
    +  One "QFJ Message Processor"            One "QF/J Session dispatcher:
    +  thread calls                           <sessionID>" thread per session
    +  session.next(msg)                      calls session.next(msg)
    +
    +           Both strategies co-exist with the Timer Thread:
    +
    ++-------------------------------------------------------------------------+
    +|                    QFJ Timer Thread (daemon)                            |
    +|  ScheduledExecutorService fires every 1000ms                            |
    +|  SessionTimerTask iterates ALL sessions -> calls Session.next()         |
    +|  (handles heartbeats, logon, session schedule, timeouts)                |
    ++-------------------------------------------------------------------------+
    +
    + +

    Queue Capacity and Back-Pressure

    + +

    Both strategies support configurable queue capacity:

    + +
      +
    • Fixed capacity: new SingleThreadedEventHandlingStrategy(connector, queueCapacity) + — bounded LinkedBlockingQueue. Producers block when full (back-pressure).
    • +
    • Watermark-based: new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark) + — uses QueueTrackers.newMultiSessionWatermarkTracker(...). Flow control is applied + per-session within the shared queue.
    • +
    • Same two options exist for ThreadPerSessionEventHandlingStrategy, with + newSingleSessionWatermarkTracker per session.
    • +
    + +

    Custom Executor Injection

    + +

    Both strategies accept a custom java.util.concurrent.Executor via +setExecutor(executor), called during start() from the connector. This allows +integration with application-managed thread pools (e.g. virtual threads in Java 21+).

    + +

    If no executor is set, DedicatedThreadExecutor creates a plain +new Thread(...) per session/strategy.

    + +

    Thread Safety Implications for Application Developers

    + +
    +
    SocketAcceptor / SocketInitiator (single-threaded)
    +
    The Application callbacks (fromApp, fromAdmin, etc.) are + called from the single QFJ Message Processor thread. There are no concurrent calls to + the same session. However, Session.next() (timer) runs concurrently from the + QFJ Timer thread — it does not call application callbacks but it does send + messages on the wire.
    +
    ThreadedSocketAcceptor / ThreadedSocketInitiator (thread-per-session)
    +
    Each session has its own dispatcher thread. Callbacks for different sessions may execute + concurrently. Your Application implementation must be thread-safe if it shares + state across sessions.
    +
    + +

    In both models, the QFJ Timer thread runs concurrently with message-processing threads +and calls Session.next() (no-arg), which may send heartbeats or disconnect. +Session internally synchronizes on this to protect shared state.

    + + + + + From 8194eea21cb41b295a93624e9db2e9ee28c7a724 Mon Sep 17 00:00:00 2001 From: Christoph John Date: Sat, 21 Mar 2026 12:13:59 +0100 Subject: [PATCH 3/6] docs: Add threading developer guide (user-friendly) --- docs/threading-developer-guide.md | 116 ++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 docs/threading-developer-guide.md diff --git a/docs/threading-developer-guide.md b/docs/threading-developer-guide.md new file mode 100644 index 0000000000..92f2f9efb9 --- /dev/null +++ b/docs/threading-developer-guide.md @@ -0,0 +1,116 @@ +# QuickFIX/J Threading Model — Developer Guide + +QuickFIX/J provides two threading strategies for processing FIX messages. The choice of strategy +affects how your application handles concurrent sessions and what thread-safety guarantees you must +provide. + +## Threading Strategies + +### Single-Threaded Strategy + +**Classes:** `SocketAcceptor` / `SocketInitiator` + +All sessions share a single message-processing thread (named `QFJ Message Processor`). Incoming +messages from all sessions are placed in a shared queue and dispatched one at a time by this thread. + +This means your `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are always invoked from the +same thread, so you do not need to make your application code thread-safe with respect to concurrent +session callbacks. However, a slow callback will delay message processing for all other sessions. + +**Use when:** +- You have a small number of sessions. +- Simplicity and predictable, sequential message processing are more important than throughput. +- You want to avoid the complexity of thread-safe application code. + +### Thread-Per-Session Strategy + +**Classes:** `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` + +Each session gets its own dedicated message-dispatching thread. Incoming messages for a session are +queued and processed by that session's thread independently of other sessions. + +Because your `Application` callbacks can be invoked concurrently from multiple session threads, your +application code **must be thread-safe**. + +**Use when:** +- You have multiple sessions and need them to process messages independently. +- A slow or blocking callback for one session must not impact other sessions. +- You can ensure your application implementation is thread-safe. + +## Queue Capacity and Back-pressure + +Both strategies support configuring the internal message queue capacity to control back-pressure: + +```java +// Fixed-capacity queue (blocks producers when full) +Acceptor acceptor = new ThreadedSocketAcceptor( + application, storeFactory, settings, logFactory, messageFactory, + queueCapacity); + +// Watermark-based flow control +Acceptor acceptor = ThreadedSocketAcceptor.newBuilder() + .withApplication(application) + .withMessageStoreFactory(storeFactory) + .withSettings(settings) + .withLogFactory(logFactory) + .withMessageFactory(messageFactory) + .withQueueLowerWatermark(lowerWatermark) + .withQueueUpperWatermark(upperWatermark) + .build(); +``` + +The same constructors and builder options are available on `SocketAcceptor`, `SocketInitiator`, and +`ThreadedSocketInitiator`. + +## Choosing a Strategy + +| | `SocketAcceptor` / `SocketInitiator` | `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` | +|---|---|---| +| Message processing | Single shared thread | One thread per session | +| Application thread-safety required | No | Yes | +| Session isolation | No | Yes | +| Typical use case | Few sessions, simple apps | Many sessions, independent processing | + +## Example: Starting an Acceptor + +```java +import quickfix.*; +import java.io.FileInputStream; + +public class MyApp { + + public static void main(String[] args) throws Exception { + Application application = new MyApplication(); + SessionSettings settings = new SessionSettings(new FileInputStream(args[0])); + MessageStoreFactory storeFactory = new FileStoreFactory(settings); + LogFactory logFactory = new FileLogFactory(settings); + MessageFactory messageFactory = new DefaultMessageFactory(); + + // Single-threaded: all sessions share one message-processing thread + Acceptor acceptor = new SocketAcceptor( + application, storeFactory, settings, logFactory, messageFactory); + + // OR thread-per-session: each session has its own message-processing thread + // (application must be thread-safe) + // Acceptor acceptor = new ThreadedSocketAcceptor( + // application, storeFactory, settings, logFactory, messageFactory); + + acceptor.start(); + // ... run your application ... + acceptor.stop(); + } +} +``` + +## Thread Safety Guidance + +Regardless of which strategy you choose, note that `Session.sendToTarget()` is thread-safe and may +be called from any thread to send outgoing messages. + +When using `ThreadedSocketAcceptor` or `ThreadedSocketInitiator`, ensure that any shared state +accessed in your `Application` implementation (e.g., order books, maps, counters) is properly +synchronized or uses thread-safe data structures. + +--- + +*For a deep technical reference on the threading internals, see [`threading-model.md`](./threading-model.md).* \ No newline at end of file From 24d40ad7487596582bb44606d7536ac44b7c9210 Mon Sep 17 00:00:00 2001 From: Christoph John Date: Sat, 21 Mar 2026 16:17:09 +0100 Subject: [PATCH 4/6] Delete quickfixj-core/src/main/doc/usermanual/usage/threading.html --- .../main/doc/usermanual/usage/threading.html | 255 ------------------ 1 file changed, 255 deletions(-) delete mode 100644 quickfixj-core/src/main/doc/usermanual/usage/threading.html diff --git a/quickfixj-core/src/main/doc/usermanual/usage/threading.html b/quickfixj-core/src/main/doc/usermanual/usage/threading.html deleted file mode 100644 index 995ba1135f..0000000000 --- a/quickfixj-core/src/main/doc/usermanual/usage/threading.html +++ /dev/null @@ -1,255 +0,0 @@ - - - -Threading Model - - - -
    -

    QuickFIX/J User Manual

    -
    - -

    Threading Model

    - -

    QuickFIX/J uses Apache MINA for non-blocking I/O. The -threading model for message processing is controlled by the quickfix.mina.EventHandlingStrategy -interface, with two concrete implementations:

    - -
      -
    • SingleThreadedEventHandlingStrategy — one thread processes messages - for all sessions (SocketAcceptor, SocketInitiator)
    • -
    • ThreadPerSessionEventHandlingStrategy — one thread per session - processes messages (ThreadedSocketAcceptor, ThreadedSocketInitiator)
    • -
    - -

    Both strategies co-exist with the timer thread, which is always present and always calls -Session.next() (no-arg) on a 1-second schedule, regardless of which event-handling -strategy is in use.

    - -

    Connector Classes and Their Strategy

    - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    Connector classEvent handling strategyThread name(s)
    SocketAcceptorSingleThreadedEventHandlingStrategyQFJ Message Processor
    SocketInitiatorSingleThreadedEventHandlingStrategyQFJ Message Processor
    ThreadedSocketAcceptorThreadPerSessionEventHandlingStrategyQF/J Session dispatcher: <sessionID>
    ThreadedSocketInitiatorThreadPerSessionEventHandlingStrategyQF/J Session dispatcher: <sessionID>
    - -

    Single-Threaded Model (SingleThreadedEventHandlingStrategy)

    - -

    Class: quickfix.mina.SingleThreadedEventHandlingStrategy
    -Source: quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java

    - -
      -
    • A single BlockingQueue<SessionMessageEvent> holds events from all sessions.
    • -
    • One background thread named QFJ Message Processor (a daemon thread) drains the - queue and calls session.next(message) for each event via - SessionMessageEvent.processMessage().
    • -
    • The thread is started via blockInThread(), which creates a ThreadAdapter - wrapping the block() loop.
    • -
    • onMessage() wraps incoming messages into a SessionMessageEvent and puts - them on the shared queue.
    • -
    • The block() loop polls the queue with a timeout (THREAD_WAIT_FOR_MESSAGE_MS) - so it can periodically check the isStopped flag.
    • -
    • On stop, remaining queued messages are drained and processed before the thread exits.
    • -
    • The getQueueSize(SessionID) method returns the total queue size (single queue for all - sessions — there is no per-session view).
    • -
    - -

    Key point for application developers: Because all sessions share a single processing thread, -a slow fromApp() callback will delay processing for all other sessions.

    - -

    Thread-per-Session Model (ThreadPerSessionEventHandlingStrategy)

    - -

    Class: quickfix.mina.ThreadPerSessionEventHandlingStrategy
    -Source: quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java

    - -
      -
    • A ConcurrentHashMap<SessionID, MessageDispatchingThread> maps each session to its - own dispatcher thread.
    • -
    • On the first onMessage() call for a given session, a new - MessageDispatchingThread is created and started via startDispatcherThread().
    • -
    • Each MessageDispatchingThread has its own BlockingQueue<Message> (or - watermark-tracked queue) and loops calling session.next(message).
    • -
    • Thread name: QF/J Session dispatcher: <BeginString>:<SenderCompID>/<TargetCompID>
    • -
    • The Executor can be customised via setExecutor(). The default is - DedicatedThreadExecutor, which creates a plain new Thread(command, name).start().
    • -
    • On stop, stopDispatcherThreads() enqueues END_OF_STREAM to every dispatcher, - sets stopping=true, and waits (polling every 100 ms) until all dispatchers report - isStopped.
    • -
    • After a dispatcher drains its remaining queue on shutdown, it removes itself from the - dispatchers map.
    • -
    - -

    Key point for application developers: Since each session has its own thread, a slow -fromApp() for one session does not block others. However, your -Application implementation must be thread-safe if it shares state across sessions.

    - -

    The Timer Thread and Session.next()

    - -

    This is a critical part of the threading model that is orthogonal to the message-processing -strategies above.

    - -

    Class: quickfix.mina.SessionConnector
    -Source: quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

    - -

    The QFJ Timer Thread

    - -

    A single ScheduledExecutorService (a shared static instance using a -QFTimerThreadFactory) runs a SessionTimerTask at a fixed rate of -every 1000 ms. The timer thread is a daemon thread named QFJ Timer.

    - -

    Only one timer is ever started per connector. If startSessionTimer() is called again -while the timer is still running (e.g. during createDynamicSession()), the existing timer -is reused.

    - -

    SessionTimerTask Iterates All Sessions and Calls Session.next()

    - -
    -private class SessionTimerTask implements Runnable {
    -    @Override
    -    public void run() {
    -        try {
    -            for (Session session : sessions.values()) {
    -                try {
    -                    session.next();
    -                } catch (IOException e) {
    -                    LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
    -                }
    -            }
    -        } catch (Throwable e) {
    -            log.error("Error during timer processing", e);
    -        }
    -    }
    -}
    -
    - -

    Even though each session may have its own dispatcher thread (in the thread-per-session model), the -timer thread also calls session.next() directly on every session. This is independent of -which EventHandlingStrategy is in use.

    - -

    What Does Session.next() (No-arg) Do?

    - -

    Session.next() is called from the timer, not from user code. Its responsibilities -(from quickfix.Session) are:

    - -
      -
    1. Checks if the session is enabled. If disabled and still logged on, it initiates a Logout.
    2. -
    3. Checks session schedule. If outside the configured session time window, it may reset - sequence numbers or disconnect. This check is throttled to once per second.
    4. -
    5. Returns early if not connected (hasResponder() is false).
    6. -
    7. Handles logon state: If logon has not been received, it may send a Logon (for initiators) - or detect a logon timeout.
    8. -
    9. Checks logout timeout if a logout has been sent.
    10. -
    11. Heartbeat management: -
        -
      • If HeartBtInt == 0: returns (no heartbeat management).
      • -
      • If timed out waiting for a heartbeat: disconnects (unless DisableHeartBeatCheck=Y).
      • -
      • If a TestRequest is needed: sends a TestRequest (generateTestRequest("TEST")).
      • -
      • If a Heartbeat is needed: sends a Heartbeat (generateHeartbeat()).
      • -
      -
    12. -
    - -

    The Overloaded Session.next(Message) — Called by Dispatchers

    - -

    The Session.next(Message message) overload is what MessageDispatchingThread -and SessionMessageEvent call with an actual FIX message. This processes the received message -(validates, dispatches to fromAdmin / fromApp, handles sequence numbers, -etc.). This is distinct from the no-arg Session.next() used by the timer.

    - -

    Thread Interaction Summary

    - -
    -+-------------------------------------------------------------------------+
    -|                         MINA I/O Threads                                |
    -|       (NIO selector threads, named "NioProcessor-N")                    |
    -|  Receive raw bytes -> decode FIX message -> call EventHandlingStrategy  |
    -+------------------------------+------------------------------------------+
    -                               |  onMessage(session, message)
    -           +-------------------+--------------------+
    -           |                                        |
    -    SingleThreaded                          ThreadPerSession
    -    --------------                          ----------------
    -  One shared queue                       Per-session queue
    -  One "QFJ Message Processor"            One "QF/J Session dispatcher:
    -  thread calls                           <sessionID>" thread per session
    -  session.next(msg)                      calls session.next(msg)
    -
    -           Both strategies co-exist with the Timer Thread:
    -
    -+-------------------------------------------------------------------------+
    -|                    QFJ Timer Thread (daemon)                            |
    -|  ScheduledExecutorService fires every 1000ms                            |
    -|  SessionTimerTask iterates ALL sessions -> calls Session.next()         |
    -|  (handles heartbeats, logon, session schedule, timeouts)                |
    -+-------------------------------------------------------------------------+
    -
    - -

    Queue Capacity and Back-Pressure

    - -

    Both strategies support configurable queue capacity:

    - -
      -
    • Fixed capacity: new SingleThreadedEventHandlingStrategy(connector, queueCapacity) - — bounded LinkedBlockingQueue. Producers block when full (back-pressure).
    • -
    • Watermark-based: new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark) - — uses QueueTrackers.newMultiSessionWatermarkTracker(...). Flow control is applied - per-session within the shared queue.
    • -
    • Same two options exist for ThreadPerSessionEventHandlingStrategy, with - newSingleSessionWatermarkTracker per session.
    • -
    - -

    Custom Executor Injection

    - -

    Both strategies accept a custom java.util.concurrent.Executor via -setExecutor(executor), called during start() from the connector. This allows -integration with application-managed thread pools (e.g. virtual threads in Java 21+).

    - -

    If no executor is set, DedicatedThreadExecutor creates a plain -new Thread(...) per session/strategy.

    - -

    Thread Safety Implications for Application Developers

    - -
    -
    SocketAcceptor / SocketInitiator (single-threaded)
    -
    The Application callbacks (fromApp, fromAdmin, etc.) are - called from the single QFJ Message Processor thread. There are no concurrent calls to - the same session. However, Session.next() (timer) runs concurrently from the - QFJ Timer thread — it does not call application callbacks but it does send - messages on the wire.
    -
    ThreadedSocketAcceptor / ThreadedSocketInitiator (thread-per-session)
    -
    Each session has its own dispatcher thread. Callbacks for different sessions may execute - concurrently. Your Application implementation must be thread-safe if it shares - state across sessions.
    -
    - -

    In both models, the QFJ Timer thread runs concurrently with message-processing threads -and calls Session.next() (no-arg), which may send heartbeats or disconnect. -Session internally synchronizes on this to protect shared state.

    - - - - - From ac843f0f463ed3ca76fd9fea635387ee5c3a504c Mon Sep 17 00:00:00 2001 From: Christoph John Date: Sat, 21 Mar 2026 16:18:26 +0100 Subject: [PATCH 5/6] Apply suggestion from @chrjohn --- quickfixj-core/src/main/doc/usermanual/index.html | 1 - 1 file changed, 1 deletion(-) diff --git a/quickfixj-core/src/main/doc/usermanual/index.html b/quickfixj-core/src/main/doc/usermanual/index.html index 4ca130871b..81e10dee78 100644 --- a/quickfixj-core/src/main/doc/usermanual/index.html +++ b/quickfixj-core/src/main/doc/usermanual/index.html @@ -61,7 +61,6 @@

    Building and Testing QuickFIX/J

    Using QuickFIX/J

    • Creating Your Application
    • -
    • Threading Model
    • Configuration
    • Acceptor Failover Support
    • Dynamic Acceptor Session Definition
    • From a29364e2244ae3b0718df80b2c42bb0bc6e7cc41 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Mar 2026 16:03:15 +0000 Subject: [PATCH 6/6] Add threading model reference to README.md Co-authored-by: chrjohn <6644028+chrjohn@users.noreply.github.com> Agent-Logs-Url: https://github.com/quickfix-j/quickfixj/sessions/22db1795-4f49-4956-b106-99eb3fef8630 --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 92f1add464..1e35738846 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,10 @@ $ mvnw clean install -Dmaven.javadoc.skip=true -DskipTests -PskipBundlePlugin,mi https://quickfix-j.github.io/quickfixj/quickfixj-core/src/main/doc/usermanual/usage/configuration.html +## threading model + +For a detailed description of the QuickFIX/J threading model — including the single-threaded vs. thread-per-session strategies, the timer thread, heartbeat management, queue back-pressure, and thread-safety implications for application developers — see [docs/threading-model.md](docs/threading-model.md). + ## basics ### related projects