Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions lighter/libc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import ctypes

def get_libc() -> ctypes.CDLL:
libc_name = ctypes.util.find_library("c")
if not libc_name:
raise RuntimeError("Could not find C standard library")

libc = ctypes.CDLL(libc_name)
libc.free.argtypes = [ctypes.c_void_p]
libc.free.restype = None

return libc

_libc = get_libc()

def free(ptr: int) -> None:
_libc.free(ptr)

108 changes: 57 additions & 51 deletions lighter/signer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import os
import time
from typing import Dict, List, Optional, Union, Tuple
from typing import Dict, List, Optional, Union, Tuple, Any

from eth_account import Account
from eth_account.messages import encode_defunct
Expand All @@ -19,12 +19,13 @@
from lighter.models.resp_send_tx import RespSendTx
from lighter.models.resp_send_tx_batch import RespSendTxBatch
from lighter.transactions import CreateOrder, CancelOrder, Withdraw, CreateGroupedOrders
from lighter.libc import free

CODE_OK = 200


class ApiKeyResponse(ctypes.Structure):
_fields_ = [("privateKey", ctypes.c_char_p), ("publicKey", ctypes.c_char_p), ("err", ctypes.c_char_p)]
_fields_ = [('privateKey', ctypes.c_void_p), ('publicKey', ctypes.c_void_p), ('err', ctypes.c_void_p)]


class CreateOrderTxReq(ctypes.Structure):
Expand All @@ -43,16 +44,16 @@ class CreateOrderTxReq(ctypes.Structure):


class StrOrErr(ctypes.Structure):
_fields_ = [("str", ctypes.c_char_p), ("err", ctypes.c_char_p)]
_fields_ = [('str', ctypes.c_void_p), ('err', ctypes.c_void_p)]


class SignedTxResponse(ctypes.Structure):
_fields_ = [
("txType", ctypes.c_uint8),
("txInfo", ctypes.c_char_p),
("txHash", ctypes.c_char_p),
("messageToSign", ctypes.c_char_p),
("err", ctypes.c_char_p),
('txType', ctypes.c_uint8),
('txInfo', ctypes.c_void_p),
('txHash', ctypes.c_void_p),
('messageToSign', ctypes.c_void_p),
('err', ctypes.c_void_p),
]


Expand Down Expand Up @@ -84,15 +85,29 @@ def __get_shared_library():
)


def decode_and_free(ptr: Any) -> Optional[str]:
if not ptr:
return None
try:
# Read the string from the pointer
c_str = ctypes.cast(ptr, ctypes.c_char_p).value
if c_str is not None:
return c_str.decode('utf-8')
return None
finally:
# Free the memory allocated by the C library
free(ptr)


def __populate_shared_library_functions(signer):
signer.GenerateAPIKey.argtypes = []
signer.GenerateAPIKey.restype = ApiKeyResponse

signer.CreateClient.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_int, ctypes.c_longlong]
signer.CreateClient.restype = ctypes.c_char_p
signer.CreateClient.restype = ctypes.c_void_p

signer.CheckClient.argtypes = [ctypes.c_int, ctypes.c_longlong]
signer.CheckClient.restype = ctypes.c_char_p
signer.CheckClient.restype = ctypes.c_void_p

signer.SignChangePubKey.argtypes = [ctypes.c_char_p, ctypes.c_longlong, ctypes.c_int, ctypes.c_longlong]
signer.SignChangePubKey.restype = SignedTxResponse
Expand Down Expand Up @@ -160,12 +175,10 @@ def get_signer():


def create_api_key():
result = get_signer().GenerateAPIKey()

private_key_str = result.privateKey.decode("utf-8") if result.privateKey else None
public_key_str = result.publicKey.decode("utf-8") if result.publicKey else None
error = result.err.decode("utf-8") if result.err else None

result = lighter.signer_client.get_signer().GenerateAPIKey()
private_key_str = decode_and_free(result.privateKey)
public_key_str = decode_and_free(result.publicKey)
error = decode_and_free(result.err)
return private_key_str, public_key_str, error


