From a779759a6dc43db796d6e4b96c4cbca642c1cbbe Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Mon, 27 Oct 2025 05:31:46 -0400 Subject: [PATCH 1/3] Removed the server_type parameter from AccessTokenAuthorizer and now determine the server type programmatically using the base URL. --- README.md | 15 ++++++-- delinea/secrets/server.py | 73 ++++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 2fc090f..a64c152 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ If using traditional `username` and `password` authentication to log in to your ```python from delinea.secrets.server import PasswordGrantAuthorizer -authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")") +authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")) ``` ##### With Platform @@ -60,12 +60,21 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g #### Access Token Authorization -If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. +If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`. + +##### With Secret Server +```python +from delinea.secrets.server import AccessTokenAuthorizer + +authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://hostname/SecretServer") +``` + +##### With Platform ```python from delinea.secrets.server import AccessTokenAuthorizer -authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...") +authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://platform.delinea.app") ``` ## Secret Server Cloud diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 9695c9c..63f5b2d 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -179,6 +179,39 @@ def add_bearer_token_authorization_header(bearer_token, existing_headers={}): **existing_headers, } + def _perform_server_detection(self, base_url): + """Detects if the server is Secret Server or Platform by health check endpoints.""" + secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck" + platform_endpoint = base_url.rstrip("/") + "/health" + + if self._validate_health_endpoint(secret_server_endpoint): + self._server_type = "secret_server" + return + if self._validate_health_endpoint(platform_endpoint): + self._server_type = "platform" + return + raise SecretServerError( + "Unable to detect server type via health check endpoints." + ) + + def _validate_health_endpoint(self, url): + """Validates if an endpoint returns healthy status.""" + try: + response = requests.get(url, timeout=60) + except Exception: + return False + + try: + response_body = response.content + except Exception: + return False + + try: + json_data = response.json() + return json_data.get("Healthy", False) + except Exception: + return b"Healthy" in response_body or b"healthy" in response_body + @abstractmethod def get_access_token(self): """Returns the access_token from a Grant Request""" @@ -198,9 +231,10 @@ class AccessTokenAuthorizer(Authorizer): def get_access_token(self): return self.access_token - def __init__(self, access_token, server_type="secret_server"): + def __init__(self, access_token, base_url): self.access_token = access_token - self._server_type = server_type.lower() + self.base_url = base_url.rstrip("/") + self._perform_server_detection(self.base_url) class PasswordGrantAuthorizer(Authorizer): @@ -211,37 +245,6 @@ class PasswordGrantAuthorizer(Authorizer): TOKEN_PATH_URI = "/oauth2/token" PLATFORM_TOKEN_PATH_URI = "/identity/api/oauth2/token/xpmplatform" - def _detect_server_type(self): - """Detects if the server is Secret Server or Platform by health check endpoints, using _check_json_response.""" - ss_health_url = self.base_url.rstrip("/") + "/api/v1/healthcheck" - platform_health_url = self.base_url.rstrip("/") + "/health" - if self._check_json_response(ss_health_url): - self._server_type = "secret_server" - return - if self._check_json_response(platform_health_url): - self._server_type = "platform" - return - raise SecretServerError( - "Unable to detect server type via health check endpoints." - ) - - def _check_json_response(self, url): - """Python equivalent of Go checkJSONResponse for health check detection.""" - try: - resp = requests.get(url, timeout=60) - except Exception: - return False - try: - body = resp.content - except Exception: - return False - try: - data = resp.json() - healthy = data.get("Healthy", False) - return healthy - except Exception: - return b"Healthy" in body or b"healthy" in body - @staticmethod def get_access_grant(token_url, grant_request): """Gets an *OAuth2 Access Grant* by calling the Secret Server REST API @@ -276,7 +279,7 @@ def _refresh(self, seconds_of_drift=300): else: # Detect server type if not already done if not hasattr(self, "_server_type"): - self._detect_server_type() + self._perform_server_detection(self.base_url) # Decide token_path_uri if not provided if not self.token_path_uri: if self._server_type == "secret_server": @@ -798,4 +801,4 @@ def __init__(self, tenant=None, authorizer=None, tld=DEFAULT_TLD, base_url=None) url = base_url.rstrip("/") else: raise ValueError("Must provide either tenant or base_url") - super().__init__(url, authorizer) + super().__init__(url, authorizer) \ No newline at end of file From b25f2c8ebf6843851bb3236f603e4e87cb4193a3 Mon Sep 17 00:00:00 2001 From: Sagar Wani Date: Mon, 27 Oct 2025 05:47:01 -0400 Subject: [PATCH 2/3] Updated unit test cases --- delinea/__init__.py | 2 +- tests/test_server.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/delinea/__init__.py b/delinea/__init__.py index 34d6518..e05db34 100644 --- a/delinea/__init__.py +++ b/delinea/__init__.py @@ -1,3 +1,3 @@ """The Delinea Secret Server Python SDK""" -__version__ = "2.0.0" +__version__ = "2.0.1" diff --git a/tests/test_server.py b/tests/test_server.py index 9b76a35..e4e254e 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -37,7 +37,7 @@ def test_api_url(secret_server, env_vars): def test_access_token_authorizer(env_vars, authorizer): assert SecretServer( f"https://{env_vars['tenant']}.secretservercloud.com/", - AccessTokenAuthorizer(authorizer.get_access_token()), + AccessTokenAuthorizer(authorizer.get_access_token(), f"https://{env_vars['tenant']}.secretservercloud.com/"), ).get_secret(env_vars["secret_id"])["id"] == int(env_vars["secret_id"]) @@ -104,7 +104,7 @@ def test_platform_api_url(platform_server, platform_env_vars): def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): assert SecretServer( platform_env_vars["platform_base_url"], - AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"), + AccessTokenAuthorizer(platform_authorizer.get_access_token(), platform_env_vars["platform_base_url"]), ).get_secret(platform_env_vars["secret_id"])["id"] == int( platform_env_vars["secret_id"] ) From 69df533ef224cb957cc6a1c9834ae996e94559ea Mon Sep 17 00:00:00 2001 From: Lint Action Date: Mon, 27 Oct 2025 09:51:26 +0000 Subject: [PATCH 3/3] Fix code style issues with Black --- delinea/secrets/server.py | 8 ++++---- tests/test_server.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 63f5b2d..0f26d4a 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -183,7 +183,7 @@ def _perform_server_detection(self, base_url): """Detects if the server is Secret Server or Platform by health check endpoints.""" secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck" platform_endpoint = base_url.rstrip("/") + "/health" - + if self._validate_health_endpoint(secret_server_endpoint): self._server_type = "secret_server" return @@ -200,12 +200,12 @@ def _validate_health_endpoint(self, url): response = requests.get(url, timeout=60) except Exception: return False - + try: response_body = response.content except Exception: return False - + try: json_data = response.json() return json_data.get("Healthy", False) @@ -801,4 +801,4 @@ def __init__(self, tenant=None, authorizer=None, tld=DEFAULT_TLD, base_url=None) url = base_url.rstrip("/") else: raise ValueError("Must provide either tenant or base_url") - super().__init__(url, authorizer) \ No newline at end of file + super().__init__(url, authorizer) diff --git a/tests/test_server.py b/tests/test_server.py index e4e254e..ed4cfac 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -37,7 +37,10 @@ def test_api_url(secret_server, env_vars): def test_access_token_authorizer(env_vars, authorizer): assert SecretServer( f"https://{env_vars['tenant']}.secretservercloud.com/", - AccessTokenAuthorizer(authorizer.get_access_token(), f"https://{env_vars['tenant']}.secretservercloud.com/"), + AccessTokenAuthorizer( + authorizer.get_access_token(), + f"https://{env_vars['tenant']}.secretservercloud.com/", + ), ).get_secret(env_vars["secret_id"])["id"] == int(env_vars["secret_id"]) @@ -104,7 +107,10 @@ def test_platform_api_url(platform_server, platform_env_vars): def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): assert SecretServer( platform_env_vars["platform_base_url"], - AccessTokenAuthorizer(platform_authorizer.get_access_token(), platform_env_vars["platform_base_url"]), + AccessTokenAuthorizer( + platform_authorizer.get_access_token(), + platform_env_vars["platform_base_url"], + ), ).get_secret(platform_env_vars["secret_id"])["id"] == int( platform_env_vars["secret_id"] )