Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions functions/hello/main.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
from crowdstrike.foundry.function import Function, Request, Response, APIError


func = Function.instance()


# Handler hello
@func.handler(method='POST', path='/hello')
@func.handler(method="POST", path="/hello")
def on_post(request: Request) -> Response:

#
# Replace the following example code with your handler code
#

# Demonstrates how to validate the request body if your handler requires input payload
# Replace with your own request and update the request_schema.json to match
if 'name' not in request.body:
# This example expects 'name' field in the request body and returns
if "name" not in request.body:
# This example expects "name" field in the request body and returns
# an error response (400 - Bad Request) if not provided by the caller
return Response(
code=400,
errors=[APIError(code=400, message='missing name from request body')]
errors=[APIError(code=400, message="missing name from request body")]
)

# Demonstrates how to return a success response with JSON body
# Replace with your response and update the response_schema.json to match
return Response(
body={'greeting': f'Hello {request.body["name"]}! It is nice to see you.'},
body={"greeting": f"Hello {request.body['name']}! It is nice to see you."},
code=200,
)


if __name__ == '__main__':
if __name__ == "__main__":
func.run()
12 changes: 8 additions & 4 deletions functions/hello/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

from crowdstrike.foundry.function import Request


def mock_handler(*args, **kwargs):
def identity(func):
return func

return identity


class FnTestCase(unittest.TestCase):
def setUp(self):
patcher = patch('crowdstrike.foundry.function.Function.handler', new=mock_handler)
patcher = patch("crowdstrike.foundry.function.Function.handler", new=mock_handler)
self.addCleanup(patcher.stop)
self.handler_patch = patcher.start()

Expand All @@ -27,7 +30,7 @@ def test_on_post_success(self):

response = on_post(request)
self.assertEqual(response.code, 200)
self.assertEqual(response.body['greeting'], 'Hello Test User! It is nice to see you.')
self.assertEqual(response.body["greeting"], "Hello Test User! It is nice to see you.")

def test_on_post_missing_name(self):
from main import on_post
Expand All @@ -36,7 +39,8 @@ def test_on_post_missing_name(self):
response = on_post(request)
self.assertEqual(response.code, 400)
self.assertEqual(len(response.errors), 1)
self.assertEqual(response.errors[0].message, 'missing name from request body')
self.assertEqual(response.errors[0].message, "missing name from request body")


if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
14 changes: 7 additions & 7 deletions functions/host-details/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
# Import service collection you'd like to use
from falconpy import Hosts


func = Function.instance()


@func.handler(method='POST', path='/host-details')
@func.handler(method="POST", path="/host-details")
def on_post(request: Request) -> Response:
# Validate request
if 'host_id' not in request.body:
if "host_id" not in request.body:
return Response(
code=400,
errors=[APIError(code=400, message='missing host_id from request body')]
errors=[APIError(code=400, message="missing host_id from request body")]
)

host_id = request.body['host_id']
host_id = request.body["host_id"]

# Initialize the Hosts class with context-aware authentication
falcon = Hosts()
Expand All @@ -27,7 +26,7 @@ def on_post(request: Request) -> Response:
return Response(
code=response["status_code"],
errors=[APIError(code=response["status_code"],
message=f"Error retrieving host: {response['body']}")],
message=f"Error retrieving host: {response['body']}")],
)

# Return host information
Expand All @@ -36,5 +35,6 @@ def on_post(request: Request) -> Response:
code=200,
)

if __name__ == '__main__':

if __name__ == "__main__":
func.run()
14 changes: 8 additions & 6 deletions functions/host-info/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from crowdstrike.foundry.function import Function, Request, Response, APIError
from utils import validate_host_id, format_error_response
from logging import Logger
from typing import Dict

from crowdstrike.foundry.function import Function, Request, Response

from utils import validate_host_id, format_error_response

func = Function.instance()


# Handler on_post
@func.handler(method='POST', path='/host-info')
@func.handler(method="POST", path="/host-info")
def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response:
host_id = request.body.get('host_id')
host_id = request.body.get("host_id")

logger.info(f"Host ID: {host_id}")
logger.info(f"Is valid? {validate_host_id(host_id)}")
Expand All @@ -20,10 +22,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger)
return Response(
code=200,
body={
"host":host_id
"host": host_id
}
)


if __name__ == '__main__':
if __name__ == "__main__":
func.run()
13 changes: 9 additions & 4 deletions functions/host-info/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import re
import json
from typing import Dict, Any, Optional, Union, List
import re
from typing import Dict, Any, Optional

from crowdstrike.foundry.function import APIError


def validate_host_id(host_id: str) -> bool:
"""Validate that a host ID is in the correct format."""
if not host_id or not isinstance(host_id, str):
return False
return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower())
return len(host_id) == 32 and all(c in "0123456789abcdef" for c in host_id.lower())


def validate_email(email: str) -> bool:
"""Validate email format."""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return bool(re.match(pattern, email))


def format_error_response(message: str, code: int = 400) -> list[Any]:
"""Create a standardized error response."""
return [APIError(code=code, message=message)]


def safe_json_parse(data: str) -> Optional[Dict[str, Any]]:
"""Safely parse JSON data."""
try:
Expand Down
7 changes: 4 additions & 3 deletions functions/log-event/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import APIHarnessV2
import time
import os
import time
import uuid

from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import APIHarnessV2

func = Function.instance()


Expand Down
15 changes: 8 additions & 7 deletions functions/servicenow/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import APIIntegrations
from logging import Logger
from typing import Dict

from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import APIIntegrations

func = Function.instance()

@func.handler(method='POST', path='/ticket')

@func.handler(method="POST", path="/ticket")
def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response:
"""
Create an incident ticket in ServiceNow using the Table API.
Expand All @@ -24,10 +25,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger)
- caller_id: User ID of the caller
"""
# Validate required fields
if 'title' not in request.body or 'description' not in request.body:
if "title" not in request.body or "description" not in request.body:
return Response(
code=400,
errors=[APIError(code=400, message='Missing required fields: title and description')]
errors=[APIError(code=400, message="Missing required fields: title and description")]
)

# Prepare payload for ServiceNow incident creation
Expand Down Expand Up @@ -75,7 +76,7 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger)
logger.info(f"ServiceNow API response: {response}")

if response["status_code"] >= 400:
error_message = response.get('error', {}).get('message', 'Unknown error')
error_message = response.get("error", {}).get("message", "Unknown error")
return Response(
code=response["status_code"],
errors=[APIError(
Expand Down Expand Up @@ -108,5 +109,5 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger)
)


if __name__ == '__main__':
if __name__ == "__main__":
func.run()
14 changes: 8 additions & 6 deletions functions/user-management/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from crowdstrike.foundry.function import Function, Request, Response, APIError
from utils import validate_email, format_error_response
from logging import Logger
from typing import Dict

from crowdstrike.foundry.function import Function, Request, Response

from utils import validate_email, format_error_response

func = Function.instance()


# Handler on_post
@func.handler(method='POST', path='/create-user')
@func.handler(method="POST", path="/create-user")
def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response:
email = request.body.get('email')
email = request.body.get("email")

logger.info(f"Email: {email}")
logger.info(f"Is valid? {validate_email(email)}")
Expand All @@ -20,10 +22,10 @@ def on_post(request: Request, config: Dict[str, object] | None, logger: Logger)
return Response(
code=200,
body={
"email":email
"email": email
}
)


if __name__ == '__main__':
if __name__ == "__main__":
func.run()
13 changes: 9 additions & 4 deletions functions/user-management/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import re
import json
from typing import Dict, Any, Optional, Union, List
import re
from typing import Dict, Any, Optional

from crowdstrike.foundry.function import APIError


def validate_host_id(host_id: str) -> bool:
"""Validate that a host ID is in the correct format."""
if not host_id or not isinstance(host_id, str):
return False
return len(host_id) == 32 and all(c in '0123456789abcdef' for c in host_id.lower())
return len(host_id) == 32 and all(c in "0123456789abcdef" for c in host_id.lower())


def validate_email(email: str) -> bool:
"""Validate email format."""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
return bool(re.match(pattern, email))


def format_error_response(message: str, code: int = 400) -> list[Any]:
"""Create a standardized error response."""
return [APIError(code=code, message=message)]


def safe_json_parse(data: str) -> Optional[Dict[str, Any]]:
"""Safely parse JSON data."""
try:
Expand Down