From a6b7b435dd993e2f730ccda63268648ed89631da Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 6 Apr 2026 13:59:42 +0000 Subject: [PATCH 1/3] Add strategy comparison table and acceptor example to threading-model.md from #1173 Agent-Logs-Url: https://github.com/quickfix-j/quickfixj/sessions/400b6836-0665-474e-84ba-00cf1944ea0f Co-authored-by: chrjohn <6644028+chrjohn@users.noreply.github.com> --- README.md | 4 + docs/threading-developer-guide.md | 116 +++++++++++++ docs/threading-model.md | 272 ++++++++++++++++++++++++++++++ 3 files changed, 392 insertions(+) create mode 100644 docs/threading-developer-guide.md create mode 100644 docs/threading-model.md diff --git a/README.md b/README.md index 92f1add46..1e3573884 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,10 @@ $ mvnw clean install -Dmaven.javadoc.skip=true -DskipTests -PskipBundlePlugin,mi https://quickfix-j.github.io/quickfixj/quickfixj-core/src/main/doc/usermanual/usage/configuration.html +## threading model + +For a detailed description of the QuickFIX/J threading model — including the single-threaded vs. thread-per-session strategies, the timer thread, heartbeat management, queue back-pressure, and thread-safety implications for application developers — see [docs/threading-model.md](docs/threading-model.md). + ## basics ### related projects diff --git a/docs/threading-developer-guide.md b/docs/threading-developer-guide.md new file mode 100644 index 000000000..92f2f9efb --- /dev/null +++ b/docs/threading-developer-guide.md @@ -0,0 +1,116 @@ +# QuickFIX/J Threading Model — Developer Guide + +QuickFIX/J provides two threading strategies for processing FIX messages. The choice of strategy +affects how your application handles concurrent sessions and what thread-safety guarantees you must +provide. + +## Threading Strategies + +### Single-Threaded Strategy + +**Classes:** `SocketAcceptor` / `SocketInitiator` + +All sessions share a single message-processing thread (named `QFJ Message Processor`). Incoming +messages from all sessions are placed in a shared queue and dispatched one at a time by this thread. + +This means your `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are always invoked from the +same thread, so you do not need to make your application code thread-safe with respect to concurrent +session callbacks. However, a slow callback will delay message processing for all other sessions. + +**Use when:** +- You have a small number of sessions. +- Simplicity and predictable, sequential message processing are more important than throughput. +- You want to avoid the complexity of thread-safe application code. + +### Thread-Per-Session Strategy + +**Classes:** `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` + +Each session gets its own dedicated message-dispatching thread. Incoming messages for a session are +queued and processed by that session's thread independently of other sessions. + +Because your `Application` callbacks can be invoked concurrently from multiple session threads, your +application code **must be thread-safe**. + +**Use when:** +- You have multiple sessions and need them to process messages independently. +- A slow or blocking callback for one session must not impact other sessions. +- You can ensure your application implementation is thread-safe. + +## Queue Capacity and Back-pressure + +Both strategies support configuring the internal message queue capacity to control back-pressure: + +```java +// Fixed-capacity queue (blocks producers when full) +Acceptor acceptor = new ThreadedSocketAcceptor( + application, storeFactory, settings, logFactory, messageFactory, + queueCapacity); + +// Watermark-based flow control +Acceptor acceptor = ThreadedSocketAcceptor.newBuilder() + .withApplication(application) + .withMessageStoreFactory(storeFactory) + .withSettings(settings) + .withLogFactory(logFactory) + .withMessageFactory(messageFactory) + .withQueueLowerWatermark(lowerWatermark) + .withQueueUpperWatermark(upperWatermark) + .build(); +``` + +The same constructors and builder options are available on `SocketAcceptor`, `SocketInitiator`, and +`ThreadedSocketInitiator`. + +## Choosing a Strategy + +| | `SocketAcceptor` / `SocketInitiator` | `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` | +|---|---|---| +| Message processing | Single shared thread | One thread per session | +| Application thread-safety required | No | Yes | +| Session isolation | No | Yes | +| Typical use case | Few sessions, simple apps | Many sessions, independent processing | + +## Example: Starting an Acceptor + +```java +import quickfix.*; +import java.io.FileInputStream; + +public class MyApp { + + public static void main(String[] args) throws Exception { + Application application = new MyApplication(); + SessionSettings settings = new SessionSettings(new FileInputStream(args[0])); + MessageStoreFactory storeFactory = new FileStoreFactory(settings); + LogFactory logFactory = new FileLogFactory(settings); + MessageFactory messageFactory = new DefaultMessageFactory(); + + // Single-threaded: all sessions share one message-processing thread + Acceptor acceptor = new SocketAcceptor( + application, storeFactory, settings, logFactory, messageFactory); + + // OR thread-per-session: each session has its own message-processing thread + // (application must be thread-safe) + // Acceptor acceptor = new ThreadedSocketAcceptor( + // application, storeFactory, settings, logFactory, messageFactory); + + acceptor.start(); + // ... run your application ... + acceptor.stop(); + } +} +``` + +## Thread Safety Guidance + +Regardless of which strategy you choose, note that `Session.sendToTarget()` is thread-safe and may +be called from any thread to send outgoing messages. + +When using `ThreadedSocketAcceptor` or `ThreadedSocketInitiator`, ensure that any shared state +accessed in your `Application` implementation (e.g., order books, maps, counters) is properly +synchronized or uses thread-safe data structures. + +--- + +*For a deep technical reference on the threading internals, see [`threading-model.md`](./threading-model.md).* \ No newline at end of file diff --git a/docs/threading-model.md b/docs/threading-model.md new file mode 100644 index 000000000..0172c02f8 --- /dev/null +++ b/docs/threading-model.md @@ -0,0 +1,272 @@ +# QuickFIX/J Threading Model + +## 1. Overview + +QuickFIX/J uses [Apache MINA](http://mina.apache.org/) for non-blocking I/O. The threading model for message processing is controlled by the `EventHandlingStrategy` interface (`quickfix.mina.EventHandlingStrategy`), with two concrete implementations: + +- **`SingleThreadedEventHandlingStrategy`** — one thread processes messages for all sessions (`SocketAcceptor`, `SocketInitiator`) +- **`ThreadPerSessionEventHandlingStrategy`** — one thread per session processes messages (`ThreadedSocketAcceptor`, `ThreadedSocketInitiator`) + +Both strategies co-exist with the **timer thread**, which is always present and always calls `Session.next()` (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use. + +--- + +## 2. Connector Classes and Their Strategy + +| Connector class | Event handling strategy | Thread name(s) | +|---|---|---| +| `SocketAcceptor` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` | +| `SocketInitiator` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` | +| `ThreadedSocketAcceptor` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: ` | +| `ThreadedSocketInitiator` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: ` | + +--- + +## 3. Single-Threaded Model (`SingleThreadedEventHandlingStrategy`) + +**Class:** `quickfix.mina.SingleThreadedEventHandlingStrategy` +**Source:** `quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java` + +- A single `BlockingQueue` holds events from **all** sessions. +- One background thread named **`QFJ Message Processor`** (a daemon thread) drains the queue and calls `session.next(message)` for each event via `SessionMessageEvent.processMessage()`. +- The thread is started via `blockInThread()`, which creates a `ThreadAdapter` wrapping the `block()` loop. +- `onMessage()` wraps incoming messages into a `SessionMessageEvent` and puts them on the shared queue. +- The `block()` loop polls the queue with a timeout (`THREAD_WAIT_FOR_MESSAGE_MS`) so it can periodically check the `isStopped` flag. +- On stop, remaining queued messages are drained and processed before the thread exits. +- The `getQueueSize(SessionID)` method returns the total queue size (single queue for all sessions — there is no per-session view). + +**Key point for application developers:** Because all sessions share a single processing thread, a slow `fromApp()` callback will delay processing for **all** other sessions. + +--- + +## 4. Thread-per-Session Model (`ThreadPerSessionEventHandlingStrategy`) + +**Class:** `quickfix.mina.ThreadPerSessionEventHandlingStrategy` +**Source:** `quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java` + +- A `ConcurrentHashMap` maps each session to its own dispatcher thread. +- On the first `onMessage()` call for a given session, a new `MessageDispatchingThread` is created and started via `startDispatcherThread()`. +- Each `MessageDispatchingThread` has its own `BlockingQueue` (or watermark-tracked queue) and loops calling `session.next(message)`. +- Thread name: **`QF/J Session dispatcher: :/`** +- The `Executor` can be customised via `setExecutor()`. The default is `DedicatedThreadExecutor`, which creates a plain `new Thread(command, name).start()`. +- On stop, `stopDispatcherThreads()` enqueues `END_OF_STREAM` to every dispatcher, sets `stopping=true`, and waits (polling every 100 ms) until all dispatchers report `isStopped`. +- After a dispatcher drains its remaining queue on shutdown, it removes itself from the `dispatchers` map. + +**Key point for application developers:** Since each session has its own thread, a slow `fromApp()` for one session does **not** block others. However, your `Application` implementation **must be thread-safe** if it shares state across sessions. + +--- + +## 5. The Timer Thread and `Session.next()` + +This is a critical part of the threading model that is **orthogonal** to the message-processing strategies above. + +**Class:** `quickfix.mina.SessionConnector` +**Source:** `quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java` + +### 5.1 The `QFJ Timer` Thread + +A single `ScheduledExecutorService` (a shared static instance using a `QFTimerThreadFactory`) runs a `SessionTimerTask` at a fixed rate of **every 1000 ms**. + +```java +// SessionConnector.java +private static class QFTimerThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "QFJ Timer"); + thread.setDaemon(true); + return thread; + } +} +``` + +The timer is started by `startSessionTimer()`: + +```java +protected void startSessionTimer() { + if (checkSessionTimerRunning()) { + return; + } + Runnable timerTask = new SessionTimerTask(); + if (shortLivedExecutor != null) { + timerTask = new DelegatingTask(timerTask, shortLivedExecutor); + } + sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L, + TimeUnit.MILLISECONDS); +} +``` + +Only one timer is ever started per connector. If `startSessionTimer()` is called again while the timer is still running (e.g. during `createDynamicSession()`), the existing timer is reused. + +### 5.2 `SessionTimerTask` Iterates All Sessions and Calls `Session.next()` + +```java +private class SessionTimerTask implements Runnable { + @Override + public void run() { + try { + for (Session session : sessions.values()) { + try { + session.next(); + } catch (IOException e) { + LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e); + } + } + } catch (Throwable e) { + log.error("Error during timer processing", e); + } + } +} +``` + +Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls `session.next()` directly on every session. This is independent of which `EventHandlingStrategy` is in use. + +### 5.3 What Does `Session.next()` (No-arg) Do? + +`Session.next()` is called from the timer, **not** from user code. Its Javadoc states: + +> Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code. + +Its responsibilities (from `Session.java`): + +1. **Checks if the session is enabled.** If disabled and still logged on, it initiates a Logout. +2. **Checks session schedule.** If outside the configured session time window, it may reset sequence numbers or disconnect. This check is throttled to once per second. +3. **Returns early if not connected** (`hasResponder()` is false). +4. **Handles logon state:** If logon has not been received, it may send a Logon (for initiators) or detect a logon timeout. +5. **Checks logout timeout** if a logout has been sent. +6. **Heartbeat management:** + - If `HeartBtInt == 0`: returns (no heartbeat management). + - If timed out waiting for a heartbeat: disconnects (unless `DisableHeartBeatCheck=Y`). + - If a TestRequest is needed: sends a TestRequest (`generateTestRequest("TEST")`). + - If a Heartbeat is needed: sends a Heartbeat (`generateHeartbeat()`). + +The full flow: + +``` +QFJ Timer thread (every 1 second) + └─► SessionTimerTask.run() + └─► for each Session in sessions.values(): + └─► Session.next() + ├─ check enabled + ├─ check session schedule / reset + ├─ check hasResponder() + ├─ check logon state (send Logon if initiator) + ├─ check logout timeout + └─ heartbeat management + ├─ isTimedOut() → disconnect + ├─ isTestRequestNeeded() → send TestRequest + └─ isHeartBeatNeeded() → send Heartbeat +``` + +### 5.4 The Overloaded `Session.next(Message)` — Called by Dispatchers + +The `Session.next(Message message)` overload is what `MessageDispatchingThread` and `SessionMessageEvent` call with an actual FIX message. This processes the received message (validates, dispatches to `fromAdmin` / `fromApp`, handles sequence numbers, etc.). This is **distinct** from the no-arg `Session.next()` used by the timer. + +--- + +## 6. Thread Interaction Summary + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ MINA I/O Threads │ +│ (NIO selector threads, named "NioProcessor-N") │ +│ Receive raw bytes → decode FIX message → call EventHandlingStrategy │ +└──────────────────────────────┬──────────────────────────────────────────┘ + │ onMessage(session, message) + ┌───────────────────┴────────────────────┐ + │ │ + SingleThreaded ThreadPerSession + ────────────── ──────────────── + One shared queue Per-session queue + One "QFJ Message Processor" One "QF/J Session dispatcher: + thread calls " thread per session + session.next(msg) calls session.next(msg) + + Both strategies co-exist with the Timer Thread: + +┌─────────────────────────────────────────────────────────────────────────┐ +│ QFJ Timer Thread (daemon) │ +│ ScheduledExecutorService fires every 1000ms │ +│ SessionTimerTask iterates ALL sessions → calls Session.next() │ +│ (handles heartbeats, logon, session schedule, timeouts) │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Choosing a Strategy + +| | `SocketAcceptor` / `SocketInitiator` | `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` | +|---|---|---| +| Message processing | Single shared thread | One thread per session | +| Application thread-safety required | No | Yes | +| Session isolation | No | Yes | +| Typical use case | Few sessions, simple apps | Many sessions, independent processing | + +--- + +## 7. Queue Capacity and Back-Pressure + +Both strategies support configurable queue capacity: + +- **Fixed capacity:** `new SingleThreadedEventHandlingStrategy(connector, queueCapacity)` — bounded `LinkedBlockingQueue`. Producers block when full (back-pressure). +- **Watermark-based:** `new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark)` — uses `QueueTrackers.newMultiSessionWatermarkTracker(...)`. Flow control is applied per-session within the shared queue. +- Same two options exist for `ThreadPerSessionEventHandlingStrategy`, with `newSingleSessionWatermarkTracker` per session. + +--- + +## 8. Custom `Executor` Injection + +Both strategies accept a custom `java.util.concurrent.Executor` via `setExecutor(executor)`, called during `start()` from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+): + +```java +// Example: use virtual threads for session dispatchers (Java 21+) +ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...); +acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor) +``` + +The `longLivedExecutor` is provided by `SessionConnector` and can be customised. If no executor is set, `DedicatedThreadExecutor` creates a plain `new Thread(...)` per session/strategy. + +--- + +## 9. Thread Safety Implications for Application Developers + +- **`SocketAcceptor` / `SocketInitiator` (single-threaded):** The `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are called from the single `QFJ Message Processor` thread. No concurrent calls to the same session. However, `Session.next()` (timer) runs concurrently from the `QFJ Timer` thread — it does not call application callbacks but it does send messages on the wire. +- **`ThreadedSocketAcceptor` / `ThreadedSocketInitiator` (thread-per-session):** Each session has its own dispatcher thread. Callbacks for **different sessions** may execute concurrently. Your `Application` implementation **must be thread-safe** if it shares state across sessions. +- In both models, the `QFJ Timer` thread runs concurrently with message-processing threads and calls `Session.next()` (no-arg), which may send heartbeats or disconnect. `Session` internally synchronizes on `this` to protect shared state. + +--- + +## Example: Starting an Acceptor + +```java +import quickfix.*; +import java.io.FileInputStream; + +public class MyApp { + + public static void main(String[] args) throws Exception { + Application application = new MyApplication(); + SessionSettings settings = new SessionSettings(new FileInputStream(args[0])); + MessageStoreFactory storeFactory = new FileStoreFactory(settings); + LogFactory logFactory = new FileLogFactory(settings); + MessageFactory messageFactory = new DefaultMessageFactory(); + + // Single-threaded: all sessions share one message-processing thread + Acceptor acceptor = new SocketAcceptor( + application, storeFactory, settings, logFactory, messageFactory); + + // OR thread-per-session: each session has its own message-processing thread + // (application must be thread-safe) + // Acceptor acceptor = new ThreadedSocketAcceptor( + // application, storeFactory, settings, logFactory, messageFactory); + + acceptor.start(); + // ... run your application ... + acceptor.stop(); + } +} +``` + +--- + +*See also: [`quickfixj-core/src/main/doc/usermanual/usage/threading.html`](../quickfixj-core/src/main/doc/usermanual/usage/threading.html) for the HTML version of this document.* From f825df566f8f767d2926552edfceb22dde8ef51c Mon Sep 17 00:00:00 2001 From: Christoph John Date: Tue, 7 Apr 2026 14:32:04 +0200 Subject: [PATCH 2/3] Remove sections on strategy and example from docs Removed the 'Choosing a Strategy' and 'Example: Starting an Acceptor' sections from the threading model documentation since they are available in threading-developer-guide.md --- docs/threading-model.md | 46 ----------------------------------------- 1 file changed, 46 deletions(-) diff --git a/docs/threading-model.md b/docs/threading-model.md index 0172c02f8..9c1672b2b 100644 --- a/docs/threading-model.md +++ b/docs/threading-model.md @@ -193,17 +193,6 @@ The `Session.next(Message message)` overload is what `MessageDispatchingThread` --- -## Choosing a Strategy - -| | `SocketAcceptor` / `SocketInitiator` | `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` | -|---|---|---| -| Message processing | Single shared thread | One thread per session | -| Application thread-safety required | No | Yes | -| Session isolation | No | Yes | -| Typical use case | Few sessions, simple apps | Many sessions, independent processing | - ---- - ## 7. Queue Capacity and Back-Pressure Both strategies support configurable queue capacity: @@ -235,38 +224,3 @@ The `longLivedExecutor` is provided by `SessionConnector` and can be customised. - In both models, the `QFJ Timer` thread runs concurrently with message-processing threads and calls `Session.next()` (no-arg), which may send heartbeats or disconnect. `Session` internally synchronizes on `this` to protect shared state. --- - -## Example: Starting an Acceptor - -```java -import quickfix.*; -import java.io.FileInputStream; - -public class MyApp { - - public static void main(String[] args) throws Exception { - Application application = new MyApplication(); - SessionSettings settings = new SessionSettings(new FileInputStream(args[0])); - MessageStoreFactory storeFactory = new FileStoreFactory(settings); - LogFactory logFactory = new FileLogFactory(settings); - MessageFactory messageFactory = new DefaultMessageFactory(); - - // Single-threaded: all sessions share one message-processing thread - Acceptor acceptor = new SocketAcceptor( - application, storeFactory, settings, logFactory, messageFactory); - - // OR thread-per-session: each session has its own message-processing thread - // (application must be thread-safe) - // Acceptor acceptor = new ThreadedSocketAcceptor( - // application, storeFactory, settings, logFactory, messageFactory); - - acceptor.start(); - // ... run your application ... - acceptor.stop(); - } -} -``` - ---- - -*See also: [`quickfixj-core/src/main/doc/usermanual/usage/threading.html`](../quickfixj-core/src/main/doc/usermanual/usage/threading.html) for the HTML version of this document.* From a4afb8dc1e33d3bb8a553d1dadb308ab76f70b6a Mon Sep 17 00:00:00 2001 From: Christoph John Date: Tue, 7 Apr 2026 14:35:28 +0200 Subject: [PATCH 3/3] Reorganize threading model information in README Moved threading model section to a new location in the README and updated the reference links. --- README.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1e3573884..c7c7cdfbb 100644 --- a/README.md +++ b/README.md @@ -69,10 +69,6 @@ $ mvnw clean install -Dmaven.javadoc.skip=true -DskipTests -PskipBundlePlugin,mi https://quickfix-j.github.io/quickfixj/quickfixj-core/src/main/doc/usermanual/usage/configuration.html -## threading model - -For a detailed description of the QuickFIX/J threading model — including the single-threaded vs. thread-per-session strategies, the timer thread, heartbeat management, queue back-pressure, and thread-safety implications for application developers — see [docs/threading-model.md](docs/threading-model.md). - ## basics ### related projects @@ -115,6 +111,9 @@ Here are explanations of what these functions provide for you. The sample code below shows how you might start up a FIX acceptor which listens on a socket. If you wanted an initiator, you would simply replace the acceptor in this code fragment with a `SocketInitiator`. `ThreadedSocketInitiator` and `ThreadedSocketAcceptor` classes are also available. These will supply a thread to each session that is created. If you use these you must make sure your application is thread safe. +For a detailed description of the QuickFIX/J threading model — including the single-threaded vs. thread-per-session strategies, the timer thread, heartbeat management, queue back-pressure, and thread-safety implications for application developers — see [docs/threading-model.md](docs/threading-model.md) and [docs/threading-developer-guide.md‎](docs/threading-developer-guide.md). + + ```Java import quickfix.*; import java.io.FileInputStream;