diff --git a/setup.cfg b/setup.cfg index b5ea5d5c..41ea94ca 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = IntuneCD -version = 2.4.0 +version = 2.4.1 author = Tobias Almén author_email = almenscorner@outlook.com description = Tool to backup and update configurations in Intune diff --git a/src/IntuneCD/intunecdlib/BaseBackupModule.py b/src/IntuneCD/intunecdlib/BaseBackupModule.py index bf09fcf7..9c41588c 100644 --- a/src/IntuneCD/intunecdlib/BaseBackupModule.py +++ b/src/IntuneCD/intunecdlib/BaseBackupModule.py @@ -236,10 +236,10 @@ def _process_single_item( if self.prefix: if name_key == "": - return self.results + return {"config_count": 0, "outputs": []} match = self.check_prefix_match(data[f"{name_key}"], self.prefix) if not match: - return self.results + return {"config_count": 0, "outputs": []} if log_message: self.log(msg=log_message + data[f"{name_key}"]) diff --git a/src/IntuneCD/intunecdlib/archive.py b/src/IntuneCD/intunecdlib/archive.py index e99f6061..fc9d1794 100644 --- a/src/IntuneCD/intunecdlib/archive.py +++ b/src/IntuneCD/intunecdlib/archive.py @@ -7,66 +7,151 @@ import os import shutil -from datetime import datetime - -# Folders to exclude from archving -exclude = set( - [ - "Management Intents", - "archive", - "__archive__", - "Assignment Report", - "Autopilot Devices", - ] -) -# Date tag for archive folder -date_tag = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - - -def archive(path, file, root) -> None: - """Moves a file to the archive folder. - - Args: - path (_str_): path to current folder - file (_str_): file to archive - root (_str_): root of the file - """ - if not os.path.exists(f"{path}/__archive__/{date_tag}"): - os.makedirs(f"{path}/__archive__/{date_tag}") - shutil.move( - os.path.join(root, file), - os.path.join(path, f"__archive__/{date_tag}", file), - ) - - -def move_to_archive(path, created_files, output) -> None: - """Moves a file to the archive folder. - - Args: - path (_str_): path to current folder - created_files (_list_): list of created files during backup - output (_str_): format the file is in - """ - if not os.path.exists(f"{path}/__archive__"): - os.makedirs(f"{path}/__archive__/") - - for root, dirs, files in os.walk(path, topdown=True): - # Remove excluded folders from dirs - dirs[:] = [d for d in dirs if d not in exclude] - for file in files: - # if json is in root, skip it and move on - if file.endswith(".json") and root == path: - continue - if file.endswith(".yaml") or file.endswith(".json"): - # if file is not in created_files, archive it - if file.replace(f".{output}", "") not in created_files: - archive(path, file, root) - - # Check if Management Intents folder exists - if os.path.exists(f"{path}/Management Intents") is True: - for root, dirs, files in os.walk(f"{path}/Management Intents", topdown=True): + +from datetime import datetime, timedelta, timezone +from .BaseBackupModule import BaseBackupModule +from .process_audit_data import ProcessAuditData + + +class Archive(BaseBackupModule): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.exclude = { + "Management Intents", + "archive", + "__archive__", + "Assignment Report", + "Autopilot Devices", + "Activation Lock Bypass Codes", + } + self.date_tag = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + self.process_audit_data = ProcessAuditData() + self.audit_endpoint = ( + "https://graph.microsoft.com/beta/deviceManagement/auditEvents" + ) + if self.append_id and self.audit: + self.audit_data = self._get_audit_delete_events() + + def archive_file(self, file, root): + archive_path = os.path.join(self.path, "__archive__", self.date_tag) + if not os.path.exists(archive_path): + os.makedirs(archive_path) + + src = os.path.join(root, file) + dst = os.path.join(archive_path, file) + shutil.move(src, dst) + + if self.audit_data: + self._handle_audit_commit(file, dst, archive_path, src) + + def _get_audit_delete_events(self) -> list: + """Gets all delete events from the audit log from the last 24h + + Returns: + list: A list of all delete events + """ + if not os.getenv("AUDIT_DAYS_BACK"): + days_back = 1 + else: + days_back = int(os.getenv("AUDIT_DAYS_BACK")) + start_date = ( + datetime.now(timezone.utc) - timedelta(days=days_back) + ).isoformat() + end_date = datetime.now(timezone.utc).isoformat() + + q_params = { + "$filter": ( + f"activityOperationType eq 'Delete' and " + f"activityDateTime gt {start_date} and " + f"activityDateTime le {end_date}" + ), + "$select": "actor,activityDateTime,activityType,activityOperationType,activityResult,resources", + "$orderby": "activityDateTime desc", + } + + audit_data = self.make_graph_request( + self.audit_endpoint, params=q_params, method="GET" + ) + + return audit_data + + def _handle_audit_commit(self, filename, filepath, archive_path, source_file): + """Handles the audit commit for the file + + Args: + filename: The name of the file + filepath: The path to the file + archive_path: The path to the archive + """ + resource_id = filename.split("__")[-1].replace(".json", "").replace(".yaml", "") + + if self.audit_data: + audit_data_record = next( + ( + item + for item in self.audit_data.get("value", []) + if resource_id + in [res.get("resourceId") for res in item.get("resources", [])] + ), + None, + ) + + if audit_data_record: + if audit_data_record["actor"]["auditActorType"] == "ItPro": + actor = audit_data_record["actor"].get("userPrincipalName") + else: + actor = audit_data_record["actor"].get("applicationDisplayName") + + audit_data_record = { + "resourceId": audit_data_record["resources"][0]["resourceId"], + "auditResourceType": audit_data_record["resources"][0][ + "auditResourceType" + ], + "actor": actor, + "activityDateTime": audit_data_record["activityDateTime"], + "activityType": audit_data_record["activityType"], + "activityOperationType": audit_data_record["activityOperationType"], + "activityResult": audit_data_record["activityResult"], + } + + self.filename = os.path.splitext(os.path.basename(filepath))[0] + # process audit and check for deleted files + self.process_audit_data.process_audit_data( + audit_data_record, + None, + archive_path, + filepath, + get_record=False, + record=audit_data_record, + source_file=source_file, + ) + # process audit and check for archived files + self.process_audit_data.process_audit_data( + audit_data_record, + None, + archive_path, + filepath, + get_record=False, + record=audit_data_record, + ) + + def move_to_archive(self, created_files): + if not os.path.exists(os.path.join(self.path, "__archive__")): + os.makedirs(os.path.join(self.path, "__archive__")) + + for root, dirs, files in os.walk(self.path, topdown=True): + dirs[:] = [d for d in dirs if d not in self.exclude] for file in files: - if file.endswith(".yaml") or file.endswith(".json"): - # if file is not in created_files, archive it - if file.replace(f".{output}", "") not in created_files: - archive(path, file, root) + if file.endswith((".yaml", ".json")): + if root == self.path and file.endswith(".json"): + continue + if file.replace(f".{self.filetype}", "") not in created_files: + self.archive_file(file, root) + + mgmt_path = os.path.join(self.path, "Management Intents") + if os.path.exists(mgmt_path): + for root, dirs, files in os.walk(mgmt_path, topdown=True): + for file in files: + if file.endswith((".yaml", ".json")): + if file.replace(f".{self.filetype}", "") not in created_files: + self.archive_file(file, root) diff --git a/src/IntuneCD/intunecdlib/assignment_report.py b/src/IntuneCD/intunecdlib/assignment_report.py index 1f27ab84..1cb150b6 100644 --- a/src/IntuneCD/intunecdlib/assignment_report.py +++ b/src/IntuneCD/intunecdlib/assignment_report.py @@ -30,10 +30,23 @@ def _process_file(self, path, name, payload_type, groups): return for assignment in data["assignments"]: + intent = "" if not assignment["target"].get("groupName"): continue - intent_string = assignment.get("intent", "") + if "intent" in assignment and assignment["intent"] not in ["apply", ""]: + intent = assignment["intent"] + else: + if ( + assignment["target"]["@odata.type"] + == "#microsoft.graph.groupAssignmentTarget" + ): + intent = "Include" + if ( + assignment["target"]["@odata.type"] + == "#microsoft.graph.exclusionGroupAssignmentTarget" + ): + intent = "Exclude" config_type = "" if data.get("@odata.type"): config_type = f'{data["@odata.type"].split(".")[2]}' @@ -41,7 +54,7 @@ def _process_file(self, path, name, payload_type, groups): payload_data = { "name": payload_name, "type": config_type, - "intent": intent_string, + "intent": intent, } group_data = { diff --git a/src/IntuneCD/intunecdlib/documentation_functions.py b/src/IntuneCD/intunecdlib/documentation_functions.py index 2414fac8..f5fb6f1f 100644 --- a/src/IntuneCD/intunecdlib/documentation_functions.py +++ b/src/IntuneCD/intunecdlib/documentation_functions.py @@ -76,52 +76,51 @@ def write_assignment_table(data, headers): if "assignments" in data: assignments = data["assignments"] assignment_list = [] - target = "" - intent = "" for assignment in assignments: + headers = ["intent", "target", "filter type", "filter name"] + target = "" + intent = "" if ( assignment["target"]["@odata.type"] == "#microsoft.graph.allDevicesAssignmentTarget" ): target = "All Devices" + intent = "Include" if ( assignment["target"]["@odata.type"] == "#microsoft.graph.allLicensedUsersAssignmentTarget" ): target = "All Users" + intent = "Include" if "groupName" in assignment["target"]: target = assignment["target"]["groupName"] - if "intent" in assignment: + if "intent" in assignment and assignment["intent"] not in ["apply", ""]: intent = assignment["intent"] - headers = ["intent", "target", "filter type", "filter name"] else: - headers = ["target", "filter type", "filter name"] - if intent: - assignment_list.append( - [ - intent, - target, - assignment["target"][ - "deviceAndAppManagementAssignmentFilterType" - ], - assignment["target"][ - "deviceAndAppManagementAssignmentFilterId" - ], - ] - ) - else: - assignment_list.append( - [ - target, - assignment["target"][ - "deviceAndAppManagementAssignmentFilterType" - ], - assignment["target"][ - "deviceAndAppManagementAssignmentFilterId" - ], - ] - ) + if ( + assignment["target"]["@odata.type"] + == "#microsoft.graph.groupAssignmentTarget" + ): + intent = "Include" + if ( + assignment["target"]["@odata.type"] + == "#microsoft.graph.exclusionGroupAssignmentTarget" + ): + intent = "Exclude" + assignment_list.append( + [ + intent, + target, + assignment["target"][ + "deviceAndAppManagementAssignmentFilterType" + ], + assignment["target"][ + "deviceAndAppManagementAssignmentFilterId" + ], + ] + ) + assignment_list.sort(key=lambda x: x[0], reverse=True) # Sort by the 'Intent' column in reverse order table = write_assignment_table(assignment_list, headers) return table diff --git a/src/IntuneCD/intunecdlib/get_accesstoken.py b/src/IntuneCD/intunecdlib/get_accesstoken.py index eb038d4f..9ca4eebb 100644 --- a/src/IntuneCD/intunecdlib/get_accesstoken.py +++ b/src/IntuneCD/intunecdlib/get_accesstoken.py @@ -56,7 +56,7 @@ def obtain_accesstoken_app(TENANT_NAME, CLIENT_ID, CLIENT_SECRET): return token -def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE): +def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE, PASSPHRASE): """ This function is used to get an access token to MS Graph using a certificate. @@ -73,6 +73,7 @@ def obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE): client_credential={ "thumbprint": THUMBPRINT, "private_key": open(KEY_FILE, encoding="utf-8").read(), + "passphrase": PASSPHRASE, }, authority=AUTHORITY + TENANT_NAME, ) diff --git a/src/IntuneCD/intunecdlib/get_authparams.py b/src/IntuneCD/intunecdlib/get_authparams.py index 0132b74b..856bc6d2 100644 --- a/src/IntuneCD/intunecdlib/get_authparams.py +++ b/src/IntuneCD/intunecdlib/get_authparams.py @@ -30,10 +30,13 @@ def getAuth(mode, localauth, certauth, interactiveauth, scopes, entra, tenant): THUMBPRINT = os.environ.get("THUMBPRINT") TENANT_NAME = os.environ.get("TENANT_NAME") CLIENT_ID = os.environ.get("CLIENT_ID") + PASSPHRASE = os.environ.get("PASSPHRASE") if not all([KEY_FILE, THUMBPRINT, TENANT_NAME, CLIENT_ID]): raise ValueError("One or more os.environ variables not set") - return obtain_accesstoken_cert(TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE) + return obtain_accesstoken_cert( + TENANT_NAME, CLIENT_ID, THUMBPRINT, KEY_FILE, PASSPHRASE + ) if interactiveauth: TENANT_NAME = os.environ.get("TENANT_NAME") diff --git a/src/IntuneCD/intunecdlib/process_audit_data.py b/src/IntuneCD/intunecdlib/process_audit_data.py index 25654a05..b7933687 100644 --- a/src/IntuneCD/intunecdlib/process_audit_data.py +++ b/src/IntuneCD/intunecdlib/process_audit_data.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import subprocess +import os from .IntuneCDBase import IntuneCDBase @@ -128,7 +129,30 @@ def _git_check_new_file(self, path, file): return False - def _git_commit_changes(self, audit_record, path, file): + def _git_check_deleted_file(self, path, file): + """Checks if the file is deleted. + + Args: + path: The path to the git repo. + file: The file to check. + """ + + # check if it is a deleted file + cmd = ["git", "-C", path, "ls-files", "--deleted", f"{file}"] + self.log( + function="_git_check_deleted_file", + msg=f"Running command {cmd} to check if {file} is a deleted file.", + ) + deleted_file_result = subprocess.run( + cmd, capture_output=True, text=True, check=False + ) + # check if "deleted" is in the stdout + if deleted_file_result.stdout: + return True + + return False + + def _git_commit_changes(self, audit_record, path, file, deleted=False): """ Commits the changes to the git repo. @@ -136,8 +160,17 @@ def _git_commit_changes(self, audit_record, path, file): :param path: The path to the git repo. :param file: The file to commit. """ + + file_path = os.path.relpath(os.path.abspath(file), start=os.path.abspath(path)) + self.log( + function="_git_commit_changes", + msg=f"Relative path for {file} is {file_path}.", + ) # commit the changes - cmd = ["git", "-C", path, "add", f"{file}"] + if deleted: + cmd = ["git", "-C", path, "rm", f"{file_path}"] + else: + cmd = ["git", "-C", path, "add", f"{file_path}"] self.log( function="_git_commit_changes", msg=f"Running command {cmd} to add {file} to the git repo.", @@ -194,7 +227,16 @@ def _get_payload_from_audit_data(self, audit_data, compare_data): return records - def process_audit_data(self, audit_data, compare_data, path, file): + def process_audit_data( + self, + audit_data, + compare_data, + path, + file, + get_record=True, + record=None, + source_file=None, + ): """ Processes the audit data from Intune. @@ -213,7 +255,8 @@ def process_audit_data(self, audit_data, compare_data, path, file): # Commit the changes if git_repo: - record = self._get_payload_from_audit_data(audit_data, compare_data) + if get_record: + record = self._get_payload_from_audit_data(audit_data, compare_data) if not record: self.log( function="process_audit_data", @@ -227,13 +270,25 @@ def process_audit_data(self, audit_data, compare_data, path, file): self._configure_git(record, path) # check if file has been modified result = self._git_check_modified(path, file) + file_not_found = False if not result: file_not_found = self._git_check_new_file(path, file) - if result or file_not_found: + if source_file: + path = os.path.dirname(source_file) + file_deleted = self._git_check_deleted_file(path, source_file) + else: + file_deleted = self._git_check_deleted_file(path, file) + + if result or file_not_found or file_deleted: # commit the changes - self._git_commit_changes(record, path, file) + if file_deleted and source_file: + self._git_commit_changes(record, path, source_file, deleted=True) + elif file_deleted: + self._git_commit_changes(record, path, file, deleted=True) + else: + self._git_commit_changes(record, path, file) else: self.log( function="process_audit_data", diff --git a/src/IntuneCD/run_backup.py b/src/IntuneCD/run_backup.py index 75c423e0..707420f7 100644 --- a/src/IntuneCD/run_backup.py +++ b/src/IntuneCD/run_backup.py @@ -10,7 +10,7 @@ from .backup_entra import backup_entra from .backup_intune import backup_intune -from .intunecdlib.archive import move_to_archive +from .intunecdlib.archive import Archive from .intunecdlib.get_accesstoken import obtain_azure_token from .intunecdlib.get_authparams import getAuth @@ -102,7 +102,7 @@ def get_parser(include_help=True): "CustomAttributes", "DeviceCategories", "windowsDriverUpdates", - "windowsFeatuteUpdates", + "windowsFeatureUpdates", "windowsQualityUpdates", "Roles", "ScopeTags", @@ -112,6 +112,7 @@ def get_parser(include_help=True): "DeviceCompliancePolicies", "ComplianceScripts", "ReusablePolicySettings", + "SettingsCatalog", "entraApplications", "entraAuthenticationFlowsPolicy", "entraAuthenticationMethods", @@ -306,7 +307,15 @@ def run_backup( ] if not args.skip_archive: - move_to_archive(path, created_files, output) + archiver = Archive( + path=args.path, + filetype=args.output, + append_id=args.append_id, + audit=args.audit, + token=token, + ) + + archiver.move_to_archive(created_files) return config_count diff --git a/tests/test_archive.py b/tests/test_archive.py index e695bf33..68b82920 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -11,7 +11,7 @@ from testfixtures import TempDirectory -from src.IntuneCD.intunecdlib.archive import move_to_archive +from src.IntuneCD.intunecdlib.archive import Archive class TestMoveToArchive(unittest.TestCase): @@ -54,28 +54,27 @@ def tearDown(self): def test_folder_exists(self): """The archive folder should be created.""" - move_to_archive(self.directory.path, self.createdFile, "json") + Archive(path=self.directory.path, filetype="json").move_to_archive( + self.createdFile + ) - self.assertEqual(os.path.exists(self.archive_folder), True) + archive_path = os.path.join(self.directory.path, "__archive__") + self.assertTrue(os.path.exists(archive_path)) def test_file_moved(self): """The files should be moved to the archive folder.""" self.createdFile = [] - move_to_archive(self.directory.path, self.createdFile, "json") - - self.assertEqual( - os.path.exists(self.archive_folder + "/" + self.datetime + "/test.json"), - True, - ) - self.assertEqual( - os.path.exists( - self.archive_folder + "/" + self.datetime + "/test_intent.json" - ), - True, + archive = Archive(path=self.directory.path, filetype="json") + archive.move_to_archive(self.createdFile) + archive_path = os.path.join( + self.directory.path, "__archive__", archive.date_tag ) + self.assertTrue(os.path.exists(os.path.join(archive_path, "test.json"))) + self.assertTrue(os.path.exists(os.path.join(archive_path, "test_intent.json"))) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_documentation_functions.py b/tests/test_documentation_functions.py index 6018c148..7301f6fd 100644 --- a/tests/test_documentation_functions.py +++ b/tests/test_documentation_functions.py @@ -72,7 +72,7 @@ def test_assignment_table(self): } ] } - self.expected_table = "|intent| target |filter type|filter name||------|----------|-----------|-----------||apply |test-group|test |test-filter|" + self.expected_table = "|intent| target |filter type|filter name||------|----------|-----------|-----------|| |test-group|test |test-filter|" self.result = assignment_table(self.table_data) self.string = str(self.result) @@ -122,10 +122,10 @@ def test_document_configs(self): """The list should be returned.""" self.directory.write( "config/test_file_name.json", - '{"@odata.type":"test","test":"test","name":"test","description":"test","testvals":"1,2","testbool":false,"testlist":["test"],"testlistdict":[{"test":{"test":{"test":["1"],"testb64":"dW5pY29ybg=="}}}],"testdict2":{"test":{"test":{"test":["1"]}}},"testdictlist":{"test":["a","b","c"]},"assignments":[{"intent":"apply","target":{"@odata.type":"#test","groupName":"test-group","deviceAndAppManagementAssignmentFilterId":"test-filter","deviceAndAppManagementAssignmentFilterType":"test"}}]}', + '{"@odata.type":"test","test":"test","name":"test","description":"test","testvals":"1,2","testbool":false,"testlist":["test"],"testlistdict":[{"test":{"test":{"test":["1"],"testb64":"dW5pY29ybg=="}}}],"testdict2":{"test":{"test":{"test":["1"]}}},"testdictlist":{"test":["a","b","c"]},"assignments":[{"intent":"Include","target":{"@odata.type":"#test","groupName":"test-group","deviceAndAppManagementAssignmentFilterId":"test-filter","deviceAndAppManagementAssignmentFilterType":"test"}}]}', encoding="utf-8", ) - self.expected_data = "##test###testDescription:test####Assignments|intent|target|filtertype|filtername||------|----------|-----------|-----------||apply|test-group|test|test-filter|####Configuration|setting|value||------------|-------------------------------------------------------------------------------------||Odatatype|test||Test|test||Name|test||Testvals|1,2||Testbool|False||Testlist|test
||Testlistdict|**test:**
||Testdict2|**test:**||Testdictlist|**test:**|" + self.expected_data = "##test###testDescription:test####Assignments|intent|target|filtertype|filtername||-------|----------|-----------|-----------||Include|test-group|test|test-filter|####Configuration|setting|value||------------|-------------------------------------------------------------------------------------||Odatatype|test||Test|test||Name|test||Testvals|1,2||Testbool|False||Testlist|test
||Testlistdict|**test:**
||Testdict2|**test:**||Testdictlist|**test:**|" document_configs( f"{self.directory.path}/config", @@ -147,10 +147,10 @@ def test_document_management_intents(self): """The list should be returned.""" self.directory.write( "intent/test/test_file_name.json", - '{"test": "test", "name": "test", "description": "test", "settingsDelta": [{"test": "test", "definitionId": "deviceConfiguration--macOSEndpointProtectionConfiguration_fileVaultNumberOfTimesUserCanIgnore","valueJson": "1","value": 1}], "assignments": [{"intent": "apply", "target": {"@odata.type": "#test", "groupName": "test-group", "deviceAndAppManagementAssignmentFilterId": "test-filter", "deviceAndAppManagementAssignmentFilterType": "test"}}]}', + '{"test": "test", "name": "test", "description": "test", "settingsDelta": [{"test": "test", "definitionId": "deviceConfiguration--macOSEndpointProtectionConfiguration_fileVaultNumberOfTimesUserCanIgnore","valueJson": "1","value": 1}], "assignments": [{"intent": "Include", "target": {"@odata.type": "#test", "groupName": "test-group", "deviceAndAppManagementAssignmentFilterId": "test-filter", "deviceAndAppManagementAssignmentFilterType": "test"}}]}', encoding="utf-8", ) - self.expected_data = "##intent###testDescription:test####Assignments|intent|target|filtertype|filtername||------|----------|-----------|-----------||apply|test-group|test|test-filter|####Configuration|setting|value||------------------------------------------|-----||Test|test||Name|test||FileVaultNumberOfTimesUserCanIgnore|1|" + self.expected_data = "##intent###testDescription:test####Assignments|intent|target|filtertype|filtername||-------|----------|-----------|-----------||Include|test-group|test|test-filter|####Configuration|setting|value||------------------------------------------|-----||Test|test||Name|test||FileVaultNumberOfTimesUserCanIgnore|1|" document_management_intents( f"{self.directory.path}/intent/",