Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ If using traditional `username` and `password` authentication to log in to your
```python
from delinea.secrets.server import PasswordGrantAuthorizer

authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")")
authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password"))
```

##### With Platform
Expand All @@ -60,12 +60,21 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g

#### Access Token Authorization

If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`.
If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`.

##### With Secret Server
```python
from delinea.secrets.server import AccessTokenAuthorizer

authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://hostname/SecretServer")
```

##### With Platform

```python
from delinea.secrets.server import AccessTokenAuthorizer

authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...")
authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://platform.delinea.app")
```

## Secret Server Cloud
Expand Down
2 changes: 1 addition & 1 deletion delinea/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""The Delinea Secret Server Python SDK"""

__version__ = "2.0.0"
__version__ = "2.0.1"
71 changes: 37 additions & 34 deletions delinea/secrets/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,39 @@ def add_bearer_token_authorization_header(bearer_token, existing_headers={}):
**existing_headers,
}

def _perform_server_detection(self, base_url):
"""Detects if the server is Secret Server or Platform by health check endpoints."""
secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck"
platform_endpoint = base_url.rstrip("/") + "/health"

if self._validate_health_endpoint(secret_server_endpoint):
self._server_type = "secret_server"
return
if self._validate_health_endpoint(platform_endpoint):
self._server_type = "platform"
return
raise SecretServerError(
"Unable to detect server type via health check endpoints."
)

def _validate_health_endpoint(self, url):
"""Validates if an endpoint returns healthy status."""
try:
response = requests.get(url, timeout=60)
except Exception:
return False

try:
response_body = response.content
except Exception:
return False

try:
json_data = response.json()
return json_data.get("Healthy", False)
except Exception:
return b"Healthy" in response_body or b"healthy" in response_body

@abstractmethod
def get_access_token(self):
"""Returns the access_token from a Grant Request"""
Expand All @@ -198,9 +231,10 @@ class AccessTokenAuthorizer(Authorizer):
def get_access_token(self):
return self.access_token

def __init__(self, access_token, server_type="secret_server"):
def __init__(self, access_token, base_url):
self.access_token = access_token
self._server_type = server_type.lower()
self.base_url = base_url.rstrip("/")
self._perform_server_detection(self.base_url)


class PasswordGrantAuthorizer(Authorizer):
Expand All @@ -211,37 +245,6 @@ class PasswordGrantAuthorizer(Authorizer):
TOKEN_PATH_URI = "/oauth2/token"
PLATFORM_TOKEN_PATH_URI = "/identity/api/oauth2/token/xpmplatform"

def _detect_server_type(self):
"""Detects if the server is Secret Server or Platform by health check endpoints, using _check_json_response."""
ss_health_url = self.base_url.rstrip("/") + "/api/v1/healthcheck"
platform_health_url = self.base_url.rstrip("/") + "/health"
if self._check_json_response(ss_health_url):
self._server_type = "secret_server"
return
if self._check_json_response(platform_health_url):
self._server_type = "platform"
return
raise SecretServerError(
"Unable to detect server type via health check endpoints."
)

def _check_json_response(self, url):
"""Python equivalent of Go checkJSONResponse for health check detection."""
try:
resp = requests.get(url, timeout=60)
except Exception:
return False
try:
body = resp.content
except Exception:
return False
try:
data = resp.json()
healthy = data.get("Healthy", False)
return healthy
except Exception:
return b"Healthy" in body or b"healthy" in body

@staticmethod
def get_access_grant(token_url, grant_request):
"""Gets an *OAuth2 Access Grant* by calling the Secret Server REST API
Expand Down Expand Up @@ -276,7 +279,7 @@ def _refresh(self, seconds_of_drift=300):
else:
# Detect server type if not already done
if not hasattr(self, "_server_type"):
self._detect_server_type()
self._perform_server_detection(self.base_url)
# Decide token_path_uri if not provided
if not self.token_path_uri:
if self._server_type == "secret_server":
Expand Down
10 changes: 8 additions & 2 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def test_api_url(secret_server, env_vars):
def test_access_token_authorizer(env_vars, authorizer):
assert SecretServer(
f"https://{env_vars['tenant']}.secretservercloud.com/",
AccessTokenAuthorizer(authorizer.get_access_token()),
AccessTokenAuthorizer(
authorizer.get_access_token(),
f"https://{env_vars['tenant']}.secretservercloud.com/",
),
).get_secret(env_vars["secret_id"])["id"] == int(env_vars["secret_id"])


Expand Down Expand Up @@ -104,7 +107,10 @@ def test_platform_api_url(platform_server, platform_env_vars):
def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer):
assert SecretServer(
platform_env_vars["platform_base_url"],
AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"),
AccessTokenAuthorizer(
platform_authorizer.get_access_token(),
platform_env_vars["platform_base_url"],
),
).get_secret(platform_env_vars["secret_id"])["id"] == int(
platform_env_vars["secret_id"]
)
Expand Down