diff --git a/tesla_fleet_api/const.py b/tesla_fleet_api/const.py index c997da3..4dc5786 100644 --- a/tesla_fleet_api/const.py +++ b/tesla_fleet_api/const.py @@ -208,6 +208,42 @@ class EnergyDeviceIdentifierType(IntEnum): WALL_CONNECTOR_DIN = 4 +class AuthorizedClientKeyType(IntEnum): + """Key type for energy gateway authorized clients. + + Note: Tesla has not published the full ``key_type`` enum body. The RSA + value below is empirically known to work for registering an RSA-4096 + key via ``add_authorized_client_request``; other values may exist but + are not publicly documented. + """ + + RSA = 1 + + +class AuthorizedClientType(IntEnum): + """Client type used when registering an authorized client on an energy gateway. + + Sourced from Tesla's ``AuthorizedClientType`` protobuf enum (as + reverse-engineered in pypowerwall's ``tedapi_combined.proto``). + + There is no WiFi-specific value: WiFi vs LAN refers to the transport, + not the client type. pypowerwall's v1r flow registers its RSA key as + ``CUSTOMER_MOBILE_APP``. + """ + + INVALID = 0 + CUSTOMER_MOBILE_APP = 1 + VEHICLE = 2 + + +class AuthorizedClientState(IntEnum): + """State of an authorized client registered on an energy gateway.""" + + PENDING = 1 + PENDING_VERIFICATION = 2 + VERIFIED = 3 + + class ClosureState(StrEnum): """Closure state options""" diff --git a/tesla_fleet_api/tesla/energysite.py b/tesla_fleet_api/tesla/energysite.py index 7477602..808e3e6 100644 --- a/tesla_fleet_api/tesla/energysite.py +++ b/tesla_fleet_api/tesla/energysite.py @@ -1,4 +1,5 @@ from __future__ import annotations +import base64 from typing import Any, TYPE_CHECKING from tesla_fleet_api.const import ( Method, @@ -7,6 +8,8 @@ EnergyIslandMode, TeslaEnergyPeriod, EnergyDeviceIdentifierType, + AuthorizedClientKeyType, + AuthorizedClientType, ) if TYPE_CHECKING: @@ -66,6 +69,47 @@ async def list_authorized_clients(self) -> dict[str, Any]: """List authorized clients (paired keys) on the energy gateway including their roles and state.""" return await self._command("authorization", "list_authorized_clients_request") + async def add_authorized_client( + self, + public_key: bytes | str, + description: str = "Powerwall LAN Client", + key_type: AuthorizedClientKeyType | int = AuthorizedClientKeyType.RSA, + authorized_client_type: AuthorizedClientType + | int = AuthorizedClientType.CUSTOMER_MOBILE_APP, + ) -> dict[str, Any]: + """Register an authorized client (public key) with the energy gateway. + + Used to pair a local key (typically RSA-4096 in DER PKCS1 format) with + a Powerwall so it can be used for the LAN TEDapi v1r protocol. After + registration the key may be in PENDING or PENDING_VERIFICATION state + until the gateway confirms it — see ``AuthorizedClientState``. The + gateway may auto-verify via cloud, otherwise a physical breaker + toggle is required to confirm. Use ``list_authorized_clients`` to + poll for VERIFIED state. + + Args: + public_key: The public key to register. Either raw DER PKCS1 + bytes (which will be base64-encoded), or an already + base64-encoded string. + description: Human-readable description of the client. + key_type: The type of key being registered (default RSA). + authorized_client_type: The authorized client type (default LAN). + """ + if isinstance(public_key, bytes): + public_key_b64 = base64.b64encode(public_key).decode("ascii") + else: + public_key_b64 = public_key + return await self._command( + "authorization", + "add_authorized_client_request", + { + "key_type": int(key_type), + "public_key": public_key_b64, + "authorized_client_type": int(authorized_client_type), + "description": description, + }, + ) + async def get_signed_commands_public_key(self) -> dict[str, Any]: """Get the energy gateway's public key for signed commands.""" return await self._command( diff --git a/tesla_fleet_api/tesla/tesla.py b/tesla_fleet_api/tesla/tesla.py index 461938e..8803968 100644 --- a/tesla_fleet_api/tesla/tesla.py +++ b/tesla_fleet_api/tesla/tesla.py @@ -1,5 +1,6 @@ """Tesla Fleet API for Python.""" +import base64 from os.path import exists import aiofiles @@ -10,7 +11,7 @@ from tesla_fleet_api.tesla.vehicle.vehicles import Vehicles # cryptography -from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ec, rsa from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend @@ -24,6 +25,7 @@ class Tesla: Vehicles = Vehicles private_key: ec.EllipticCurvePrivateKey | None = None + rsa_private_key: rsa.RSAPrivateKey | None = None async def get_private_key( self, path: str = "private_key.pem" @@ -82,3 +84,76 @@ def public_uncompressed_point(self) -> str: encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint, ).hex() + + async def get_rsa_private_key( + self, path: str = "tedapi_rsa_private.pem", key_size: int = 4096 + ) -> rsa.RSAPrivateKey: + """Get or create an RSA private key for energy gateway client registration. + + The default 4096-bit key matches the format expected by the Powerwall + TEDapi v1r LAN protocol. The private key is stored as an unencrypted + PEM file with permissions 0o600 when created. + """ + if not exists(path): + self.rsa_private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=key_size, + backend=default_backend(), + ) + pem = self.rsa_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + async with aiofiles.open(path, "wb") as key_file: + await key_file.write(pem) + try: + from os import chmod + + chmod(path, 0o600) + except OSError: + pass + else: + async with aiofiles.open(path, "rb") as key_file: + key_data = await key_file.read() + value = serialization.load_pem_private_key( + key_data, password=None, backend=default_backend() + ) + if not isinstance(value, rsa.RSAPrivateKey): + raise AssertionError("Loaded key is not an RSAPrivateKey") + self.rsa_private_key = value + return self.rsa_private_key + + @property + def has_rsa_private_key(self) -> bool: + """Check if the RSA private key has been set.""" + return self.rsa_private_key is not None + + @property + def rsa_public_der_pkcs1(self) -> bytes: + """Return the RSA public key in DER PKCS1 format. + + This is the format the Tesla energy gateway expects when + registering an authorized client. + """ + if self.rsa_private_key is None: + raise ValueError("RSA private key is not set") + return self.rsa_private_key.public_key().public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.PKCS1, + ) + + @property + def rsa_public_der_pkcs1_b64(self) -> str: + """Return the RSA public key in base64-encoded DER PKCS1 format.""" + return base64.b64encode(self.rsa_public_der_pkcs1).decode("ascii") + + @property + def rsa_public_pem(self) -> str: + """Get the RSA public key in PEM (SubjectPublicKeyInfo) format.""" + if self.rsa_private_key is None: + raise ValueError("RSA private key is not set") + return self.rsa_private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("utf-8")