Expand Down Expand Up @@ -306,36 +319,35 @@ def __init__(
# === signer helpers ===
@staticmethod
def __decode_tx_info(result: SignedTxResponse) -> Union[Tuple[str, str, str, None], Tuple[None, None, None, str]]:
if result.err:
error = result.err.decode("utf-8")
return None, None, None, error

# Use txType from response if available, otherwise use the provided type
tx_type = result.txType
tx_info_str = result.txInfo.decode("utf-8") if result.txInfo else None
tx_hash_str = result.txHash.decode("utf-8") if result.txHash else None

return tx_type, tx_info_str, tx_hash_str, None
err_str = decode_and_free(result.err)
tx_info_str = decode_and_free(result.txInfo)
tx_hash_str = decode_and_free(result.txHash)
decode_and_free(result.messageToSign)

if err_str:
return None, None, None, err_str

return result.txType, tx_info_str, tx_hash_str, None

@staticmethod
def __decode_and_sign_tx_info(eth_private_key: str, result: SignedTxResponse) -> Union[Tuple[str, str, str, None], Tuple[None, None, None, str]]:
if result.err:
err = result.err.decode("utf-8")
return None, None, None, err
err_str = decode_and_free(result.err)
tx_info_str = decode_and_free(result.txInfo)
tx_hash_str = decode_and_free(result.txHash)
msg_to_sign_str = decode_and_free(result.messageToSign)

if err_str:
return None, None, None, err_str

tx_type = result.txType
tx_info_str = result.txInfo.decode("utf-8") if result.txInfo else None
tx_hash_str = result.txHash.decode("utf-8") if result.txHash else None
msg_to_sign = result.messageToSign.decode("utf-8") if result.messageToSign else None

# sign the message
acct = Account.from_key(eth_private_key)
message = encode_defunct(text=msg_to_sign)
message = encode_defunct(text=msg_to_sign_str)
signature = acct.sign_message(message)

# add signature to tx_info
tx_info = json.loads(tx_info_str)
tx_info["L1Sig"] = signature.signature.to_0x_hex()
tx_info['L1Sig'] = signature.signature.to_0x_hex()
return tx_type, json.dumps(tx_info), tx_hash_str, None

def validate_api_private_keys(self, private_keys: Dict[int, str]):
Expand All @@ -348,30 +360,24 @@ def validate_api_private_keys(self, private_keys: Dict[int, str]):
private_keys[api_key_index] = private_key[2:]

def create_client(self, api_key_index):
err = self.signer.CreateClient(
self.url.encode("utf-8"),
self.api_key_dict[api_key_index].encode("utf-8"),
err_ptr = self.signer.CreateClient(
self.url.encode('utf-8'),
self.api_key_dict[api_key_index].encode('utf-8'),
self.chain_id,
api_key_index,
self.account_index,
)

if err is None:
return

err = decode_and_free(err_ptr)
if err is not None:
raise Exception(err.decode("utf-8"))
raise Exception(err)

def __signer_check_client(
self,
api_key_index: int,
account_index: int,
) -> Optional[str]:
err = self.signer.CheckClient(api_key_index, account_index)
if err is None:
return None

return err.decode("utf-8")
err_ptr = self.signer.CheckClient(api_key_index, account_index)
return decode_and_free(err_ptr)

# check_client verifies that the given API key associated with (api_key_index, account_index) matches the one on Lighter
def check_client(self):
Expand Down Expand Up @@ -402,10 +408,10 @@ def create_auth_token_with_expiry(self, deadline: int = DEFAULT_10_MIN_AUTH_EXPI
if timestamp is None:
timestamp = int(time.time())

result = self.signer.CreateAuthToken(deadline+timestamp, api_key_index, self.account_index)
result = self.signer.CreateAuthToken(deadline + timestamp, api_key_index, self.account_index)

auth = result.str.decode("utf-8") if result.str else None
error = result.err.decode("utf-8") if result.err else None
auth = decode_and_free(result.str)
error = decode_and_free(result.err)
return auth, error

def sign_change_api_key(self, eth_private_key: str, new_pubkey: str, nonce: int = DEFAULT_NONCE, api_key_index: int = DEFAULT_API_KEY_INDEX) -> Union[Tuple[str, str, str, None], Tuple[None, None, None, str]]:
Expand Down