(feat): add actions for langfuse evaluators #593
narsimhaReddyJuspay wants to merge 1 commit into juspay:release from
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.
Walkthrough
Adds a Redis-backed async
Changes
Sequence Diagram
sequenceDiagram
participant Monitor as Score Monitor
participant Config as EVALUATOR_ACTIONS (Redis)
participant Executor as ActionExecutor
participant Action as OutcomeUpdateAction
participant DB as Lead Database
participant Alert as Slack Alert
Monitor->>Config: fetch EVALUATOR_ACTIONS()
Config-->>Monitor: action mappings
loop for each failing score
Monitor->>Executor: should_trigger(score, threshold)?
Executor-->>Monitor: True/False
alt True
Monitor->>DB: get_lead_by_call_id(call_sid)
DB-->>Monitor: lead (cached)
Monitor->>Executor: execute_action(type,cfg,call_sid,score,lead)
Executor->>Action: instantiate handler
Action->>Action: _extract_outcome(score)
Action->>DB: update_lead_call_completion_details(...)
DB-->>Action: success/failure
Action-->>Executor: result
Executor-->>Monitor: action_status (SUCCESS/FAILED/ERROR)
Monitor->>DB: refresh cached lead if SUCCESS
end
end
Monitor->>Alert: send_score_alert(..., action_status)
Alert-->>Monitor: delivered
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 3
🧹 Nitpick comments (6)
app/services/langfuse/tasks/score_monitor/actions.py (3)
43-56: Regex only handles one level of JSON nesting.

The pattern `r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"` matches JSON objects with at most one level of nested braces. If the evaluator comment contains deeper nesting like `{"result": {"details": {"outcome": "X"}}}`, the regex will fail to capture the full object, potentially returning a partial match that won't parse or returning the wrong inner object.

For the current use case (flat JSON with `actual_outcome`), this works, but it's fragile if evaluator comment formats evolve.

A more robust alternative would be to scan backwards for `{` and attempt `json.loads` on progressively larger substrings:

♻️ More robust extraction approach
- # Find all JSON objects in the comment
- # Use a greedy match to get the largest valid JSON at each position
- pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"
- matches = re.findall(pattern, comment, re.DOTALL)
-
- if not matches:
-     return None
-
- # Try to parse each match from the end, return first valid one
- for match in reversed(matches):
-     try:
-         return json.loads(match)
-     except json.JSONDecodeError:
-         continue
+ # Search backwards for JSON objects by finding '{' positions
+ # and attempting to parse from each one
+ for i in range(len(comment) - 1, -1, -1):
+     if comment[i] == '{':
+         try:
+             obj = json.loads(comment[i:])
+             if isinstance(obj, dict):
+                 return obj
+         except json.JSONDecodeError:
+             continue

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 43 - 56, The current regex-based extraction (pattern and re.findall on variable comment) only supports one level of brace nesting and can miss deeper JSON; replace it with a robust backward-scan that finds the last '{' positions in comment and for each candidate substring (from that position to end) attempts json.loads until one succeeds, returning the parsed object or None if none parse; update the code that currently uses pattern, matches and the reversed loop to this scan-and-try approach (use the same json.loads and comment variable so callers remain unchanged).
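For reference, the suggested backward scan can be exercised standalone. This is a hedged sketch, not the module's actual implementation: it assumes (as the diff above does) that the JSON object extends to the end of the comment, and the helper name merely mirrors the one under review.

```python
import json
from typing import Any, Dict, Optional

def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
    """Return the trailing JSON object in `comment`, at any nesting depth.

    Sketch of the review suggestion: attempt json.loads from each '{'
    position, scanning backwards, so nested braces never truncate the
    match. Assumes the object runs to the end of the string.
    """
    if not comment:
        return None
    for i in range(len(comment) - 1, -1, -1):
        if comment[i] == "{":
            try:
                obj = json.loads(comment[i:])
                if isinstance(obj, dict):
                    return obj
            except json.JSONDecodeError:
                continue
    return None
```

Unlike the single-level regex, this recovers `{"result": {"details": {"outcome": "X"}}}` intact.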
78-80: Use `removeprefix` instead of `replace` for path parsing.

`replace("$.", "")` replaces all occurrences of `"$."` in the string, not just the leading prefix. While unlikely to cause issues with current usage, `removeprefix` (Python 3.9+) is semantically correct here.

♻️ Proposed fix
- field = json_path.replace("$.", "")
+ field = json_path.removeprefix("$.")

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 78 - 80, The parsing of the JSON path uses json_path.replace("$.", "") which will remove all occurrences of "$." instead of only a leading prefix; change this to use json_path.removeprefix("$.") when computing field (the variable named field) so only a leading "$." is dropped, then return data.get(field) as before; update the line that assigns field (and any similar occurrences) to use removeprefix.
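A quick illustration of the behavioral difference. The path below is contrived to contain `$.` twice; the configs under review only use flat paths like `$.actual_outcome`, which is why the bug is latent rather than live.

```python
# Contrived path containing "$." twice to expose the difference
path = "$.outcome.$.nested"

# replace() strips every occurrence, corrupting the inner segment
stripped_all = path.replace("$.", "")       # "outcome.nested"

# removeprefix() (Python 3.9+) strips only a leading "$.",
# and is a no-op when the prefix is absent
stripped_prefix = path.removeprefix("$.")   # "outcome.$.nested"
no_prefix = "outcome".removeprefix("$.")    # "outcome"
```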
216-239: `should_trigger` duplicates `ScoreMonitor._is_below_threshold`.

This method is functionally identical to `_is_below_threshold` in score.py (lines 185-208). In `check_and_alert`, scores are already filtered by `_is_below_threshold`, so calling `should_trigger` again (score.py line 955) is redundant — every score in `failing_scores` already passed the threshold check.

Consider either:
- Removing the redundant `should_trigger` call in score.py, or
- Having `ScoreMonitor._is_below_threshold` delegate to `ActionExecutor.should_trigger` to centralize the logic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 216 - 239, The should_trigger method in ActionExecutor duplicates ScoreMonitor._is_below_threshold and is redundantly called for items already filtered into failing_scores in check_and_alert; remove the extra call to ActionExecutor.should_trigger from ScoreMonitor.check_and_alert (so failing_scores are assumed already below threshold) and keep ActionExecutor.should_trigger for external use, or alternatively change ScoreMonitor._is_below_threshold to call ActionExecutor.should_trigger (centralizing logic) — update call sites accordingly to reference should_trigger or _is_below_threshold consistently and remove the duplicate threshold check to avoid double validation.

app/services/langfuse/tasks/score_monitor/score.py (3)
954-955: Redundant threshold check — `should_trigger` re-checks what `_is_below_threshold` already verified.

All scores in `failing_scores` have already passed `_is_below_threshold` (line 834), which performs the identical `score_value < threshold` comparison. The `should_trigger` call here is always `True` for these scores.

If the intent is to use the same threshold for actions and alerts, the check is redundant. If actions might use different thresholds in the future, consider making that explicit rather than re-checking with the same value.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 954 - 955, The failing_scores loop is redundantly re-checking thresholds: scores have already been filtered by _is_below_threshold (used to build failing_scores), so calling executor.should_trigger(score, threshold) with the same threshold is unnecessary; either remove that second check and directly trigger actions/alerts for entries in failing_scores, or if you intend a separate action threshold, make that explicit by using a different config key (e.g., action_threshold) and pass that to executor.should_trigger — update references to evaluators_config, _is_below_threshold, failing_scores, and executor.should_trigger accordingly.
296-315: Leads are fetched twice: once in the pre-fetch loop (line 932) and again inside `send_score_alert` (line 311).

`send_score_alert` independently calls `get_lead_by_call_id(call_sid)` at line 311 to get `recording_url`. The pre-fetched `call_sid_to_lead` dict is never passed to `send_score_alert`, so every call_sid with an action config triggers two DB lookups.

Consider passing the pre-fetched lead to `send_score_alert` or refactoring it to accept an optional `recording_url` parameter, so the same lead data serves both the action and the alert.

Also applies to: 928-936
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 296 - 315, The code performs duplicate DB lookups for leads: pre-fetch builds call_sid_to_lead but send_score_alert(call_sid, ...) still calls get_lead_by_call_id(call_sid) to fetch recording_url; change send_score_alert to accept an optional lead or recording_url (e.g., add parameter recording_url=None or lead=None), update all callers (the pre-fetch loop that has call_sid_to_lead to pass the pre-fetched lead/recording_url into send_score_alert), and inside send_score_alert use the passed lead/recording_url if provided and only call get_lead_by_call_id when it is not supplied; remove the redundant database call path to eliminate the double lookup (refer to send_score_alert, get_lead_by_call_id, call_sid_to_lead, and recording_url).
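One way to thread the cached lead through, sketched with stand-in implementations. The real `send_score_alert` builds and posts a Slack payload; only the lookup logic is shown here, and both function bodies are hypothetical simplifications.

```python
from typing import Any, Dict, Optional

def get_lead_by_call_id(call_sid: str) -> Optional[Dict[str, Any]]:
    # Stand-in for the real async DB lookup
    return {"call_sid": call_sid, "recording_url": "https://example.com/db.mp3"}

def send_score_alert(call_sid: str, lead: Optional[Dict[str, Any]] = None) -> str:
    # Prefer the caller's pre-fetched lead; hit the DB only as a fallback
    if lead is None:
        lead = get_lead_by_call_id(call_sid)
    return (lead or {}).get("recording_url", "")

# Caller that already holds the pre-fetch cache
call_sid_to_lead = {"CA123": {"recording_url": "https://example.com/cached.mp3"}}
url = send_score_alert("CA123", lead=call_sid_to_lead.get("CA123"))
```

With this shape, the cached lead serves both the action and the alert, and the DB is only queried when no cached value exists.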
967-973: `call_sid` may be `None` when passed to `execute_action`.

At line 943, `call_sid` is set to `None` if `trace_id` is missing or has no mapping. This `None` is then passed to `executor.execute_action(call_sid=call_sid, ...)` at line 970, which flows into `OutcomeUpdateAction.execute` and ultimately calls `get_lead_by_call_id(None)`.

While the downstream code handles it gracefully (returns `None`, leading to a "No lead found" log + `return False`), it would be cleaner to skip action execution entirely when `call_sid` is absent:

♻️ Proposed guard

      if executor.should_trigger(score, threshold):
+         if not call_sid:
+             logger.info(
+                 f"No call_sid for trace {trace_id}, skipping action"
+             )
+             action_status = "FAILED"
+             continue
          try:

Note: This would need a slight restructuring since the code isn't in a loop that supports `continue` at this level. A simple `if call_sid:` guard around the action block would work.
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 967 - 973, call_sid can be None before calling executor.execute_action, causing downstream OutcomeUpdateAction.execute to call get_lead_by_call_id(None); add a guard to skip executing actions when call_sid is falsy (e.g., if not call_sid: log/debug that no call_sid found and skip the action block) so that execute_action is only invoked when call_sid is present; update the code paths around the call to executor.execute_action and any related logging to reflect the early skip (refer to the variables call_sid, executor.execute_action, OutcomeUpdateAction.execute, and get_lead_by_call_id when locating the change).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Line 13: Remove the unused typing import "List" from the import statement in
this module (the line importing Any, Dict, List, Optional); keep the used types
(Any, Dict, Optional) so the file compiles and the linter no longer flags an
unused import in actions.py.
- Around line 262-273: The current code calls ActionType(action_type) which
raises ValueError for unknown values and gets caught by the broad exception,
hiding a config problem; change the logic in the method that looks up handlers
so you validate the enum first (e.g., check ActionType.__members__ or use a
small try/except specifically around ActionType(action_type) to handle
ValueError), log a clear "unknown/unsupported action type" message referencing
the action_type when invalid, then use the validated enum key to get
self._action_handlers and proceed to instantiate handler_class and call
handler.execute(call_sid, score, current_lead); keep the broad exception
handling only for runtime errors from handler.execute.
In `@app/services/langfuse/tasks/score_monitor/score.py`:
- Around line 927-936: The pre-fetch currently iterates trace_to_call_sid for
all traces causing unnecessary DB calls; change it to first build a set of
relevant call_sids (only those belonging to traces that have failing scores AND
have a matching evaluator action in action_configs when executor is set), then
iterate that set to call get_lead_by_call_id and populate call_sid_to_lead; use
only the call_sid in the loop (drop unused trace_id) and keep the existing
try/except logger.error behavior. This limits queries to only the
failing+configured traces and removes the B007 unused-variable warning.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
app/core/config/dynamic.py
app/services/langfuse/tasks/score_monitor/actions.py
app/services/langfuse/tasks/score_monitor/score.py
0d6e299 to 59f600c (Compare)

@coderabbitai please review

✅ Actions performed: Review triggered.
Actionable comments posted: 2
♻️ Duplicate comments (1)
app/services/langfuse/tasks/score_monitor/score.py (1)
978-984: `call_sid` may be `None` when passed to `execute_action`.

At Line 981, `call_sid` can be `None` (from Line 954's `.get()`). This propagates to `OutcomeUpdateAction.execute()`, which calls `get_lead_by_call_id(None)`. A guard was suggested at the root cause in actions.py (`OutcomeUpdateAction.execute`), but you could also short-circuit here:

+ if not call_sid:
+     logger.warning(
+         f"No call_sid for trace {score.get('trace_id')}, skipping action"
+     )
+     action_status = "FAILED"
+ else:
      success = await executor.execute_action(
          action_type=action_type,
          action_config=action,
          call_sid=call_sid,
          score=score,
          current_lead=current_lead,
      )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 978 - 984, call_sid can be None (from the earlier .get()) and is currently passed into executor.execute_action which leads OutcomeUpdateAction.execute to call get_lead_by_call_id(None); guard against this by short-circuiting before calling executor.execute_action when call_sid is falsy (or explicitly None) — e.g., check the call_sid local variable and skip or log the action dispatch for that action_type; reference the call_sid variable and the executor.execute_action call, and note OutcomeUpdateAction.execute / get_lead_by_call_id as the downstream symbols to avoid invoking with None.
🧹 Nitpick comments (3)
app/services/langfuse/tasks/score_monitor/score.py (1)
960-1007: Consider extracting action processing into a helper method to reduce nesting.

This block is nested 7 levels deep (for → for → try → if → if → if → try), which makes the flow hard to follow. Extracting the action processing (Lines 960-1007) into a dedicated method like `_process_action_for_score(executor, action_configs, evaluators_config, score, call_sid, call_sid_to_lead)` would improve readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 960 - 1007, The action-processing block within the score monitoring loop is deeply nested; extract it into a helper function named something like _process_action_for_score(executor, action_configs, evaluators_config, score, call_sid, call_sid_to_lead) that encapsulates the logic currently around executor.should_trigger(...), executor.execute_action(...), action_status handling, and the post-success lead refresh via get_lead_by_call_id(call_sid); move exception handling for action execution and lead re-fetch into that helper and return action_status and any updated lead so the caller (the outer loop) only calls the helper and updates call_sid_to_lead if needed, preserving existing variable names (executor, action_configs, evaluators_config, score, call_sid, call_sid_to_lead) and log messages (logger.error) to retain behavior.

app/services/langfuse/tasks/score_monitor/actions.py (2)
43-56: Regex only handles single-level nested JSON objects.

The pattern `\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}` will correctly match `{"outcome": "X"}` or `{"a": {"b": "c"}}`, but will fail on deeper nesting like `{"a": {"b": {"c": "d"}}}`. If evaluator comments may contain multi-level nested JSON, this will silently miss valid objects.

If deeper nesting is a future possibility, consider a stack-based brace-matching approach or use a scanning parser instead of regex.

💡 Alternative: brace-matching extraction
def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
    if not comment:
        return None

    # Find all top-level JSON objects by brace matching (supports arbitrary nesting)
    candidates = []
    i = len(comment) - 1
    while i >= 0:
        if comment[i] == '}':
            depth = 0
            end = i
            while i >= 0:
                if comment[i] == '}':
                    depth += 1
                elif comment[i] == '{':
                    depth -= 1
                    if depth == 0:
                        candidates.append(comment[i:end + 1])
                        break
                i -= 1
        i -= 1

    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue

    if candidates:
        logger.warning(
            f"Found {len(candidates)} JSON-like patterns but none parsed successfully"
        )
    return None

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 43 - 56, The current regex-based extraction using the pattern variable (pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}") will fail on deeper-than-two-level nested JSON; replace the regex approach in this block with a brace-matching scanner that walks the comment string from the end, collects candidate JSON substrings by tracking brace depth, and then json.loads() each candidate in reverse order until one parses successfully (update the existing matches/reversed(matches) logic to use the new candidates list). Ensure the new logic handles empty comments, logs a warning if candidates were found but none parsed, and keep the function behavior of returning the first successfully parsed dict or None.
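The brace-matching idea also works as a single forward pass, which is a little easier to follow than the backward double loop. This is an illustrative standalone variant, not the code under review: it collects balanced top-level spans, then parses from the last candidate backwards.

```python
import json
from typing import Any, Dict, List, Optional

def extract_json_from_end(comment: str) -> Optional[Dict[str, Any]]:
    # One forward pass collecting balanced top-level {...} spans
    candidates: List[str] = []
    depth = 0
    start = -1
    for i, ch in enumerate(comment):
        if ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0:
                candidates.append(comment[start : i + 1])
    # Parse from the end; return the first candidate that is a JSON object
    for candidate in reversed(candidates):
        try:
            obj = json.loads(candidate)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            continue
    return None
```

Because depth is tracked rather than pattern-matched, the three-level case the regex misses parses correctly.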
216-239: `should_trigger` is redundant with the upstream filtering in score.py.

Scores reaching the action-execution block (score.py Line 966) are already filtered by `_is_below_threshold` (score.py Line 834) using the same threshold. So `executor.should_trigger(score, threshold)` will always return `True` for these scores.

This isn't a bug — it's a defensive double-check — but it duplicates identical logic (`float(value) < threshold`) from `ScoreMonitor._is_below_threshold`. If this is intentional as a safety guard, a brief comment noting that would help future readers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 216 - 239, The should_trigger method duplicates the same float(value) < threshold check already performed upstream by ScoreMonitor._is_below_threshold before actions are executed; update the should_trigger implementation (or its docstring) to explicitly state this is an intentional defensive guard against untrusted inputs coming into executor.should_trigger and that it may be redundant because ScoreMonitor._is_below_threshold already filters scores, so leave the existing logic but add a concise comment referencing ScoreMonitor._is_below_threshold to make the intent clear to future readers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Around line 64-80: The extract_field function currently returns
data.get(field) which can be non-string; change extract_field to coerce non-None
values to str (e.g., return None if missing else str(value)) so its return
matches the expected string downstream; update callers (notably _extract_outcome
and the call to update_lead_call_completion_details(outcome=...)) to rely on the
coerced string/None behavior from extract_field.
- Around line 101-124: The execute method may receive call_sid=None (propagated
from trace_to_call_sid.get(trace_id)), so avoid calling get_lead_by_call_id with
None; add an early guard at the start of ScoreMonitorOutcomeAction.execute to
check if call_sid is falsy/None and immediately return False (and optionally log
a warning) before attempting to call get_lead_by_call_id(call_sid), preserving
the existing behavior when a valid call_sid is provided.
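The early guard described above is tiny. A schematic version follows; the real `execute` is async and also receives the score payload and cached lead, so this is only a shape sketch with hypothetical simplified signatures.

```python
from typing import Any, Dict, Optional

def execute(call_sid: Optional[str], score: Dict[str, Any]) -> bool:
    # Fail fast: without a call_sid there is no lead to update,
    # so skip the DB lookup entirely instead of querying with None
    if not call_sid:
        return False
    # ... real implementation fetches the lead and applies the outcome update
    return True
```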
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
app/core/config/dynamic.py
app/services/langfuse/tasks/score_monitor/actions.py
app/services/langfuse/tasks/score_monitor/score.py
59f600c to 6cee410 (Compare)

@coderabbitai please review

✅ Actions performed: Review triggered.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/core/config/dynamic.py`:
- Around line 263-299: EVALUATOR_ACTIONS currently returns whatever
json.loads(config_value) yields which may be a list or primitive; validate that
the parsed value is a dict before returning and fail closed if not. After
json.loads in EVALUATOR_ACTIONS, check isinstance(parsed, dict) and if it is,
return it; otherwise log a clear error (including the parsed type and the raw
config_value) via logger.error and return {} so downstream callers (e.g., code
expecting .get()) won’t raise AttributeError.
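The fail-closed validation reads roughly like this. It is a sketch under stated assumptions: the real `EVALUATOR_ACTIONS` accessor is async and Redis-backed, so `parse_evaluator_actions` below is a hypothetical stand-in for just the parse-and-validate step.

```python
import json
import logging

logger = logging.getLogger(__name__)

def parse_evaluator_actions(config_value: str) -> dict:
    """Parse the raw config string, returning {} for anything that isn't a JSON object."""
    try:
        parsed = json.loads(config_value)
    except (json.JSONDecodeError, TypeError):
        logger.error(f"EVALUATOR_ACTIONS is not valid JSON: {config_value!r}")
        return {}
    if not isinstance(parsed, dict):
        # Fail closed so downstream .get() calls never hit a list or primitive
        logger.error(
            f"EVALUATOR_ACTIONS must be a JSON object, "
            f"got {type(parsed).__name__}: {config_value!r}"
        )
        return {}
    return parsed
```

Returning `{}` rather than raising keeps the score monitor running on a bad config push while the error log surfaces the problem.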
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
app/core/config/dynamic.py
app/services/langfuse/tasks/score_monitor/actions.py
app/services/langfuse/tasks/score_monitor/score.py
6cee410 to 6204f42 (Compare)

@coderabbitai review

✅ Actions performed: Review triggered.
🧹 Nitpick comments (5)
app/services/langfuse/tasks/score_monitor/actions.py (2)
241-249: Minor: `return score_value < threshold` inside `try` body (Ruff TRY300)

Same pattern as in dynamic.py — the return belongs in an `else` block so only the float conversion is guarded.

♻️ Proposed refactor
  try:
      value = score.get("value")
      if value is None:
          return False
      score_value = float(value)
-     return score_value < threshold
  except (ValueError, TypeError):
      logger.warning(f"Invalid score value: {score.get('value')}")
      return False
+ else:
+     return score_value < threshold

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 241 - 249, The try block currently wraps both the float conversion and the comparison, which can hide logic errors; change it so only the conversion of score.get("value") to float is inside the try (e.g., try: score_value = float(value) except (ValueError, TypeError): logger.warning(...) return False), and perform the comparison return score_value < threshold in an else (or immediately after the try) using the same symbols (score.get("value"), score_value, threshold) so only the conversion is guarded and the return is outside the try body.
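For clarity, the full method after the TRY300 refactor would look roughly like this (a sketch; threshold semantics are unchanged, only the `return` moves to `else`):

```python
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)

def should_trigger(score: Dict[str, Any], threshold: float) -> bool:
    try:
        value = score.get("value")
        if value is None:
            return False
        score_value = float(value)
    except (ValueError, TypeError):
        logger.warning(f"Invalid score value: {score.get('value')}")
        return False
    else:
        # Comparison lives outside the try body, so a bug here can no
        # longer be masked by the except clause (Ruff TRY300)
        return score_value < threshold
```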
43-61: `extract_json_from_end` regex handles at most two levels of JSON nesting

The pattern `r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"` matches an object whose inner values contain at most one level of `{}`. For three or more levels (e.g., `{"a": {"b": {"c": 1}}}`), `re.findall` returns an inner sub-object instead of the outermost one. The current callers only embed flat objects (`{"actual_outcome": "VOICEMAIL"}`), so this is fine now, but the limitation is invisible to future contributors.

Consider adding a docstring note about the nesting depth limit, or replacing the regex with an iterative bracket-balancing scan:

♻️ Bracket-balanced alternative (handles arbitrary nesting)
- # Find all JSON objects in the comment
- # Use a greedy match to get the largest valid JSON at each position
- pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"
- matches = re.findall(pattern, comment, re.DOTALL)
-
- if not matches:
+ # Find all top-level '{' positions and extract the balanced span
+ matches = []
+ for i, ch in enumerate(comment):
+     if ch != "{":
+         continue
+     depth = 0
+     for j in range(i, len(comment)):
+         if comment[j] == "{":
+             depth += 1
+         elif comment[j] == "}":
+             depth -= 1
+             if depth == 0:
+                 matches.append(comment[i : j + 1])
+                 break
+
+ if not matches:
      return None

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/actions.py` around lines 43 - 61, The regex in extract_json_from_end (pattern variable) only supports up to two nested brace levels, so replace it with an iterative bracket-balanced scan that walks the string (preferably from the end) counting '{' and '}' until a balanced top-level object is found, then json.loads that substring; update error handling to catch json.JSONDecodeError and keep the existing logger.warning message; alternatively, if you want the minimal change, add a docstring to extract_json_from_end documenting the nesting limitation and the rationale. Ensure references to extract_json_from_end, pattern, and logger are preserved so reviewers can find the change.

app/services/langfuse/tasks/score_monitor/score.py (2)
962-966: `executor.should_trigger` is always `True` here — redundant check

`failing_scores` on lines 833-835 is already filtered with `self._is_below_threshold(s, threshold)`, which uses identical logic (`float(value) < threshold`). Every score in the loop has already passed this check with the same `threshold`, so `executor.should_trigger` unconditionally returns `True`.

♻️ Proposed simplification
```diff
             threshold = evaluators_config.get(evaluator_name, 5)
-            if executor.should_trigger(score, threshold):
-                try:
+            try:
```
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 962-966: the loop calls `executor.should_trigger(score, threshold)` after building `failing_scores` using `self._is_below_threshold(s, threshold)`, which uses identical `float(value) < threshold` logic, making the check redundant. Either (A) rely on `failing_scores` alone and drop the `executor.should_trigger` branch where `evaluator_name`/`action_config` are handled, or (B) change the `failing_scores` computation to call `executor.should_trigger(s, threshold)` (instead of `self._is_below_threshold`) so the executor's custom logic is used; update references to `failing_scores`, `_is_below_threshold`, `executor.should_trigger`, `evaluator_name`, `action_config`, `evaluators_config` and `score` accordingly.
309-315: `send_score_alert` issues a fresh DB query for `recording_url` even when the lead is already in `call_sid_to_lead`

With the new pre-fetch cache (`call_sid_to_lead`) and the re-fetch after a successful action (lines 991-997), a single call_sid can now trigger up to three `get_lead_by_call_id` calls per alert cycle. The `send_score_alert` method has no access to the cache, so it always queries the DB again.

Consider passing the `recording_url` (or the cached lead) to `send_score_alert` so the method only falls back to a DB query when no cached value is available.

♻️ Sketch of the change
```diff
     async def send_score_alert(
         self,
         evaluator_name: str,
         score: Dict[str, Any],
         trace_details: Optional[Dict[str, Any]] = None,
         action_status: Optional[str] = None,
+        cached_lead: Optional[Any] = None,
     ) -> bool:
         ...
         if trace_details:
             ...
             if call_sid:
+                lead = cached_lead
+                if lead is None:
                     try:
                         lead = await get_lead_by_call_id(call_sid)
                     except Exception as e:
                         logger.error(f"Error querying database for recording_url: {e}")
                 if lead and lead.recording_url:
                     recording_url = lead.recording_url
```

Then at the call site (line 1010), pass `cached_lead=call_sid_to_lead.get(call_sid)`.
Verify each finding against the current code and only fix it if needed. In `@app/services/langfuse/tasks/score_monitor/score.py` around lines 309-315: `send_score_alert` currently always calls `get_lead_by_call_id`, causing redundant DB queries. Modify `send_score_alert` to accept an optional `cached_lead` (or `recording_url`) parameter and use `cached_lead.recording_url` (or the passed `recording_url`) if present before falling back to calling `get_lead_by_call_id`. Update the place that invokes `send_score_alert` to pass `cached_lead=call_sid_to_lead.get(call_sid)` (or the cached `recording_url`) so the DB query only runs when no cached value exists; keep existing exception handling for the fallback query.

app/core/config/dynamic.py (1)
295-306: Minor: `return parsed` inside `try` body (Ruff TRY300)

Moving the success return to an `else` block makes the try/except intent clearer — only the statement that can actually raise `JSONDecodeError` stays inside `try`.

♻️ Proposed refactor
```diff
         try:
             parsed = json.loads(config_value)
             # Ensure the parsed value is a dict, not a list or primitive
             if not isinstance(parsed, dict):
                 logger.error(
                     f"EVALUATOR_ACTIONS config must be a JSON object, got {type(parsed).__name__}"
                 )
                 return {}
-            return parsed
         except json.JSONDecodeError as e:
             logger.error(f"Failed to parse EVALUATOR_ACTIONS config: {e}")
             return {}
+        else:
+            return parsed
```
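The try/except/else shape above can be illustrated as a self-contained sketch; `load_actions_config` is a hypothetical stand-in for the getter in `dynamic.py`:

```python
import json
import logging

logger = logging.getLogger(__name__)


def load_actions_config(config_value: str) -> dict:
    """Parse a JSON config string, returning {} on any parse or shape error."""
    try:
        parsed = json.loads(config_value)  # the only statement that can raise
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse EVALUATOR_ACTIONS config: {e}")
        return {}
    else:
        # Ensure the parsed value is a dict, not a list or primitive
        if not isinstance(parsed, dict):
            logger.error(
                f"EVALUATOR_ACTIONS config must be a JSON object, got {type(parsed).__name__}"
            )
            return {}
        return parsed
```

Keeping only `json.loads` in the `try` body means a future bug in the shape check cannot be silently swallowed by the `except` clause.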
Verify each finding against the current code and only fix it if needed. In `@app/core/config/dynamic.py` around lines 295-306: refactor the try/except so only `json.loads(config_value)` is inside the `try:` block and the successful-return logic is inside an `else:` block. Specifically, keep `try: parsed = json.loads(config_value)` and the `except json.JSONDecodeError` handler logging "Failed to parse EVALUATOR_ACTIONS config: {e}" and returning `{}`; then in the `else:` check `isinstance(parsed, dict)` (using the same logger/error text about "EVALUATOR_ACTIONS config must be a JSON object"), return `{}` on failure, and finally return `parsed` on success (referencing variables `config_value`, `parsed` and `logger`).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@app/core/config/dynamic.py`:
- Around line 295-306: Refactor the try/except so only json.loads(config_value)
is inside the try: block and the successful-return logic is inside an else:
block; specifically, keep try: parsed = json.loads(config_value) and the except
json.JSONDecodeError handler logging "Failed to parse EVALUATOR_ACTIONS config:
{e}" and returning {}, then in the else: check isinstance(parsed, dict) (using
the same logger/error text about "EVALUATOR_ACTIONS config must be a JSON
object"), return {} on failure, and finally return parsed on success
(referencing variables config_value, parsed and logger).
In `@app/services/langfuse/tasks/score_monitor/actions.py`:
- Around line 241-249: The try block currently wraps both the float conversion
and the comparison, which can hide logic errors; change it so only the
conversion of score.get("value") to float is inside the try (e.g., try:
score_value = float(value) except (ValueError, TypeError): logger.warning(...)
return False), and perform the comparison return score_value < threshold in an
else (or immediately after the try) using the same symbols (score.get("value"),
score_value, threshold) so only the conversion is guarded and the return is
outside the try body.
- Around line 43-61: The regex in extract_json_from_end (pattern variable) only
supports up to two nested brace levels, so replace it with an iterative
bracket-balanced scan that walks the string (preferably from the end) counting
'{' and '}' until a balanced top-level object is found, then json.loads that
substring; update error handling to catch json.JSONDecodeError and keep the
existing logger.warning message; alternatively, if you want the minimal change,
add a docstring to extract_json_from_end documenting the nesting limitation and
the rationale. Ensure references to extract_json_from_end, pattern, and logger
are preserved so reviewers can find the change.
In `@app/services/langfuse/tasks/score_monitor/score.py`:
- Around line 962-966: The loop calls executor.should_trigger(score, threshold)
after building failing_scores using self._is_below_threshold(s, threshold) which
uses identical float(value) < threshold logic, making the check redundant;
remove the executor.should_trigger(...) condition or move the
executor.should_trigger check into the failing_scores filter so you either (A)
just rely on failing_scores and drop the executor.should_trigger branch where
evaluator_name/action_config are handled, or (B) change the failing_scores
computation to call executor.should_trigger(s, threshold) (instead of
self._is_below_threshold) so the executor's custom logic is used; update
references to failing_scores, _is_below_threshold, executor.should_trigger,
evaluator_name, action_config, evaluators_config and score accordingly.
- Around line 309-315: send_score_alert currently always calls
get_lead_by_call_id causing redundant DB queries; modify send_score_alert to
accept an optional cached_lead (or recording_url) parameter and use
cached_lead.recording_url (or the passed recording_url) if present before
falling back to calling get_lead_by_call_id. Update the place that invokes
send_score_alert to pass cached_lead=call_sid_to_lead.get(call_sid) (or cached
recording_url) so the DB query only runs when no cached value exists; keep
existing exception handling for the fallback query.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- app/core/config/dynamic.py
- app/services/langfuse/tasks/score_monitor/actions.py
- app/services/langfuse/tasks/score_monitor/score.py
0d13f79 to 85b7a76 — Compare
- added new actions for evaluators
- added new config for actions of evaluators
- added a basic update outcome action for now
- they will work with same threshold as the alerts
85b7a76 to 4bc6d39 — Compare
Pull request overview
Adds an evaluator-action framework to the Langfuse score monitor so low evaluator scores can automatically run follow-up actions (initially: outcome correction) and surface step-by-step action status in Slack alerts.
Changes:
- Introduces evaluator-triggered actions (`OutcomeUpdateAction`) with step tracking (`ActionResult`) and an `ActionExecutor`.
- Integrates action execution into the Langfuse score monitor loop (execute actions before sending failing-score alerts) and enhances Slack alerts with a full-width action status section.
- Adds DB support for cancelling pending retry leads by `request_id`, plus a new dynamic config getter for `EVALUATOR_ACTIONS` and accompanying documentation.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/EVALUATOR_ACTIONS.md | New documentation describing evaluator actions, config schema, flow, and alert/logging behavior. |
| app/services/slack/alert.py | Adds action_status rendering as a separate full-width Slack block and improves mention formatting. |
| app/services/langfuse/tasks/score_monitor/score.py | Executes configured actions prior to Slack alerts; includes action status + lead id in alert payload. |
| app/services/langfuse/tasks/score_monitor/actions.py | New evaluator action framework (outcome update, retry cancellation, Nautilus webhook). |
| app/database/queries/breeze_buddy/lead_call_tracker.py | Adds SQL to cancel pending retries by request_id. |
| app/database/accessor/breeze_buddy/lead_call_tracker.py | Adds accessor to execute the new cancellation query and return count cancelled. |
| app/core/config/dynamic.py | Adds dynamic config getter EVALUATOR_ACTIONS() for Redis/env-driven action configuration. |
| .env.example | Adds an EVALUATOR_ACTIONS example. |
```python
# Step 2: Cancel pending retries if enabled and request_id exists
if self.cancel_retries:
    if current_lead.request_id:
        try:
            cancelled = await cancel_pending_retries_by_request_id(
                request_id=current_lead.request_id,
                reason=f"outcome_corrected_to_{new_outcome}",
            )
            result.canceled_count = cancelled
```
cancel_pending_retries_by_request_id() is executed for any outcome change when cancel_retries is enabled; there’s no check that the corrected outcome is a terminal state. Since this can stop future attempts, consider gating this step on a terminal-outcome allowlist (or a config flag like cancel_only_if_terminal) to avoid cancelling retries for non-terminal corrections.
```python
from app.core.logger.context import set_log_context, update_log_context, clear_log_context

# At start of action
set_log_context(call_sid=call_sid, evaluator_action=True)
```
The logging section shows set_log_context(call_sid=call_sid, evaluator_action=True), but the action code currently only sets call_sid. If evaluator_action is intended to be a consistent log field for filtering, either set it in code or adjust the docs example so it matches what’s actually emitted.
```diff
-set_log_context(call_sid=call_sid, evaluator_action=True)
+set_log_context(call_sid=call_sid)
```
```python
db_result = await update_lead_call_completion_details(
    id=current_lead.id,
    outcome=new_outcome,
    meta_data={
        "outcome_corrected_by": "evaluator_action",
        "evaluator_name": score.get("name"),
        "previous_outcome": current_lead.outcome,
        "correction_timestamp": datetime.now(
            timezone.utc
        ).isoformat(),
    },
)
```
update_lead_call_completion_details() overwrites the entire meta_data column (the underlying query sets "meta_data" = $n). Passing a fresh dict here will drop existing metadata like conversation_id that you read earlier from current_lead.meta_data. Merge the existing meta_data with the new correction fields (or change the query to jsonb-merge) so prior keys are preserved.
```diff
-db_result = await update_lead_call_completion_details(
-    id=current_lead.id,
-    outcome=new_outcome,
-    meta_data={
-        "outcome_corrected_by": "evaluator_action",
-        "evaluator_name": score.get("name"),
-        "previous_outcome": current_lead.outcome,
-        "correction_timestamp": datetime.now(
-            timezone.utc
-        ).isoformat(),
-    },
-)
+# Merge existing meta_data with correction details so we don't drop prior keys
+existing_meta_data = current_lead.meta_data or {}
+merged_meta_data = dict(existing_meta_data)
+merged_meta_data.update(
+    {
+        "outcome_corrected_by": "evaluator_action",
+        "evaluator_name": score.get("name"),
+        "previous_outcome": current_lead.outcome,
+        "correction_timestamp": datetime.now(
+            timezone.utc
+        ).isoformat(),
+    }
+)
+db_result = await update_lead_call_completion_details(
+    id=current_lead.id,
+    outcome=new_outcome,
+    meta_data=merged_meta_data,
+)
```
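The merge itself can be exercised independently of the DB layer; `merge_correction_metadata` below is a hypothetical helper, not part of this PR:

```python
from datetime import datetime, timezone
from typing import Optional


def merge_correction_metadata(
    existing: Optional[dict], evaluator_name: str, previous_outcome: Optional[str]
) -> dict:
    """Overlay correction fields onto existing meta_data without dropping prior keys."""
    merged = dict(existing or {})  # copy so the caller's dict is untouched
    merged.update(
        {
            "outcome_corrected_by": "evaluator_action",
            "evaluator_name": evaluator_name,
            "previous_outcome": previous_outcome,
            "correction_timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )
    return merged
```

A pre-existing key like `conversation_id` survives the merge, which is exactly what the overwriting `"meta_data" = $n` query would lose.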
```python
def _to_title_case(key: str) -> str:
    """
    Convert snake_case key to Title Case for display.

    Examples:
        update_in_db -> Update In Db
        send_nautilus_webhook -> Send Nautilus Webhook
        cancel_retries -> Cancel Retries
    """
    return key.replace("_", " ").title()
```
_to_title_case() is not referenced anywhere in this module, so it’s dead code right now. Removing it (or using it for step-name rendering) will keep the new action framework easier to maintain.
```diff
-def _to_title_case(key: str) -> str:
-    """
-    Convert snake_case key to Title Case for display.
-
-    Examples:
-        update_in_db -> Update In Db
-        send_nautilus_webhook -> Send Nautilus Webhook
-        cancel_retries -> Cancel Retries
-    """
-    return key.replace("_", " ").title()
```
```
Format: JSON string stored in Redis
{
    "<VOICEMAIL_EVALUATOR_NAME>": {
        "action_type": "outcome_update",
        "action": {
            "outcome": "VOICEMAIL",
            "require_current_outcome": ["BUSY"]
        }
    },
    "<OUTCOME_CORRECTOR_NAME>": {
        "action_type": "outcome_update",
        "action": {
            "outcome_key": "$.actual_outcome"
        }
    }
}

Config options:
- outcome: Direct outcome value (e.g., "VOICEMAIL") - use for simple cases
- outcome_key: JSON path to extract from JSON at end of comment (e.g., "$.actual_outcome")
- require_current_outcome: (optional) Only update if current outcome is in this list

Trigger logic: Action triggers when score < threshold (from LANGFUSE_EVALUATORS config)
```
The EVALUATOR_ACTIONS() docstring describes a config shape using an "action" key, but the action implementation reads action_config/action_steps (and the new docs file also documents that newer format). Please update this docstring (and any examples) to match the supported schema to avoid misconfiguring Redis/env values.
```diff
-    Format: JSON string stored in Redis
-    {
-        "<VOICEMAIL_EVALUATOR_NAME>": {
-            "action_type": "outcome_update",
-            "action": {
-                "outcome": "VOICEMAIL",
-                "require_current_outcome": ["BUSY"]
-            }
-        },
-        "<OUTCOME_CORRECTOR_NAME>": {
-            "action_type": "outcome_update",
-            "action": {
-                "outcome_key": "$.actual_outcome"
-            }
-        }
-    }
-
-    Config options:
-    - outcome: Direct outcome value (e.g., "VOICEMAIL") - use for simple cases
-    - outcome_key: JSON path to extract from JSON at end of comment (e.g., "$.actual_outcome")
-    - require_current_outcome: (optional) Only update if current outcome is in this list
-
-    Trigger logic: Action triggers when score < threshold (from LANGFUSE_EVALUATORS config)
+    Format: JSON string stored in Redis.
+
+    Example:
+    {
+        "<VOICEMAIL_EVALUATOR_NAME>": {
+            "action_type": "outcome_update",
+            "action_config": {
+                "outcome": "VOICEMAIL",
+                "require_current_outcome": ["BUSY"]
+            }
+        },
+        "<OUTCOME_CORRECTOR_NAME>": {
+            "action_type": "outcome_update",
+            "action_config": {
+                "outcome_key": "$.actual_outcome"
+            }
+        }
+    }
+
+    Schema:
+    - <EVALUATOR_NAME>: {
+          "action_type": "<type>",
+          "action_config": { ... },  # single-step configuration
+          "action_steps": [          # optional multi-step configuration
+              {
+                  "action_type": "<type>",
+                  "action_config": { ... }
+              },
+              ...
+          ]
+      }
+
+    action_config options for outcome_update:
+    - outcome: Direct outcome value (e.g., "VOICEMAIL") - use for simple cases
+    - outcome_key: JSON path to extract from JSON at end of comment (e.g., "$.actual_outcome")
+    - require_current_outcome: (optional) Only update if current outcome is in this list
+
+    Trigger logic: Action triggers when score < threshold (from LANGFUSE_EVALUATORS config).
```
```
REDIS_CLUSTER_NODES=redis1:6379,redis2:6380,redis3:6381

LANGFUSE_EVALUATORS="breeze buddy outcome correctness, latency evaluator" # name of the evaluator
EVALUATOR_ACTIONS='{"OUTCOME MISMATCH": {"action_type": "outcome_update", "action": {"outcome_key": "$.actual_outcome"}}}'
```
The .env.example EVALUATOR_ACTIONS example uses an "action" key, but OutcomeUpdateAction currently looks for action_config (and action_steps). As written, copying this example into env/Redis will result in actions not being able to extract outcome/outcome_key. Please update the example to the supported schema (or add backward-compatible parsing in the action code).
```diff
-EVALUATOR_ACTIONS='{"OUTCOME MISMATCH": {"action_type": "outcome_update", "action": {"outcome_key": "$.actual_outcome"}}}'
+EVALUATOR_ACTIONS='{"OUTCOME MISMATCH": {"action_type": "outcome_update", "action_config": {"outcome_key": "$.actual_outcome"}}}'
```
```
When an evaluator score is below threshold:
1. **Update DB** - Correct the outcome in `lead_call_tracker`
2. **Cancel Retries** - Stop pending calls for terminal outcomes
3. **Send Webhook** - Notify Nautilus to update Shopify tags
4. **Send Slack Alert** - Notify team with detailed action status
```
The doc says retries are cancelled only “for terminal outcomes”, but the current implementation cancels pending retries whenever cancel_retries is enabled and request_id exists, regardless of the corrected outcome. Either update the docs to reflect the actual behavior or add a terminal-outcome check in the action logic so the behavior matches the described flow.
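A terminal-outcome gate could look like the sketch below; the `TERMINAL_OUTCOMES` set and `should_cancel_retries` helper are illustrative assumptions, not names from this PR:

```python
# Illustrative allowlist — the real set of terminal outcomes belongs in config
TERMINAL_OUTCOMES = {"VOICEMAIL", "WRONG_NUMBER", "DO_NOT_CALL"}


def should_cancel_retries(cancel_retries: bool, request_id, new_outcome: str) -> bool:
    """Cancel pending retries only when enabled, a request_id exists,
    and the corrected outcome is terminal."""
    return bool(cancel_retries and request_id and new_outcome in TERMINAL_OUTCOMES)
```

Gating on the corrected outcome keeps non-terminal corrections (e.g., BUSY → CONNECTED) from silently stopping future attempts.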
```python
# Extract action_config values (behavior control)
action_config = config.get("action_config", {})
self.outcome = action_config.get("outcome")
self.outcome_key = action_config.get("outcome_key")
self.require_current_outcome = action_config.get("require_current_outcome")
self.disallowed_outcome_changes = action_config.get(
    "disallowed_outcome_changes", {}
)

# Extract action_steps (execution steps)
action_steps = config.get("action_steps", {})
self.update_in_db = action_steps.get("update_in_db", True)
self.send_nautilus_webhook = action_steps.get("send_nautilus_webhook", True)
self.cancel_retries = action_steps.get("cancel_retries", True)

# Store all action_steps for generic step tracking
self.action_steps = action_steps
```
OutcomeUpdateAction only reads config["action_config"] and config["action_steps"], but other parts of this PR (e.g., EVALUATOR_ACTIONS() docstring / .env.example) use an "action" key. With the current implementation, configs using "action" will result in self.outcome/self.outcome_key being unset and the action will always fail to extract an outcome. Consider normalizing config in __init__ (support both action and action_config as aliases, and default action_steps to all-true).
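A backward-compatible normalization sketch, assuming the key names from this PR's schema (`action_config`, `action_steps`) with the legacy `action` key accepted as an alias; `normalize_action_config` is a hypothetical helper:

```python
def normalize_action_config(config: dict) -> tuple:
    """Accept either "action_config" or the legacy "action" key;
    default every action step to enabled."""
    action_config = config.get("action_config") or config.get("action") or {}
    action_steps = config.get("action_steps") or {}
    steps = {
        "update_in_db": action_steps.get("update_in_db", True),
        "send_nautilus_webhook": action_steps.get("send_nautilus_webhook", True),
        "cancel_retries": action_steps.get("cancel_retries", True),
    }
    return action_config, steps
```

Calling this at the top of `__init__` would let configs written against either key shape populate `self.outcome`/`self.outcome_key` correctly.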
```python
if not reporting_webhook_url:
    logger.warning(
        "[EVALUATOR_ACTION] NAUTILUS_WEBHOOK SKIPPED - No reporting_webhook_url in lead payload"
    )
    return False
```
When reporting_webhook_url is missing, _send_nautilus_webhook() returns False, which is later interpreted as a webhook failure (nautilus_webhook = "FAILED") rather than a skip. This makes the overall action fail even if the DB update succeeds. Treat a missing URL as SKIPPED (or return an explicit status) so the action result and Slack status reflect the intended behavior.
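Distinguishing skip from failure could be done with an explicit status value instead of a bool; the `StepStatus` enum and helper below are illustrative, not the PR's types:

```python
from enum import Enum


class StepStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"


def nautilus_webhook_status(reporting_webhook_url, delivered: bool) -> StepStatus:
    """Report SKIPPED when there is no URL, so a missing webhook target
    does not mark the whole action as failed."""
    if not reporting_webhook_url:
        return StepStatus.SKIPPED
    return StepStatus.SUCCESS if delivered else StepStatus.FAILED
```

The Slack action-status section can then render SKIPPED steps distinctly from genuine delivery failures.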
Summary by CodeRabbit
New Features
Bug Fixes / Reliability
Behavior