Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = IntuneCD
version = 2.4.0
version = 2.4.1
author = Tobias Almén
author_email = almenscorner@outlook.com
description = Tool to backup and update configurations in Intune
Expand Down
4 changes: 2 additions & 2 deletions src/IntuneCD/intunecdlib/BaseBackupModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ def _process_single_item(

if self.prefix:
if name_key == "":
return self.results
return {"config_count": 0, "outputs": []}
match = self.check_prefix_match(data[f"{name_key}"], self.prefix)
if not match:
return self.results
return {"config_count": 0, "outputs": []}

if log_message:
self.log(msg=log_message + data[f"{name_key}"])
Expand Down
209 changes: 147 additions & 62 deletions src/IntuneCD/intunecdlib/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,151 @@

import os
import shutil
from datetime import datetime

# Folders to exclude from archving
exclude = set(
[
"Management Intents",
"archive",
"__archive__",
"Assignment Report",
"Autopilot Devices",
]
)
# Date tag for archive folder
date_tag = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


def archive(path, file, root) -> None:
"""Moves a file to the archive folder.

Args:
path (_str_): path to current folder
file (_str_): file to archive
root (_str_): root of the file
"""
if not os.path.exists(f"{path}/__archive__/{date_tag}"):
os.makedirs(f"{path}/__archive__/{date_tag}")
shutil.move(
os.path.join(root, file),
os.path.join(path, f"__archive__/{date_tag}", file),
)


def move_to_archive(path, created_files, output) -> None:
"""Moves a file to the archive folder.

Args:
path (_str_): path to current folder
created_files (_list_): list of created files during backup
output (_str_): format the file is in
"""
if not os.path.exists(f"{path}/__archive__"):
os.makedirs(f"{path}/__archive__/")

for root, dirs, files in os.walk(path, topdown=True):
# Remove excluded folders from dirs
dirs[:] = [d for d in dirs if d not in exclude]
for file in files:
# if json is in root, skip it and move on
if file.endswith(".json") and root == path:
continue
if file.endswith(".yaml") or file.endswith(".json"):
# if file is not in created_files, archive it
if file.replace(f".{output}", "") not in created_files:
archive(path, file, root)

# Check if Management Intents folder exists
if os.path.exists(f"{path}/Management Intents") is True:
for root, dirs, files in os.walk(f"{path}/Management Intents", topdown=True):

from datetime import datetime, timedelta, timezone
from .BaseBackupModule import BaseBackupModule
from .process_audit_data import ProcessAuditData


class Archive(BaseBackupModule):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exclude = {
"Management Intents",
"archive",
"__archive__",
"Assignment Report",
"Autopilot Devices",
"Activation Lock Bypass Codes",
}
self.date_tag = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
self.process_audit_data = ProcessAuditData()
self.audit_endpoint = (
"https://graph.microsoft.com/beta/deviceManagement/auditEvents"
)
if self.append_id and self.audit:
self.audit_data = self._get_audit_delete_events()

def archive_file(self, file, root):
archive_path = os.path.join(self.path, "__archive__", self.date_tag)
if not os.path.exists(archive_path):
os.makedirs(archive_path)

src = os.path.join(root, file)
dst = os.path.join(archive_path, file)
shutil.move(src, dst)

if self.audit_data:
self._handle_audit_commit(file, dst, archive_path, src)

def _get_audit_delete_events(self) -> list:
"""Gets all delete events from the audit log from the last 24h

Returns:
list: A list of all delete events
"""
if not os.getenv("AUDIT_DAYS_BACK"):
days_back = 1
else:
days_back = int(os.getenv("AUDIT_DAYS_BACK"))
start_date = (
datetime.now(timezone.utc) - timedelta(days=days_back)
).isoformat()
end_date = datetime.now(timezone.utc).isoformat()

q_params = {
"$filter": (
f"activityOperationType eq 'Delete' and "
f"activityDateTime gt {start_date} and "
f"activityDateTime le {end_date}"
),
"$select": "actor,activityDateTime,activityType,activityOperationType,activityResult,resources",
"$orderby": "activityDateTime desc",
}

audit_data = self.make_graph_request(
self.audit_endpoint, params=q_params, method="GET"
)

return audit_data

def _handle_audit_commit(self, filename, filepath, archive_path, source_file):
"""Handles the audit commit for the file

Args:
filename: The name of the file
filepath: The path to the file
archive_path: The path to the archive
"""
resource_id = filename.split("__")[-1].replace(".json", "").replace(".yaml", "")

if self.audit_data:
audit_data_record = next(
(
item
for item in self.audit_data.get("value", [])
if resource_id
in [res.get("resourceId") for res in item.get("resources", [])]
),
None,
)

if audit_data_record:
if audit_data_record["actor"]["auditActorType"] == "ItPro":
actor = audit_data_record["actor"].get("userPrincipalName")
else:
actor = audit_data_record["actor"].get("applicationDisplayName")

audit_data_record = {
"resourceId": audit_data_record["resources"][0]["resourceId"],
"auditResourceType": audit_data_record["resources"][0][
"auditResourceType"
],
"actor": actor,
"activityDateTime": audit_data_record["activityDateTime"],
"activityType": audit_data_record["activityType"],
"activityOperationType": audit_data_record["activityOperationType"],
"activityResult": audit_data_record["activityResult"],
}

self.filename = os.path.splitext(os.path.basename(filepath))[0]
# process audit and check for deleted files
self.process_audit_data.process_audit_data(
audit_data_record,
None,
archive_path,
filepath,
get_record=False,
record=audit_data_record,
source_file=source_file,
)
# process audit and check for archived files
self.process_audit_data.process_audit_data(
audit_data_record,
None,
archive_path,
filepath,
get_record=False,
record=audit_data_record,
)

def move_to_archive(self, created_files):
if not os.path.exists(os.path.join(self.path, "__archive__")):
os.makedirs(os.path.join(self.path, "__archive__"))

for root, dirs, files in os.walk(self.path, topdown=True):
dirs[:] = [d for d in dirs if d not in self.exclude]
for file in files:
if file.endswith(".yaml") or file.endswith(".json"):
# if file is not in created_files, archive it
if file.replace(f".{output}", "") not in created_files:
archive(path, file, root)
if file.endswith((".yaml", ".json")):
if root == self.path and file.endswith(".json"):
continue
if file.replace(f".{self.filetype}", "") not in created_files:
self.archive_file(file, root)

mgmt_path = os.path.join(self.path, "Management Intents")
if os.path.exists(mgmt_path):
for root, dirs, files in os.walk(mgmt_path, topdown=True):
for file in files:
if file.endswith((".yaml", ".json")):
if file.replace(f".{self.filetype}", "") not in created_files:
self.archive_file(file, root)
17 changes: 15 additions & 2 deletions src/IntuneCD/intunecdlib/assignment_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,31 @@ def _process_file(self, path, name, payload_type, groups):
return

for assignment in data["assignments"]:
intent = ""
if not assignment["target"].get("groupName"):
continue

intent_string = assignment.get("intent", "")
if "intent" in assignment and assignment["intent"] not in ["apply", ""]:
intent = assignment["intent"]
else:
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.groupAssignmentTarget"
):
intent = "Include"
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.exclusionGroupAssignmentTarget"
):
intent = "Exclude"
config_type = ""
if data.get("@odata.type"):
config_type = f'{data["@odata.type"].split(".")[2]}'
payload_name = data.get("displayName", data.get("name", ""))
payload_data = {
"name": payload_name,
"type": config_type,
"intent": intent_string,
"intent": intent,
}

group_data = {
Expand Down
59 changes: 29 additions & 30 deletions src/IntuneCD/intunecdlib/documentation_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,52 +76,51 @@ def write_assignment_table(data, headers):
if "assignments" in data:
assignments = data["assignments"]
assignment_list = []
target = ""
intent = ""
for assignment in assignments:
headers = ["intent", "target", "filter type", "filter name"]
target = ""
intent = ""
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.allDevicesAssignmentTarget"
):
target = "All Devices"
intent = "Include"
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.allLicensedUsersAssignmentTarget"
):
target = "All Users"
intent = "Include"
if "groupName" in assignment["target"]:
target = assignment["target"]["groupName"]
if "intent" in assignment:
if "intent" in assignment and assignment["intent"] not in ["apply", ""]:
intent = assignment["intent"]
headers = ["intent", "target", "filter type", "filter name"]
else:
headers = ["target", "filter type", "filter name"]
if intent:
assignment_list.append(
[
intent,
target,
assignment["target"][
"deviceAndAppManagementAssignmentFilterType"
],
assignment["target"][
"deviceAndAppManagementAssignmentFilterId"
],
]
)
else:
assignment_list.append(
[
target,
assignment["target"][
"deviceAndAppManagementAssignmentFilterType"
],
assignment["target"][
"deviceAndAppManagementAssignmentFilterId"
],
]
)
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.groupAssignmentTarget"
):
intent = "Include"
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.exclusionGroupAssignmentTarget"
):
intent = "Exclude"
assignment_list.append(
[
intent,
target,
assignment["target"][
"deviceAndAppManagementAssignmentFilterType"
],
assignment["target"][
"deviceAndAppManagementAssignmentFilterId"
],
]
)

assignment_list.sort(key=lambda x: x[0], reverse=True) # Sort by the 'Intent' column in reverse order
table = write_assignment_table(assignment_list, headers)

return table
Expand Down
3 changes: 2 additions & 1 deletion src/IntuneCD/intunecdlib/get_accesstoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def obtain_accesstoken_app(TENANT_NAME, CLIENT_ID, CLIENT_SECRET):
return token


def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE):
def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE, PASSPHRASE):
"""
This function is used to get an access token to MS Graph using a certificate.

Expand All @@ -73,6 +73,7 @@ def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE):
client_credential={
"thumbprint": THUMBPRINT,
"private_key": open(KEY_FILE, encoding="utf-8").read(),
"passphrase": PASSPHRASE,
},
authority=AUTHORITY + TENANT_NAME,
)
Expand Down
5 changes: 4 additions & 1 deletion src/IntuneCD/intunecdlib/get_authparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ def getAuth(mode, localauth, certauth, interactiveauth, scopes, entra, tenant):
THUMBPRINT = os.environ.get("THUMBPRINT")
TENANT_NAME = os.environ.get("TENANT_NAME")
CLIENT_ID = os.environ.get("CLIENT_ID")
PASSPHRASE = os.environ.get("PASSPHRASE")

if not all([KEY_FILE, THUMBPRINT, TENANT_NAME, CLIENT_ID]):
raise ValueError("One or more os.environ variables not set")
return obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE)
return obtain_accesstoken_cert(
TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE, PASSPHRASE
)

if interactiveauth:
TENANT_NAME = os.environ.get("TENANT_NAME")
Expand Down
Loading