Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# EditorConfig is awesome: https://EditorConfig.org

# top-most EditorConfig file
root = true

[*]
indent_style = space
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = false
insert_final_newline = false
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.python-version
73 changes: 56 additions & 17 deletions tapo-cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/python3
#!/usr/bin/env python3

# DO NOT RUN ANY OF THIS CODE UNLESS YOU UNDERSTAND WHAT IT DOES
# I TAKE NO RESPONSIBILITY FOR ANYTHING, USE ON YOUR OWN RISK
Expand All @@ -18,13 +18,30 @@
import time
import json
import datetime
import re
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

# Secrets extracted from the .apk
access_key = '4d11b6b9d5ea4d19a829adbb9714b057'
secret = '6ed7d97f3e73467f8a5bab90b577ba4c'

DEVICE_TYPES_ALL = [
"SMART.TAPOPLUG",
"SMART.TAPOBULB",
"SMART.IPCAMERA",
"SMART.TAPOROBOVAC",
"SMART.TAPOHUB",
"SMART.TAPOSENSOR",
"SMART.TAPOSWITCH",
"SMART.TAPODOORBELL"
]

DEVICE_TYPES_CAMERA = [
"SMART.IPCAMERA",
"SMART.TAPODOORBELL"
]

# Every request needs a uuid nonce and time, any value seems to work but let's not raise any suspicions.
nonce = str(uuid.uuid1())
now = str(int(time.time()))
Expand Down Expand Up @@ -56,7 +73,23 @@ def get_config():
email = config['email']
app_server_url_post = config['appServerUrl']

return token, email, app_server_url_post, 'https://euw1-app-tapo-care.i.tplinknbu.com'
# Derive region from appServerUrl
# e.g. https://n-aps1-wap-gw.tplinkcloud.com -> aps1
# e.g. https://n-euw1-wap-gw.tplinkcloud.com -> euw1
region_match = re.search(r'https://n-([a-z0-9]+)-wap-gw', app_server_url_post)
if region_match:
region = region_match.group(1)
api_url = f'https://{region}-app-tapo-care.i.tplinknbu.com'
else:
# Fallback to a default if regex fails, or check if it's eu-wap
if 'eu-wap' in app_server_url_post:
api_url = 'https://euw1-app-tapo-care.i.tplinknbu.com'
else:
# Default to aps1 or raise error?
# Defaulting to aps1 as it seems to be common for non-EU
api_url = 'https://aps1-app-tapo-care.i.tplinknbu.com'

return token, email, app_server_url_post, api_url
except:
print('Please login first.')
exit(1)
Expand Down Expand Up @@ -146,7 +179,7 @@ def login(username, password):
res = post(url, content, headers_post(content, '/api/v2/account/login'))
if (res['error_code'] != 0):
error(res)

config = json.dumps(res['result'], indent = 4)

# Login but with extra steps
Expand All @@ -157,11 +190,11 @@ def login(username, password):
content = json.dumps(content)
res = post(url, content, headers_post(content, '/api/v2/account/getPushVC4TerminalMFA'))
if (res['error_code'] != 0):
error(res)
error(res)

print('Check your Tapo App for the MFA code!')
mfa_code = str(input('MFA Code (no spaces or dashes): '))

url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin'
content = {"appType":"TP-Link_Tapo_Android","cloudUserName":username,"code":mfa_code,"MFAProcessId":mfa_process_id,"MFAType":1,"terminalBindEnabled":True}
content = json.dumps(content)
Expand Down Expand Up @@ -200,7 +233,7 @@ def devices():
"""Lists your first 20 Tapo devices."""
get_config() # Checks if logged in
endpoint = '/api/v2/common/getDeviceListByPage'
content = '{"deviceTypeList":["SMART.TAPOPLUG","SMART.TAPOBULB","SMART.IPCAMERA","SMART.TAPOROBOVAC","SMART.TAPOHUB","SMART.TAPOSENSOR","SMART.TAPOSWITCH"],"index":0,"limit":20}'
content = json.dumps({"deviceTypeList": DEVICE_TYPES_ALL, "index": 0, "limit": 20})
res = probe_endpoint_post(content, endpoint)
print(json.dumps(res, indent = 4))

Expand All @@ -209,7 +242,7 @@ def devices_limit():
"""Lists the device limits for your account by device type."""
get_config() # Checks if logged in
endpoint = '/api/v2/common/batchGetDeviceUserNumberLimit'
content = '{"deviceTypeList":["SMART.TAPOPLUG","SMART.TAPOBULB","SMART.IPCAMERA","SMART.TAPOROBOVAC","SMART.TAPOHUB","SMART.TAPOSENSOR","SMART.TAPOSWITCH"]}'
content = json.dumps({"deviceTypeList": DEVICE_TYPES_ALL})
res = probe_endpoint_post(content, endpoint)
print(json.dumps(res, indent = 4))

Expand All @@ -218,7 +251,7 @@ def devices_info():
"""Lists A LOT of parameters for your devices."""
get_config() # Checks if logged in
endpoint = '/api/v2/common/getDeviceListByPage'
content = '{"deviceTypeList":["SMART.TAPOPLUG","SMART.TAPOBULB","SMART.IPCAMERA","SMART.TAPOROBOVAC","SMART.TAPOHUB","SMART.TAPOSENSOR","SMART.TAPOSWITCH"],"index":0,"limit":20}'
content = json.dumps({"deviceTypeList": DEVICE_TYPES_ALL, "index": 0, "limit": 20})
devs = probe_endpoint_post(content, endpoint)

endpoint = '/api/v2/common/passthrough'
Expand All @@ -234,7 +267,7 @@ def devices_info():
@click.command()
def service_urls():
"""Lists URLs for various Tapo services."""
get_config() # Checks if logged in
get_config() # Checks if logged in
endpoint = '/api/v2/common/getAppServiceUrl'
content = '{"serviceIds":["nbu.iot-app-server.app","nbu.iot-cloud-gateway.app","nbu.iot-security.appdevice","cipc.api"]}'
res = probe_endpoint_post(content, endpoint)
Expand Down Expand Up @@ -279,9 +312,9 @@ def list_videos(days):
"""Lists videos for the last X days."""
get_config() # Checks if logged in
endpoint = '/api/v2/common/getDeviceListByPage'
content = '{"deviceTypeList":["SMART.IPCAMERA"],"index":0,"limit":20}'
content = json.dumps({"deviceTypeList": DEVICE_TYPES_CAMERA, "index": 0, "limit": 20})
devs = probe_endpoint_post(content, endpoint)

end_unixtime = time.time() + 86400
start_unixtime = end_unixtime - (days + 1) * 86400
end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00')
Expand All @@ -291,6 +324,9 @@ def list_videos(days):
for dev in devs['deviceList']:
params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time
videos = probe_endpoint_get(params, endpoint)
if 'total' not in videos:
print('\nError getting videos for ' + dev['alias'] + ': ' + str(videos))
continue
print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':')
if 'index' in videos:
for video in videos['index']:
Expand All @@ -305,14 +341,14 @@ def list_videos(days):
def download_videos(days, path, overwrite):
"""Downloads videos for the last X days to path."""
get_config() # Checks if logged in

path = path if path[-1] == '/' else path + '/'
path = os.path.expanduser(path)

endpoint = '/api/v2/common/getDeviceListByPage'
content = '{"deviceTypeList":["SMART.IPCAMERA"],"index":0,"limit":20}'
content = json.dumps({"deviceTypeList": DEVICE_TYPES_CAMERA, "index": 0, "limit": 20})
devs = probe_endpoint_post(content, endpoint)

end_unixtime = time.time() + 86400
start_unixtime = end_unixtime - (days + 1) * 86400
end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00')
Expand All @@ -323,6 +359,9 @@ def download_videos(days, path, overwrite):
for dev in devs['deviceList']:
params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time
videos = probe_endpoint_get(params, endpoint)
if 'total' not in videos:
print('\nError getting videos for ' + dev['alias'] + ': ' + str(videos))
continue
print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':')
if 'index' in videos:
for video in videos['index']:
Expand All @@ -338,11 +377,11 @@ def download_videos(days, path, overwrite):
exit(1)

key_b64 = video['video'][0]['decryptionInfo']['key']

file_path = path + dev['alias'] + '/' + datetime.datetime.strptime(video['eventLocalTime'], '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + '/'
file_name = video['eventLocalTime'].replace(':','-') + '.mp4'
if os.path.exists(file_path + file_name) and overwrite == 0:
print('Already exists ' + file_path + file_name)
print('Already exists ' + file_path + file_name)
result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': False, 'video': video})
else:
print('Downloading to ' + file_path + file_name)
Expand Down