diff --git a/README.md b/README.md index d35b6f2..929191c 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,24 @@ This repository contains a Python module for the Charge Amps' electric vehicle c The module is developed by [Kirei AB](https://www.kirei.se) and is not supported by [Charge Amps AB](https://chargeamps.com). +## How to use + +The simplest way is to install via `pip`, as in: + +``` +pip install chargeamps +``` + +If you need access to the organisation API calls, you need to specify this feature upon installation: + +``` +pip install chargeamps["organisations"] +``` ## External API Key You need an API key to use the Charge Amps external API. Contact [Charge Amps Support](mailto:support@chargeamps.com) for extensive API documentation and API key. - ## References - [Charge Amps External REST API](https://eapi.charge.space/swagger/) diff --git a/chargeamps/external.py b/chargeamps/external.py index a899312..01f47f5 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -17,6 +17,8 @@ ChargePointStatus, ChargingSession, StartAuth, + User, + Partner, ) API_BASE_URL = "https://eapi.charge.space" @@ -47,6 +49,7 @@ def __init__( self._refresh_token = None self._token_skew = 30 self._token_lock = asyncio.Lock() + self._user: User | None = None async def shutdown(self) -> None: if self._owns_client: @@ -80,6 +83,7 @@ async def _exclusive_ensure_token(self) -> None: except (httpx.HTTPStatusError, httpx.RequestError): self._logger.warning("Token refresh failed") self._token = None + self._user = None self._refresh_token = None else: self._token = None @@ -97,6 +101,7 @@ async def _exclusive_ensure_token(self) -> None: except (httpx.HTTPStatusError, httpx.RequestError) as exc: self._logger.error("Login failed") self._token = None + self._user = None self._refresh_token = None self._token_expire = 0 raise exc @@ -107,6 +112,9 @@ async def _exclusive_ensure_token(self) -> None: response_payload = response.json() self._token = response_payload["token"] + user_payload = response_payload.get("user") + if user_payload is not None: + self._user = User.model_validate(user_payload) self._refresh_token = response_payload.get("refreshToken", self._refresh_token) token_payload = jwt.decode(self._token, options={"verify_signature": False}) self._token_expire = int(token_payload.get("exp", 0)) @@ -229,6 +237,13 @@ async def set_chargepoint_connector_settings( ) await self._put(request_uri, json=payload) + async def get_partner(self, charge_point_id: str) -> Partner: + """Get partner details""" + request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/partner" + response = await self._get(request_uri) + payload = response.json() + return Partner.model_validate(payload) + async def remote_start( self, charge_point_id: str, connector_id: int, start_auth: StartAuth ) -> None: @@ -246,3 +261,9 @@ async def reboot(self, charge_point_id: str) -> None: """Reboot chargepoint""" request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/reboot" await self._put(request_uri, json="{}") + + async def get_logged_in_user(self) -> User: + """Get authenticated user info""" + if self._user is None: + raise ValueError("No user is currently logged in") + return self._user diff --git a/chargeamps/models.py b/chargeamps/models.py index 2b4624d..15c0286 100644 --- a/chargeamps/models.py +++ b/chargeamps/models.py @@ -12,6 +12,16 @@ ] +def feature_required(feature_flag): + def decorator(cls): + if feature_flag == "organisations": + return cls + else: + raise ImportError(f"Feature '{feature_flag}' is not enabled") + + return decorator + + class FrozenBaseSchema(BaseModel): model_config = ConfigDict( alias_generator=to_camel, @@ -91,3 +101,61 @@ class StartAuth(FrozenBaseSchema): rfid_format: str rfid: str external_transaction_id: str + + +class RfidTag(FrozenBaseSchema): + active: bool + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + + +class User(FrozenBaseSchema): + id: str + first_name: str | None + last_name: str | None + email: str | None + mobile: str | None + rfid_tags: list[RfidTag] | None + user_status: str + + +class Partner(FrozenBaseSchema): + id: int + name: str + description: str + email: str + phone: str + + +# Only way to register new RFID seems to be through an Organization +@feature_required("organisations") +class Rfid(FrozenBaseSchema): + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + + +@feature_required("organisations") +class Organisation(FrozenBaseSchema): + id: str + name: str + description: str + + +@feature_required("organisations") +class OrganisationChargingSession(FrozenBaseSchema): + id: int + charge_point_id: str | None + connector_id: int + user_id: str + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + organisation_id: str | None + session_type: str + start_time: CustomDateTime | None = None + end_time: CustomDateTime | None = None + external_transaction_id: str | None + total_consumption_kwh: float + external_id: str | None diff --git a/chargeamps/organisation.py b/chargeamps/organisation.py new file mode 100644 index 0000000..c344377 --- /dev/null +++ b/chargeamps/organisation.py @@ -0,0 +1,270 @@ +from datetime import datetime +import textwrap + +from chargeamps.external import ChargeAmpsExternalClient + +from .models import ( + ChargePoint, + ChargePointStatus, + Rfid, + RfidTag, + Partner, + Organisation, + User, + OrganisationChargingSession, + feature_required, +) + +API_BASE_URL = "https://eapi.charge.space" +API_VERSION = "v5" + + +@feature_required("organisations") +class OrganisationClient(ChargeAmpsExternalClient): + def __init__( + self, + email: str, + password: str, + api_key: str, + api_base_url: str | None = None, + ): + super().__init__(email, password, api_key, api_base_url) + + async def get_organisations(self) -> list[Organisation]: + """Get all associated organisation's details""" + request_uri = f"/api/{API_VERSION}/organisations" + response = await self._get(request_uri) + payload = response.json() + + return [Organisation.model_validate(org) for org in payload] + + async def get_organisation(self, org_id: str) -> Organisation: + """Get organisation details""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}" + response = await self._get(request_uri) + payload = response.json() + return Organisation.model_validate(payload) + + async def get_organisation_chargepoints(self, org_id: str) -> list[ChargePoint]: + """Get all charge points for organisation""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints" + response = await self._get(request_uri) + payload = response.json() + + return [ChargePoint.model_validate(cp) for cp in payload] + + async def get_organisation_chargepoint_statuses(self, org_id: str) -> list[ChargePointStatus]: + """Get all charge points' status""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints/statuses" + response = await self._get(request_uri) + payload = response.json() + + return [ChargePointStatus.model_validate(cp) for cp in payload] + + def is_valid_hex(self, rfid: str) -> bool: + r = rfid.upper() + return len(r) % 2 == 0 and all(ch in "0123456789ABCDEF" for ch in r) + + def verify_rfid_length(self, rfid: str, length: int | None = None) -> int: + length_in_bytes = len(rfid) // 2 + + if length and length != length_in_bytes: + raise ValueError( + textwrap.dedent(f""" + The provided RFID does not match the provided length: + RFID {rfid}, expected length: {length}, calculated length: {length_in_bytes} + """) + ) + + if length_in_bytes in {4, 7, 10}: + return length_in_bytes + else: + raise ValueError("RFID length invalid, should be either 4, 7 or 10 bytes") + + def verify_rfid( + self, + rfid: str, + rfid_format: str | None, + rfid_length: int | None, + rfid_dec_format_length: int | None, + ) -> dict[str, str]: + result = {} + if self.is_valid_hex(rfid): + result["rfid"] = rfid + else: + raise ValueError(f"The provided RFID value is not a valid hex value: rfid: {rfid}") + + rfid_actual_length = self.verify_rfid_length(rfid, rfid_length) + if rfid_format != "Hex": + result["rfidFormat"] = rfid_format + result["rfidLength"] = rfid_actual_length + + if rfid_format == "Hex": + if rfid_actual_length != 7: + raise ValueError( + "RFID length must be 7 bytes if the (default) format type 'Hex' is set." + ) + elif rfid_format == "Dec" or rfid_format == "ReverseDec": + if rfid_dec_format_length: + result["rfidDecimalFormatLength"] = rfid_dec_format_length + else: + raise ValueError("Invalid RFID format") + + return result + + async def get_organisation_charging_sessions( + self, + org_id: str, + start_time: datetime | None = None, + end_time: datetime | None = None, + rfid: str | None = None, + rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" + rfid_length: int | None = None, + rfid_dec_format_length: int | None = None, + ) -> list[OrganisationChargingSession]: + """Get organisation's charging sessions""" + query_params = {} + if start_time: + query_params["startTime"] = start_time.isoformat() + if end_time: + query_params["endTime"] = end_time.isoformat() + + if rfid: + query_params.update( + self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length) + ) + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargingsessions" + response = await self._get(request_uri, params=query_params) + payload = response.json() + + return [OrganisationChargingSession.model_validate(cp) for cp in payload] + + async def get_organisation_partner(self, org_id: str) -> Partner: + """Get partner details""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/partner" + response = await self._get(request_uri) + payload = response.json() + return Partner.model_validate(payload) + + async def get_organisation_rfids(self, org_id: str) -> list[RfidTag]: + """Get organisation's registered rfid tags""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" + response = await self._get(request_uri) + payload = response.json() + + return [RfidTag.model_validate(cp) for cp in payload] + + async def add_organisation_rfid( + self, org_id: str, rfid: Rfid, rfid_dec_format_length: int | None = None + ) -> RfidTag: + """Add a new RFID tag to the organisation""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" + payload = rfid.model_dump(by_alias=True) + if rfid_dec_format_length: + payload["rfidDecimalFormatLength"] = rfid_dec_format_length + + response = await self._put(request_uri, json=payload) + payload = response.json() + return RfidTag.model_validate(payload) + + async def get_organisation_rfid( + self, + org_id: str, + rfid: str, + rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" + rfid_length: int | None = None, + rfid_dec_format_length: int | None = None, + ) -> RfidTag: + """Get information about a specific RFID tag""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/{rfid}" + query_params = {"organisationId": org_id} + query_params.update( + self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length) + ) + + response = await self._get(request_uri, params=query_params) + payload = response.json() + return RfidTag.model_validate(payload) + + async def revoke_organisation_rfid(self, org_id: str, rfid: Rfid) -> None: + """Revoke an RFID tag""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/revoke" + rfid_id = rfid.rfid + payload = {"rfid": rfid_id} + await self._put(request_uri, json=payload) + + async def get_organisation_users( + self, org_id: str, rfid: bool = False, rfid_dec_format_length: int | None = None + ) -> list[User]: + """Get organisation's registered users""" + query_params = {} + if rfid_dec_format_length: + query_params["rfidDecimalFormatLength"] = rfid_dec_format_length + if rfid: + query_params["expand"] = "rfid" + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" + response = await self._get(request_uri, params=query_params) + payload = response.json() + + res = [] + for rfid in payload: + res.append(User.model_validate(rfid)) + return res + + async def add_organisation_user( + self, + org_id: str, + first_name: str | None = None, + last_name: str | None = None, + email: str | None = None, + mobile: str | None = None, + rfid: list[Rfid] | None = None, + password: str | None = None, + ) -> User: + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" + payload = {} + + if first_name: + payload["firstName"] = first_name + if last_name: + payload["lastName"] = last_name + if email: + payload["email"] = email + if mobile: + payload["mobile"] = mobile + if rfid: + payload["rfidTags"] = [tag.model_dump(by_alias=True) for tag in rfid] + if password: + if len(password) >= 8: + payload["password"] = password + else: + raise ValueError( + "The provided password is too short, must be at least 8 characters" + ) + + response = await self._post(request_uri, json=payload) + new_user = response.json() + + return User.model_validate(new_user) + + async def get_organisation_user( + self, + org_id: str, + user_id: str, + rfid: bool = False, + rfid_dec_format_length: int | None = None, + ) -> User: + """Get organisation user""" + query_params = {} + if rfid_dec_format_length: + query_params["rfidDecimalFormatLength"] = rfid_dec_format_length + if rfid: + query_params["expand"] = "rfid" + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users/{user_id}" + response = await self._get(request_uri, params=query_params) + payload = response.json() + + return User.model_validate(payload) diff --git a/examples/org_test.py b/examples/org_test.py new file mode 100644 index 0000000..af9a72d --- /dev/null +++ b/examples/org_test.py @@ -0,0 +1,34 @@ +import asyncio +import json + +from chargeamps.organisation import OrganisationClient + + +async def test(): + with open("config.json") as input_file: + c = json.load(input_file) + client = OrganisationClient(email=c["username"], password=c["password"], api_key=c["api_key"]) + + org = await client.get_organisations() + org_id = org[0].id + + chargepoints = await client.get_organisation_chargepoints(org_id) + print(chargepoints) + + status = await client.get_chargepoint_status(chargepoints[0].id) + print("Status:", status) + + for c in status.connector_statuses: + settings = await client.get_chargepoint_connector_settings( + c.charge_point_id, c.connector_id + ) + print("Before:", settings) + settings.max_current = 11 + await client.set_chargepoint_connector_settings(settings) + print("After:", settings) + + await client.shutdown() + + +if __name__ == "__main__": + asyncio.run(test()) diff --git a/examples/test.py b/examples/test.py index e3a9f7a..0a5247a 100644 --- a/examples/test.py +++ b/examples/test.py @@ -5,7 +5,7 @@ async def test(): - with open("credentials.json") as input_file: + with open("config.json") as input_file: c = json.load(input_file) client = ChargeAmpsExternalClient( email=c["username"], password=c["password"], api_key=c["api_key"] diff --git a/pyproject.toml b/pyproject.toml index 06e7cca..b475c80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,9 @@ chargeamps = "chargeamps.cli:main" repository = "https://github.com/kirei/python-chargeamps.git" issues = "https://github.com/kirei/python-chargeamps/issues" +[project.optional-dependencies] +organisations = [] + [dependency-groups] dev = [ "pytest>=8.4.2", diff --git a/uv.lock b/uv.lock index b3fadb2..5c7e5b1 100644 --- a/uv.lock +++ b/uv.lock @@ -175,6 +175,7 @@ requires-dist = [ { name = "pydantic", specifier = ">=2.11.7" }, { name = "pyjwt", specifier = ">=2.10.1" }, ] +provides-extras = ["organisations"] [package.metadata.requires-dev] dev = [