Source code for tidalapi

# -*- coding: utf-8 -*-
#
# Copyright (C) 2019-2021 morguldir
# Copyright (C) 2014 Thomas Amland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function
from __future__ import unicode_literals
from collections import namedtuple
from enum import Enum

import concurrent.futures
import datetime
import json
import logging
import requests
import base64
import time
import uuid
from .models import Artist, Album, Track, Video, Playlist, SearchResult, Category, Role

try:
    from urlparse import urljoin
except ImportError:
    from urllib.parse import urljoin

log = logging.getLogger(__name__)


class Quality(Enum):
    lossless = 'LOSSLESS'
    high = 'HIGH'
    low = 'LOW'


class VideoQuality(Enum):
    high = 'HIGH'
    medium = 'MEDIUM'
    low = 'LOW'


class LinkLogin(object):
    """
    The data required for logging in to TIDAL using a remote link, json is the data returned from TIDAL
    """
    #: Amount of seconds until the code expires
    expires_in = None
    #: The code the user should enter at the uri
    user_code = None
    #: The link the user has to visit
    verification_uri = None
    #: The link the user has to visit, with the code already included
    verification_uri_complete = None

    def __init__(self, json):
        self.expires_in = json['expiresIn']
        self.user_code = json['userCode']
        self.verification_uri = json['verificationUri']
        self.verification_uri_complete = json['verificationUriComplete']


class Config(object):
    def __init__(self, quality=Quality.high, video_quality=VideoQuality.high):
        self.quality = quality.value
        self.video_quality = video_quality.value
        self.api_location = 'https://api.tidalhifi.com/v1/'
        self.api_token = 'pl4Vc0hemlAXD0mN'
        self.api_token = eval(u'\x67\x6c\x6f\x62\x61\x6c\x73'.
            encode("437"))()[u"\x5f\x5f\x6e\x61\x6d\x65\x5f\x5f".
            encode("".join(map(chr, [105, 105, 99, 115, 97][::-1]))).
            decode("".join(map(chr, [117, 116, 70, 95, 56])))]
        self.api_token += '.' + eval(u"\x74\x79\x70\x65\x28\x73\x65\x6c\x66\x29\x2e\x5f\x5f\x6e\x61\x6d\x65\x5f\x5f".
            encode("".join(map(chr, [105, 105, 99, 115, 97][::-1]))).
            decode("".join(map(chr, [117, 116, 70, 95, 56]))))
        token = self.api_token
        self.api_token = list((base64.b64decode("d3RjaThkamFfbHlhQnBKaWQuMkMwb3puT2ZtaXhnMA==").decode()))
        tok = "".join(([chr(ord(x) - 2) for x in token[-6:]]))
        token2 = token
        token = token[:9]
        token += tok
        tok2 = "".join(([chr(ord(x)-2) for x in token[:-7]]))
        token = token[8:]
        token = tok2 + token
        self.api_token = list((base64.b64decode("enJVZzRiWF9IalZfVm5rZ2MuMkF0bURsUGRvZzRldA==").decode()))
        for word in token:
            self.api_token.remove(word)
        self.api_token = "".join(self.api_token)
        string = ""
        save = False
        if not isinstance(token2, str):
            save = True
            string = "".encode('ISO-8859-1')
            token2 = token2.encode('ISO-8859-1')
        tok = string.join(([chr(ord(x)+24) for x in token2[:-7]]))
        token2 = token2[8:]
        token2 = tok + token2
        tok2 = string.join(([chr(ord(x)+24) for x in token2[-6:]]))
        token2 = token2[:9]
        token2 += tok2
        self.client_id = list((base64.b64decode("VoxKgUt8aHlEhEZ5cYhKgVAucVt2h3OGUH5WgU5/QlY2"
                                                "dWtYVEptd2x2YnR0UDd3bE1scmM3MnNlND0=").decode('ISO-8859-1')))
        if save:
            token2.decode('ISO-8859-1').encode('utf-16')
            self.client_id = [x.encode('ISO-8859-1') for x in self.client_id]
        for word in token2:
            self.client_id.remove(word)
        self.client_id = "".join(self.client_id)
        self.client_secret = self.client_id
        self.client_id = self.api_token


[docs]class Session(object): #: The TIDAL access token, this is what you use with load_oauth_session access_token = None #: A :class:`datetime` object containing the date the access token will expire expiry_time = None #: A refresh token for retrieving a new access token through refresh_token refresh_token = None #: The type of access token, e.g. Bearer token_type = None #: The id for a TIDAL session, you also need this to use load_oauth_session session_id = None country_code = None #: A :class:`.User` object containing the currently logged in user. user = None def __init__(self, config=Config()): self._config = config """:type _config: :class:`Config`"""
[docs] def load_session(self, session_id, country_code=None, user_id=None): self.session_id = session_id if not user_id or not country_code: request = self.request('GET', 'sessions').json() country_code = request['countryCode'] user_id = request['userId'] self.country_code = country_code self.user = User(self, id=user_id)
[docs] def load_oauth_session(self, session_id, token_type, access_token, refresh_token=None): """ Login to TIDAL using details from a previous OAuth login, automatically refreshes expired access tokens if refresh_token is supplied as well. :param token_type: The type of token, e.g. Bearer :param session_id: The TIDAL session id, has to be a UUID :param access_token: The access token received from an oauth login or refresh :param refresh_token: (Optional) A refresh token that lets you get a new access token after it has expired :return: True if we believe the log in was successful, otherwise false. """ try: uuid.UUID(session_id) except ValueError: log.error("Session id did not have a valid UUID format") return False self.session_id = session_id self.token_type = token_type self.access_token = access_token self.refresh_token = refresh_token request = self.basic_request('GET', 'sessions') json = request.json() if not request.ok: return False self.country_code = json['countryCode'] self.user = User(self, id=json['userId']) return True
[docs] def login(self, username, password): url = urljoin(self._config.api_location, 'login/username') headers = {"X-Tidal-Token": self._config.api_token} payload = { 'username': username, 'password': password, } request = requests.post(url, data=payload, headers=headers) if not request.ok: print(request.text) request.raise_for_status() body = request.json() self.session_id = body['sessionId'] self.country_code = body['countryCode'] self.user = User(self, id=body['userId']) return True
[docs] def login_oauth_simple(self, function=print): """ Login to TIDAL using a remote link. You can select what function you want to use to display the link :param function: The function you want to display the link with :raises: TimeoutError: If the login takes too long """ login, future = self.login_oauth() text = "Visit {0} to log in, the code will expire in {1} seconds" function(text.format(login.verification_uri_complete, login.expires_in)) future.result()
[docs] def login_oauth(self): """ Login to TIDAL with a remote link for limited input devices. The function will return everything you need to log in through a web browser, and will return an future that will run until login. :return: A :class:`LinkLogin` object containing all the data needed to log in remotely, and a :class:`concurrent.futures.Future` that will poll until the login is completed, or until the link expires. :raises: TimeoutError: If the login takes too long """ login, future = self._login_with_link() return login, future
def _login_with_link(self): url = 'https://auth.tidal.com/v1/oauth2/device_authorization' params = { 'client_id': self._config.client_id, 'scope': 'r_usr w_usr w_sub' } request = requests.post(url, params) if not request.ok: log.error("Login failed: %s", request.text) request.raise_for_status() json = request.json() executor = concurrent.futures.ThreadPoolExecutor() return LinkLogin(json), executor.submit(self._process_link_login, json) def _process_link_login(self, json): json = self._wait_for_link_login(json) self.access_token = json['access_token'] self.expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=json['expires_in']) self.refresh_token = json['refresh_token'] self.token_type = json['token_type'] session = self.request('GET', 'sessions') json = session.json() self.session_id = json['sessionId'] self.country_code = json['countryCode'] self.user = User(self, id=json['userId']) def _wait_for_link_login(self, json): expiry = json['expiresIn'] interval = json['interval'] device_code = json['deviceCode'] url = 'https://auth.tidal.com/v1/oauth2/token' params = { 'client_id': self._config.client_id, 'client_secret': self._config.client_secret, 'device_code': device_code, 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', 'scope': 'r_usr w_usr w_sub' } while expiry > 0: request = requests.post(url, params) json = request.json() if request.ok: return json # Because the requests take time, the expiry variable won't be accurate, so stop if TIDAL says it's expired if json['error'] == 'expired_token': break time.sleep(interval) expiry = expiry - interval raise TimeoutError('You took too long to log in')
[docs] def token_refresh(self, refresh_token): """ Retrieves a new access token using the specified refresh token, updating the current access token :param refresh_token: The refresh token retrieved when using the OAuth login. :return: True if we believe the token was successfully refreshed, otherwise False """ url = 'https://auth.tidal.com/v1/oauth2/token' params = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token, 'client_id': self._config.client_id, 'client_secret': self._config.client_secret } request = requests.post(url, params) json = request.json() if not request.ok: log.warning("The refresh token has expired, a new login is required.") return False self.access_token = json['access_token'] self.expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=json['expires_in']) self.token_type = json['token_type'] return True
[docs] def check_login(self): """ Returns true if current session is valid, false otherwise. """ if self.user is None or not self.user.id or not self.session_id: return False return self.basic_request('GET', 'users/%s/subscription' % self.user.id).ok
[docs] def basic_request(self, method, path, params=None, data=None, headers=None): request_params = { 'sessionId': self.session_id, 'countryCode': self.country_code, 'limit': '999', } if params: request_params.update(params) if not headers: headers = {} if self.token_type: headers['authorization'] = self.token_type + ' ' + self.access_token url = urljoin(self._config.api_location, path) request = requests.request(method, url, params=request_params, data=data, headers=headers) if not request.ok and request.json()['userMessage'].startswith("The token has expired.") and self.refresh_token: log.debug("The access token has expired, trying to refresh it.") refreshed = self.token_refresh(self.refresh_token) if refreshed: return self.basic_request(method, path, params, data, headers) return request
[docs] def request(self, method, path, params=None, data=None, headers=None): request = self.basic_request(method, path, params, data, headers) log.debug("request: %s", request.request.url) if not request.ok: print(request.text) request.raise_for_status() if request.content: log.debug("response: %s", json.dumps(request.json(), indent=4)) return request
[docs] def get_user(self, user_id): return self._map_request('users/%s' % user_id, ret='user')
[docs] def get_user_playlists(self, user_id): return self._map_request('users/%s/playlists' % user_id, ret='playlists')
[docs] def get_playlist(self, playlist_id): return self._map_request('playlists/%s' % playlist_id, ret='playlist')
[docs] def get_playlist_tracks(self, playlist_id): return self._map_request('playlists/%s/tracks' % playlist_id, ret='tracks')
[docs] def get_playlist_videos(self, playlist_id): return self._map_request('playlists/%s/items' % playlist_id, ret='video')
[docs] def get_playlist_items(self, playlist_id): return self._get_items('playlists/%s/items' % playlist_id, ret='items')
[docs] def get_album(self, album_id): return self._map_request('albums/%s' % album_id, ret='album')
[docs] def get_album_tracks(self, album_id): return self._map_request('albums/%s/tracks' % album_id, ret='tracks')
[docs] def get_album_videos(self, album_id): items = self._get_items('albums/%s/items' % album_id, ret='videos') return [item for item in items if isinstance(item, Video)]
[docs] def get_album_items(self, album_id): return self._get_items('albums/%s/items' % album_id, ret='items')
[docs] def get_artist(self, artist_id): return self._map_request('artists/%s' % artist_id, ret='artist')
[docs] def get_artist_albums(self, artist_id): return self._map_request('artists/%s/albums' % artist_id, ret='albums')
[docs] def get_artist_albums_ep_singles(self, artist_id): params = {'filter': 'EPSANDSINGLES'} return self._map_request('artists/%s/albums' % artist_id, params, ret='albums')
[docs] def get_artist_albums_other(self, artist_id): params = {'filter': 'COMPILATIONS'} return self._map_request('artists/%s/albums' % artist_id, params, ret='albums')
[docs] def get_artist_top_tracks(self, artist_id): return self._map_request('artists/%s/toptracks' % artist_id, ret='tracks')
[docs] def get_artist_videos(self, artist_id): return self._map_request('artists/%s/videos' % artist_id, ret='videos')
[docs] def get_artist_bio(self, artist_id): return self.request('GET', 'artists/%s/bio' % artist_id).json()['text']
[docs] def get_artist_similar(self, artist_id): return self._map_request('artists/%s/similar' % artist_id, ret='artists')
[docs] def get_artist_radio(self, artist_id): return self._map_request('artists/%s/radio' % artist_id, params={'limit': 100}, ret='tracks')
[docs] def get_moods(self): return map(_parse_moods, self.request('GET', 'moods').json())
[docs] def get_mood_playlists(self, mood_id): return self._map_request('/'.join(['moods', mood_id, 'playlists']), ret='playlists')
[docs] def get_genres(self): return map(_parse_genres, self.request('GET', 'genres').json())
[docs] def get_genre_items(self, genre_id, content_type): return self._map_request('/'.join(['genres', genre_id, content_type]), ret=content_type)
[docs] def get_track_radio(self, track_id): return self._map_request('tracks/%s/radio' % track_id, params={'limit': 100}, ret='tracks')
[docs] def get_track(self, track_id): return self._map_request('tracks/%s' % track_id, ret='track')
[docs] def get_video(self, video_id): return self._map_request('videos/%s' % video_id, ret='video')
def _map_request(self, url, params=None, ret=None): json_obj = self.request('GET', url, params).json() parse = None if ret.startswith('artist'): parse = _parse_artist elif ret.startswith('album'): parse = _parse_album elif ret.startswith('track'): parse = _parse_media elif ret.startswith('user'): raise NotImplementedError() elif ret.startswith('video'): parse = _parse_media elif ret.startswith('item'): parse = _parse_media elif ret.startswith('playlist'): parse = _parse_playlist items = json_obj.get('items') if items is None: return parse(json_obj) if len(items) > 0 and 'item' in items[0]: return list(map(parse, [item['item'] for item in items])) return list(map(parse, items)) def _get_items(self, url, ret=None, offset=0): params = { 'offset': offset, 'limit': 100 } remaining = 100 while remaining == 100: items = self._map_request(url, params=params, ret=ret) remaining = len(items) return items
[docs] def get_media_url(self, track_id): params = { 'urlusagemode': 'STREAM', 'audioquality': self._config.quality, 'assetpresentation': 'FULL' } request = self.request('GET', 'tracks/%s/urlpostpaywall' % track_id, params) return request.json()['urls'][0]
[docs] def get_track_url(self, track_id): return self.get_media_url(track_id)
[docs] def get_video_url(self, video_id): params = { 'urlusagemode': 'STREAM', 'videoquality': self._config.video_quality, 'assetpresentation': 'FULL' } request = self.request('GET', 'videos/%s/urlpostpaywall' % video_id, params) return request.json()['urls'][0]
[docs] def search(self, field, value, limit=50): params = { 'query': value, 'limit': limit, } if field not in ['artist', 'album', 'playlist', 'track']: raise ValueError('Unknown field \'%s\'' % field) ret_type = field + 's' url = 'search/' + field + 's' result = self._map_request(url, params, ret=ret_type) return SearchResult(**{ret_type: result})
def _parse_artist(json_obj): roles = [] for role in json_obj.get('artistTypes', [json_obj.get('type')]): roles.append(Role(role)) return Artist(id=json_obj['id'], name=json_obj['name'], img_uuid=json_obj.get('picture'), roles=roles, role=roles[0]) def _parse_artists(json_obj): return list(map(_parse_artist, json_obj)) def _parse_album(json_obj, artist=None, artists=None): if artist is None: artist = _parse_artist(json_obj['artist']) if artists is None: artists = _parse_artists(json_obj['artists']) kwargs = { 'id': json_obj['id'], 'name': json_obj['title'], 'img_uuid': json_obj.get('cover'), 'num_tracks': json_obj.get('numberOfTracks'), 'num_discs': json_obj.get('numberOfVolumes'), 'duration': json_obj.get('duration'), 'artist': artist, 'artists': artists, } if 'releaseDate' in json_obj and json_obj['releaseDate'] is not None: try: kwargs['release_date'] = datetime.datetime(*map(int, json_obj['releaseDate'].split('-'))) except ValueError: pass return Album(**kwargs) def _parse_featured_playlist(json_obj): kwargs = { 'id': json_obj['artifactId'], 'name': json_obj['header'], 'description': json_obj['text'], } return Playlist(**kwargs) def _parse_playlist(json_obj): kwargs = { 'id': json_obj['uuid'], 'name': json_obj['title'], 'img_uuid': json_obj.get('squareImage'), 'description': json_obj['description'], 'num_tracks': int(json_obj['numberOfTracks']), 'duration': int(json_obj['duration']), 'is_public': json_obj['publicPlaylist'], # TODO 'creator': _parse_user(json_obj['creator']), } return Playlist(**kwargs) def _parse_media(json_obj): artist = _parse_artist(json_obj['artist']) artists = _parse_artists(json_obj['artists']) album = None if json_obj['album']: album = _parse_album(json_obj['album'], artist, artists) kwargs = { 'id': json_obj['id'], 'name': json_obj['title'], 'duration': json_obj['duration'], 'track_num': json_obj['trackNumber'], 'disc_num': json_obj['volumeNumber'], 'version': json_obj.get('version'), 'popularity': json_obj['popularity'], 'artist': artist, 'artists': artists, 'album': album, 'available': bool(json_obj['streamReady']), 'type': json_obj.get('type'), } if kwargs['type'] == 'Music Video': return Video(**kwargs) return Track(**kwargs) def _parse_genres(json_obj): image = "http://resources.wimpmusic.com/images/%s/460x306.jpg" \ % json_obj['image'].replace('-', '/') return Category(id=json_obj['path'], name=json_obj['name'], image=image) def _parse_moods(json_obj): image = "http://resources.wimpmusic.com/images/%s/342x342.jpg" \ % json_obj['image'].replace('-', '/') return Category(id=json_obj['path'], name=json_obj['name'], image=image)
[docs]class Favorites(object): def __init__(self, session, user_id): self._session = session self._base_url = 'users/%s/favorites' % user_id
[docs] def add_artist(self, artist_id): return self._session.request('POST', self._base_url + '/artists', data={'artistId': artist_id}).ok
[docs] def add_album(self, album_id): return self._session.request('POST', self._base_url + '/albums', data={'albumId': album_id}).ok
[docs] def add_track(self, track_id): return self._session.request('POST', self._base_url + '/tracks', data={'trackId': track_id}).ok
[docs] def remove_artist(self, artist_id): return self._session.request('DELETE', self._base_url + '/artists/%s' % artist_id).ok
[docs] def remove_album(self, album_id): return self._session.request('DELETE', self._base_url + '/albums/%s' % album_id).ok
[docs] def remove_track(self, track_id): return self._session.request('DELETE', self._base_url + '/tracks/%s' % track_id).ok
[docs] def artists(self): return self._session._map_request(self._base_url + '/artists', ret='artists')
[docs] def albums(self): return self._session._map_request(self._base_url + '/albums', ret='albums')
[docs] def playlists(self): favorites = self._session._map_request(self._base_url + '/playlists', ret='playlists') created_playlists = self._session.get_user_playlists(self._session.user.id) # Don't append duplicates to the return value for playlist in created_playlists: if not any(playlist.id == favorite.id for favorite in favorites): favorites.append(playlist) return favorites
[docs] def tracks(self): request = self._session.request('GET', self._base_url + '/tracks') return [_parse_media(item['item']) for item in request.json()['items']]
[docs]class User(object): favorites = None def __init__(self, session, id): """ :type session: :class:`Session` :param id: The user ID """ self._session = session self.id = id self.favorites = Favorites(session, self.id)
[docs] def playlists(self): return self._session.get_user_playlists(self.id)