diff --git a/opentakserver/blueprints/ots_api/group_api.py b/opentakserver/blueprints/ots_api/group_api.py index e3d896c7..35c86165 100644 --- a/opentakserver/blueprints/ots_api/group_api.py +++ b/opentakserver/blueprints/ots_api/group_api.py @@ -18,6 +18,21 @@ group_api = Blueprint("group_api", __name__) +def _request_mumble_channel_sync(): + """Best-effort: ask the Mumble Ice daemon to sync channels to OTS groups. + + Why: channel name == group name is required for direction enforcement, so + a new OTS group needs a matching Mumble channel. Silent no-op if the Ice + daemon isn't running. + """ + try: + ice_app = app.extensions.get("mumble_ice_app") + if ice_app is not None: + ice_app.request_sync() + except Exception as e: + logger.warning(f"Failed to request Mumble channel sync: {e}") + + @group_api.route("/api/groups") @roles_required("administrator") def get_groups(): @@ -318,6 +333,7 @@ def add_group(): 500, ) + _request_mumble_channel_sync() return jsonify({"success": True}) @@ -469,4 +485,5 @@ def delete_group(): 500, ) + _request_mumble_channel_sync() return jsonify({"success": True}) diff --git a/opentakserver/defaultconfig.py b/opentakserver/defaultconfig.py index 32ce462a..413c4c06 100644 --- a/opentakserver/defaultconfig.py +++ b/opentakserver/defaultconfig.py @@ -137,6 +137,7 @@ class DefaultConfig: OTS_PROFILE_MAP_SOURCES = True OTS_ENABLE_MUMBLE_AUTHENTICATION = False + OTS_ICE_SECRET = "" OTS_IP_WHITELIST = ["127.0.0.1"] diff --git a/opentakserver/mumble/mumble_authenticator.py b/opentakserver/mumble/mumble_authenticator.py index eff7c741..6d180c6c 100644 --- a/opentakserver/mumble/mumble_authenticator.py +++ b/opentakserver/mumble/mumble_authenticator.py @@ -1,6 +1,8 @@ +import hashlib import os import Ice +from cryptography import x509 from flask import Flask from flask_ldap3_login import AuthenticationResponseStatus from flask_security import verify_password @@ -18,39 +20,138 @@ import Murmur -class MumbleAuthenticator(Murmur.ServerUpdatingAuthenticator): - texture_cache = {} +# Each OTS user gets a 1000-id range in Mumble. PC username auth uses the base +# id (user.id * 1000); ATAK callsign auth uses base + a deterministic offset +# derived from the callsign (so a single OTS account can connect from multiple +# devices simultaneously, as the official ATAK VX voice plugin does). +MUMBLE_ID_RANGE = 1000 +MUMBLE_ID_CALLSIGN_OFFSET_RANGE = MUMBLE_ID_RANGE - 1 + +class MumbleAuthenticator(Murmur.ServerUpdatingAuthenticator): def __init__(self, app, logger, ice): Murmur.ServerUpdatingAuthenticator.__init__(self) self.app: Flask = app self.logger = logger self.ice = ice - def authenticate(self, username, password, certlist, certhash, strong, current=None): + # ----- public lookup helpers (also used by mumble_ice_app) --------------- + + @staticmethod + def resolve_identity(app, username, certlist=None): + """Look up an OTS user from a Mumble username, ATAK callsign, or cert. + + Lookup chain (first match wins): + 1. OTS username (PC clients) + 2. EUD callsign exact match + 3. EUD callsign with `---` suffix stripped (ATAK adds this) + 4. Above with `_` -> ` ` (ATAK Mumble plugin replaces spaces in callsigns) + 5. Cert CN -> EUD.uid (immutable; survives callsign renames) + + Does NOT verify password. Returns (user_or_None, is_callsign_auth_bool). + """ + from opentakserver.models.EUD import EUD + + user = app.security.datastore.find_user(username=username) + if user: + return user, False + + eud = EUD.query.filter_by(callsign=username).first() + + base_callsign = username + if not eud and "---" in username: + base_callsign = username.split("---")[0] + eud = EUD.query.filter_by(callsign=base_callsign).first() + + if not eud: + spaced = base_callsign.replace("_", " ") + if spaced != base_callsign: + eud = EUD.query.filter_by(callsign=spaced).first() + + if not eud and certlist: + eud = MumbleAuthenticator._eud_from_cert(certlist) + + if eud and eud.user_id: + user = app.security.datastore.find_user(id=eud.user_id) + if user: + return user, True + return None, False + + @staticmethod + def _eud_from_cert(certlist): + """Look up an EUD by parsing the client cert chain's CN. + + ATAK device certs use the EUD UID (e.g. `ANDROID-xxxx`) as the cert CN, + so this lookup survives mid-session callsign renames -- the OTS EUDs + table only updates on the next CoT, but the Mumble plugin auths + immediately with the new name. """ - This function is called to authenticate a user + from opentakserver.models.EUD import EUD + + for cert_bytes in certlist: + try: + cert = x509.load_der_x509_certificate(cert_bytes) + cns = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) + if not cns: + continue + eud = EUD.query.filter_by(uid=cns[0].value).first() + if eud: + return eud + except Exception: + continue + return None + + @staticmethod + def mumble_identity(user, is_callsign_auth, presented_username): + """Return (mumble_user_id, display_name) for an authenticated user. + + PC username auth -> (user.id * 1000, user.username) + ATAK callsign auth -> (user.id * 1000 + hash(callsign) % 999 + 1, callsign) + + The hash offset lets one OTS account connect from multiple ATAK devices + simultaneously, each with a unique Mumble user id (the VX plugin even + opens two sockets per device, each with a different `---` suffix). + """ + if is_callsign_auth: + digest = int(hashlib.md5(presented_username.encode()).hexdigest(), 16) + offset = digest % MUMBLE_ID_CALLSIGN_OFFSET_RANGE + 1 + return user.id * MUMBLE_ID_RANGE + offset, presented_username + return user.id * MUMBLE_ID_RANGE, user.username + + # ----- Murmur Ice callbacks --------------------------------------------- + + def authenticate(self, username, password, certlist, certhash, strong, current=None): + """Authenticate a Mumble client. + + Returns (mumble_id, display_name, group_list) on success or + (-1, None, None) on failure. Returning -2 tells Murmur to use its + own auth fallback (used only for SuperUser). """ if username == "SuperUser": - return (-2, None, None) + return -2, None, None self.logger.info("Mumble auth request for {}".format(username)) with self.app.app_context(): - user = self.app.security.datastore.find_user(username=username) + user, is_callsign_auth = self.resolve_identity(self.app, username, certlist) + if not user: - self.logger.warning("Mumble auth: User {} not found".format(username)) + self.logger.warning("Mumble auth: user {} not found".format(username)) return -1, None, None - elif not user.active: - self.logger.warning("Mumble auth: User {} is deactivated".format(username)) + if not user.active: + self.logger.warning("Mumble auth: user {} is deactivated".format(username)) return -1, None, None + mumble_groups = [g.name for g in user.groups] + if any(r.name == "administrator" for r in user.roles): + mumble_groups.append("admin") + + authenticated = False + if self.app.config.get("OTS_ENABLE_LDAP"): auth_result = ldap_manager.authenticate(username, password) if auth_result.status == AuthenticationResponseStatus.success: - self.logger.info("Mumble auth: {} has been authenticated".format(username)) - - # Keep this import here to avoid a circular import when OTS is started + # Keep this import inline to avoid a circular import at startup. from opentakserver.blueprints.ots_api.ldap_api import save_user save_user( @@ -59,73 +160,103 @@ def authenticate(self, username, password, certlist, certhash, strong, current=N auth_result.user_info, auth_result.user_groups, ) - - return user.id, user.username, None - + authenticated = True + elif is_callsign_auth: + # ATAK clients authenticate by client cert, not password. + # The cert was already validated by Murmur's TLS layer before + # this callback was invoked; we only need to verify that the + # presented identity (callsign or cert CN) maps to an EUD. + authenticated = True elif verify_password(password, user.password): - self.logger.info("Mumble auth: {} has been authenticated".format(username)) - return user.id, user.username, None + authenticated = True - self.logger.warning("Mumble auth: Bad password for {}".format(username)) - return -1, None, None + if not authenticated: + self.logger.warning("Mumble auth: bad password for {}".format(username)) + return -1, None, None - def idToTexture(self, id, current=None): - return + mumble_id, display_name = self.mumble_identity(user, is_callsign_auth, username) + self.logger.info( + "Mumble auth: id={} display={} groups={}".format( + mumble_id, display_name, mumble_groups + ) + ) + return mumble_id, display_name, mumble_groups def getInfo(self, id, current=None): + """Return user info to Murmur so it stays authoritative for Ice-authed users. + + Murmur 1.3 will otherwise look up cert/password against its local + ``user_info`` table, which is empty for Ice-authed users. That causes + a "Wrong certificate or password for existing user" rejection on every + reconnect -- before Ice authenticate() is even called. """ - Gets called to fetch user specific information - """ + if id is None or id <= 0: + return False, None + try: + with self.app.app_context(): + from opentakserver.extensions import db + from opentakserver.models.user import User - # We do not expose any additional information so always fall through - return False, None + user = db.session.get(User, id // MUMBLE_ID_RANGE) + if not user: + return False, None + info = {Murmur.UserInfo.UserName: user.username} + if user.email: + info[Murmur.UserInfo.UserEmail] = user.email + return True, info + except Exception as e: + self.logger.error("Mumble getInfo({}) failed: {}".format(id, e)) + return False, None def nameToId(self, name, current=None): + """Tell Murmur the Mumble id that owns a given name. + + Returning -2 (fall through) makes Murmur consult its local users table, + which causes the rename/reconnect rejection bug; returning the encoded + id keeps Ice authoritative. """ - Gets called to get the id for a given username - """ - pass + if not name or name == "SuperUser": + return -2 + try: + with self.app.app_context(): + user, is_callsign_auth = self.resolve_identity(self.app, name) + if not user: + return -2 + mumble_id, _ = self.mumble_identity(user, is_callsign_auth, name) + return mumble_id + except Exception as e: + self.logger.error("Mumble nameToId({}) failed: {}".format(name, e)) + return -2 def idToName(self, id, current=None): - """ - Gets called to get the username for a given id - """ - pass + """Return display name for a Mumble id. Used for ACL/log lookups.""" + if id is None or id <= 0: + return "" + try: + with self.app.app_context(): + from opentakserver.extensions import db + from opentakserver.models.user import User + + user = db.session.get(User, id // MUMBLE_ID_RANGE) + return user.username if user else "" + except Exception as e: + self.logger.error("Mumble idToName({}) failed: {}".format(id, e)) + return "" def idToTexture(self, id, current=None): - """ - Gets called to get the corresponding texture for a user - """ - # seems like it pulled a user's avatar from a phpbb DB + return b"" - def registerUser(self, name, current=None): - """ - Gets called when the server is asked to register a user. - """ - pass + def registerUser(self, info, current=None): + return -2 def unregisterUser(self, id, current=None): - """ - Gets called when the server is asked to unregister a user. - """ - pass + return -1 def getRegisteredUsers(self, filter, current=None): - """ - Returns a list of usernames in the phpBB3 database which contain - filter as a substring. - """ - pass + return {} def setInfo(self, id, info, current=None): - """ - Gets called when the server is supposed to save additional information - about a user to his database - """ - pass + return 0 def setTexture(self, id, texture, current=None): - """ - Gets called when the server is asked to update the user texture of a user - """ - pass + return -1 diff --git a/opentakserver/mumble/mumble_ice_app.py b/opentakserver/mumble/mumble_ice_app.py index 3bc48b8a..c8eee137 100644 --- a/opentakserver/mumble/mumble_ice_app.py +++ b/opentakserver/mumble/mumble_ice_app.py @@ -1,5 +1,6 @@ import os import threading +import time from threading import Timer import Ice @@ -35,12 +36,20 @@ def run(self): idata = Ice.InitializationData() idata.properties = props - # Create Ice connection + # Create Ice connection. The secret must be in ImplicitContext + # before any proxy call (e.g. checkedCast below) or Murmur rejects + # with InvalidSecretException. ice = Ice.initialize(idata) - proxy = ice.stringToProxy("Meta:tcp -h 127.0.0.1 -p 6502") - secret = "" - if secret != "": + secret = self.app.config.get("OTS_ICE_SECRET", "") + if secret: ice.getImplicitContext().put("secret", secret) + else: + self.logger.warning( + "OTS_ICE_SECRET is empty; Murmur will reject Ice calls if its " + "icesecretread/icesecretwrite is set." + ) + + proxy = ice.stringToProxy("Meta:tcp -h 127.0.0.1 -p 6502") try: meta = Murmur.MetaPrx.checkedCast(proxy) except Ice.ConnectionRefusedException: @@ -63,6 +72,12 @@ def __init__(self, app, logger, ice): self.failed_watch = False self.watchdog = None self.auth = None + self.adapter = None + # server_id -> ServerCallbackPrx; guards against duplicate registration + self.server_callbacks = {} + # Expose this daemon to Flask blueprints so group_api can request channel + # syncs after add/delete. app.extensions is a plain dict; reads are thread-safe. + self.app.extensions["mumble_ice_app"] = self def run(self, *args): if not self.initialize_ice_connection(): @@ -81,18 +96,17 @@ def run(self, *args): def initialize_ice_connection(self): """ Establishes the two-way Ice connection and adds the authenticator to the - configured servers + configured servers. The Ice secret was already pushed into the shared + ImplicitContext by MumbleIceDaemon.run(). """ - # if False and 'ice_secret': - # self.ice.getImplicitContext().put("secret", "some_secret") - self.logger.debug("Connecting to Ice server ({}:{})".format("127.0.0.1", 6502)) base = self.ice.stringToProxy("Meta:tcp -h {} -p {}".format("127.0.0.1", 6502)) self.meta = Murmur.MetaPrx.uncheckedCast(base) adapter = self.ice.createObjectAdapterWithEndpoints("Callback.Client", "tcp -h 127.0.0.1") adapter.activate() + self.adapter = adapter metacbprx = adapter.addWithUUID(MetaCallback(self)) self.metacb = Murmur.MetaCallbackPrx.uncheckedCast(metacbprx) @@ -117,6 +131,7 @@ def attach_callbacks(self): "Setting mumble authenticator for virtual server {}".format(server.id()) ) server.setAuthenticator(self.auth) + self.attach_server_callback(server) except ( Murmur.InvalidSecretException, @@ -141,6 +156,98 @@ def attach_callbacks(self): self.connected = True return True + def attach_server_callback(self, server): + """Register DirectionEnforcementCallback for IN/OUT suppress enforcement. + + Guarded against duplicate registration — check_connection() calls + attach_callbacks() every 10 seconds. The guard is cleared by + on_server_stopped() so a restarted virtual server gets a fresh + callback correctly. + """ + server_id = server.id() + if server_id in self.server_callbacks: + return + + cb = DirectionEnforcementCallback(self.app, self.logger, server) + cbprx = self.adapter.addWithUUID(cb) + server_cb = Murmur.ServerCallbackPrx.uncheckedCast(cbprx) + + try: + server.addCallback(server_cb) + self.server_callbacks[server_id] = server_cb + self.logger.info(f"Direction enforcement callback attached to server {server_id}") + except Exception as e: + self.logger.error(f"Failed to attach server callback to {server_id}: {e}") + + self.sync_channels_from_groups(server) + + def sync_channels_from_groups(self, server): + """Create a root-level Mumble channel for each OTS group lacking one. + + Channel name == group name so DirectionEnforcementCallback's lookup + (which keys by channel name) keeps working. Skips __ANON__. Never + deletes channels — too risky if users are mid-conversation; logs instead. + """ + try: + with self.app.app_context(): + from opentakserver.extensions import db + from opentakserver.models.Group import Group + rows = db.session.query(Group).all() + group_names = {g.name for g in rows if g.name and g.name != "__ANON__"} + + if not group_names: + return + + existing = server.getChannels() + root_names = {ch.name for ch in existing.values() if ch.parent == 0} + + missing = group_names - root_names + stale = root_names - group_names - {"Root"} + + for name in sorted(missing): + try: + cid = server.addChannel(name, 0) + self.logger.info( + f"Mumble channel created for OTS group '{name}' " + f"(server={server.id()}, channel_id={cid})" + ) + except Exception as e: + self.logger.error(f"Failed to create channel '{name}': {e}") + + for name in sorted(stale): + self.logger.warning( + f"Mumble channel '{name}' has no matching OTS group " + f"(server={server.id()}); leaving in place" + ) + except Exception as e: + self.logger.error( + f"sync_channels_from_groups failed: {e}", exc_info=True + ) + + def request_sync(self): + """Trigger a channel sync on all booted servers off-thread. + + Called by group_api after add/delete so newly-created groups get a + Mumble channel without waiting for the next service restart. + """ + threading.Thread(target=self._sync_all_servers, daemon=True).start() + + def _sync_all_servers(self): + try: + for server in self.meta.getBootedServers(): + self.sync_channels_from_groups(server) + except Exception as e: + self.logger.error(f"_sync_all_servers: {e}", exc_info=True) + + def on_server_stopped(self, server_id): + """Clear the callback guard and any cached session state for a stopped server. + + Without this, the duplicate-registration guard would prevent re-registration + when the virtual server restarts. + """ + self.server_callbacks.pop(server_id, None) + self.logger.info(f"Cleared callback guard for stopped server {server_id}") + def check_connection(self): """ Tries reapplies all callbacks to make sure the authenticator @@ -159,6 +266,193 @@ def check_connection(self): self.watchdog.start() +class DirectionEnforcementCallback(Murmur.ServerCallback): + """Enforces OTS IN/OUT speak direction by setting Murmur's suppress flag. + + Channel access (who can enter which channel) is controlled by Murmur's + own ACL configuration. This callback's only job is to mute users whose + OTS group membership has direction=OUT (listen-only) and unmute those + with direction=IN. + + All Ice proxy calls (getState/setState) are dispatched to a background + daemon thread to avoid deadlocking the Ice thread pool. + """ + + def __init__(self, app, logger, server): + Murmur.ServerCallback.__init__(self) + self.app = app + self.logger = logger + self.server = server + self.server_id = server.id() + self._channel_cache = None + self._channel_cache_time = 0 + self._session_lock = threading.Lock() + self._session_cache = {} # session_id -> {directions, is_admin, cached_at} + + # ------------------------------------------------------------------ helpers + + def _get_channel_map(self): + """Return {channel_id: channel_name}, cached for 60 seconds.""" + if self._channel_cache is None or (time.time() - self._channel_cache_time) > 60: + try: + channels = self.server.getChannels() + self._channel_cache = {cid: ch.name for cid, ch in channels.items()} + self._channel_cache_time = time.time() + except Exception as e: + self.logger.error(f"Failed to refresh channel map: {e}") + return self._channel_cache or {} + return self._channel_cache + + def _get_user_directions(self, session_id, username): + """Return (group_directions dict, is_admin) for a user, cached for 30 seconds. + + group_directions maps group_name -> 'IN' or 'OUT'. + Prefers IN over OUT when a user has both rows for the same group. + """ + cache_ttl = 30 + now = time.time() + + with self._session_lock: + cached = self._session_cache.get(session_id) + if cached and (now - cached['cached_at']) < cache_ttl: + return cached['directions'], cached['is_admin'] + + group_directions = {} + is_admin = False + + try: + with self.app.app_context(): + # Reuse the authenticator's lookup chain (username -> callsign -> + # base callsign -> underscore->space) so direction enforcement + # finds users by the same path Mumble auth used. + user, _ = MumbleAuthenticator.resolve_identity(self.app, username) + + if not user: + self.logger.warning(f"Direction lookup: OTS user not found for '{username}'") + return {}, False + + for membership in user.group_memberships: + if not membership.enabled: + continue + grp = membership.group.name + # Prefer IN over OUT if both rows exist for the same group + if group_directions.get(grp) != 'IN': + group_directions[grp] = membership.direction + + is_admin = any(r.name == 'administrator' for r in user.roles) + except Exception as e: + self.logger.error(f"Direction lookup failed for '{username}': {e}", exc_info=True) + return {}, False + + with self._session_lock: + self._session_cache[session_id] = { + 'directions': group_directions, + 'is_admin': is_admin, + 'cached_at': now, + } + + return group_directions, is_admin + + def _dispatch_apply(self, session, username, channel_id, group_directions, is_admin): + """Dispatch only the Ice state calls to a background thread. + + DB queries run in the Ice dispatch thread (same as authenticate() — works fine). + Only getState()/setState() must be off-thread to avoid deadlocking the Ice pool. + """ + threading.Thread( + target=self._apply_direction, + args=(session, username, channel_id, group_directions, is_admin), + daemon=True, + ).start() + + def _apply_direction(self, session, username, channel_id, group_directions, is_admin): + """Background thread: apply suppress flag via Ice calls only.""" + try: + # Admins are never suppressed by this callback, so skip all Ice calls. + # Calling getState() here blocks for 30s and crashes the connection. + if is_admin: + return + + channel_map = self._get_channel_map() + channel_name = channel_map.get(channel_id, f"unknown({channel_id})") + + # Root channel: always allow speaking + if channel_name == 'Root': + try: + s = self.server.getState(session) + if s.suppress: + s.suppress = False + self.server.setState(s) + self.logger.info(f"UNMUTED (Root): {username}") + except Exception as e: + self.logger.error(f"Failed to clear suppress for '{username}' in Root: {e}") + return + + direction = group_directions.get(channel_name) + if direction is None: + return + + s = self.server.getState(session) + should_suppress = (direction == 'OUT') + + if should_suppress and not s.suppress: + s.suppress = True + self.server.setState(s) + self.logger.info(f"LISTEN ONLY: {username} in {channel_name} (direction=OUT)") + try: + self.server.sendMessage( + session, + f"Listen Only: You are receive-only in {channel_name}.", + ) + except Exception: + pass + + elif not should_suppress and s.suppress: + s.suppress = False + self.server.setState(s) + self.logger.info(f"SPEAK ENABLED: {username} in {channel_name} (direction=IN)") + + except Exception as e: + self.logger.error( + f"Unhandled error applying direction for '{username}' session={session}: {e}", + exc_info=True, + ) + + # ----------------------------------------------------------- Ice callbacks + + def userConnected(self, state, current=None): + self.logger.info( + f"User connected: {state.name} (session={state.session}, userid={state.userid}) " + f"channel={state.channel}" + ) + # DB lookup runs here in the Ice dispatch thread (safe — same as authenticate()) + directions, is_admin = self._get_user_directions(state.session, state.name) + # Only the Ice getState/setState calls go to a background thread + self._dispatch_apply(state.session, state.name, state.channel, directions, is_admin) + + def userDisconnected(self, state, current=None): + with self._session_lock: + self._session_cache.pop(state.session, None) + self.logger.info(f"User disconnected: {state.name} (session={state.session})") + + def userStateChanged(self, state, current=None): + """Fire on any state change — channel moves trigger direction re-check.""" + directions, is_admin = self._get_user_directions(state.session, state.name) + self._dispatch_apply(state.session, state.name, state.channel, directions, is_admin) + + def userTextMessage(self, state, message, current=None): + pass + + def channelCreated(self, state, current=None): + self._channel_cache = None + + def channelRemoved(self, state, current=None): + self._channel_cache = None + + def channelStateChanged(self, state, current=None): + self._channel_cache = None + + class MetaCallback(Murmur.MetaCallback): def __init__(self, authenticator): Murmur.MetaCallback.__init__(self) @@ -169,11 +463,13 @@ def started(self, server, current=None): This function is called when a virtual server is started and makes sure an authenticator gets attached if needed. """ + server_id = server.id() self.authenticator.logger.info( - "Setting authenticator for virtual server {}".format(server.id()) + "Virtual server {} started — attaching authenticator and direction callback".format(server_id) ) try: server.setAuthenticator(self.authenticator.auth) + self.authenticator.attach_server_callback(server) # Apparently this server was restarted without us noticing except (Murmur.InvalidSecretException, Ice.UnknownUserException) as e: if hasattr(e, "unknown") and e.unknown != "Murmur::InvalidSecretException": @@ -190,9 +486,11 @@ def stopped(self, server, current=None): # Only try to output the server id if we think we are still connected to prevent # flooding of our thread pool try: + server_id = server.id() self.authenticator.logger.info( - "Authenticated virtual server {} got stopped".format(server.id()) + "Virtual server {} stopped — clearing callback guard".format(server_id) ) + self.authenticator.on_server_stopped(server_id) return except Ice.ConnectionRefusedException: self.authenticator.connected = False