diff --git a/src/ecs.py b/src/ecs.py index 3381ca2..810543d 100755 --- a/src/ecs.py +++ b/src/ecs.py @@ -1,134 +1,144 @@ #!/usr/bin/env python3 import boto3 +from typing import List + +LAUNCH_TYPE_FARGATE = "FARGATE" -LAUNCH_TYPE_FARGATE = 'FARGATE' class EcsClient(object): def __init__(self): - self.boto = boto3.client('ecs') - self.logs = boto3.client('logs') + self.boto = boto3.client("ecs") + self.logs = boto3.client("logs") self._last_event = None self._log_next_token = None - - def update_service(self, cluster_name, app_name, task_definition, force_deployment=False): + + def update_service( + self, cluster_name, app_name, task_definition, force_deployment=False + ): return self.boto.update_service( - cluster=cluster_name, - service=app_name, + cluster=cluster_name, + service=app_name, taskDefinition=task_definition, - forceNewDeployment=force_deployment + forceNewDeployment=force_deployment, ) - + def describe_services(self, cluster_name, app_name): - result = self.boto.describe_services( - cluster=cluster_name, - services=[app_name] - ) - - if 'taskSets' in result['services'][0]: - for taskSet in result['services'][0]['taskSets']: - if taskSet['status'] == 'ACTIVE': - self.taskSetId = taskSet['id'] - - if 'deployments' in result['services'][0]: - for deployment in result['services'][0]['deployments']: - if deployment['status'] == 'PRIMARY': - self.ecsDeployId = deployment['id'] - - return result + result = self.boto.describe_services(cluster=cluster_name, services=[app_name]) + + if "taskSets" in result["services"][0]: + for taskSet in result["services"][0]["taskSets"]: + if taskSet["status"] == "ACTIVE": + self.taskSetId = taskSet["id"] + + if "deployments" in result["services"][0]: + for deployment in result["services"][0]["deployments"]: + if deployment["status"] == "PRIMARY": + self.ecsDeployId = deployment["id"] + + return result def register_task_definition(self, task_definition): - result = self.boto.register_task_definition( - **task_definition - ) - self.taskDefArn = result['taskDefinition']['taskDefinitionArn'] + result = self.boto.register_task_definition(**task_definition) + self.taskDefArn = result["taskDefinition"]["taskDefinitionArn"] return result def describe_task_definition(self, task_definition): - result = self.boto.describe_task_definition( - taskDefinition=task_definition - ) - self.taskDefArn = result['taskDefinition']['taskDefinitionArn'] + result = self.boto.describe_task_definition(taskDefinition=task_definition) + self.taskDefArn = result["taskDefinition"]["taskDefinitionArn"] return result - - def list_tasks(self, cluster_name, started_by, desired_status='STOPPED'): + + def list_tasks(self, cluster_name: str, started_by, desired_status="STOPPED"): return self.boto.list_tasks( - cluster=cluster_name, - startedBy=started_by, - desiredStatus=desired_status + cluster=cluster_name, startedBy=started_by, desiredStatus=desired_status ) - - def describe_tasks(self, cluster_name, task_arns): + + def describe_tasks(self, cluster_name: str, task_arns): result = self.boto.describe_tasks(cluster=cluster_name, tasks=task_arns) - self.status = result['tasks'][0]['lastStatus'] + self.status = result["tasks"][0]["lastStatus"] return result - - def run_task(self, cluster_name, task_definition, launchtype, subnets, security_groups): + + def run_task( + self, + cluster_name: str, + task_definition, + launchtype: str, + subnets: List[str], + security_groups: List[str], + container_overrides: object, + ): if launchtype == LAUNCH_TYPE_FARGATE: if not subnets or not security_groups: - msg = 'At least one subnet and one security ' \ - 'group definition are required ' \ - 'for launch type FARGATE' + msg = ( + "At least one subnet and one security " + "group definition are required " + "for launch type FARGATE" + ) raise Exception(msg) network_configuration = { "awsvpcConfiguration": { "subnets": subnets, "securityGroups": security_groups, - "assignPublicIp": "DISABLED" + "assignPublicIp": "DISABLED", } } - + result = self.boto.run_task( cluster=cluster_name, taskDefinition=task_definition, launchType=launchtype, - networkConfiguration=network_configuration + networkConfiguration=network_configuration, + overrides=container_overrides, ) - + else: result = self.boto.run_task( cluster=cluster_name, - taskDefinition=task_definition + taskDefinition=task_definition, + overrides=container_overrides, ) - - self.taskArn = result['tasks'][0]['taskArn'] - self.taskId = self.taskArn.split('/')[-1] - self.status = result['tasks'][0]['lastStatus'] + + self.taskArn = result["tasks"][0]["taskArn"] + self.taskId = self.taskArn.split("/")[-1] + self.status = result["tasks"][0]["lastStatus"] return result - + def describe_log_streams(self, log_group_name): return self.logs.describe_log_streams( - logGroupName=log_group_name, orderBy='LastEventTime', descending=True, limit=1) - + logGroupName=log_group_name, + orderBy="LastEventTime", + descending=True, + limit=1, + ) + def get_log_events(self, log_args): return self.logs.get_log_events(**log_args) - + def tail_log_events(self, log_group_name, log_stream_name): log_args = { - 'logGroupName': log_group_name, - 'logStreamName': log_stream_name, - 'startFromHead': True + "logGroupName": log_group_name, + "logStreamName": log_stream_name, + "startFromHead": True, } if self._log_next_token: - log_args['nextToken'] = self._log_next_token + log_args["nextToken"] = self._log_next_token log_stream_events = self.get_log_events(log_args) - - self._log_next_token = log_stream_events['nextForwardToken'] - return log_stream_events['events'] - + + self._log_next_token = log_stream_events["nextForwardToken"] + return log_stream_events["events"] + def tail_ecs_events(self, cluster_name, app_name): get_events = self.describe_services(cluster_name, app_name) - events = get_events['services'][0]['events'] + events = get_events["services"][0]["events"] events_collected = [] - + for event in events: - if not self._last_event or event['id'] == self._last_event: + if not self._last_event or event["id"] == self._last_event: break events_collected.insert(0, event) - - self._last_event = events[0]['id'] + + self._last_event = events[0]["id"] return events_collected - \ No newline at end of file diff --git a/src/run-task.py b/src/run-task.py index 0efd781..4910a95 100755 --- a/src/run-task.py +++ b/src/run-task.py @@ -8,51 +8,56 @@ from utils import validate_envs, json_template # ----- Check variables ----- -print('Step 1: Checking environment variables \n') +print("Step 1: Checking environment variables \n") -req_vars = [ - 'CLUSTER_NAME', - 'APP_NAME', - 'AWS_DEFAULT_REGION' -] +req_vars = ["CLUSTER_NAME", "APP_NAME", "AWS_DEFAULT_REGION"] try: validate_envs(req_vars) except: exit(1) -cluster_name = os.getenv('CLUSTER_NAME') -app_name = os.getenv('APP_NAME') -launchtype = os.getenv('SERVICE_TYPE') -subnets = os.getenv('SUBNETS') -security_groups = os.getenv('SECURITY_GROUPS') -task_def_file_name = os.getenv('TPL_FILE_NAME', 'task-definition.tpl.json') +cluster_name = os.getenv("CLUSTER_NAME") +app_name = os.getenv("APP_NAME") +launchtype = os.getenv("SERVICE_TYPE") +subnets = os.getenv("SUBNETS") +security_groups = os.getenv("SECURITY_GROUPS") +task_def_file_name = os.getenv("TPL_FILE_NAME", "task-definition.tpl.json") +container_overrides_json = os.getenv( + "CONTAINER_OVERRIDES", '{"containerOverrides": []}' +) env_vars = dict(os.environ) # ----- Create task definition file ----- -print('Step 2: Replace variables inside of %s \n' % task_def_file_name) +print("Step 2: Replace variables inside of %s \n" % task_def_file_name) try: task_definition = json_template(task_def_file_name) -except: +except Exception as err: + print(f"Error building JSON template of task-definition. Error: '{err}'") exit(1) -print('Task definition file: \n%s' % task_definition) +print("Task definition file: \n%s" % task_definition) task_def = json.loads(task_definition) +if container_overrides_json == "": + container_overrides_json = '{"containerOverrides": []}' +print("Container Overrides: \n%s" % container_overrides_json) +container_overrides = json.loads(container_overrides_json) + # ----- Register task definition file ----- -print('Step 3: Registering task definition \n') +print("Step 3: Registering task definition \n") task = EcsClient() try: task.register_task_definition(task_def) - print('Task definition arn: %s \n' % task.taskDefArn) + print("Task definition arn: %s \n" % task.taskDefArn) except Exception as err: - print('Register task definition issue: %s' % err) + print("Register task definition issue: %s" % err) exit(1) # ----- Run task ----- -print('Step 4: Running task') +print("Step 4: Running task") if subnets: subnets = subnets.split(",") @@ -60,76 +65,86 @@ security_groups = security_groups.split(",") try: - task.run_task(cluster_name, task.taskDefArn, - launchtype, subnets, security_groups) + task.run_task( + cluster_name, + task.taskDefArn, + launchtype, + subnets, + security_groups, + container_overrides, + ) except Exception as err: print(err) exit(1) task_time = 0 -while task.status not in ['RUNNING', 'STOPPED']: +while task.status not in ["RUNNING", "STOPPED"]: try: running_task = task.describe_tasks(cluster_name, [task.taskArn]) - sleep = (2 * random.uniform(0, 3)) + sleep = 2 * random.uniform(0, 3) task_time = task_time + int(sleep) time.sleep(sleep) except: print("Error get runnning task") exit(1) else: - print('Provisioning time: %s seconds' % task_time) + print("Provisioning time: %s seconds" % task_time) -print('\n======== RUNNING TASK ========') -print('CLUSTER_NAME: %s' % cluster_name) -print('APP_NAME: %s' % app_name) -print('TASK_DEF_ARN: %s' % task.taskDefArn) -print('TASK_ARN: %s' % task.taskArn) +print("\n======== RUNNING TASK ========") +print("CLUSTER_NAME: %s" % cluster_name) +print("APP_NAME: %s" % app_name) +print("TASK_DEF_ARN: %s" % task.taskDefArn) +print("TASK_ARN: %s" % task.taskArn) log_msg = None last_logs = None -print('\n======== TASK LOGS ========') +print("\n======== TASK LOGS ========") while True: try: - log_group_name = task_def['containerDefinitions'][0]['logConfiguration']['options']['awslogs-group'] - log_prefix = task_def['containerDefinitions'][0]['logConfiguration']['options']['awslogs-stream-prefix'] - log_stream_name = '/'.join([log_prefix, app_name, task.taskId]) - + log_group_name = task_def["containerDefinitions"][0]["logConfiguration"][ + "options" + ]["awslogs-group"] + log_prefix = task_def["containerDefinitions"][0]["logConfiguration"]["options"][ + "awslogs-stream-prefix" + ] + log_stream_name = "/".join([log_prefix, app_name, task.taskId]) + log_events = task.tail_log_events(log_group_name, log_stream_name) for event in log_events: - print(event['message']) + print(event["message"]) except: if not log_msg: - log_msg = 'No logs sent to CloudWatch' + log_msg = "No logs sent to CloudWatch" print(log_msg) finally: running_task = task.describe_tasks(cluster_name, [task.taskArn]) - sleep = (2 * random.uniform(0, 3)) + sleep = 2 * random.uniform(0, 3) time.sleep(sleep) - + if last_logs: break - if task.status == 'STOPPED': + if task.status == "STOPPED": last_logs = True continue -print('\n======== TASK STOPPED ========') -print('Task ID: %s' % task.taskId) -print('Task ARN: %s' % task.taskArn) -print('Service Name: %s' % app_name) -print('Cluster Name: %s' % cluster_name) -if 'startedAt' in running_task['tasks'][0]: - print('Started at: %s' % running_task['tasks'][0]['startedAt']) -print('Stopped at: %s' % running_task['tasks'][0]['stoppedAt']) -print('Stopped Reason: %s' % running_task['tasks'][0]['stoppedReason']) -if 'stopCode' in running_task['tasks'][0]: - print('Stop Code: %s' % running_task['tasks'][0]['stopCode']) -if 'exitCode' in running_task['tasks'][0]['containers'][0]: - print('Exit code: %s' %running_task['tasks'][0]['containers'][0]['exitCode']) -if 'reason' in running_task['tasks'][0]['containers'][0]: - print('Reason: %s' %running_task['tasks'][0]['containers'][0]['reason']) +print("\n======== TASK STOPPED ========") +print("Task ID: %s" % task.taskId) +print("Task ARN: %s" % task.taskArn) +print("Service Name: %s" % app_name) +print("Cluster Name: %s" % cluster_name) +if "startedAt" in running_task["tasks"][0]: + print("Started at: %s" % running_task["tasks"][0]["startedAt"]) +print("Stopped at: %s" % running_task["tasks"][0]["stoppedAt"]) +print("Stopped Reason: %s" % running_task["tasks"][0]["stoppedReason"]) +if "stopCode" in running_task["tasks"][0]: + print("Stop Code: %s" % running_task["tasks"][0]["stopCode"]) +if "exitCode" in running_task["tasks"][0]["containers"][0]: + print("Exit code: %s" % running_task["tasks"][0]["containers"][0]["exitCode"]) +if "reason" in running_task["tasks"][0]["containers"][0]: + print("Reason: %s" % running_task["tasks"][0]["containers"][0]["reason"]) # exit with task exit code -if 'exitCode' in running_task['tasks'][0]['containers'][0]: - exit(running_task['tasks'][0]['containers'][0]['exitCode']) +if "exitCode" in running_task["tasks"][0]["containers"][0]: + exit(running_task["tasks"][0]["containers"][0]["exitCode"]) diff --git a/src/utils.py b/src/utils.py index baaae78..0955415 100644 --- a/src/utils.py +++ b/src/utils.py @@ -4,36 +4,42 @@ import os from string import Template + def validate_envs(req_vars): missing = set(req_vars) - set(os.environ) if missing: - print('Environment variables not set: %s' % missing) + print("Environment variables not set: %s" % missing) raise return True + def validate_json(json_data): try: json.loads(json_data) return True except ValueError as err: - print('JSON not valide: %s' % err) + print("JSON not valide: %s" % err) + -def json_template(json_template, env_vars=os.environ): +def json_template(json_template: str, env_vars=os.environ): try: json_file = open(json_template) data = json_file.read() except: - print('File %s not found' % json_template) + print("File %s not found" % json_template) try: template = Template(data).substitute(env_vars) except KeyError as err: - print('Missing variable %s' % str(err)) + print("Missing variable %s" % str(err)) + exit(1) + except Exception as err: + print(f"Error substituting variables. Error: '{err}'") exit(1) try: validate_json(template) except Exception as err: - print(err) - + print(f"Error validating JSON. Error: '{err}'") + return template diff --git a/src/worker-deploy.py b/src/worker-deploy.py index 7926714..4afb3d3 100755 --- a/src/worker-deploy.py +++ b/src/worker-deploy.py @@ -7,116 +7,128 @@ from utils import validate_envs, json_template # ----- Check variables ----- -print('Step 1: Checking environment variables \n') +print("Step 1: Checking environment variables \n") -req_vars = [ - 'CLUSTER_NAME', - 'APP_NAME', - 'AWS_DEFAULT_REGION' -] +req_vars = ["CLUSTER_NAME", "APP_NAME", "AWS_DEFAULT_REGION"] try: validate_envs(req_vars) except: exit(1) -cluster_name = os.getenv('CLUSTER_NAME') -app_name = os.getenv('APP_NAME') -aws_default_region = os.getenv('AWS_DEFAULT_REGION') -task_def_file_name = os.getenv('TPL_FILE_NAME', 'task-definition.tpl.json') +cluster_name = os.getenv("CLUSTER_NAME") +app_name = os.getenv("APP_NAME") +aws_default_region = os.getenv("AWS_DEFAULT_REGION") +task_def_file_name = os.getenv("TPL_FILE_NAME", "task-definition.tpl.json") # ----- Create task definition file ----- -print('Step 2: Replace variables inside of %s \n' % task_def_file_name) +print("Step 2: Replace variables inside of %s \n" % task_def_file_name) try: task_definition = json_template(task_def_file_name) except: exit(1) -print('Task definition file: \n%s' % task_definition) +print("Task definition file: \n%s" % task_definition) task_def = json.loads(task_definition) # ----- Register task definition file ----- -print('Step 3: Registering task definition') +print("Step 3: Registering task definition") task = EcsClient() try: task.register_task_definition(task_def) - print('Task definition arn: %s \n' % task.taskDefArn) + print("Task definition arn: %s \n" % task.taskDefArn) except Exception as err: - print('Register task definition issue: %s' % err) + print("Register task definition issue: %s" % err) exit(1) - + # ----- Create Deployment ----- -print('Step 4: Creating Deployment') +print("Step 4: Creating Deployment") active_task = task.describe_services(cluster_name, app_name) -active_task_def = active_task['services'][0]['taskDefinition'] +active_task_def = active_task["services"][0]["taskDefinition"] try: task.update_service(cluster_name, app_name, task.taskDefArn) except Exception as err: - print('Deployment FAILED!') - print('ERROR: %s' % str(err).split(": ")[1]) + print("Deployment FAILED!") + print("ERROR: %s" % str(err).split(": ")[1]) exit(1) deployment = task.describe_services(cluster_name, app_name) -print('ECS dpeloyment: %s \n' % task.ecsDeployId) +print("ECS dpeloyment: %s \n" % task.ecsDeployId) # ----- Monitor Deployment ----- -print('Step 5: Deployment Overview') +print("Step 5: Deployment Overview") -print('Monitoring ECS service events for cluster %s on service %s:\n' % (cluster_name, app_name)) +print( + "Monitoring ECS service events for cluster %s on service %s:\n" + % (cluster_name, app_name) +) -ecs_deploy = list(filter(lambda x:x["status"]=="PRIMARY",deployment['services'][0]['deployments'])) -ecs_deploy_status = ecs_deploy[0]['rolloutState'] +ecs_deploy = list( + filter(lambda x: x["status"] == "PRIMARY", deployment["services"][0]["deployments"]) +) +ecs_deploy_status = ecs_deploy[0]["rolloutState"] deploy_timeout_period = 0 -deploy_timeout = int(os.getenv('DEPLOYMENT_TIMEOUT', 900)) +deploy_timeout = int(os.getenv("DEPLOYMENT_TIMEOUT", 900)) + def rollback(): try: task.update_service(cluster_name, app_name, active_task_def, True) - print('Rollback deployment success') + print("Rollback deployment success") except: - print('Rollback deployment failed') + print("Rollback deployment failed") finally: exit(1) -while ecs_deploy_status == 'IN_PROGRESS': + +while ecs_deploy_status == "IN_PROGRESS": # Tail logs from ECS service ecs_events = task.tail_ecs_events(cluster_name, app_name) for event in ecs_events: - print('%s %s' % ('{0:%Y-%m-%d %H:%M:%S %z}'.format(event['createdAt']), event['message'])) - + print( + "%s %s" + % ("{0:%Y-%m-%d %H:%M:%S %z}".format(event["createdAt"]), event["message"]) + ) + # Check if containers are being stoped last_task = task.list_tasks(cluster_name, task.ecsDeployId) - if len(last_task['taskArns']) > 2: - last_task_info = task.describe_tasks(cluster_name, last_task['taskArns']) - last_task_status = last_task_info['tasks'][0]['lastStatus'] - last_task_reason = last_task_info['tasks'][0]['stoppedReason'] - if 'reason' in last_task_info['tasks'][0]['containers'][0]: - last_task_reason = '%s \n%s' % ( - last_task_reason, last_task_info['tasks'][0]['containers'][0]['reason']) - - if last_task_status == 'STOPPED': - print('Containers are being stoped: %s' % last_task_reason) + if len(last_task["taskArns"]) > 2: + last_task_info = task.describe_tasks(cluster_name, last_task["taskArns"]) + last_task_status = last_task_info["tasks"][0]["lastStatus"] + last_task_reason = last_task_info["tasks"][0]["stoppedReason"] + if "reason" in last_task_info["tasks"][0]["containers"][0]: + last_task_reason = "%s \n%s" % ( + last_task_reason, + last_task_info["tasks"][0]["containers"][0]["reason"], + ) + + if last_task_status == "STOPPED": + print("Containers are being stoped: %s" % last_task_reason) rollback() - + # Rechead limit if deploy_timeout_period >= deploy_timeout: - print('Deployment timeout: %s seconds' % deploy_timeout) + print("Deployment timeout: %s seconds" % deploy_timeout) rollback() - + # Get status, increment limit and sleep deployment = task.describe_services(cluster_name, app_name) - ecs_deploy = list(filter(lambda x:x["status"]=="PRIMARY",deployment['services'][0]['deployments'])) - ecs_deploy_status = ecs_deploy[0]['rolloutState'] + ecs_deploy = list( + filter( + lambda x: x["status"] == "PRIMARY", deployment["services"][0]["deployments"] + ) + ) + ecs_deploy_status = ecs_deploy[0]["rolloutState"] deploy_timeout_period += 2 time.sleep(2) # Print Status -print('\nDeployment completed:') -print('CLUSTER_NAME: %s' % cluster_name) -print('APP_NAME: %s' % app_name) -print('TASK_ARN: %s' % task.taskDefArn) +print("\nDeployment completed:") +print("CLUSTER_NAME: %s" % cluster_name) +print("APP_NAME: %s" % app_name) +print("TASK_ARN: %s" % task.taskDefArn)