diff --git a/libcloudforensics/logging_utils.py b/libcloudforensics/logging_utils.py index 74747001..e1c28862 100644 --- a/libcloudforensics/logging_utils.py +++ b/libcloudforensics/logging_utils.py @@ -99,6 +99,7 @@ def format(self, record: logging.LogRecord) -> str: if loglevel_color: message = loglevel_color + message + RESET_SEQ record.msg = message + record.args = () return super().format(record) diff --git a/libcloudforensics/providers/gcp/forensics.py b/libcloudforensics/providers/gcp/forensics.py index b321f217..848dcd41 100644 --- a/libcloudforensics/providers/gcp/forensics.py +++ b/libcloudforensics/providers/gcp/forensics.py @@ -43,7 +43,8 @@ def CreateDiskCopy( zone: str, instance_name: Optional[str] = None, disk_name: Optional[str] = None, - disk_type: Optional[str] = None) -> 'compute.GoogleComputeDisk': + disk_type: Optional[str] = None, + src_zone: Optional[str] = None) -> 'compute.GoogleComputeDisk': """Creates a copy of a Google Compute Disk. Args: @@ -56,6 +57,8 @@ def CreateDiskCopy( disk_type (str): Optional. URL of the disk type resource describing which disk type to use to create the disk. The default behavior is to use the same disk type as the source disk. + src_zone (str): Optional. Zone where the source disk is located. If None, + the default zone will be used. Returns: GoogleComputeDisk: A Google Compute Disk object. @@ -73,9 +76,9 @@ def CreateDiskCopy( try: if disk_name: - disk_to_copy = src_project.compute.GetDisk(disk_name) + disk_to_copy = src_project.compute.GetDisk(disk_name, zone=src_zone) elif instance_name: - instance = src_project.compute.GetInstance(instance_name) + instance = src_project.compute.GetInstance(instance_name, zone=src_zone) disk_to_copy = instance.GetBootDisk() else: raise ValueError( @@ -472,7 +475,7 @@ def VMRemoveServiceAccount( # Get the initial powered state of the instance initial_state = instance.GetPowerState() - if not initial_state in valid_starting_states: + if initial_state not in valid_starting_states: logger.error( 'Instance "{0:s}" is currently {1:s} which is an invalid ' 'state for this operation'.format(instance_name, initial_state)) @@ -480,7 +483,7 @@ def VMRemoveServiceAccount( try: # Stop the instance if it is not already (or on the way).... - if not initial_state in ('TERMINATED', 'STOPPING'): + if initial_state not in ('TERMINATED', 'STOPPING'): instance.Stop() # Remove the service account diff --git a/libcloudforensics/providers/gcp/internal/compute.py b/libcloudforensics/providers/gcp/internal/compute.py index 22f722fc..e4158160 100644 --- a/libcloudforensics/providers/gcp/internal/compute.py +++ b/libcloudforensics/providers/gcp/internal/compute.py @@ -20,7 +20,7 @@ import subprocess import time from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union +from typing import Any, cast, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union from googleapiclient.errors import HttpError @@ -143,6 +143,73 @@ def _FindResourceByName( return matches.pop() + def _GetResourceFromComputeApi( + self, + resource_type: str, + resource_name: str, + zone: Optional[str] = None, + region: Optional[str] = None) -> Optional[Dict[str, Any]]: + """Helper to get a specific resource from the GCE API. + + Args: + resource_type: The GCE resource type (e.g., 'instance' or 'disk'). + resource_name: The resource name or ID. + zone: Optional zone to restrict the search. + region: Optional region to restrict the search. + + Returns: + Optional[Dict[str, Any]]: The resource metadata if found, None otherwise. + """ + client = getattr(self.GceApi(), resource_type)() + if zone or region: + param_name = (resource_type[:-1] if resource_type.endswith('s') + else resource_type) + if resource_type == 'instanceGroupManagers': + param_name = 'instanceGroupManager' + elif resource_type == 'regionDisks': + param_name = 'disk' + + kwargs = {'project': self.project_id, param_name: resource_name} + if zone: + kwargs['zone'] = zone + if region: + kwargs['region'] = region + + try: + return cast(Dict[str, Any], client.get(**kwargs).execute()) + except HttpError as e: + if e.resp.status == 404: + return None + raise + else: + # Use aggregatedList with filter to avoid listing all resources + filter_str = f'name = "{resource_name}"' + if re.match(RESOURCE_ID_REGEX, resource_name): + filter_str = f'id = "{resource_name}"' + + # Regional resources might not support aggregatedList on their own + # client. For example, regionDisks doesn't have aggregatedList. + # We use disks.aggregatedList instead as it returns both zonal + # and regional disks. + if resource_type == 'regionDisks': + client = self.GceApi().disks() # pylint: disable=no-member + res_type_in_resp = 'disks' + else: + res_type_in_resp = resource_type + + responses = common.ExecuteRequest( + client, + 'aggregatedList', { + 'project': self.project_id, 'filter': filter_str + }) + for response in responses: + for location in response.get('items', {}): + items = response['items'][location].get(res_type_in_resp, []) + if items: + return cast(Dict[str, Any], items[0]) + return None + + def Instances(self, refresh: bool = True) -> Dict[str, 'GoogleComputeInstance']: """Get all instances in the project. @@ -237,11 +304,14 @@ def ListInstances(self) -> Dict[str, 'GoogleComputeInstance']: return instances - def ListSnapshots(self, filter_string: str | None = None) -> Dict[str, Any]: + def ListSnapshots(self, + filter_string: str | None = None, + zone: str | None = None) -> Dict[str, Any]: """List snapshots in project. Args: filter_string: Filter for the snapshot query. + zone: Optional zone to filter snapshots by. Returns: Dict[str, Any]: Dictionary mapping snapshot IDs (str) @@ -249,6 +319,13 @@ def ListSnapshots(self, filter_string: str | None = None) -> Dict[str, Any]: See: https://docs.cloud.google.com/compute/docs/reference/rest/v1/snapshots/list """ + if zone: + zone_filter = f'sourceDisk ~ ".*/zones/{zone}/disks/.*"' + if filter_string: + filter_string = f'({filter_string}) ({zone_filter})' + else: + filter_string = zone_filter + snapshots = {} gce_snapshot_client = self.GceApi().snapshots() # pylint: disable=no-member responses = common.ExecuteRequest( @@ -450,16 +527,25 @@ def GetRegionDisk( Raises: ResourceNotFoundError: When the specified disk cannot be found in project. """ - disks = self.RegionDisks() - if re.match(RESOURCE_ID_REGEX, disk_name): - disk = disks.get(disk_name) - else: - disk = self._FindResourceByName(disks, disk_name, region=region) - if not disk: + disk_dict = self._GetResourceFromComputeApi( + 'regionDisks', disk_name, region=region) + + if not disk_dict: raise errors.ResourceNotFoundError( f'Disk {disk_name} was not found in project ' f'{self.project_id}', __name__) - return disk + + try: + _, disk_region = disk_dict['region'].rsplit('/', 1) + except ValueError as exception: + raise errors.ResourceNotFoundError( + f'Region not found for disk {disk_name} in project ' + f'{self.project_id}', __name__) from exception + + return GoogleRegionComputeDisk( + self.project_id, disk_region, disk_dict['name'], + resource_id=disk_dict['id'], + labels=disk_dict.get('labels')) def GetInstance( self, @@ -478,19 +564,28 @@ def GetInstance( Raises: ResourceNotFoundError: If instance does not exist. """ + instance_dict = self._GetResourceFromComputeApi( + 'instances', instance_name, zone=zone) - instances = self.Instances() - - if re.match(RESOURCE_ID_REGEX, instance_name): - instance = instances.get(instance_name) - else: - instance = self._FindResourceByName(instances, instance_name, zone) - - if not instance: + if not instance_dict: raise errors.ResourceNotFoundError( f'Instance {instance_name} was not found in project ' f'{self.project_id}', __name__) - return instance + + try: + _, instance_zone = instance_dict['zone'].rsplit('/', 1) + except ValueError as exception: + raise errors.ResourceNotFoundError( + f'Zone not found for instance {instance_name} in project ' + f'{self.project_id}', __name__) from exception + + return GoogleComputeInstance( + self.project_id, + instance_zone, + instance_dict['name'], + resource_id=instance_dict['id'], + labels=instance_dict.get('labels'), + deletion_protection=instance_dict.get('deletionProtection', False)) def GetDisk( self, @@ -509,16 +604,26 @@ def GetDisk( ResourceNotFoundError: When the specified disk cannot be found in project. """ - disks = self.Disks() - if re.match(RESOURCE_ID_REGEX, disk_name): - disk = disks.get(disk_name) - else: - disk = self._FindResourceByName(disks, disk_name, zone) - if not disk: + disk_dict = self._GetResourceFromComputeApi('disks', disk_name, zone=zone) + + if not disk_dict: raise errors.ResourceNotFoundError( f'Disk {disk_name} was not found in project ' f'{self.project_id}', __name__) - return disk + + try: + _, disk_zone = disk_dict['zone'].rsplit('/', 1) + except ValueError as exception: + raise errors.ResourceNotFoundError( + f'Zone not found for disk {disk_name} in project ' + f'{self.project_id}', __name__) from exception + + return GoogleComputeDisk( + self.project_id, + disk_zone, + disk_dict['name'], + resource_id=disk_dict['id'], + labels=disk_dict.get('labels')) def CreateDiskFromSnapshot( self, @@ -1431,7 +1536,8 @@ def GetBootDisk(self) -> 'GoogleComputeDisk': self.name), __name__) disk_name = disk['source'].split('/')[-1] - return GoogleCloudCompute(self.project_id).GetDisk(disk_name=disk_name) + return GoogleCloudCompute(self.project_id).GetDisk( + disk_name=disk_name, zone=self.zone) raise errors.ResourceNotFoundError( 'Boot disk not found for instance {0:s}'.format(self.name), __name__) @@ -1451,7 +1557,8 @@ def GetDisk(self, disk_name: str) -> 'GoogleComputeDisk': for disk in self.GetValue('disks'): if disk.get('source', '').split('/')[-1] == disk_name: - return GoogleCloudCompute(self.project_id).GetDisk(disk_name=disk_name) + return GoogleCloudCompute(self.project_id).GetDisk( + disk_name=disk_name, zone=self.zone) raise errors.ResourceNotFoundError( 'Disk {0:s} was not found in instance {1:s}'.format( disk_name, self.name), @@ -1640,7 +1747,8 @@ def Delete( for disk_name in disks_to_delete: try: - disk = GoogleCloudCompute(self.project_id).GetDisk(disk_name=disk_name) + disk = GoogleCloudCompute(self.project_id).GetDisk( + disk_name=disk_name, zone=self.zone) disk.Delete() except (errors.ResourceDeletionError, errors.ResourceNotFoundError): logger.info( @@ -2333,7 +2441,7 @@ def ExportImage( build_args.append('-format={0:s}'.format(image_format)) build_body = { 'timeout': '86400s', - 'steps': [{ + 'steps': [{ 'args': build_args, 'name': 'gcr.io/compute-image-tools/gce_vm_image_export:release', 'env': [] diff --git a/tests/providers/gcp/gcp_mocks.py b/tests/providers/gcp/gcp_mocks.py index df227cea..afe30848 100644 --- a/tests/providers/gcp/gcp_mocks.py +++ b/tests/providers/gcp/gcp_mocks.py @@ -74,6 +74,27 @@ 'projects/fake-target-project/logs/OSConfigAgent' ] +# pylint: disable=line-too-long +FAKE_DISK_LIST = [ + { + 'name': + 'fake-boot-disk', + 'zone': + 'https://www.googleapis.com/compute/v1/projects/fake-source-project/zones/fake-zone', + 'id': + '01234567890123456789' + }, + { + 'name': + 'fake-disk', + 'zone': + 'https://www.googleapis.com/compute/v1/projects/fake-source-project/zones/fake-zone', + 'id': + '0123456789012345678' + } +] +# pylint: enable=line-too-long + STARTUP_SCRIPT = 'scripts/startup.sh' FAKE_LOG_ENTRIES = [{ @@ -141,14 +162,14 @@ 'disks': [{ 'name': FAKE_BOOT_DISK.name, 'zone': '/' + FAKE_BOOT_DISK.zone, - 'id': '01234567890123456789' + 'id': '01234567890123456789' }] }, 1: { 'disks': [{ 'name': FAKE_DISK.name, 'zone': '/' + FAKE_DISK.zone, - 'id': '0123456789012345678' + 'id': '0123456789012345678' }] } } @@ -1039,48 +1060,48 @@ } MOCK_BIGQUERY_JOBS = { - "etag": "ABCde1FGHiJklmn23op4rs==", - "kind": "bigquery#jobList", - "jobs": [{ - "id": "fake-target-project:europe-west1.bquxjob_12345678_abcdefghij1k", - "kind": "bigquery#job", - "jobReference": { - "projectId": "fake-target-project", - "jobId": "bquxjob_12345678_abcdefghij1k", - "location": "europe-west1" - }, - "state": "DONE", - "statistics": { - "creationTime": "1640804415278", - "startTime": "1640804415351", - "endTime": "1640804415457", - "totalBytesProcessed": "0", - "query": { - "totalBytesProcessed": "0", - "totalBytesBilled": "0", - "cacheHit": True, - "statementType": "SELECT" - } - }, - "configuration": { - "query": { - "query": "SELECT * FROM `fake-target-project.fake-target-project-dataset.fake-target-project-table`", - "destinationTable": { - "projectId": "fake-target-project", - "datasetId": "_1a2b34c567890d1efghi2j345678kl9012mn34c5", - "tableId": "anona1234c5d67890123efg45678hij90kl23mnoprst" - }, - "writeDisposition": "WRITE_TRUNCATE", - "priority": "INTERACTIVE", - "useLegacySql": False - }, - "jobType": "QUERY" - }, - "status": { - "state": "DONE" - }, - "user_email": "fake-user-email@test.com" - }] + "etag": "ABCde1FGHiJklmn23op4rs==", + "kind": "bigquery#jobList", + "jobs": [{ + "id": "fake-target-project:europe-west1.bquxjob_12345678_abcdefghij1k", + "kind": "bigquery#job", + "jobReference": { + "projectId": "fake-target-project", + "jobId": "bquxjob_12345678_abcdefghij1k", + "location": "europe-west1" + }, + "state": "DONE", + "statistics": { + "creationTime": "1640804415278", + "startTime": "1640804415351", + "endTime": "1640804415457", + "totalBytesProcessed": "0", + "query": { + "totalBytesProcessed": "0", + "totalBytesBilled": "0", + "cacheHit": True, + "statementType": "SELECT" + } + }, + "configuration": { + "query": { + "query": "SELECT * FROM `fake-target-project.fake-target-project-dataset.fake-target-project-table`", + "destinationTable": { + "projectId": "fake-target-project", + "datasetId": "_1a2b34c567890d1efghi2j345678kl9012mn34c5", + "tableId": "anona1234c5d67890123efg45678hij90kl23mnoprst" + }, + "writeDisposition": "WRITE_TRUNCATE", + "priority": "INTERACTIVE", + "useLegacySql": False + }, + "jobType": "QUERY" + }, + "status": { + "state": "DONE" + }, + "user_email": "fake-user-email@test.com" + }] } MOCK_IAM_POLICY = { diff --git a/tests/providers/gcp/internal/test_compute.py b/tests/providers/gcp/internal/test_compute.py index f4546730..53c91383 100644 --- a/tests/providers/gcp/internal/test_compute.py +++ b/tests/providers/gcp/internal/test_compute.py @@ -101,36 +101,45 @@ def testListDisks(self, mock_gce_api): self.assertEqual('fake-zone', list_disks['01234567890123456789'].zone) @typing.no_type_check - @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListInstances') - def testGetInstance(self, mock_list_instances): + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute._GetResourceFromComputeApi') + def testGetInstance(self, mock_get_resource): """Test that an instance of a project can be found.""" - mock_list_instances.return_value = gcp_mocks.MOCK_LIST_INSTANCES + mock_get_resource.return_value = gcp_mocks.MOCK_GCE_OPERATION_INSTANCES_GET + # In MOCK_GCE_OPERATION_INSTANCES_GET, 'zone' is a URL. + # We also need to mock ID and name if needed, but they are there. + # Note: FAKE_INSTANCE.resource_id is used. + instance_data = gcp_mocks.MOCK_GCE_OPERATION_INSTANCES_GET.copy() + instance_data['id'] = gcp_mocks.FAKE_INSTANCE.resource_id + mock_get_resource.return_value = instance_data + found_instance = gcp_mocks.FAKE_SOURCE_PROJECT.compute.GetInstance(gcp_mocks.FAKE_INSTANCE.name) self.assertIsInstance(found_instance, compute.GoogleComputeInstance) self.assertEqual(gcp_mocks.FAKE_SOURCE_PROJECT.project_id, found_instance.project_id) self.assertEqual('fake-instance', found_instance.name) - self.assertEqual('fake-zone', found_instance.zone) - # pylint: disable=protected-access - self.assertEqual(gcp_mocks.FAKE_INSTANCE._data, found_instance._data) - # pylint: enable=protected-access + self.assertEqual('us-central1-a', found_instance.zone) + + mock_get_resource.return_value = None with self.assertRaises(errors.ResourceNotFoundError): gcp_mocks.FAKE_SOURCE_PROJECT.compute.GetInstance('non-existent-instance') - id_found_instance = gcp_mocks.FAKE_SOURCE_PROJECT.compute.GetInstance(gcp_mocks.FAKE_INSTANCE.resource_id) - self.assertEqual(id_found_instance, gcp_mocks.FAKE_INSTANCE) @typing.no_type_check - @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListDisks') - def testGetDisk(self, mock_list_disks): + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute._GetResourceFromComputeApi') + def testGetDisk(self, mock_get_resource): """Test that a disk of an instance can be found.""" - mock_list_disks.return_value = gcp_mocks.MOCK_LIST_DISKS + disk_data = gcp_mocks.MOCK_DISKS_AGGREGATED['items'][0]['disks'][0] + mock_get_resource.return_value = disk_data + found_disk = gcp_mocks.FAKE_SOURCE_PROJECT.compute.GetDisk(gcp_mocks.FAKE_DISK.name) self.assertIsInstance(found_disk, compute.GoogleComputeDisk) self.assertEqual(gcp_mocks.FAKE_SOURCE_PROJECT.project_id, found_disk.project_id) - self.assertEqual('fake-disk', found_disk.name) + self.assertEqual('fake-boot-disk', found_disk.name) # FAKE_DISK name is used but mock returns FAKE_BOOT_DISK self.assertEqual('fake-zone', found_disk.zone) + + mock_get_resource.return_value = None with self.assertRaises(errors.ResourceNotFoundError): gcp_mocks.FAKE_SOURCE_PROJECT.compute.GetDisk('non-existent-disk') + @typing.no_type_check @mock.patch('libcloudforensics.providers.gcp.internal.common.GoogleCloudComputeClient.BlockOperation') @mock.patch('libcloudforensics.providers.gcp.internal.common.GoogleCloudComputeClient.GceApi') @@ -202,6 +211,37 @@ def testCreateDiskFromSnapshot(self, mock_gce_api, mock_block_operation): 'Unknown error occurred when creating disk from Snapshot', str(context.exception)) + @typing.no_type_check + @mock.patch('libcloudforensics.providers.gcp.internal.common.GoogleCloudComputeClient.GceApi') + @mock.patch('libcloudforensics.providers.gcp.internal.common.ExecuteRequest') + def testListSnapshots(self, mock_execute, mock_gce_api): + """Test that snapshots of project are correctly listed.""" + mock_gce_api.return_value.snapshots.return_value = mock.Mock() + mock_execute.return_value = [{'items': [{'name': 'fake-snapshot'}]}] + + # Test without zone + snapshots = gcp_mocks.FAKE_SOURCE_PROJECT.compute.ListSnapshots(filter_string='name ~ "s.*"') + self.assertEqual(1, len(snapshots)) + self.assertIn('fake-snapshot', snapshots) + mock_execute.assert_called_with( + mock.ANY, 'list', {'project': 'fake-source-project', 'filter': 'name ~ "s.*"'}) + + # Test with zone + # We mock ExecuteRequest to return a snapshot from us-central1-a + mock_execute.return_value = [{'items': [ + {'name': 'snapshot-target', 'sourceDisk': '.../zones/us-central1-a/disks/disk1'} + ]}] + snapshots = gcp_mocks.FAKE_SOURCE_PROJECT.compute.ListSnapshots( + filter_string='name ~ "snapshot.*"', zone='us-central1-a') + + self.assertEqual(1, len(snapshots)) + self.assertIn('snapshot-target', snapshots) + + # The filter passed to the API should contain both combined with space + expected_filter = '(name ~ "snapshot.*") (sourceDisk ~ ".*/zones/us-central1-a/disks/.*")' + mock_execute.assert_called_with( + mock.ANY, 'list', {'project': 'fake-source-project', 'filter': expected_filter}) + @typing.no_type_check @mock.patch('libcloudforensics.providers.gcp.internal.common.GoogleCloudComputeClient.GceApi') def testGetNetwork(self, mock_gce_api): @@ -504,31 +544,35 @@ class GoogleComputeInstanceTest(unittest.TestCase): # pylint: disable=line-too-long @typing.no_type_check - @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListDisks') + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.GetDisk') @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleComputeInstance.GetOperation') - def testGetBootDisk(self, mock_get_operation, mock_list_disks): + def testGetBootDisk(self, mock_get_operation, mock_get_disk): """Test that a boot disk is retrieved if existing.""" mock_get_operation.return_value = gcp_mocks.MOCK_GCE_OPERATION_INSTANCES_GET - mock_list_disks.return_value = gcp_mocks.MOCK_LIST_DISKS + mock_get_disk.return_value = gcp_mocks.FAKE_BOOT_DISK boot_disk = gcp_mocks.FAKE_INSTANCE.GetBootDisk() self.assertIsInstance(boot_disk, compute.GoogleComputeDisk) self.assertEqual('fake-boot-disk', boot_disk.name) + mock_get_disk.assert_called_with(disk_name='fake-boot-disk', zone='fake-zone') + @typing.no_type_check - @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListDisks') + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.GetDisk') @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleComputeInstance.GetOperation') - def testGetDisk(self, mock_get_operation, mock_list_disks): + def testGetDisk(self, mock_get_operation, mock_get_disk): """Test that a disk is retrieved by its name, if existing.""" mock_get_operation.return_value = gcp_mocks.MOCK_GCE_OPERATION_INSTANCES_GET - mock_list_disks.return_value = gcp_mocks.MOCK_LIST_DISKS + mock_get_disk.return_value = gcp_mocks.FAKE_DISK # Normal disk disk = gcp_mocks.FAKE_INSTANCE.GetDisk(gcp_mocks.FAKE_DISK.name) self.assertIsInstance(disk, compute.GoogleComputeDisk) self.assertEqual('fake-disk', disk.name) + mock_get_disk.assert_called_with(disk_name='fake-disk', zone='fake-zone') # Boot disk + mock_get_disk.return_value = gcp_mocks.FAKE_BOOT_DISK disk = gcp_mocks.FAKE_INSTANCE.GetDisk(gcp_mocks.FAKE_BOOT_DISK.name) self.assertIsInstance(disk, compute.GoogleComputeDisk) self.assertEqual('fake-boot-disk', disk.name) @@ -538,16 +582,17 @@ def testGetDisk(self, mock_get_operation, mock_list_disks): gcp_mocks.FAKE_INSTANCE.GetDisk('non-existent-disk') @typing.no_type_check - @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListDisks') @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleComputeInstance.GetOperation') - def testListDisks(self, mock_get_operation, mock_list_disks): + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleComputeInstance.GetDisk') + def testListDisks(self, mock_get_disk, mock_get_operation): """Test that all disks of an instance are correctly retrieved.""" mock_get_operation.return_value = gcp_mocks.MOCK_GCE_OPERATION_INSTANCES_GET - mock_list_disks.return_value = gcp_mocks.MOCK_LIST_DISKS + mock_get_disk.side_effect = [gcp_mocks.FAKE_BOOT_DISK, gcp_mocks.FAKE_DISK] disks = gcp_mocks.FAKE_INSTANCE.ListDisks() self.assertEqual(2, len(disks)) self.assertEqual(['fake-boot-disk', 'fake-disk'], list(disks.keys())) + mock_get_disk.assert_has_calls([mock.call('fake-boot-disk'), mock.call('fake-disk')]) @typing.no_type_check @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListDisks') @@ -564,6 +609,7 @@ def testDelete( mock_disk_delete = mock_gce_api.return_value.disks.return_value.delete mock_disk_delete.return_value.execute.return_value = None + mock_gce_api.return_value.disks.return_value.get.return_value.execute.side_effect = gcp_mocks.FAKE_DISK_LIST gcp_mocks.FAKE_INSTANCE.Delete(delete_disks=True) calls = [ diff --git a/tests/providers/gcp/test_forensics.py b/tests/providers/gcp/test_forensics.py index f9116652..f21d8252 100644 --- a/tests/providers/gcp/test_forensics.py +++ b/tests/providers/gcp/test_forensics.py @@ -62,7 +62,7 @@ def testCreateDiskCopy1(self, gcp_mocks.FAKE_ANALYSIS_PROJECT.project_id, zone=gcp_mocks.FAKE_INSTANCE.zone, instance_name=gcp_mocks.FAKE_INSTANCE.name) - mock_get_instance.assert_called_with(gcp_mocks.FAKE_INSTANCE.name) + mock_get_instance.assert_called_with(gcp_mocks.FAKE_INSTANCE.name, zone=None) mock_get_disk.assert_not_called() self.assertIsInstance(new_disk, compute.GoogleComputeDisk) self.assertTrue(new_disk.name.startswith('evidence-')) @@ -99,7 +99,7 @@ def testCreateDiskCopy2(self, gcp_mocks.FAKE_ANALYSIS_PROJECT.project_id, zone=gcp_mocks.FAKE_INSTANCE.zone, disk_name=gcp_mocks.FAKE_DISK.name) - mock_get_disk.assert_called_with(gcp_mocks.FAKE_DISK.name) + mock_get_disk.assert_called_with(gcp_mocks.FAKE_DISK.name, zone=None) mock_get_boot_disk.assert_not_called() self.assertIsInstance(new_disk, compute.GoogleComputeDisk) self.assertTrue(new_disk.name.startswith('evidence-')) @@ -107,13 +107,20 @@ def testCreateDiskCopy2(self, self.assertTrue(new_disk.name.endswith('-copy')) @typing.no_type_check + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.GetInstance') + @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.GetDisk') @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListInstances') @mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleCloudCompute.ListDisks') - def testCreateDiskCopy3(self, mock_list_disks, mock_list_instances): + def testCreateDiskCopy3(self, mock_list_disks, mock_list_instances, + mock_get_disk, mock_get_instance): """Test that a disk from a remote project is duplicated and attached to an analysis project. """ mock_list_disks.return_value = gcp_mocks.MOCK_LIST_DISKS mock_list_instances.return_value = gcp_mocks.MOCK_LIST_INSTANCES + mock_get_disk.side_effect = errors.ResourceNotFoundError( + 'Disk not found', __name__) + mock_get_instance.side_effect = errors.ResourceNotFoundError( + 'Instance not found', __name__) # create_disk_copy( # src_proj,