Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/python/controller/auto_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ def add_listener(self, listener: IAutoQueuePersistListener):

@classmethod
@overrides(Persist)
def from_str(cls: "AutoQueuePersist", content: str) -> "AutoQueuePersist":
persist = AutoQueuePersist()
def from_str(cls: type["AutoQueuePersist"], content: str) -> "AutoQueuePersist":
persist = cls()
try:
dct = json.loads(content)
pattern_list = dct[AutoQueuePersist.__KEY_PATTERNS]
Expand Down Expand Up @@ -356,6 +356,8 @@ def _is_auto_queue_enabled_for_file(self, file: ModelFile) -> bool:
caller (process()) already gates on the global __enabled flag.
"""
if self.__pair_auto_queue:
if file.pair_id is None:
return False
return self.__pair_auto_queue.get(file.pair_id, False)
return True

Expand Down Expand Up @@ -405,8 +407,8 @@ def __match(pattern: AutoQueuePattern, file: ModelFile) -> bool:
:return:
"""
# make the search case insensitive
pattern = pattern.pattern.lower()
pattern_str = pattern.pattern.lower()
filename = file.name.lower()
# 1. pattern match
# 2. wildcard match
return pattern in filename or fnmatch.fnmatch(filename, pattern)
return pattern_str in filename or fnmatch.fnmatch(filename, pattern_str)
22 changes: 11 additions & 11 deletions src/python/controller/extract/extract_process.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Copyright 2017, Inderpreet Singh, All rights reserved.

import datetime
import logging
import multiprocessing
import queue
import time
from datetime import datetime

from common import AppProcess, overrides

Expand All @@ -19,15 +19,15 @@ def __init__(self, timestamp: datetime, statuses: list[ExtractStatus]):


class ExtractCompletedResult:
def __init__(self, timestamp: datetime, name: str, is_dir: bool, pair_id: str = None):
def __init__(self, timestamp: datetime, name: str, is_dir: bool, pair_id: str | None = None):
self.timestamp = timestamp
self.name = name
self.is_dir = is_dir
self.pair_id = pair_id


class ExtractFailedResult:
def __init__(self, timestamp: datetime, name: str, is_dir: bool, pair_id: str = None):
def __init__(self, timestamp: datetime, name: str, is_dir: bool, pair_id: str | None = None):
self.timestamp = timestamp
self.name = name
self.is_dir = is_dir
Expand All @@ -45,18 +45,16 @@ def __init__(
self.completed_queue = completed_queue
self.failed_queue = failed_queue

def extract_completed(self, name: str, is_dir: bool, pair_id: str = None):
def extract_completed(self, name: str, is_dir: bool, pair_id: str | None = None):
self.logger.info("Extraction completed for {}".format(name))
completed_result = ExtractCompletedResult(
timestamp=datetime.datetime.now(), name=name, is_dir=is_dir, pair_id=pair_id
timestamp=datetime.now(), name=name, is_dir=is_dir, pair_id=pair_id
)
self.completed_queue.put(completed_result)

def extract_failed(self, name: str, is_dir: bool, pair_id: str = None):
def extract_failed(self, name: str, is_dir: bool, pair_id: str | None = None):
self.logger.error("Extraction failed for {}".format(name))
failed_result = ExtractFailedResult(
timestamp=datetime.datetime.now(), name=name, is_dir=is_dir, pair_id=pair_id
)
failed_result = ExtractFailedResult(timestamp=datetime.now(), name=name, is_dir=is_dir, pair_id=pair_id)
self.failed_queue.put(failed_result)

def __init__(self):
Expand All @@ -83,10 +81,12 @@ def run_init(self):

@overrides(AppProcess)
def run_cleanup(self):
assert self.__dispatch is not None
self.__dispatch.stop()

@overrides(AppProcess)
def run_loop(self):
assert self.__dispatch is not None
# Forward all the extract commands
try:
while True:
Expand All @@ -98,7 +98,7 @@ def run_loop(self):
# Report dispatch errors as failures so the controller
# can transition the file to EXTRACT_FAILED state
failed_result = ExtractFailedResult(
timestamp=datetime.datetime.now(),
timestamp=datetime.now(),
name=req.model_file.name,
is_dir=req.model_file.is_dir,
pair_id=req.pair_id,
Expand All @@ -109,7 +109,7 @@ def run_loop(self):

# Queue the latest status
statuses = self.__dispatch.status()
status_result = ExtractStatusResult(timestamp=datetime.datetime.now(), statuses=statuses)
status_result = ExtractStatusResult(timestamp=datetime.now(), statuses=statuses)
self.__status_result_queue.put(status_result)

time.sleep(ExtractProcess.__DEFAULT_SLEEP_INTERVAL_IN_SECS)
Expand Down
54 changes: 33 additions & 21 deletions src/python/lftp/lftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __spawn_process(self):
# Suppress DeprecationWarning from pexpect.spawn's internal forkpty call.
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*fork.*", category=DeprecationWarning)
self.__process = pexpect.spawn("/usr/bin/lftp", args, env=spawn_env)
self.__process = pexpect.spawn("/usr/bin/lftp", args, env=spawn_env) # type: ignore[arg-type]
self.__process.setwinsize(24, 10000)
self.__process.expect(self.__expect_pattern)
self.__setup()
Expand All @@ -122,7 +122,7 @@ def __restart_process(self):
self.__spawn_process()
# Replay cached settings
for setting, value in self.__settings_cache.items():
self.__run_command("set {} {}".format(setting, value))
self.__run_command("set {} {}".format(setting, value)) # type: ignore[arg-type]
self.__consecutive_timeouts = 0

def __setup(self):
Expand All @@ -139,7 +139,7 @@ def __setup(self):
# permission bits (e.g. 664 → remote) which would override our umask setting
self.__set(Lftp.__SET_SFTP_SET_PERMISSIONS, "false")

def with_check_process(method: Callable):
def with_check_process(method: Callable): # type: ignore[override]
"""
Decorator that checks for a valid process before executing
the decorated method. Attempts restart if process is dead.
Expand Down Expand Up @@ -181,7 +181,8 @@ def raise_pending_error(self):
raise LftpError(error)

@with_check_process
def __run_command(self, command: str):
def __run_command(self, command: str): # type: ignore[arg-type]
assert self.__process is not None
if self.__log_command_output:
self.logger.debug("command: {}".format(command.encode("utf8", "surrogateescape")))
self.__process.sendline(command)
Expand All @@ -204,14 +205,17 @@ def __run_command(self, command: str):

# Success — reset consecutive timeout counter
self.__consecutive_timeouts = 0
out = self.__process.before.decode("utf8", "replace")
before = self.__process.before
assert isinstance(before, bytes)
out = before.decode("utf8", "replace")
out = out.strip() # remove any CRs

if self.__log_command_output:
self.logger.debug("out ({} bytes):\n {}".format(len(out), out))
after_val = self.__process.after
after = (
self.__process.after.decode("utf8", "replace").strip()
if self.__process.after != pexpect.TIMEOUT
after_val.decode("utf8", "replace").strip() # type: ignore[union-attr]
if after_val != pexpect.TIMEOUT
else ""
)
self.logger.debug("after: {}".format(after))
Expand All @@ -227,13 +231,16 @@ def __run_command(self, command: str):
self.logger.warning("Lftp timeout while consuming error output")
self.__pending_error = error_out
return ""
out = self.__process.before.decode("utf8", "replace")
before = self.__process.before
assert isinstance(before, bytes)
out = before.decode("utf8", "replace")
out = out.strip() # remove any CRs
if self.__log_command_output:
self.logger.debug("retry out ({} bytes):\n {}".format(len(out), out))
after_val = self.__process.after
after = (
self.__process.after.decode("utf8", "replace").strip()
if self.__process.after != pexpect.TIMEOUT
after_val.decode("utf8", "replace").strip() # type: ignore[union-attr]
if after_val != pexpect.TIMEOUT
else ""
)
self.logger.debug("retry after: {}".format(after))
Expand Down Expand Up @@ -263,15 +270,15 @@ def __set(self, setting: str, value: str):
:return:
"""
self.__settings_cache[setting] = value
self.__run_command("set {} {}".format(setting, value))
self.__run_command("set {} {}".format(setting, value)) # type: ignore[arg-type]

def __get(self, setting: str) -> str:
"""
Get a setting from the lftp runtime
:param setting:
:return:
"""
out = self.__run_command("set -a | grep {}".format(setting))
out = self.__run_command("set -a | grep {}".format(setting)) # type: ignore[arg-type]
m = re.search("set {} (.*)".format(setting), out)
if not m or not m.group or not m.group(1):
raise LftpError("Failed to get setting '{}'. Output: '{}'".format(setting, out))
Expand Down Expand Up @@ -473,7 +480,7 @@ def status(self) -> list[LftpJobStatus] | None:
completion signals.
:return:
"""
out = self.__run_command("jobs -v")
out = self.__run_command("jobs -v") # type: ignore[arg-type]
try:
statuses = self.__job_status_parser.parse(out)
self.__consecutive_status_errors = 0
Expand All @@ -486,7 +493,7 @@ def status(self) -> list[LftpJobStatus] | None:
raise
return statuses

def queue(self, name: str, is_dir: bool, exclude_patterns: list = None):
def queue(self, name: str, is_dir: bool, exclude_patterns: list | None = None):
"""
Queues a job for download
This method may cause an exception to be generated in a later method call:
Expand Down Expand Up @@ -528,7 +535,7 @@ def escape(s: str) -> str:
)
command = " ".join(parts)
self.logger.info("queue command: %s", command)
self.__run_command(command)
self.__run_command(command) # type: ignore[arg-type]

def kill(self, name: str) -> bool:
"""
Expand All @@ -538,7 +545,11 @@ def kill(self, name: str) -> bool:
"""
# look for this name in the status list
job_to_kill = None
for status in self.status():
statuses = self.status()
if statuses is None:
self.logger.debug("Kill failed - status unavailable for job '{}'".format(name))
return False
Comment on lines +548 to +551
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

kill() now silently no-ops when status is unavailable.

At Line 549-551, returning False on status() is None is effectively ignored by the current caller (src/python/controller/controller.py Line 1057-1061), so kill failures can pass silently.

Proposed fix
         statuses = self.status()
         if statuses is None:
-            self.logger.debug("Kill failed - status unavailable for job '{}'".format(name))
-            return False
+            self.logger.warning("Kill failed - status unavailable for job '{}'".format(name))
+            raise LftpError("Kill failed - status unavailable")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
statuses = self.status()
if statuses is None:
self.logger.debug("Kill failed - status unavailable for job '{}'".format(name))
return False
statuses = self.status()
if statuses is None:
self.logger.warning("Kill failed - status unavailable for job '{}'".format(name))
raise LftpError("Kill failed - status unavailable")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/python/lftp/lftp.py` around lines 548 - 551, The kill() method currently
returns False when status() is None, which silently no-ops; update kill() in
src/python/lftp/lftp.py so that when statuses is None it logs an error
(self.logger.error) with the job name and then raises a runtime exception (e.g.,
RuntimeError("Kill failed - status unavailable for job '<name>'")) instead of
returning False; keep the existing debug/error message content and ensure the
exception includes the job name so callers (e.g., controller.py callers of
kill()) can detect and handle the failure.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning False is the correct behavior here. kill() returns bool indicating whether the job was found and killed. When status is unavailable
(transient parse error), returning False lets the caller retry on the next cycle. Raising an exception would crash the controller — the same pattern we fixed in
PR #296 (don't crash on transient failures). The caller (controller.py) already handles kill() returning False gracefully.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nitrobass24, understood — that's a clear design rationale. The bool return is the correct contract for kill(): False means "job not found or status temporarily unavailable, retry next cycle," not a hard failure. Raising an exception here would be inconsistent with the transient-failure tolerance pattern established in #296. My suggestion was mistaken in this context.


✏️ Learnings added
Learnt from: nitrobass24
Repo: nitrobass24/seedsync PR: 301
File: src/python/lftp/lftp.py:548-551
Timestamp: 2026-03-18T23:09:07.947Z
Learning: In seedsync (nitrobass24/seedsync), `Lftp.kill()` in `src/python/lftp/lftp.py` intentionally returns `False` (not raises) when `status()` returns `None` due to a transient parse error. Returning `False` allows the controller to retry on the next polling cycle without crashing. Do not suggest raising an exception here. The caller in `controller.py` handles `kill()` returning `False` gracefully. This pattern of returning False on transient failures (rather than raising) was established deliberately in PR `#296`.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: nitrobass24
Repo: nitrobass24/seedsync PR: 74
File: src/python/controller/controller.py:5-5
Timestamp: 2026-02-21T15:50:25.495Z
Learning: Maintain consistency by using typing.Dict, typing.List, and typing.Set across Python code. Do not suggest converting to built-in generic types (dict, list, set) on a per-file basis. If a change is needed, apply it project-wide or ensure any modernization decisions are deliberate and reviewed, rather than triggering mixed usage in individual files.

Learnt from: nitrobass24
Repo: nitrobass24/seedsync PR: 146
File: src/python/tests/unittests/test_controller/test_exclude_patterns.py:11-11
Timestamp: 2026-03-05T20:23:51.664Z
Learning: In this repository Ruff is not configured (no ruff.toml, .ruff.toml, or [tool.ruff] in pyproject.toml). Do not flag Ruff-specific rules (e.g., FBT003, ANN) for any Python files. Do not enforce Ruff rules unless explicitly configured. Also, the project uses positional boolean arguments for SystemFile constructor calls throughout; preserve this pattern unless there is a specific refactor. Apply this guidance to all Python files in the repo.

for status in statuses:
if status.name == name:
job_to_kill = status
break
Expand All @@ -550,10 +561,10 @@ def kill(self, name: str) -> bool:
# in this case the wrong job may be killed, there's nothing we can do about it
if job_to_kill.state == LftpJobStatus.State.RUNNING:
self.logger.debug("Killing running job '{}'...".format(name))
self.__run_command("kill {}".format(job_to_kill.id))
self.__run_command("kill {}".format(job_to_kill.id)) # type: ignore[arg-type]
elif job_to_kill.state == LftpJobStatus.State.QUEUED:
self.logger.debug("Killing queued job '{}'...".format(name))
self.__run_command("queue --delete {}".format(job_to_kill.id))
self.__run_command("queue --delete {}".format(job_to_kill.id)) # type: ignore[arg-type]
else:
raise NotImplementedError("Unsupported state {}".format(str(job_to_kill.state)))
return True
Expand All @@ -564,18 +575,19 @@ def kill_all(self):
:return:
"""
# empty the queue and kill running jobs
self.__run_command("queue -d *")
self.__run_command("kill all")
self.__run_command("queue -d *") # type: ignore[arg-type]
self.__run_command("kill all") # type: ignore[arg-type]

def exit(self):
"""
Exit the lftp instance. It cannot be used after being killed
:return:
"""
self.kill_all()
assert self.__process is not None
self.__process.sendline("exit")
self.__process.close(force=True)

# Mark decorators as static (must be at end of class)
# Source: https://stackoverflow.com/a/3422823
with_check_process = staticmethod(with_check_process)
with_check_process = staticmethod(with_check_process) # type: ignore[arg-type]
36 changes: 23 additions & 13 deletions src/python/ssh/sshcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,24 @@ def __run_command(self, command: str, flags: str, args: str) -> bytes:
]
)
if i > 0:
before = sp.before.decode(errors="replace").strip() if sp.before != pexpect.EOF else ""
after = sp.after.decode(errors="replace").strip() if sp.after != pexpect.EOF else ""
before_val = sp.before
after_val = sp.after
before = before_val.decode(errors="replace").strip() if isinstance(before_val, bytes) else ""
after = after_val.decode(errors="replace").strip() if isinstance(after_val, bytes) else ""
self.logger.warning("Command failed: '{} - {}'".format(before, after))
if i == 1:
error_msg = "Unknown error"
if sp.before.decode(errors="replace").strip():
error_msg += " - " + sp.before.decode(errors="replace").strip()
before_val = sp.before
if isinstance(before_val, bytes) and before_val.decode(errors="replace").strip():
error_msg += " - " + before_val.decode(errors="replace").strip()
raise SshcpError(error_msg)
elif i == 3:
raise SshcpError("Bad hostname: {}".format(self.__host))
elif i in {2, 4}:
error_msg = "Connection refused by server"
if sp.before.decode(errors="replace").strip():
error_msg += " - " + sp.before.decode(errors="replace").strip()
before_val = sp.before
if isinstance(before_val, bytes) and before_val.decode(errors="replace").strip():
error_msg += " - " + before_val.decode(errors="replace").strip()
raise SshcpError(error_msg)
sp.sendline(self.__password)

Expand All @@ -321,23 +325,29 @@ def __run_command(self, command: str, flags: str, args: str) -> bytes:
timeout=self.__TIMEOUT_SECS,
)
if i > 0:
before = sp.before.decode(errors="replace").strip() if sp.before != pexpect.EOF else ""
after = sp.after.decode(errors="replace").strip() if sp.after != pexpect.EOF else ""
before_val = sp.before
after_val = sp.after
before = before_val.decode(errors="replace").strip() if isinstance(before_val, bytes) else ""
after = after_val.decode(errors="replace").strip() if isinstance(after_val, bytes) else ""
self.logger.warning("Command failed: '{} - {}'".format(before, after))
if i == 1:
raise SshcpError("Incorrect password")
elif i == 3:
raise SshcpError("Bad hostname: {}".format(self.__host))
elif i in {2, 4}:
error_msg = "Connection refused by server"
if sp.before.decode(errors="replace").strip():
error_msg += " - " + sp.before.decode(errors="replace").strip()
before_val = sp.before
if isinstance(before_val, bytes) and before_val.decode(errors="replace").strip():
error_msg += " - " + before_val.decode(errors="replace").strip()
raise SshcpError(error_msg)

# Capture output attributes while sp is still open (close can clear them)
out_before = sp.before.decode(errors="replace").strip() if sp.before != pexpect.EOF else ""
out_after = sp.after.decode(errors="replace").strip() if sp.after != pexpect.EOF else ""
out_raw = sp.before.replace(b"\r\n", b"\n").strip()
before_val = sp.before
after_val = sp.after
out_before = before_val.decode(errors="replace").strip() if isinstance(before_val, bytes) else ""
out_after = after_val.decode(errors="replace").strip() if isinstance(after_val, bytes) else ""
assert isinstance(before_val, bytes)
out_raw = before_val.replace(b"\r\n", b"\n").strip()

except pexpect.exceptions.TIMEOUT:
elapsed = time.time() - start_time
Expand Down
Loading
Loading