Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libcloudforensics/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def format(self, record: logging.LogRecord) -> str:
if loglevel_color:
message = loglevel_color + message + RESET_SEQ
record.msg = message
record.args = ()
return super().format(record)


Expand Down
13 changes: 8 additions & 5 deletions libcloudforensics/providers/gcp/forensics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def CreateDiskCopy(
zone: str,
instance_name: Optional[str] = None,
disk_name: Optional[str] = None,
disk_type: Optional[str] = None) -> 'compute.GoogleComputeDisk':
disk_type: Optional[str] = None,
src_zone: Optional[str] = None) -> 'compute.GoogleComputeDisk':
"""Creates a copy of a Google Compute Disk.

Args:
Expand All @@ -56,6 +57,8 @@ def CreateDiskCopy(
disk_type (str): Optional. URL of the disk type resource describing
which disk type to use to create the disk. The default behavior is to
use the same disk type as the source disk.
src_zone (str): Optional. Zone where the source disk is located. If None,
the default zone will be used.

Returns:
GoogleComputeDisk: A Google Compute Disk object.
Expand All @@ -73,9 +76,9 @@ def CreateDiskCopy(

try:
if disk_name:
disk_to_copy = src_project.compute.GetDisk(disk_name)
disk_to_copy = src_project.compute.GetDisk(disk_name, zone=src_zone)
elif instance_name:
instance = src_project.compute.GetInstance(instance_name)
instance = src_project.compute.GetInstance(instance_name, zone=src_zone)
disk_to_copy = instance.GetBootDisk()
else:
raise ValueError(
Expand Down Expand Up @@ -472,15 +475,15 @@ def VMRemoveServiceAccount(
# Get the initial powered state of the instance
initial_state = instance.GetPowerState()

if not initial_state in valid_starting_states:
if initial_state not in valid_starting_states:
logger.error(
'Instance "{0:s}" is currently {1:s} which is an invalid '
'state for this operation'.format(instance_name, initial_state))
return False

try:
# Stop the instance if it is not already (or on the way)....
if not initial_state in ('TERMINATED', 'STOPPING'):
if initial_state not in ('TERMINATED', 'STOPPING'):
instance.Stop()

# Remove the service account
Expand Down
166 changes: 137 additions & 29 deletions libcloudforensics/providers/gcp/internal/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import subprocess
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union
from typing import Any, cast, Dict, List, Optional, Tuple, TypeVar, TYPE_CHECKING, Union

from googleapiclient.errors import HttpError

Expand Down Expand Up @@ -143,6 +143,73 @@ def _FindResourceByName(

return matches.pop()

def _GetResourceFromComputeApi(
self,
resource_type: str,
resource_name: str,
zone: Optional[str] = None,
region: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Helper to get a specific resource from the GCE API.

Args:
resource_type: The GCE resource type (e.g., 'instance' or 'disk').
resource_name: The resource name or ID.
zone: Optional zone to restrict the search.
region: Optional region to restrict the search.

Returns:
Optional[Dict[str, Any]]: The resource metadata if found, None otherwise.
"""
client = getattr(self.GceApi(), resource_type)()
if zone or region:
param_name = (resource_type[:-1] if resource_type.endswith('s')
else resource_type)
if resource_type == 'instanceGroupManagers':
param_name = 'instanceGroupManager'
elif resource_type == 'regionDisks':
param_name = 'disk'

kwargs = {'project': self.project_id, param_name: resource_name}
if zone:
kwargs['zone'] = zone
if region:
kwargs['region'] = region

try:
return cast(Dict[str, Any], client.get(**kwargs).execute())
except HttpError as e:
if e.resp.status == 404:
return None
raise
else:
# Use aggregatedList with filter to avoid listing all resources
filter_str = f'name = "{resource_name}"'
if re.match(RESOURCE_ID_REGEX, resource_name):
filter_str = f'id = "{resource_name}"'

# Regional resources might not support aggregatedList on their own
# client. For example, regionDisks doesn't have aggregatedList.
# We use disks.aggregatedList instead as it returns both zonal
# and regional disks.
if resource_type == 'regionDisks':
client = self.GceApi().disks() # pylint: disable=no-member
res_type_in_resp = 'disks'
else:
res_type_in_resp = resource_type

responses = common.ExecuteRequest(
client,
'aggregatedList', {
'project': self.project_id, 'filter': filter_str
})
for response in responses:
for location in response.get('items', {}):
items = response['items'][location].get(res_type_in_resp, [])
if items:
return cast(Dict[str, Any], items[0])
return None


def Instances(self,
refresh: bool = True) -> Dict[str, 'GoogleComputeInstance']:
"""Get all instances in the project.
Expand Down Expand Up @@ -237,18 +304,28 @@ def ListInstances(self) -> Dict[str, 'GoogleComputeInstance']:

return instances

def ListSnapshots(self, filter_string: str | None = None) -> Dict[str, Any]:
def ListSnapshots(self,
filter_string: str | None = None,
zone: str | None = None) -> Dict[str, Any]:
"""List snapshots in project.

Args:
filter_string: Filter for the snapshot query.
zone: Optional zone to filter snapshots by.

