Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/python-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches: [main]
pull_request:
workflow_dispatch:

jobs:
lint:
Expand Down
10 changes: 5 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# Unit test / coverage reports
htmlcov/
.tox/
Expand All @@ -8,7 +7,7 @@ htmlcov/
.cache
coverage.xml
*.coveragerc
*.py,cover
*.py.cover
.hypothesis/

# Pytest
Expand All @@ -20,8 +19,6 @@ coverage.xml
env/
venv/
ENV/
env.bak/
venv.bak/

# Python version specific
.python-version
Expand Down Expand Up @@ -49,4 +46,7 @@ venv.bak/
.dmy

#Output files
*.csv
*.csv

#Cache
__pycache__/
42 changes: 26 additions & 16 deletions FetchEvent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.events_api import EventsApi


def initialize_config():
"""
Sets up configurations, API keys, and Datadog API client.
"""
if not load_dotenv():
raise EnvironmentError("Unable to load .env file. Make sure it exists and contains the required keys.")
raise EnvironmentError(
"Unable to load .env file. Make sure it exists and contains the required keys."
)

DD_API_KEY = os.getenv('DD_API_KEY')
DD_APP_KEY = os.getenv('DD_APP_KEY')
DD_API_KEY = os.getenv("DD_API_KEY")
DD_APP_KEY = os.getenv("DD_APP_KEY")

if not DD_API_KEY or not DD_APP_KEY:
raise ValueError("Datadog API Key or App Key is missing in the .env file.")
Expand All @@ -30,6 +33,7 @@ def initialize_config():

return configuration


def parse_arguments():
"""
Parses command-line arguments.
Expand All @@ -39,16 +43,17 @@ def parse_arguments():
"--query",
type=str,
required=True,
help="The filter query for fetching events, e.g., 'service:trms production service check'."
help="The filter query for fetching events, e.g., 'service:trms production service check'.",
)
parser.add_argument(
"--days",
type=int,
default=30,
help="Number of days in the past to fetch events (default: 30)."
help="Number of days in the past to fetch events (default: 30).",
)
return parser.parse_args()


def fetch_event_data(events_api, query, days):
"""
Fetches event data from Datadog API.
Expand All @@ -60,14 +65,15 @@ def fetch_event_data(events_api, query, days):
events_response = events_api.list_events(
filter_from=str(start_time),
filter_to=str(end_time),
filter_query=query,
page_limit=1000
filter_query=query,
page_limit=1000,
)
return events_response.data if events_response.data else []
except Exception as e:
handle_errors(f"Error fetching events: {e}")
return []


def process_event_data(events_data):
"""
Processes and transforms the fetched event data.
Expand All @@ -85,14 +91,16 @@ def process_event_data(events_data):

event_timestamp_local = event_timestamp.astimezone(target_timezone)

processed_events.append({
"event_id": event.id,
"event_type": event.type,
"event_tags": ", ".join(event.attributes.tags) if event.attributes.tags else "",
"event_timestamp_local": event_timestamp_local.strftime("%Y-%m-%d %H:%M:%S"),
"event_title": attributes.get("title", "No Title"),
"event_status": attributes.get("status", "Unknown")
})
processed_events.append(
{
"event_id": event.id,
"event_type": event.type,
"event_tags": ", ".join(event.attributes.tags) if event.attributes.tags else "",
"event_timestamp_local": event_timestamp_local.strftime("%Y-%m-%d %H:%M:%S"),
"event_title": attributes.get("title", "No Title"),
"event_status": attributes.get("status", "Unknown"),
}
)

return processed_events

Expand All @@ -114,12 +122,14 @@ def save_to_csv(events_df, output_dir_path="output", output_file="output/events_
except Exception as e:
handle_errors(f"Error saving to CSV: {e}")


def handle_errors(error_message):
"""
Centralized error handling mechanism.
"""
print(error_message)


def main():
"""
Orchestrates the workflow and executes the main logic.
Expand All @@ -138,6 +148,6 @@ def main():
else:
print("No events found.")


if __name__ == "__main__":
main()

Binary file removed __pycache__/FetchEvent.cpython-310.pyc
Binary file not shown.
Binary file not shown.
22 changes: 16 additions & 6 deletions tests/test_FetchEvent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,37 @@
fetch_event_data,
process_event_data,
save_to_csv,
handle_errors
handle_errors,
)


def test_initialize_config():
with patch("FetchEvent.load_dotenv", return_value=True), \
patch("FetchEvent.os.getenv", side_effect=["dummy_api_key", "dummy_app_key"]):
with patch("FetchEvent.load_dotenv", return_value=True), patch(
"FetchEvent.os.getenv", side_effect=["dummy_api_key", "dummy_app_key"]
):
config = initialize_config()
assert config.api_key["apiKeyAuth"] == "dummy_api_key"
assert config.api_key["appKeyAuth"] == "dummy_app_key"


def test_parse_arguments():
test_args = ["--query", "test_query", "--days", "7"]
with patch("FetchEvent.argparse.ArgumentParser.parse_args", return_value=argparse.Namespace(query="test_query", days=7)):
with patch(
"FetchEvent.argparse.ArgumentParser.parse_args",
return_value=argparse.Namespace(query="test_query", days=7),
):
args = parse_arguments()
assert args.query == "test_query"
assert args.days == 7


def test_fetch_event_data():
mock_events_api = MagicMock()
mock_events_api.list_events.return_value = MagicMock(data=["event1", "event2"])
events = fetch_event_data(mock_events_api, "test_query", 7)
assert events == ["event1", "event2"]


def test_process_event_data():
mock_event_data = [
MagicMock(
Expand All @@ -40,22 +48,24 @@ def test_process_event_data():
attributes=MagicMock(
tags=["tag1", "tag2"],
timestamp=pd.Timestamp("2023-01-01 00:00:00"),
attributes={"title": "Event Title", "status": "Active"}
)
attributes={"title": "Event Title", "status": "Active"},
),
)
]
result = process_event_data(mock_event_data)
assert isinstance(result, list)
assert len(result) > 0
assert result[0]["event_title"] == "Event Title"


def test_save_to_csv(tmp_path):
test_df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
output_dir = tmp_path / "output"
output_file = output_dir / "events_output.csv"
save_to_csv(test_df, str(output_dir), str(output_file))
assert os.path.exists(output_file)


def test_handle_errors(capfd):
handle_errors("Test error message")
captured = capfd.readouterr()
Expand Down
Loading