fix: security fixes F-01 through F-11#847
Conversation
📝 WalkthroughWalkthroughThis PR consolidates multiple security hardening and stability improvements: non-deterministic token generation with randomized salts, TTL-based token eviction with async locking, OS-level file-write serialization, sandbox execution via isolated processes with restricted imports, SQL CTE validation, iframe script disabling, auth callback configuration requirements, and WebServer CORS/authentication refactoring. ChangesMulti-layer security hardening and storage improvements
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 4❌ Failed checks (3 warnings, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
No description provided. |
There was a problem hiding this comment.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/ai/src/ai/common/sandbox.py (1)
229-247:⚠️ Potential issue | 🟠 Major | ⚡ Quick winBound
resultbefore sending it over IPC.Only
stdoutandstderrare truncated here. A sandbox script can still setresultto a huge string/list/dict and force an unbounded payload through the queue, which turns attacker-controlled output into memory growth and spurious timeouts. Please normalizeresultto a bounded serialized form beforeresult_queue.put(...).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/ai/src/ai/common/sandbox.py` around lines 229 - 247, The sandbox currently reads sandbox_globals.get('result') into result_val and attaches it to response without truncation, allowing unbounded attacker-controlled payloads to be sent via result_queue.put; before putting the response, normalize and bound result_val by serializing and truncating it (reuse the existing _truncate behavior): if result_val is a primitive (str/int/float/bool/None) or container (list/dict) produce a safe serialized representation with recursive/truncated elements, otherwise use a truncated repr, then assign that bounded string/structure to response['result'] prior to calling result_queue.put; update the handling around result_val/response/result_queue.put to ensure all branches use this truncated/serialized form.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/ai/src/ai/account/keystore.py`:
- Around line 154-157: The code in add_token currently overwrites
self._token_map[token] unconditionally which allows stale activations to replace
newer reservations; update add_token to first verify a live matching reservation
exists and is owned by the caller before activating: look up the reservation
record (e.g. a reservation map like self._reservation_map.get(token) or an
existing token_map entry flagged as a reservation), ensure its owner/identifier
matches the caller (task_name/pool or explicit reservation id) and that it is
not expired, and only then set self._token_map[token] = (apikey, task_name,
pool, endpoint, created_at); if the reservation is missing/doesn’t match or is
expired, do not overwrite and instead raise/return an error. Ensure you still
preserve created_at from the existing active entry when appropriate and perform
this check under self._lock.
In `@packages/ai/src/ai/account/store_providers/filesystem.py`:
- Around line 46-66: The handle-based APIs (open_write, write_chunk,
close_write, open_read) currently bypass the per-path lock used by
write_file/write_file_atomic/delete_file; fix this by having open_write and
open_read acquire the same per-path lock (use the existing lock-file naming
convention and _acquire_lock/_release_lock helpers) and store the lock_fd (and
any lock state) on the returned handle so write_chunk/close_write (and the
handle's close for reads) release the lock and close lock_fd when the handle
lifetime ends; ensure writes acquire an exclusive lock (shared=False) and reads
acquire a shared lock (shared=True), and create the lock file before acquiring
as done in write_file so streamed reads/writes cannot interleave with
atomic/other ops.
- Around line 202-206: The cleanup that unlinks the lock file after closing
lock_fd must be removed to preserve the inode and maintain proper POSIX locking;
locate the block that references lock_fd and lock_path (the try/except that
calls lock_path.unlink()) and delete the unlink call and its surrounding
try/except so the file is only closed, not removed, leaving the .{name}.lock
file in place to ensure subsequent callers block on the same inode rather than a
newly created file.
- Around line 232-251: read_bytes can observe partial writes because write_bytes
acquires an exclusive lock but read_bytes opens the file directly; modify
read_bytes to obtain the same lock file and call self._acquire_lock(lock_fd,
shared=True) before opening/reading the target file, then release the lock via
self._release_lock and os.close(lock_fd) in a finally block (mirroring
write_bytes' lock_path, os.open(...), and cleanup) so readers see consistent
file contents; ensure you handle the file-not-exist case and still release/close
the lock FD if created.
In `@packages/ai/src/ai/common/account/pipeline_validation.py`:
- Around line 22-26: The set comprehension building account_app_ids uses
s['appId'] which can raise KeyError for malformed entries; change it to use
s.get('appId') and only include truthy appIds (e.g., check s.get('appId') and
s.get('status') in active_statuses) so missing or falsy appId values are
skipped; update the comprehension that references account_info.subscribedApps
and the variable account_app_ids accordingly.
- Around line 27-29: The loop compares required_plans (plan tier names) against
account_app_ids (SubscribedApp.appId), which are different identifier spaces;
update the validation in pipeline_validation.py so it checks required plan tiers
against the account's subscription plan field instead of appId: locate where
required_plans and account_app_ids are used, retrieve each account
subscription's plan/tier (e.g., from the SubscribedApp or Subscription object
field that represents plan/plan_tier) and verify required_plans are present
there, or alternatively if the intent was to check apps, rename required_plans
to required_app_ids and compare against SubscribedApp.appId; ensure the code
references the subscription plan field rather than SubscribedApp.appId.
In `@packages/ai/src/ai/common/sandbox.py`:
- Around line 274-307: The current pattern in the sandbox runner (where proc is
the multiprocessing.Process targeting _sandbox_worker and result_queue is a
multiprocessing.Queue) calls proc.join(...) then checks result_queue.empty() and
uses get_nowait(), which can deadlock or miss results; replace this with a
reliable drain by waiting on the queue itself (e.g., call
result_queue.get(timeout=effective_timeout) to atomically wait for the child's
result or switch to multiprocessing.SimpleQueue or a Pipe for one-shot IPC) and
use that return value instead of empty()/get_nowait(); ensure process teardown
(terminate/kill and proc.join) and queue cleanup happen in a finally block so
the child is always cleaned up even on timeout or exceptions.
In `@packages/ai/src/ai/web/server.py`:
- Around line 308-314: The use() method currently imports caller-selected
modules unconditionally; change it to validate module_name against the
ALLOWED_MODULES allowlist before calling importlib.import_module. Specifically,
in the use(self, module_name: str, ...) function, check that module_name is
present in ALLOWED_MODULES (or raise/return an error) prior to calling
importlib.import_module(module_name); only after passing this allowlist check
should you import the module, call mod.register(self, ... ) if present, and
store it in self.app.state.modules[module_name].
- Around line 293-299: The add_websocket method currently only updates
_public_paths when public=True, so private WebSocket endpoints skip the auth
cache; update add_websocket(path, handler, public=False) to mirror add_route by
adding an else branch that appends the path to self._private_paths and refreshes
self._compiled_private_paths (e.g., using compile_path) when public is False,
ensuring authenticate_request() sees compiled private WebSocket paths; keep the
existing self.app.add_api_websocket_route(path, handler) call.
---
Outside diff comments:
In `@packages/ai/src/ai/common/sandbox.py`:
- Around line 229-247: The sandbox currently reads sandbox_globals.get('result')
into result_val and attaches it to response without truncation, allowing
unbounded attacker-controlled payloads to be sent via result_queue.put; before
putting the response, normalize and bound result_val by serializing and
truncating it (reuse the existing _truncate behavior): if result_val is a
primitive (str/int/float/bool/None) or container (list/dict) produce a safe
serialized representation with recursive/truncated elements, otherwise use a
truncated repr, then assign that bounded string/structure to response['result']
prior to calling result_queue.put; update the handling around
result_val/response/result_queue.put to ensure all branches use this
truncated/serialized form.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1cb32b18-1613-45b7-9810-c26cc41bbf00
📒 Files selected for processing (10)
.github/copilot-instructions.mdpackages/ai/src/ai/account/base.pypackages/ai/src/ai/account/keystore.pypackages/ai/src/ai/account/store_providers/filesystem.pypackages/ai/src/ai/common/account/pipeline_validation.pypackages/ai/src/ai/common/database/sql_safety.pypackages/ai/src/ai/common/sandbox.pypackages/ai/src/ai/web/endpoints/auth_callback.pypackages/ai/src/ai/web/server.pypackages/shared-ui/src/modules/chat/components/MarkdownRenderer.tsx
| async with self._lock: | ||
| existing = self._token_map.get(token) | ||
| created_at = existing[4] if existing else time.monotonic() | ||
| self._token_map[token] = (apikey, task_name, pool, endpoint, created_at) |
There was a problem hiding this comment.
Require a live matching reservation before activating a token.
add_token() overwrites any existing entry without checking that the caller still owns the reservation. After TTL reuse, a stale activation can replace a newer reservation for the same token and break the API-key binding.
🐛 Proposed fix
async def add_token(self, apikey: str, token: str, task_name: str, pool: str, endpoint: str) -> None:
"""
Add or update a token with backend routing information.
@@
"""
async with self._lock:
+ self._evict_expired()
existing = self._token_map.get(token)
- created_at = existing[4] if existing else time.monotonic()
+ if not existing or existing[0] != apikey:
+ raise ValueError(f'Token "{token}" is not valid.')
+ created_at = existing[4]
self._token_map[token] = (apikey, task_name, pool, endpoint, created_at)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/account/keystore.py` around lines 154 - 157, The code in
add_token currently overwrites self._token_map[token] unconditionally which
allows stale activations to replace newer reservations; update add_token to
first verify a live matching reservation exists and is owned by the caller
before activating: look up the reservation record (e.g. a reservation map like
self._reservation_map.get(token) or an existing token_map entry flagged as a
reservation), ensure its owner/identifier matches the caller (task_name/pool or
explicit reservation id) and that it is not expired, and only then set
self._token_map[token] = (apikey, task_name, pool, endpoint, created_at); if the
reservation is missing/doesn’t match or is expired, do not overwrite and instead
raise/return an error. Ensure you still preserve created_at from the existing
active entry when appropriate and perform this check under self._lock.
| """ | ||
| Write data to file. | ||
|
|
||
| Fix F-05: acquires an exclusive flock before writing so that | ||
| concurrent coroutines writing to the same path are serialised and | ||
| cannot interleave their output. | ||
| """ | ||
| try: | ||
| full_path = self._get_full_path(filename) | ||
|
|
||
| # Create parent directories | ||
| full_path.parent.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| # Write file | ||
| async with aiofiles.open(full_path, 'w', encoding='utf-8') as f: | ||
| await f.write(data) | ||
| lock_path = full_path.parent / f'.{full_path.name}.lock' | ||
| lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) | ||
|
|
||
| try: | ||
| await self._acquire_lock(lock_fd, shared=False) | ||
| async with aiofiles.open(full_path, 'w', encoding='utf-8') as f: | ||
| await f.write(data) | ||
| finally: | ||
| await self._release_lock(lock_fd) | ||
| os.close(lock_fd) |
There was a problem hiding this comment.
Handle-based I/O still bypasses this lock protocol.
open_write/write_chunk/close_write and open_read never acquire .{name}.lock, so a streamed transfer can still interleave with write_file, write_file_atomic, or delete_file and expose partial content. To actually close the filesystem race this PR is targeting, those context-based APIs need to hold the same per-path lock for the full lifetime of the handle.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/account/store_providers/filesystem.py` around lines 46 -
66, The handle-based APIs (open_write, write_chunk, close_write, open_read)
currently bypass the per-path lock used by
write_file/write_file_atomic/delete_file; fix this by having open_write and
open_read acquire the same per-path lock (use the existing lock-file naming
convention and _acquire_lock/_release_lock helpers) and store the lock_fd (and
any lock state) on the returned handle so write_chunk/close_write (and the
handle's close for reads) release the lock and close lock_fd when the handle
lifetime ends; ensure writes acquire an exclusive lock (shared=False) and reads
acquire a shared lock (shared=True), and create the lock file before acquiring
as done in write_file so streamed reads/writes cannot interleave with
atomic/other ops.
| try: | ||
| if lock_path.exists(): | ||
| lock_path.unlink() | ||
| except Exception: # noqa: S110 | ||
| pass # Ignore cleanup errors | ||
| except Exception: | ||
| pass |
There was a problem hiding this comment.
Do not unlink the lock file after releasing it.
After lock_fd is closed, another operation can open the same lock file and wait on that inode while this cleanup removes the path. The next caller will then recreate a fresh .{name}.lock inode and acquire an unrelated lock, which breaks mutual exclusion on Unix and can reintroduce write/delete races.
Suggested fix
finally:
await self._release_lock(lock_fd)
os.close(lock_fd)
-
- try:
- if lock_path.exists():
- lock_path.unlink()
- except Exception:
- pass📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try: | |
| if lock_path.exists(): | |
| lock_path.unlink() | |
| except Exception: # noqa: S110 | |
| pass # Ignore cleanup errors | |
| except Exception: | |
| pass | |
| finally: | |
| await self._release_lock(lock_fd) | |
| os.close(lock_fd) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/account/store_providers/filesystem.py` around lines 202 -
206, The cleanup that unlinks the lock file after closing lock_fd must be
removed to preserve the inode and maintain proper POSIX locking; locate the
block that references lock_fd and lock_path (the try/except that calls
lock_path.unlink()) and delete the unlink call and its surrounding try/except so
the file is only closed, not removed, leaving the .{name}.lock file in place to
ensure subsequent callers block on the same inode rather than a newly created
file.
| """ | ||
| Write binary data to file. | ||
|
|
||
| Fix F-11: acquires an exclusive lock before writing binary data, | ||
| consistent with the text write_file fix (F-05). | ||
| """ | ||
| try: | ||
| full_path = self._get_full_path(filename) | ||
| full_path.parent.mkdir(parents=True, exist_ok=True) | ||
| async with aiofiles.open(full_path, 'wb') as f: | ||
| await f.write(data) | ||
|
|
||
| lock_path = full_path.parent / f'.{full_path.name}.lock' | ||
| lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) | ||
|
|
||
| try: | ||
| await self._acquire_lock(lock_fd, shared=False) | ||
| async with aiofiles.open(full_path, 'wb') as f: | ||
| await f.write(data) | ||
| finally: | ||
| await self._release_lock(lock_fd) | ||
| os.close(lock_fd) |
There was a problem hiding this comment.
Binary reads can still observe partial writes.
write_bytes now serializes writers, but read_bytes still opens the target directly without taking the shared lock. A concurrent read_bytes can therefore read a truncated or partially rewritten file while this 'wb' write is in progress.
Suggested fix
async def read_bytes(self, filename: str) -> bytes:
"""Read binary data from file."""
try:
full_path = self._get_full_path(filename)
if not full_path.exists():
raise StorageError(f'File not found: {filename}')
- async with aiofiles.open(full_path, 'rb') as f:
- return await f.read()
+ lock_path = full_path.parent / f'.{full_path.name}.lock'
+ full_path.parent.mkdir(parents=True, exist_ok=True)
+ lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644)
+
+ try:
+ await self._acquire_lock(lock_fd, shared=True)
+ async with aiofiles.open(full_path, 'rb') as f:
+ return await f.read()
+ finally:
+ await self._release_lock(lock_fd)
+ os.close(lock_fd)
except StorageError:
raise
except Exception as e:
raise StorageError(f'Failed to read file {filename}: {e}') from e📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| """ | |
| Write binary data to file. | |
| Fix F-11: acquires an exclusive lock before writing binary data, | |
| consistent with the text write_file fix (F-05). | |
| """ | |
| try: | |
| full_path = self._get_full_path(filename) | |
| full_path.parent.mkdir(parents=True, exist_ok=True) | |
| async with aiofiles.open(full_path, 'wb') as f: | |
| await f.write(data) | |
| lock_path = full_path.parent / f'.{full_path.name}.lock' | |
| lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) | |
| try: | |
| await self._acquire_lock(lock_fd, shared=False) | |
| async with aiofiles.open(full_path, 'wb') as f: | |
| await f.write(data) | |
| finally: | |
| await self._release_lock(lock_fd) | |
| os.close(lock_fd) | |
| async def read_bytes(self, filename: str) -> bytes: | |
| """Read binary data from file.""" | |
| try: | |
| full_path = self._get_full_path(filename) | |
| if not full_path.exists(): | |
| raise StorageError(f'File not found: {filename}') | |
| lock_path = full_path.parent / f'.{full_path.name}.lock' | |
| full_path.parent.mkdir(parents=True, exist_ok=True) | |
| lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) | |
| try: | |
| await self._acquire_lock(lock_fd, shared=True) | |
| async with aiofiles.open(full_path, 'rb') as f: | |
| return await f.read() | |
| finally: | |
| await self._release_lock(lock_fd) | |
| os.close(lock_fd) | |
| except StorageError: | |
| raise | |
| except Exception as e: | |
| raise StorageError(f'Failed to read file {filename}: {e}') from e |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/account/store_providers/filesystem.py` around lines 232 -
251, read_bytes can observe partial writes because write_bytes acquires an
exclusive lock but read_bytes opens the file directly; modify read_bytes to
obtain the same lock file and call self._acquire_lock(lock_fd, shared=True)
before opening/reading the target file, then release the lock via
self._release_lock and os.close(lock_fd) in a finally block (mirroring
write_bytes' lock_path, os.open(...), and cleanup) so readers see consistent
file contents; ensure you handle the file-not-exist case and still release/close
the lock FD if created.
| account_app_ids = { | ||
| s['appId'] | ||
| for s in (account_info.subscribedApps or []) | ||
| if s.get('status') in active_statuses | ||
| } |
There was a problem hiding this comment.
Fix inconsistent dict access: use .get() for 'appId' key.
Line 23 uses s['appId'] which will raise KeyError if the key is missing, while line 25 safely uses s.get('status'). If subscribedApps contains a malformed entry without an 'appId' key, the validation will crash.
🛡️ Proposed fix to use safe dict access
active_statuses = {'active', 'trialing'}
account_app_ids = {
- s['appId']
+ s.get('appId')
for s in (account_info.subscribedApps or [])
- if s.get('status') in active_statuses
+ if s.get('appId') and s.get('status') in active_statuses
}Note: Added s.get('appId') check to the filter condition to exclude entries with missing or falsy appId values from the set.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/common/account/pipeline_validation.py` around lines 22 -
26, The set comprehension building account_app_ids uses s['appId'] which can
raise KeyError for malformed entries; change it to use s.get('appId') and only
include truthy appIds (e.g., check s.get('appId') and s.get('status') in
active_statuses) so missing or falsy appId values are skipped; update the
comprehension that references account_info.subscribedApps and the variable
account_app_ids accordingly.
| for required_plan in required_plans: | ||
| if required_plan not in account_plans: | ||
| if required_plan not in account_app_ids: | ||
| return False |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Search for usage patterns of 'plans' and 'appId' to verify they reference the same identifier space.
# Search for where component schema 'plans' are defined or documented
echo "=== Searching for component schema 'plans' definitions ==="
rg -n -C3 --type=py "plans.*=.*\[" -g '!test*'
# Search for SubscribedApp or subscribedApps type definitions
echo -e "\n=== Searching for SubscribedApp type definitions ==="
rg -n -C5 --type=py "class.*SubscribedApp|subscribedApps.*:" -g '!test*'
# Search for where appId and plans are used together
echo -e "\n=== Searching for appId and plans used together ==="
rg -n -C3 --type=py "(appId.*plan|plan.*appId)" -iRepository: rocketride-org/rocketride-server
Length of output: 2531
🏁 Script executed:
#!/bin/bash
# Read the full pipeline_validation.py file to understand the data flow
cat -n packages/ai/src/ai/common/account/pipeline_validation.pyRepository: rocketride-org/rocketride-server
Length of output: 3105
🏁 Script executed:
#!/bin/bash
# Search for getServiceDefinition implementation or usage examples
echo "=== Searching for getServiceDefinition usage and context ==="
rg -n -C5 "getServiceDefinition" --type=py | head -100
# Search for component schema examples or definitions
echo -e "\n=== Searching for component schema definitions ==="
rg -n "schema.*plans" --type=py | head -50
# Look for tests related to plan validation
echo -e "\n=== Searching for plan validation tests ==="
fd -t f -e py -p "*test*" | xargs grep -l "plan\|Plan" | head -10Repository: rocketride-org/rocketride-server
Length of output: 8729
🏁 Script executed:
#!/bin/bash
# Look for RocketRide documentation files
echo "=== RocketRide documentation files ==="
find .rocketride -name "*.md" -type f 2>/dev/null | head -20
# Also search for any component schema examples or test fixtures
echo -e "\n=== Searching for component schema examples ==="
fd -t f \( -name "*.json" -o -name "*.py" \) -e json -e py | xargs grep -l "\"plans\"" 2>/dev/null | head -10Repository: rocketride-org/rocketride-server
Length of output: 413
🏁 Script executed:
#!/bin/bash
# Fix the fd command and search for schema examples
echo "=== Searching for component schema examples in test files ==="
find . -type f -name "*.json" | xargs grep -l "plans" 2>/dev/null | head -20
# Search for test files related to pipeline validation
echo -e "\n=== Searching for pipeline validation tests ==="
find . -type f -name "*test*.py" -o -name "*_test.py" | xargs grep -l "pipeline_validation\|AccountPipelineValidation" 2>/dev/null
# Search for any documentation about the plans field
echo -e "\n=== Searching for documentation about 'plans' field ==="
rg -i "plans.*field|plans.*subscription|plans.*appid" --type=py -A2 -B2
# Look for setup or fixture files that might define component schemas
echo -e "\n=== Searching for fixture or setup files ==="
find . -type f \( -name "*.json" -o -name "conftest.py" \) | grep -i "fixture\|setup\|test" | head -10Repository: rocketride-org/rocketride-server
Length of output: 787
🏁 Script executed:
#!/bin/bash
# Read the component schema file to see the structure of 'plans'
echo "=== Content of services.enterprise.json ==="
cat ./nodes/src/nodes/llm_vertex/services.enterprise.json
# Look for other component schema files
echo -e "\n=== Other service definition files ==="
find . -name "services*.json" -type f | head -20Repository: rocketride-org/rocketride-server
Length of output: 13808
🏁 Script executed:
#!/bin/bash
# Get full SubscribedApp definition with surrounding context
echo "=== Full SubscribedApp TypedDict definition ==="
sed -n '59,75p' packages/ai/src/ai/account/models.py
# Search for how subscribedApps is populated or used
echo -e "\n=== Usage of subscribedApps in codebase ==="
rg -n "subscribedApps" --type=py -B2 -A2 | head -80
# Look for examples or test data showing what appId values look like
echo -e "\n=== Searching for appId values or examples ==="
rg -n "appId.*=" --type=py -B1 -A1 | head -60Repository: rocketride-org/rocketride-server
Length of output: 4163
🏁 Script executed:
#!/bin/bash
# Search for where subscribedApps is populated or constructed
echo "=== Searching for where subscribedApps is assigned/populated ==="
rg -n "subscribedApps\s*=" --type=py -B5 -A5
# Search for AccountInfo construction
echo -e "\n=== Searching for AccountInfo instantiation ==="
rg -n "AccountInfo(" --type=py -B3 -A3 | head -100
# Look in billing context for app/plan relationship
echo -e "\n=== Searching for billing-related plan/app definitions ==="
rg -n "plans.*appId|appId.*plan" --type=py -i
# Check if there's external documentation or comments about appId meaning
echo -e "\n=== Searching for comments about appId semantics ==="
rg -n -C3 "appId.*app|app.*appId" --type=py | grep -i "comment\|#\|plan\|tier"Repository: rocketride-org/rocketride-server
Length of output: 852
🏁 Script executed:
#!/bin/bash
# Read more context around AccountInfo and subscribedApps
echo "=== AccountInfo section with full context ==="
sed -n '88,115p' packages/ai/src/ai/account/models.py
# Search for where AccountInfo might come from (API endpoints, integrations)
echo -e "\n=== Searching for AccountInfo in API/integration code ==="
rg -n "AccountInfo" --type=py | grep -E "(from|import|return|fetch|load|auth)" | head -20
# Look for any test fixtures or examples with subscribedApps
echo -e "\n=== Searching for test data or fixtures ==="
find . -name "*.py" -type f | xargs grep -l "subscribedApps" 2>/dev/null
# Check if there's a schema validation or model definition file
echo -e "\n=== Checking for schema files ==="
find . -name "*.json" -o -name "*.yaml" | xargs grep -l "subscribedApps\|appId" 2>/dev/null | head -10Repository: rocketride-org/rocketride-server
Length of output: 3583
🏁 Script executed:
#!/bin/bash
# Search for billing/subscription integration code
echo "=== Searching for subscription/plan handling ==="
find . -path ./node_modules -prune -o -type f -name "*.py" -print | xargs grep -l "subscription\|billing" 2>/dev/null | head -10
# Check the client billing types
echo -e "\n=== Client billing types ==="
cat packages/client-python/src/rocketride/types/billing.py 2>/dev/null | head -100
# Look for any server-side account setup that populates subscribedApps
echo -e "\n=== Searching for account initialization or auth setup ==="
rg -n "subscribedApps.*\[" --type=py -B10 -A2
# Check if there's any relationship between plan names and appId documented
echo -e "\n=== Searching for plan tier definitions ==="
find . -type f \( -name "*.json" -o -name "*.py" \) | xargs grep -l "enterprise\|premium\|basic" 2>/dev/null | head -10Repository: rocketride-org/rocketride-server
Length of output: 7253
🏁 Script executed:
#!/bin/bash
# Search for where AccountPipelineValidation.validate is called
echo "=== Searching for where validate() is called ==="
rg -n "\.validate\(" --type=py -B3 -A3 | grep -A5 -B5 "pipeline_validation\|AccountPipelineValidation"
# Check if there's documentation about what appId values represent
echo -e "\n=== More context on appId in BillingDetail ==="
sed -n '40,75p' packages/client-python/src/rocketride/types/billing.py
# Search for AppManifestEntry to understand the app ID structure
echo -e "\n=== Searching for AppManifestEntry ==="
rg -n "AppManifestEntry" --type=py -B2 -A5 | head -50Repository: rocketride-org/rocketride-server
Length of output: 3143
Critical issue: Comparing incompatible identifier types.
The code compares required_plan values (plan tier names like "enterprise" from component schemas) against account_app_ids (app identifiers like "rocketride.brandy" from SubscribedApp.appId). These are different identifier spaces. The validation checks if plan tier names exist in app IDs, which will always fail. The SubscribedApp type contains appId and status, but no plan tier field. The fix needs to check against actual subscription plan tiers, not app identifiers.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/common/account/pipeline_validation.py` around lines 27 -
29, The loop compares required_plans (plan tier names) against account_app_ids
(SubscribedApp.appId), which are different identifier spaces; update the
validation in pipeline_validation.py so it checks required plan tiers against
the account's subscription plan field instead of appId: locate where
required_plans and account_app_ids are used, retrieve each account
subscription's plan/tier (e.g., from the SubscribedApp or Subscription object
field that represents plan/plan_tier) and verify required_plans are present
there, or alternatively if the intent was to check apps, rename required_plans
to required_app_ids and compare against SubscribedApp.appId; ensure the code
references the subscription plan field rather than SubscribedApp.appId.
| # Use a multiprocessing Queue to receive the result from the child process. | ||
| result_queue: multiprocessing.Queue = multiprocessing.Queue() | ||
|
|
||
| proc = multiprocessing.Process( | ||
| target=_sandbox_worker, | ||
| args=(code, allowlist, result_queue), | ||
| daemon=True, | ||
| ) | ||
| # Clear the import cache so the freshly installed module is found | ||
| importlib.invalidate_caches() | ||
| proc.start() | ||
| proc.join(timeout=effective_timeout) | ||
|
|
||
| if proc.is_alive(): | ||
| # Timed out — forcibly kill the child process (F-09 fix). | ||
| proc.terminate() | ||
| proc.join(timeout=2) | ||
| if proc.is_alive(): | ||
| proc.kill() | ||
| proc.join(timeout=1) | ||
| return { | ||
| 'stdout': '', | ||
| 'stderr': f'[Execution timed out after {effective_timeout}s]', | ||
| 'exit_code': -1, | ||
| 'timed_out': True, | ||
| } | ||
|
|
||
| if result_queue.empty(): | ||
| return { | ||
| 'stdout': '', | ||
| 'stderr': '[Sandbox process exited without producing a result]', | ||
| 'exit_code': proc.exitcode if proc.exitcode is not None else -1, | ||
| 'timed_out': False, | ||
| } | ||
|
|
||
| return result_queue.get_nowait() |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
According to Python's multiprocessing documentation, is it safe to call Process.join() before reading from a multiprocessing.Queue written by that child, and is Queue.empty() reliable across processes?
💡 Result:
No. In the multiprocessing docs, it’s warned that “a process that has put items in a queue … will not terminate until all the buffered items are fed by the ‘feeder’ thread to the underlying pipe” and that you must ensure “all items which have been put on the queue will eventually be removed before the process is joined; otherwise you cannot be sure that processes … will terminate” [1]. So calling Process.join before you’ve drained (read) the items from the multiprocessing.Queue written by that child is not considered safe/robust; in some cases it can deadlock. Also, Queue.empty is not reliable across processes. The multiprocessing docs state that for multiprocessing.Queue, empty “Because of multithreading/multiprocessing semantics, this is not reliable” [2]. They also note there can be an “infinitesimal delay” after a put on an empty queue before empty flips to False / get_nowait succeeds [3]. Practical guidance consistent with the docs: don’t use empty/qsize for cross-process flow control; instead use blocking get (or get with a timeout) and an explicit termination protocol (e.g., sentinels) so the consumer knows when to stop [2][3].
Citations:
- 1: https://docs.python.org/3/library/multiprocessing.html?highlight=current_processing
- 2: https://docs.python.org/3.12/library/multiprocessing.html
- 3: https://docs.python.org/3/library/multiprocessing.html
Replace join() then empty()/get_nowait() with reliable queue drainage.
The current flow calls proc.join(timeout=...) before reading from the queue, which violates multiprocessing semantics. A child process won't fully terminate until its queued output is drained by the parent's feeder thread; premature join can deadlock or cause the parent to timeout incorrectly while valid results sit buffered in IPC. The follow-up empty() check is also unreliable across processes (documented as unsafe due to multiprocessing semantics). Instead, use result_queue.get(timeout=effective_timeout) to atomically wait for and retrieve the result, or replace Queue with SimpleQueue/Pipe for one-shot communication. Wrap queue/process teardown in finally to ensure cleanup.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/common/sandbox.py` around lines 274 - 307, The current
pattern in the sandbox runner (where proc is the multiprocessing.Process
targeting _sandbox_worker and result_queue is a multiprocessing.Queue) calls
proc.join(...) then checks result_queue.empty() and uses get_nowait(), which can
deadlock or miss results; replace this with a reliable drain by waiting on the
queue itself (e.g., call result_queue.get(timeout=effective_timeout) to
atomically wait for the child's result or switch to multiprocessing.SimpleQueue
or a Pipe for one-shot IPC) and use that return value instead of
empty()/get_nowait(); ensure process teardown (terminate/kill and proc.join) and
queue cleanup happen in a finally block so the child is always cleaned up even
on timeout or exceptions.
| def add_websocket(self, path: str, handler, public: bool = False): | ||
| """Register a WebSocket endpoint.""" | ||
| self.app.add_api_websocket_route(path, handler) | ||
| if public: | ||
| # If this is a public endpoint, add it to the public endpoints set | ||
| self._public_paths.append(path) | ||
| # Invalidate the compiled cache so it is rebuilt on the next is_public_route() call | ||
| self._compiled_public_paths = None | ||
|
|
||
| if private: | ||
| # If this is a private endpoint, specifically add it to the private endpoints set | ||
| self._private_paths.append(path) | ||
| # Invalidate the compiled cache so it is rebuilt on the next is_public_route() call | ||
| self._compiled_private_paths = None | ||
|
|
||
| def use(self, moduleName: str, config: Optional[Dict[str, Any]] = None): | ||
| """ | ||
| Dynamically loads a service module and enables its API endpoints. | ||
|
|
||
| Args: | ||
| module (str): The name of the service module to load. | ||
|
|
||
| Returns: | ||
| module: The loaded module instance. | ||
|
|
||
| Behavior: | ||
| - Uses `importlib.import_module()` to dynamically import a module. | ||
| - The module is expected to exist within the `ai.modules.<module>.endpoints` namespace. | ||
| - This allows API endpoints from the specified module to be registered dynamically. | ||
|
|
||
| Example: | ||
| >>> use('analytics') | ||
| <module 'ai.modules.analytics.endpoints' from 'path/to/module.py'> | ||
| """ | ||
| # Clean it up | ||
| moduleName = moduleName.lower().strip() | ||
|
|
||
| # Validate against allowlist to prevent arbitrary module injection. | ||
| # Without this check, an attacker could load arbitrary Python modules | ||
| # via importlib.import_module(), leading to remote code execution. | ||
| if moduleName not in ALLOWED_MODULES: | ||
| raise ValueError( | ||
| f'Module {moduleName!r} is not allowed. Permitted modules: {", ".join(sorted(ALLOWED_MODULES))}' | ||
| ) | ||
|
|
||
| # If it is already loaded, return success | ||
| if moduleName in self.app.state.modules: | ||
| return | ||
|
|
||
| # Dynamically import the module using its fully-qualified package path | ||
| moduleHandle = importlib.import_module(f'ai.modules.{moduleName}') | ||
| self._compiled_public_paths = [compile_path(p)[0] for p in self._public_paths] | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Auth middleware implementation ==="
fd -i 'middleware.py' packages/ai/src/ai/web -x sh -c 'echo "--- {} ---"; sed -n "1,240p" "$1"' sh {}
echo
echo "=== WebSocket registrations ==="
rg -nP --type=py '\badd_websocket\s*\(' packages/ai/src -C2
echo
echo "=== authenticate_request call sites ==="
rg -nP --type=py '\bauthenticate_request\s*\(' packages/ai/src -C2Repository: rocketride-org/rocketride-server
Length of output: 2672
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== is_public_route() implementation ==="
rg -nP --type=py '\bis_public_route\s*\(' packages/ai/src -A10
echo
echo "=== All _private_paths usages ==="
rg -nP --type=py '_private_paths' packages/ai/src
echo
echo "=== Full add_route and add_websocket methods for comparison ==="
rg -nP --type=py '^\s+def add_route|^\s+def add_websocket' packages/ai/src -A8Repository: rocketride-org/rocketride-server
Length of output: 3296
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Context around line 258-259 (is_public_route implementation) ==="
sed -n '250,275p' packages/ai/src/ai/web/server.pyRepository: rocketride-org/rocketride-server
Length of output: 1075
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== All add_websocket calls ==="
rg -nP --type=py '\badd_websocket\s*\(' packages/ai/src -B2 -A2
echo
echo "=== Check if any WebSocket added without public=True ==="
rg -nP --type=py 'add_websocket.*\(.*\)' packages/ai/srcRepository: rocketride-org/rocketride-server
Length of output: 725
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Search for add_websocket invocations (any object calling it) ==="
rg -nP 'add_websocket' packages/ai/src --type=py
echo
echo "=== Search in entire repo for add_websocket calls ==="
rg -nP 'add_websocket' . --type=py 2>/dev/null | head -30Repository: rocketride-org/rocketride-server
Length of output: 418
Add private WebSocket paths to auth cache to match HTTP route behavior.
The add_websocket() method lacks an else branch to populate _private_paths when public=False. This causes private WebSocket routes to bypass the _compiled_private_paths check in authenticate_request(), allowing them to skip authentication. The add_route() method correctly populates both caches; add_websocket() must do the same.
🐛 Proposed fix
def add_websocket(self, path: str, handler, public: bool = False):
"""Register a WebSocket endpoint."""
self.app.add_api_websocket_route(path, handler)
if public:
self._public_paths.append(path)
self._compiled_public_paths = [compile_path(p)[0] for p in self._public_paths]
+ else:
+ self._private_paths.append(path)
+ self._compiled_private_paths = [compile_path(p)[0] for p in self._private_paths]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/web/server.py` around lines 293 - 299, The add_websocket
method currently only updates _public_paths when public=True, so private
WebSocket endpoints skip the auth cache; update add_websocket(path, handler,
public=False) to mirror add_route by adding an else branch that appends the path
to self._private_paths and refreshes self._compiled_private_paths (e.g., using
compile_path) when public is False, ensuring authenticate_request() sees
compiled private WebSocket paths; keep the existing
self.app.add_api_websocket_route(path, handler) call.
| def use(self, module_name: str, *args, **kwargs): | ||
| """Dynamically load and activate a module.""" | ||
| mod = importlib.import_module(module_name) | ||
| if hasattr(mod, 'register'): | ||
| mod.register(self, *args, **kwargs) | ||
| self.app.state.modules[module_name] = mod | ||
| return mod |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== /use endpoint implementation ==="
fd -i 'use.py' packages/ai/src/ai/web/endpoints -x sh -c 'echo "--- {} ---"; sed -n "1,240p" "$1"' sh {}
echo
echo "=== ALLOWED_MODULES definition and related references ==="
rg -nP --type=py '\b(ALLOWED_MODULES|ALL)\b' packages/ai/src -C2
echo
echo "=== Dynamic import call sites ==="
rg -nP --type=py '\bimport_module\s*\(' packages/ai/src -C2Repository: rocketride-org/rocketride-server
Length of output: 4800
🏁 Script executed:
# Get the full ALLOWED_MODULES definition
rg -nP --type=py 'ALL = frozenset' packages/ai/src/ai/modules/__init__.py -A 20
# Check the full server.py use() method context and module-level imports
sed -n '1,50p' packages/ai/src/ai/web/server.py
# Check if there's any validation happening elsewhere in the use() flow
sed -n '300,320p' packages/ai/src/ai/web/server.py
# Confirm ALLOWED_MODULES is module-level and accessible
rg -nP --type=py 'ALLOWED_MODULES' packages/ai/src/ai/web/server.py -B 5 -A 10Repository: rocketride-org/rocketride-server
Length of output: 3502
Restore the module allowlist before importing caller-selected modules.
ALLOWED_MODULES is imported in this file, but use() calls importlib.import_module(module_name) unconditionally. If the /use endpoint forwards request input here, an authenticated caller can load arbitrary installed modules and trigger their import-time side effects. Validate that module_name is in ALLOWED_MODULES before importing.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/ai/web/server.py` around lines 308 - 314, The use() method
currently imports caller-selected modules unconditionally; change it to validate
module_name against the ALLOWED_MODULES allowlist before calling
importlib.import_module. Specifically, in the use(self, module_name: str, ...)
function, check that module_name is present in ALLOWED_MODULES (or raise/return
an error) prior to calling importlib.import_module(module_name); only after
passing this allowlist check should you import the module, call
mod.register(self, ... ) if present, and store it in
self.app.state.modules[module_name].
|
Hi @aviralb13 — thanks for tackling these security findings, the individual fixes (F-01 through F-11) address real issues. However, this PR has grown well beyond targeted security fixes and we can't merge it in its current form. A few key concerns: Breaking API changes in Module loading allowlist removed: The Scope creep: MIT license headers, docstrings, and comments have been stripped across multiple files, the MarkdownRenderer has been re-indented to 8-space tabs, and Recommendation: Could you split this into smaller, focused PRs — one per fix or a few closely related fixes per PR? The security logic (F-01 sandbox escape, F-02 KeyStore races, F-10 token generation, etc.) can each land cleanly on their own without the refactoring and comment removal mixed in. That way we can get the critical fixes merged quickly without risking breakage. |
Summary
Type
Fix
Testing
./builder testpassesChecklist
Linked Issue
Fixes #
Fixes #12, Fixes #15, Fixes #18