From 5237a36bfd7ab73e548c8968d30163f01b7605c6 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Tue, 8 Jul 2025 17:20:23 -0600 Subject: [PATCH 1/5] Improve Python code format --- functions/hello/main.py | 2 -- functions/hello/test_main.py | 4 ++++ functions/host-details/main.py | 4 ++-- functions/host-info/main.py | 8 +++++--- functions/host-info/utils.py | 9 +++++++-- functions/log-event/main.py | 7 ++++--- functions/servicenow/main.py | 5 +++-- functions/user-management/main.py | 8 +++++--- functions/user-management/utils.py | 9 +++++++-- 9 files changed, 37 insertions(+), 19 deletions(-) diff --git a/functions/hello/main.py b/functions/hello/main.py index 1720c20..708295c 100644 --- a/functions/hello/main.py +++ b/functions/hello/main.py @@ -1,13 +1,11 @@ from crowdstrike.foundry.function import Function, Request, Response, APIError - func = Function.instance() # Handler hello @func.handler(method='POST', path='/hello') def on_post(request: Request) -> Response: - # # Replace the following example code with your handler code # diff --git a/functions/hello/test_main.py b/functions/hello/test_main.py index ed002c8..602564a 100644 --- a/functions/hello/test_main.py +++ b/functions/hello/test_main.py @@ -3,11 +3,14 @@ from crowdstrike.foundry.function import Request + def mock_handler(*args, **kwargs): def identity(func): return func + return identity + class FnTestCase(unittest.TestCase): def setUp(self): patcher = patch('crowdstrike.foundry.function.Function.handler', new=mock_handler) @@ -38,5 +41,6 @@ def test_on_post_missing_name(self): self.assertEqual(len(response.errors), 1) self.assertEqual(response.errors[0].message, 'missing name from request body') + if __name__ == '__main__': unittest.main() diff --git a/functions/host-details/main.py b/functions/host-details/main.py index cc01fc6..4a6a322 100644 --- a/functions/host-details/main.py +++ b/functions/host-details/main.py @@ -2,7 +2,6 @@ # Import service collection you'd like to use from falconpy import Hosts - func = Function.instance() @@ -27,7 +26,7 @@ def on_post(request: Request) -> Response: return Response( code=response["status_code"], errors=[APIError(code=response["status_code"], - message=f"Error retrieving host: {response['body']}")], + message=f"Error retrieving host: {response['body']}")], ) # Return host information @@ -36,5 +35,6 @@ def on_post(request: Request) -> Response: code=200, ) + if __name__ == '__main__': func.run() diff --git a/functions/host-info/main.py b/functions/host-info/main.py index 5766ce6..194659c 100755 --- a/functions/host-info/main.py +++ b/functions/host-info/main.py @@ -1,11 +1,13 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from utils import validate_host_id, format_error_response from logging import Logger from typing import Dict +from crowdstrike.foundry.function import Function, Request, Response + +from utils import validate_host_id, format_error_response func = Function.instance() + # Handler on_post @func.handler(method='POST', path='/host-info') def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: @@ -20,7 +22,7 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) return Response( code=200, body={ - "host":host_id + "host": host_id } ) diff --git a/functions/host-info/utils.py b/functions/host-info/utils.py index 94671c7..999e68e 100644 --- a/functions/host-info/utils.py +++ b/functions/host-info/utils.py @@ -1,23 +1,28 @@ -import re import json -from typing import Dict, Any, Optional, Union, List +import re +from typing import Dict, Any, Optional + from crowdstrike.foundry.function import APIError + def validate_host_id(host_id: str) -> bool: """Validate that a host ID is in the correct format.""" if not host_id or not isinstance(host_id, str): return False return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower()) + def validate_email(email: str) -> bool: """Validate email format.""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) + def format_error_response(message: str, code: int = 400) -> list[Any]: """Create a standardized error response.""" return [APIError(code=code, message=message)] + def safe_json_parse(data: str) -> Optional[Dict[str, Any]]: """Safely parse JSON data.""" try: diff --git a/functions/log-event/main.py b/functions/log-event/main.py index 160e3a1..f7f4ef0 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -1,9 +1,10 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIHarnessV2 -import time import os +import time import uuid +from crowdstrike.foundry.function import Function, Request, Response, APIError +from falconpy import APIHarnessV2 + func = Function.instance() diff --git a/functions/servicenow/main.py b/functions/servicenow/main.py index 5af2a79..87e2965 100644 --- a/functions/servicenow/main.py +++ b/functions/servicenow/main.py @@ -1,11 +1,12 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from falconpy import APIIntegrations from logging import Logger from typing import Dict +from crowdstrike.foundry.function import Function, Request, Response, APIError +from falconpy import APIIntegrations func = Function.instance() + @func.handler(method='POST', path='/ticket') def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: """ diff --git a/functions/user-management/main.py b/functions/user-management/main.py index c016c77..23908c7 100755 --- a/functions/user-management/main.py +++ b/functions/user-management/main.py @@ -1,11 +1,13 @@ -from crowdstrike.foundry.function import Function, Request, Response, APIError -from utils import validate_email, format_error_response from logging import Logger from typing import Dict +from crowdstrike.foundry.function import Function, Request, Response + +from utils import validate_email, format_error_response func = Function.instance() + # Handler on_post @func.handler(method='POST', path='/create-user') def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: @@ -20,7 +22,7 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) return Response( code=200, body={ - "email":email + "email": email } ) diff --git a/functions/user-management/utils.py b/functions/user-management/utils.py index 94671c7..999e68e 100644 --- a/functions/user-management/utils.py +++ b/functions/user-management/utils.py @@ -1,23 +1,28 @@ -import re import json -from typing import Dict, Any, Optional, Union, List +import re +from typing import Dict, Any, Optional + from crowdstrike.foundry.function import APIError + def validate_host_id(host_id: str) -> bool: """Validate that a host ID is in the correct format.""" if not host_id or not isinstance(host_id, str): return False return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower()) + def validate_email(email: str) -> bool: """Validate email format.""" pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) + def format_error_response(message: str, code: int = 400) -> list[Any]: """Create a standardized error response.""" return [APIError(code=code, message=message)] + def safe_json_parse(data: str) -> Optional[Dict[str, Any]]: """Safely parse JSON data.""" try: From 9a22f255bc39077e552d9d145495ca3662a1885c Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Wed, 9 Jul 2025 10:47:37 -0600 Subject: [PATCH 2/5] Single quotes to double quotes --- functions/hello/main.py | 12 ++++++------ functions/hello/test_main.py | 8 ++++---- functions/host-details/main.py | 10 +++++----- functions/host-info/main.py | 6 +++--- functions/host-info/utils.py | 4 ++-- functions/servicenow/main.py | 10 +++++----- functions/user-management/main.py | 6 +++--- functions/user-management/utils.py | 4 ++-- 8 files changed, 30 insertions(+), 30 deletions(-) diff --git a/functions/hello/main.py b/functions/hello/main.py index 708295c..c48eca7 100644 --- a/functions/hello/main.py +++ b/functions/hello/main.py @@ -4,7 +4,7 @@ # Handler hello -@func.handler(method='POST', path='/hello') +@func.handler(method="POST", path="/hello") def on_post(request: Request) -> Response: # # Replace the following example code with your handler code @@ -12,21 +12,21 @@ def on_post(request: Request) -> Response: # Demonstrates how to validate the request body if your handler requires input payload # Replace with your own request and update the request_schema.json to match - if 'name' not in request.body: - # This example expects 'name' field in the request body and returns + if "name" not in request.body: + # This example expects "name" field in the request body and returns # an error response (400 - Bad Request) if not provided by the caller return Response( code=400, - errors=[APIError(code=400, message='missing name from request body')] + errors=[APIError(code=400, message="missing name from request body")] ) # Demonstrates how to return a success response with JSON body # Replace with your response and update the response_schema.json to match return Response( - body={'greeting': f'Hello {request.body["name"]}! It is nice to see you.'}, + body={"greeting": f"Hello {request.body["name"]}! It is nice to see you."}, code=200, ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/hello/test_main.py b/functions/hello/test_main.py index 602564a..8e9dc50 100644 --- a/functions/hello/test_main.py +++ b/functions/hello/test_main.py @@ -13,7 +13,7 @@ def identity(func): class FnTestCase(unittest.TestCase): def setUp(self): - patcher = patch('crowdstrike.foundry.function.Function.handler', new=mock_handler) + patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler) self.addCleanup(patcher.stop) self.handler_patch = patcher.start() @@ -30,7 +30,7 @@ def test_on_post_success(self): response = on_post(request) self.assertEqual(response.code, 200) - self.assertEqual(response.body['greeting'], 'Hello Test User! It is nice to see you.') + self.assertEqual(response.body["greeting"], "Hello Test User! It is nice to see you.") def test_on_post_missing_name(self): from main import on_post @@ -39,8 +39,8 @@ def test_on_post_missing_name(self): response = on_post(request) self.assertEqual(response.code, 400) self.assertEqual(len(response.errors), 1) - self.assertEqual(response.errors[0].message, 'missing name from request body') + self.assertEqual(response.errors[0].message, "missing name from request body") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/functions/host-details/main.py b/functions/host-details/main.py index 4a6a322..9e77a69 100644 --- a/functions/host-details/main.py +++ b/functions/host-details/main.py @@ -5,16 +5,16 @@ func = Function.instance() -@func.handler(method='POST', path='/host-details') +@func.handler(method="POST", path="/host-details") def on_post(request: Request) -> Response: # Validate request - if 'host_id' not in request.body: + if "host_id" not in request.body: return Response( code=400, - errors=[APIError(code=400, message='missing host_id from request body')] + errors=[APIError(code=400, message="missing host_id from request body")] ) - host_id = request.body['host_id'] + host_id = request.body["host_id"] # Initialize the Hosts class with context-aware authentication falcon = Hosts() @@ -36,5 +36,5 @@ def on_post(request: Request) -> Response: ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/host-info/main.py b/functions/host-info/main.py index 194659c..22ef9b9 100755 --- a/functions/host-info/main.py +++ b/functions/host-info/main.py @@ -9,9 +9,9 @@ # Handler on_post -@func.handler(method='POST', path='/host-info') +@func.handler(method="POST", path="/host-info") def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: - host_id = request.body.get('host_id') + host_id = request.body.get("host_id") logger.info(f"Host ID: {host_id}") logger.info(f"Is valid? {validate_host_id(host_id)}") @@ -27,5 +27,5 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/host-info/utils.py b/functions/host-info/utils.py index 999e68e..3c43144 100644 --- a/functions/host-info/utils.py +++ b/functions/host-info/utils.py @@ -9,12 +9,12 @@ def validate_host_id(host_id: str) -> bool: """Validate that a host ID is in the correct format.""" if not host_id or not isinstance(host_id, str): return False - return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower()) + return len(host_id) == 32 and all(c in "0123456789abcdef" for c in host_id.lower()) def validate_email(email: str) -> bool: """Validate email format.""" - pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return bool(re.match(pattern, email)) diff --git a/functions/servicenow/main.py b/functions/servicenow/main.py index 87e2965..3881c09 100644 --- a/functions/servicenow/main.py +++ b/functions/servicenow/main.py @@ -7,7 +7,7 @@ func = Function.instance() -@func.handler(method='POST', path='/ticket') +@func.handler(method="POST", path="/ticket") def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: """ Create an incident ticket in ServiceNow using the Table API. @@ -25,10 +25,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) - caller_id: User ID of the caller """ # Validate required fields - if 'title' not in request.body or 'description' not in request.body: + if "title" not in request.body or "description" not in request.body: return Response( code=400, - errors=[APIError(code=400, message='Missing required fields: title and description')] + errors=[APIError(code=400, message="Missing required fields: title and description")] ) # Prepare payload for ServiceNow incident creation @@ -76,7 +76,7 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) logger.info(f"ServiceNow API response: {response}") if response["status_code"] >= 400: - error_message = response.get('error', {}).get('message', 'Unknown error') + error_message = response.get("error", {}).get("message", "Unknown error") return Response( code=response["status_code"], errors=[APIError( @@ -109,5 +109,5 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/user-management/main.py b/functions/user-management/main.py index 23908c7..e25a956 100755 --- a/functions/user-management/main.py +++ b/functions/user-management/main.py @@ -9,9 +9,9 @@ # Handler on_post -@func.handler(method='POST', path='/create-user') +@func.handler(method="POST", path="/create-user") def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response: - email = request.body.get('email') + email = request.body.get("email") logger.info(f"Email: {email}") logger.info(f"Is valid? {validate_email(email)}") @@ -27,5 +27,5 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) ) -if __name__ == '__main__': +if __name__ == "__main__": func.run() diff --git a/functions/user-management/utils.py b/functions/user-management/utils.py index 999e68e..3c43144 100644 --- a/functions/user-management/utils.py +++ b/functions/user-management/utils.py @@ -9,12 +9,12 @@ def validate_host_id(host_id: str) -> bool: """Validate that a host ID is in the correct format.""" if not host_id or not isinstance(host_id, str): return False - return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower()) + return len(host_id) == 32 and all(c in "0123456789abcdef" for c in host_id.lower()) def validate_email(email: str) -> bool: """Validate email format.""" - pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return bool(re.match(pattern, email)) From 82dda3715fc3d1b13344c6a0df634bb522516218 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Wed, 9 Jul 2025 10:56:13 -0600 Subject: [PATCH 3/5] Fix indent for multiline calls --- functions/host-details/main.py | 2 +- functions/log-event/main.py | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/functions/host-details/main.py b/functions/host-details/main.py index 9e77a69..c51c74f 100644 --- a/functions/host-details/main.py +++ b/functions/host-details/main.py @@ -26,7 +26,7 @@ def on_post(request: Request) -> Response: return Response( code=response["status_code"], errors=[APIError(code=response["status_code"], - message=f"Error retrieving host: {response['body']}")], + message=f"Error retrieving host: {response['body']}")], ) # Return host information diff --git a/functions/log-event/main.py b/functions/log-event/main.py index f7f4ef0..71bf9e1 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -40,11 +40,11 @@ def on_post(request: Request) -> Response: collection_name = "event_logs" response = api_client.command("PutObject", - body=json, - collection_name=collection_name, - object_key=event_id, - headers=headers - ) + body=json, + collection_name=collection_name, + object_key=event_id, + headers=headers + ) if response["status_code"] != 200: error_message = response.get('error', {}).get('message', 'Unknown error') @@ -58,11 +58,11 @@ def on_post(request: Request) -> Response: # Query the collection to retrieve the event by id query_response = api_client.command("SearchObjects", - filter=f"event_id:'{event_id}'", - collection_name=collection_name, - limit=5, - headers=headers - ) + filter=f"event_id:'{event_id}'", + collection_name=collection_name, + limit=5, + headers=headers + ) return Response( body={ From 0690189e168cdc8779e08c710adf71529b801958 Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Wed, 9 Jul 2025 10:59:17 -0600 Subject: [PATCH 4/5] Revert single-quote change for inline variable --- functions/hello/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions/hello/main.py b/functions/hello/main.py index c48eca7..d190134 100644 --- a/functions/hello/main.py +++ b/functions/hello/main.py @@ -23,7 +23,7 @@ def on_post(request: Request) -> Response: # Demonstrates how to return a success response with JSON body # Replace with your response and update the response_schema.json to match return Response( - body={"greeting": f"Hello {request.body["name"]}! It is nice to see you."}, + body={"greeting": f"Hello {request.body['name']}! It is nice to see you."}, code=200, ) From 33822f17eb3ce5a0d9ca455908c93e3cfdf3500c Mon Sep 17 00:00:00 2001 From: Matt Raible Date: Wed, 9 Jul 2025 12:17:28 -0600 Subject: [PATCH 5/5] Align when multiline --- functions/host-details/main.py | 2 +- functions/log-event/main.py | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/functions/host-details/main.py b/functions/host-details/main.py index c51c74f..9e77a69 100644 --- a/functions/host-details/main.py +++ b/functions/host-details/main.py @@ -26,7 +26,7 @@ def on_post(request: Request) -> Response: return Response( code=response["status_code"], errors=[APIError(code=response["status_code"], - message=f"Error retrieving host: {response['body']}")], + message=f"Error retrieving host: {response['body']}")], ) # Return host information diff --git a/functions/log-event/main.py b/functions/log-event/main.py index 71bf9e1..f7f4ef0 100644 --- a/functions/log-event/main.py +++ b/functions/log-event/main.py @@ -40,11 +40,11 @@ def on_post(request: Request) -> Response: collection_name = "event_logs" response = api_client.command("PutObject", - body=json, - collection_name=collection_name, - object_key=event_id, - headers=headers - ) + body=json, + collection_name=collection_name, + object_key=event_id, + headers=headers + ) if response["status_code"] != 200: error_message = response.get('error', {}).get('message', 'Unknown error') @@ -58,11 +58,11 @@ def on_post(request: Request) -> Response: # Query the collection to retrieve the event by id query_response = api_client.command("SearchObjects", - filter=f"event_id:'{event_id}'", - collection_name=collection_name, - limit=5, - headers=headers - ) + filter=f"event_id:'{event_id}'", + collection_name=collection_name, + limit=5, + headers=headers + ) return Response( body={