diff --git a/setup.cfg b/setup.cfg
index b5ea5d5c..41ea94ca 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = IntuneCD
-version = 2.4.0
+version = 2.4.1
author = Tobias Almén
author_email = almenscorner@outlook.com
description = Tool to backup and update configurations in Intune
diff --git a/src/IntuneCD/intunecdlib/BaseBackupModule.py b/src/IntuneCD/intunecdlib/BaseBackupModule.py
index bf09fcf7..9c41588c 100644
--- a/src/IntuneCD/intunecdlib/BaseBackupModule.py
+++ b/src/IntuneCD/intunecdlib/BaseBackupModule.py
@@ -236,10 +236,10 @@ def _process_single_item(
if self.prefix:
if name_key == "":
- return self.results
+ return {"config_count": 0, "outputs": []}
match = self.check_prefix_match(data[f"{name_key}"], self.prefix)
if not match:
- return self.results
+ return {"config_count": 0, "outputs": []}
if log_message:
self.log(msg=log_message + data[f"{name_key}"])
diff --git a/src/IntuneCD/intunecdlib/archive.py b/src/IntuneCD/intunecdlib/archive.py
index e99f6061..fc9d1794 100644
--- a/src/IntuneCD/intunecdlib/archive.py
+++ b/src/IntuneCD/intunecdlib/archive.py
@@ -7,66 +7,151 @@
import os
import shutil
-from datetime import datetime
-
-# Folders to exclude from archving
-exclude = set(
- [
- "Management Intents",
- "archive",
- "__archive__",
- "Assignment Report",
- "Autopilot Devices",
- ]
-)
-# Date tag for archive folder
-date_tag = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
-
-
-def archive(path, file, root) -> None:
- """Moves a file to the archive folder.
-
- Args:
- path (_str_): path to current folder
- file (_str_): file to archive
- root (_str_): root of the file
- """
- if not os.path.exists(f"{path}/__archive__/{date_tag}"):
- os.makedirs(f"{path}/__archive__/{date_tag}")
- shutil.move(
- os.path.join(root, file),
- os.path.join(path, f"__archive__/{date_tag}", file),
- )
-
-
-def move_to_archive(path, created_files, output) -> None:
- """Moves a file to the archive folder.
-
- Args:
- path (_str_): path to current folder
- created_files (_list_): list of created files during backup
- output (_str_): format the file is in
- """
- if not os.path.exists(f"{path}/__archive__"):
- os.makedirs(f"{path}/__archive__/")
-
- for root, dirs, files in os.walk(path, topdown=True):
- # Remove excluded folders from dirs
- dirs[:] = [d for d in dirs if d not in exclude]
- for file in files:
- # if json is in root, skip it and move on
- if file.endswith(".json") and root == path:
- continue
- if file.endswith(".yaml") or file.endswith(".json"):
- # if file is not in created_files, archive it
- if file.replace(f".{output}", "") not in created_files:
- archive(path, file, root)
-
- # Check if Management Intents folder exists
- if os.path.exists(f"{path}/Management Intents") is True:
- for root, dirs, files in os.walk(f"{path}/Management Intents", topdown=True):
+
+from datetime import datetime, timedelta, timezone
+from .BaseBackupModule import BaseBackupModule
+from .process_audit_data import ProcessAuditData
+
+
+class Archive(BaseBackupModule):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.exclude = {
+ "Management Intents",
+ "archive",
+ "__archive__",
+ "Assignment Report",
+ "Autopilot Devices",
+ "Activation Lock Bypass Codes",
+ }
+ self.date_tag = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+ self.process_audit_data = ProcessAuditData()
+ self.audit_endpoint = (
+ "https://graph.microsoft.com/beta/deviceManagement/auditEvents"
+ )
+ if self.append_id and self.audit:
+ self.audit_data = self._get_audit_delete_events()
+
+ def archive_file(self, file, root):
+ archive_path = os.path.join(self.path, "__archive__", self.date_tag)
+ if not os.path.exists(archive_path):
+ os.makedirs(archive_path)
+
+ src = os.path.join(root, file)
+ dst = os.path.join(archive_path, file)
+ shutil.move(src, dst)
+
+ if self.audit_data:
+ self._handle_audit_commit(file, dst, archive_path, src)
+
+ def _get_audit_delete_events(self) -> list:
+ """Gets all delete events from the audit log from the last 24h
+
+ Returns:
+ list: A list of all delete events
+ """
+ if not os.getenv("AUDIT_DAYS_BACK"):
+ days_back = 1
+ else:
+ days_back = int(os.getenv("AUDIT_DAYS_BACK"))
+ start_date = (
+ datetime.now(timezone.utc) - timedelta(days=days_back)
+ ).isoformat()
+ end_date = datetime.now(timezone.utc).isoformat()
+
+ q_params = {
+ "$filter": (
+ f"activityOperationType eq 'Delete' and "
+ f"activityDateTime gt {start_date} and "
+ f"activityDateTime le {end_date}"
+ ),
+ "$select": "actor,activityDateTime,activityType,activityOperationType,activityResult,resources",
+ "$orderby": "activityDateTime desc",
+ }
+
+ audit_data = self.make_graph_request(
+ self.audit_endpoint, params=q_params, method="GET"
+ )
+
+ return audit_data
+
+ def _handle_audit_commit(self, filename, filepath, archive_path, source_file):
+ """Handles the audit commit for the file
+
+ Args:
+ filename: The name of the file
+ filepath: The path to the file
+ archive_path: The path to the archive
+ """
+ resource_id = filename.split("__")[-1].replace(".json", "").replace(".yaml", "")
+
+ if self.audit_data:
+ audit_data_record = next(
+ (
+ item
+ for item in self.audit_data.get("value", [])
+ if resource_id
+ in [res.get("resourceId") for res in item.get("resources", [])]
+ ),
+ None,
+ )
+
+ if audit_data_record:
+ if audit_data_record["actor"]["auditActorType"] == "ItPro":
+ actor = audit_data_record["actor"].get("userPrincipalName")
+ else:
+ actor = audit_data_record["actor"].get("applicationDisplayName")
+
+ audit_data_record = {
+ "resourceId": audit_data_record["resources"][0]["resourceId"],
+ "auditResourceType": audit_data_record["resources"][0][
+ "auditResourceType"
+ ],
+ "actor": actor,
+ "activityDateTime": audit_data_record["activityDateTime"],
+ "activityType": audit_data_record["activityType"],
+ "activityOperationType": audit_data_record["activityOperationType"],
+ "activityResult": audit_data_record["activityResult"],
+ }
+
+ self.filename = os.path.splitext(os.path.basename(filepath))[0]
+ # process audit and check for deleted files
+ self.process_audit_data.process_audit_data(
+ audit_data_record,
+ None,
+ archive_path,
+ filepath,
+ get_record=False,
+ record=audit_data_record,
+ source_file=source_file,
+ )
+ # process audit and check for archived files
+ self.process_audit_data.process_audit_data(
+ audit_data_record,
+ None,
+ archive_path,
+ filepath,
+ get_record=False,
+ record=audit_data_record,
+ )
+
+ def move_to_archive(self, created_files):
+ if not os.path.exists(os.path.join(self.path, "__archive__")):
+ os.makedirs(os.path.join(self.path, "__archive__"))
+
+ for root, dirs, files in os.walk(self.path, topdown=True):
+ dirs[:] = [d for d in dirs if d not in self.exclude]
for file in files:
- if file.endswith(".yaml") or file.endswith(".json"):
- # if file is not in created_files, archive it
- if file.replace(f".{output}", "") not in created_files:
- archive(path, file, root)
+ if file.endswith((".yaml", ".json")):
+ if root == self.path and file.endswith(".json"):
+ continue
+ if file.replace(f".{self.filetype}", "") not in created_files:
+ self.archive_file(file, root)
+
+ mgmt_path = os.path.join(self.path, "Management Intents")
+ if os.path.exists(mgmt_path):
+ for root, dirs, files in os.walk(mgmt_path, topdown=True):
+ for file in files:
+ if file.endswith((".yaml", ".json")):
+ if file.replace(f".{self.filetype}", "") not in created_files:
+ self.archive_file(file, root)
diff --git a/src/IntuneCD/intunecdlib/assignment_report.py b/src/IntuneCD/intunecdlib/assignment_report.py
index 1f27ab84..1cb150b6 100644
--- a/src/IntuneCD/intunecdlib/assignment_report.py
+++ b/src/IntuneCD/intunecdlib/assignment_report.py
@@ -30,10 +30,23 @@ def _process_file(self, path, name, payload_type, groups):
return
for assignment in data["assignments"]:
+ intent = ""
if not assignment["target"].get("groupName"):
continue
- intent_string = assignment.get("intent", "")
+ if "intent" in assignment and assignment["intent"] not in ["apply", ""]:
+ intent = assignment["intent"]
+ else:
+ if (
+ assignment["target"]["@odata.type"]
+ == "#microsoft.graph.groupAssignmentTarget"
+ ):
+ intent = "Include"
+ if (
+ assignment["target"]["@odata.type"]
+ == "#microsoft.graph.exclusionGroupAssignmentTarget"
+ ):
+ intent = "Exclude"
config_type = ""
if data.get("@odata.type"):
config_type = f'{data["@odata.type"].split(".")[2]}'
@@ -41,7 +54,7 @@ def _process_file(self, path, name, payload_type, groups):
payload_data = {
"name": payload_name,
"type": config_type,
- "intent": intent_string,
+ "intent": intent,
}
group_data = {
diff --git a/src/IntuneCD/intunecdlib/documentation_functions.py b/src/IntuneCD/intunecdlib/documentation_functions.py
index 2414fac8..f5fb6f1f 100644
--- a/src/IntuneCD/intunecdlib/documentation_functions.py
+++ b/src/IntuneCD/intunecdlib/documentation_functions.py
@@ -76,52 +76,51 @@ def write_assignment_table(data, headers):
if "assignments" in data:
assignments = data["assignments"]
assignment_list = []
- target = ""
- intent = ""
for assignment in assignments:
+ headers = ["intent", "target", "filter type", "filter name"]
+ target = ""
+ intent = ""
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.allDevicesAssignmentTarget"
):
target = "All Devices"
+ intent = "Include"
if (
assignment["target"]["@odata.type"]
== "#microsoft.graph.allLicensedUsersAssignmentTarget"
):
target = "All Users"
+ intent = "Include"
if "groupName" in assignment["target"]:
target = assignment["target"]["groupName"]
- if "intent" in assignment:
+ if "intent" in assignment and assignment["intent"] not in ["apply", ""]:
intent = assignment["intent"]
- headers = ["intent", "target", "filter type", "filter name"]
else:
- headers = ["target", "filter type", "filter name"]
- if intent:
- assignment_list.append(
- [
- intent,
- target,
- assignment["target"][
- "deviceAndAppManagementAssignmentFilterType"
- ],
- assignment["target"][
- "deviceAndAppManagementAssignmentFilterId"
- ],
- ]
- )
- else:
- assignment_list.append(
- [
- target,
- assignment["target"][
- "deviceAndAppManagementAssignmentFilterType"
- ],
- assignment["target"][
- "deviceAndAppManagementAssignmentFilterId"
- ],
- ]
- )
+ if (
+ assignment["target"]["@odata.type"]
+ == "#microsoft.graph.groupAssignmentTarget"
+ ):
+ intent = "Include"
+ if (
+ assignment["target"]["@odata.type"]
+ == "#microsoft.graph.exclusionGroupAssignmentTarget"
+ ):
+ intent = "Exclude"
+ assignment_list.append(
+ [
+ intent,
+ target,
+ assignment["target"][
+ "deviceAndAppManagementAssignmentFilterType"
+ ],
+ assignment["target"][
+ "deviceAndAppManagementAssignmentFilterId"
+ ],
+ ]
+ )
+ assignment_list.sort(key=lambda x: x[0], reverse=True) # Sort by the 'Intent' column in reverse order
table = write_assignment_table(assignment_list, headers)
return table
diff --git a/src/IntuneCD/intunecdlib/get_accesstoken.py b/src/IntuneCD/intunecdlib/get_accesstoken.py
index eb038d4f..9ca4eebb 100644
--- a/src/IntuneCD/intunecdlib/get_accesstoken.py
+++ b/src/IntuneCD/intunecdlib/get_accesstoken.py
@@ -56,7 +56,7 @@ def obtain_accesstoken_app(TENANT_NAME, CLIENT_ID, CLIENT_SECRET):
return token
-def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE):
+def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE, PASSPHRASE):
"""
This function is used to get an access token to MS Graph using a certificate.
@@ -73,6 +73,7 @@ def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE):
client_credential={
"thumbprint": THUMBPRINT,
"private_key": open(KEY_FILE, encoding="utf-8").read(),
+ "passphrase": PASSPHRASE,
},
authority=AUTHORITY + TENANT_NAME,
)
diff --git a/src/IntuneCD/intunecdlib/get_authparams.py b/src/IntuneCD/intunecdlib/get_authparams.py
index 0132b74b..856bc6d2 100644
--- a/src/IntuneCD/intunecdlib/get_authparams.py
+++ b/src/IntuneCD/intunecdlib/get_authparams.py
@@ -30,10 +30,13 @@ def getAuth(mode, localauth, certauth, interactiveauth, scopes, entra, tenant):
THUMBPRINT = os.environ.get("THUMBPRINT")
TENANT_NAME = os.environ.get("TENANT_NAME")
CLIENT_ID = os.environ.get("CLIENT_ID")
+ PASSPHRASE = os.environ.get("PASSPHRASE")
if not all([KEY_FILE, THUMBPRINT, TENANT_NAME, CLIENT_ID]):
raise ValueError("One or more os.environ variables not set")
- return obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE)
+ return obtain_accesstoken_cert(
+ TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE, PASSPHRASE
+ )
if interactiveauth:
TENANT_NAME = os.environ.get("TENANT_NAME")
diff --git a/src/IntuneCD/intunecdlib/process_audit_data.py b/src/IntuneCD/intunecdlib/process_audit_data.py
index 25654a05..b7933687 100644
--- a/src/IntuneCD/intunecdlib/process_audit_data.py
+++ b/src/IntuneCD/intunecdlib/process_audit_data.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import subprocess
+import os
from .IntuneCDBase import IntuneCDBase
@@ -128,7 +129,30 @@ def _git_check_new_file(self, path, file):
return False
- def _git_commit_changes(self, audit_record, path, file):
+ def _git_check_deleted_file(self, path, file):
+ """Checks if the file is deleted.
+
+ Args:
+ path: The path to the git repo.
+ file: The file to check.
+ """
+
+ # check if it is a deleted file
+ cmd = ["git", "-C", path, "ls-files", "--deleted", f"{file}"]
+ self.log(
+ function="_git_check_deleted_file",
+ msg=f"Running command {cmd} to check if {file} is a deleted file.",
+ )
+ deleted_file_result = subprocess.run(
+ cmd, capture_output=True, text=True, check=False
+ )
+ # check if "deleted" is in the stdout
+ if deleted_file_result.stdout:
+ return True
+
+ return False
+
+ def _git_commit_changes(self, audit_record, path, file, deleted=False):
"""
Commits the changes to the git repo.
@@ -136,8 +160,17 @@ def _git_commit_changes(self, audit_record, path, file):
:param path: The path to the git repo.
:param file: The file to commit.
"""
+
+ file_path = os.path.relpath(os.path.abspath(file), start=os.path.abspath(path))
+ self.log(
+ function="_git_commit_changes",
+ msg=f"Relative path for {file} is {file_path}.",
+ )
# commit the changes
- cmd = ["git", "-C", path, "add", f"{file}"]
+ if deleted:
+ cmd = ["git", "-C", path, "rm", f"{file_path}"]
+ else:
+ cmd = ["git", "-C", path, "add", f"{file_path}"]
self.log(
function="_git_commit_changes",
msg=f"Running command {cmd} to add {file} to the git repo.",
@@ -194,7 +227,16 @@ def _get_payload_from_audit_data(self, audit_data, compare_data):
return records
- def process_audit_data(self, audit_data, compare_data, path, file):
+ def process_audit_data(
+ self,
+ audit_data,
+ compare_data,
+ path,
+ file,
+ get_record=True,
+ record=None,
+ source_file=None,
+ ):
"""
Processes the audit data from Intune.
@@ -213,7 +255,8 @@ def process_audit_data(self, audit_data, compare_data, path, file):
# Commit the changes
if git_repo:
- record = self._get_payload_from_audit_data(audit_data, compare_data)
+ if get_record:
+ record = self._get_payload_from_audit_data(audit_data, compare_data)
if not record:
self.log(
function="process_audit_data",
@@ -227,13 +270,25 @@ def process_audit_data(self, audit_data, compare_data, path, file):
self._configure_git(record, path)
# check if file has been modified
result = self._git_check_modified(path, file)
+ file_not_found = False
if not result:
file_not_found = self._git_check_new_file(path, file)
- if result or file_not_found:
+ if source_file:
+ path = os.path.dirname(source_file)
+ file_deleted = self._git_check_deleted_file(path, source_file)
+ else:
+ file_deleted = self._git_check_deleted_file(path, file)
+
+ if result or file_not_found or file_deleted:
# commit the changes
- self._git_commit_changes(record, path, file)
+ if file_deleted and source_file:
+ self._git_commit_changes(record, path, source_file, deleted=True)
+ elif file_deleted:
+ self._git_commit_changes(record, path, file, deleted=True)
+ else:
+ self._git_commit_changes(record, path, file)
else:
self.log(
function="process_audit_data",
diff --git a/src/IntuneCD/run_backup.py b/src/IntuneCD/run_backup.py
index 75c423e0..707420f7 100644
--- a/src/IntuneCD/run_backup.py
+++ b/src/IntuneCD/run_backup.py
@@ -10,7 +10,7 @@
from .backup_entra import backup_entra
from .backup_intune import backup_intune
-from .intunecdlib.archive import move_to_archive
+from .intunecdlib.archive import Archive
from .intunecdlib.get_accesstoken import obtain_azure_token
from .intunecdlib.get_authparams import getAuth
@@ -102,7 +102,7 @@ def get_parser(include_help=True):
"CustomAttributes",
"DeviceCategories",
"windowsDriverUpdates",
- "windowsFeatuteUpdates",
+ "windowsFeatureUpdates",
"windowsQualityUpdates",
"Roles",
"ScopeTags",
@@ -112,6 +112,7 @@ def get_parser(include_help=True):
"DeviceCompliancePolicies",
"ComplianceScripts",
"ReusablePolicySettings",
+ "SettingsCatalog",
"entraApplications",
"entraAuthenticationFlowsPolicy",
"entraAuthenticationMethods",
@@ -306,7 +307,15 @@ def run_backup(
]
if not args.skip_archive:
- move_to_archive(path, created_files, output)
+ archiver = Archive(
+ path=args.path,
+ filetype=args.output,
+ append_id=args.append_id,
+ audit=args.audit,
+ token=token,
+ )
+
+ archiver.move_to_archive(created_files)
return config_count
diff --git a/tests/test_archive.py b/tests/test_archive.py
index e695bf33..68b82920 100644
--- a/tests/test_archive.py
+++ b/tests/test_archive.py
@@ -11,7 +11,7 @@
from testfixtures import TempDirectory
-from src.IntuneCD.intunecdlib.archive import move_to_archive
+from src.IntuneCD.intunecdlib.archive import Archive
class TestMoveToArchive(unittest.TestCase):
@@ -54,28 +54,27 @@ def tearDown(self):
def test_folder_exists(self):
"""The archive folder should be created."""
- move_to_archive(self.directory.path, self.createdFile, "json")
+ Archive(path=self.directory.path, filetype="json").move_to_archive(
+ self.createdFile
+ )
- self.assertEqual(os.path.exists(self.archive_folder), True)
+ archive_path = os.path.join(self.directory.path, "__archive__")
+ self.assertTrue(os.path.exists(archive_path))
def test_file_moved(self):
"""The files should be moved to the archive folder."""
self.createdFile = []
- move_to_archive(self.directory.path, self.createdFile, "json")
-
- self.assertEqual(
- os.path.exists(self.archive_folder + "/" + self.datetime + "/test.json"),
- True,
- )
- self.assertEqual(
- os.path.exists(
- self.archive_folder + "/" + self.datetime + "/test_intent.json"
- ),
- True,
+ archive = Archive(path=self.directory.path, filetype="json")
+ archive.move_to_archive(self.createdFile)
+ archive_path = os.path.join(
+ self.directory.path, "__archive__", archive.date_tag
)
+ self.assertTrue(os.path.exists(os.path.join(archive_path, "test.json")))
+ self.assertTrue(os.path.exists(os.path.join(archive_path, "test_intent.json")))
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_documentation_functions.py b/tests/test_documentation_functions.py
index 6018c148..7301f6fd 100644
--- a/tests/test_documentation_functions.py
+++ b/tests/test_documentation_functions.py
@@ -72,7 +72,7 @@ def test_assignment_table(self):
}
]
}
- self.expected_table = "|intent| target |filter type|filter name||------|----------|-----------|-----------||apply |test-group|test |test-filter|"
+ self.expected_table = "|intent| target |filter type|filter name||------|----------|-----------|-----------|| |test-group|test |test-filter|"
self.result = assignment_table(self.table_data)
self.string = str(self.result)
@@ -122,10 +122,10 @@ def test_document_configs(self):
"""The list should be returned."""
self.directory.write(
"config/test_file_name.json",
- '{"@odata.type":"test","test":"test","name":"test","description":"test","testvals":"1,2","testbool":false,"testlist":["test"],"testlistdict":[{"test":{"test":{"test":["1"],"testb64":"dW5pY29ybg=="}}}],"testdict2":{"test":{"test":{"test":["1"]}}},"testdictlist":{"test":["a","b","c"]},"assignments":[{"intent":"apply","target":{"@odata.type":"#test","groupName":"test-group","deviceAndAppManagementAssignmentFilterId":"test-filter","deviceAndAppManagementAssignmentFilterType":"test"}}]}',
+ '{"@odata.type":"test","test":"test","name":"test","description":"test","testvals":"1,2","testbool":false,"testlist":["test"],"testlistdict":[{"test":{"test":{"test":["1"],"testb64":"dW5pY29ybg=="}}}],"testdict2":{"test":{"test":{"test":["1"]}}},"testdictlist":{"test":["a","b","c"]},"assignments":[{"intent":"Include","target":{"@odata.type":"#test","groupName":"test-group","deviceAndAppManagementAssignmentFilterId":"test-filter","deviceAndAppManagementAssignmentFilterType":"test"}}]}',
encoding="utf-8",
)
- self.expected_data = "##test###testDescription:test####Assignments|intent|target|filtertype|filtername||------|----------|-----------|-----------||apply|test-group|test|test-filter|####Configuration|setting|value||------------|-------------------------------------------------------------------------------------||Odatatype|test||Test|test||Name|test||Testvals|1,2||Testbool|False||Testlist|test
||Testlistdict|**test:**