diff --git a/gvm/protocols/gmp/_gmpnext.py b/gvm/protocols/gmp/_gmpnext.py index 4b560de7b..e8ee23443 100644 --- a/gvm/protocols/gmp/_gmpnext.py +++ b/gvm/protocols/gmp/_gmpnext.py @@ -709,6 +709,49 @@ def create_container_task( Tasks.create_container_task(name=name, comment=comment) ) + def create_web_application_task( + self, + name: str, + web_application_target_id: EntityID, + scanner_id: EntityID, + *, + comment: str | None = None, + alterable: bool | None = None, + schedule_id: EntityID | None = None, + alert_ids: Sequence[EntityID] | None = None, + schedule_periods: int | None = None, + observers: Sequence[str] | None = None, + preferences: Mapping[str, SupportsStr] | None = None, + ) -> T: + """Create a new scan task using a web application target. + + Args: + name: Name of the new task. + web_application_target_id: UUID of the web application target to be scanned. + scanner_id: UUID of scanner to use for scanning the agents. + comment: Optional comment for the task. + alterable: Whether the task should be alterable. + alert_ids: List of UUIDs for alerts to be applied to the task. + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: Limit to number of scheduled runs, 0 for unlimited. + observers: List of usernames or IDs allowed to observe the task. + preferences: Scanner preferences as name/value pairs. + """ + return self._send_request_and_transform_response( + Tasks.create_web_application_task( + name=name, + web_application_target_id=web_application_target_id, + scanner_id=scanner_id, + comment=comment, + alterable=alterable, + schedule_id=schedule_id, + alert_ids=alert_ids, + schedule_periods=schedule_periods, + observers=observers, + preferences=preferences, + ) + ) + def create_task( self, name: str, @@ -826,6 +869,7 @@ def modify_task( scanner_id: EntityID | None = None, agent_group_id: EntityID | None = None, oci_image_target_id: EntityID | None = None, + web_application_target_id: EntityID | None = None, alterable: bool | None = None, hosts_ordering: HostsOrdering | None = None, schedule_id: EntityID | None = None, @@ -845,6 +889,7 @@ def modify_task( scanner_id: UUID of scanner to use for scanning the target agent_group_id: UUID of agent group to use for scanning oci_image_target_id: UUID of the OCI Image target to be scanned. + web_application_target_id: UUID of the web application target to be scanned. comment: The comment on the task. alert_ids: List of UUIDs for alerts to be applied to the task hosts_ordering: The order hosts are scanned in @@ -864,6 +909,7 @@ def modify_task( scanner_id=scanner_id, agent_group_id=agent_group_id, oci_image_target_id=oci_image_target_id, + web_application_target_id=web_application_target_id, alterable=alterable, hosts_ordering=hosts_ordering, schedule_id=schedule_id, diff --git a/gvm/protocols/gmp/requests/next/_tasks.py b/gvm/protocols/gmp/requests/next/_tasks.py index 52d63d635..7b1158e38 100644 --- a/gvm/protocols/gmp/requests/next/_tasks.py +++ b/gvm/protocols/gmp/requests/next/_tasks.py @@ -248,6 +248,97 @@ def create_container_task( """ return cls.create_import_task(name=name, comment=comment) + @classmethod + def create_web_application_task( + cls, + name: str, + web_application_target_id: EntityID, + scanner_id: EntityID, + *, + comment: str | None = None, + alterable: bool | None = None, + schedule_id: EntityID | None = None, + alert_ids: Sequence[EntityID] | None = None, + schedule_periods: int | None = None, + observers: Sequence[str] | None = None, + preferences: Mapping[str, SupportsStr] | None = None, + ) -> Request: + """Create a new scan task using a web application target. + + Args: + name: Name of the new task. + web_application_target_id: UUID of the web application target to be scanned. + scanner_id: UUID of scanner to use for scanning the web application. + comment: Optional comment for the task. + alterable: Whether the task should be alterable. + alert_ids: List of UUIDs for alerts to be applied to the task. + schedule_id: UUID of a schedule when the task should be run. + schedule_periods: Limit to number of scheduled runs, 0 for unlimited. + observers: List of usernames or IDs allowed to observe the task. + preferences: Scanner preferences as name/value pairs. + """ + if not name: + raise RequiredArgument( + function=cls.create_web_application_task.__name__, + argument="name", + ) + + if not web_application_target_id: + raise RequiredArgument( + function=cls.create_web_application_task.__name__, + argument="web_application_target_id", + ) + + if not scanner_id: + raise RequiredArgument( + function=cls.create_web_application_task.__name__, + argument="scanner_id", + ) + + cmd = XmlCommand("create_task") + cmd.add_element("name", name) + cmd.add_element("usage_type", "scan") + cmd.add_element( + "web_application_target", + attrs={"id": str(web_application_target_id)}, + ) + cmd.add_element("scanner", attrs={"id": str(scanner_id)}) + + if comment: + cmd.add_element("comment", comment) + + if alterable is not None: + cmd.add_element("alterable", to_bool(alterable)) + + if alert_ids: + for alert in alert_ids: + cmd.add_element("alert", attrs={"id": str(alert)}) + + if schedule_id: + cmd.add_element("schedule", attrs={"id": str(schedule_id)}) + + if schedule_periods is not None: + if ( + not isinstance(schedule_periods, Integral) + or schedule_periods < 0 + ): + raise InvalidArgument( + "schedule_periods must be an integer greater or equal than 0" + ) + cmd.add_element("schedule_periods", str(schedule_periods)) + + if observers: + cmd.add_element("observers", to_comma_list(observers)) + + if preferences is not None: + xml_prefs = cmd.add_element("preferences") + for pref_name, pref_value in preferences.items(): + xml_pref = xml_prefs.add_element("preference") + xml_pref.add_element("scanner_name", pref_name) + xml_pref.add_element("value", str(pref_value)) + + return cmd + @classmethod def create_task( cls, @@ -453,6 +544,7 @@ def modify_task( scanner_id: EntityID | None = None, agent_group_id: EntityID | None = None, oci_image_target_id: EntityID | None = None, + web_application_target_id: EntityID | None = None, alterable: bool | None = None, hosts_ordering: HostsOrdering | None = None, schedule_id: EntityID | None = None, @@ -472,6 +564,7 @@ def modify_task( scanner_id: UUID of scanner to use for scanning the target agent_group_id: UUID of agent group to use for scanning oci_image_target_id: UUID of the OCI Image target to be scanned. + web_application_target_id: UUID of the web application target to be scanned. comment: The comment on the task. alert_ids: List of UUIDs for alerts to be applied to the task hosts_ordering: The order hosts are scanned in @@ -507,6 +600,30 @@ def modify_task( cmd = XmlCommand("modify_task") cmd.set_attribute("task_id", str(task_id)) + if ( + sum( + entity_id is not None + for entity_id in ( + target_id, + agent_group_id, + oci_image_target_id, + web_application_target_id, + ) + ) + > 1 + ): + raise InvalidArgument( + function=cls.modify_task.__name__, + argument=( + "target_id/agent_group_id/oci_image_target_id/" + "web_application_target_id" + ), + message=( + "Only one of target_id, agent_group_id, oci_image_target_id " + "or web_application_target_id can be modified at a time" + ), + ) + if name: cmd.add_element("name", name) @@ -526,6 +643,11 @@ def modify_task( cmd.add_element( "oci_image_target", attrs={"id": str(oci_image_target_id)} ) + if web_application_target_id: + cmd.add_element( + "web_application_target", + attrs={"id": str(web_application_target_id)}, + ) if alterable is not None: cmd.add_element("alterable", to_bool(alterable)) diff --git a/tests/protocols/gmpnext/entities/tasks/__init__.py b/tests/protocols/gmpnext/entities/tasks/__init__.py index 68f75376c..c47f79db8 100644 --- a/tests/protocols/gmpnext/entities/tasks/__init__.py +++ b/tests/protocols/gmpnext/entities/tasks/__init__.py @@ -11,6 +11,9 @@ from .test_create_container_task import GmpCreateContainerTaskTestMixin from .test_create_import_task import GmpCreateImportTaskTestMixin from .test_create_task import GmpCreateTaskTestMixin +from .test_create_web_application_task import ( + GmpCreateWebApplicationTaskTestMixin, +) from .test_delete_task import GmpDeleteTaskTestMixin from .test_get_task import GmpGetTaskTestMixin from .test_get_tasks import GmpGetTasksTestMixin @@ -27,6 +30,7 @@ "GmpCreateContainerTaskTestMixin", "GmpCreateImportTaskTestMixin", "GmpCreateTaskTestMixin", + "GmpCreateWebApplicationTaskTestMixin", "GmpDeleteTaskTestMixin", "GmpGetTaskTestMixin", "GmpGetTasksTestMixin", diff --git a/tests/protocols/gmpnext/entities/tasks/test_create_web_application_task.py b/tests/protocols/gmpnext/entities/tasks/test_create_web_application_task.py new file mode 100644 index 000000000..6a1bbf90f --- /dev/null +++ b/tests/protocols/gmpnext/entities/tasks/test_create_web_application_task.py @@ -0,0 +1,227 @@ +# SPDX-FileCopyrightText: 2026 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from collections import OrderedDict + +from gvm.errors import InvalidArgument, RequiredArgument + + +class GmpCreateWebApplicationTaskTestMixin: + def test_create_web_application_task(self): + self.gmp.create_web_application_task( + name="foo", web_application_target_id="wt1", scanner_id="s1" + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + ) + + def test_create_web_application_task_missing_name(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_web_application_task( + name=None, web_application_target_id="wt1", scanner_id="s1" + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_web_application_task( + name="", web_application_target_id="wt1", scanner_id="s1" + ) + + def test_create_web_application_task_missing_web_application_target_id( + self, + ): + with self.assertRaises(RequiredArgument): + self.gmp.create_web_application_task( + name="foo", web_application_target_id=None, scanner_id="s1" + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_web_application_task( + name="foo", web_application_target_id="", scanner_id="s1" + ) + + def test_create_web_application_task_missing_scanner_id(self): + with self.assertRaises(RequiredArgument): + self.gmp.create_web_application_task( + name="foo", web_application_target_id="wt1", scanner_id=None + ) + + with self.assertRaises(RequiredArgument): + self.gmp.create_web_application_task( + name="foo", web_application_target_id="wt1", scanner_id="" + ) + + def test_create_web_application_task_with_comment(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + comment="my comment", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"my comment" + b"" + ) + + def test_create_web_application_task_with_alerts(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + alert_ids=["a1", "a2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b'' + b"" + ) + + def test_create_web_application_task_with_empty_alerts(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + alert_ids=[], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + ) + + def test_create_web_application_task_with_schedule(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + schedule_id="sch1", + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"" + ) + + def test_create_web_application_task_with_schedule_periods(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + schedule_id="sch1", + schedule_periods=5, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b'' + b"5" + b"" + ) + + def test_create_web_application_task_with_invalid_schedule_periods(self): + with self.assertRaises(InvalidArgument): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + schedule_id="sch1", + schedule_periods="invalid", + ) + + with self.assertRaises(InvalidArgument): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + schedule_id="sch1", + schedule_periods=-1, + ) + + def test_create_web_application_task_with_alterable(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + alterable=True, + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"1" + b"" + ) + + def test_create_web_application_task_with_observers(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + observers=["u1", "u2"], + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"u1,u2" + b"" + ) + + def test_create_web_application_task_with_preferences(self): + self.gmp.create_web_application_task( + name="foo", + web_application_target_id="wt1", + scanner_id="s1", + preferences=OrderedDict([("pref1", "val1"), ("pref2", "val2")]), + ) + + self.connection.send.has_been_called_with( + b"" + b"foo" + b"scan" + b'' + b'' + b"" + b"pref1val1" + b"pref2val2" + b"" + b"" + ) diff --git a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py index 45becac65..72d87ece1 100644 --- a/tests/protocols/gmpnext/entities/tasks/test_modify_task.py +++ b/tests/protocols/gmpnext/entities/tasks/test_modify_task.py @@ -205,3 +205,36 @@ def test_modify_task_with_preferences(self): b"" b"" ) + + def test_modify_task_with_web_application_target_id(self): + self.gmp.modify_task(task_id="t1", web_application_target_id="wt1") + + self.connection.send.has_been_called_with( + b'' + b'' + b"" + ) + + def test_modify_task_with_target_and_web_application_target(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task( + task_id="t1", + target_id="t1", + web_application_target_id="wt1", + ) + + def test_modify_task_with_agent_group_and_web_application_target(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task( + task_id="t1", + agent_group_id="ag1", + web_application_target_id="wt1", + ) + + def test_modify_task_with_oci_image_target_and_web_application_target(self): + with self.assertRaises(InvalidArgument): + self.gmp.modify_task( + task_id="t1", + oci_image_target_id="it1", + web_application_target_id="wt1", + ) diff --git a/tests/protocols/gmpnext/entities/test_tasks.py b/tests/protocols/gmpnext/entities/test_tasks.py index 3c0b8ef17..2f497ea01 100644 --- a/tests/protocols/gmpnext/entities/test_tasks.py +++ b/tests/protocols/gmpnext/entities/test_tasks.py @@ -10,6 +10,7 @@ GmpCreateContainerTaskTestMixin, GmpCreateImportTaskTestMixin, GmpCreateTaskTestMixin, + GmpCreateWebApplicationTaskTestMixin, GmpDeleteTaskTestMixin, GmpGetTasksTestMixin, GmpGetTaskTestMixin, @@ -47,6 +48,12 @@ class GMPCreateImportTaskTestCase(GmpCreateImportTaskTestMixin, GMPTestCase): pass +class GmpCreateWebApplicationTaskTestCase( + GmpCreateWebApplicationTaskTestMixin, GMPTestCase +): + pass + + class GMPCreateTaskTestCase(GmpCreateTaskTestMixin, GMPTestCase): pass