diff --git a/pyrit/scenario/core/scenario.py b/pyrit/scenario/core/scenario.py index fdead33e9..773fc3454 100644 --- a/pyrit/scenario/core/scenario.py +++ b/pyrit/scenario/core/scenario.py @@ -13,7 +13,7 @@ import textwrap import uuid from abc import ABC, abstractmethod -from typing import Dict, List, Optional, Sequence, Set, Type, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, Type, Union from tqdm.auto import tqdm @@ -32,6 +32,10 @@ ) from pyrit.score import Scorer +if TYPE_CHECKING: + from pyrit.executor.attack.core.attack_config import AttackScoringConfig + from pyrit.models import SeedAttackGroup + logger = logging.getLogger(__name__) @@ -228,14 +232,16 @@ async def initialize_async( self._memory_labels = memory_labels or {} # Prepare scenario strategies using the stored configuration + # An explicitly empty strategy list is accepted regardless of include_baseline (enables baseline-only execution) self._scenario_composites = self._strategy_class.prepare_scenario_strategies( - scenario_strategies, default_aggregate=self.get_default_strategy() + scenario_strategies, + default_aggregate=self.get_default_strategy(), ) self._atomic_attacks = await self._get_atomic_attacks_async() if self._include_baseline: - baseline_attack = self._get_baseline_from_first_attack() + baseline_attack = self._get_baseline() self._atomic_attacks.insert(0, baseline_attack) # Store original objectives for each atomic attack (before any mutations during execution) @@ -281,34 +287,21 @@ async def initialize_async( self._scenario_result_id = str(result.id) logger.info(f"Created new scenario result with ID: {self._scenario_result_id}") - def _get_baseline_from_first_attack(self) -> AtomicAttack: + def _get_baseline(self) -> AtomicAttack: """ Get a baseline AtomicAttack, which simply sends all the objectives without any modifications. + If other atomic attacks exist, derives baseline data from the first attack. 
+ Otherwise, creates a standalone baseline from the dataset configuration and scenario settings. + Returns: AtomicAttack: The baseline AtomicAttack instance. Raises: - ValueError: If no atomic attacks are available to derive baseline from. + ValueError: If required data (seed_groups, objective_target, attack_scoring_config) + is not available. """ - if not self._atomic_attacks or len(self._atomic_attacks) == 0: - raise ValueError("No atomic attacks available to derive baseline from.") - - first_attack = self._atomic_attacks[0] - - # Copy seed_groups, scoring, target from the first attack - seed_groups = first_attack.seed_groups - attack_scoring_config = first_attack._attack.get_attack_scoring_config() - objective_target = first_attack._attack.get_objective_target() - - if not seed_groups or len(seed_groups) == 0: - raise ValueError("First atomic attack must have seed_groups to create baseline.") - - if not objective_target: - raise ValueError("Objective target is required to create baseline attack.") - - if not attack_scoring_config: - raise ValueError("Attack scoring config is required to create baseline attack.") + seed_groups, attack_scoring_config, objective_target = self._get_baseline_data() # Create baseline attack with no converters attack = PromptSendingAttack( @@ -323,6 +316,55 @@ def _get_baseline_from_first_attack(self) -> AtomicAttack: memory_labels=self._memory_labels, ) + def _get_baseline_data(self) -> Tuple[List["SeedAttackGroup"], "AttackScoringConfig", PromptTarget]: + """ + Get the data needed to create a baseline attack. + + Returns either the first attack's data or the scenario-level data + depending on whether other atomic attacks exist. + + Returns: + Tuple containing (seed_groups, attack_scoring_config, objective_target) + + Raises: + ValueError: If required data is not available. 
+ """ + if self._atomic_attacks and len(self._atomic_attacks) > 0: + # Derive from first attack + first_attack = self._atomic_attacks[0] + seed_groups = first_attack.seed_groups + attack_scoring_config = first_attack._attack.get_attack_scoring_config() + objective_target = first_attack._attack.get_objective_target() + else: + # Create from scenario-level settings + if not self._objective_target: + raise ValueError("Objective target is required to create baseline attack.") + if not self._dataset_config: + raise ValueError("Dataset config is required to create baseline attack.") + if not self._objective_scorer: + raise ValueError("Objective scorer is required to create baseline attack.") + + seed_groups = self._dataset_config.get_all_seed_attack_groups() + objective_target = self._objective_target + + # Import here to avoid circular imports + from typing import cast + + from pyrit.executor.attack.core.attack_config import AttackScoringConfig + from pyrit.score import TrueFalseScorer + + attack_scoring_config = AttackScoringConfig(objective_scorer=cast(TrueFalseScorer, self._objective_scorer)) + + # Validate required data + if not seed_groups or len(seed_groups) == 0: + raise ValueError("Seed groups are required to create baseline attack.") + if not objective_target: + raise ValueError("Objective target is required to create baseline attack.") + if not attack_scoring_config: + raise ValueError("Attack scoring config is required to create baseline attack.") + + return seed_groups, attack_scoring_config, objective_target + def _raise_dataset_exception(self) -> None: error_msg = textwrap.dedent( f""" @@ -650,7 +692,8 @@ async def _execute_scenario_async(self) -> ScenarioResult: try: atomic_results = await atomic_attack.run_async( - max_concurrency=self._max_concurrency, return_partial_on_failure=True + max_concurrency=self._max_concurrency, + return_partial_on_failure=True, ) # Always save completed results, even if some objectives didn't complete @@ -677,7 +720,8 @@ 
async def _execute_scenario_async(self) -> ScenarioResult: # Mark scenario as failed self._memory.update_scenario_run_state( - scenario_result_id=scenario_result_id, scenario_run_state="FAILED" + scenario_result_id=scenario_result_id, + scenario_run_state="FAILED", ) # Raise exception with detailed information @@ -703,7 +747,8 @@ async def _execute_scenario_async(self) -> ScenarioResult: scenario_results = self._memory.get_scenario_results(scenario_result_ids=[scenario_result_id]) if scenario_results and scenario_results[0].scenario_run_state != "FAILED": self._memory.update_scenario_run_state( - scenario_result_id=scenario_result_id, scenario_run_state="FAILED" + scenario_result_id=scenario_result_id, + scenario_run_state="FAILED", ) raise diff --git a/pyrit/scenario/core/scenario_strategy.py b/pyrit/scenario/core/scenario_strategy.py index 362be2c56..d1f1cdceb 100644 --- a/pyrit/scenario/core/scenario_strategy.py +++ b/pyrit/scenario/core/scenario_strategy.py @@ -213,12 +213,14 @@ def prepare_scenario_strategies( strategies (Sequence[T | ScenarioCompositeStrategy] | None): The strategies to prepare. Can be a mix of bare strategy enums and composite strategies. If None, uses default_aggregate to determine defaults. + If an empty sequence, returns an empty list (useful for baseline-only execution). default_aggregate (T | None): The aggregate strategy to use when strategies is None. Common values: MyStrategy.ALL, MyStrategy.EASY. If None when strategies is None, raises ValueError. Returns: List[ScenarioCompositeStrategy]: Normalized list of composite strategies ready for use. + May be empty if an empty sequence was explicitly provided. 
Raises: ValueError: If strategies is None and default_aggregate is None, or if compositions @@ -251,7 +253,10 @@ def prepare_scenario_strategies( # For now, skip to allow flexibility pass + # Allow empty list if explicitly provided (for baseline-only execution) if not composite_strategies: + if strategies is not None and len(strategies) == 0: + return [] raise ValueError( f"No valid {cls.__name__} strategies provided. " f"Provide at least one {cls.__name__} enum or ScenarioCompositeStrategy." diff --git a/tests/unit/scenarios/test_scenario.py b/tests/unit/scenarios/test_scenario.py index 266e85530..d81101a6c 100644 --- a/tests/unit/scenarios/test_scenario.py +++ b/tests/unit/scenarios/test_scenario.py @@ -35,7 +35,10 @@ async def mock_run_async(*args, **kwargs): def create_mock_scorer(): """Create a mock scorer for testing ScenarioResult.""" mock_scorer = MagicMock(spec=Scorer) - mock_scorer.get_identifier.return_value = {"__type__": "MockScorer", "__module__": "test"} + mock_scorer.get_identifier.return_value = { + "__type__": "MockScorer", + "__module__": "test", + } mock_scorer.get_scorer_metrics.return_value = None return mock_scorer @@ -70,7 +73,10 @@ def mock_atomic_attacks(): def mock_objective_target(): """Create a mock objective target for testing.""" target = MagicMock() - target.get_identifier.return_value = {"__type__": "MockTarget", "__module__": "test"} + target.get_identifier.return_value = { + "__type__": "MockTarget", + "__module__": "test", + } return target @@ -81,7 +87,11 @@ def sample_attack_results(): AttackResult( conversation_id=f"conv-{i}", objective=f"objective{i}", - attack_identifier={"__type__": "TestAttack", "__module__": "test", "id": str(i)}, + attack_identifier={ + "__type__": "TestAttack", + "__module__": "test", + "id": str(i), + }, outcome=AttackOutcome.SUCCESS, executed_turns=1, ) @@ -111,7 +121,10 @@ def get_aggregate_tags(cls) -> set[str]: # Add a mock scorer if not provided if "objective_scorer" not in kwargs: mock_scorer = 
MagicMock(spec=Scorer) - mock_scorer.get_identifier.return_value = {"__type__": "MockScorer", "__module__": "test"} + mock_scorer.get_identifier.return_value = { + "__type__": "MockScorer", + "__module__": "test", + } mock_scorer.get_scorer_metrics.return_value = None kwargs["objective_scorer"] = mock_scorer @@ -222,7 +235,10 @@ async def test_initialize_async_sets_objective_target(self, mock_objective_targe await scenario.initialize_async(objective_target=mock_objective_target) assert scenario._objective_target == mock_objective_target - assert scenario._objective_target_identifier == {"__type__": "MockTarget", "__module__": "test"} + assert scenario._objective_target_identifier == { + "__type__": "MockTarget", + "__module__": "test", + } @pytest.mark.asyncio async def test_initialize_async_requires_objective_target(self): @@ -431,7 +447,11 @@ async def test_run_async_returns_scenario_result_with_identifier( assert result.scenario_identifier.name == "ConcreteScenario" assert result.scenario_identifier.version == 5 assert result.scenario_identifier.pyrit_version is not None - assert result.get_strategies_used() == ["attack_run_1", "attack_run_2", "attack_run_3"] + assert result.get_strategies_used() == [ + "attack_run_1", + "attack_run_2", + "attack_run_3", + ] @pytest.mark.usefixtures("patch_central_database") @@ -511,8 +531,14 @@ def test_scenario_result_initialization(self, sample_attack_results): mock_scorer = create_mock_scorer() result = ScenarioResult( scenario_identifier=identifier, - objective_target_identifier={"__type__": "TestTarget", "__module__": "test"}, - attack_results={"base64": sample_attack_results[:3], "rot13": sample_attack_results[3:]}, + objective_target_identifier={ + "__type__": "TestTarget", + "__module__": "test", + }, + attack_results={ + "base64": sample_attack_results[:3], + "rot13": sample_attack_results[3:], + }, objective_scorer=mock_scorer, ) @@ -528,7 +554,10 @@ def test_scenario_result_with_empty_results(self): mock_scorer = 
create_mock_scorer() result = ScenarioResult( scenario_identifier=identifier, - objective_target_identifier={"__type__": "TestTarget", "__module__": "test"}, + objective_target_identifier={ + "__type__": "TestTarget", + "__module__": "test", + }, attack_results={"base64": []}, objective_scorer=mock_scorer, ) @@ -544,7 +573,10 @@ def test_scenario_result_objective_achieved_rate(self, sample_attack_results): # All successful result = ScenarioResult( scenario_identifier=identifier, - objective_target_identifier={"__type__": "TestTarget", "__module__": "test"}, + objective_target_identifier={ + "__type__": "TestTarget", + "__module__": "test", + }, attack_results={"base64": sample_attack_results}, objective_scorer=mock_scorer, ) @@ -555,21 +587,32 @@ def test_scenario_result_objective_achieved_rate(self, sample_attack_results): AttackResult( conversation_id="conv-fail", objective="objective", - attack_identifier={"__type__": "TestAttack", "__module__": "test", "id": "1"}, + attack_identifier={ + "__type__": "TestAttack", + "__module__": "test", + "id": "1", + }, outcome=AttackOutcome.FAILURE, executed_turns=1, ), AttackResult( conversation_id="conv-fail2", objective="objective", - attack_identifier={"__type__": "TestAttack", "__module__": "test", "id": "2"}, + attack_identifier={ + "__type__": "TestAttack", + "__module__": "test", + "id": "2", + }, outcome=AttackOutcome.FAILURE, executed_turns=1, ), ] result2 = ScenarioResult( scenario_identifier=identifier, - objective_target_identifier={"__type__": "TestTarget", "__module__": "test"}, + objective_target_identifier={ + "__type__": "TestTarget", + "__module__": "test", + }, attack_results={"base64": mixed_results}, objective_scorer=mock_scorer, ) @@ -601,3 +644,197 @@ def test_scenario_identifier_with_init_data(self): identifier = ScenarioIdentifier(name="TestScenario", scenario_version=1, init_data=init_data) assert identifier.init_data == init_data + + +def create_mock_truefalse_scorer(): + """Create a mock 
TrueFalseScorer for testing baseline-only execution.""" + from pyrit.score import TrueFalseScorer + + mock_scorer = MagicMock(spec=TrueFalseScorer) + mock_scorer.get_identifier.return_value = { + "__type__": "MockTrueFalseScorer", + "__module__": "test", + } + mock_scorer.get_scorer_metrics.return_value = None + # Make isinstance check work + mock_scorer.__class__ = TrueFalseScorer + return mock_scorer + + +class ConcreteScenarioWithTrueFalseScorer(Scenario): + """Concrete implementation of Scenario for testing baseline-only execution.""" + + def __init__(self, atomic_attacks_to_return=None, **kwargs): + # Add required strategy_class if not provided + + class TestStrategy(ScenarioStrategy): + TEST = ("test", {"concrete"}) + ALL = ("all", {"all"}) + + @classmethod + def get_aggregate_tags(cls) -> set[str]: + return {"all"} + + kwargs.setdefault("strategy_class", TestStrategy) + + # Use TrueFalseScorer mock if not provided + if "objective_scorer" not in kwargs: + kwargs["objective_scorer"] = create_mock_truefalse_scorer() + + super().__init__(**kwargs) + self._atomic_attacks_to_return = atomic_attacks_to_return or [] + + @classmethod + def get_strategy_class(cls): + """Return a mock strategy class for testing.""" + + from pyrit.scenario.core.scenario_strategy import ScenarioStrategy + + class TestStrategy(ScenarioStrategy): + TEST = ("test", {"concrete"}) + ALL = ("all", {"all"}) + + @classmethod + def get_aggregate_tags(cls) -> set[str]: + return {"all"} + + return TestStrategy + + @classmethod + def get_default_strategy(cls): + """Return the default strategy for testing.""" + return cls.get_strategy_class().ALL + + @classmethod + def default_dataset_config(cls) -> DatasetConfiguration: + """Return the default dataset configuration for testing.""" + return DatasetConfiguration() + + async def _get_atomic_attacks_async(self): + return self._atomic_attacks_to_return + + +@pytest.mark.usefixtures("patch_central_database") +class TestScenarioBaselineOnlyExecution: + 
"""Tests for baseline-only execution (empty strategies with include_baseline=True).""" + + @pytest.mark.asyncio + async def test_initialize_async_with_empty_strategies_and_baseline(self, mock_objective_target): + """Test that baseline-only execution works when include_baseline=True and strategies is empty.""" + from pyrit.models import SeedAttackGroup, SeedObjective + + # Create a scenario with include_default_baseline=True and TrueFalseScorer + scenario = ConcreteScenarioWithTrueFalseScorer( + name="Baseline Only Test", + version=1, + include_default_baseline=True, + ) + + # Create a mock dataset config with seed groups + mock_dataset_config = MagicMock(spec=DatasetConfiguration) + mock_dataset_config.get_all_seed_attack_groups.return_value = [ + SeedAttackGroup(seeds=[SeedObjective(value="test objective 1")]), + SeedAttackGroup(seeds=[SeedObjective(value="test objective 2")]), + ] + + # Initialize with empty strategies + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[], # Empty list - baseline only + dataset_config=mock_dataset_config, + ) + + # Should have exactly one attack - the baseline + assert scenario.atomic_attack_count == 1 + assert scenario._atomic_attacks[0].atomic_attack_name == "baseline" + + @pytest.mark.asyncio + async def test_baseline_only_execution_runs_successfully(self, mock_objective_target, sample_attack_results): + """Test that baseline-only scenario can run successfully.""" + from pyrit.models import SeedAttackGroup, SeedObjective + + # Create a scenario with include_default_baseline=True and TrueFalseScorer + scenario = ConcreteScenarioWithTrueFalseScorer( + name="Baseline Only Test", + version=1, + include_default_baseline=True, + ) + + # Create a mock dataset config with seed groups + mock_dataset_config = MagicMock(spec=DatasetConfiguration) + mock_dataset_config.get_all_seed_attack_groups.return_value = [ + SeedAttackGroup(seeds=[SeedObjective(value="test objective 1")]), + ] + + # 
Initialize with empty strategies + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[], # Empty list - baseline only + dataset_config=mock_dataset_config, + ) + + # Mock the baseline attack's run_async + scenario._atomic_attacks[0].run_async = create_mock_run_async([sample_attack_results[0]]) + + # Run the scenario + result = await scenario.run_async() + + # Verify the result + assert isinstance(result, ScenarioResult) + assert "baseline" in result.attack_results + assert len(result.attack_results["baseline"]) == 1 + + @pytest.mark.asyncio + async def test_empty_strategies_without_baseline_allows_initialization(self, mock_objective_target): + """Test that empty strategies without include_baseline are accepted at initialization but fail at run time.""" + scenario = ConcreteScenario( + name="No Baseline Test", + version=1, + include_default_baseline=False, # No baseline + ) + + mock_dataset_config = MagicMock(spec=DatasetConfiguration) + + # An explicitly empty strategy list is always accepted during initialization, + # whether or not a baseline is included + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[], # Empty list without baseline + dataset_config=mock_dataset_config, + ) + + # But running should fail because there are no atomic attacks + with pytest.raises(ValueError, match="Cannot run scenario with no atomic attacks"): + await scenario.run_async() + + @pytest.mark.asyncio + async def test_standalone_baseline_uses_dataset_config_seeds(self, mock_objective_target): + """Test that standalone baseline uses seed groups from dataset_config.""" + from pyrit.models import SeedAttackGroup, SeedObjective + + scenario = ConcreteScenarioWithTrueFalseScorer( + name="Baseline Seeds Test", + version=1, + include_default_baseline=True, + ) + + # Create specific seed groups to verify they're used + expected_seeds = [ + 
SeedAttackGroup(seeds=[SeedObjective(value="objective_b")]), + SeedAttackGroup(seeds=[SeedObjective(value="objective_c")]), + ] + + mock_dataset_config = MagicMock(spec=DatasetConfiguration) + mock_dataset_config.get_all_seed_attack_groups.return_value = expected_seeds + + await scenario.initialize_async( + objective_target=mock_objective_target, + scenario_strategies=[], + dataset_config=mock_dataset_config, + ) + + # Verify the baseline attack has the expected seed groups + baseline_attack = scenario._atomic_attacks[0] + assert baseline_attack.atomic_attack_name == "baseline" + assert baseline_attack.seed_groups == expected_seeds