Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions there/repl_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def connection_lost(self, exc):
def _parse_error(self, text):
"""Read the error message and convert exceptions"""
lines = text.splitlines()
if not lines[-1]:
lines = lines[:-1]
if lines[0].startswith('Traceback'):
m = re_oserror.match(lines[-1])
if m:
Expand Down Expand Up @@ -270,15 +272,33 @@ def truncate(self, path, length):

def read_rtc(self):
"""Read RTC and return a datetime object"""
year, month, day, weekday, hour, minute, second, subsecond = self.evaluate('import pyb; print(pyb.RTC().datetime())')
# subseconds are 1/256th of a second counting down
return datetime.datetime(year, month, day, hour, minute, second, (999999 * (255 - subsecond)) // 256)
rtc = self.evaluate(
'try:\n'
' import pyb\n'
' dt = pyb.RTC().datetime()\n'
' # subseconds are 1/256th of a second counting down\n'
' print(dt[0:3] + dt[4:7] + tuple(((999999 * (255 - dt[7])) // 256,)))\n'
'except ImportError:\n'
' import rtc\n'
' print(tuple(rtc.RTC().datetime)[:7])')
return datetime.datetime(*rtc)

def set_rtc(self, board_time=None):
"""Set the targets RTC from given datetime object"""
if board_time is None:
board_time = datetime.datetime.now()
self.exec('import pyb; print(pyb.RTC().datetime(({0:%Y},{0:%m},{0:%d},{1},{0:%H},{0:%M},{0:%S},{2})))'.format(
self.exec(
'date_tuple=({0:%Y},{0:%m},{0:%d},{1},{0:%H},{0:%M},{0:%S},{2})\n'
'try:\n'
' import pyb\n'
' pyb.RTC().datetime(date_tuple)\n'
'except ImportError:\n'
' import rtc\n\n'
' class RTC(object):\n'
' @property\n'
' def datetime(self):\n'
' return time.struct_time(date_tuple)\n\n'
' rtc.set_time_source(RTC())'.format(
board_time,
board_time.weekday() + 1,
255 - (255 * board_time.microsecond) // 999999
Expand All @@ -294,7 +314,11 @@ def read_flash_as_stream(self, offset, length):
Iterate over blocks (`bytes`) of Flash memory.
"""
self.exec(
'import ubinascii, pyb;\n'
'import pyb\n'
'try:\n'
' import ubinascii as binascii\n'
'except ImportError:\n'
' import binascii\n'
'def _b():\n'
f' f = pyb.Flash(start={offset!r}, len={length!r})\n'
' f.ioctl(1, 1)\n' # switch to new BDEV API
Expand All @@ -303,7 +327,7 @@ def read_flash_as_stream(self, offset, length):
f' n_blocks = ({length!r} // blk) if {length!r} > 0 else f.ioctl(4, 0)\n'
f' for n in range(0, n_blocks):\n'
' f.readblocks(n, mem, 0)\n'
' print(ubinascii.b2a_base64(mem))\n'
' print(binascii.b2a_base64(mem))\n'
' yield\n'
' print(b"")\n'
' yield\n'
Expand Down Expand Up @@ -479,20 +503,24 @@ def read_as_stream(self):
# reading (lines * linesize) must not take more than 1sec and 2kB target RAM!
n_blocks = max(1, self._repl.serial.baudrate // 5120)
self._repl.exec(
f'import ubinascii; _f = open({self.as_posix()!r}, "rb"); _mem = memoryview(bytearray(512))\n'
'try:\n'
' import ubinascii as binascii\n'
'except ImportError:\n'
' import binascii\n'
f'_f = open({self.as_posix()!r}, "rb"); _mem = memoryview(bytearray(512))\n'
'def _b(blocks=8):\n'
' print("[")\n'
' for _ in range(blocks):\n'
' n = _f.readinto(_mem)\n'
' if not n: break\n'
' print(ubinascii.b2a_base64(_mem[:n]), ",")\n'
' print(binascii.b2a_base64(_mem[:n]), ",")\n'
' print("]")')
while True:
blocks = self._repl.evaluate(f'_b({n_blocks})')
if not blocks:
break
yield from [binascii.a2b_base64(block) for block in blocks]
self._repl.exec('_f.close(); del _f, _b')
self._repl.exec('_f.close(); del _f')

def read_bytes(self) -> bytes:
"""
Expand All @@ -512,15 +540,20 @@ def write_bytes(self, data) -> int:
self._stat_cache = None
if not isinstance(data, (bytes, bytearray)):
raise TypeError(f'contents must be bytes/bytearray, got {type(data)} instead')
self._repl.exec(f'from ubinascii import a2b_base64 as a2b; _f = open({self.as_posix()!r}, "wb")')
self._repl.exec(
'try:\n'
' import ubinascii as binascii\n'
'except ImportError:\n'
' import binascii\n'
f'_f = open({self.as_posix()!r}, "wb")')
# write in chunks
with io.BytesIO(data) as local_file:
while True:
block = local_file.read(512)
if not block:
break
self._repl.exec(f'_f.write(a2b({binascii.b2a_base64(block).rstrip()!r}))')
self._repl.exec('_f.close(); del _f, a2b')
self._repl.exec(f'_f.write(binascii.a2b_base64({binascii.b2a_base64(block).rstrip()!r}))')
self._repl.exec('_f.close(); del _f, binascii')
return len(data)

# read_text(), write_text()
Expand Down