diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 8e75f096..0eb87705 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -1,7 +1,7 @@ # This workflow installs required Python dependencies and then runs the available tests. # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python -name: Run Acceptance and Unit tests +name: Run Acceptance and Unit tests (with and without visualisation dependencies) on: pull_request: @@ -21,10 +21,18 @@ jobs: uses: actions/setup-python@v3 with: python-version: "3.10" # Only the oldest supported Python version is included here - - name: Install dependencies - run: | - python -m pip install --upgrade pip # upgrade pip to latest version - pip install . # install pyproject.toml dependencies - excludes optional dependencies (such as visualisation) - - name: Run tests + + - name: Install dependencies (without extra visualisation dependencies) run: | - python run_tests.py + python -m pip install --upgrade pip # upgrade pip to latest version + pip install . # no additional [visualisation] dependencies + + - name: Run tests (without visualisation dependencies) + run: python run_tests.py + + - name: Install extra visualisation dependencies + run: pip install ".[visualisation]" # extra [visualisation] dependencies in pyproject.toml + + - name: Run Tests (with visualisation dependencies) + run: python run_tests.py + diff --git a/README.md b/README.md index 6c3e9309..89594176 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ The recommended installation method is using [pip](http://pip-installer.org) After installation include `robotmbt` as library in your robot file to get access to the new functionality. To run your test suite model-based, use the __Treat this test suite model-based__ keyword as suite setup. Check the _How to model_ section to learn how to make your scenarios suitable for running model-based. -``` +```robotframework *** Settings *** Library robotmbt Suite Setup Treat this test suite model-based @@ -50,7 +50,8 @@ Modelling can be done directly from [Robot framework](https://robotframework.org Consider these two scenarios: -``` +```robotframework +*** Test Cases *** Buying a postcard When you buy a new postcard then you have a blank postcard @@ -63,7 +64,8 @@ Preparing for a birthday party Mapping the dependencies between scenarios is done by annotating the steps with modelling info. Modelling info is added to the documentation of the step as shown below. Regular documentation can still be added, as long as `*model info*` starts on a new line and a white line is included after the last `:OUT:` expression. -``` +```robotframework +*** Keywords *** you buy a new postcard [Documentation] *model info* ... :IN: None @@ -117,7 +119,8 @@ All example scenarios naturally contain data. This information is embedded in th #### Step argument modifiers -``` +```robotframework +*** Test Cases *** Personalising a birthday card Given there is a birthday card when Johan writes their name on the birthday card @@ -126,7 +129,8 @@ Personalising a birthday card The above scenario uses the name `Johan` to create a concrete example. But now suppose that from a testing perspective `Johan` and `Frederique` are part of the same equivalence class. Then the step `Frederique writes their name on the birthday card` would yield an equally valid scenario. This can be achieved by adding a modifier (`:MOD:`) to the model info of the step. The format of a modifier is a Robot argument to which you assign a list of options. The modifier updates the argument value to a randomly chosen value from the specified options. -``` +```robotframework +*** Keywords *** ${person} writes their name on the birthday card [Documentation] *model info* ... :MOD: ${person}= [Johan, Frederique] @@ -138,7 +142,8 @@ ${person} writes their name on the birthday card When constructing examples, they often express relations between multiple actors, where each actor can appear in multiple steps. This makes it important to know how modifiers behave when there are multiple modifiers in a scenario. -``` +```robotframework +*** Test Cases *** Addressing a birthday card Given Tannaz is having their birthday and Johan has a birthday card @@ -148,7 +153,8 @@ Addressing a birthday card Have a look at the when-step above. We will assume the model already contains a domain term with two properties: `birthday.celebrant = Tannaz` and `birthday.guests = [Johan, Frederique]`. -``` +```robotframework +*** Keywords *** ${sender} writes the address of ${receiver} on the birthday card [Documentation] *model info* ... :MOD: ${sender}= birthday.guests @@ -175,10 +181,12 @@ It is not possible to add new options to an existing example value. Any constrai It is possible for a step to keep the same options. The special `.*` notation lets you keep the available options as-is. Preceding steps must then supply the possible options. Some steps can, or must, deal with multiple independent sets of options that must not be mixed, because the expected results should differ. Suppose you have a set of valid and invalid passwords. You might be reluctant to include the superset of these as options to an authentication step. Instead, you can use `:MOD: ${password}= .*` as the modifier for that step. Like in the when-step for this scenario: -``` -Given 'secret' is too weak a password -When user tries to update their password to 'secret' -then the password is rejected +```robotframework +*** Test Cases *** +Reject password + Given 'secret' is too weak a password + When user tries to update their password to 'secret' + then the password is rejected ``` In a then-step, modifiers behave slightly different. In then-steps no new option constraints are accepted for an argument. Its value must already have been determined during the given- and when-steps. In other words, regardless of the actual modifier, the expression behaves as if it were `.*`. The exception to this is when a then-step signals the first use of a new example value. In that case the argument value from the original scenario text is used. @@ -193,12 +201,47 @@ For now, variable data considers strict equivalence classes only. This means tha By default, trace generation is random. The random seed used for the trace is logged by _Treat this test suite model-based_. This seed can be used to rerun the same trace, if no external random factors influence the test run. To activate the seed, pass it as argument: -``` +```robotframework Treat this test suite model-based seed=eag-etou-cxi-leamv-jsi ``` Using `seed=new` will force generation of a new reusable seed and is identical to omitting the seed argument. To completely bypass seed generation and use the system's random source, use `seed=None`. This has even more variation but does not produce a reusable seed. +### Graphs + +A graph can be included in the log file to visualise how scenarios are linked. This helps in understanding a test suite's structure and reveals alternative paths that did not make it into the final trace. + +To enable graph generation, some extra dependencies must be installed: `pip install robotframework-mbt[visualisation]` + +Generate the graph by setting the graph style for the model-based suite. The graph will be included in the Robot log file as part of the keyword's logging. + +```robotframework +Treat this test suite Model-based graph=scenario +``` + +Available graph styles: + +* scenario + * Compact view: Each scenario is shown as one node. +* scenario-delta-value + * Expanded view: Scenarios can become multiple nodes if they affect system state in different ways. + +#### Exporting and importing graph data + +Graph data can be stored by setting an output directory using argument `export_graph_data`. This createss a json file, named after the test suite, in the selected folder. Any accessable path can be used. For your convenience, Robot Framework offers an [automatic variable](https://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#automatic-variables) `${OUTPUT_DIR}` that points to this run's output directory. The `export_graph_data` argument can be used independent of the `graph` argument. + +```robotframework +Treat this test suite Model-based export_graph_data=${OUTPUT_DIR} +``` + +To recreate a graph from previously exported graph data, use: + +```robotframework +Show model graph from exported file json_file_path= graph_style=scenario +``` + +This will draw a graph from the exported file, without the need to rerun the test suite. It is possible to select a different graph style than was used during the test run. If no graph style is selected, then the scenario graph style is used. + ### Option management If you want to set configuration options for use in multiple test suites without having to repeat them, the keywords __Set model-based options__ and __Update model-based options__ can be used to configure RobotMBT library options. _Set_ takes the provided options and discards any previously set options. _Update_ allows you to modify existing options or add new ones. Reset all options by calling _Set_ without arguments. Direct options provided to __Treat this test suite model-based__ take precedence over library options and affect only the current test suite. diff --git a/atest/robotMBT tests/10__visualisation/01__scenario_graph/__init__.robot b/atest/robotMBT tests/10__visualisation/01__scenario_graph/__init__.robot new file mode 100644 index 00000000..3b161c11 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__scenario_graph/__init__.robot @@ -0,0 +1,3 @@ +*** Settings *** +Suite Setup Clear prior exports ${OUTPUT_DIR}${/}run_model_with_graph.json +Library ../graph_checker.py diff --git a/atest/robotMBT tests/10__visualisation/01__scenario_graph/run_model_with_graph.robot b/atest/robotMBT tests/10__visualisation/01__scenario_graph/run_model_with_graph.robot new file mode 100644 index 00000000..10c438c4 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__scenario_graph/run_model_with_graph.robot @@ -0,0 +1,29 @@ +*** Settings *** +Documentation This suite runs model-based with the graphing option enabled. The checks for the contents +... of the graph are delegated to the second robot file in the same folder. Export/import +... functionality is used to gain access to the graph contents. +Suite Setup Treat this test suite Model-based graph=scenario export_graph_data=${OUTPUT_DIR} +Resource ../../../resources/birthday_cards_data_variation.resource +Library robotmbt + +*** Test Cases *** +Buying a card + Given Jonathan is having their birthday + and Douwe is a friend of Jonathan + and Diogo is a friend of Jonathan + and Tycho is a friend of Jonathan + and Thomas is a friend of Jonathan + When Douwe buys a birthday card + then there is a blank birthday card available + +Someone writes their name on the card + Given there is a birthday card + and Diogo's name is not yet on the birthday card + when Diogo writes their name on the birthday card + then the birthday card has 'Diogo' written on it + +At least 3 people can write their name on the card + Given the birthday card has 2 different names written on it + and Tycho's name is not yet on the birthday card + when Tycho writes their name on the birthday card + then the birthday card has 3 different names written on it diff --git a/atest/robotMBT tests/10__visualisation/01__scenario_graph/verify_model_graph.robot b/atest/robotMBT tests/10__visualisation/01__scenario_graph/verify_model_graph.robot new file mode 100644 index 00000000..dca6d27b --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/01__scenario_graph/verify_model_graph.robot @@ -0,0 +1,33 @@ +*** Settings *** +Documentation This suite takes the graph generated in the first suite in this folder +... and checks the content's properties. +Library ../graph_checker.py +Library Collections +Suite Setup Import graph data from ${OUTPUT_DIR}${/}run_model_with_graph.json + +*** Test Cases *** +Scenarios are nodes in the graph + VAR @{expected_nodes} start + ... Buying a card + ... Someone writes their name on the card + ... At least 3 people can write their name on the card + ${node_count}= Number of graph nodes + Should be equal ${node_count} ${4} + @{all_nodes}= List of node titles + Lists should be equal ${all_nodes} ${expected_nodes} ignore_order=True + +Dependent nodes are connected by directed edges + @{successors}= All successors to node Buying a card + VAR @{next_node} Someone writes their name on the card + Should be equal ${successors} ${next_node} + @{successors}= All successors to node Someone writes their name on the card + Should not contain ${successors} Buying a card + +The dependency order for nodes is top-down + ${first_pos}= Vertical position of node Buying a card + ${second_pos}= Vertical position of node Someone writes their name on the card + Should be true ${second_pos} < ${first_pos} + +Repeating scenarios have a self-looping edge + @{successors}= All successors to node Someone writes their name on the card + Should contain ${successors} Someone writes their name on the card diff --git a/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/__init__.robot b/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/__init__.robot new file mode 100644 index 00000000..3b161c11 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/__init__.robot @@ -0,0 +1,3 @@ +*** Settings *** +Suite Setup Clear prior exports ${OUTPUT_DIR}${/}run_model_with_graph.json +Library ../graph_checker.py diff --git a/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/run_model_with_graph.robot b/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/run_model_with_graph.robot new file mode 100644 index 00000000..9cb0b3eb --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/run_model_with_graph.robot @@ -0,0 +1,29 @@ +*** Settings *** +Documentation This suite runs model-based with the graphing option enabled. The checks for the contents +... of the graph are delegated to the second robot file in the same folder. Export/import +... functionality is used to gain access to the graph contents. +Suite Setup Treat this test suite Model-based graph=scenario-delta-value export_graph_data=${OUTPUT_DIR} +Resource ../../../resources/birthday_cards_data_variation.resource +Library robotmbt + +*** Test Cases *** +Buying a card + Given Jonathan is having their birthday + and Douwe is a friend of Jonathan + and Diogo is a friend of Jonathan + and Tycho is a friend of Jonathan + and Thomas is a friend of Jonathan + When Douwe buys a birthday card + then there is a blank birthday card available + +Someone writes their name on the card + Given there is a birthday card + and Diogo's name is not yet on the birthday card + when Diogo writes their name on the birthday card + then the birthday card has 'Diogo' written on it + +At least 3 people can write their name on the card + Given the birthday card has 2 different names written on it + and Tycho's name is not yet on the birthday card + when Tycho writes their name on the birthday card + then the birthday card has 3 different names written on it diff --git a/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/verify_model_graph.robot b/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/verify_model_graph.robot new file mode 100644 index 00000000..fcb351b8 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/02__scenario-delta-value_graph/verify_model_graph.robot @@ -0,0 +1,36 @@ +*** Settings *** +Documentation This suite takes the graph generated in the first suite in this folder +... and checks how the content's properties for the scenario-delta-value +... graph are different compared to the scenario graph. +Library ../graph_checker.py +Library Collections +Suite Setup Import graph data from ${OUTPUT_DIR}${/}run_model_with_graph.json graph_type=scenario-delta-value + +*** Test Cases *** +Repeated scenario with different states become separate nodes + [Documentation] In the standard scenario graph, the repeated scenario would show up + ... as a self-looping edge. In the scenario-delta-value variant, also + ... state changes are taken into account, meaning that the two variants + ... will each get their own node. + ${node_count}= Number of graph nodes + Should be true ${node_count} > ${4} + @{all_nodes}= List of node titles + Should contain ${all_nodes} Someone writes their name on the card + Should contain ${all_nodes} Someone writes their name on the card (rep 2) + +Full node text contains state info + [Documentation] Next to the scenario name, the node text for scenario-delta-value graphs + ... also contians information about the model state. 'Buying a card' + ... initialises the modeland contains all properties in their initial state. + ... 'host' and 'names' are properties being tracked by the model. + ${full_text}= Full node text of node Buying a card + Should contain ${full_text} host + Should contain ${full_text} names + +Full node text only contains changed information + [Documentation] When someone writes their name on the card, then the host or celebrant + ... does not change. The node for this scenario should only show the state + ... that changed during this scenario. + ${full_text}= Full node text of node Someone writes their name on the card (rep 2) + Should contain ${full_text} names + Should not contain ${full_text} host diff --git a/atest/robotMBT tests/10__visualisation/03__verify_importing_graphs.robot b/atest/robotMBT tests/10__visualisation/03__verify_importing_graphs.robot new file mode 100644 index 00000000..3750bfdd --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/03__verify_importing_graphs.robot @@ -0,0 +1,39 @@ +*** Settings *** +Documentation This suite takes the graph generated earlier in the visualisation suite +... and uses it to check import functionality. +Library robotmbt +Library graph_checker.py +Library Collections + +*** Variables *** +${prior_export} ${OUTPUT_DIR}${/}run_model_with_graph.json + +*** Test Cases *** +Import as any graph type + [Documentation] The data that is stored during export is always the compelte data set. This + ... has the advantage that different graph types can be reconstructed from the + ... data without rerunning the model. + Import graph data from ${prior_export} graph_type=scenario + Show model graph from exported file ${prior_export} graph_style=scenario + ${node_count}= Number of graph nodes + Should be equal ${node_count} ${4} + Import graph data from ${prior_export} graph_type=scenario-delta-value + Show model graph from exported file ${prior_export} graph_style=scenario-delta-value + ${node_count}= Number of graph nodes + Should be true ${node_count} > ${4} + +Future major version bump imports are rejected + ${future_file}= Modify export file with future major version number ${prior_export} + Run keyword and expect error *incompatible RobotMBT version + ... Import graph data from ${future_file} + +Future minor verion bump imports are accepted + ${future_file}= Modify export file with future minor version number ${prior_export} + Import graph data from ${future_file} + ${node_count}= Number of graph nodes + Should be equal ${node_count} ${4} + +Ill-formed files are rejected for import + ${corrupted_file}= Corrupt export file ${prior_export} + Run keyword and expect error *could not be loaded as RobotMBT graph data + ... Import graph data from ${corrupted_file} diff --git a/atest/robotMBT tests/10__visualisation/__init__.robot b/atest/robotMBT tests/10__visualisation/__init__.robot new file mode 100644 index 00000000..ac5d9fd4 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/__init__.robot @@ -0,0 +1,8 @@ +*** Settings *** +Suite Setup Skip if optional visualisation is not installed +Library graph_checker.py + +*** Keywords *** +Skip if optional visualisation is not installed + ${partial_installation}= Graphing dependencies missing + Skip If ${partial_installation} Visualisation dependencies not installed. Please read the README for information on how to do this. diff --git a/atest/robotMBT tests/10__visualisation/graph_checker.py b/atest/robotMBT tests/10__visualisation/graph_checker.py new file mode 100644 index 00000000..cb5c3160 --- /dev/null +++ b/atest/robotMBT tests/10__visualisation/graph_checker.py @@ -0,0 +1,110 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os +from typing import Literal +from robot.api.deco import library, keyword +try: + from robotmbt.visualise.visualiser import Visualiser + from robotmbt.visualise.networkvisualiser import NetworkVisualiser +except ImportError: + Visualiser = None + + +@library(scope='SUITE', auto_keywords=True) +class GraphChecker: + def graphing_dependencies_missing(self) -> bool: + return Visualiser is None + + def clear_prior_exports(self, file_path): + if os.path.exists(file_path): + os.remove(file_path) + if os.path.exists(file_path): + raise OSError(f"Removing previous export file '{file_path}' failed") + + def import_graph_data_from(self, file_path: str, graph_type: str = 'scenario'): + self.visualiser = Visualiser() + self.visualiser.load_from_file(file_path) + self.graph = self.visualiser._get_graph(graph_type) + self.nxgraph = self.graph.networkx + + @property + def to_node(self) -> dict[str, str]: + """scenario name to node id converter""" + node_dict = {self._label_to_scenario(node['label']): id for id, node in self.nxgraph.nodes().items()} + if len(node_dict) < self.number_of_graph_nodes(): + print("Warning: Repeated scenario names present in graph") + return node_dict + + @property + def from_node(self) -> dict[str, str]: + """node id to scenario name converter""" + return {id: self._label_to_scenario(node['label']) for id, node in self.nxgraph.nodes().items()} + + def number_of_graph_nodes(self) -> int: + return len(self.nxgraph.nodes()) + + def list_of_node_titles(self) -> list[str]: + return list(self.to_node) + + def full_node_text_of_node(self, node_title: str) -> str: + node = self.to_node[node_title] + return self.nxgraph.nodes()[node]['label'] + + def all_successors_to_node(self, node_title: str) -> list[str]: + return [self.from_node[node] for node in self.nxgraph.successors(self.to_node[node_title])] + + def vertical_position_of_node(self, node_title: str) -> int: + nxvisual = NetworkVisualiser(self.graph, 'dummy') + return nxvisual.node_dict[self.to_node[node_title]].y + + @keyword("Modify export file with future ${bump} version number") + def modify_export_file_with_future_version_number(self, bump: Literal['major', 'minor'], file_path: str) -> str: + with open(file_path, "r") as f: + contents = f.read() + new_file = file_path.replace('.json', '_future.json') + new_version = '2.0.0' if bump == 'major' else '1.1.1' + with open(new_file, "w") as f: + f.write(contents.replace('"ROBOTMBT_MODEL_VERSION": "1.0.0"', f'"ROBOTMBT_MODEL_VERSION": "{new_version}"')) + return new_file + + def corrupt_export_file(self, file_path: str) -> str: + with open(file_path, "r") as f: + contents = f.read() + new_file = file_path.replace('.json', '_corrupt.json') + with open(new_file, "w") as f: + f.write(contents) + f.seek(0) + f.write('corrupted-'*20) + return new_file + + @staticmethod + def _label_to_scenario(labeltext: str) -> str: + return labeltext.split("\n\n")[0].replace('\n', ' ') diff --git a/pyproject.toml b/pyproject.toml index dbd356de..264b66e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,8 +23,11 @@ dependencies = [ ] requires-python = ">=3.10" -[tool.setuptools] -packages = ["robotmbt"] +[tool.setuptools.packages.find] +include = ["robotmbt*"] [project.urls] Homepage = "https://github.com/JFoederer/robotframeworkMBT" + +[project.optional-dependencies] +visualisation = ["networkx", "bokeh", "grandalf", "jsonpickle"] diff --git a/robotmbt/modelspace.py b/robotmbt/modelspace.py index b3238670..e5cac2a9 100644 --- a/robotmbt/modelspace.py +++ b/robotmbt/modelspace.py @@ -215,3 +215,17 @@ def __iter__(self): def __bool__(self): return any(True for _ in self) + + def __eq__(self, other): + self_set = set([(attr, getattr(self, attr)) for attr in dir(self._outer_scope) + dir(self) + if not attr.startswith('__') and attr != '_outer_scope']) + other_set = set([(attr, getattr(other, attr)) for attr in dir(other._outer_scope) + dir(other) + if not attr.startswith('__') and attr != '_outer_scope']) + return self_set == other_set + + def __str__(self): + res = "{" + for k, v in self: + res += f"{k}={v}, " + res += "}" + return res diff --git a/robotmbt/steparguments.py b/robotmbt/steparguments.py index d7be91c1..e9b78daa 100644 --- a/robotmbt/steparguments.py +++ b/robotmbt/steparguments.py @@ -40,7 +40,7 @@ class StepArguments(list): def __init__(self, iterable=[]): super().__init__(item.copy() for item in iterable) - def fill_in_args(self, text: str, as_code: bool = False): + def fill_in_args(self, text: str, as_code: bool = False) -> str: result = text for arg in self: sub = arg.codestring if as_code else str(arg.value) diff --git a/robotmbt/substitutionmap.py b/robotmbt/substitutionmap.py index 46c47c12..961bbbc3 100644 --- a/robotmbt/substitutionmap.py +++ b/robotmbt/substitutionmap.py @@ -66,7 +66,7 @@ def substitute(self, example_value: str, constraint: list[Any]): def solve(self) -> dict[str, str]: self.solution = {} - solution = dict() + solution: dict[str, str] = dict() substitutions = self.copy().substitutions unsolved_subs = list(substitutions) subs_stack = [] @@ -117,12 +117,13 @@ def __init__(self, constraint: list[Any]): try: # Keep the items in optionset unique. Refrain from using Python sets # due to non-deterministic behaviour when using random seeding. - self.optionset: list[Any] = list(dict.fromkeys(constraint)) + self.optionset: list[Any] | None = list(dict.fromkeys(constraint)) except TypeError: - self.optionset = None + self.optionset: list[Any] | None = None if not self.optionset or isinstance(constraint, str): raise ValueError(f"Invalid option set for initial constraint: {constraint}") - self.removed_stack = [] + + self.removed_stack: list[str | Placeholder] = [] def __repr__(self): return f'Constraint([{", ".join([str(e) for e in self.optionset])}])' diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index ebed36dd..f5b7e67d 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -32,18 +32,24 @@ import copy import random -from typing import Any from robot.api import logger +from robot.errors import TimeoutExceeded from . import modeller from .modelspace import ModelSpace from .suitedata import Suite, Scenario from .tracestate import TraceState +try: + from .visualise.visualiser import Visualiser +except ImportError: + Visualiser = None + class SuiteProcessors: - def echo(self, in_suite: Suite) -> Suite: + @staticmethod + def echo(in_suite: Suite) -> Suite: return in_suite def flatten(self, in_suite: Suite) -> Suite: @@ -74,13 +80,14 @@ def flatten(self, in_suite: Suite) -> Suite: out_suite.suites = [] return out_suite - def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytearray = 'new') -> Suite: + def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytearray = 'new', + graph: str = '', export_graph_data: str = '') -> Suite: + visualiser = self._init_visualiser(in_suite.name) if graph or export_graph_data else None self.out_suite = Suite(in_suite.name) self.out_suite.filename = in_suite.filename self.out_suite.parent = in_suite.parent self._fail_on_step_errors(in_suite) self.flat_suite = self.flatten(in_suite) - for id, scenario in enumerate(self.flat_suite.scenarios, start=1): scenario.src_id = id self.scenarios: list[Scenario] = self.flat_suite.scenarios[:] @@ -92,19 +99,30 @@ def process_test_suite(self, in_suite: Suite, *, seed: str | int | bytes | bytea random.shuffle(self.shuffled) # Keep a single shuffle for all TraceStates (non-essential) # a short trace without the need for repeating scenarios is preferred - tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) - + tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=False, visualiser=visualiser) if not tracestate.coverage_reached(): logger.debug("Direct trace not available. Allowing repetition of scenarios") - tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=True) - if not tracestate.coverage_reached(): - raise Exception("Unable to compose a consistent suite") + tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=True, visualiser=visualiser) + + if graph: + self._write_visualisation(visualiser, graph) + if export_graph_data: + self._export_graph_data(visualiser, export_graph_data) + if not tracestate.coverage_reached(): + raise Exception("Unable to compose a consistent suite") - self.out_suite.scenarios = tracestate.get_trace() self._report_tracestate_wrapup(tracestate) + self.out_suite.scenarios = tracestate.get_trace() return self.out_suite - def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool) -> TraceState: + def draw_graph_from_export_file(self, file_path: str, graph_style: str): + if visualiser := self._init_visualiser(): + visualiser.load_from_file(file_path) + logger.info(visualiser.generate_visualisation(graph_style), html=True) + else: + logger.info(f'Visualisation disabled due to initialisation failure.') + + def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool, visualiser: Visualiser = None) -> TraceState: tracestate = TraceState(self.shuffled) while not tracestate.coverage_reached(): candidate_id = tracestate.next_candidate(retry=allow_duplicate_scenarios) @@ -118,9 +136,11 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool) -> TraceS candidate = self._select_scenario_variant(candidate_id, tracestate) if not candidate: # No valid variant available in the current state tracestate.reject_scenario(candidate_id) + self._update_visualisation(visualiser, tracestate) continue previous_len = len(tracestate) modeller.try_to_fit_in_scenario(candidate, tracestate) + self._update_visualisation(visualiser, tracestate) self._report_tracestate_to_user(tracestate) if len(tracestate) > previous_len: logger.debug(f"last state:\n{tracestate.model.get_status_text()}") @@ -134,6 +154,8 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios: bool) -> TraceS modeller.rewind(tracestate, drought_recovery=True) self._report_tracestate_to_user(tracestate) logger.debug(f"last state:\n{tracestate.model.get_status_text()}") + self._update_visualisation(visualiser, tracestate) + self._update_visualisation(visualiser, tracestate) return tracestate @staticmethod @@ -164,6 +186,7 @@ def _scenario_with_repeat_counter(self, index: int, tracestate: TraceState) -> S @staticmethod def _fail_on_step_errors(suite: Suite): error_list = suite.steps_with_errors() + if error_list: err_msg = "Steps with errors in their model info found:\n" err_msg += '\n'.join([f"{s.keyword} [{s.model_info['error']}] used in {s.parent.name}" @@ -183,12 +206,12 @@ def _report_tracestate_wrapup(tracestate: TraceState): logger.debug(f"model\n{progression.model.get_status_text()}\n") @staticmethod - def _init_randomiser(seed: str | int | bytes | bytearray): + def _init_randomiser(seed: str | int | bytes | bytearray | None): if isinstance(seed, str): seed = seed.strip() if str(seed).lower() == 'none': logger.info( - f"Using system's random seed for trace generation. This trace cannot be rerun. Use `seed=new` to generate a reusable seed.") + "Using system's random seed for trace generation. This trace cannot be rerun. Use `seed=new` to generate a reusable seed.") elif str(seed).lower() == 'new': new_seed = SuiteProcessors._generate_seed() logger.info(f"seed={new_seed} (use seed to rerun this trace)") @@ -220,3 +243,47 @@ def _generate_seed() -> str: words.append(string) seed = '-'.join(words) return seed + + @staticmethod + def _init_visualiser(name: str = '') -> Visualiser: + global Visualiser + if Visualiser is None: + Visualiser = False + logger.warn(f'Visualisation requested, but required dependencies are not installed. ' + 'Refer to the README on how to install these dependencies. ') + return Visualiser(name) if Visualiser else None + + @staticmethod + def _update_visualisation(visualiser, tracestate: TraceState): + if visualiser: + try: + visualiser.update_trace(tracestate) + except TimeoutExceeded: + raise + except Exception as e: + logger.debug(f'Could not update visualisation due to error!\n{e}') + + @staticmethod + def _write_visualisation(visualiser, graph_style: str): + if visualiser: + try: + text = visualiser.generate_visualisation(graph_style) + logger.info(text, html=True) + except TimeoutExceeded: + raise + except Exception as e: + logger.debug(f'Could not generate visualisation due to error!\n{e}') + else: + logger.info("Graph skipped due to prior failure") + + @staticmethod + def _export_graph_data(visualiser, export_dir): + if visualiser: + try: + file_name = visualiser.export_to_file(export_dir) + logger.info(f"Graph data stored in file: {file_name}") + except TimeoutExceeded: + raise + except Exception as e: + logger.info("Could not export visualisation due to failure.") + logger.debug(f"Export error:\n{e}") diff --git a/robotmbt/suitereplacer.py b/robotmbt/suitereplacer.py index 7163a5cd..0c9daacb 100644 --- a/robotmbt/suitereplacer.py +++ b/robotmbt/suitereplacer.py @@ -30,14 +30,18 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from collections.abc import Callable +from typing import Any + import robot.model import robot.running.model as rmodel -from .suitedata import Suite, Scenario, Step -from .suiteprocessors import SuiteProcessors from robot.api import logger from robot.api.deco import library, keyword -from typing import Any, Literal from robot.libraries.BuiltIn import BuiltIn + +from .suitedata import Suite, Scenario, Step +from .suiteprocessors import SuiteProcessors + Robot = BuiltIn() @@ -49,7 +53,7 @@ def __init__(self, processor: str = 'process_test_suite', processor_lib: str | N self.processor_lib_name: str | None = processor_lib self.processor_name: str = processor self._processor_lib: SuiteProcessors | None | object = None - self._processor_method: Any = None + self._processor_method: Callable[..., Suite] | None = None self.processor_options: dict[str, Any] = {} @property @@ -109,6 +113,17 @@ def update_model_based_options(self, **kwargs): """ self.processor_options.update(kwargs) + @keyword("Show model graph from exported file") + def show_graph(self, json_file_path: str, graph_style: str = 'scenario'): + """ + If your previously ran `Treat this test suite Model-based` with the option to export + graph data to file, then this keyword can be used to directly draw a graph from the + exported file, without the need to rerun the test suite. It is possible to select a + different graph style than was used during the test run. If no graph style is selected, + then the scenario graph style is used. + """ + SuiteProcessors().draw_graph_from_export_file(json_file_path, graph_style) + def __process_robot_suite(self, in_suite: robot.model.TestSuite, parent: Suite | None) -> Suite: out_suite = Suite(in_suite.name, parent) out_suite.filename = in_suite.source diff --git a/robotmbt/visualise/__init__.py b/robotmbt/visualise/__init__.py new file mode 100644 index 00000000..c7ec90cd --- /dev/null +++ b/robotmbt/visualise/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/robotmbt/visualise/graphs/README.md b/robotmbt/visualise/graphs/README.md new file mode 100644 index 00000000..dd0b1e94 --- /dev/null +++ b/robotmbt/visualise/graphs/README.md @@ -0,0 +1,74 @@ +# How to: Creating new graphs + +Extending the functionality of the visualiser with new graph types can result in better insights into created tests. The visualiser makes use of an abstract graph class that makes it easy to create new graph types. + +To create a new graph type, create an instance of `robotmbt/visualise/graphs/AbstractGraph`, instantiating the abstract methods. Please place the graph under `robotmbt/visualise/graphs/`. + +**NOTE**: when manually altering the `networkx` field, ensure its IDs remain as a serializable and hashable type when the constructor finishes. + +As an example, we show the implementation of the scenario graph below. In this graph type, nodes represent scenarios encountered in exploration, and edges show the flow between these scenarios. +It does not enable tooltips. + +```python +class ScenarioGraph(AbstractGraph[ScenarioInfo, None]): + @staticmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> ScenarioInfo: + return trace[index][0] + + @staticmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> None: + return None + + @staticmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + return '' + + @staticmethod + def create_node_label(info: ScenarioInfo) -> str: + return info.name + + @staticmethod + def create_edge_label(info: None) -> str: + return '' + + @staticmethod + def get_legend_info_final_trace_node() -> str: + return "Scenario (part of trace)" + + @staticmethod + def get_legend_info_other_node() -> str: + return "Scenario (not in trace)" + + @staticmethod + def get_legend_info_final_trace_edge() -> str: + return "path included in trace" + + @staticmethod + def get_legend_info_other_edge() -> str: + return "not selected for trace" + + @staticmethod + def get_tooltip_name() -> str: + return "" +``` + +Once you have created a new Graph class, you can direct the visualiser to use it when your type is selected. +Simply add your class to the `GRAPHS` dictionary in `robotmbt/visualise/visualiser.py` like the others. For our example: + +```python +GRAPHS = { + ... + 'scenario': ScenarioGraph, + ... +} +``` + +Now, when selecting your graph type (in our example `Treat this test suite Model-based graph=scenario`), your graph will get constructed! + +**NOTE**: You need the optional dependencies for visualisation to create graphs, e.g.: + +```bash +pip install .[visualisation] +``` + +**TIP**: [Python virtual environments](https://docs.python.org/3/library/venv.html) are an easy way to test with different Python version and sets of dependencies. diff --git a/robotmbt/visualise/graphs/__init__.py b/robotmbt/visualise/graphs/__init__.py new file mode 100644 index 00000000..c7ec90cd --- /dev/null +++ b/robotmbt/visualise/graphs/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/robotmbt/visualise/graphs/abstractgraph.py b/robotmbt/visualise/graphs/abstractgraph.py new file mode 100644 index 00000000..4214d0db --- /dev/null +++ b/robotmbt/visualise/graphs/abstractgraph.py @@ -0,0 +1,186 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from robotmbt.visualise.models import TraceInfo, ScenarioInfo, StateInfo +import networkx as nx + + +NodeInfo = TypeVar('NodeInfo') +EdgeInfo = TypeVar('EdgeInfo') + + +class AbstractGraph(ABC, Generic[NodeInfo, EdgeInfo]): + def __init__(self, info: TraceInfo): + """ + Note that networkx's ids have to be of a serializable and hashable type after construction. + """ + # The underlying storage - a NetworkX DiGraph + self.networkx: nx.DiGraph = nx.DiGraph() + + # Keep track of node IDs + self.ids: dict[str, tuple[NodeInfo, str]] = {} + + # Add the start node + self.networkx.add_node('start', label='start', description='') + self.start_node = 'start' + + # Add nodes and edges for all traces + for trace in info.all_traces: + for i in range(len(trace)): + if i > 0: + from_node = self._get_or_create_id( + self.select_node_info(trace, i - 1), + self.create_node_description(trace, i - 1)) + else: + from_node = 'start' + to_node = self._get_or_create_id( + self.select_node_info(trace, i), + self.create_node_description(trace, i)) + self._add_node(from_node) + self._add_node(to_node) + self._add_edge(from_node, to_node, + self.create_edge_label(self.select_edge_info(trace[i]))) + + # Set the final trace and add any missing nodes/edges + self.final_trace = ['start'] + for i in range(len(info.current_trace)): + if i > 0: + from_node = self._get_or_create_id( + self.select_node_info(info.current_trace, i - 1), + self.create_node_description(info.current_trace, i - 1)) + else: + from_node = 'start' + to_node = self._get_or_create_id( + self.select_node_info(info.current_trace, i), + self.create_node_description(info.current_trace, i)) + self.final_trace.append(to_node) + self._add_node(from_node) + self._add_node(to_node) + self._add_edge(from_node, to_node, + self.create_edge_label(self.select_edge_info(info.current_trace[i]))) + + def get_final_trace(self) -> list[str]: + """ + Get the final trace as ordered node ids. + Edges are subsequent entries in the list. + """ + return self.final_trace + + def _get_or_create_id(self, info: NodeInfo, description: str) -> str: + """ + Get the ID for a state that has been added before, or create and store a new one. + """ + for i in self.ids.keys(): + if self.ids[i][0] == info: + return i + + new_id = f"node{len(self.ids)}" + self.ids[new_id] = info, description + return new_id + + def _add_node(self, node: str): + """ + Add node if it doesn't already exist. + """ + if node not in self.networkx.nodes: + self.networkx.add_node( + node, label=self.create_node_label(self.ids[node][0]), description=self.ids[node][1]) + + def _add_edge(self, from_node: str, to_node: str, label: str): + """ + Add edge if it doesn't already exist. + If edge exists, update the label information + """ + if (from_node, to_node) in self.networkx.edges: + if label == '': + return + old_label = nx.get_edge_attributes(self.networkx, 'label')[ + (from_node, to_node)] + if label in old_label.split('\n'): + return + new_label = old_label + '\n' + label + attr = {(from_node, to_node): {'label': new_label}} + nx.set_edge_attributes(self.networkx, attr) + else: + self.networkx.add_edge( + from_node, to_node, label=label) + + @staticmethod + @abstractmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> NodeInfo: + """Select the info to use to compare nodes and generate their labels for a specific graph type.""" + + @staticmethod + @abstractmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> EdgeInfo: + """Select the info to use to generate the label for each edge for a specific graph type.""" + + @staticmethod + @abstractmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + """Create the description to be shown in a tooltip for a node given the full trace and its index.""" + + @staticmethod + @abstractmethod + def create_node_label(info: NodeInfo) -> str: + """Create the label for a node given its chosen information.""" + + @staticmethod + @abstractmethod + def create_edge_label(info: EdgeInfo) -> str: + """Create the label for an edge given its chosen information.""" + + @staticmethod + @abstractmethod + def get_legend_info_final_trace_node() -> str: + """Get the information to include in the legend for nodes that appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_legend_info_other_node() -> str: + """Get the information to include in the legend for nodes that do not appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_legend_info_final_trace_edge() -> str: + """Get the information to include in the legend for edges that appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_legend_info_other_edge() -> str: + """Get the information to include in the legend for edges that do not appear in the final trace.""" + + @staticmethod + @abstractmethod + def get_tooltip_name() -> str: + """Get the name/title of the tooltip.""" diff --git a/robotmbt/visualise/graphs/scenariodeltavaluegraph.py b/robotmbt/visualise/graphs/scenariodeltavaluegraph.py new file mode 100644 index 00000000..1fd3fd5f --- /dev/null +++ b/robotmbt/visualise/graphs/scenariodeltavaluegraph.py @@ -0,0 +1,89 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robotmbt.modelspace import ModelSpace + +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph +from robotmbt.visualise.models import ScenarioInfo, StateInfo + + +class ScenarioDeltaValueGraph(AbstractGraph[tuple[ScenarioInfo, set[tuple[str, str]]], None]): + """ + The Scenario-delta-Value graph keeps track of both the scenarios and state updates encountered. + Its nodes are scenarios together with the property assignments after the scenario has run. + Its edges represent steps in the trace. + """ + + @staticmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) \ + -> tuple[ScenarioInfo, set[tuple[str, str]]]: + if index == 0: + return trace[0][0], StateInfo(ModelSpace()).difference(trace[0][1]) + else: + return trace[index][0], trace[index-1][1].difference(trace[index][1]) + + @staticmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> None: + return None + + @staticmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + return str(trace[index][1]).replace('\n', '
') + + @staticmethod + def create_node_label(info: tuple[ScenarioInfo, set[tuple[str, str]]]) -> str: + res = "" + for assignment in info[1]: + res += "\n\n"+assignment[0]+":"+assignment[1] + return f"{info[0].name}{res}" + + @staticmethod + def create_edge_label(info: None) -> str: + return '' + + @staticmethod + def get_legend_info_final_trace_node() -> str: + return "Scenario + effect on model (part of trace)" + + @staticmethod + def get_legend_info_other_node() -> str: + return "Scenario + effect on model (not in trace)" + + @staticmethod + def get_legend_info_final_trace_edge() -> str: + return "path included in trace" + + @staticmethod + def get_legend_info_other_edge() -> str: + return "not selected for trace" + + @staticmethod + def get_tooltip_name() -> str: + return "Full state" diff --git a/robotmbt/visualise/graphs/scenariograph.py b/robotmbt/visualise/graphs/scenariograph.py new file mode 100644 index 00000000..1b186f6c --- /dev/null +++ b/robotmbt/visualise/graphs/scenariograph.py @@ -0,0 +1,79 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph +from robotmbt.visualise.models import ScenarioInfo, StateInfo + + +class ScenarioGraph(AbstractGraph[ScenarioInfo, None]): + """ + The scenario graph is the most basic representation of trace exploration. + It represents scenarios as nodes, and the trace as edges. + """ + + @staticmethod + def select_node_info(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> ScenarioInfo: + return trace[index][0] + + @staticmethod + def select_edge_info(pair: tuple[ScenarioInfo, StateInfo]) -> None: + return None + + @staticmethod + def create_node_description(trace: list[tuple[ScenarioInfo, StateInfo]], index: int) -> str: + return '' + + @staticmethod + def create_node_label(info: ScenarioInfo) -> str: + return info.name + + @staticmethod + def create_edge_label(info: None) -> str: + return '' + + @staticmethod + def get_legend_info_final_trace_node() -> str: + return "Scenario (part of trace)" + + @staticmethod + def get_legend_info_other_node() -> str: + return "Scenario (not in trace)" + + @staticmethod + def get_legend_info_final_trace_edge() -> str: + return "path included in trace" + + @staticmethod + def get_legend_info_other_edge() -> str: + return "not selected for trace" + + @staticmethod + def get_tooltip_name() -> str: + return "" diff --git a/robotmbt/visualise/models.py b/robotmbt/visualise/models.py new file mode 100644 index 00000000..658d49bf --- /dev/null +++ b/robotmbt/visualise/models.py @@ -0,0 +1,311 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Any + +from robot.api import logger + +from robotmbt.modelspace import ModelSpace +from robotmbt.suitedata import Scenario + +import jsonpickle +import tempfile +import os + +DESIRED_NAME_LINE_LENGTH = 20 + + +class ScenarioInfo: + """ + This contains all information we need from scenarios, abstracting away from the actual Scenario class: + - name + - src_id + """ + + def __init__(self, scenario: Scenario): + self.name = self._split_name(scenario.name) + self.src_id = scenario.src_id + + def __str__(self): + return f"Scenario {self.src_id}: {self.name}" + + def __eq__(self, other): + return self.src_id == other.src_id + + @staticmethod + def _split_name(name: str) -> str: + """ + Split a name into separate lines where each line is as close to the desired line length as possible. + """ + # Split into words + words = name.split(" ") + + # If any word is longer than the desired length, use that as the desired length instead + # (otherwise, we will always get a line (much) longer than the desired length, while the other lines will + # be constrained by the desired length) + desired_length = DESIRED_NAME_LINE_LENGTH + for i in words: + if len(i) > desired_length: + desired_length = len(i) + + res = "" + line = words[0] + for i in words[1:]: + # If the previous line was fully appended, simply take the current word as the new line + if line == '\n': + line += i + continue + + app_len = len(line + ' ' + i) + + # If the word fully fits into the line, simply append it + if app_len < desired_length: + line = line + ' ' + i + continue + + app_diff = abs(desired_length - app_len) + curr_diff = abs(desired_length - len(line)) + + # If the current line is closer to the desired length, use that + if curr_diff < app_diff: + res += line + line = '\n' + i + # If the current line with the new word is closer to the desired length, use that + else: + res += line + ' ' + i + line = '\n' + + # Append the final line if it wasn't empty + if line != '\n': + res += line + + return res + + +class StateInfo: + """ + This contains all information we need from states, abstracting away from the actual ModelSpace class: + - domain + - properties + """ + + @classmethod + def _create_state_with_prop(cls, name: str, attrs: list[tuple[str, Any]]): + space = ModelSpace() + prop = ModelSpace() + for (key, val) in attrs: + prop.__setattr__(key, val) + space.props[name] = prop + return cls(space) + + def difference(self, new_state) -> set[tuple[str, str]]: + """ + new_state: the new StateInfo to be compared to the self. + returns: a set of tuples with properties and their assignment. + """ + old: dict[str, dict | str] = self.properties.copy() + new: dict[str, dict | str] = new_state.properties.copy() + temp = StateInfo._dict_deep_diff(old, new) + for key in temp.keys(): + res = "" + for k, v in sorted(temp[key].items()): + res += f"\n\t{k}={v}" + temp[key] = res + return set(temp.items()) # type inference goes wacky here + + @staticmethod + def _dict_deep_diff(old_state: dict[str, Any], new_state: dict[str, Any]) -> dict[str, Any]: + res = {} + for key in new_state.keys(): + if key not in old_state: + res[key] = new_state[key] + elif isinstance(old_state[key], dict): + diff = StateInfo._dict_deep_diff(old_state[key], new_state[key]) + if len(diff) != 0: + res[key] = diff + elif old_state[key] != new_state[key]: + res[key] = new_state[key] + return res + + def __init__(self, state: ModelSpace): + self.domain = state.ref_id + + # Extract all attributes/properties stored in the model space and store them in the temp dict + # Similar in workings to ModelSpace's get_status_text + temp = {} + for p in state.props: + temp[p] = {} + if p == 'scenario': + temp['scenario'] = dict(state.props['scenario']) + else: + for attr in dir(state.props[p]): + temp[p][attr] = getattr(state.props[p], attr) + + # Filter empty entries + self.properties = {} + for p in temp.keys(): + if len(temp[p]) > 0: + self.properties[p] = temp[p].copy() + + def __eq__(self, other): + return self.domain == other.domain and self.properties == other.properties + + def __str__(self): + res = "" + for p in self.properties: + if res != "": + res += "\n\n" + res += f"{p}:" + for k, v in self.properties[p].items(): + res += f"\n\t{k}={v}" + return res + + +class TraceInfo: + """ + This keeps track of all information we need from all steps in trace exploration: + - current_trace: the trace currently being built up, a list of scenario/state pairs in order of execution + - all_traces: all valid traces encountered in trace exploration, up until the point they could not go any further + - previous_length: used to identify backtracking + """ + ROBOTMBT_MODEL_VERSION = '1.0.0' + + def __init__(self, name: str = ''): + self.ROBOTMBT_MODEL_VERSION: str = TraceInfo.ROBOTMBT_MODEL_VERSION + self.model_name: str = name + self.current_trace: list[tuple[ScenarioInfo, StateInfo]] = [] + self.all_traces: list[list[tuple[ScenarioInfo, StateInfo]]] = [] + self.previous_length: int = 0 + self.pushed: bool = False + + def update_trace(self, scenario: ScenarioInfo | None, state: StateInfo, length: int): + """ + Updates TraceInfo instance with the information that a scenario has run resulting in the given state as the nth + scenario of the trace, where n is the value of the length parameter. If length is greater than the previous + length of the trace to be updated, adds the given scenario/state to the trace. If length is smaller than the + previous length of the trace, roll back the trace until the step indicated by length. + scenario: the scenario that has run. + state: the state after scenario has run. + length: the step in the trace the scenario occurred in. + """ + if length > self.previous_length: + # New state - push + self._push(scenario, state, length - self.previous_length) + self.previous_length = length + elif length < self.previous_length: + # Backtrack - pop + self._pop(self.previous_length - length) + self.previous_length = length + + # Sanity check + if len(self.current_trace) > 0: + self._sanity_check(scenario, state, 'popping') + else: + # No change - sanity check + if len(self.current_trace) > 0: + self._sanity_check(scenario, state, 'nothing') + + def _push(self, scenario: ScenarioInfo, state: StateInfo, n: int): + if n > 1: + logger.warn( + f"Pushing multiple scenario/state pairs at once to trace info ({n})! Some info might be lost!") + for _ in range(n): + self.current_trace.append((scenario, state)) + self.pushed = True + + def _pop(self, n: int): + if self.pushed: + self.all_traces.append(self.current_trace.copy()) + for _ in range(n): + self.current_trace.pop() + self.pushed = False + + def __repr__(self) -> str: + return f"TraceInfo(traces=[{[f'[{[self.stringify_pair(pair) for pair in trace]}]' for trace in self.all_traces]}], current=[{[self.stringify_pair(pair) for pair in self.current_trace]}])" + + def _sanity_check(self, scen: ScenarioInfo, state: StateInfo, after: str): + (prev_scen, prev_state) = self.current_trace[-1] + if prev_scen != scen: + logger.warn( + f'TraceInfo got out of sync after {after}\nExpected scenario: {prev_scen}\nActual scenario: {scen}') + if prev_state != state: + logger.warn( + f'TraceInfo got out of sync after {after}\nExpected state: {prev_state}\nActual state: {state}') + + def export_graph(self, dir: str = '', atest: bool = False) -> str | None: + encoded_instance = jsonpickle.encode(self) + name = self.model_name.lower().replace(' ', '_') + if atest: + ''' + temporary file to not accidentally overwrite an existing file + mkstemp() is not ideal but given Python's limitations this is the easiest solution + as temporary file, a different method, is problematic on Windows + https://stackoverflow.com/a/57015383 + ''' + fd, dir = tempfile.mkstemp() + with os.fdopen(fd, "w") as f: + f.write(encoded_instance) + return dir + + if dir[-1] != '/': + dir += '/' + + # create folders if they do not exist + if not os.path.exists(dir): + os.makedirs(dir) + + with open(f"{dir}{name}.json", "w") as f: + f.write(encoded_instance) + return None + + @staticmethod + def import_graph_from_file(file_path: str): + try: + with open(file_path, "r") as f: + traceinfo = jsonpickle.decode(f.read()) + traceinfo.ROBOTMBT_MODEL_VERSION # trigger AttributeError if non-robotmbt data + except OSError: + raise + except Exception: + raise TypeError(f"Contents from '{file_path}' could not be loaded as RobotMBT graph data") + + if str(traceinfo.ROBOTMBT_MODEL_VERSION).split('.')[0] != TraceInfo.ROBOTMBT_MODEL_VERSION.split('.')[0]: + # raise if major version differs + raise ValueError("Graph data is from an incompatible RobotMBT version") + return traceinfo + + @staticmethod + def stringify_pair(pair: tuple[ScenarioInfo, StateInfo]) -> str: + """ + Takes a pair of a scenario and a state and returns a string describing both. + pair: a tuple consisting of a scenario and a state. + returns: formatted string based on the given scenario and state. + """ + return f"Scenario={pair[0].name}, State={pair[1]}" diff --git a/robotmbt/visualise/networkvisualiser.py b/robotmbt/visualise/networkvisualiser.py new file mode 100644 index 00000000..beacfd4b --- /dev/null +++ b/robotmbt/visualise/networkvisualiser.py @@ -0,0 +1,679 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from bokeh.core.enums import PlaceType, LegendLocationType +from bokeh.core.property.vectorization import value +from bokeh.embed import file_html +from bokeh.models import ColumnDataSource, Rect, Text, ResetTool, SaveTool, WheelZoomTool, PanTool, Plot, Range1d, \ + Title, FullscreenTool, CustomJS, Segment, Arrow, NormalHead, Bezier, Legend, ZoomInTool, ZoomOutTool, HoverTool + +# grandalf used under EPL license +from grandalf.graphs import Vertex as GVertex, Edge as GEdge, Graph as GGraph +from grandalf.layouts import SugiyamaLayout + +from networkx import DiGraph + +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph + +# Padding within the nodes between the borders and inner text +HORIZONTAL_PADDING_WITHIN_NODES: int = 5 +VERTICAL_PADDING_WITHIN_NODES: int = 5 + +# Colors for different parts of the graph +FINAL_TRACE_NODE_COLOR: str = '#CCCC00' +OTHER_NODE_COLOR: str = '#999989' +FINAL_TRACE_EDGE_COLOR: str = '#444422' +OTHER_EDGE_COLOR: str = '#BBBBAA' + +# Legend placement +# Alignment within graph ('center' is in the middle, 'top_right' is the top right, etc.) +LEGEND_LOCATION: LegendLocationType | tuple[float, float] = 'top_right' +# Where it appears relative to graph ('center' is within, 'below' is below, etc.) +LEGEND_PLACE: PlaceType = 'center' + +# Dimensions of the plot in the window +INNER_WINDOW_WIDTH: int = 720 +INNER_WINDOW_HEIGHT: int = 480 +OUTER_WINDOW_WIDTH: int = INNER_WINDOW_WIDTH + (280 if LEGEND_PLACE == 'left' or LEGEND_PLACE == 'right' else 30) +OUTER_WINDOW_HEIGHT: int = INNER_WINDOW_HEIGHT + (150 if LEGEND_PLACE == 'below' or LEGEND_PLACE == 'center' else 30) + +# Font sizes +MAJOR_FONT_SIZE: int = 16 +MINOR_FONT_SIZE: int = 8 + + +class Node: + """ + Contains the information we need to add a node to the graph. + """ + + def __init__(self, node_id: str, label: str, x: int, y: int, width: float, height: float, description: str): + self.node_id = node_id + self.label = label + self.x = x + self.y = y + self.width = width + self.height = height + self.description = description + + +class Edge: + """ + Contains the information we need to add an edge to the graph. + """ + + def __init__(self, from_node: str, to_node: str, label: str, points: list[tuple[float, float]]): + self.from_node = from_node + self.to_node = to_node + self.label = label + self.points = points + + +class NetworkVisualiser: + """ + A container for a Bokeh graph, which can be created from any abstract graph. + """ + + def __init__(self, graph: AbstractGraph, suite_name: str): + # Extract what we need from the graph + self.networkx: DiGraph = graph.networkx + self.final_trace = graph.get_final_trace() + self.start = graph.start_node + + # Set up a Bokeh figure + self.plot = Plot(width=OUTER_WINDOW_WIDTH, height=OUTER_WINDOW_HEIGHT) + self.plot.output_backend = "svg" + + # Create Sugiyama layout + nodes, edges = self._create_layout() + self.node_dict: dict[str, Node] = {} + for node in nodes: + self.node_dict[node.node_id] = node + + # Keep track of arrows in the graph for scaling + self.arrows = [] + + # Create the hover tool to show tooltips + tooltip_name = graph.get_tooltip_name() + if tooltip_name: + self.hover = HoverTool() + tooltips = f""" +
+ {tooltip_name} +
+ @description{{safe}} +
+
+ """ + self.hover.tooltips = tooltips + else: + self.hover = None + + # Add the nodes to the graph + self._add_nodes(nodes) + + # Add the edges to the graph + self._add_edges(nodes, edges) + + # Add a legend to the graph + self._add_legend(graph) + + # Add our features to the graph (e.g. tools) + self._add_features(suite_name) + + def generate_html(self): + """ + Generate HTML for the Bokeh graph. + """ + return file_html(self.plot, 'inline', "graph") + + def _add_nodes(self, nodes: list[Node]): + """ + Add the nodes to the graph in the form of Rect and Text glyphs. + """ + # The ColumnDataSources to store our nodes and edges in Bokeh's format + node_source: ColumnDataSource = ColumnDataSource( + {'id': [], 'x': [], 'y': [], 'w': [], 'h': [], 'color': [], 'description': []}) + node_label_source: ColumnDataSource = ColumnDataSource( + {'id': [], 'x': [], 'y': [], 'label': []}) + + # Add all nodes to the column data sources + for node in nodes: + _add_node_to_sources(node, self.final_trace, node_source, node_label_source) + + # Add the glyphs for nodes and their labels + node_glyph = Rect(x='x', y='y', width='w', height='h', fill_color='color') + node_glyph_renderer = self.plot.add_glyph(node_source, node_glyph) + + if self.hover is not None: + self.hover.renderers = [node_glyph_renderer] + + node_label_glyph = Text(x='x', y='y', text='label', text_align='left', text_baseline='middle', + text_font_size=f'{MAJOR_FONT_SIZE}pt', text_font=value("Courier New")) + node_label_glyph.tags = [f"scalable_text{MAJOR_FONT_SIZE}"] + self.plot.add_glyph(node_label_source, node_label_glyph) + + def _add_edges(self, nodes: list[Node], edges: list[Edge]): + """ + Add the edges to the graph in the form of Arrow layouts and Segment, Bezier, and Text glyphs. + """ + # The ColumnDataSources to store our edges in Bokeh's format + edge_part_source: ColumnDataSource = ColumnDataSource( + {'from': [], 'to': [], 'start_x': [], 'start_y': [], 'end_x': [], 'end_y': [], 'color': []}) + edge_arrow_source: ColumnDataSource = ColumnDataSource( + {'from': [], 'to': [], 'start_x': [], 'start_y': [], 'end_x': [], 'end_y': [], 'color': []}) + edge_bezier_source: ColumnDataSource = ColumnDataSource( + {'from': [], 'to': [], 'start_x': [], 'start_y': [], 'end_x': [], 'end_y': [], 'control1_x': [], + 'control1_y': [], 'control2_x': [], 'control2_y': [], 'color': []}) + edge_label_source: ColumnDataSource = ColumnDataSource({'from': [], 'to': [], 'x': [], 'y': [], 'label': []}) + + for edge in edges: + _add_edge_to_sources(nodes, edge, self.final_trace, edge_part_source, edge_arrow_source, edge_bezier_source, + edge_label_source) + + # Add the glyphs for edges and their labels + edge_part_glyph = Segment(x0='start_x', y0='start_y', x1='end_x', y1='end_y', line_color='color') + self.plot.add_glyph(edge_part_source, edge_part_glyph) + + arrow_layout = Arrow( + end=NormalHead(size=10, fill_color='color', line_color='color'), + x_start='start_x', y_start='start_y', + x_end='end_x', y_end='end_y', line_color='color', + source=edge_arrow_source + ) + self.plot.add_layout(arrow_layout) + self.arrows.append(arrow_layout) + + edge_bezier_glyph = Bezier(x0='start_x', y0='start_y', x1='end_x', y1='end_y', cx0='control1_x', + cy0='control1_y', cx1='control2_x', cy1='control2_y', line_color='color') + self.plot.add_glyph(edge_bezier_source, edge_bezier_glyph) + + edge_label_glyph = Text(x='x', y='y', text='label', text_align='center', text_baseline='middle', + text_font_size=f'{MINOR_FONT_SIZE}pt', text_font=value("Courier New")) + edge_label_glyph.tags = [f"scalable_text{MINOR_FONT_SIZE}"] + self.plot.add_glyph(edge_label_source, edge_label_glyph) + + def _create_layout(self) -> tuple[list[Node], list[Edge]]: + """ + Create the Sugiyama layout using grandalf. + """ + # Containers to convert networkx nodes/edges to the proper format. + vertices = [] + edges = [] + flips = [] + + # Extract nodes from networkx and put them in the proper format to be used by grandalf. + start = None + for node_id in self.networkx.nodes: + v = GVertex(node_id) + w, h = _calculate_dimensions(self.networkx.nodes[node_id]['label']) + v.view = NodeView(w, h) + vertices.append(v) + if node_id == self.start: + start = v + + # Calculate which edges need to be flipped to make the graph acyclic. + flip = _flip_edges([e for e in self.networkx.edges]) + + # Extract edges from networkx and put them in the proper format to be used by grandalf. + for (from_id, to_id) in self.networkx.edges: + from_node = _find_node(vertices, from_id) + to_node = _find_node(vertices, to_id) + e = GEdge(from_node, to_node) + e.view = EdgeView() + edges.append(e) + if (from_id, to_id) in flip: + flips.append(e) + + # Feed the info to grandalf and get the layout. + g = GGraph(vertices, edges) + sugiyama = SugiyamaLayout(g.C[0]) + + # Set specific margins as these values worked best in user-testing + # Space between nodes + sugiyama.xspace = 10 + sugiyama.yspace = 15 + # Default width for nodes with no set width (in this case only for edge routing) + sugiyama.dw = 2 + # Default height for nodes with no set height (in this case only for edge routing) + sugiyama.dh = 2 + + sugiyama.init_all(roots=[start], inverted_edges=flips) + sugiyama.draw() + + # Extract the information we need from the nodes and edges and return them in our format. + ns = [] + for v in g.C[0].sV: + node_id = v.data + label = self.networkx.nodes[node_id]['label'] + (x, y) = v.view.xy + (w, h) = _calculate_dimensions(label) + description = self.networkx.nodes[node_id]['description'] + ns.append(Node(node_id, label, x, -y, w, h, description)) + + es = [] + for e in g.C[0].sE: + from_id = e.v[0].data + to_id = e.v[1].data + label = self.networkx.edges[(from_id, to_id)]['label'] + points = [] + # invert y axis + for p in e.view.points: + points.append((p[0], -p[1])) + + es.append(Edge(from_id, to_id, label, points)) + + return ns, es + + def _add_features(self, suite_name: str): + """ + Add our features to the graph such as tools, titles, and JavaScript callbacks. + """ + self.plot.add_layout(Title(text=suite_name, align="center"), "above") + + # Add the different tools + wheel_zoom = WheelZoomTool() + self.plot.add_tools(ResetTool(), SaveTool(), + wheel_zoom, PanTool(), + FullscreenTool(), ZoomInTool(factor=0.4), ZoomOutTool(factor=0.4)) + self.plot.toolbar.active_scroll = wheel_zoom + + if self.hover: + self.plot.add_tools(self.hover) + + # Specify the default range - these values represent the aspect ratio of the actual view in the window + self.plot.x_range = Range1d(-INNER_WINDOW_WIDTH / 2, INNER_WINDOW_WIDTH / 2) + self.plot.y_range = Range1d(-INNER_WINDOW_HEIGHT, 0) + self.plot.x_range.tags = [{"initial_span": INNER_WINDOW_WIDTH}] + self.plot.y_range.tags = [{"initial_span": INNER_WINDOW_HEIGHT}] + + # A JS callback to scale text and arrows, and change aspect ratio. + resize_cb = CustomJS(args=dict(xr=self.plot.x_range, yr=self.plot.y_range, plot=self.plot, arrows=self.arrows), + code=f""" + // Initialize initial scale tag + if (!plot.tags || plot.tags.length === 0) {{ + plot.tags = [{{ + initial_scale: plot.inner_height / (yr.end - yr.start) + }}] + }} + + // Calculate current x and y span + const xspan = xr.end - xr.start; + const yspan = yr.end - yr.start; + + // Calculate inner aspect ratio and span aspect ratio + const inner_aspect = plot.inner_width / plot.inner_height; + const span_aspect = xspan / yspan; + + // Let span aspect ratio match inner aspect ratio if needed + if (Math.abs(inner_aspect - span_aspect) > 0.05) {{ + const xmid = xr.start + xspan / 2; + const new_xspan = yspan * inner_aspect; + xr.start = xmid - new_xspan / 2; + xr.end = xmid + new_xspan / 2; + }} + + // Calculate scale factor + const scale = (plot.inner_height / yspan) / plot.tags[0].initial_scale + + // Scale text + for (const r of plot.renderers) {{ + if (!r.glyph || !r.glyph.tags) continue + + if (r.glyph.tags.includes("scalable_text{MAJOR_FONT_SIZE}")) {{ + r.glyph.text_font_size = Math.floor({MAJOR_FONT_SIZE} * scale) + "pt" + }} + + if (r.glyph.tags.includes("scalable_text{MINOR_FONT_SIZE}")) {{ + r.glyph.text_font_size = Math.floor({MINOR_FONT_SIZE} * scale) + "pt" + }} + }} + + // Scale arrows + for (const a of arrows) {{ + if (!a.properties) continue; + if (!a.properties.end) continue; + if (!a.properties.end._value) continue; + if (!a.properties.end._value.properties) continue; + if (!a.properties.end._value.properties.size) continue; + if (!a.properties.end._value.properties.size._value) continue; + if (!a.properties.end._value.properties.size._value.value) continue; + if (a._base_end_size == null) + a._base_end_size = a.properties.end._value.properties.size._value.value; + a.properties.end._value.properties.size._value.value = a._base_end_size * Math.sqrt(scale); + a.change.emit(); + }}""") + + # Add the callback to the values that change when zooming/resizing. + self.plot.x_range.js_on_change("start", resize_cb) + self.plot.x_range.js_on_change("end", resize_cb) + self.plot.y_range.js_on_change("start", resize_cb) + self.plot.y_range.js_on_change("end", resize_cb) + self.plot.js_on_change("inner_width", resize_cb) + self.plot.js_on_change("inner_height", resize_cb) + + def _add_legend(self, graph: AbstractGraph): + """ + Adds a legend to the graph with the node/edge information from the given graph. + """ + empty_source = ColumnDataSource({'_': [0]}) + + final_trace_node_glyph = Rect(x='_', y='_', width='_', height='_', fill_color=FINAL_TRACE_NODE_COLOR) + final_trace_node = self.plot.add_glyph(empty_source, final_trace_node_glyph) + + other_node_glyph = Rect(x='_', y='_', width='_', height='_', fill_color=OTHER_NODE_COLOR) + other_node = self.plot.add_glyph(empty_source, other_node_glyph) + + final_trace_edge_glyph = Segment( + x0='_', x1='_', + y0='_', y1='_', line_color=FINAL_TRACE_EDGE_COLOR + ) + final_trace_edge = self.plot.add_glyph(empty_source, final_trace_edge_glyph) + + other_edge_glyph = Segment( + x0='_', x1='_', + y0='_', y1='_', line_color=OTHER_EDGE_COLOR + ) + other_edge = self.plot.add_glyph(empty_source, other_edge_glyph) + + legend = Legend(items=[(graph.get_legend_info_final_trace_node(), [final_trace_node]), + (graph.get_legend_info_other_node(), [other_node]), + (graph.get_legend_info_final_trace_edge(), [final_trace_edge]), + (graph.get_legend_info_other_edge(), [other_edge])], + location=LEGEND_LOCATION, orientation="vertical") + self.plot.add_layout(legend, LEGEND_PLACE) + + +class NodeView: + """ + A view of a node in the format that grandalf expects. + """ + + def __init__(self, width: float, height: float): + self.w, self.h = width, height + self.xy = (0, 0) + + +class EdgeView: + """ + A view of an edge in the format that grandalf expects. + """ + + def __init__(self): + self.points = [] + + def setpath(self, points: list[tuple[float, float]]): + self.points = points + + +def _find_node(nodes: list[GVertex], node_id: str): + """ + Find a node given its id in a list of grandalf nodes. + """ + for node in nodes: + if node.data == node_id: + return node + return None + + +def _get_connection_coordinates(nodes: list[Node], node_id: str) -> list[tuple[float, float]]: + """ + Get the coordinates where edges can connect to a node given its id. + These places are the middle of the left, right, top, and bottom edge, as well as the corners of the node. + """ + start_possibilities = [] + + # Get node from list + node = None + for n in nodes: + if n.node_id == node_id: + node = n + break + + # Left + start_possibilities.append((node.x - node.width / 2, node.y)) + # Right + start_possibilities.append((node.x + node.width / 2, node.y)) + # Bottom + start_possibilities.append((node.x, node.y - node.height / 2)) + # Top + start_possibilities.append((node.x, node.y + node.height / 2)) + # Left bottom + start_possibilities.append((node.x - node.width / 2, node.y - node.height / 2)) + # Left top + start_possibilities.append((node.x - node.width / 2, node.y + node.height / 2)) + # Right bottom + start_possibilities.append((node.x + node.width / 2, node.y - node.height / 2)) + # Right top + start_possibilities.append((node.x + node.width / 2, node.y + node.height / 2)) + + return start_possibilities + + +def _minimize_distance(from_pos: list[tuple[float, float]], to_pos: list[tuple[float, float]]) -> tuple[ + float, float, float, float]: + """ + Find a pair of positions that minimizes their distance. + """ + min_dist = -1 + fx, fy, tx, ty = 0, 0, 0, 0 + + # Calculate the distance between all permutations + for fp in from_pos: + for tp in to_pos: + distance = (fp[0] - tp[0]) ** 2 + (fp[1] - tp[1]) ** 2 + if min_dist == -1 or distance < min_dist: + min_dist = distance + fx, fy, tx, ty = fp[0], fp[1], tp[0], tp[1] + + # Return the permutation with the shortest distance + return fx, fy, tx, ty + + +def _add_edge_to_sources(nodes: list[Node], edge: Edge, final_trace: list[str], edge_part_source: ColumnDataSource, + edge_arrow_source: ColumnDataSource, edge_bezier_source: ColumnDataSource, + edge_label_source: ColumnDataSource): + """ + Add an edge between two nodes to the ColumnDataSources. + Contains all logic to set their color, find their attachment points, and do self-loops properly. + """ + in_final_trace = False + for i in range(len(final_trace) - 1): + if edge.from_node == final_trace[i] and edge.to_node == final_trace[i + 1]: + in_final_trace = True + break + + if edge.from_node == edge.to_node: + _add_self_loop_to_sources(nodes, edge, in_final_trace, edge_arrow_source, edge_bezier_source, edge_label_source) + return + + start_x, start_y = 0, 0 + end_x, end_y = 0, 0 + + # Add edges going through the calculated points + for i in range(len(edge.points) - 1): + start_x, start_y = edge.points[i] + end_x, end_y = edge.points[i + 1] + + # Collect possibilities where the edge can start and end + if i == 0: + from_possibilities = _get_connection_coordinates(nodes, edge.from_node) + else: + from_possibilities = [(start_x, start_y)] + + if i == len(edge.points) - 2: + to_possibilities = _get_connection_coordinates(nodes, edge.to_node) + else: + to_possibilities = [(end_x, end_y)] + + # Choose connection points that minimize edge length + start_x, start_y, end_x, end_y = _minimize_distance(from_possibilities, to_possibilities) + + if i < len(edge.points) - 2: + # Middle part of edge without arrow + edge_part_source.data['from'].append(edge.from_node) + edge_part_source.data['to'].append(edge.to_node) + edge_part_source.data['start_x'].append(start_x) + edge_part_source.data['start_y'].append(start_y) + edge_part_source.data['end_x'].append(end_x) + edge_part_source.data['end_y'].append(end_y) + edge_part_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + else: + # End of edge with arrow + edge_arrow_source.data['from'].append(edge.from_node) + edge_arrow_source.data['to'].append(edge.to_node) + edge_arrow_source.data['start_x'].append(start_x) + edge_arrow_source.data['start_y'].append(start_y) + edge_arrow_source.data['end_x'].append(end_x) + edge_arrow_source.data['end_y'].append(end_y) + edge_arrow_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + + # Add the label + edge_label_source.data['from'].append(edge.from_node) + edge_label_source.data['to'].append(edge.to_node) + edge_label_source.data['x'].append((start_x + end_x) / 2) + edge_label_source.data['y'].append((start_y + end_y) / 2) + edge_label_source.data['label'].append(edge.label) + + +def _add_self_loop_to_sources(nodes: list[Node], edge: Edge, in_final_trace: bool, edge_arrow_source: ColumnDataSource, + edge_bezier_source: ColumnDataSource, edge_label_source: ColumnDataSource): + """ + Add a self-loop edge for a node to the ColumnDataSources, consisting of a Beziér curve and an arrow. + """ + connection = _get_connection_coordinates(nodes, edge.from_node) + + right_x, right_y = connection[1] + + # Add the Bézier curve + edge_bezier_source.data['from'].append(edge.from_node) + edge_bezier_source.data['to'].append(edge.to_node) + edge_bezier_source.data['start_x'].append(right_x) + edge_bezier_source.data['start_y'].append(right_y + 5) + edge_bezier_source.data['end_x'].append(right_x) + edge_bezier_source.data['end_y'].append(right_y - 5) + edge_bezier_source.data['control1_x'].append(right_x + 25) + edge_bezier_source.data['control1_y'].append(right_y + 25) + edge_bezier_source.data['control2_x'].append(right_x + 25) + edge_bezier_source.data['control2_y'].append(right_y - 25) + edge_bezier_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + + # Add the arrow + edge_arrow_source.data['from'].append(edge.from_node) + edge_arrow_source.data['to'].append(edge.to_node) + edge_arrow_source.data['start_x'].append(right_x + 0.001) + edge_arrow_source.data['start_y'].append(right_y - 5.001) + edge_arrow_source.data['end_x'].append(right_x) + edge_arrow_source.data['end_y'].append(right_y - 5) + edge_arrow_source.data['color'].append(FINAL_TRACE_EDGE_COLOR if in_final_trace else OTHER_EDGE_COLOR) + + # Add the label + edge_label_source.data['from'].append(edge.from_node) + edge_label_source.data['to'].append(edge.to_node) + edge_label_source.data['x'].append(right_x + 25) + edge_label_source.data['y'].append(right_y) + edge_label_source.data['label'].append(edge.label) + + +def _add_node_to_sources(node: Node, final_trace: list[str], node_source: ColumnDataSource, + node_label_source: ColumnDataSource): + """ + Add a node to the ColumnDataSources. + """ + node_source.data['id'].append(node.node_id) + node_source.data['x'].append(node.x) + node_source.data['y'].append(node.y) + node_source.data['w'].append(node.width) + node_source.data['h'].append(node.height) + node_source.data['color'].append( + FINAL_TRACE_NODE_COLOR if node.node_id in final_trace else OTHER_NODE_COLOR) + node_source.data['description'].append(node.description) + + node_label_source.data['id'].append(node.node_id) + node_label_source.data['x'].append(node.x - node.width / 2 + HORIZONTAL_PADDING_WITHIN_NODES) + node_label_source.data['y'].append(node.y) + node_label_source.data['label'].append(node.label) + + +def _calculate_dimensions(label: str) -> tuple[float, float]: + """ + Calculate a node's dimensions based on its label and the given font size constant. + Assumes the font is Courier New. + """ + lines = label.splitlines() + width = 0 + for line in lines: + width = max(width, len(line) * (MAJOR_FONT_SIZE / 3 + 5)) + height = len(lines) * (MAJOR_FONT_SIZE / 2 + 9) * 1.37 - 9 + return width + 2 * HORIZONTAL_PADDING_WITHIN_NODES, height + 2 * VERTICAL_PADDING_WITHIN_NODES + + +def _flip_edges(edges: list[tuple[str, str]]) -> list[tuple[str, str]]: + """ + Calculate which edges need to be flipped to make a graph acyclic. + """ + # Step 1: Build adjacency list from edges + adj = {} + for u, v in edges: + if u not in adj: + adj[u] = [] + adj[u].append(v) + + # Step 2: Helper function to detect cycles + def dfs(node, visited, rec_stack, cycle_edges): + visited[node] = True + rec_stack[node] = True + + if node in adj: + for neighbor in adj[node]: + edge = (node, neighbor) + + if not visited.get(neighbor, False): + if dfs(neighbor, visited, rec_stack, cycle_edges): + cycle_edges.append(edge) + elif rec_stack.get(neighbor, False): + # Found a cycle, add the edge to the cycle_edges list + cycle_edges.append(edge) + + rec_stack[node] = False + return False + + # Step 3: Detect cycles + visited = {} + rec_stack = {} + cycle_edges = [] + + for node in adj: + if not visited.get(node, False): + dfs(node, visited, rec_stack, cycle_edges) + + # Step 4: Return the list of edges that need to be flipped + # In this case, the cycle_edges are the ones that we need to "break" by flipping + return cycle_edges diff --git a/robotmbt/visualise/visualiser.py b/robotmbt/visualise/visualiser.py new file mode 100644 index 00000000..da0f33a5 --- /dev/null +++ b/robotmbt/visualise/visualiser.py @@ -0,0 +1,112 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from robotmbt.modelspace import ModelSpace +from robotmbt.tracestate import TraceState +from robotmbt.visualise import networkvisualiser +from robotmbt.visualise.graphs.scenariodeltavaluegraph import ScenarioDeltaValueGraph +from robotmbt.visualise.graphs.abstractgraph import AbstractGraph +from robotmbt.visualise.graphs.scenariograph import ScenarioGraph +from robotmbt.visualise.models import TraceInfo, StateInfo, ScenarioInfo +import html + +GRAPHS = { + 'scenario': ScenarioGraph, + 'scenario-delta-value': ScenarioDeltaValueGraph, +} + + +class Visualiser: + """ + The Visualiser class bridges the different concerns to provide + a simple interface through which the graph can be updated, + and retrieved in HTML format. + """ + + def __init__(self, suite_name: str = "", trace_info: TraceInfo = None): + self.trace_info: TraceInfo = trace_info if trace_info is not None else TraceInfo(suite_name) + self.suite_name = suite_name + + def load_from_file(self, file_path: str): + """ + Imports a JSON encoding of a graph and reconstructs the graph from it. The reconstructed + graph overrides the current graph instance this method is called on. + file_path: the path to a previously exported graph. + """ + self.trace_info = TraceInfo.import_graph_from_file(file_path) + + def export_to_file(self, file_path: str): + self.trace_info.export_graph(file_path) + + def update_trace(self, trace: TraceState): + """ + Uses the new snapshots from trace to update the trace info. + Multiple new snapshots can be pushed or popped at once. + """ + trace_len = len(trace._snapshots) + # We don't have any information + if trace_len == 0: + self.trace_info.update_trace(None, StateInfo(ModelSpace()), 0) + + # New snapshots have been pushed + elif trace_len > self.trace_info.previous_length: + prev = self.trace_info.previous_length + r = trace_len - prev + # Extract all snapshots that have been pushed and update our trace info with their scenario/model info + for i in range(r): + snap = trace._snapshots[prev + i] + scenario = snap.scenario + model = snap.model + self.trace_info.update_trace(ScenarioInfo(scenario), StateInfo(model), prev + i + 1) + + # Snapshots have been removed + else: + snap = trace._snapshots[-1] + scenario = snap.scenario + model = snap.model + self.trace_info.update_trace(ScenarioInfo(scenario), StateInfo(model), trace_len) + + def _get_graph(self, graph_type) -> AbstractGraph | None: + if graph_type not in GRAPHS.keys(): + return None + + return GRAPHS[graph_type](self.trace_info) + + def generate_visualisation(self, graph_type: str) -> str: + """ + Finalize the visualisation. Exports the graph to JSON if requested, and generates HTML if requested. + The boolean signals whether the output is in HTML format or not. + """ + graph: AbstractGraph = self._get_graph(graph_type) + if graph is None: + raise ValueError(f"Unknown graph type: {graph_type}") + + html_bokeh = networkvisualiser.NetworkVisualiser(graph, self.trace_info.model_name).generate_html() + return f'' diff --git a/utest/test_visualise_models.py b/utest/test_visualise_models.py new file mode 100644 index 00000000..1a09f0b1 --- /dev/null +++ b/utest/test_visualise_models.py @@ -0,0 +1,197 @@ +# BSD 3-Clause License +# +# Copyright (c) 2026, T.B. Dubbeling, J. Foederer, T.S. Kas, D.R. Osinga, D.F. Serra e Silva, J.C. Willegers +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from types import SimpleNamespace +import unittest + +try: + from robotmbt.visualise.models import ScenarioInfo, StateInfo, TraceInfo + + VISUALISE = True +except ImportError: + VISUALISE = False + +if VISUALISE: + class TestScenarioInfo(unittest.TestCase): + def test_scenarioInfo_constructor(self): + scenariostub = SimpleNamespace(name='test', src_id=0) + si = ScenarioInfo(scenariostub) + self.assertEqual(si.name, 'test') + self.assertEqual(si.src_id, 0) + + def test_split_name_empty_string(self): + result = ScenarioInfo._split_name("") + self.assertEqual(result, "") + self.assertNotIn('\n', result) + + def test_split_name_single_short_word(self): + result = ScenarioInfo._split_name("Hello") + self.assertEqual(result, "Hello") + self.assertNotIn('\n', result) + + def test_split_name_single_exact_length_word(self): + exact_20 = "abcdefghijklmnopqrst" + result = ScenarioInfo._split_name(exact_20) + self.assertEqual(result, exact_20) + self.assertNotIn('\n', result) + + def test_split_name_single_long_word(self): + name = "ThisIsAReallyLongNameWithoutAnySpacesAtAll" + result = ScenarioInfo._split_name(name) + self.assertEqual(result, name) + self.assertNotIn('\n', result) + + def test_split_name_two_words_short(self): + result = ScenarioInfo._split_name("Hello World") + self.assertEqual(result, "Hello World") + self.assertNotIn('\n', result) + + def test_split_name_two_words_exceeds_limit(self): + name = "Supercalifragilistic Hello" + result = ScenarioInfo._split_name(name) + + self.assertEqual(result.replace('\n', ' '), name) + self.assertIn('\n', result) + + def test_split_name_multiple_words_need_split(self): + name = "This is a very long scenario name that should be split" + result = ScenarioInfo._split_name(name) + + self.assertEqual(result.replace('\n', ' '), name) + self.assertIn('\n', result) + self.assertLessEqual(max([len(line) for line in result.split('\n')]), 20) + + class TestStateInfo(unittest.TestCase): + def test_stateInfo_empty(self): + modelspacestub = SimpleNamespace(ref_id=None, props={}) + s = StateInfo(modelspacestub) + self.assertEqual(str(s), '') + + def test_stateInfo_prop_empty(self): + modelspacestub = SimpleNamespace(ref_id=None, props={}) + s = StateInfo(modelspacestub) + self.assertEqual(str(s), '') + + def test_stateInfo_prop_val(self): + modelspacestub = SimpleNamespace(ref_id=None, props=dict(prop1=SimpleNamespace(value=1))) + s = StateInfo(modelspacestub) + self.assertTrue('prop1:' in str(s)) + self.assertTrue('value=1' in str(s)) + + def test_stateInfo_prop_val_empty(self): + class EmptyProp: + def __dir__(self): + return {} + + modelspacestub = SimpleNamespace(ref_id=None, props=dict(prop1=SimpleNamespace(value=1), + prop2=EmptyProp())) + s = StateInfo(modelspacestub) + self.assertTrue('prop1:' in str(s)) + self.assertTrue('value=1' in str(s)) + self.assertFalse('prop2:' in str(s)) + + class TestTraceInfo(unittest.TestCase): + def test_trace_info_update_normal(self): + info = TraceInfo() + scenariostub = SimpleNamespace(name='test', src_id=0) + + self.assertEqual(len(info.current_trace), 0) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 1)]), 1) + + self.assertEqual(len(info.current_trace), 1) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 2)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 3)]), 3) + + self.assertEqual(len(info.current_trace), 3) + self.assertEqual(len(info.all_traces), 0) + + def test_trace_info_update_backtrack(self): + info = TraceInfo() + scenariostub = SimpleNamespace(name='test', src_id=0) + + self.assertEqual(len(info.current_trace), 0) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 1)]), 1) + + self.assertEqual(len(info.current_trace), 1) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 2)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 3)]), 3) + + self.assertEqual(len(info.current_trace), 3) + self.assertEqual(len(info.all_traces), 0) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 2)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 1)]), 1) + + self.assertEqual(len(info.current_trace), 1) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 4)]), 2) + + self.assertEqual(len(info.current_trace), 2) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 5)]), 3) + + self.assertEqual(len(info.current_trace), 3) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + info.update_trace(ScenarioInfo(scenariostub), StateInfo._create_state_with_prop('prop', [('value', 6)]), 4) + + self.assertEqual(len(info.current_trace), 4) + self.assertEqual(len(info.all_traces), 1) + self.assertEqual(len(info.all_traces[0]), 3) + + +if __name__ == '__main__': + unittest.main()