Returns:
Dict[str, Any]: Dictionary mapping snapshot IDs (str)
to their respective snapshot description.
See:
https://docs.cloud.google.com/compute/docs/reference/rest/v1/snapshots/list
"""
if zone:
zone_filter = f'sourceDisk ~ ".*/zones/{zone}/disks/.*"'
if filter_string:
filter_string = f'({filter_string}) ({zone_filter})'
else:
filter_string = zone_filter

snapshots = {}
gce_snapshot_client = self.GceApi().snapshots() # pylint: disable=no-member
responses = common.ExecuteRequest(
Expand Down Expand Up @@ -450,16 +527,25 @@ def GetRegionDisk(
Raises:
ResourceNotFoundError: When the specified disk cannot be found in project.
"""
disks = self.RegionDisks()
if re.match(RESOURCE_ID_REGEX, disk_name):
disk = disks.get(disk_name)
else:
disk = self._FindResourceByName(disks, disk_name, region=region)
if not disk:
disk_dict = self._GetResourceFromComputeApi(
'regionDisks', disk_name, region=region)

if not disk_dict:
raise errors.ResourceNotFoundError(
f'Disk {disk_name} was not found in project '
f'{self.project_id}', __name__)
return disk

try:
_, disk_region = disk_dict['region'].rsplit('/', 1)
except ValueError as exception:
raise errors.ResourceNotFoundError(
f'Region not found for disk {disk_name} in project '
f'{self.project_id}', __name__) from exception

return GoogleRegionComputeDisk(
self.project_id, disk_region, disk_dict['name'],
resource_id=disk_dict['id'],
labels=disk_dict.get('labels'))

def GetInstance(
self,
Expand All @@ -478,19 +564,28 @@ def GetInstance(
Raises:
ResourceNotFoundError: If instance does not exist.
"""
instance_dict = self._GetResourceFromComputeApi(
'instances', instance_name, zone=zone)

instances = self.Instances()

if re.match(RESOURCE_ID_REGEX, instance_name):
instance = instances.get(instance_name)
else:
instance = self._FindResourceByName(instances, instance_name, zone)

if not instance:
if not instance_dict:
raise errors.ResourceNotFoundError(
f'Instance {instance_name} was not found in project '
f'{self.project_id}', __name__)
return instance

try:
_, instance_zone = instance_dict['zone'].rsplit('/', 1)
except ValueError as exception:
raise errors.ResourceNotFoundError(
f'Zone not found for instance {instance_name} in project '
f'{self.project_id}', __name__) from exception

return GoogleComputeInstance(
self.project_id,
instance_zone,
instance_dict['name'],
resource_id=instance_dict['id'],
labels=instance_dict.get('labels'),
deletion_protection=instance_dict.get('deletionProtection', False))

def GetDisk(
self,
Expand All @@ -509,16 +604,26 @@ def GetDisk(
ResourceNotFoundError: When the specified disk cannot be found in project.
"""

disks = self.Disks()
if re.match(RESOURCE_ID_REGEX, disk_name):
disk = disks.get(disk_name)
else:
disk = self._FindResourceByName(disks, disk_name, zone)
if not disk:
disk_dict = self._GetResourceFromComputeApi('disks', disk_name, zone=zone)

if not disk_dict:
raise errors.ResourceNotFoundError(
f'Disk {disk_name} was not found in project '
f'{self.project_id}', __name__)
return disk

try:
_, disk_zone = disk_dict['zone'].rsplit('/', 1)
except ValueError as exception:
raise errors.ResourceNotFoundError(
f'Zone not found for disk {disk_name} in project '
f'{self.project_id}', __name__) from exception

return GoogleComputeDisk(
self.project_id,
disk_zone,
disk_dict['name'],
resource_id=disk_dict['id'],
labels=disk_dict.get('labels'))

def CreateDiskFromSnapshot(
self,
Expand Down Expand Up @@ -1431,7 +1536,8 @@ def GetBootDisk(self) -> 'GoogleComputeDisk':
self.name),
__name__)
disk_name = disk['source'].split('/')[-1]
return GoogleCloudCompute(self.project_id).GetDisk(disk_name=disk_name)
return GoogleCloudCompute(self.project_id).GetDisk(
disk_name=disk_name, zone=self.zone)
raise errors.ResourceNotFoundError(
'Boot disk not found for instance {0:s}'.format(self.name), __name__)

Expand All @@ -1451,7 +1557,8 @@ def GetDisk(self, disk_name: str) -> 'GoogleComputeDisk':

for disk in self.GetValue('disks'):
if disk.get('source', '').split('/')[-1] == disk_name:
return GoogleCloudCompute(self.project_id).GetDisk(disk_name=disk_name)
return GoogleCloudCompute(self.project_id).GetDisk(
disk_name=disk_name, zone=self.zone)
raise errors.ResourceNotFoundError(
'Disk {0:s} was not found in instance {1:s}'.format(
disk_name, self.name),
Expand Down Expand Up @@ -1640,7 +1747,8 @@ def Delete(

for disk_name in disks_to_delete:
try:
disk = GoogleCloudCompute(self.project_id).GetDisk(disk_name=disk_name)
disk = GoogleCloudCompute(self.project_id).GetDisk(
disk_name=disk_name, zone=self.zone)
disk.Delete()
except (errors.ResourceDeletionError, errors.ResourceNotFoundError):
logger.info(
Expand Down Expand Up @@ -2333,7 +2441,7 @@ def ExportImage(
build_args.append('-format={0:s}'.format(image_format))
build_body = {
'timeout': '86400s',
'steps': [{
'steps': [{
'args': build_args,
'name': 'gcr.io/compute-image-tools/gce_vm_image_export:release',
'env': []
Expand Down
Loading
Loading