A proxy MARS server and client designed for CADS, with two operational modes:
- Pipe Mode (default): traditional synchronous client using MARS stdin/stdout pipes
- Shares Mode: asynchronous WebSocket-based client with shared filesystem access

Pipe Mode features:
- Direct MARS process execution via stdin/stdout
- Synchronous request handling
- No additional server infrastructure required
- Backward compatible with all existing deployments

Shares Mode features:
- Asynchronous job processing with server-side execution
- Real-time log streaming from MARS processes
- Connection pooling with automatic failover
- Client-side log filtering to reduce verbose output
- Graceful process management (no orphaned processes)
- CephFS health diagnostics and monitoring
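
The automatic failover listed above can be pictured as trying each configured server in turn until one accepts the job. The following is a minimal sketch under stated assumptions, not the package's implementation: `run_with_failover` and the `submit` callable are hypothetical names, and the real client also pools connections and streams logs.

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def run_with_failover(servers: Iterable[str], submit: Callable[[str], T]) -> T:
    """Try each server URL in order and return the first successful result.

    `submit` is a hypothetical callable that sends the request to one server
    and raises OSError (which covers ConnectionError and TimeoutError) on failure.
    """
    errors = []
    for url in servers:
        try:
            return submit(url)
        except OSError as exc:
            # This server is unusable; remember why and move on to the next one.
            errors.append(f"{url}: {exc}")
    raise RuntimeError("all MARS servers failed: " + "; ".join(errors))
```

Trying servers strictly in list order keeps the behaviour predictable; a production client might instead shuffle the list or track server health to spread load.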
pip install cads-mars-server

For development:
git clone <repository>
cd cads-mars-server
pip install -e .

from cads_adaptors.adaptors.mars import execute_mars
from cads_adaptors import Context
# Executes using traditional pipe client
result = execute_mars(
    request={"class": "ea", "date": "20240101", "param": "2t"},
    context=Context(),
    target_fname="output.grib",
    target_dir="/path/to/output",
)

1. Enable WebSocket client:
# Via environment variable
export MARS_USE_SHARES=true
# Or in /etc/cads-mars-server.yaml
use_shares: true

2. Start WebSocket server(s):
# Start server on worker node
ws-mars-server --host 0.0.0.0 --port 9001
# Or via systemd (recommended)
systemctl enable --now ws-mars-server.service

3. Configure server list:
# Environment variable (comma-separated)
export MARS_WS_SERVERS="ws://worker1:9001,ws://worker2:9001,ws://worker3:9001"
# Or in config file
mars_ws_servers:
- ws://worker1:9001
- ws://worker2:9001
- ws://worker3:9001

4. Execute requests:
from cads_adaptors import Context
from cads_adaptors.adaptors.mars import execute_mars

# Now uses WebSocket client with automatic failover
result = execute_mars(
    request={"class": "ea", "date": "20240101"},  # Add more parameters as needed
    context=Context(),
    target_fname="output.grib",
    target_dir="/shared/filesystem/path",  # Must be accessible by servers
)

The MARS_USE_SHARES setting (use_shares in the config file) controls which client is used:
| Setting | Client Type | Use Case |
|---|---|---|
| false (default) | Pipe | Existing deployments, no infrastructure changes needed |
| true | WebSocket | New deployments, better performance, shared filesystem |

Environment Variables:
export MARS_USE_SHARES=true
export MARS_WS_SERVERS="ws://server1:9001,ws://server2:9001"
export MARS_CLIENT_FILTER_LOGS=true # Enable log filtering
export MARS_MAX_CONCURRENT_CONNECTIONS=10 # Server connection limit

Config File (/etc/cads-mars-server.yaml):
use_shares: true
mars_ws_servers:
- ws://server1:9001
- ws://server2:9001
client_filter_logs: true
max_concurrent_connections: 10

| Option | Env Var | Config Key | Default | Description |
|---|---|---|---|---|
| Client Mode | MARS_USE_SHARES | use_shares | false | Use WebSocket client instead of pipe |
| WS Servers | MARS_WS_SERVERS | mars_ws_servers | (required) | Comma-separated or list of WebSocket URLs |
| Log Filtering | MARS_CLIENT_FILTER_LOGS | client_filter_logs | true | Filter verbose MARS output |
| Max Connections | MARS_MAX_CONCURRENT_CONNECTIONS | max_concurrent_connections | 0 (unlimited) | Server connection limit |
| Request Timeout | MARS_REQUEST_TIMEOUT | request_timeout | 30 | Timeout per server attempt (seconds) |
| Heartbeat | MARS_HEARTBEAT_INTERVAL | heartbeat_interval | 30 | Job heartbeat interval (seconds) |
| Shared Root | MARS_SHARED_ROOT | shared_root | /cache | Shared filesystem root path |
| Debug Mode | MARS_WS_DEBUG | debug_mode | false | Enable debug logging |

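
To make the options concrete, here is a minimal sketch of how the environment variables could be resolved against the documented defaults. It is an illustration only: `MarsSettings` and `settings_from_env` are hypothetical names, the accepted spellings of "true" are an assumption, and so is the rule that MARS_WS_SERVERS is only required when the WebSocket client is enabled.

```python
import os
from dataclasses import dataclass, field
from typing import Mapping


def _as_bool(value: str) -> bool:
    # Assumption: these spellings count as true; the package may accept others.
    return value.strip().lower() in {"1", "true", "yes", "on"}


def _as_list(value: str) -> list:
    # Split the comma-separated form and drop empty entries.
    return [url.strip() for url in value.split(",") if url.strip()]


@dataclass
class MarsSettings:
    # Defaults copied from the options table.
    use_shares: bool = False
    mars_ws_servers: list = field(default_factory=list)
    client_filter_logs: bool = True
    max_concurrent_connections: int = 0  # 0 means unlimited
    request_timeout: int = 30  # seconds
    heartbeat_interval: int = 30  # seconds
    shared_root: str = "/cache"
    debug_mode: bool = False


# Environment variable -> (attribute name, converter).
ENV_MAP = {
    "MARS_USE_SHARES": ("use_shares", _as_bool),
    "MARS_WS_SERVERS": ("mars_ws_servers", _as_list),
    "MARS_CLIENT_FILTER_LOGS": ("client_filter_logs", _as_bool),
    "MARS_MAX_CONCURRENT_CONNECTIONS": ("max_concurrent_connections", int),
    "MARS_REQUEST_TIMEOUT": ("request_timeout", int),
    "MARS_HEARTBEAT_INTERVAL": ("heartbeat_interval", int),
    "MARS_SHARED_ROOT": ("shared_root", str),
    "MARS_WS_DEBUG": ("debug_mode", _as_bool),
}


def settings_from_env(env: Mapping[str, str] = os.environ) -> MarsSettings:
    """Build settings from environment variables, falling back to defaults."""
    settings = MarsSettings()
    for var, (attr, convert) in ENV_MAP.items():
        if var in env:
            setattr(settings, attr, convert(env[var]))
    # Assumption: the server list only matters when the WebSocket client is on.
    if settings.use_shares and not settings.mars_ws_servers:
        raise ValueError("MARS_WS_SERVERS is required when MARS_USE_SHARES is true")
    return settings
```

Passing a plain dictionary instead of `os.environ` makes the resolution easy to exercise in tests without touching the process environment.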
WebSocket mode includes intelligent log filtering to reduce noise:
import json
from typing import Optional

from cads_adaptors.adaptors.mars import execute_mars

# Use default filtering (shows errors, warnings, progress)
result = execute_mars(request, context=context)

# Inject custom log handler
async def my_handler(line: str, ws, logger) -> Optional[str]:
    if "FATAL" in line:
        # Ask the server to kill the job, then abort on the client side
        await ws.send(json.dumps({"cmd": "kill"}))
        raise RuntimeError(f"Aborted: {line}")
    # Forward only ERROR lines; every other line returns None
    return line if "ERROR" in line else None

result = execute_mars(request, context=context, log_handler=my_handler)

See docs/LOG_FILTERING.md for custom handler examples.
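
As a rough picture of what client-side filtering does, the sketch below keeps lines that look like errors, warnings, or progress updates and drops the rest. The keyword pattern and the names `KEEP`, `filter_line`, and `filter_log` are assumptions made for illustration; the package's default filter and real MARS output may use different markers.

```python
import re
from typing import Iterable, Iterator, Optional

# Assumption: severity keywords and percentages mark the lines worth keeping.
KEEP = re.compile(r"\b(ERROR|FATAL|WARN(?:ING)?)\b|\d+\s*%")


def filter_line(line: str) -> Optional[str]:
    """Return the line without its trailing newline if it looks important, else None."""
    text = line.rstrip()
    return text if KEEP.search(text) else None


def filter_log(lines: Iterable[str]) -> Iterator[str]:
    """Yield only the lines that pass filter_line."""
    for line in lines:
        kept = filter_line(line)
        if kept is not None:
            yield kept
```

Returning `None` for dropped lines mirrors the `Optional[str]` return type of the custom handler shown above, so a filter of this shape could be adapted into an async handler with little change.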
For deployments using CephFS, the WebSocket server includes health monitoring:
# Check CephFS health
check-cephfs-health

See docs/CEPHFS_ARCHITECTURE.md for details on CephFS architecture and troubleshooting.

Start the WebSocket server on worker nodes:
# Basic usage
ws-mars-server --host 0.0.0.0 --port 9001
# With configuration
ws-mars-server --host 0.0.0.0 --port 9001 \
    --shared-root /shared/filesystem \
    --max-connections 20

Example systemd unit:
[Unit]
Description=MARS WebSocket Server
After=network.target
[Service]
Type=simple
User=mars
ExecStart=/usr/local/bin/ws-mars-server --host 0.0.0.0 --port 9001
Restart=always
RestartSec=10
Environment=MARS_SHARED_ROOT=/cache
Environment=MARS_MAX_CONCURRENT_CONNECTIONS=20
[Install]
WantedBy=multi-user.target

# Check server is running
ps aux | grep ws-mars-server
# Test connection
curl -i -N -H "Connection: Upgrade" -H "Upgrade: websocket" \
    -H "Sec-WebSocket-Version: 13" -H "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==" \
    http://server:9001

The WebSocket server uses process group management to prevent orphaned processes:
# Check for orphaned processes
ps aux | grep "[m]ars"
# Server automatically cleans up orphaned processes on startup

For CephFS-specific problems:
# Check CephFS health
check-cephfs-health
# View recent kernel errors
dmesg -T | grep -i ceph | tail -20

See docs/CEPHFS_ARCHITECTURE.md for architecture details and troubleshooting.
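
The process group management mentioned earlier in this section can be sketched with the standard library alone. This is a hedged illustration of the general POSIX technique, not the server's actual code: `start_in_own_group` and `kill_group` are hypothetical names, and the five-second grace period is an arbitrary choice.

```python
import os
import signal
import subprocess


def start_in_own_group(cmd: list) -> subprocess.Popen:
    """Start cmd as the leader of a new session and process group (POSIX only)."""
    return subprocess.Popen(cmd, start_new_session=True)


def kill_group(proc: subprocess.Popen, grace: float = 5.0) -> int:
    """Signal the whole process group, escalating to SIGKILL after `grace` seconds."""
    # With start_new_session=True the child's process group ID equals its PID.
    pgid = proc.pid
    try:
        os.killpg(pgid, signal.SIGTERM)
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        # The group leader ignored SIGTERM; force the whole group down.
        os.killpg(pgid, signal.SIGKILL)
    except ProcessLookupError:
        pass  # The group is already gone.
    return proc.wait()
```

Because the signal goes to the group rather than to a single PID, helper processes spawned by the job receive it too, which is what keeps them from being left behind as orphans.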
>>> import cads_mars_server

For the best experience, create a new conda environment (e.g. DEVELOP) with Python 3.12:
conda create -n DEVELOP -c conda-forge python=3.12
conda activate DEVELOP
Before pushing to GitHub, run the following commands:
- Update conda environment: make conda-env-update
- Install this package: pip install -e .
- Sync with the latest template (optional): make template-update
- Run quality assurance checks: make qa
- Run tests: make unit-tests
- Run the static type checker: make type-check
- Build the documentation (see Sphinx tutorial): make docs-build

Copyright 2024, European Union.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.