From 17f63caaeafec7b8e72afe54e80ec7d73684f113 Mon Sep 17 00:00:00 2001 From: Mauricio Mendoza <115591158+mmendozac@users.noreply.github.com> Date: Sat, 25 Mar 2023 18:54:31 -0300 Subject: [PATCH 1/3] Update main.py # Changelog - Added support for analyzing security groups attached to AWS Application Load Balancers and VPC Endpoints, in addition to EC2 instances. - Refactored the code to make it more modular and easier to maintain. - Added the resource type (instance, load balancer, or endpoint) as a new column in the output DataFrame and Excel file. - Improved the function `get_name` to handle the names for load balancers and VPC endpoints. - Created the `get_sg_name` function to fetch the security group name using the group ID and region. - Processed inbound and outbound rules separately and appended them to the DataFrame in a more efficient way. --- security_group_report/main.py | 216 ++++++++++++++++------------------ 1 file changed, 104 insertions(+), 112 deletions(-) diff --git a/security_group_report/main.py b/security_group_report/main.py index d2c6ab1..8170113 100644 --- a/security_group_report/main.py +++ b/security_group_report/main.py @@ -17,15 +17,16 @@ import pandas as pd import datetime +# Initialize EC2 client ec2 = boto3.client("ec2") -# run for every region -regions = [region["RegionName"] for region in ec2.describe_regions()["Regions"]] -# specify regions -# regions = ['eu-west-1','us-west-1] -# Get rules from SG +# Get all available regions +regions = [region["RegionName"] for region in ec2.describe_regions()["Regions"]] +# Uncomment the following two lines to specify the regions you want to analyze +# regions = ['eu-west-1', 'us-west-1'] +# Function to get inbound and outbound rules for a security group def get_rules(sg, region): ec2r = boto3.resource("ec2", region) sgs = ec2r.security_groups.filter( @@ -41,29 +42,42 @@ def get_rules(sg, region): sg_outbound = sg.ip_permissions_egress return sg_inbound, sg_outbound - -# Get available SGs -def get_sgs(instance): - sgs = instance.security_groups +# Function to get security groups for a resource +def get_sgs(resource, resource_type): + if resource_type == "instance": + sgs = resource.security_groups + elif resource_type == "load_balancer": + sgs = [{"GroupId": sg} for sg in resource["SecurityGroups"]] + elif resource_type == "endpoint": + sgs = [{"GroupId": sg["GroupId"]} for sg in resource["Groups"]] return sgs - -# Get instance name -def get_name(instance): - if instance.tags: - for tag in instance.tags: +# Function to get a resource name +def get_name(resource, resource_type): + if resource_type == "instance" and resource.tags: + for tag in resource.tags: if tag["Key"] == "Name": return tag["Value"] - else: - return "None" + elif resource_type == "load_balancer": + return resource["LoadBalancerName"] + elif resource_type == "endpoint": + return resource["VpcEndpointId"] + return "None" +# Function to get security group name +def get_sg_name(sg_id, region): + ec2r = boto3.resource("ec2", region) + sg = ec2r.SecurityGroup(sg_id) + return sg.group_name +# Main function def main(): table = [] columns = [ + "Resource Type", "Region", - "Instance Name", - "Instance-ID", + "Resource Name", + "Resource-ID", "SG-Name", "SG-ID", "Direction", @@ -74,104 +88,82 @@ def main(): ] df = pd.DataFrame(table, columns=columns) print("Collecting Security Groups information from every region....") + for region in regions: ec2r = boto3.resource("ec2", region) - for instance in ec2r.instances.all(): - inst_id = instance.id # get instance id - sgs = get_sgs(instance) # gets sg from instance - inst_name = get_name(instance) # gets the instance name - for sg in sgs: - sg_id = sg["GroupId"] - sg_name = sg["GroupName"] - rules_inbound = get_rules(sg_id, region)[0] - rules_outbound = get_rules(sg_id, region)[1] - for rule in rules_inbound: - rule_destination = inst_id - from_cidr = [] - direction = "Inbound" - from_port_range = rule.get("FromPort", "any") - to_port_range = rule.get("ToPort", "any") - if from_port_range == to_port_range: - ports = from_port_range - else: - ports = str(from_port_range) + " - " + str(to_port_range) - if from_port_range == -1: - ports = "any" - protocol = rule["IpProtocol"] - if protocol == "-1": - protocol = "any" - for cidr in rule.get("IpRanges", []): - from_cidr.append(cidr["CidrIp"]) - for cidrv6 in rule.get("Ipv6Ranges", []): - from_cidr.append(cidrv6["CidrIpv6"]) - for source_sg in rule.get("UserIdGroupPairs", []): - from_cidr.append(source_sg["GroupId"]) - for source_sg in rule.get("PrefixListIds", []): - from_cidr.append(source_sg["PrefixListId"]) - if not from_cidr: - from_cidr.append("0.0.0.0/0") - - df = df.append( - { - "Region": region, - "Instance Name": inst_name, - "Instance-ID": inst_id, - "SG-Name": sg_name, - "SG-ID": sg_id, - "Direction": direction, - "Source": from_cidr, - "Destination": rule_destination, - "Protocol": protocol, - "Ports": ports, - }, - ignore_index=True, - ) - for rule in rules_outbound: - rule_source = inst_id - to_cidr = [] - direction = "Outbound" - protocol = rule["IpProtocol"] - from_port_range = rule.get("FromPort", "any") - to_port_range = rule.get("ToPort", "any") - if from_port_range == to_port_range: - ports = from_port_range - else: - ports = str(from_port_range) + " - " + str(to_port_range) - if from_port_range == -1: - ports = "any" - protocol = rule["IpProtocol"] - if protocol == "-1": - protocol = "any" - for cidr in rule.get("IpRanges", []): - to_cidr.append(cidr["CidrIp"]) - for cidrv6 in rule.get("Ipv6Ranges", []): - to_cidr.append(cidrv6["CidrIpv6"]) - for source_sg in rule.get("UserIdGroupPairs", []): - to_cidr.append(source_sg["GroupId"]) - for source_sg in rule.get("PrefixListIds", []): - to_cidr.append(source_sg["PrefixListId"]) - if not to_cidr: - to_cidr.append("0.0.0.0/0") - df = df.append( - { - "Region": region, - "Instance Name": inst_name, - "Instance-ID": inst_id, - "SG-Name": sg_name, - "SG-ID": sg_id, - "Direction": direction, - "Source": rule_source, - "Destination": to_cidr, - "Protocol": protocol, - "Ports": ports, - }, - ignore_index=True, - ) + elbv2 = boto3.client("elbv2", region) + instances = list(ec2r.instances.all()) + load_balancers = elbv2.describe_load_balancers()["LoadBalancers"] + endpoints = ec2r.meta.client.describe_vpc_endpoints()["VpcEndpoints"] + + resources = [ + {"type": "instance", "data": instances}, + {"type": "load_balancer", "data": load_balancers}, + {"type": "endpoint", "data": endpoints}, + ] + + for resource_type in resources: + for resource_data in resource_type["data"]: + if resource_type["type"] == "instance": + resource_id = resource_data.id + resource_name = get_name(resource_data, "instance") + elif resource_type["type"] == "load_balancer": + resource_id = resource_data["LoadBalancerArn"] + resource_name = get_name(resource_data, "load_balancer") + elif resource_type["type"] == "endpoint": + resource_id = resource_data["VpcEndpointId"] + resource_name = get_name(resource_data, "endpoint") + + sgs = get_sgs(resource_data, resource_type["type"]) + + for sg in sgs: + sg_id = sg["GroupId"] + sg_name = get_sg_name(sg_id, region) + rules_inbound, rules_outbound = get_rules(sg_id, region) + + # Process inbound rules and append to DataFrame + for rule in rules_inbound: + for ip_range in rule["IpRanges"]: + row = { + "Resource Type": resource_type["type"], + "Region": region, + "Resource Name": resource_name, + "Resource-ID": resource_id, + "SG-Name": sg_name, + "SG-ID": sg_id, + "Direction": "Inbound", + "Source": ip_range["CidrIp"], + "Destination": "", + "Protocol": rule["IpProtocol"], + "Ports": rule["FromPort"] if "FromPort" in rule else "N/A", + } + row_df = pd.DataFrame([row], columns=columns) + df = pd.concat([df, row_df], ignore_index=True) + + # Process outbound rules and append to DataFrame + for rule in rules_outbound: + for ip_range in rule["IpRanges"]: + row = { + "Resource Type": resource_type["type"], + "Region": region, + "Resource Name": resource_name, + "Resource-ID": resource_id, + "SG-Name": sg_name, + "SG-ID": sg_id, + "Direction": "Outbound", + "Source": "", + "Destination": ip_range["CidrIp"], + "Protocol": rule["IpProtocol"], + "Ports": rule["FromPort"] if "FromPort" in rule else "N/A", + } + row_df = pd.DataFrame([row], columns=columns) + df = pd.concat([df, row_df], ignore_index=True) + + # Save DataFrame to Excel file time = datetime.datetime.now().strftime("%H-%M-%S_%d-%m-%Y") file_name = "fw_policy-report-" + time + ".xlsx" print(file_name + " has been created") - return df.to_excel(file_name) - + df.to_excel(file_name) if __name__ == "__main__": - main() \ No newline at end of file + main() From 377fbcdb84528623f264b264508a4301e093cff1 Mon Sep 17 00:00:00 2001 From: Mauricio Mendoza <115591158+mmendozac@users.noreply.github.com> Date: Sat, 25 Mar 2023 18:56:19 -0300 Subject: [PATCH 2/3] Update iam.json - Expanded the permissions in the AWS IAM policy to support the new features in the script. - Added the `ec2:DescribeRegions` permission to allow the script to fetch the list of AWS regions. - Added the `ec2:DescribeVpcEndpoints` permission to allow the script to retrieve information about VPC endpoints. - Added the `elasticloadbalancing:DescribeLoadBalancers` permission to enable the script to access information about Application Load Balancers. --- iam.json | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iam.json b/iam.json index bc834e7..67e70e8 100644 --- a/iam.json +++ b/iam.json @@ -6,9 +6,12 @@ "Effect": "Allow", "Action": [ "ec2:DescribeInstances", - "ec2:DescribeSecurityGroups" + "ec2:DescribeSecurityGroups", + "ec2:DescribeRegions", + "ec2:DescribeVpcEndpoints", + "elasticloadbalancing:DescribeLoadBalancers" ], "Resource": "*" } ] -} \ No newline at end of file +} From 12d8bf10208424c97f712f3b2f5de70d1f47057c Mon Sep 17 00:00:00 2001 From: Mauricio Mendoza <115591158+mauriciomendozacl@users.noreply.github.com> Date: Fri, 31 Mar 2023 13:08:47 -0300 Subject: [PATCH 3/3] Update main.py Error handling was added when any service was not present --- security_group_report/main.py | 45 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/security_group_report/main.py b/security_group_report/main.py index 8170113..f027f1d 100644 --- a/security_group_report/main.py +++ b/security_group_report/main.py @@ -34,10 +34,10 @@ def get_rules(sg, region): sg, ] ) - sgs = list(sgs.pages()) - if not sgs or not sgs[0]: + sgs = next(sgs.pages(), []) + if not sgs: return (None, None) - sg = sgs[0][0] + sg = sgs[0] sg_inbound = sg.ip_permissions sg_outbound = sg.ip_permissions_egress return sg_inbound, sg_outbound @@ -47,21 +47,26 @@ def get_sgs(resource, resource_type): if resource_type == "instance": sgs = resource.security_groups elif resource_type == "load_balancer": - sgs = [{"GroupId": sg} for sg in resource["SecurityGroups"]] + security_groups = resource.get("SecurityGroups") + if security_groups is not None: + sgs = [{"GroupId": sg} for sg in security_groups] + else: + sgs = [] elif resource_type == "endpoint": - sgs = [{"GroupId": sg["GroupId"]} for sg in resource["Groups"]] + groups = resource.get("Groups", []) + sgs = [{"GroupId": sg["GroupId"]} for sg in groups] return sgs # Function to get a resource name def get_name(resource, resource_type): - if resource_type == "instance" and resource.tags: + if resource_type == "instance" and getattr(resource, "tags", None): for tag in resource.tags: if tag["Key"] == "Name": return tag["Value"] elif resource_type == "load_balancer": - return resource["LoadBalancerName"] + return resource.get("LoadBalancerName", "None") elif resource_type == "endpoint": - return resource["VpcEndpointId"] + return resource.get("VpcEndpointId", "None") return "None" # Function to get security group name @@ -93,8 +98,8 @@ def main(): ec2r = boto3.resource("ec2", region) elbv2 = boto3.client("elbv2", region) instances = list(ec2r.instances.all()) - load_balancers = elbv2.describe_load_balancers()["LoadBalancers"] - endpoints = ec2r.meta.client.describe_vpc_endpoints()["VpcEndpoints"] + load_balancers = elbv2.describe_load_balancers().get("LoadBalancers", []) + endpoints = ec2r.meta.client.describe_vpc_endpoints().get("VpcEndpoints", []) resources = [ {"type": "instance", "data": instances}, @@ -108,10 +113,10 @@ def main(): resource_id = resource_data.id resource_name = get_name(resource_data, "instance") elif resource_type["type"] == "load_balancer": - resource_id = resource_data["LoadBalancerArn"] + resource_id = resource_data.get("LoadBalancerArn") resource_name = get_name(resource_data, "load_balancer") elif resource_type["type"] == "endpoint": - resource_id = resource_data["VpcEndpointId"] + resource_id = resource_data.get("VpcEndpointId") resource_name = get_name(resource_data, "endpoint") sgs = get_sgs(resource_data, resource_type["type"]) @@ -123,7 +128,7 @@ def main(): # Process inbound rules and append to DataFrame for rule in rules_inbound: - for ip_range in rule["IpRanges"]: + for ip_range in rule.get("IpRanges", []): row = { "Resource Type": resource_type["type"], "Region": region, @@ -132,17 +137,17 @@ def main(): "SG-Name": sg_name, "SG-ID": sg_id, "Direction": "Inbound", - "Source": ip_range["CidrIp"], + "Source": ip_range.get("CidrIp", ""), "Destination": "", - "Protocol": rule["IpProtocol"], - "Ports": rule["FromPort"] if "FromPort" in rule else "N/A", + "Protocol": rule.get("IpProtocol", ""), + "Ports": rule.get("FromPort", "N/A"), } row_df = pd.DataFrame([row], columns=columns) df = pd.concat([df, row_df], ignore_index=True) # Process outbound rules and append to DataFrame for rule in rules_outbound: - for ip_range in rule["IpRanges"]: + for ip_range in rule.get("IpRanges", []): row = { "Resource Type": resource_type["type"], "Region": region, @@ -152,9 +157,9 @@ def main(): "SG-ID": sg_id, "Direction": "Outbound", "Source": "", - "Destination": ip_range["CidrIp"], - "Protocol": rule["IpProtocol"], - "Ports": rule["FromPort"] if "FromPort" in rule else "N/A", + "Destination": ip_range.get("CidrIp", ""), + "Protocol": rule.get("IpProtocol", ""), + "Ports": rule.get("FromPort", "N/A"), } row_df = pd.DataFrame([row], columns=columns) df = pd.concat([df, row_df], ignore_index=True)