diff --git a/src/constraint_handler/PropagatorConstants.py b/src/constraint_handler/PropagatorConstants.py index d53aa194..668f0cea 100644 --- a/src/constraint_handler/PropagatorConstants.py +++ b/src/constraint_handler/PropagatorConstants.py @@ -1,5 +1,7 @@ import enum -from typing import Literal +from typing import Any, Literal + +from clingo import Symbol from constraint_handler.schemas.warning import Warning @@ -7,8 +9,6 @@ DEFAULT_DECISION_LEVEL: Literal[-1] = -1 -FALSE_ASSIGNMENTS: Literal["__FALSE_ASSIGNMENTS__"] = "__FALSE_ASSIGNMENTS__" - ENSURE_VAR_NAME: Literal["__ensure__"] = "__ensure__" EXECUTION_INPUT: Literal["execution_input"] = "execution_input" EXECUTION_OUTPUT: Literal["execution_output"] = "execution_output" @@ -57,3 +57,4 @@ class NoValueSet(Exception): type propagator_warning_t = list[Warning] +type evaluations_type = dict[Symbol, Any | set[Any] | dict[Any, Any]] diff --git a/src/constraint_handler/PropagatorVariables.py b/src/constraint_handler/PropagatorVariables.py index 7d6ff265..311aaa51 100644 --- a/src/constraint_handler/PropagatorVariables.py +++ b/src/constraint_handler/PropagatorVariables.py @@ -17,9 +17,9 @@ DEFAULT_DECISION_LEVEL, EXECUTION_INPUT, EXECUTION_OUTPUT, - FALSE_ASSIGNMENTS, EvaluationResult, ValueStatus, + evaluations_type, propagator_warning_t, ) @@ -79,11 +79,11 @@ def vars(self) -> set[clingo.Symbol]: ... @abstractmethod def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] + self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any] ) -> EvaluationResult: ... @abstractmethod - def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]) -> None: ... + def add_self_to_evaluations(self, e: Evaluations) -> None: ... class VariableValue: @@ -116,9 +116,7 @@ def __init__(self, expr: expression.Expr, lit: int): self.errors: propagator_warning_t = [] - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> bool: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> bool: """ Evaluate the expression @@ -147,23 +145,28 @@ def evaluate( return True for var in self.vars(): - if var not in evaluations and var not in evaluations[FALSE_ASSIGNMENTS]: + # If a variable that exists is not yet assigned(A value or False assignment), then we cannot evaluate yet + if not evaluations.var_usable(var): # can't evaluate yet # value should not be set yet assert self.value == ValueStatus.NOT_SET return False - self.value, errors = evaluator.evaluate_expr(self.expr, env, evaluations) + self.value, errors = evaluator.evaluate_expr(self.expr, env, evaluations.evaluations) for error, msg in errors: self.errors.append(warning.Warning(error, (), repr(msg))) if ( - self.value == expression.Bad.bad - or (isinstance(self.value, frozenset) and expression.Bad.bad in self.value) + self.value == expression.Bad.bad # ty:ignore[unresolved-attribute] + or ( + isinstance(self.value, frozenset) and expression.Bad.bad in self.value + ) # ty:ignore[unresolved-attribute] or ( isinstance(self.value, dict) - and (expression.Bad.bad in self.value.values() or expression.Bad.bad in self.value.keys()) + and ( + expression.Bad.bad in self.value.values() or expression.Bad.bad in self.value.keys() + ) # ty:ignore[unresolved-attribute] ) ): self.errors.append( @@ -232,7 +235,7 @@ def __eq__(self, other) -> bool: return self.expr == other.expr def __hash__(self) -> int: - return hash(str(self.expr)) + return hash((str(self.expr), self.literal)) def __str__(self) -> str: return f"VariableValue({self.expr}, {self.value})" @@ -269,9 +272,7 @@ def __init__(self, op: expression.Operator, args: list[expression.Expr], literal self.errors: propagator_warning_t = [] - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> bool: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> bool: """ Evaluate the expression @@ -286,17 +287,21 @@ def evaluate( if not ctl.assignment.is_true(self.literal): return False myprint(f"Evaluating {self.op}({self.args})") - value, errors = evaluator.evaluate_expr(expression.Operation(self.op, self.args), env, evaluations) + value, errors = evaluator.evaluate_expr(expression.Operation(self.op, self.args), env, evaluations.evaluations) self.value = value for error, msg in errors: self.errors.append(warning.Warning(error, (), repr(msg))) if ( - self.value == expression.Bad.bad - or (isinstance(self.value, frozenset) and expression.Bad.bad in self.value) + self.value == expression.Bad.bad # ty:ignore[unresolved-attribute] + or ( + isinstance(self.value, frozenset) and expression.Bad.bad in self.value + ) # ty:ignore[unresolved-attribute] or ( isinstance(self.value, dict) - and (expression.Bad.bad in self.value.values() or expression.Bad.bad in self.value.keys()) + and ( + expression.Bad.bad in self.value.values() or expression.Bad.bad in self.value.keys() + ) # ty:ignore[unresolved-attribute] ) ): self.errors.append( @@ -396,9 +401,7 @@ def has_domain(self) -> bool: """ return True - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> EvaluationResult: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> EvaluationResult: """ Evaluate the expression and return a tuple (changed, conflict). changed is True if the value has changed. @@ -485,13 +488,13 @@ def reset(self, dl: int) -> None: self.value = ValueStatus.NOT_SET self.decision_level = DEFAULT_DECISION_LEVEL - def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]) -> None: + def add_self_to_evaluations(self, e: Evaluations) -> None: """ Does nothing, as ensure variables are not added to the evaluation dictionary. This is here for compatibility with the VariableType protocol. Args: - d: Dictionary to update. + e: Evaluations object to update. """ return @@ -607,9 +610,7 @@ def has_domain(self) -> bool: """ return len(self.expressions) > 0 - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> EvaluationResult: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> EvaluationResult: """ Evaluate the expression and return an EvaluationResult. """ @@ -634,6 +635,8 @@ def evaluate( # some values are unassigned # so we cannot determine the value yet val = [ValueStatus.NOT_SET] + self.decision_level = ctl.assignment.decision_level + return EvaluationResult.NOT_CHANGED # if at least one domain lit is true, then a value MUST be chosen elif any(ctl.assignment.is_true(domain_lit) for domain_lit in self.domain_literals): @@ -706,21 +709,21 @@ def reset(self, dl: int) -> None: self.value = ValueStatus.NOT_SET self.errors = [] - def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]) -> None: + def add_self_to_evaluations(self, e: Evaluations) -> None: """ Add this variable's value to the evaluation dictionary. Args: - d: Dictionary to update. + e: Evaluations object to update. """ value = self.get_value() if value == ValueStatus.NOT_SET: return elif value == ValueStatus.ASSIGNMENT_IS_FALSE: - d[FALSE_ASSIGNMENTS].append(self.var) # type: ignore + e.false_assignments.append(self.var) # type: ignore return - d[self.var] = value + e.evaluations[self.var] = value def __eq__(self, other) -> bool: if not isinstance(other, Variable): @@ -824,9 +827,7 @@ def vars(self) -> set[clingo.Symbol]: vars.update(arg.vars()) return vars - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> bool: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> bool: """Evaluate the expression and return True if the value has changed.""" changed = False for arg in self.values: @@ -975,9 +976,7 @@ def vars(self) -> set[clingo.Symbol]: """ return self.expressions.vars() - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> EvaluationResult: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> EvaluationResult: """ Evaluate the expression and return an EvaluationResult. """ @@ -994,11 +993,6 @@ def evaluate( # Assignment is false, so value is set to false assignment self.value = ValueStatus.ASSIGNMENT_IS_FALSE self.decision_level = ctl.assignment.decision_level - self.errors.append( - warning.Warning( - warning.ExpressionWarning.syntaxError, (self.var,), "Set declaration is False" - ) # ty:ignore[unresolved-attribute] - ) return EvaluationResult.CHANGED changed = self.expressions.evaluate(evaluations, ctl, env) @@ -1026,21 +1020,21 @@ def reset(self, dl: int) -> None: self.value = ValueStatus.NOT_SET self.errors = [] - def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]) -> None: + def add_self_to_evaluations(self, e: Evaluations) -> None: """ Add this variable's value into an output dictionary. Args: - d: Dictionary to update. + e: Evaluations object to update. """ value = self.get_value() if value == ValueStatus.NOT_SET: return elif value == ValueStatus.ASSIGNMENT_IS_FALSE: - d[FALSE_ASSIGNMENTS].append(self.var) # type: ignore + e.false_assignments.append(self.var) # type: ignore return - d[self.var] = value + e.evaluations[self.var] = value def __eq__(self, value) -> bool: if not isinstance(value, SetVariable): @@ -1216,9 +1210,7 @@ def vars(self) -> set[clingo.Symbol]: vars.update(key.vars()) return vars - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> EvaluationResult: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> EvaluationResult: """ Evaluate all values in the dictionary and return (changed, conflict). For DictVariable, conflict should never occur. @@ -1236,9 +1228,6 @@ def evaluate( elif ctl.assignment.is_false(self.literal): self.value = ValueStatus.ASSIGNMENT_IS_FALSE self.decision_level = ctl.assignment.decision_level - self.errors.append( - warning.Warning(warning.ExpressionWarning.syntaxError, (self.var,), "Dict declaration is False") # type: ignore[unresolved-attribute] - ) return EvaluationResult.CHANGED changed = False @@ -1272,21 +1261,21 @@ def reset(self, dl: int) -> None: self.value = ValueStatus.NOT_SET self.errors = [] - def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]) -> None: + def add_self_to_evaluations(self, e: Evaluations) -> None: """ - Add this variable's value into the given dictionary. + Add this variable's value into the given Evaluations object. Args: - d: Dictionary to update. + e: Evaluations object to update. """ value = self.get_value() if value == ValueStatus.NOT_SET: return elif value == ValueStatus.ASSIGNMENT_IS_FALSE: - d[FALSE_ASSIGNMENTS].append(self.var) # type: ignore + e.false_assignments.append(self.var) # type: ignore return - d[self.var] = value + e.evaluations[self.var] = value def __eq__(self, other) -> bool: if not isinstance(other, DictVariable): @@ -1363,11 +1352,7 @@ def discern_value(self) -> int | float: # TODO: check if we need to also need to excluce Bad.bad from the sums for var, expr in self.expressions: myprint(f"Summing {expr} with value {expr.value}") - if ( - expr.value != ValueStatus.NOT_SET - and expr.value != ValueStatus.ASSIGNMENT_IS_FALSE - and expr.value is not None - ): + if expr.value not in [ValueStatus.NOT_SET, ValueStatus.ASSIGNMENT_IS_FALSE, None, expression.Bad.bad]: vals.add((var, expr.value)) return sum(value for var, value in vals) @@ -1393,9 +1378,7 @@ def get_errors(self) -> propagator_warning_t: errors.extend(expr.get_errors()) return errors - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> bool: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> bool: """ Evaluate the optimization sum and return True if the value has changed. @@ -1505,9 +1488,7 @@ def get_sum_count(self) -> int: """ return len(self.sums) - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> bool: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> bool: """ Evaluate all optimization sums and return True if any value has changed. @@ -1736,9 +1717,7 @@ def convert_var(self, var: clingo.Symbol | str, input: bool = True) -> clingo.Sy v = clingo.Function(exec_name, [self.func_name, var_func]) return v - def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] - ) -> EvaluationResult: + def evaluate(self, evaluations: Evaluations, ctl: clingo.PropagateControl, env: dict[Any, Any]) -> EvaluationResult: """ Evaluate the execution and return an EvaluationResult. @@ -1767,7 +1746,7 @@ def evaluate( return EvaluationResult.CHANGED for var in self.converted_in_vars: - if var not in evaluations and var not in evaluations[FALSE_ASSIGNMENTS]: + if not evaluations.var_usable(var): # can't evaluate yet # value should not be set yet assert self.values == ValueStatus.NOT_SET @@ -1777,7 +1756,7 @@ def evaluate( # There is a lot of copying of the evaluations dictionary! evals = {} for c_var, var in zip(self.converted_in_vars, self.in_vars): - evals[var] = evaluations[c_var] + evals[var] = evaluations.evaluations[c_var] final_evals = dict() for stmt in self.statements: @@ -1809,6 +1788,14 @@ def evaluate( self.values.append((c_out_var, None)) else: self.values.append((c_out_var, final_evals[out_var])) + if final_evals[out_var] == expression.Bad.bad: # ty:ignore[unresolved-attribute] + self.errors.append( + warning.Warning( + warning.Variable(warning.VariableWarning.badValue), # ty:ignore[unresolved-attribute] + (self.func_name, out_var), + f"Output variable {out_var} in execution {self.func_name} has a bad value!", + ) + ) return EvaluationResult.CHANGED @@ -1866,12 +1853,12 @@ def reset(self, dl: int): self.errors = [] self.values = ValueStatus.NOT_SET - def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]) -> None: + def add_self_to_evaluations(self, e: Evaluations) -> None: """ - Add the execution's output values to the provided dictionary. + Add the execution's output values to the provided Evaluations object. Args: - d: Dictionary to update. + e: Evaluations object to update. """ value = self.get_value() @@ -1880,10 +1867,10 @@ def add_self_to_dict(self, d: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any elif value == ValueStatus.ASSIGNMENT_IS_FALSE: for out_var in self.converted_out_vars: - d[FALSE_ASSIGNMENTS].append(out_var) # type: ignore + e.false_assignments.append(out_var) else: for out_var, val in value: - d[out_var] = val + e.evaluations[out_var] = val def __hash__(self) -> int: return hash((self.func_name, tuple(self.statements), tuple(self.in_vars), tuple(self.out_vars))) @@ -1977,7 +1964,7 @@ def reset(self, dl: int): self.assigned = None def evaluate( - self, evaluations: dict[clingo.Symbol, Any], ctl: clingo.PropagateControl, env: dict[Any, Any] + self, evaluations: evaluations_type, ctl: clingo.PropagateControl, env: dict[Any, Any] ) -> EvaluationResult: """ Evaluate the execution statements and return an EvaluationResult. @@ -2018,22 +2005,68 @@ def evaluate( return EvaluationResult.CHANGED -def make_dict_from_variables( - variables: Iterable[VariableType], -) -> dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]: - """Build a plain Python dictionary from a collection of variables. +class Evaluations: + """ + Manages the current evaluations of variables. - The resulting dictionary maps each variable symbol to its evaluated value. - Variables that are assigned false are listed under the `FALSE_ASSIGNMENTS` key. + Attributes: + evaluations: Dictionary mapping variable symbols to their evaluated values. + false_assignments: List of variables that are assigned false. + existing_vars: List of variables that exist in the current evaluation. + """ - Args: - variables: Iterable of variables to export. Variables must implement the `add_self_to_dict` method! + def __init__(self): + """ + Initialize Evaluations with an empty dictionary. + """ + self.evaluations: evaluations_type = {} + self.false_assignments: list[clingo.Symbol] = [] + self.existing_vars: list[clingo.Symbol] = [] - Returns: - dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]]: Dictionary of variable values. - """ - result: dict[clingo.Symbol, Any | set[Any] | dict[Any, Any]] = {FALSE_ASSIGNMENTS: []} # type: ignore - for var in variables: - var.add_self_to_dict(result) + def init(self, variables: Iterable[VariableType]): + """ + Initialize the existing variables list based on the provided variables. + This method should be called before any evaluations are updated + to ensure that the existing variables are correctly tracked. + + Args: + variables: Iterable of variables to initialize from. + """ + for var in variables: + if isinstance(var, Execution): + self.existing_vars.extend(var.converted_out_vars) + else: + self.existing_vars.append(var.var) + + def update_evaluations( + self, + variables: Iterable[VariableType], + ) -> None: + """Build a plain Python dictionary from a collection of variables. - return result + The resulting dictionary maps each variable symbol to its evaluated value. + Variables that are assigned false are listed under the `FALSE_ASSIGNMENTS` key. + + Args: + variables: Iterable of variables to export. Variables must implement the `add_self_to_evaluations` method! + + """ + self.evaluations.clear() + self.false_assignments.clear() + for var in variables: + var.add_self_to_evaluations(self) + + def var_usable(self, var: clingo.Symbol) -> bool: + """ + Check if a variable is usable in the current evaluation. + + A variable is considered usable if it has an assigned value, is listed as a false assignment or does not exist. + + Args: + var: Variable symbol to check. + + Returns: + bool: True if the variable is usable, False otherwise. + + """ + return var in self.evaluations or var in self.false_assignments or var not in self.existing_vars diff --git a/src/constraint_handler/propagator.py b/src/constraint_handler/propagator.py index d767e09a..892f41d5 100644 --- a/src/constraint_handler/propagator.py +++ b/src/constraint_handler/propagator.py @@ -30,12 +30,12 @@ DictVariable, EnsureVariable, EvaluateVariable, + Evaluations, Execution, OptimizationHandler, SetVariable, Variable, VariableType, - make_dict_from_variables, ) from constraint_handler.utils.common import Bad @@ -78,6 +78,7 @@ def __init__(self, check_only: bool = False): """ self.symbol2var: dict[clingo.Symbol, VariableType] = {} self.literal2var: dict[int, list[VariableType]] = {} + self.evaluations: Evaluations = Evaluations() self.evaluatevars: list[EvaluateVariable] = [] @@ -162,6 +163,8 @@ def init(self, init: clingo.PropagateInit) -> None: self.get_forbidden_warnings(init) + self.evaluations.init(list(self.symbol2var.values())) + myprint("INIT DONE") myprint("#" * 50) @@ -250,7 +253,7 @@ def check(self, control: clingo.PropagateControl) -> None: myprint(f"backtracking {backtrack} due to conflicts in evaluation of variables") return - myprint(f"Evaluated assignments: {make_dict_from_variables(self.symbol2var.values())}") + myprint(f"Evaluated assignments: {self.evaluations.update_evaluations(self.symbol2var.values())}") # If not backtracking, check optimization sums backtrack = self.evaluate_optimization_sum(control) @@ -427,9 +430,12 @@ def check_evaluate(self, ctl: clingo.PropagateControl): ctl: Clingo PropagateControl object. """ myprint("Checking evaluate atoms") - myprint(f"Evaluated assignments before evaluate: {make_dict_from_variables(self.symbol2var.values())}") + myprint( + f"Evaluated assignments before evaluate: {self.evaluations.update_evaluations(self.symbol2var.values())}" + ) + self.evaluations.update_evaluations(self.symbol2var.values()) for var in self.evaluatevars: - var.evaluate(make_dict_from_variables(self.symbol2var.values()), ctl, self.environment) + var.evaluate(self.evaluations, ctl, self.environment) def propagate(self, control: clingo.PropagateControl, changes: Sequence[int]) -> None: """ @@ -459,8 +465,6 @@ def propagate(self, control: clingo.PropagateControl, changes: Sequence[int]) -> myprint(f"PROPAGATION DONE, backtracking {backtrack} due to conflicts in evaluation of variables") return - myprint(f"Evaluated assignments: {make_dict_from_variables(self.symbol2var.values())}") - # If not backtracking, check optimization sums self.evaluate_optimization_sum(control) myprint(f"Optimization sum evaluated to {self.optimization_sum.get_value()}") @@ -481,10 +485,8 @@ def evaluate_optimization_sum(self, ctl: clingo.PropagateControl) -> bool: """ if not self.using_optimization: return False - - changed = self.optimization_sum.evaluate( - make_dict_from_variables(self.symbol2var.values()), ctl, self.environment - ) + self.evaluations.update_evaluations(self.symbol2var.values()) + changed = self.optimization_sum.evaluate(self.evaluations, ctl, self.environment) if not changed: return False @@ -526,6 +528,8 @@ def add_nogood_for_variable(self, ctl: clingo.PropagateControl, var: VariableTyp """ ng: set[int] = set() for symbol_var in var.vars(): + if symbol_var not in self.symbol2var: + continue v = self.symbol2var[symbol_var] ng = ng.union(self.get_reasons(v)) myprint(f"Adding nogood {list(ng)} for variable {var}") @@ -575,9 +579,8 @@ def evaluate_variable(self, ctl: clingo.PropagateControl, var: VariableType) -> - False if it did not change. - None if evaluation detected a conflict (nogood added). """ - eval_result: EvaluationResult = var.evaluate( - make_dict_from_variables(self.symbol2var.values()), ctl, self.environment - ) + self.evaluations.update_evaluations(self.symbol2var.values()) + eval_result: EvaluationResult = var.evaluate(self.evaluations, ctl, self.environment) myprint(f"Variable {var} evaluation result: {eval_result}") if eval_result == EvaluationResult.CONFLICT: @@ -622,6 +625,8 @@ def get_reasons(self, var: VariableType, seen: set[VariableType] | None = None) seen = set() reasons = var.literals for dvar in var.vars(): + if dvar not in self.symbol2var: + continue if dvar.name == EXECUTION_OUTPUT: dvar = dvar.arguments[0] if self.symbol2var[dvar] not in seen: @@ -757,17 +762,17 @@ def get_variables(self, ctl: clingo.PropagateInit): var_domains = myClorm.findInPropagateInit(ctl, atom.Propagator_variable_domain) var_optionals = myClorm.findInPropagateInit(ctl, atom.Propagator_variable_declareOptional) - from_facts_literals: dict[symbol_var, int] = {} - for (name, symbol_var, domain), __literal in var_declares.items(): + from_facts_literals: dict[clingo.Symbol, int] = {} + for (name, symbol_var, domain), _literal in var_declares.items(): if symbol_var not in self.symbol2var: self.symbol2var[symbol_var] = Variable(name, symbol_var) variable: Variable = cast(Variable, self.symbol2var[symbol_var]) - ctl.add_watch(__literal) - ctl.add_watch(-__literal) + ctl.add_watch(_literal) + ctl.add_watch(-_literal) - self.literal2var.setdefault(__literal, []).append(variable) + self.literal2var.setdefault(_literal, []).append(variable) if isinstance(domain, atom.BoolDomain): literal_true = ctl.add_literal(freeze=True) @@ -775,12 +780,12 @@ def get_variables(self, ctl: clingo.PropagateInit): variable.add_value( expression.Val(expression.BaseType.bool, True), # ty:ignore[unresolved-attribute] literal_true, - __literal, + _literal, ) variable.add_value( expression.Val(expression.BaseType.bool, False), # ty:ignore[unresolved-attribute] literal_false, - __literal, + _literal, ) ctl.add_watch(literal_true) ctl.add_watch(-literal_true) @@ -788,8 +793,8 @@ def get_variables(self, ctl: clingo.PropagateInit): ctl.add_watch(-literal_false) # if the declaration is False, then the value it can give can not be true - ctl.add_clause([-literal_true, __literal]) - ctl.add_clause([-literal_false, __literal]) + ctl.add_clause([-literal_true, _literal]) + ctl.add_clause([-literal_false, _literal]) self.literal2var[literal_true] = [variable] self.literal2var[literal_false] = [variable] @@ -797,17 +802,17 @@ def get_variables(self, ctl: clingo.PropagateInit): elif isinstance(domain, atom.FromList): for expr in domain.elements: literal = ctl.add_literal(freeze=True) - variable.add_value(expr, literal, __literal) + variable.add_value(expr, literal, _literal) ctl.add_watch(literal) ctl.add_watch(-literal) - ctl.add_clause([-literal, __literal]) + ctl.add_clause([-literal, _literal]) self.literal2var[literal] = [variable] elif isinstance(domain, atom.FromFacts): # values will be added from facts, nothing to do here - from_facts_literals[symbol_var] = __literal + from_facts_literals[symbol_var] = _literal else: self.errors.append( warning.Warning( @@ -817,30 +822,23 @@ def get_variables(self, ctl: clingo.PropagateInit): ) ) - for (name, symbol_var, expr), __literal in var_defines.items(): + for (name, symbol_var, expr), _literal in var_defines.items(): if symbol_var in self.symbol2var: define_variable: Variable = cast(Variable, self.symbol2var[symbol_var]) else: define_variable = Variable(name, symbol_var) self.symbol2var[symbol_var] = define_variable - define_variable.add_value(expr, __literal, __literal) - ctl.add_watch(__literal) - ctl.add_watch(-__literal) + define_variable.add_value(expr, _literal, _literal) + ctl.add_watch(_literal) + ctl.add_watch(-_literal) - self.literal2var.setdefault(__literal, []).append(define_variable) + self.literal2var.setdefault(_literal, []).append(define_variable) # here we dont add a nogood since its the same literal - for (name, symbol_var, domain_expr), __literal in var_domains.items(): - # These values are assgiend the "from_facts" domain literal for the given variable + for (name, symbol_var, domain_expr), _literal in var_domains.items(): + # These values are assigned the "from_facts" domain literal for the given variable if symbol_var not in self.symbol2var: - self.errors.append( - warning.Warning( - warning.Variable(warning.VariableWarning.undeclared), # ty:ignore[unresolved-attribute] - (symbol_var,), - f"Variable '{symbol_var}' domain set but variable not declared!", - ) - ) continue domain_variable: Variable = cast(Variable, self.symbol2var[symbol_var]) literal = ctl.add_literal(freeze=True) @@ -851,46 +849,27 @@ def get_variables(self, ctl: clingo.PropagateInit): ctl.add_clause([-literal, domain_literal]) # literal defining the domain should also be included, not just the variable declaration literal! - ctl.add_clause([-literal, __literal]) + ctl.add_clause([-literal, _literal]) self.literal2var[literal] = [domain_variable] - for (name, optional), __literal in var_optionals.items(): + for (name, optional), _literal in var_optionals.items(): if optional not in self.symbol2var: - # If the optinal variable is not declared we add a warning, since this is probably not intended - self.errors.append( - warning.Warning( - warning.Variable(warning.VariableWarning.undeclared), # ty:ignore[unresolved-attribute] - (optional,), - f"Variable '{optional}' declared optional but variable not declared!", - ) - ) continue optional_variable: Variable = cast(Variable, self.symbol2var[optional]) literal = ctl.add_literal(freeze=True) optional_variable.add_value( - expression.Val(expression.BaseType.none, None), literal, __literal + expression.Val(expression.BaseType.none, None), literal, _literal ) # ty:ignore[unresolved-attribute] ctl.add_watch(literal) ctl.add_watch(-literal) - ctl.add_clause([-literal, __literal]) + ctl.add_clause([-literal, _literal]) - self.literal2var.setdefault(__literal, []).append(optional_variable) + self.literal2var.setdefault(_literal, []).append(optional_variable) self.literal2var[literal] = [optional_variable] - # check that all variables have a domain - for var in self.symbol2var.values(): - if not var.has_domain(): - self.errors.append( - warning.Warning( - warning.Variable(warning.VariableWarning.emptyDomain), # ty:ignore[unresolved-attribute] - (var,), - f"Variable '{var}' has no domain defined!", - ) - ) - def get_assign(self, ctl: clingo.PropagateInit): """ Load `propagator_assign/..` atoms and attach values to variables. @@ -991,15 +970,6 @@ def get_set_declarations(self, ctl: clingo.PropagateInit): declares = myClorm.findInPropagateInit(ctl, atom.Propagator_set_declare) for (name, symbol_var), literal in declares.items(): variable = SetVariable(name, symbol_var, literal) - if literal != 1: - self.errors.append( - warning.Warning( - warning.Propagator(), - (variable,), - f"Set variable {name} declaration is not a fact! It has literal {literal}.", - ) - ) - self.symbol2var[symbol_var] = variable self.literal2var.setdefault(literal, []).append(variable) @@ -1035,16 +1005,6 @@ def get_multimap_declarations(self, ctl: clingo.PropagateInit): declares = myClorm.findInPropagateInit(ctl, atom.Propagator_multimap_declare) for (name, symbol_var), literal in declares.items(): variable = DictVariable(name, symbol_var, literal) - - if literal != 1: - self.errors.append( - warning.Warning( - warning.Propagator(), - (variable,), - f"Dict variable {symbol_var} declaration is not a fact! It has literal {literal}.", - ) - ) - self.symbol2var[symbol_var] = variable self.literal2var.setdefault(literal, []).append(variable) @@ -1141,10 +1101,8 @@ def set_parents(self): # for self referencing variables continue if symbol_var not in self.symbol2var: - myprint(self.symbol2var.keys()) - myprint(type(symbol_var), symbol_var) continue - assert False, f"Variable {symbol_var} not found in symbol2var" + self.symbol2var[symbol_var].parents.append(var) for var in self.symbol2var.values(): @@ -1164,7 +1122,7 @@ def update_python_model(self): the clingo model and by brave/cautious reasoning to accumulate results. """ self.python_model = set() - + model_errors: propagator_warning_t = [] for var in self.symbol2var.values(): self.handle_on_model_warning(var.get_errors()) if isinstance(var, EnsureVariable): @@ -1176,7 +1134,7 @@ def update_python_model(self): # myprint(var.var, final_value, type(final_value)) if final_value is ValueStatus.NOT_SET: # print(f"Variable {var} has no value set in on_model!") - self.errors.append( + model_errors.append( warning.Warning(warning.Propagator(), (str(var),), "Variable has no value set in on_model!") ) continue @@ -1197,7 +1155,7 @@ def update_python_model(self): myprint(eval_var, eval_var.get_value()) self.handle_on_model_warning(eval_var.get_errors()) final_value = eval_var.get_value() - if final_value == Bad.bad: + if final_value == expression.Bad.bad: # ty:ignore[unresolved-attribute] pyVal = final_value else: pyVal = expression.Val(evaluator.get_baseType(final_value), final_value) @@ -1212,7 +1170,7 @@ def update_python_model(self): self.handle_on_model_warning(self.optimization_sum.get_errors()) myprint(f"Optimization value: {self.optimization_sum.get_value()}") - self.handle_on_model_warning(self.errors) + self.handle_on_model_warning(model_errors + self.errors) def on_model(self, model: clingo.Model): """ @@ -1306,7 +1264,9 @@ def handle_on_model_dict(self, var: clingo.Symbol, final_value: dict): self.python_model.add(mm_pyAtom) def handle_on_model_normal_type( - self, var: clingo.Symbol, final_value: bool | int | float | str | clingo.Symbol | Bad + self, + var: clingo.Symbol, + final_value: bool | int | float | str | clingo.Symbol | expression.Bad.bad, # ty:ignore[unresolved-attribute] ): """ Add atoms for a variable (bool/int/float/string/symbol) to the python model. @@ -1316,7 +1276,7 @@ def handle_on_model_normal_type( final_value: Scalar value. """ assert self.python_model is not None - if final_value == Bad.bad: + if final_value == expression.Bad.bad: # ty:ignore[unresolved-attribute] pyVal = final_value else: pyVal = expression.Val(evaluator.get_baseType(final_value), final_value) diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 6496b709..02f78a5b 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -158,7 +158,6 @@ def test_engine_propagator(name, check_mode): "set_fold_bools", "set_iterations", "set_selfref", - "warning_bad", "warning_variables", "warning_variable_undeclared", "type_checking",