diff --git a/.gitignore b/.gitignore index b3fd8dd..7c0fe73 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.txt *.json __pycache__/* +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md index 9ceb7c1..3c35136 100644 --- a/README.md +++ b/README.md @@ -9,14 +9,51 @@ You can run the script from the command line: python3 spotify-backup.py playlists.txt -or, to get a JSON dump, use: +The browser authorization flow uses Spotify's Authorization Code with PKCE flow. +If the bundled Spotify app client ID is rejected for your account, create your own +Spotify app, add `http://127.0.0.1:43019/redirect` as a redirect URI, and run: + + SPOTIFY_CLIENT_ID=your_client_id python3 spotify-backup.py playlists.txt + +or: + + python3 spotify-backup.py playlists.txt --client-id=your_client_id + +or, to get a merged JSON export, use: python3 spotify-backup.py playlists.json --format=json +To merge your Liked Songs and playlists into one JSON export, use: + + python3 spotify-backup.py playlist.json --dump=liked,playlists --format=json + +JSON exports are written as a single `Spotify Backup` playlist using your Spotify +user ID. Tracks from all selected playlists are merged before writing this +simplified shape: + + { + "name": "Spotify Backup", + "id": "your_spotify_user_id", + "tracks": [ + { + "artist": "Artist Name", + "name": "Track Name", + "album": "Album Name", + "thumbnail": "https://i.scdn.co/image/...", + "duration": "3:24", + "stream": null + } + ] + } + By default, it includes your playlists. To include your Liked Songs, you can use: python3 spotify-backup.py playlists.txt --dump=liked,playlists +All exports remove duplicate tracks by Spotify URI before writing the output, so +the same track will not appear twice in either TXT or JSON files. Liked Albums are +also deduplicated by Spotify album URI when included in TXT output. + If for some reason the browser-based authorization flow doesn't work, you can also [generate an OAuth token](https://developer.spotify.com/web-api/console/get-playlists/) on the developer site (with the `playlist-read-private` permission) and pass it with the `--token` option. diff --git a/spotify-backup.py b/spotify-backup.py index f272564..3e6d0f4 100755 --- a/spotify-backup.py +++ b/spotify-backup.py @@ -1,12 +1,15 @@ #!/usr/bin/env python3 import argparse +import base64 import codecs +import hashlib import http.client import http.server import json import logging -import re +import os +import secrets import sys import time import urllib.error @@ -64,29 +67,88 @@ def list(self, url, params={}): # Pops open a browser window for a user to log in and authorize API access. @staticmethod def authorize(client_id, scope): + code_verifier = SpotifyAPI._generate_code_verifier() + code_challenge = SpotifyAPI._generate_code_challenge(code_verifier) + state = secrets.token_urlsafe(16) + redirect_uri = SpotifyAPI._redirect_uri() url = 'https://accounts.spotify.com/authorize?' + urllib.parse.urlencode({ - 'response_type': 'token', + 'response_type': 'code', 'client_id': client_id, 'scope': scope, - 'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT) + 'redirect_uri': redirect_uri, + 'code_challenge_method': 'S256', + 'code_challenge': code_challenge, + 'state': state }) + + # Start listening before opening the browser so the redirect cannot race the server startup. + server = SpotifyAPI._AuthorizationServer('127.0.0.1', SpotifyAPI._SERVER_PORT, + client_id, code_verifier, redirect_uri, state) logging.info(f'Logging in (click if it doesn\'t open automatically): {url}') webbrowser.open(url) - - # Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack). - server = SpotifyAPI._AuthorizationServer('127.0.0.1', SpotifyAPI._SERVER_PORT) try: while True: server.handle_request() except SpotifyAPI._Authorization as auth: return SpotifyAPI(auth.access_token) + except SpotifyAPI._AuthorizationError as err: + logging.error(f'Authorization failed: {err}') + sys.exit(1) + + @staticmethod + def _redirect_uri(): + return 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT) + + @staticmethod + def _generate_code_verifier(): + alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~' + return ''.join(secrets.choice(alphabet) for _ in range(64)) + + @staticmethod + def _generate_code_challenge(code_verifier): + digest = hashlib.sha256(code_verifier.encode('ascii')).digest() + return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii') + + @staticmethod + def _exchange_authorization_code(client_id, code_verifier, redirect_uri, code): + data = urllib.parse.urlencode({ + 'client_id': client_id, + 'grant_type': 'authorization_code', + 'code': code, + 'redirect_uri': redirect_uri, + 'code_verifier': code_verifier + }).encode('utf-8') + req = urllib.request.Request('https://accounts.spotify.com/api/token', data=data) + req.add_header('Content-Type', 'application/x-www-form-urlencoded') + try: + res = urllib.request.urlopen(req) + except urllib.error.HTTPError as err: + reader = codecs.getreader('utf-8') + message = err.reason + try: + error = json.load(reader(err)) + message = error.get('error_description') or error.get('error') or message + except Exception: + pass + raise SpotifyAPI._AuthorizationError(message) + + reader = codecs.getreader('utf-8') + response = json.load(reader(res)) + access_token = response.get('access_token') + if not access_token: + raise SpotifyAPI._AuthorizationError('Spotify did not return an access token') + return access_token # The port that the local server listens on. Don't change this, # as Spotify only will redirect to certain predefined URLs. _SERVER_PORT = 43019 class _AuthorizationServer(http.server.HTTPServer): - def __init__(self, host, port): + def __init__(self, host, port, client_id, code_verifier, redirect_uri, state): + self.client_id = client_id + self.code_verifier = code_verifier + self.redirect_uri = redirect_uri + self.state = state http.server.HTTPServer.__init__(self, (host, port), SpotifyAPI._AuthorizationHandler) # Disable the default error handling. @@ -95,27 +157,44 @@ def handle_error(self, request, client_address): class _AuthorizationHandler(http.server.BaseHTTPRequestHandler): def do_GET(self): - # The Spotify API has redirected here, but access_token is hidden in the URL fragment. - # Read it using JavaScript and send it to /token as an actual query string... - if self.path.startswith('/redirect'): - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.end_headers() - self.wfile.write(b'') - - # Read access_token and use an exception to kill the server listening... - elif self.path.startswith('/token?'): - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.end_headers() - self.wfile.write(b'Thanks! You may now close this window.') - - access_token = re.search('access_token=([^&]*)', self.path).group(1) - logging.info(f'Received access token from Spotify: {access_token}') - raise SpotifyAPI._Authorization(access_token) - - else: + parsed_url = urllib.parse.urlparse(self.path) + if parsed_url.path != '/redirect': self.send_error(404) + return + + params = urllib.parse.parse_qs(parsed_url.query) + error = params.get('error', [None])[0] + if error: + error_description = params.get('error_description', [error])[0] + self._send_authorization_error(400, f'Spotify returned: {error_description}') + + if params.get('state', [None])[0] != self.server.state: + self._send_authorization_error(400, 'Spotify response state did not match the request') + + code = params.get('code', [None])[0] + if not code: + self._send_authorization_error(400, 'Spotify did not return an authorization code') + + try: + access_token = SpotifyAPI._exchange_authorization_code(self.server.client_id, + self.server.code_verifier, + self.server.redirect_uri, + code) + except SpotifyAPI._AuthorizationError as err: + self._send_authorization_error(500, f'Could not exchange authorization code: {err}') + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(b'Thanks! You may now close this window.') + logging.info('Received access token from Spotify.') + raise SpotifyAPI._Authorization(access_token) + + def _send_authorization_error(self, status, message): + self.send_response(status) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(message.encode('utf-8')) + raise SpotifyAPI._AuthorizationError(message) # Disable the default logging. def log_message(self, format, *args): @@ -125,6 +204,115 @@ class _Authorization(Exception): def __init__(self, access_token): self.access_token = access_token + class _AuthorizationError(Exception): + pass + + +def format_duration(duration_ms): + if duration_ms is None: + return '' + total_seconds = int(duration_ms) // 1000 + minutes, seconds = divmod(total_seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f'{hours}:{minutes:02}:{seconds:02}' + return f'{minutes}:{seconds:02}' + + +def thumbnail_url(album): + images = album.get('images') or [] + if not images: + return None + return min(images, key=lambda image: (image.get('height') or sys.maxsize) * + (image.get('width') or sys.maxsize)).get('url') + + +def simplified_track(track): + album = track.get('album') or {} + artists = track.get('artists') or [] + artist_names = [artist['name'] for artist in artists if artist.get('name')] + return { + 'artist': ', '.join(artist_names), + 'name': track.get('name'), + 'album': album.get('name'), + 'thumbnail': thumbnail_url(album), + 'duration': format_duration(track.get('duration_ms')), + 'stream': None + } + + +def track_uri_from_item(item): + if not isinstance(item, dict): + return None + track = item.get('track') + if not isinstance(track, dict): + return None + return track.get('uri') + + +def album_uri_from_item(item): + if not isinstance(item, dict): + return None + album = item.get('album') + if not isinstance(album, dict): + return None + return album.get('uri') + + +def dedupe_playlist_tracks(playlists): + seen = set() + removed = 0 + for playlist in playlists: + deduped_tracks = [] + for item in playlist['tracks']: + uri = track_uri_from_item(item) + if uri: + if uri in seen: + removed += 1 + continue + seen.add(uri) + deduped_tracks.append(item) + playlist['tracks'] = deduped_tracks + return removed + + +def dedupe_albums(albums): + seen = set() + deduped_albums = [] + removed = 0 + for item in albums: + uri = album_uri_from_item(item) + if uri: + if uri in seen: + removed += 1 + continue + seen.add(uri) + deduped_albums.append(item) + return deduped_albums, removed + + +def merged_json_export(user, playlists): + tracks = [] + seen = set() + for playlist in playlists: + for item in playlist['tracks']: + if not isinstance(item, dict): + continue + track = item.get('track') + if track is None: + continue + uri = track_uri_from_item(item) + if uri: + if uri in seen: + continue + seen.add(uri) + tracks.append(simplified_track(track)) + return { + 'name': 'Spotify Backup', + 'id': user['id'], + 'tracks': tracks + } + def main(): # Parse arguments. @@ -136,6 +324,8 @@ def main(): parser.add_argument('--dump', default='playlists', choices=['liked,playlists', 'playlists,liked', 'playlists', 'liked'], help='dump playlists or liked songs, or both (default: playlists)') parser.add_argument('--format', default='txt', choices=['json', 'txt'], help='output format (default: txt)') + parser.add_argument('--client-id', default=os.environ.get('SPOTIFY_CLIENT_ID'), + help='Spotify application client ID (default: SPOTIFY_CLIENT_ID or bundled client ID)') parser.add_argument('file', help='output filename', nargs='?') args = parser.parse_args() @@ -148,7 +338,7 @@ def main(): if args.token: spotify = SpotifyAPI(args.token) else: - spotify = SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934', + spotify = SpotifyAPI.authorize(client_id=args.client_id or '5c098bcc800e45d49e476265bc9b6934', scope='playlist-read-private playlist-read-collaborative user-library-read') # Get the ID of the logged in user. @@ -169,7 +359,7 @@ def main(): # List all playlists and the tracks in each playlist if 'playlists' in args.dump: logging.info('Loading playlists...') - playlist_data = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50}) + playlist_data = spotify.list('me/playlists', {'limit': 50}) logging.info(f'Found {len(playlist_data)} playlists') # List all tracks in each playlist @@ -177,16 +367,22 @@ def main(): logging.info('Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist)) playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100}) playlists += playlist_data + + duplicate_tracks = dedupe_playlist_tracks(playlists) + if duplicate_tracks: + logging.info(f'Removed {duplicate_tracks} duplicate tracks by Spotify URI') + liked_albums, duplicate_albums = dedupe_albums(liked_albums) + if duplicate_albums: + logging.info(f'Removed {duplicate_albums} duplicate albums by Spotify URI') # Write the file. logging.info('Writing files...') with open(args.file, 'w', encoding='utf-8') as f: # JSON file. if args.format == 'json': - json.dump({ - 'playlists': playlists, - 'albums': liked_albums - }, f) + export = merged_json_export(me, playlists) + json.dump(export, f, ensure_ascii=False, indent=2) + logging.info(f'Merged {len(export["tracks"])} unique tracks into JSON export') # Tab-separated file. else: