Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
90382c9
Initial commit of tunable worker pools
dwoz Dec 13, 2025
56f678e
Use worker pools config in our tests
dwoz Dec 14, 2025
91be45b
Route requests
dwoz Dec 31, 2025
ebccc4c
Fix worker pool routing: implement forward_message and remove port co…
dwoz Jan 2, 2026
80e46c2
Fix dispatcher architecture: connect dispatcher to frontend as worker
dwoz Jan 2, 2026
b220dbd
Fix test to pass frontend_channels to PoolDispatcherChannel
dwoz Jan 2, 2026
af3c59c
Disable worker_pools_enabled by default - architecture needs rework
dwoz Jan 3, 2026
ada2830
Implement working ZeroMQ-based worker pool routing
dwoz Jan 3, 2026
194e02c
Fix critical bug in worker pool response routing
dwoz Jan 4, 2026
362736d
Fix DEALER correlation with FIFO request tracking
dwoz Jan 4, 2026
553e333
Fix request-response correlation with envelope forwarding
dwoz Jan 6, 2026
5800009
Create workers.ipc marker file for master status check
dwoz Jan 6, 2026
291a2b4
Fix MWorkerQueue process name for pooled routing
dwoz Jan 11, 2026
c74f615
Fix linter
dwoz Jan 18, 2026
6655c96
Fix tests
dwoz Jan 21, 2026
3254fa2
Fix non-deterministic port selection and socket conflicts in worker p…
dwoz Apr 2, 2026
edd8ce5
Fix KeyError: 'transport' in iter_transport_opts and pool_routing test
dwoz Apr 3, 2026
133cbeb
Optimize worker startup and fix RequestRouter for encrypted payloads
dwoz Apr 3, 2026
6efe3c2
Fix TypeError in pre_fork and unify method signature
dwoz Apr 3, 2026
3b402f4
Fix KeyError: 'aes' in publish daemons
dwoz Apr 3, 2026
2786e76
Fix unit tests and prevent InvalidStateError in ZeroMQ transport
dwoz Apr 4, 2026
efd59c6
Fix pre-commit failures and ZeroMQ connect race condition
dwoz Apr 4, 2026
20424e7
Fix ZeroMQ EFSM errors and improve channel cleanup
dwoz Apr 4, 2026
fa2d9f0
Fix linting and handle encrypted payloads in routing channel
dwoz Apr 4, 2026
fd08afc
Harden ZeroMQ transport and fix InvalidStateError
dwoz Apr 4, 2026
890cac3
Fix AttributeError in RequestClient and resolve WebSocket transport i…
dwoz Apr 5, 2026
5f2ab12
Fix unit test failures and improve worker channel initialization
dwoz Apr 5, 2026
f9c675e
Eliminate ZeroMQ task race condition with task_id
dwoz Apr 5, 2026
c39d704
Final hardening of ZeroMQ transport concurrency
dwoz Apr 5, 2026
da56b4a
Fix AsyncReqMessageClient and prevent message loss
dwoz Apr 5, 2026
c04c27b
Final hardening and stabilization of ZeroMQ transport
dwoz Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions CI_FAILURE_TRACKER.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# CI Failure Tracker

This file tracks persistent test failures in the `tunnable-mworkers` branch to avoid "wack-a-mole" regressions.

## Latest Run: 23974535354 (fa2d9f03d0)

### **Functional Tests (ZeroMQ 4)**
These tests are failing across almost all platforms (Linux, macOS, Windows).
- **Core Error**: `Socket was found in invalid state` (`EFSM`) and `Unknown error 321`.
- **Status**: **FIXED** (to be verified in CI).
- **Fixes Applied**:
1. **Concurrency**: Added `asyncio.Lock` to `connect()` to prevent redundant `_send_recv` tasks.
2. **InvalidStateError**: Added `if not future.done()` checks before EVERY `set_result`/`set_exception` call in `_send_recv`.
3. **Cleanup**: Added `close()` method to `PoolRoutingChannelV2Revised`.
4. **Robust Reconnect**: Ensured ANY ZeroMQ error in the loop triggers a close and reconnect to reset the `REQ` state machine.
5. **Reconnect Storm Prevention**: Skip futures that are already done when pulling from the queue.

**Failing Test Cases (Representative):**
- `tests.pytests.functional.transport.server.test_ssl_transport.test_ssl_publish_server[SSLTransport(tcp)]` (Timeout)
- `tests.pytests.functional.transport.server.test_ssl_transport.test_ssl_publish_server[SSLTransport(ws)]` (Timeout)
- `tests.pytests.functional.transport.server.test_ssl_transport.test_ssl_file_transfer[SSLTransport(tcp)]` (Timeout)
- `tests.pytests.functional.transport.server.test_ssl_transport.test_ssl_multi_minion[SSLTransport(tcp)]` (Timeout)
- `tests.pytests.functional.transport.server.test_ssl_transport.test_request_server[Transport(ws)]` (Timeout)

### **Scenario Tests (ZeroMQ)**
- **Platform**: Fedora 40, Windows 2022
- **Error**: `asyncio.exceptions.InvalidStateError: invalid state`
- **Location**: `salt/transport/zeromq.py:1703` during `socket.poll`.

### **Integration Tests**
- `tests.pytests.functional.channel.test_pool_routing.test_pool_routing_fast_commands` (KeyError: 'transport' - *Wait, I fixed this, check if it's still failing*)
- `Test Salt / Photon OS 5 integration tcp 4` (Conclusion: failure)

### **Package Tests**
- `Test Package / Windows 2025 NSIS downgrade 3007.13` (Timeout after 600s)

---

## Resolved Issues (To be verified)
- [x] **Pre-Commit**: Passing locally and in latest run.
- [x] **Unit Tests**: `tests/pytests/unit/transport/test_zeromq_worker_pools.py` now passing.
- [x] **KeyError: 'aes'**: Resolved in latest runs.
- [x] **TypeError in pre_fork**: Resolved.
218 changes: 218 additions & 0 deletions salt/channel/pool_routing_v2_revised.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""
Worker pool routing at the channel layer - V2 Revised with RequestServer IPC.

This module provides transport-agnostic worker pool routing using Salt's
existing RequestClient/RequestServer infrastructure over IPC sockets.

V2 Revised Design:
- Each worker pool has its own RequestServer listening on IPC
- Routing channel uses RequestClient to forward messages to pool RequestServers
- No transport modifications needed
- Uses transport-native IPC (ZeroMQ/TCP/WS over IPC sockets)
"""

import logging
import os
import zlib

log = logging.getLogger(__name__)


class PoolRoutingChannelV2Revised:
"""
Channel wrapper that routes requests to worker pools using RequestServer IPC.

Architecture:
External Transport → PoolRoutingChannel → RequestClient →
Pool RequestServer (IPC) → Workers
"""

def __init__(self, opts, transport, worker_pools):
"""
Initialize the pool routing channel.

Args:
opts: Master configuration options
transport: The external transport instance (port 4506)
worker_pools: Dict of pool configurations {pool_name: config}
"""
self.opts = opts
self.transport = transport
self.worker_pools = worker_pools
self.pool_clients = {} # RequestClient for each pool
self.pool_servers = {} # RequestServer for each pool

log.info(
"PoolRoutingChannelV2Revised initialized with pools: %s",
list(worker_pools.keys()),
)

def pre_fork(self, process_manager):
"""
Pre-fork setup - create RequestServer for each pool on IPC.

Args:
process_manager: The process manager instance
"""
import salt.transport

# Delegate external transport setup
if hasattr(self.transport, "pre_fork"):
self.transport.pre_fork(process_manager)

# Create a RequestServer for each pool on IPC
for pool_name, config in self.worker_pools.items():
# Create pool-specific opts for IPC
pool_opts = self.opts.copy()

# Configure IPC mode and socket path
if pool_opts.get("ipc_mode") == "tcp":
# TCP IPC mode: use unique port per pool
base_port = pool_opts.get("tcp_master_workers", 4515)
port_offset = zlib.adler32(pool_name.encode()) % 1000
pool_opts["ret_port"] = base_port + port_offset
log.info(
"Pool '%s' RequestServer using TCP IPC on port %d",
pool_name,
pool_opts["ret_port"],
)
else:
# Standard IPC mode: use unique socket per pool
sock_dir = pool_opts.get("sock_dir", "/tmp/salt")
os.makedirs(sock_dir, exist_ok=True)

# Each pool gets its own IPC socket
pool_opts["workers_ipc_name"] = f"workers-{pool_name}.ipc"

# Create RequestServer for this pool using transport factory
pool_server = salt.transport.request_server(pool_opts)

# Pre-fork the pool server (this creates IPC listener)
pool_server.pre_fork(process_manager)

self.pool_servers[pool_name] = pool_server

log.info("PoolRoutingChannelV2Revised pre_fork complete")

def post_fork(self, payload_handler, io_loop):
"""
Post-fork setup in routing process.

Creates RequestClient connections to each pool's RequestServer.

Args:
payload_handler: Handler for processed payloads (not used)
io_loop: The event loop to use
"""
import salt.transport

self.io_loop = io_loop

# Build routing table from worker_pools config
self.command_to_pool = {}
self.default_pool = None

for pool_name, config in self.worker_pools.items():
for cmd in config.get("commands", []):
if cmd == "*":
self.default_pool = pool_name
else:
self.command_to_pool[cmd] = pool_name

# Create RequestClient for each pool
for pool_name in self.worker_pools.keys():
# Create pool-specific opts matching the pool's RequestServer
pool_opts = self.opts.copy()

if pool_opts.get("ipc_mode") == "tcp":
# TCP IPC: connect to pool's port
base_port = pool_opts.get("tcp_master_workers", 4515)
port_offset = zlib.adler32(pool_name.encode()) % 1000
pool_opts["ret_port"] = base_port + port_offset
else:
# IPC socket: connect to pool's socket
pool_opts["workers_ipc_name"] = f"workers-{pool_name}.ipc"

sock_dir = pool_opts.get("sock_dir", "/tmp/salt")

# Create RequestClient that connects to pool's IPC RequestServer
client = salt.transport.request_client(pool_opts, io_loop=io_loop)
self.pool_clients[pool_name] = client

# Connect external transport to our routing handler
if hasattr(self.transport, "post_fork"):
self.transport.post_fork(self.handle_and_route_message, io_loop)

log.info("PoolRoutingChannelV2Revised post_fork complete")

def close(self):
"""
Close the channel and all its pool clients/servers.
"""
log.info("Closing PoolRoutingChannelV2Revised")

# Close all pool clients
for pool_name, client in self.pool_clients.items():
try:
client.close()
except Exception as exc: # pylint: disable=broad-except
log.error("Error closing client for pool '%s': %s", pool_name, exc)
self.pool_clients.clear()

# Close all pool servers
for pool_name, server in self.pool_servers.items():
try:
server.close()
except Exception as exc: # pylint: disable=broad-except
log.error("Error closing server for pool '%s': %s", pool_name, exc)
self.pool_servers.clear()

# Close external transport
if hasattr(self.transport, "close"):
self.transport.close()

async def handle_and_route_message(self, payload):
"""
Handle incoming message and route to appropriate worker pool via RequestClient.

Args:
payload: The message payload from external transport

Returns:
Reply from the worker that processed the request
"""
try:
# Determine which pool
load = payload.get("load", {})
if isinstance(load, dict):
cmd = load.get("cmd", "unknown")
else:
# Encrypted payload (bytes), can't extract command
cmd = "unknown"

pool_name = self.command_to_pool.get(cmd, self.default_pool)

if not pool_name:
pool_name = self.default_pool or list(self.worker_pools.keys())[0]

log.debug(
"Routing request (cmd=%s) to pool '%s'",
cmd,
pool_name,
)

# Forward to pool via RequestClient
client = self.pool_clients[pool_name]

# RequestClient.send() sends payload to pool's RequestServer via IPC
reply = await client.send(payload)

return reply

except Exception as exc: # pylint: disable=broad-except
log.error(
"Error routing request to worker pool: %s",
exc,
exc_info=True,
)
return {"error": "Internal routing error"}
Loading
Loading