From e6b71b8cdb7e32eb51d04d7697e6d55d1f968420 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 11 Dec 2024 00:01:53 +0100 Subject: [PATCH 1/7] Adds organisation API support Apart from operating a single charge station, the Charge Amps API also allows controlling (part) of an organization's fleet of charge stations and users. Since this is most likely an uncommon usecase, I placed it behind a feature flag "organisations". This does not add support for CLI flags for accessing the API, but would be a nice future addition. --- README.md | 14 ++- chargeamps/external.py | 23 ++++ chargeamps/models.py | 68 ++++++++++ chargeamps/organisation.py | 246 +++++++++++++++++++++++++++++++++++++ examples/org_test.py | 35 ++++++ examples/test.py | 2 +- 6 files changed, 386 insertions(+), 2 deletions(-) create mode 100644 chargeamps/organisation.py create mode 100644 examples/org_test.py diff --git a/README.md b/README.md index d35b6f2..929191c 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,24 @@ This repository contains a Python module for the Charge Amps' electric vehicle c The module is developed by [Kirei AB](https://www.kirei.se) and is not supported by [Charge Amps AB](https://chargeamps.com). +## How to use + +The simplest way is to install via `pip`, as in: + +``` +pip install chargeamps +``` + +If you need access to the organisation API calls, you need to specify this feature upon installation: + +``` +pip install chargeamps["organisations"] +``` ## External API Key You need an API key to use the Charge Amps external API. Contact [Charge Amps Support](mailto:support@chargeamps.com) for extensive API documentation and API key. - ## References - [Charge Amps External REST API](https://eapi.charge.space/swagger/) diff --git a/chargeamps/external.py b/chargeamps/external.py index f40726d..dc8ae58 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -19,6 +19,8 @@ ChargePointStatus, ChargingSession, StartAuth, + User, + Partner, ) API_BASE_URL = "https://eapi.charge.space" @@ -44,6 +46,7 @@ def __init__( self._token = None self._token_expire = 0 self._refresh_token = None + self._user: User = None async def shutdown(self) -> None: await self._session.close() @@ -72,6 +75,7 @@ async def _ensure_token(self) -> None: except HTTPException: self._logger.warning("Token refresh failed") self._token = None + self._user = None self._refresh_token = None else: self._token = None @@ -89,6 +93,7 @@ async def _ensure_token(self) -> None: except HTTPException as exc: self._logger.error("Login failed") self._token = None + self._user = None self._refresh_token = None self._token_expire = 0 raise exc @@ -100,6 +105,7 @@ async def _ensure_token(self) -> None: response_payload = await response.json() self._token = response_payload["token"] + self._user = response_payload["user"] self._refresh_token = response_payload["refreshToken"] token_payload = jwt.decode(self._token, options={"verify_signature": False}) @@ -209,6 +215,13 @@ async def set_chargepoint_connector_settings( request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/connectors/{connector_id}/settings" await self._put(request_uri, json=payload) + async def get_partner(self, charge_point_id: str) -> Partner: + """Get partner details""" + request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/partner" + response = await self._get(request_uri) + payload = await response.json() + return Partner.model_validate(payload) + async def remote_start( self, charge_point_id: str, connector_id: int, start_auth: StartAuth ) -> None: @@ -226,3 +239,13 @@ async def reboot(self, charge_point_id) -> None: """Reboot chargepoint""" request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/reboot" await self._put(request_uri, json="{}") + + async def get_logged_in_user(self) -> User: + """Get authenticated user info""" + user_id = self._user["id"] + + request_uri = f"/api/{API_VERSION}/users/{user_id}" + response = await self._get(request_uri) + payload = await response.json() + + return User.model_validate(payload) diff --git a/chargeamps/models.py b/chargeamps/models.py index 1382cc7..51ff569 100644 --- a/chargeamps/models.py +++ b/chargeamps/models.py @@ -15,6 +15,16 @@ ] +def feature_required(feature_flag): + def decorator(cls): + if feature_flag: + return cls + else: + raise ImportError(f"Feature '{feature_flag}' is not enabled") + + return decorator + + class FrozenBaseSchema(BaseModel): model_config = ConfigDict( alias_generator=to_camel, @@ -94,3 +104,61 @@ class StartAuth(FrozenBaseSchema): rfid_format: str rfid: str external_transaction_id: str + + +class RfidTag(FrozenBaseSchema): + active: bool + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + + +class User(FrozenBaseSchema): + id: str + first_name: str | None + last_name: str | None + email: str | None + mobile: str | None + rfid_tags: Optional[list[RfidTag]] + user_status: str + + +class Partner(FrozenBaseSchema): + id: int + name: str + description: str + email: str + phone: str + + +# Only way to register new RFID seems to be through an Organization +@feature_required("organisations") +class Rfid(FrozenBaseSchema): + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + + +@feature_required("organisations") +class Organisation(FrozenBaseSchema): + id: str + name: str + description: str + + +@feature_required("organisations") +class OrganisationChargingSession(FrozenBaseSchema): + id: int + charge_point_id: Optional[str] + connector_id: int + user_id: str + rfid: Optional[str] + rfidDec: Optional[str] + rfidDecReverse: Optional[str] + organisation_id: Optional[str] + session_type: str + start_time: Optional[CustomDateTime] = None + end_time: Optional[CustomDateTime] = None + external_transaction_id: Optional[str] + total_consumption_kwh: float + external_id: Optional[str] diff --git a/chargeamps/organisation.py b/chargeamps/organisation.py new file mode 100644 index 0000000..b10ed6f --- /dev/null +++ b/chargeamps/organisation.py @@ -0,0 +1,246 @@ +from datetime import datetime + +from chargeamps.external import ChargeAmpsExternalClient + +from .models import ( + ChargePoint, + ChargePointStatus, + Rfid, + RfidTag, + Partner, + Organisation, + User, + OrganisationChargingSession, + feature_required, +) + +API_BASE_URL = "https://eapi.charge.space" +API_VERSION = "v5" + + +@feature_required("organisations") +class OrganisationClient(ChargeAmpsExternalClient): + def __init__( + self, + email: str, + password: str, + api_key: str, + api_base_url: str = None, + ): + super().__init__(email, password, api_key, api_base_url) + + async def get_organisations(self) -> list[Organisation]: + """Get all associated organisation's details""" + request_uri = f"/api/{API_VERSION}/organisations" + response = await self._get(request_uri) + payload = await response.json() + + return [Organisation.model_validate(org) for org in payload] + + async def get_organisation(self, org_id: str) -> Organisation: + """Get organisation details""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}" + response = await self._get(request_uri) + payload = await response.json() + return Organisation.model_validate(payload) + + async def get_organisation_chargepoints(self, org_id: str) -> list[ChargePoint]: + """Get all charge points for organisation""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints" + response = await self._get(request_uri) + payload = await response.json() + + res = [] + for charge_point in payload: + res.append(ChargePoint.model_validate(charge_point)) + return res + + async def get_organisation_chargepoint_statuses( + self, org_id: str + ) -> list[ChargePointStatus]: + """Get all charge points' status""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints/statuses" + response = await self._get(request_uri) + payload = await response.json() + + res = [] + for charge_point in payload: + res.append(ChargePointStatus.model_validate(charge_point)) + return res + + def verify_rfid( + self, rfid: str, rfid_format: str, rfid_length: str, rfid_dec_format_length: str + ) -> dict[str, str]: + result = {} + result["rfid"] = rfid + if rfid_format == "Hex": + # TODO: Should actual `rfid` param be verified against the length here? + if not rfid_length: + return result + elif rfid_format == "Dec" or rfid_format == "ReverseDec": + if rfid_dec_format_length: + result["rfidDecimalFormatLength"] = rfid_dec_format_length + else: + self._logger.error("Invalid RFID format") + return {} + + if rfid_length in {4, 7, 10}: + result["rfidLength"] = rfid_length + else: + self._logger.error("RFID length invalid, should be either 4, 7 or 10 bytes") + return {} + + result["rfidFormat"] = rfid_format + return result + + async def get_organisation_charging_sessions( + self, + org_id: str, + start_time: datetime = None, + end_time: datetime = None, + rfid: str = None, + rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" + rfid_length: int = None, + rfid_dec_format_length: int = None, + ) -> list[OrganisationChargingSession]: + """Get organisation's charging sessions""" + query_params = {} + if start_time: + query_params["startTime"] = start_time.isoformat() + if end_time: + query_params["endTime"] = end_time.isoformat() + + if rfid: + query_params.update( + self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length) + ) + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargingsessions" + response = await self._get(request_uri, params=query_params) + payload = await response.json() + + res = [] + for session in await payload.json(): + res.append(OrganisationChargingSession.model_validate(session)) + return res + + async def get_partner(self, org_id: str) -> Partner: + """Get partner details""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/partner" + response = await self._get(request_uri) + payload = await response.json() + return Partner.model_validate(payload) + + async def get_organisation_rfids(self, org_id: str) -> list[RfidTag]: + """Get organisation's registered rfid tags""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" + response = await self._get(request_uri) + payload = await response.json() + + res = [] + for rfid in payload: + res.append(RfidTag.model_validate(rfid)) + return res + + async def add_organisation_rfid( + self, org_id: str, rfid: Rfid, rfid_dec_format_length: int = None + ) -> RfidTag: + """Add a new RFID tag to the organisation""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" + payload = rfid.model_dump(by_alias=True) + if rfid_dec_format_length: + payload["rfidDecimalFormatLength"] = rfid_dec_format_length + await self._put(request_uri, json=payload) + + async def get_organisation_rfid( + self, org_id: str, rfid: Rfid, rfid_format: str = "Hex" + ) -> RfidTag: + """Get information about a specific RFID tag""" + rfid_id = rfid.rfid + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/{rfid_id}" + response = await self._get(request_uri) + payload = await response.json() + return RfidTag.model_validate(payload) + + async def revoke_organisation_rfid(self, org_id: str, rfid: Rfid) -> None: + """Revoke an RFID tag""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/revoke" + rfid_id = rfid.rfid + payload = rfid_id.toJson() + await self._put(request_uri, json=payload) + + async def get_organisation_users( + self, org_id: str, rfid: bool = False, rfid_dec_format_length: int = None + ) -> list[User]: + """Get organisation's registered users""" + query_params = {} + if rfid_dec_format_length: + query_params["rfidDecimalFormatLength"] = rfid_dec_format_length + if rfid: + query_params["expand"] = "rfid" + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" + response = await self._get(request_uri, params=query_params) + payload = await response.json() + + res = [] + for rfid in payload: + res.append(User.model_validate(rfid)) + return res + + async def add_organisation_user( + self, + org_id: str, + first_name: str = None, + last_name: str = None, + email: str = None, + mobile: str = None, + rfid: [Rfid] = None, + password: str = None, + ) -> User: + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" + payload = {} + + if first_name: + payload["firstName"] = first_name + if last_name: + payload["lastName"] = last_name + if email: + payload["email"] = email + if mobile: + payload["mobile"] = mobile + if rfid: + payload["rfidTags"] = rfid.model_dump(by_alias=True) + if password: + if len(password) >= 8: + payload["password"] = password + else: + raise ValueError( + "The provided password is too short, must exceed 8 characters" + ) + + print(payload) + response = await self._post(request_uri, json=payload) + new_user = await response.json() + + return User.model_validate(new_user) + + async def get_organisation_user( + self, + org_id: str, + user_id: str, + rfid: bool = False, + rfid_dec_format_length: int = None, + ) -> User: + """Get organisation's registered users""" + query_params = {} + if rfid_dec_format_length: + query_params["rfidDecimalFormatLength"] = rfid_dec_format_length + if rfid: + query_params["expand"] = "rfid" + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users/{user_id}" + response = await self._get(request_uri) + payload = await response.json() + + return User.model_validate(payload) diff --git a/examples/org_test.py b/examples/org_test.py new file mode 100644 index 0000000..7889de0 --- /dev/null +++ b/examples/org_test.py @@ -0,0 +1,35 @@ +import asyncio +import json + +from chargeamps.organisation import OrganisationClient + + +async def test(): + with open("config.json") as input_file: + c = json.load(input_file) + client = OrganisationClient( + email=c["username"], password=c["password"], api_key=c["api_key"] + ) + + chargepoints = await client.get_organisation_chargepoints() + print(chargepoints) + + status = await client.get_chargepoint_status(chargepoints[0].id) + print("Status:", status) + + for c in status.connector_statuses: + settings = await client.get_chargepoint_connector_settings( + c.charge_point_id, c.connector_id + ) + print("Before:", settings) + settings.max_current = 11 + await client.set_chargepoint_connector_settings(settings) + print("After:", settings) + + await client.shutdown() + + +if __name__ == "__main__": + # asyncio.run(test()) + loop = asyncio.get_event_loop() + loop.run_until_complete(test()) diff --git a/examples/test.py b/examples/test.py index e3a9f7a..0a5247a 100644 --- a/examples/test.py +++ b/examples/test.py @@ -5,7 +5,7 @@ async def test(): - with open("credentials.json") as input_file: + with open("config.json") as input_file: c = json.load(input_file) client = ChargeAmpsExternalClient( email=c["username"], password=c["password"], api_key=c["api_key"] From 0e22d10133a187eb92fa9917ffd3933270edcd03 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 17 Dec 2024 23:32:41 +0100 Subject: [PATCH 2/7] Verify RFID tags properly --- chargeamps/models.py | 2 +- chargeamps/organisation.py | 117 +++++++++++++++++++++---------------- examples/org_test.py | 4 +- 3 files changed, 68 insertions(+), 55 deletions(-) diff --git a/chargeamps/models.py b/chargeamps/models.py index 51ff569..3a33315 100644 --- a/chargeamps/models.py +++ b/chargeamps/models.py @@ -17,7 +17,7 @@ def feature_required(feature_flag): def decorator(cls): - if feature_flag: + if feature_flag == "organisations": return cls else: raise ImportError(f"Feature '{feature_flag}' is not enabled") diff --git a/chargeamps/organisation.py b/chargeamps/organisation.py index b10ed6f..4903ca9 100644 --- a/chargeamps/organisation.py +++ b/chargeamps/organisation.py @@ -1,4 +1,5 @@ from datetime import datetime +import textwrap from chargeamps.external import ChargeAmpsExternalClient @@ -50,10 +51,7 @@ async def get_organisation_chargepoints(self, org_id: str) -> list[ChargePoint]: response = await self._get(request_uri) payload = await response.json() - res = [] - for charge_point in payload: - res.append(ChargePoint.model_validate(charge_point)) - return res + return [ChargePoint.model_validate(cp) for cp in payload] async def get_organisation_chargepoint_statuses( self, org_id: str @@ -63,42 +61,56 @@ async def get_organisation_chargepoint_statuses( response = await self._get(request_uri) payload = await response.json() - res = [] - for charge_point in payload: - res.append(ChargePointStatus.model_validate(charge_point)) - return res + return [ChargePointStatus.model_validate(cp) for cp in payload] + + def is_valid_hex(self, rfid: str) -> bool: + return len(rfid) % 2 == 0 and all(char in "0123456789ABCDEF" for char in rfid) + + def verify_rfid_length(self, rfid: str, length: int | None = None) -> int: + length_in_bytes = len(rfid) // 2 + + if length and length != length_in_bytes: + raise ValueError(textwrap.dedent(f""" + The provided RFID does not match the provided length: + RFID {rfid}, expected length: {length}, calculated length: {length_in_bytes} + """)) + + if length_in_bytes in {4, 7, 10}: + return length_in_bytes + else: + raise ValueError("RFID length invalid, should be either 4, 7 or 10 bytes") def verify_rfid( - self, rfid: str, rfid_format: str, rfid_length: str, rfid_dec_format_length: str + self, rfid: str, rfid_format: str | None, rfid_length: str | None, rfid_dec_format_length: str | None ) -> dict[str, str]: result = {} - result["rfid"] = rfid + if self.is_valid_hex(rfid): + result["rfid"] = rfid + else: + raise ValueError(f"The provided RFID value is not a valid hex value: rfid: {rfid}") + + rfid_actual_length = self.verify_rfid_length(rfid, rfid_length) + if rfid_format != "Hex": + result["rfidFormat"] = rfid_format + result["rfidLength"] = rfid_actual_length + if rfid_format == "Hex": - # TODO: Should actual `rfid` param be verified against the length here? - if not rfid_length: - return result + if rfid_actual_length != 7: + raise ValueError(f"RFID length must be 7 bytes if the (default) format type 'Hex' is set.") elif rfid_format == "Dec" or rfid_format == "ReverseDec": if rfid_dec_format_length: result["rfidDecimalFormatLength"] = rfid_dec_format_length else: - self._logger.error("Invalid RFID format") - return {} + raise ValueError("Invalid RFID format") - if rfid_length in {4, 7, 10}: - result["rfidLength"] = rfid_length - else: - self._logger.error("RFID length invalid, should be either 4, 7 or 10 bytes") - return {} - - result["rfidFormat"] = rfid_format return result async def get_organisation_charging_sessions( self, org_id: str, - start_time: datetime = None, - end_time: datetime = None, - rfid: str = None, + start_time: datetime | None = None, + end_time: datetime | None = None, + rfid: str | None = None, rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" rfid_length: int = None, rfid_dec_format_length: int = None, @@ -119,10 +131,7 @@ async def get_organisation_charging_sessions( response = await self._get(request_uri, params=query_params) payload = await response.json() - res = [] - for session in await payload.json(): - res.append(OrganisationChargingSession.model_validate(session)) - return res + return [OrganisationChargingSession.model_validate(cp) for cp in payload] async def get_partner(self, org_id: str) -> Partner: """Get partner details""" @@ -137,28 +146,35 @@ async def get_organisation_rfids(self, org_id: str) -> list[RfidTag]: response = await self._get(request_uri) payload = await response.json() - res = [] - for rfid in payload: - res.append(RfidTag.model_validate(rfid)) - return res + return [RfidTag.model_validate(cp) for cp in payload] async def add_organisation_rfid( - self, org_id: str, rfid: Rfid, rfid_dec_format_length: int = None + self, org_id: str, rfid: Rfid, rfid_dec_format_length: int | None = None ) -> RfidTag: """Add a new RFID tag to the organisation""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" payload = rfid.model_dump(by_alias=True) if rfid_dec_format_length: payload["rfidDecimalFormatLength"] = rfid_dec_format_length - await self._put(request_uri, json=payload) + + response = await self._put(request_uri, json=payload) + payload = await response.json() + return RfidTag.model_validate(payload) async def get_organisation_rfid( - self, org_id: str, rfid: Rfid, rfid_format: str = "Hex" + self, + org_id: str, + rfid: str, + rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" + rfid_length: int | None = None, + rfid_dec_format_length: int | None = None ) -> RfidTag: """Get information about a specific RFID tag""" - rfid_id = rfid.rfid - request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/{rfid_id}" - response = await self._get(request_uri) + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/{rfid}" + query_params = {"organisationId": org_id} + query_params.update(self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length)) + + response = await self._get(request_uri, params=query_params) payload = await response.json() return RfidTag.model_validate(payload) @@ -166,11 +182,11 @@ async def revoke_organisation_rfid(self, org_id: str, rfid: Rfid) -> None: """Revoke an RFID tag""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/revoke" rfid_id = rfid.rfid - payload = rfid_id.toJson() + payload = {"rfid": rfid_id} await self._put(request_uri, json=payload) async def get_organisation_users( - self, org_id: str, rfid: bool = False, rfid_dec_format_length: int = None + self, org_id: str, rfid: bool = False, rfid_dec_format_length: int | None = None ) -> list[User]: """Get organisation's registered users""" query_params = {} @@ -191,12 +207,12 @@ async def get_organisation_users( async def add_organisation_user( self, org_id: str, - first_name: str = None, - last_name: str = None, - email: str = None, - mobile: str = None, - rfid: [Rfid] = None, - password: str = None, + first_name: str | None = None, + last_name: str | None = None, + email: str | None = None, + mobile: str | None = None, + rfid: list[Rfid] | None = None, + password: str | None = None, ) -> User: request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" payload = {} @@ -210,16 +226,15 @@ async def add_organisation_user( if mobile: payload["mobile"] = mobile if rfid: - payload["rfidTags"] = rfid.model_dump(by_alias=True) + payload["rfidTags"] = [tag.model_dump(by_alias=True) for tag in rfid] if password: if len(password) >= 8: payload["password"] = password else: raise ValueError( - "The provided password is too short, must exceed 8 characters" + "The provided password is too short, must be at least 8 characters" ) - print(payload) response = await self._post(request_uri, json=payload) new_user = await response.json() @@ -230,7 +245,7 @@ async def get_organisation_user( org_id: str, user_id: str, rfid: bool = False, - rfid_dec_format_length: int = None, + rfid_dec_format_length: int | None = None, ) -> User: """Get organisation's registered users""" query_params = {} diff --git a/examples/org_test.py b/examples/org_test.py index 7889de0..553c79c 100644 --- a/examples/org_test.py +++ b/examples/org_test.py @@ -30,6 +30,4 @@ async def test(): if __name__ == "__main__": - # asyncio.run(test()) - loop = asyncio.get_event_loop() - loop.run_until_complete(test()) + asyncio.run(test()) From f11dda217d027ac43319bb86317aa482b7c91314 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 17 Dec 2024 23:37:50 +0100 Subject: [PATCH 3/7] Adds check if user is signed in --- chargeamps/external.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/chargeamps/external.py b/chargeamps/external.py index dc8ae58..37df430 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -242,6 +242,9 @@ async def reboot(self, charge_point_id) -> None: async def get_logged_in_user(self) -> User: """Get authenticated user info""" + if not self._user or not isinstance(self._user, dict): + raise ValueError("No user is currently logged in") + user_id = self._user["id"] request_uri = f"/api/{API_VERSION}/users/{user_id}" From bbb6e4d229c10d5294c330b807c0bfcd08c79a41 Mon Sep 17 00:00:00 2001 From: Daniel Edholm Date: Mon, 13 Oct 2025 10:26:26 +0200 Subject: [PATCH 4/7] Fixes issues with merge to master, adapts to httpx --- chargeamps/external.py | 2 +- chargeamps/models.py | 20 ++++++------ chargeamps/organisation.py | 62 +++++++++++++++++++++----------------- examples/org_test.py | 9 +++--- pyproject.toml | 3 ++ uv.lock | 1 + 6 files changed, 55 insertions(+), 42 deletions(-) diff --git a/chargeamps/external.py b/chargeamps/external.py index 25152c9..763a398 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -49,7 +49,7 @@ def __init__( self._refresh_token = None self._token_skew = 30 self._token_lock = asyncio.Lock() - self._user: User = None + self._user: User | None = None async def shutdown(self) -> None: if self._owns_client: diff --git a/chargeamps/models.py b/chargeamps/models.py index d50967d..15c0286 100644 --- a/chargeamps/models.py +++ b/chargeamps/models.py @@ -116,7 +116,7 @@ class User(FrozenBaseSchema): last_name: str | None email: str | None mobile: str | None - rfid_tags: Optional[list[RfidTag]] + rfid_tags: list[RfidTag] | None user_status: str @@ -146,16 +146,16 @@ class Organisation(FrozenBaseSchema): @feature_required("organisations") class OrganisationChargingSession(FrozenBaseSchema): id: int - charge_point_id: Optional[str] + charge_point_id: str | None connector_id: int user_id: str - rfid: Optional[str] - rfidDec: Optional[str] - rfidDecReverse: Optional[str] - organisation_id: Optional[str] + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + organisation_id: str | None session_type: str - start_time: Optional[CustomDateTime] = None - end_time: Optional[CustomDateTime] = None - external_transaction_id: Optional[str] + start_time: CustomDateTime | None = None + end_time: CustomDateTime | None = None + external_transaction_id: str | None total_consumption_kwh: float - external_id: Optional[str] + external_id: str | None diff --git a/chargeamps/organisation.py b/chargeamps/organisation.py index 4903ca9..31d00b2 100644 --- a/chargeamps/organisation.py +++ b/chargeamps/organisation.py @@ -26,7 +26,7 @@ def __init__( email: str, password: str, api_key: str, - api_base_url: str = None, + api_base_url: str | None = None, ): super().__init__(email, password, api_key, api_base_url) @@ -34,7 +34,7 @@ async def get_organisations(self) -> list[Organisation]: """Get all associated organisation's details""" request_uri = f"/api/{API_VERSION}/organisations" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return [Organisation.model_validate(org) for org in payload] @@ -42,24 +42,22 @@ async def get_organisation(self, org_id: str) -> Organisation: """Get organisation details""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return Organisation.model_validate(payload) async def get_organisation_chargepoints(self, org_id: str) -> list[ChargePoint]: """Get all charge points for organisation""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return [ChargePoint.model_validate(cp) for cp in payload] - async def get_organisation_chargepoint_statuses( - self, org_id: str - ) -> list[ChargePointStatus]: + async def get_organisation_chargepoint_statuses(self, org_id: str) -> list[ChargePointStatus]: """Get all charge points' status""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints/statuses" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return [ChargePointStatus.model_validate(cp) for cp in payload] @@ -70,10 +68,12 @@ def verify_rfid_length(self, rfid: str, length: int | None = None) -> int: length_in_bytes = len(rfid) // 2 if length and length != length_in_bytes: - raise ValueError(textwrap.dedent(f""" + raise ValueError( + textwrap.dedent(f""" The provided RFID does not match the provided length: RFID {rfid}, expected length: {length}, calculated length: {length_in_bytes} - """)) + """) + ) if length_in_bytes in {4, 7, 10}: return length_in_bytes @@ -81,7 +81,11 @@ def verify_rfid_length(self, rfid: str, length: int | None = None) -> int: raise ValueError("RFID length invalid, should be either 4, 7 or 10 bytes") def verify_rfid( - self, rfid: str, rfid_format: str | None, rfid_length: str | None, rfid_dec_format_length: str | None + self, + rfid: str, + rfid_format: str | None, + rfid_length: int | None, + rfid_dec_format_length: int | None, ) -> dict[str, str]: result = {} if self.is_valid_hex(rfid): @@ -96,7 +100,9 @@ def verify_rfid( if rfid_format == "Hex": if rfid_actual_length != 7: - raise ValueError(f"RFID length must be 7 bytes if the (default) format type 'Hex' is set.") + raise ValueError( + "RFID length must be 7 bytes if the (default) format type 'Hex' is set." + ) elif rfid_format == "Dec" or rfid_format == "ReverseDec": if rfid_dec_format_length: result["rfidDecimalFormatLength"] = rfid_dec_format_length @@ -109,11 +115,11 @@ async def get_organisation_charging_sessions( self, org_id: str, start_time: datetime | None = None, - end_time: datetime | None = None, + end_time: datetime | None = None, rfid: str | None = None, rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" - rfid_length: int = None, - rfid_dec_format_length: int = None, + rfid_length: int | None = None, + rfid_dec_format_length: int | None = None, ) -> list[OrganisationChargingSession]: """Get organisation's charging sessions""" query_params = {} @@ -129,22 +135,22 @@ async def get_organisation_charging_sessions( request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargingsessions" response = await self._get(request_uri, params=query_params) - payload = await response.json() + payload = response.json() return [OrganisationChargingSession.model_validate(cp) for cp in payload] - async def get_partner(self, org_id: str) -> Partner: + async def get_organisation_partner(self, org_id: str) -> Partner: """Get partner details""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/partner" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return Partner.model_validate(payload) async def get_organisation_rfids(self, org_id: str) -> list[RfidTag]: """Get organisation's registered rfid tags""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return [RfidTag.model_validate(cp) for cp in payload] @@ -158,7 +164,7 @@ async def add_organisation_rfid( payload["rfidDecimalFormatLength"] = rfid_dec_format_length response = await self._put(request_uri, json=payload) - payload = await response.json() + payload = response.json() return RfidTag.model_validate(payload) async def get_organisation_rfid( @@ -167,15 +173,17 @@ async def get_organisation_rfid( rfid: str, rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" rfid_length: int | None = None, - rfid_dec_format_length: int | None = None + rfid_dec_format_length: int | None = None, ) -> RfidTag: """Get information about a specific RFID tag""" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/{rfid}" query_params = {"organisationId": org_id} - query_params.update(self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length)) + query_params.update( + self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length) + ) response = await self._get(request_uri, params=query_params) - payload = await response.json() + payload = response.json() return RfidTag.model_validate(payload) async def revoke_organisation_rfid(self, org_id: str, rfid: Rfid) -> None: @@ -197,7 +205,7 @@ async def get_organisation_users( request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" response = await self._get(request_uri, params=query_params) - payload = await response.json() + payload = response.json() res = [] for rfid in payload: @@ -236,7 +244,7 @@ async def add_organisation_user( ) response = await self._post(request_uri, json=payload) - new_user = await response.json() + new_user = response.json() return User.model_validate(new_user) @@ -255,7 +263,7 @@ async def get_organisation_user( query_params["expand"] = "rfid" request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users/{user_id}" - response = await self._get(request_uri) - payload = await response.json() + response = await self._get(request_uri, params=query_params) + payload = response.json() return User.model_validate(payload) diff --git a/examples/org_test.py b/examples/org_test.py index 553c79c..af9a72d 100644 --- a/examples/org_test.py +++ b/examples/org_test.py @@ -7,11 +7,12 @@ async def test(): with open("config.json") as input_file: c = json.load(input_file) - client = OrganisationClient( - email=c["username"], password=c["password"], api_key=c["api_key"] - ) + client = OrganisationClient(email=c["username"], password=c["password"], api_key=c["api_key"]) - chargepoints = await client.get_organisation_chargepoints() + org = await client.get_organisations() + org_id = org[0].id + + chargepoints = await client.get_organisation_chargepoints(org_id) print(chargepoints) status = await client.get_chargepoint_status(chargepoints[0].id) diff --git a/pyproject.toml b/pyproject.toml index 06e7cca..b475c80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,9 @@ chargeamps = "chargeamps.cli:main" repository = "https://github.com/kirei/python-chargeamps.git" issues = "https://github.com/kirei/python-chargeamps/issues" +[project.optional-dependencies] +organisations = [] + [dependency-groups] dev = [ "pytest>=8.4.2", diff --git a/uv.lock b/uv.lock index b3fadb2..5c7e5b1 100644 --- a/uv.lock +++ b/uv.lock @@ -175,6 +175,7 @@ requires-dist = [ { name = "pydantic", specifier = ">=2.11.7" }, { name = "pyjwt", specifier = ">=2.10.1" }, ] +provides-extras = ["organisations"] [package.metadata.requires-dev] dev = [ From 1a49a6da14aac572578aef0c071e1fa6de8bb659 Mon Sep 17 00:00:00 2001 From: Daniel Edholm Date: Mon, 13 Oct 2025 10:53:40 +0200 Subject: [PATCH 5/7] Return the logged in user from cached login --- chargeamps/external.py | 13 +++---------- chargeamps/organisation.py | 5 +++-- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/chargeamps/external.py b/chargeamps/external.py index 763a398..c8b6b76 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -112,7 +112,7 @@ async def _exclusive_ensure_token(self) -> None: response_payload = response.json() self._token = response_payload["token"] - self._user = response_payload["user"] + self._user = User.model_validate(response_payload["user"]) self._refresh_token = response_payload.get("refreshToken", self._refresh_token) token_payload = jwt.decode(self._token, options={"verify_signature": False}) self._token_expire = int(token_payload.get("exp", 0)) @@ -262,13 +262,6 @@ async def reboot(self, charge_point_id: str) -> None: async def get_logged_in_user(self) -> User: """Get authenticated user info""" - if not self._user or not isinstance(self._user, dict): + if self._user is None: raise ValueError("No user is currently logged in") - - user_id = self._user["id"] - - request_uri = f"/api/{API_VERSION}/users/{user_id}" - response = await self._get(request_uri) - payload = await response.json() - - return User.model_validate(payload) + return self._user diff --git a/chargeamps/organisation.py b/chargeamps/organisation.py index 31d00b2..c344377 100644 --- a/chargeamps/organisation.py +++ b/chargeamps/organisation.py @@ -62,7 +62,8 @@ async def get_organisation_chargepoint_statuses(self, org_id: str) -> list[Charg return [ChargePointStatus.model_validate(cp) for cp in payload] def is_valid_hex(self, rfid: str) -> bool: - return len(rfid) % 2 == 0 and all(char in "0123456789ABCDEF" for char in rfid) + r = rfid.upper() + return len(r) % 2 == 0 and all(ch in "0123456789ABCDEF" for ch in r) def verify_rfid_length(self, rfid: str, length: int | None = None) -> int: length_in_bytes = len(rfid) // 2 @@ -255,7 +256,7 @@ async def get_organisation_user( rfid: bool = False, rfid_dec_format_length: int | None = None, ) -> User: - """Get organisation's registered users""" + """Get organisation user""" query_params = {} if rfid_dec_format_length: query_params["rfidDecimalFormatLength"] = rfid_dec_format_length From a3bcdec25ff36d50b2eb1d49a4c64057ef390e0b Mon Sep 17 00:00:00 2001 From: Daniel Edholm Date: Mon, 13 Oct 2025 11:03:51 +0200 Subject: [PATCH 6/7] Return individual partner properly --- chargeamps/external.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chargeamps/external.py b/chargeamps/external.py index c8b6b76..7036187 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -239,7 +239,7 @@ async def get_partner(self, charge_point_id: str) -> Partner: """Get partner details""" request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/partner" response = await self._get(request_uri) - payload = await response.json() + payload = response.json() return Partner.model_validate(payload) async def remote_start( From c79a15eaef8ecb7c4352f162a4ee80e7951aa5d2 Mon Sep 17 00:00:00 2001 From: Daniel Edholm Date: Mon, 13 Oct 2025 11:16:25 +0200 Subject: [PATCH 7/7] Ensure user is properly handled during token refresh --- chargeamps/external.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chargeamps/external.py b/chargeamps/external.py index 7036187..01f47f5 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -112,7 +112,9 @@ async def _exclusive_ensure_token(self) -> None: response_payload = response.json() self._token = response_payload["token"] - self._user = User.model_validate(response_payload["user"]) + user_payload = response_payload.get("user") + if user_payload is not None: + self._user = User.model_validate(user_payload) self._refresh_token = response_payload.get("refreshToken", self._refresh_token) token_payload = jwt.decode(self._token, options={"verify_signature": False}) self._token_expire = int(token_payload.get("exp", 0))