Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ MANIFEST
# Environments
.venv
venv/
.idea/
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ Available options:
- `token_duration`: Validity period (in seconds) for retieved authorization tokens.
- `aws_access_key_id`: Use a specific AWS access key to authenticate with AWS.
- `aws_secret_access_key`: Use a specific AWS secret access key to authenticate with AWS.
- `assume_role`: Role ARN to assume with the current profile name to get the CodeArtifact credentials.
Copy link
Copy Markdown
Owner

@jmkeyes jmkeyes May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to rename this to assume_role_arn to be more explicit about the content it contains.

- `assume_role_session_name`: Name to attache to attach for the role session. If not specified, a name will be
selected by AWS SDK.

For more explanation of these options see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).

Expand All @@ -56,6 +59,9 @@ profile_name=default
aws_access_key_id=xxxxxxxxx
aws_secret_access_key=xxxxxxxxx

# Assume the following role to obtain the credentials
assume_role=arn:aws:iam::xxxxxxxxx:role/xxxxxxxxx

```

### Multiple Section Configuration (EXPERIMENTAL)
Expand Down
35 changes: 28 additions & 7 deletions keyrings/codeartifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,7 @@ def get_password(self, service, username):
name=repository_name,
)

# Create session with any supplied configuration.
session = boto3.Session(
region_name=region,
profile_name=config.get("profile_name"),
aws_access_key_id=config.get("aws_access_key_id"),
aws_secret_access_key=config.get("aws_secret_access_key"),
)
session = self.get_boto_session(region=region, config=config)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of 537fe6f, I've created a _get_codeartifact_client instance method that returns an initialized CodeArtifact service client with specific options.


# Create a CodeArtifact client for this repository's region.
client = session.client("codeartifact", region_name=region)
Expand Down Expand Up @@ -193,3 +187,30 @@ def set_password(self, service, username, password):
def delete_password(self, service, username):
# Defer deleting a password to the next backend
raise NotImplementedError()

@staticmethod
def get_boto_session(*, region, config):
should_assume_role = config.get("assume_role")

# Create session with any supplied configuration.
session = boto3.Session(
region_name=region,
profile_name=config.get("profile_name"),
aws_access_key_id=config.get("aws_access_key_id"),
aws_secret_access_key=config.get("aws_secret_access_key"),
)

if should_assume_role is not None:
assumed_role = session.client("sts").assume_role(
RoleArn=config["assume_role"],
RoleSessionName=config.get(
"assume_role_session_name", "KeyRingsCodeArtifact"
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the AWS SDK will not pick a RoleSessionName for you; you can see the docs that they say they are required. So I have include an explicit default as a fallback instead.

Copy link
Copy Markdown
Owner

@jmkeyes jmkeyes May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in a comment, I'd like this fallback to be a dynamic value usable for auditing in CloudTrail, but that's not required for this PR.

),
)
return boto3.Session(
aws_access_key_id=assumed_role["Credentials"]["AccessKeyId"],
aws_secret_access_key=assumed_role["Credentials"]["SecretAccessKey"],
aws_session_token=assumed_role["Credentials"]["SessionToken"],
)

return session
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pytest >= 6
pytest-cov
pytest-mock
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be necessary with the recent unit test modifications.

114 changes: 89 additions & 25 deletions tests/test_backend.py
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recent changes to the unit tests in 537fe6f may help clean up these tests.

Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,21 @@
from urllib.parse import urlunparse
from botocore.client import BaseClient
from datetime import datetime, timedelta
from keyrings.codeartifact import CodeArtifactBackend
from keyrings.codeartifact import CodeArtifactBackend, CodeArtifactKeyringConfig


@pytest.fixture
def backend():
def mocked_keyring_config(mocker):
mock_config_instance = mocker.create_autospec(
CodeArtifactKeyringConfig, spec_set=True
)
mock_config = mocker.patch("keyrings.codeartifact.CodeArtifactKeyringConfig")
mock_config.return_value = mock_config_instance
return mock_config_instance


@pytest.fixture
def backend(mocked_keyring_config):
# Find the system-wide keyring.
original = keyring.get_keyring()

Expand All @@ -33,6 +43,55 @@ def codeartifact_pypi_url(domain, owner, region, name):
return codeartifact_url(domain, owner, region, f"/pypi/{name}/")


def make_check_codeartifact_api_call(*, config, domain, domain_owner):
assumed_role = False
assume_role = config.get("assume_role")
assume_session_name = config.get("assume_session_name")
should_assume_role = assume_role is not None

def _make_api_call(client, *args, **kwargs):
nonlocal assumed_role
if should_assume_role and not assumed_role:
# We should only ever call GetAuthorizationToken
assert args[0] == "AssumeRole"

# We should only ever supply these parameters.
assert args[1]["RoleArn"] == assume_role
if assume_session_name is not None:
assert args[1]["RoleSessionName"] == assume_session_name
assumed_role = True
return {
"Credentials": {
"AccessKeyId": "",
"SecretAccessKey": "",
"SessionToken": "",
}
}
else:
assert assumed_role == should_assume_role

# We should only ever call GetAuthorizationToken
assert args[0] == "GetAuthorizationToken"

# We should only ever supply these parameters.
assert args[1]["domain"] == domain
assert args[1]["domainOwner"] == domain_owner
assert args[1]["durationSeconds"] == 3600

tzinfo = datetime.now().astimezone().tzinfo
current_time = datetime.now(tz=tzinfo)

# Compute the expiration based on the current timestamp.
expiration = timedelta(seconds=args[1]["durationSeconds"])

return {
"authorizationToken": "TOKEN",
"expiration": current_time + expiration,
}

return _make_api_call


def test_set_password_raises(backend):
with pytest.raises(NotImplementedError):
keyring.set_password("service", "username", "password")
Expand Down Expand Up @@ -67,29 +126,34 @@ def test_get_credential_invalid_path(backend, service):
assert not keyring.get_credential(service, None)


def test_get_credential_supported_host(backend, monkeypatch):
def _make_api_call(client, *args, **kwargs):
# We should only ever call GetAuthorizationToken
assert args[0] == "GetAuthorizationToken"

# We should only ever supply these parameters.
assert args[1]["domain"] == "domain"
assert args[1]["domainOwner"] == "000000000000"
assert args[1]["durationSeconds"] == 3600

tzinfo = datetime.now().astimezone().tzinfo
current_time = datetime.now(tz=tzinfo)

# Compute the expiration based on the current timestamp.
expiration = timedelta(seconds=args[1]["durationSeconds"])

return {
"authorizationToken": "TOKEN",
"expiration": current_time + expiration,
}

monkeypatch.setattr(BaseClient, "_make_api_call", _make_api_call)
url = codeartifact_pypi_url("domain", "000000000000", "region", "name")
@pytest.mark.parametrize(
["config"],
[
({},),
(
{
"assume_role": "arn:aws:iam::000000000000:role/some-role",
"assume_role_session_name": "SomeSessionName",
},
),
],
)
def test_get_credential_supported_host(
backend, config, mocked_keyring_config, monkeypatch
):
domain = "domain"
domain_owner = "000000000000"

monkeypatch.setattr(
BaseClient,
"_make_api_call",
make_check_codeartifact_api_call(
config=config, domain=domain, domain_owner=domain_owner
),
)
mocked_keyring_config.lookup.return_value = config

url = codeartifact_pypi_url(domain, domain_owner, "region", "name")
credentials = backend.get_credential(url, None)

assert credentials.username == "aws"
Expand Down
25 changes: 8 additions & 17 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
# test_config.py -- config parsing tests

import pytest

from io import StringIO
from os.path import dirname, join
from pathlib import Path

from keyrings.codeartifact import CodeArtifactKeyringConfig


@pytest.fixture
def config_file():
working_directory = dirname(__file__)

def _config_file(path):
return join(working_directory, "config", path)

return _config_file
CONFIG_DIR = Path(__file__).parent / "config"


@pytest.mark.parametrize(
Expand All @@ -26,8 +17,8 @@ def _config_file(path):
("domain", "00000000", "ca-central-1", "repository"),
],
)
def test_parse_single_section_only(config_file, parameters):
config = CodeArtifactKeyringConfig(config_file("single_section.cfg"))
def test_parse_single_section_only(parameters):
config = CodeArtifactKeyringConfig(CONFIG_DIR / "single_section.cfg")

# A single section has only one configuration.
values = config.lookup(*parameters)
Expand Down Expand Up @@ -89,8 +80,8 @@ def test_bogus_config_returns_empty_configuration(config_data):
),
],
)
def test_multiple_sections_with_defaults(config_file, query, expected):
path = config_file("multiple_sections_with_default.cfg")
def test_multiple_sections_with_defaults(query, expected):
path = CONFIG_DIR / "multiple_sections_with_default.cfg"
config = CodeArtifactKeyringConfig(path)
values = config.lookup(**query)

Expand All @@ -115,8 +106,8 @@ def test_multiple_sections_with_defaults(config_file, query, expected):
),
],
)
def test_multiple_sections_no_defaults(config_file, query, expected):
path = config_file("multiple_sections_no_default.cfg")
def test_multiple_sections_no_defaults(query, expected):
path = CONFIG_DIR / "multiple_sections_no_default.cfg"
config = CodeArtifactKeyringConfig(path)
values = config.lookup(**query)

Expand Down