diff --git a/functions/hello/main.py b/functions/hello/main.py index 1720c20..d190134 100644 --- a/functions/hello/main.py +++ b/functions/hello/main.py @@ -1,34 +1,32 @@ from crowdstrike.foundry.function import Function, Request, Response, APIError - func = Function.instance() # Handler hello -@func.handler(method='POST', path='/hello') +@func.handler(method="POST", path="/hello") def on_post(request: Request) -> Response: - # # Replace the following example code with your handler code # # Demonstrates how to validate the request body if your handler requires input payload # Replace with your own request and update the request_schema.json to match - if 'name' not in request.body: - # This example expects 'name' field in the request body and returns + if "name" not in request.body: + # This example expects "name" field in the request body and returns # an error response (400 - Bad Request) if not provided by the caller return Response( code=400, - errors=[APIError(code=400, message='missing name from request body')] + errors=[APIError(code=400, message="missing name from request body")] ) # Demonstrates how to return a success response with JSON body # Replace with your response and update the response_schema.json to match return Response( - body={'greeting': f'Hello {request.body["name"]}! It is nice to see you.'}, + body={"greeting": f"Hello {request.body['name']}! It is nice to see you."}, code=200, ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/hello/test_main.py b/functions/hello/test_main.py index ed002c8..8e9dc50 100644 --- a/functions/hello/test_main.py +++ b/functions/hello/test_main.py @@ -3,14 +3,17 @@ from crowdstrike.foundry.function import Request + def mock_handler(*args, **kwargs): def identity(func): return func + return identity + class FnTestCase(unittest.TestCase): def setUp(self): - patcher = patch('crowdstrike.foundry.function.Function.handler', new=mock_handler) + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() @@ -27,7 +30,7 @@ def test_on_post_success(self): response = on_post(request) self.assertEqual(response.code, 200) - self.assertEqual(response.body['greeting'], 'Hello Test User! It is nice to see you.') + self.assertEqual(response.body["greeting"], "Hello Test User! It is nice to see you.") def test_on_post_missing_name(self): from main import on_post @@ -36,7 +39,8 @@ def test_on_post_missing_name(self): response = on_post(request) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) - self.assertEqual(response.errors[0].message, 'missing name from request body') + self.assertEqual(response.errors[0].message, "missing name from request body") + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/functions/host-details/main.py b/functions/host-details/main.py index cc01fc6..9e77a69 100644 --- a/functions/host-details/main.py +++ b/functions/host-details/main.py @@ -2,20 +2,19 @@ # Import service collection you'd like to use from falconpy import Hosts - func = Function.instance() -@func.handler(method='POST', path='/host-details') +@func.handler(method="POST", path="/host-details") def on_post(request: Request) -> Response: # Validate request - if 'host_id' not in request.body: + if "host_id" not in request.body: return Response( code=400, - errors=[APIError(code=400, message='missing host_id from request body')] + errors=[APIError(code=400, message="missing host_id from request body")] ) - host_id = request.body['host_id'] + host_id = request.body["host_id"] # Initialize the Hosts class with context-aware authentication falcon = Hosts() @@ -27,7 +26,7 @@ def on_post(request: Request) -> Response: return Response( code=response["status_code"], errors=[APIError(code=response["status_code"], - message=f"Error retrieving host: {response['body']}")], + message=f"Error retrieving host: {response['body']}")], ) # Return host information @@ -36,5 +35,6 @@ def on_post(request: Request) -> Response: code=200, ) -if __name__ == '__main__': + +if __name__ == "__main__": func.run() diff --git a/functions/host-info/main.py b/functions/host-info/main.py index 5766ce6..22ef9b9 100755 --- a/functions/host-info/main.py +++ b/functions/host-info/main.py @@ -1,15 +1,17 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from utils import validate_host_id, format_error_response from logging import Logger from typing import Dict +from crowdstrike.foundry.function import Function, Request, Response + +from utils import validate_host_id, format_error_response func = Function.instance() + # Handler on_post -@func.handler(method='POST', path='/host-info') +@func.handler(method="POST", path="/host-info") def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: - host_id = request.body.get('host_id') + host_id = request.body.get("host_id") logger.info(f"Host ID: {host_id}") logger.info(f"Is valid? {validate_host_id(host_id)}") @@ -20,10 +22,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) return Response( code=200, body={ - "host":host_id + "host": host_id } ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/host-info/utils.py b/functions/host-info/utils.py index 94671c7..3c43144 100644 --- a/functions/host-info/utils.py +++ b/functions/host-info/utils.py @@ -1,23 +1,28 @@ -import re import json -from typing import Dict, Any, Optional, Union, List +import re +from typing import Dict, Any, Optional + from crowdstrike.foundry.function import APIError + def validate_host_id(host_id: str) -> bool: """Validate that a host ID is in the correct format.""" if not host_id or not isinstance(host_id, str): return False - return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower()) + return len(host_id) == 32 and all(c in "0123456789abcdef" for c in host_id.lower()) + def validate_email(email: str) -> bool: """Validate email format.""" - pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return bool(re.match(pattern, email)) + def format_error_response(message: str, code: int = 400) -> list[Any]: """Create a standardized error response.""" return [APIError(code=code, message=message)] + def safe_json_parse(data: str) -> Optional[Dict[str, Any]]: """Safely parse JSON data.""" try: diff --git a/functions/log-event/main.py b/functions/log-event/main.py index 160e3a1..f7f4ef0 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -1,9 +1,10 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIHarnessV2 -import time import os +import time import uuid +from crowdstrike.foundry.function import Function, Request, Response, APIError +from falconpy import APIHarnessV2 + func = Function.instance() diff --git a/functions/servicenow/main.py b/functions/servicenow/main.py index 5af2a79..3881c09 100644 --- a/functions/servicenow/main.py +++ b/functions/servicenow/main.py @@ -1,12 +1,13 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIIntegrations from logging import Logger from typing import Dict +from crowdstrike.foundry.function import Function, Request, Response, APIError +from falconpy import APIIntegrations func = Function.instance() -@func.handler(method='POST', path='/ticket') + +@func.handler(method="POST", path="/ticket") def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: """ Create an incident ticket in ServiceNow using the Table API. @@ -24,10 +25,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) - caller_id: User ID of the caller """ # Validate required fields - if 'title' not in request.body or 'description' not in request.body: + if "title" not in request.body or "description" not in request.body: return Response( code=400, - errors=[APIError(code=400, message='Missing required fields: title and description')] + errors=[APIError(code=400, message="Missing required fields: title and description")] ) # Prepare payload for ServiceNow incident creation @@ -75,7 +76,7 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) logger.info(f"ServiceNow API response: {response}") if response["status_code"] >= 400: - error_message = response.get('error', {}).get('message', 'Unknown error') + error_message = response.get("error", {}).get("message", "Unknown error") return Response( code=response["status_code"], errors=[APIError( @@ -108,5 +109,5 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/user-management/main.py b/functions/user-management/main.py index c016c77..e25a956 100755 --- a/functions/user-management/main.py +++ b/functions/user-management/main.py @@ -1,15 +1,17 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from utils import validate_email, format_error_response from logging import Logger from typing import Dict +from crowdstrike.foundry.function import Function, Request, Response + +from utils import validate_email, format_error_response func = Function.instance() + # Handler on_post -@func.handler(method='POST', path='/create-user') +@func.handler(method="POST", path="/create-user") def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: - email = request.body.get('email') + email = request.body.get("email") logger.info(f"Email: {email}") logger.info(f"Is valid? {validate_email(email)}") @@ -20,10 +22,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) return Response( code=200, body={ - "email":email + "email": email } ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/user-management/utils.py b/functions/user-management/utils.py index 94671c7..3c43144 100644 --- a/functions/user-management/utils.py +++ b/functions/user-management/utils.py @@ -1,23 +1,28 @@ -import re import json -from typing import Dict, Any, Optional, Union, List +import re +from typing import Dict, Any, Optional + from crowdstrike.foundry.function import APIError + def validate_host_id(host_id: str) -> bool: """Validate that a host ID is in the correct format.""" if not host_id or not isinstance(host_id, str): return False - return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower()) + return len(host_id) == 32 and all(c in "0123456789abcdef" for c in host_id.lower()) + def validate_email(email: str) -> bool: """Validate email format.""" - pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return bool(re.match(pattern, email)) + def format_error_response(message: str, code: int = 400) -> list[Any]: """Create a standardized error response.""" return [APIError(code=code, message=message)] + def safe_json_parse(data: str) -> Optional[Dict[str, Any]]: """Safely parse JSON data.""" try: