diff --git a/.gitignore b/.gitignore
index b3fd8dd..7c0fe73 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
*.txt
*.json
__pycache__/*
+.DS_Store
\ No newline at end of file
diff --git a/README.md b/README.md
index 9ceb7c1..3c35136 100644
--- a/README.md
+++ b/README.md
@@ -9,14 +9,51 @@ You can run the script from the command line:
python3 spotify-backup.py playlists.txt
-or, to get a JSON dump, use:
+The browser authorization flow uses Spotify's Authorization Code with PKCE flow.
+If the bundled Spotify app client ID is rejected for your account, create your own
+Spotify app, add `http://127.0.0.1:43019/redirect` as a redirect URI, and run:
+
+ SPOTIFY_CLIENT_ID=your_client_id python3 spotify-backup.py playlists.txt
+
+or:
+
+ python3 spotify-backup.py playlists.txt --client-id=your_client_id
+
+To get a merged JSON export instead of a text file, use:
python3 spotify-backup.py playlists.json --format=json
+To merge your Liked Songs and playlists into one JSON export, use:
+
+ python3 spotify-backup.py playlists.json --dump=liked,playlists --format=json
+
+JSON exports are written as a single playlist named `Spotify Backup` whose `id` is
+your Spotify user ID. Tracks from all selected playlists are merged before being
+written in this simplified shape:
+
+ {
+ "name": "Spotify Backup",
+ "id": "your_spotify_user_id",
+ "tracks": [
+ {
+ "artist": "Artist Name",
+ "name": "Track Name",
+ "album": "Album Name",
+ "thumbnail": "https://i.scdn.co/image/...",
+ "duration": "3:24",
+ "stream": null
+ }
+ ]
+ }
+
By default, it includes your playlists. To include your Liked Songs, you can use:
python3 spotify-backup.py playlists.txt --dump=liked,playlists
+All exports remove duplicate tracks by Spotify URI before writing the output, so
+the same track will not appear twice in either TXT or JSON files; a track found in
+several playlists is kept only in the first one processed. Liked Albums are
+also deduplicated by Spotify album URI when included in TXT output.
+
If for some reason the browser-based authorization flow doesn't work, you can also [generate an OAuth token](https://developer.spotify.com/web-api/console/get-playlists/) on the developer site (with the `playlist-read-private` permission) and pass it with the `--token` option.
diff --git a/spotify-backup.py b/spotify-backup.py
index f272564..3e6d0f4 100755
--- a/spotify-backup.py
+++ b/spotify-backup.py
@@ -1,12 +1,15 @@
#!/usr/bin/env python3
import argparse
+import base64
import codecs
+import hashlib
import http.client
import http.server
import json
import logging
-import re
+import os
+import secrets
import sys
import time
import urllib.error
@@ -64,29 +67,88 @@ def list(self, url, params={}):
# Pops open a browser window for a user to log in and authorize API access.
@staticmethod
def authorize(client_id, scope):
    """Run the Authorization Code with PKCE flow and return a SpotifyAPI client.

    A local HTTP server is bound to 127.0.0.1 on ``_SERVER_PORT`` and the
    user's browser is sent to Spotify's authorize page. The redirect is
    served by ``_AuthorizationHandler``, which reports its outcome by
    raising ``_Authorization`` (success) or ``_AuthorizationError`` (failure).

    Args:
        client_id: Spotify application client ID.
        scope: Space-separated scopes to request.

    Returns:
        A ``SpotifyAPI`` instance built from the received access token.

    Raises:
        SystemExit: With status 1 when authorization fails.
    """
    code_verifier = SpotifyAPI._generate_code_verifier()
    code_challenge = SpotifyAPI._generate_code_challenge(code_verifier)
    state = secrets.token_urlsafe(16)
    redirect_uri = SpotifyAPI._redirect_uri()
    url = 'https://accounts.spotify.com/authorize?' + urllib.parse.urlencode({
        'response_type': 'code',
        'client_id': client_id,
        'scope': scope,
        'redirect_uri': redirect_uri,
        'code_challenge_method': 'S256',
        'code_challenge': code_challenge,
        'state': state
    })

    # Start listening before opening the browser so the redirect cannot race the server startup.
    server = SpotifyAPI._AuthorizationServer('127.0.0.1', SpotifyAPI._SERVER_PORT,
                                             client_id, code_verifier, redirect_uri, state)
    try:
        logging.info(f'Logging in (click if it doesn\'t open automatically): {url}')
        webbrowser.open(url)
        # The handler ends this loop by raising one of the exceptions below.
        while True:
            server.handle_request()
    except SpotifyAPI._Authorization as auth:
        return SpotifyAPI(auth.access_token)
    except SpotifyAPI._AuthorizationError as err:
        logging.error(f'Authorization failed: {err}')
        sys.exit(1)
    finally:
        # Release the listening socket on every exit path.
        server.server_close()
+
@staticmethod
def _redirect_uri():
    """Return the local redirect URI, built from ``_SERVER_PORT``."""
    port = SpotifyAPI._SERVER_PORT
    template = 'http://127.0.0.1:{}/redirect'
    return template.format(port)
+
@staticmethod
def _generate_code_verifier():
    """Return a random 64-character PKCE code verifier.

    Characters are drawn with ``secrets.choice`` from ASCII letters, digits
    and the punctuation ``-._~``.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~'
    length = 64
    picks = [secrets.choice(alphabet) for _ in range(length)]
    return ''.join(picks)
+
@staticmethod
def _generate_code_challenge(code_verifier):
    """Return the S256 PKCE challenge for ``code_verifier``.

    The challenge is the URL-safe base64 encoding of the verifier's SHA-256
    digest, with the trailing ``=`` padding removed.
    """
    hashed = hashlib.sha256(code_verifier.encode('ascii'))
    encoded = base64.urlsafe_b64encode(hashed.digest()).decode('ascii')
    return encoded.rstrip('=')
+
@staticmethod
def _exchange_authorization_code(client_id, code_verifier, redirect_uri, code):
    """Exchange an authorization code for an access token.

    Posts the PKCE token request to Spotify's token endpoint.

    Args:
        client_id: Spotify application client ID used for the authorize request.
        code_verifier: The PKCE verifier whose challenge was sent earlier.
        redirect_uri: The redirect URI used for the authorize request.
        code: The authorization code received on the redirect.

    Returns:
        The ``access_token`` value from the token response.

    Raises:
        SpotifyAPI._AuthorizationError: If the request fails, the response
            is not a JSON object, or it carries no access token.
    """
    data = urllib.parse.urlencode({
        'client_id': client_id,
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': redirect_uri,
        'code_verifier': code_verifier
    }).encode('utf-8')
    req = urllib.request.Request('https://accounts.spotify.com/api/token', data=data)
    req.add_header('Content-Type', 'application/x-www-form-urlencoded')
    reader = codecs.getreader('utf-8')
    try:
        # The context manager closes the HTTP response once it has been parsed.
        with urllib.request.urlopen(req) as res:
            response = json.load(reader(res))
    except urllib.error.HTTPError as err:
        message = err.reason
        # Best effort: prefer Spotify's JSON error text, fall back to the HTTP reason.
        try:
            error = json.load(reader(err))
            message = error.get('error_description') or error.get('error') or message
        except Exception:
            pass
        raise SpotifyAPI._AuthorizationError(message) from err
    except urllib.error.URLError as err:
        # Network-level failure (DNS, refused connection, TLS, ...).
        raise SpotifyAPI._AuthorizationError(f'Could not reach Spotify: {err.reason}') from err
    except ValueError as err:
        # Covers malformed JSON and undecodable bytes in a 2xx response.
        raise SpotifyAPI._AuthorizationError('Spotify returned an invalid token response') from err

    access_token = response.get('access_token') if isinstance(response, dict) else None
    if not access_token:
        raise SpotifyAPI._AuthorizationError('Spotify did not return an access token')
    return access_token
# The port that the local server listens on. Don't change this,
# as Spotify only will redirect to certain predefined URLs.
_SERVER_PORT = 43019
class _AuthorizationServer(http.server.HTTPServer):
def __init__(self, host, port, client_id, code_verifier, redirect_uri, state):
    """Store the authorization context, then bind to ``(host, port)``.

    The stored values are read by the request handler through ``self.server``.
    """
    self.state = state
    self.redirect_uri = redirect_uri
    self.code_verifier = code_verifier
    self.client_id = client_id
    address = (host, port)
    http.server.HTTPServer.__init__(self, address, SpotifyAPI._AuthorizationHandler)
# Disable the default error handling.
@@ -95,27 +157,44 @@ def handle_error(self, request, client_address):
class _AuthorizationHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
    """Handle Spotify's redirect and exchange the code for a token.

    Any path other than ``/redirect`` gets a 404. Otherwise the query is
    checked for an error, a matching ``state`` and a ``code``; each failure
    is reported through ``_send_authorization_error``, which raises. On
    success a confirmation page is written and ``SpotifyAPI._Authorization``
    is raised with the access token.
    """
    request = urllib.parse.urlparse(self.path)
    if request.path != '/redirect':
        self.send_error(404)
        return

    query = urllib.parse.parse_qs(request.query)

    def first(key, fallback=None):
        # First value of a query parameter, or the fallback when absent.
        return query.get(key, [fallback])[0]

    error = first('error')
    if error:
        error_description = first('error_description', error)
        self._send_authorization_error(400, f'Spotify returned: {error_description}')

    if first('state') != self.server.state:
        self._send_authorization_error(400, 'Spotify response state did not match the request')

    code = first('code')
    if not code:
        self._send_authorization_error(400, 'Spotify did not return an authorization code')

    server = self.server
    try:
        access_token = SpotifyAPI._exchange_authorization_code(
            server.client_id, server.code_verifier, server.redirect_uri, code)
    except SpotifyAPI._AuthorizationError as err:
        self._send_authorization_error(500, f'Could not exchange authorization code: {err}')

    self.send_response(200)
    self.send_header('Content-Type', 'text/html')
    self.end_headers()
    self.wfile.write(b'Thanks! You may now close this window.')
    logging.info('Received access token from Spotify.')
    # The exception carries the token out to the caller of handle_request().
    raise SpotifyAPI._Authorization(access_token)
+
def _send_authorization_error(self, status, message):
    """Send an error page to the browser, then abort the authorization.

    Args:
        status: HTTP status code for the response.
        message: Human-readable failure description. It may contain text
            taken from the request's query string.

    Raises:
        SpotifyAPI._AuthorizationError: Always, carrying the unescaped message.
    """
    import html  # local import so this block does not depend on file-level imports

    self.send_response(status)
    self.send_header('Content-Type', 'text/html')
    self.end_headers()
    # The body is served as HTML, so escape request-derived text before writing it.
    self.wfile.write(html.escape(message).encode('utf-8'))
    raise SpotifyAPI._AuthorizationError(message)
# Disable the default logging.
def log_message(self, format, *args):
@@ -125,6 +204,115 @@ class _Authorization(Exception):
def __init__(self, access_token):
    # Raised by the request handler and caught in authorize(), which reads
    # this attribute to build the SpotifyAPI client.
    self.access_token = access_token
class _AuthorizationError(Exception):
    """Raised when the browser authorization or the token exchange fails."""
+
+
def format_duration(duration_ms):
    """Format a millisecond duration as ``M:SS``, or ``H:MM:SS`` from one hour up.

    Returns an empty string when ``duration_ms`` is ``None``. Partial seconds
    are truncated.
    """
    if duration_ms is None:
        return ''
    whole_seconds = int(duration_ms) // 1000
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    if not hours:
        return f'{minutes}:{seconds:02}'
    return f'{hours}:{minutes:02}:{seconds:02}'
+
+
def thumbnail_url(album):
    """Return the URL of the album image with the smallest pixel area.

    A missing or zero ``height``/``width`` counts as ``sys.maxsize``, so images
    without dimensions lose to images that have them. The first image wins a
    tie. Returns ``None`` when the album has no images.
    """
    candidates = album.get('images')
    if not candidates:
        return None

    smallest = None
    smallest_area = None
    for image in candidates:
        area = (image.get('height') or sys.maxsize) * (image.get('width') or sys.maxsize)
        # Strict comparison keeps the earliest image when areas are equal.
        if smallest is None or area < smallest_area:
            smallest, smallest_area = image, area
    return smallest.get('url')
+
+
def simplified_track(track):
    """Reduce a Spotify track object to the fields written to the JSON export.

    Artist names are joined with ``', '`` and artists without a name are
    skipped. ``stream`` is always ``None``.
    """
    album = track.get('album') or {}
    names = []
    for artist in track.get('artists') or []:
        if artist.get('name'):
            names.append(artist['name'])

    summary = {}
    summary['artist'] = ', '.join(names)
    summary['name'] = track.get('name')
    summary['album'] = album.get('name')
    summary['thumbnail'] = thumbnail_url(album)
    summary['duration'] = format_duration(track.get('duration_ms'))
    summary['stream'] = None
    return summary
+
+
def track_uri_from_item(item):
    """Return ``item['track']['uri']``, or ``None`` if either level is not a dict."""
    track = item.get('track') if isinstance(item, dict) else None
    return track.get('uri') if isinstance(track, dict) else None
+
+
def album_uri_from_item(item):
    """Return ``item['album']['uri']``, or ``None`` if either level is not a dict."""
    if isinstance(item, dict):
        album = item.get('album')
        if isinstance(album, dict):
            return album.get('uri')
    return None
+
+
def dedupe_playlist_tracks(playlists):
    """Remove repeated tracks from the playlists in place; return how many were dropped.

    URIs are tracked across all playlists, so a track already seen in an
    earlier playlist is dropped from later ones. Items without a URI are
    always kept.
    """
    known_uris = set()
    dropped = 0
    for playlist in playlists:
        kept = []
        for entry in playlist['tracks']:
            uri = track_uri_from_item(entry)
            if not uri:
                kept.append(entry)
            elif uri in known_uris:
                dropped += 1
            else:
                known_uris.add(uri)
                kept.append(entry)
        playlist['tracks'] = kept
    return dropped
+
+
def dedupe_albums(albums):
    """Return ``(unique_albums, removed_count)``, deduplicating by album URI.

    The first occurrence of each URI is kept, in the original order. Items
    without a URI are always kept. The input list is not modified.
    """
    known_uris = set()
    unique = []
    dropped = 0
    for entry in albums:
        uri = album_uri_from_item(entry)
        if uri and uri in known_uris:
            dropped += 1
            continue
        if uri:
            known_uris.add(uri)
        unique.append(entry)
    return unique, dropped
+
+
def merged_json_export(user, playlists):
    """Merge the tracks of all playlists into a single export structure.

    Args:
        user: User object; its ``'id'`` becomes the export's ``id``.
        playlists: Playlists whose ``'tracks'`` lists hold playlist items.

    Returns:
        A dict with ``name`` (always ``'Spotify Backup'``), ``id`` and
        ``tracks``. Each track is the output of ``simplified_track``, and a
        given URI appears once. Tracks without a URI are all kept.
    """
    tracks = []
    seen = set()
    for playlist in playlists:
        for item in playlist['tracks']:
            if not isinstance(item, dict):
                continue
            track = item.get('track')
            # Same guard as track_uri_from_item: a non-dict track cannot be
            # simplified, so skip it.
            if not isinstance(track, dict):
                continue
            uri = track.get('uri')
            if uri:
                if uri in seen:
                    continue
                seen.add(uri)
            tracks.append(simplified_track(track))
    return {
        'name': 'Spotify Backup',
        'id': user['id'],
        'tracks': tracks
    }
+
def main():
# Parse arguments.
@@ -136,6 +324,8 @@ def main():
parser.add_argument('--dump', default='playlists', choices=['liked,playlists', 'playlists,liked', 'playlists', 'liked'],
help='dump playlists or liked songs, or both (default: playlists)')
parser.add_argument('--format', default='txt', choices=['json', 'txt'], help='output format (default: txt)')
+ parser.add_argument('--client-id', default=os.environ.get('SPOTIFY_CLIENT_ID'),
+ help='Spotify application client ID (default: SPOTIFY_CLIENT_ID or bundled client ID)')
parser.add_argument('file', help='output filename', nargs='?')
args = parser.parse_args()
@@ -148,7 +338,7 @@ def main():
if args.token:
spotify = SpotifyAPI(args.token)
else:
- spotify = SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934',
+ spotify = SpotifyAPI.authorize(client_id=args.client_id or '5c098bcc800e45d49e476265bc9b6934',
scope='playlist-read-private playlist-read-collaborative user-library-read')
# Get the ID of the logged in user.
@@ -169,7 +359,7 @@ def main():
# List all playlists and the tracks in each playlist
if 'playlists' in args.dump:
logging.info('Loading playlists...')
- playlist_data = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50})
+ playlist_data = spotify.list('me/playlists', {'limit': 50})
logging.info(f'Found {len(playlist_data)} playlists')
# List all tracks in each playlist
@@ -177,16 +367,22 @@ def main():
logging.info('Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist))
playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100})
playlists += playlist_data
+
+ duplicate_tracks = dedupe_playlist_tracks(playlists)
+ if duplicate_tracks:
+ logging.info(f'Removed {duplicate_tracks} duplicate tracks by Spotify URI')
+ liked_albums, duplicate_albums = dedupe_albums(liked_albums)
+ if duplicate_albums:
+ logging.info(f'Removed {duplicate_albums} duplicate albums by Spotify URI')
# Write the file.
logging.info('Writing files...')
with open(args.file, 'w', encoding='utf-8') as f:
# JSON file.
if args.format == 'json':
- json.dump({
- 'playlists': playlists,
- 'albums': liked_albums
- }, f)
+ export = merged_json_export(me, playlists)
+ json.dump(export, f, ensure_ascii=False, indent=2)
+ logging.info(f'Merged {len(export["tracks"])} unique tracks into JSON export')
# Tab-separated file.
else: