diff --git a/README.md b/README.md index 2fc090f..a64c152 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ If using traditional `username` and `password` authentication to log in to your ```python from delinea.secrets.server import PasswordGrantAuthorizer -authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")") +authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")) ``` ##### With Platform @@ -60,12 +60,21 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g #### Access Token Authorization -If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. +If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`. + +##### With Secret Server +```python +from delinea.secrets.server import AccessTokenAuthorizer + +authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://hostname/SecretServer") +``` + +##### With Platform ```python from delinea.secrets.server import AccessTokenAuthorizer -authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...") +authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://platform.delinea.app") ``` ## Secret Server Cloud diff --git a/delinea/__init__.py b/delinea/__init__.py index 34d6518..e05db34 100644 --- a/delinea/__init__.py +++ b/delinea/__init__.py @@ -1,3 +1,3 @@ """The Delinea Secret Server Python SDK""" -__version__ = "2.0.0" +__version__ = "2.0.1" diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 9695c9c..0f26d4a 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -179,6 +179,39 @@ def add_bearer_token_authorization_header(bearer_token, existing_headers={}): **existing_headers, } + def _perform_server_detection(self, base_url): + """Detects if the server is Secret Server or Platform by health check endpoints.""" + secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck" + platform_endpoint = base_url.rstrip("/") + "/health" + + if self._validate_health_endpoint(secret_server_endpoint): + self._server_type = "secret_server" + return + if self._validate_health_endpoint(platform_endpoint): + self._server_type = "platform" + return + raise SecretServerError( + "Unable to detect server type via health check endpoints." + ) + + def _validate_health_endpoint(self, url): + """Validates if an endpoint returns healthy status.""" + try: + response = requests.get(url, timeout=60) + except Exception: + return False + + try: + response_body = response.content + except Exception: + return False + + try: + json_data = response.json() + return json_data.get("Healthy", False) + except Exception: + return b"Healthy" in response_body or b"healthy" in response_body + @abstractmethod def get_access_token(self): """Returns the access_token from a Grant Request""" @@ -198,9 +231,10 @@ class AccessTokenAuthorizer(Authorizer): def get_access_token(self): return self.access_token - def __init__(self, access_token, server_type="secret_server"): + def __init__(self, access_token, base_url): self.access_token = access_token - self._server_type = server_type.lower() + self.base_url = base_url.rstrip("/") + self._perform_server_detection(self.base_url) class PasswordGrantAuthorizer(Authorizer): @@ -211,37 +245,6 @@ class PasswordGrantAuthorizer(Authorizer): TOKEN_PATH_URI = "/oauth2/token" PLATFORM_TOKEN_PATH_URI = "/identity/api/oauth2/token/xpmplatform" - def _detect_server_type(self): - """Detects if the server is Secret Server or Platform by health check endpoints, using _check_json_response.""" - ss_health_url = self.base_url.rstrip("/") + "/api/v1/healthcheck" - platform_health_url = self.base_url.rstrip("/") + "/health" - if self._check_json_response(ss_health_url): - self._server_type = "secret_server" - return - if self._check_json_response(platform_health_url): - self._server_type = "platform" - return - raise SecretServerError( - "Unable to detect server type via health check endpoints." - ) - - def _check_json_response(self, url): - """Python equivalent of Go checkJSONResponse for health check detection.""" - try: - resp = requests.get(url, timeout=60) - except Exception: - return False - try: - body = resp.content - except Exception: - return False - try: - data = resp.json() - healthy = data.get("Healthy", False) - return healthy - except Exception: - return b"Healthy" in body or b"healthy" in body - @staticmethod def get_access_grant(token_url, grant_request): """Gets an *OAuth2 Access Grant* by calling the Secret Server REST API @@ -276,7 +279,7 @@ def _refresh(self, seconds_of_drift=300): else: # Detect server type if not already done if not hasattr(self, "_server_type"): - self._detect_server_type() + self._perform_server_detection(self.base_url) # Decide token_path_uri if not provided if not self.token_path_uri: if self._server_type == "secret_server": diff --git a/tests/test_server.py b/tests/test_server.py index 9b76a35..ed4cfac 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -37,7 +37,10 @@ def test_api_url(secret_server, env_vars): def test_access_token_authorizer(env_vars, authorizer): assert SecretServer( f"https://{env_vars['tenant']}.secretservercloud.com/", - AccessTokenAuthorizer(authorizer.get_access_token()), + AccessTokenAuthorizer( + authorizer.get_access_token(), + f"https://{env_vars['tenant']}.secretservercloud.com/", + ), ).get_secret(env_vars["secret_id"])["id"] == int(env_vars["secret_id"]) @@ -104,7 +107,10 @@ def test_platform_api_url(platform_server, platform_env_vars): def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer): assert SecretServer( platform_env_vars["platform_base_url"], - AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"), + AccessTokenAuthorizer( + platform_authorizer.get_access_token(), + platform_env_vars["platform_base_url"], + ), ).get_secret(platform_env_vars["secret_id"])["id"] == int( platform_env_vars["secret_id"] )