Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 37 additions & 41 deletions roomba/password.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__version__ = "2.0a"
__version__ = "2.0b"
'''
Python 3.6
Quick Program to get blid and password from roomba
Expand Down Expand Up @@ -36,7 +36,7 @@ class Password(object):

config_dicts = ['data', 'mapsize', 'pmaps', 'regions']

def __init__(self, address='255.255.255.255', file=".\config.ini", login=[]):
def __init__(self, address='255.255.255.255', file="./config.ini", login=[]):
self.address = address
self.file = file
self.login = None
Expand All @@ -61,7 +61,7 @@ def read_config_file(self):
return roombas

def receive_udp(self):
#set up UDP socket to receive data from robot

port = 5678
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(10)
Expand Down Expand Up @@ -176,59 +176,55 @@ def get_password(self):
return self.save_config_file(file_roombas)

def get_password_from_roomba(self, addr):
'''
Send MQTT magic packet to addr
this is 0xf0 (mqtt reserved) 0x05(data length) 0xefcc3b2900 (data)
Should receive 37 bytes containing the password for roomba at addr
This is is 0xf0 (mqtt RESERVED) length (0x23 = 35) 0xefcc3b2900 (magic packet),
followed by 0xXXXX... (30 bytes of password). so 7 bytes, followed by 30 bytes of password
total of 37 bytes
Uses 10 second timeout for socket connection
'''

data = b''
packet = bytes.fromhex('f005efcc3b2900')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
#context = ssl.SSLContext(ssl.PROTOCOL_TLS)

# Create an SSL context with unsafe legacy renegotiation enabled
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
#context.set_ciphers('DEFAULT@SECLEVEL=1:HIGH:!DH:!aNULL')
wrappedSocket = context.wrap_socket(sock)


# Enable unsafe legacy renegotiation
context.options |= ssl.OP_LEGACY_SERVER_CONNECT
context.set_ciphers("DEFAULT:@SECLEVEL=1")

try:
wrappedSocket.connect((addr, 8883))
self.log.debug('Connection Successful')
wrappedSocket.send(packet)
self.log.debug('Waiting for data')

while len(data) < 37:
data_received = wrappedSocket.recv(1024)
data+= data_received
if len(data_received) == 0:
self.log.info("socket closed")
break

wrappedSocket.close()
return data

with context.wrap_socket(sock, server_hostname=addr) as wrappedSocket:
wrappedSocket.connect((addr, 8883))
self.log.debug('Connection Successful')
wrappedSocket.send(packet)
self.log.debug('Waiting for data')

while len(data) < 37:
data_received = wrappedSocket.recv(1024)
data += data_received
if len(data_received) == 0:
self.log.info("Socket closed")
break

return data

except ssl.SSLError as e:
self.log.error(f'SSL Error: {e}')
self.log.error('Ensure the Roomba firmware supports legacy TLS renegotiation.')
except socket.timeout as e:
self.log.error('Connection Timeout Error (for {}): {}'.format(addr, e))
self.log.error(f'Connection Timeout Error (for {addr}): {e}')
except (ConnectionRefusedError, OSError) as e:
if e.errno == 111: #errno.ECONNREFUSED
self.log.error('Unable to Connect to roomba at ip {}, make sure nothing else is connected (app?), '
'as only one connection at a time is allowed'.format(addr))
elif e.errno == 113: #errno.No Route to Host
self.log.error('Unable to contact roomba on ip {} is the ip correct?'.format(addr))
if e.errno == 111: # errno.ECONNREFUSED
self.log.error(f'Unable to connect to Roomba at IP {addr}. Ensure no other connections are active.')
elif e.errno == 113: # errno.No Route to Host
self.log.error(f'Unable to contact Roomba at IP {addr}. Is the IP correct?')
else:
self.log.error("Connection Error (for {}): {}".format(addr, e))
self.log.error(f"Connection Error (for {addr}): {e}")
except Exception as e:
self.log.exception(e)

self.log.error('Unable to get password from roomba')
self.log.error('Unable to get password from Roomba')
return data

def save_config_file(self, roomba):
Config = configparser.ConfigParser()
if roomba:
Expand Down