Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ Here are explanations of what these functions provide for you.

The sample code below shows how you might start up a FIX acceptor which listens on a socket. If you wanted an initiator, you would simply replace the acceptor in this code fragment with a `SocketInitiator`. `ThreadedSocketInitiator` and `ThreadedSocketAcceptor` classes are also available. These will supply a thread to each session that is created. If you use these you must make sure your application is thread safe.

For a detailed description of the QuickFIX/J threading model — including the single-threaded vs. thread-per-session strategies, the timer thread, heartbeat management, queue back-pressure, and thread-safety implications for application developers — see [docs/threading-model.md](docs/threading-model.md) and [docs/threading-developer-guide.md‎](docs/threading-developer-guide.md).


```Java
import quickfix.*;
import java.io.FileInputStream;
Expand Down
116 changes: 116 additions & 0 deletions docs/threading-developer-guide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# QuickFIX/J Threading Model — Developer Guide

QuickFIX/J provides two threading strategies for processing FIX messages. The choice of strategy
affects how your application handles concurrent sessions and what thread-safety guarantees you must
provide.

## Threading Strategies

### Single-Threaded Strategy

**Classes:** `SocketAcceptor` / `SocketInitiator`

All sessions share a single message-processing thread (named `QFJ Message Processor`). Incoming
messages from all sessions are placed in a shared queue and dispatched one at a time by this thread.

This means your `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are always invoked from the
same thread, so you do not need to make your application code thread-safe with respect to concurrent
session callbacks. However, a slow callback will delay message processing for all other sessions.

**Use when:**
- You have a small number of sessions.
- Simplicity and predictable, sequential message processing are more important than throughput.
- You want to avoid the complexity of thread-safe application code.

### Thread-Per-Session Strategy

**Classes:** `ThreadedSocketAcceptor` / `ThreadedSocketInitiator`

Each session gets its own dedicated message-dispatching thread. Incoming messages for a session are
queued and processed by that session's thread independently of other sessions.

Because your `Application` callbacks can be invoked concurrently from multiple session threads, your
application code **must be thread-safe**.

**Use when:**
- You have multiple sessions and need them to process messages independently.
- A slow or blocking callback for one session must not impact other sessions.
- You can ensure your application implementation is thread-safe.

## Queue Capacity and Back-pressure

Both strategies support configuring the internal message queue capacity to control back-pressure:

```java
// Fixed-capacity queue (blocks producers when full)
Acceptor acceptor = new ThreadedSocketAcceptor(
application, storeFactory, settings, logFactory, messageFactory,
queueCapacity);

// Watermark-based flow control
Acceptor acceptor = ThreadedSocketAcceptor.newBuilder()
.withApplication(application)
.withMessageStoreFactory(storeFactory)
.withSettings(settings)
.withLogFactory(logFactory)
.withMessageFactory(messageFactory)
.withQueueLowerWatermark(lowerWatermark)
.withQueueUpperWatermark(upperWatermark)
.build();
```

The same constructors and builder options are available on `SocketAcceptor`, `SocketInitiator`, and
`ThreadedSocketInitiator`.

## Choosing a Strategy

| | `SocketAcceptor` / `SocketInitiator` | `ThreadedSocketAcceptor` / `ThreadedSocketInitiator` |
|---|---|---|
| Message processing | Single shared thread | One thread per session |
| Application thread-safety required | No | Yes |
| Session isolation | No | Yes |
| Typical use case | Few sessions, simple apps | Many sessions, independent processing |

## Example: Starting an Acceptor

```java
import quickfix.*;
import java.io.FileInputStream;

public class MyApp {

public static void main(String[] args) throws Exception {
Application application = new MyApplication();
SessionSettings settings = new SessionSettings(new FileInputStream(args[0]));
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
MessageFactory messageFactory = new DefaultMessageFactory();

// Single-threaded: all sessions share one message-processing thread
Acceptor acceptor = new SocketAcceptor(
application, storeFactory, settings, logFactory, messageFactory);

// OR thread-per-session: each session has its own message-processing thread
// (application must be thread-safe)
// Acceptor acceptor = new ThreadedSocketAcceptor(
// application, storeFactory, settings, logFactory, messageFactory);

acceptor.start();
// ... run your application ...
acceptor.stop();
}
}
```

## Thread Safety Guidance

Regardless of which strategy you choose, note that `Session.sendToTarget()` is thread-safe and may
be called from any thread to send outgoing messages.

When using `ThreadedSocketAcceptor` or `ThreadedSocketInitiator`, ensure that any shared state
accessed in your `Application` implementation (e.g., order books, maps, counters) is properly
synchronized or uses thread-safe data structures.

---

*For a deep technical reference on the threading internals, see [`threading-model.md`](./threading-model.md).*
226 changes: 226 additions & 0 deletions docs/threading-model.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# QuickFIX/J Threading Model

## 1. Overview

