Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions tesla_fleet_api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,42 @@ class EnergyDeviceIdentifierType(IntEnum):
WALL_CONNECTOR_DIN = 4


class AuthorizedClientKeyType(IntEnum):
"""Key type for energy gateway authorized clients.

Note: Tesla has not published the full ``key_type`` enum body. The RSA
value below is empirically known to work for registering an RSA-4096
key via ``add_authorized_client_request``; other values may exist but
are not publicly documented.
"""

RSA = 1


class AuthorizedClientType(IntEnum):
"""Client type used when registering an authorized client on an energy gateway.

Sourced from Tesla's ``AuthorizedClientType`` protobuf enum (as
reverse-engineered in pypowerwall's ``tedapi_combined.proto``).

There is no WiFi-specific value: WiFi vs LAN refers to the transport,
not the client type. pypowerwall's v1r flow registers its RSA key as
``CUSTOMER_MOBILE_APP``.
"""

INVALID = 0
CUSTOMER_MOBILE_APP = 1
VEHICLE = 2


class AuthorizedClientState(IntEnum):
"""State of an authorized client registered on an energy gateway."""

PENDING = 1
PENDING_VERIFICATION = 2
VERIFIED = 3


class ClosureState(StrEnum):
"""Closure state options"""

Expand Down
44 changes: 44 additions & 0 deletions tesla_fleet_api/tesla/energysite.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
import base64
from typing import Any, TYPE_CHECKING
from tesla_fleet_api.const import (
Method,
Expand All @@ -7,6 +8,8 @@
EnergyIslandMode,
TeslaEnergyPeriod,
EnergyDeviceIdentifierType,
AuthorizedClientKeyType,
AuthorizedClientType,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -66,6 +69,47 @@ async def list_authorized_clients(self) -> dict[str, Any]:
"""List authorized clients (paired keys) on the energy gateway including their roles and state."""
return await self._command("authorization", "list_authorized_clients_request")

async def add_authorized_client(
self,
public_key: bytes | str,
description: str = "Powerwall LAN Client",
key_type: AuthorizedClientKeyType | int = AuthorizedClientKeyType.RSA,
authorized_client_type: AuthorizedClientType
| int = AuthorizedClientType.CUSTOMER_MOBILE_APP,
) -> dict[str, Any]:
"""Register an authorized client (public key) with the energy gateway.

Used to pair a local key (typically RSA-4096 in DER PKCS1 format) with
a Powerwall so it can be used for the LAN TEDapi v1r protocol. After
registration the key may be in PENDING or PENDING_VERIFICATION state
until the gateway confirms it — see ``AuthorizedClientState``. The
gateway may auto-verify via cloud, otherwise a physical breaker
toggle is required to confirm. Use ``list_authorized_clients`` to
poll for VERIFIED state.

Args:
public_key: The public key to register. Either raw DER PKCS1
bytes (which will be base64-encoded), or an already
base64-encoded string.
description: Human-readable description of the client.
key_type: The type of key being registered (default RSA).
authorized_client_type: The authorized client type (default LAN).
"""
if isinstance(public_key, bytes):
public_key_b64 = base64.b64encode(public_key).decode("ascii")
else:
public_key_b64 = public_key
return await self._command(
"authorization",
"add_authorized_client_request",
{
"key_type": int(key_type),
"public_key": public_key_b64,
"authorized_client_type": int(authorized_client_type),
"description": description,
},
)

async def get_signed_commands_public_key(self) -> dict[str, Any]:
"""Get the energy gateway's public key for signed commands."""
return await self._command(
Expand Down
77 changes: 76 additions & 1 deletion tesla_fleet_api/tesla/tesla.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tesla Fleet API for Python."""

import base64
from os.path import exists
import aiofiles

Expand All @@ -10,7 +11,7 @@
from tesla_fleet_api.tesla.vehicle.vehicles import Vehicles

# cryptography
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

Expand All @@ -24,6 +25,7 @@ class Tesla:
Vehicles = Vehicles

private_key: ec.EllipticCurvePrivateKey | None = None
rsa_private_key: rsa.RSAPrivateKey | None = None

async def get_private_key(
self, path: str = "private_key.pem"
Expand Down Expand Up @@ -82,3 +84,76 @@ def public_uncompressed_point(self) -> str:
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
).hex()

async def get_rsa_private_key(
self, path: str = "tedapi_rsa_private.pem", key_size: int = 4096
) -> rsa.RSAPrivateKey:
"""Get or create an RSA private key for energy gateway client registration.

The default 4096-bit key matches the format expected by the Powerwall
TEDapi v1r LAN protocol. The private key is stored as an unencrypted
PEM file with permissions 0o600 when created.
"""
if not exists(path):
self.rsa_private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend(),
)
pem = self.rsa_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
async with aiofiles.open(path, "wb") as key_file:
await key_file.write(pem)
try:
from os import chmod

chmod(path, 0o600)
except OSError:
pass
else:
async with aiofiles.open(path, "rb") as key_file:
key_data = await key_file.read()
value = serialization.load_pem_private_key(
key_data, password=None, backend=default_backend()
)
if not isinstance(value, rsa.RSAPrivateKey):
raise AssertionError("Loaded key is not an RSAPrivateKey")
self.rsa_private_key = value
return self.rsa_private_key

@property
def has_rsa_private_key(self) -> bool:
"""Check if the RSA private key has been set."""
return self.rsa_private_key is not None

@property
def rsa_public_der_pkcs1(self) -> bytes:
"""Return the RSA public key in DER PKCS1 format.

This is the format the Tesla energy gateway expects when
registering an authorized client.
"""
if self.rsa_private_key is None:
raise ValueError("RSA private key is not set")
return self.rsa_private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.PKCS1,
)

@property
def rsa_public_der_pkcs1_b64(self) -> str:
"""Return the RSA public key in base64-encoded DER PKCS1 format."""
return base64.b64encode(self.rsa_public_der_pkcs1).decode("ascii")

@property
def rsa_public_pem(self) -> str:
"""Get the RSA public key in PEM (SubjectPublicKeyInfo) format."""
if self.rsa_private_key is None:
raise ValueError("RSA private key is not set")
return self.rsa_private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")
Loading