From 7eb2d84915d7729a7d0b38069435ff9961308ac5 Mon Sep 17 00:00:00 2001 From: Patrick Date: Wed, 10 Jan 2024 13:59:06 +0000 Subject: [PATCH 1/2] fix upload_entities params --- pyapacheatlas/core/client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyapacheatlas/core/client.py b/pyapacheatlas/core/client.py index a22209e..1893173 100644 --- a/pyapacheatlas/core/client.py +++ b/pyapacheatlas/core/client.py @@ -1205,7 +1205,8 @@ def upload_entities(self, batch, batch_size=None): # TODO Include a Do Not Overwrite call results = None atlas_endpoint = self.endpoint_url + "/entity/bulk" - + parameters = {"businessAttributeUpdateBehavior": "replace"} + payload = AtlasClient._prepare_entity_upload(batch) results = [] @@ -1218,6 +1219,7 @@ def upload_entities(self, batch, batch_size=None): logging.debug(f"Batch upload #{batch_id} of size {batch_size}") postBulkEntities = self._post_http( atlas_endpoint, + params=parameters, json=batch ) temp_results = postBulkEntities.body @@ -1226,6 +1228,7 @@ def upload_entities(self, batch, batch_size=None): else: postBulkEntities = self._post_http( atlas_endpoint, + params=parameters, json=payload ) From b81270f7e1134cb2beb8edf05efd2514df876f12 Mon Sep 17 00:00:00 2001 From: Patrick Date: Tue, 16 Jan 2024 15:53:44 +0000 Subject: [PATCH 2/2] create PurviewClient.upload_entities() to allow purview specific parameters to be passed through to the url. --- pyapacheatlas/core/client.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/pyapacheatlas/core/client.py b/pyapacheatlas/core/client.py index 1893173..59a4ed5 100644 --- a/pyapacheatlas/core/client.py +++ b/pyapacheatlas/core/client.py @@ -1188,7 +1188,7 @@ def _prepare_entity_upload(batch): return payload - def upload_entities(self, batch, batch_size=None): + def upload_entities(self, batch, batch_size=None, parameters={}): """ Upload entities to your Atlas backed Data Catalog. @@ -1199,13 +1199,13 @@ def upload_entities(self, batch, batch_size=None): Union(dict, :class:`~pyapacheatlas.core.entity.AtlasEntity`, list(dict), list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) ) :param int batch_size: The number of entities you want to send in bulk + :param dict parameters: The parameters to pass into the url. :return: The results of your bulk entity upload. :rtype: dict """ # TODO Include a Do Not Overwrite call results = None atlas_endpoint = self.endpoint_url + "/entity/bulk" - parameters = {"businessAttributeUpdateBehavior": "replace"} payload = AtlasClient._prepare_entity_upload(batch) @@ -1654,6 +1654,33 @@ def import_terms(self, csv_path, glossary_name="Glossary", glossary_guid=None): csv_path, glossary_name, glossary_guid) return results + def upload_entities(self, batch, batch_size=None, businessAttributeUpdateBehavior=None, collectionId=None): + """ + Upload entities to your Atlas backed Data Catalog. + + :param batch: + The batch of entities you want to upload. Supports a single dict, + AtlasEntity, list of dicts, list of atlas entities. + :type batch: + Union(dict, :class:`~pyapacheatlas.core.entity.AtlasEntity`, + list(dict), list(:class:`~pyapacheatlas.core.entity.AtlasEntity`) ) + :param int batch_size: The number of entities you want to send in bulk + :param str businessAttributeUpdateBehavior: Used to define the update behavior for business attributes when updating entities. https://learn.microsoft.com/en-us/rest/api/purview/datamapdataplane/entity/bulk-create-or-update?view=rest-purview-datamapdataplane-2023-09-01&tabs=HTTP#businessattributeupdatebehavior + :param str collectionId: The collection where entities will be moved to. + Typically a 6-letter pseudo-random string such as "xcgw8s" which can be obtained + e.g. by visual inspection in the purview web UI (https://web.purview.azure.com/). + Only specify a value if you need to move an entity to another collection. + :return: The results of your bulk entity upload. + :rtype: dict + """ + parameters={} + if businessAttributeUpdateBehavior is not None: + parameters.update({"businessAttributeUpdateBehavior": businessAttributeUpdateBehavior}) + if collectionId is not None: + parameters.update({"collectionId": collectionId}) + results = super().upload_entities(batch, batch_size, parameters) + return results + @PurviewOnly def import_terms_status(self, operation_guid): """