QuickFIX/J uses [Apache MINA](http://mina.apache.org/) for non-blocking I/O. The threading model for message processing is controlled by the `EventHandlingStrategy` interface (`quickfix.mina.EventHandlingStrategy`), with two concrete implementations:

- **`SingleThreadedEventHandlingStrategy`** — one thread processes messages for all sessions (`SocketAcceptor`, `SocketInitiator`)
- **`ThreadPerSessionEventHandlingStrategy`** — one thread per session processes messages (`ThreadedSocketAcceptor`, `ThreadedSocketInitiator`)

Both strategies co-exist with the **timer thread**, which is always present and always calls `Session.next()` (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use.

---

## 2. Connector Classes and Their Strategy

| Connector class | Event handling strategy | Thread name(s) |
|---|---|---|
| `SocketAcceptor` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` |
| `SocketInitiator` | `SingleThreadedEventHandlingStrategy` | `QFJ Message Processor` |
| `ThreadedSocketAcceptor` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: <sessionID>` |
| `ThreadedSocketInitiator` | `ThreadPerSessionEventHandlingStrategy` | `QF/J Session dispatcher: <sessionID>` |

---

## 3. Single-Threaded Model (`SingleThreadedEventHandlingStrategy`)

**Class:** `quickfix.mina.SingleThreadedEventHandlingStrategy`
**Source:** `quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java`

- A single `BlockingQueue<SessionMessageEvent>` holds events from **all** sessions.
- One background thread named **`QFJ Message Processor`** (a daemon thread) drains the queue and calls `session.next(message)` for each event via `SessionMessageEvent.processMessage()`.
- The thread is started via `blockInThread()`, which creates a `ThreadAdapter` wrapping the `block()` loop.
- `onMessage()` wraps incoming messages into a `SessionMessageEvent` and puts them on the shared queue.
- The `block()` loop polls the queue with a timeout (`THREAD_WAIT_FOR_MESSAGE_MS`) so it can periodically check the `isStopped` flag.
- On stop, remaining queued messages are drained and processed before the thread exits.
- The `getQueueSize(SessionID)` method returns the total queue size (single queue for all sessions — there is no per-session view).

**Key point for application developers:** Because all sessions share a single processing thread, a slow `fromApp()` callback will delay processing for **all** other sessions.

---

## 4. Thread-per-Session Model (`ThreadPerSessionEventHandlingStrategy`)

**Class:** `quickfix.mina.ThreadPerSessionEventHandlingStrategy`
**Source:** `quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java`

- A `ConcurrentHashMap<SessionID, MessageDispatchingThread>` maps each session to its own dispatcher thread.
- On the first `onMessage()` call for a given session, a new `MessageDispatchingThread` is created and started via `startDispatcherThread()`.
- Each `MessageDispatchingThread` has its own `BlockingQueue<Message>` (or watermark-tracked queue) and loops calling `session.next(message)`.
- Thread name: **`QF/J Session dispatcher: <BeginString>:<SenderCompID>/<TargetCompID>`**
- The `Executor` can be customised via `setExecutor()`. The default is `DedicatedThreadExecutor`, which creates a plain `new Thread(command, name).start()`.
- On stop, `stopDispatcherThreads()` enqueues `END_OF_STREAM` to every dispatcher, sets `stopping=true`, and waits (polling every 100 ms) until all dispatchers report `isStopped`.
- After a dispatcher drains its remaining queue on shutdown, it removes itself from the `dispatchers` map.

**Key point for application developers:** Since each session has its own thread, a slow `fromApp()` for one session does **not** block others. However, your `Application` implementation **must be thread-safe** if it shares state across sessions.

---

## 5. The Timer Thread and `Session.next()`

This is a critical part of the threading model that is **orthogonal** to the message-processing strategies above.

**Class:** `quickfix.mina.SessionConnector`
**Source:** `quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java`

### 5.1 The `QFJ Timer` Thread

A single `ScheduledExecutorService` (a shared static instance using a `QFTimerThreadFactory`) runs a `SessionTimerTask` at a fixed rate of **every 1000 ms**.

```java
// SessionConnector.java
private static class QFTimerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "QFJ Timer");
thread.setDaemon(true);
return thread;
}
}
```

The timer is started by `startSessionTimer()`:

```java
protected void startSessionTimer() {
if (checkSessionTimerRunning()) {
return;
}
Runnable timerTask = new SessionTimerTask();
if (shortLivedExecutor != null) {
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
}
sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L,
TimeUnit.MILLISECONDS);
}
```

Only one timer is ever started per connector. If `startSessionTimer()` is called again while the timer is still running (e.g. during `createDynamicSession()`), the existing timer is reused.

### 5.2 `SessionTimerTask` Iterates All Sessions and Calls `Session.next()`

```java
private class SessionTimerTask implements Runnable {
@Override
public void run() {
try {
for (Session session : sessions.values()) {
try {
session.next();
} catch (IOException e) {
LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
}
}
} catch (Throwable e) {
log.error("Error during timer processing", e);
}
}
}
```

Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls `session.next()` directly on every session. This is independent of which `EventHandlingStrategy` is in use.

### 5.3 What Does `Session.next()` (No-arg) Do?

`Session.next()` is called from the timer, **not** from user code. Its Javadoc states:

> Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code.

Its responsibilities (from `Session.java`):

1. **Checks if the session is enabled.** If disabled and still logged on, it initiates a Logout.
2. **Checks session schedule.** If outside the configured session time window, it may reset sequence numbers or disconnect. This check is throttled to once per second.
3. **Returns early if not connected** (`hasResponder()` is false).
4. **Handles logon state:** If logon has not been received, it may send a Logon (for initiators) or detect a logon timeout.
5. **Checks logout timeout** if a logout has been sent.
6. **Heartbeat management:**
- If `HeartBtInt == 0`: returns (no heartbeat management).
- If timed out waiting for a heartbeat: disconnects (unless `DisableHeartBeatCheck=Y`).
- If a TestRequest is needed: sends a TestRequest (`generateTestRequest("TEST")`).
- If a Heartbeat is needed: sends a Heartbeat (`generateHeartbeat()`).

The full flow:

```
QFJ Timer thread (every 1 second)
└─► SessionTimerTask.run()
└─► for each Session in sessions.values():
└─► Session.next()
├─ check enabled
├─ check session schedule / reset
├─ check hasResponder()
├─ check logon state (send Logon if initiator)
├─ check logout timeout
└─ heartbeat management
├─ isTimedOut() → disconnect
├─ isTestRequestNeeded() → send TestRequest
└─ isHeartBeatNeeded() → send Heartbeat
```

### 5.4 The Overloaded `Session.next(Message)` — Called by Dispatchers

The `Session.next(Message message)` overload is what `MessageDispatchingThread` and `SessionMessageEvent` call with an actual FIX message. This processes the received message (validates, dispatches to `fromAdmin` / `fromApp`, handles sequence numbers, etc.). This is **distinct** from the no-arg `Session.next()` used by the timer.

---

## 6. Thread Interaction Summary

```
┌─────────────────────────────────────────────────────────────────────────┐
│ MINA I/O Threads │
│ (NIO selector threads, named "NioProcessor-N") │
│ Receive raw bytes → decode FIX message → call EventHandlingStrategy │
└──────────────────────────────┬──────────────────────────────────────────┘
│ onMessage(session, message)
┌───────────────────┴────────────────────┐
│ │
SingleThreaded ThreadPerSession
────────────── ────────────────
One shared queue Per-session queue
One "QFJ Message Processor" One "QF/J Session dispatcher:
thread calls <sessionID>" thread per session
session.next(msg) calls session.next(msg)

Both strategies co-exist with the Timer Thread:

┌─────────────────────────────────────────────────────────────────────────┐
│ QFJ Timer Thread (daemon) │
│ ScheduledExecutorService fires every 1000ms │
│ SessionTimerTask iterates ALL sessions → calls Session.next() │
│ (handles heartbeats, logon, session schedule, timeouts) │
└─────────────────────────────────────────────────────────────────────────┘
```

---

## 7. Queue Capacity and Back-Pressure

Both strategies support configurable queue capacity:

- **Fixed capacity:** `new SingleThreadedEventHandlingStrategy(connector, queueCapacity)` — bounded `LinkedBlockingQueue`. Producers block when full (back-pressure).
- **Watermark-based:** `new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark)` — uses `QueueTrackers.newMultiSessionWatermarkTracker(...)`. Flow control is applied per-session within the shared queue.
- Same two options exist for `ThreadPerSessionEventHandlingStrategy`, with `newSingleSessionWatermarkTracker` per session.

---

## 8. Custom `Executor` Injection

Both strategies accept a custom `java.util.concurrent.Executor` via `setExecutor(executor)`, called during `start()` from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+):

```java
// Example: use virtual threads for session dispatchers (Java 21+)
ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...);
acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor)
```

The `longLivedExecutor` is provided by `SessionConnector` and can be customised. If no executor is set, `DedicatedThreadExecutor` creates a plain `new Thread(...)` per session/strategy.

---

## 9. Thread Safety Implications for Application Developers

- **`SocketAcceptor` / `SocketInitiator` (single-threaded):** The `Application` callbacks (`fromApp`, `fromAdmin`, etc.) are called from the single `QFJ Message Processor` thread. No concurrent calls to the same session. However, `Session.next()` (timer) runs concurrently from the `QFJ Timer` thread — it does not call application callbacks but it does send messages on the wire.
- **`ThreadedSocketAcceptor` / `ThreadedSocketInitiator` (thread-per-session):** Each session has its own dispatcher thread. Callbacks for **different sessions** may execute concurrently. Your `Application` implementation **must be thread-safe** if it shares state across sessions.
- In both models, the `QFJ Timer` thread runs concurrently with message-processing threads and calls `Session.next()` (no-arg), which may send heartbeats or disconnect. `Session` internally synchronizes on `this` to protect shared state.

---
Loading