Renamed extractors module to services
This commit is contained in:
3
src/services/__init__.py
Normal file
3
src/services/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
from . import youtube, zaycev, tidal
|
70
src/services/base.py
Normal file
70
src/services/base.py
Normal file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
""" Base components useful for all other extractors. """
|
||||
import logging
|
||||
import wx
|
||||
import config
|
||||
log = logging.getLogger("extractors.config")
|
||||
|
||||
class baseInterface(object):
|
||||
name = "base"
|
||||
enabled = False
|
||||
needs_transcode = False
|
||||
results = []
|
||||
|
||||
def __init__(self):
|
||||
super(baseInterface, self).__init__()
|
||||
log.debug("started extraction service for {0}".format(self.name,))
|
||||
|
||||
def search(self, text, *args, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_download_url(self, url):
|
||||
raise NotImplementedError()
|
||||
|
||||
def format_track(self, item):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_file_format(self):
|
||||
return "mp3"
|
||||
|
||||
def transcoder_enabled(self):
|
||||
return False
|
||||
|
||||
class song(object):
|
||||
""" Represents a song in all services. Data will be filled by the service itself"""
|
||||
|
||||
def __init__(self, extractor):
|
||||
self.extractor = extractor
|
||||
self.bitrate = 0
|
||||
self.title = ""
|
||||
self.artist = ""
|
||||
self.duration = ""
|
||||
self.size = 0
|
||||
self.url = ""
|
||||
self.download_url = ""
|
||||
self.info = None
|
||||
|
||||
def format_track(self):
|
||||
return self.extractor.format_track(self)
|
||||
|
||||
def get_download_url(self):
|
||||
self.download_url = self.extractor.get_download_url(self.url)
|
||||
|
||||
class baseSettings(wx.Panel):
|
||||
config_section = "base"
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(baseSettings, self).__init__(*args, **kwargs)
|
||||
self.map = []
|
||||
|
||||
def save(self):
|
||||
for i in self.map:
|
||||
config.app["services"][self.config_section][i[0]] = i[1].GetValue()
|
||||
|
||||
def load(self):
|
||||
for i in self.map:
|
||||
if i[0] in config.app["services"][self.config_section]:
|
||||
i[1].SetValue(config.app["services"][self.config_section][i[0]])
|
||||
else:
|
||||
log.error("No key available: {key} on extractor {extractor}".format(key=i[0], extractor=self.config_section))
|
168
src/services/tidal.py
Normal file
168
src/services/tidal.py
Normal file
@@ -0,0 +1,168 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import logging
|
||||
import webbrowser
|
||||
import wx
|
||||
import tidalapi
|
||||
import config
|
||||
from update.utils import seconds_to_string
|
||||
from .import base
|
||||
|
||||
log = logging.getLogger("extractors.tidal.com")
|
||||
|
||||
class interface(base.baseInterface):
|
||||
name = "tidal"
|
||||
enabled = config.app["services"]["tidal"].get("enabled")
|
||||
# This should not be enabled if credentials are not in config.
|
||||
if config.app["services"]["tidal"]["username"] == "" or config.app["services"]["tidal"]["password"] == "":
|
||||
enabled = False
|
||||
|
||||
def __init__(self):
|
||||
super(interface, self).__init__()
|
||||
self.setup()
|
||||
|
||||
def setup(self):
|
||||
# Assign quality or switch to high if not specified/not found.
|
||||
if hasattr(tidalapi.Quality, config.app["services"]["tidal"]["quality"]):
|
||||
quality = getattr(tidalapi.Quality, config.app["services"]["tidal"]["quality"])
|
||||
else:
|
||||
quality = tidalapi.Quality.high
|
||||
_config = tidalapi.Config(quality=quality)
|
||||
username = config.app["services"]["tidal"]["username"]
|
||||
password = config.app["services"]["tidal"]["password"]
|
||||
log.debug("Using quality: %s" % (quality,))
|
||||
self.session = tidalapi.Session(config=_config)
|
||||
self.session.login(username=username, password=password)
|
||||
|
||||
def get_file_format(self):
|
||||
if config.app["services"]["tidal"]["quality"] == "lossless":
|
||||
self.file_extension = "flac"
|
||||
else:
|
||||
self.file_extension = "mp3"
|
||||
return self.file_extension
|
||||
|
||||
def transcoder_enabled(self):
|
||||
if config.app["services"]["tidal"]["quality"] == "lossless":
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def search(self, text, page=1):
|
||||
if text == "" or text == None:
|
||||
raise ValueError("Text must be passed and should not be blank.")
|
||||
log.debug("Retrieving data from Tidal...")
|
||||
fieldtypes = ["artist", "album", "playlist"]
|
||||
field = "track"
|
||||
for i in fieldtypes:
|
||||
if text.startswith(i+"://"):
|
||||
field = i
|
||||
text = text.replace(i+"://", "")
|
||||
log.debug("Searching for %s..." % (field))
|
||||
search_response = self.session.search(value=text, field=field)
|
||||
self.results = []
|
||||
if field == "track":
|
||||
data = search_response.tracks
|
||||
elif field == "artist":
|
||||
data = []
|
||||
artist = search_response.artists[0].id
|
||||
albums = self.session.get_artist_albums(artist)
|
||||
for album in albums:
|
||||
tracks = self.session.get_album_tracks(album.id)
|
||||
for track in tracks:
|
||||
data.append(track)
|
||||
compilations = self.session.get_artist_albums_other(artist)
|
||||
for album in compilations:
|
||||
tracks = self.session.get_album_tracks(album.id)
|
||||
for track in tracks:
|
||||
data.append(track)
|
||||
singles = self.session.get_artist_albums_ep_singles(artist)
|
||||
for album in singles:
|
||||
tracks = self.session.get_album_tracks(album.id)
|
||||
for track in tracks:
|
||||
data.append(track)
|
||||
for search_result in data:
|
||||
s = base.song(self)
|
||||
s.title = search_result.name
|
||||
s.artist = search_result.artist.name
|
||||
s.duration = seconds_to_string(search_result.duration)
|
||||
s.url = search_result.id
|
||||
s.info = search_result
|
||||
self.results.append(s)
|
||||
log.debug("{0} results found.".format(len(self.results)))
|
||||
|
||||
def get_download_url(self, url):
|
||||
url = self.session.get_media_url(url)
|
||||
if url.startswith("https://") or url.startswith("http://") == False:
|
||||
url = "rtmp://"+url
|
||||
return url
|
||||
|
||||
def format_track(self, item):
|
||||
return "{title}. {artist}. {duration}".format(title=item.title, duration=item.duration, artist=item.artist)
|
||||
|
||||
class settings(base.baseSettings):
|
||||
name = _("Tidal")
|
||||
config_section = "tidal"
|
||||
|
||||
def get_quality_list(self):
|
||||
results = dict(low=_("Low"), high=_("High"), lossless=_("Lossless"))
|
||||
return results
|
||||
|
||||
def get_quality_value(self, *args, **kwargs):
|
||||
q = self.get_quality_list()
|
||||
for i in q.keys():
|
||||
if q.get(i) == self.quality.GetStringSelection():
|
||||
return i
|
||||
|
||||
def set_quality_value(self, value, *args, **kwargs):
|
||||
q = self.get_quality_list()
|
||||
for i in q.keys():
|
||||
if i == value:
|
||||
self.quality.SetStringSelection(q.get(i))
|
||||
break
|
||||
|
||||
def __init__(self, parent):
|
||||
super(settings, self).__init__(parent=parent)
|
||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||
self.enabled = wx.CheckBox(self, wx.NewId(), _("Enable this service"))
|
||||
self.enabled.Bind(wx.EVT_CHECKBOX, self.on_enabled)
|
||||
self.map.append(("enabled", self.enabled))
|
||||
sizer.Add(self.enabled, 0, wx.ALL, 5)
|
||||
username = wx.StaticText(self, wx.NewId(), _("Tidal username or email address"))
|
||||
self.username = wx.TextCtrl(self, wx.NewId())
|
||||
usernamebox = wx.BoxSizer(wx.HORIZONTAL)
|
||||
usernamebox.Add(username, 0, wx.ALL, 5)
|
||||
usernamebox.Add(self.username, 0, wx.ALL, 5)
|
||||
sizer.Add(usernamebox, 0, wx.ALL, 5)
|
||||
self.map.append(("username", self.username))
|
||||
|
||||
password = wx.StaticText(self, wx.NewId(), _("Password"))
|
||||
self.password = wx.TextCtrl(self, wx.NewId(), style=wx.TE_PASSWORD)
|
||||
passwordbox = wx.BoxSizer(wx.HORIZONTAL)
|
||||
passwordbox.Add(password, 0, wx.ALL, 5)
|
||||
passwordbox.Add(self.password, 0, wx.ALL, 5)
|
||||
sizer.Add(passwordbox, 0, wx.ALL, 5)
|
||||
self.map.append(("password", self.password))
|
||||
self.get_account = wx.Button(self, wx.NewId(), _("You can subscribe for a tidal account here"))
|
||||
self.get_account.Bind(wx.EVT_BUTTON, self.on_get_account)
|
||||
sizer.Add(self.get_account, 0, wx.ALL, 5)
|
||||
quality = wx.StaticText(self, wx.NewId(), _("Audio quality"))
|
||||
self.quality = wx.ComboBox(self, wx.NewId(), choices=[i for i in self.get_quality_list().values()], value=_("High"), style=wx.CB_READONLY)
|
||||
qualitybox = wx.BoxSizer(wx.HORIZONTAL)
|
||||
qualitybox.Add(quality, 0, wx.ALL, 5)
|
||||
qualitybox.Add(self.quality, 0, wx.ALL, 5)
|
||||
sizer.Add(qualitybox, 0, wx.ALL, 5)
|
||||
# Monkeypatch for getting the right quality value here.
|
||||
self.quality.GetValue = self.get_quality_value
|
||||
self.quality.SetValue = self.set_quality_value
|
||||
self.map.append(("quality", self.quality))
|
||||
self.SetSizer(sizer)
|
||||
|
||||
def on_enabled(self, *args, **kwargs):
|
||||
for i in self.map:
|
||||
if i[1] != self.enabled:
|
||||
if self.enabled.GetValue() == True:
|
||||
i[1].Enable(True)
|
||||
else:
|
||||
i[1].Enable(False)
|
||||
|
||||
def on_get_account(self, *args, **kwargs):
|
||||
webbrowser.open_new_tab("https://tidal.com")
|
134
src/services/youtube.py
Normal file
134
src/services/youtube.py
Normal file
@@ -0,0 +1,134 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import isodate
|
||||
import youtube_dl
|
||||
import logging
|
||||
import wx
|
||||
import config
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
from update.utils import seconds_to_string
|
||||
from .import base
|
||||
|
||||
DEVELOPER_KEY = "AIzaSyCU_hvZJEjLlAGAnlscquKEkE8l0lVOfn0"
|
||||
YOUTUBE_API_SERVICE_NAME = "youtube"
|
||||
YOUTUBE_API_VERSION = "v3"
|
||||
|
||||
log = logging.getLogger("extractors.youtube.com")
|
||||
|
||||
class interface(base.baseInterface):
|
||||
name = "YouTube"
|
||||
enabled = config.app["services"]["youtube"].get("enabled")
|
||||
|
||||
def search(self, text, page=1):
|
||||
if text == "" or text == None:
|
||||
raise ValueError("Text must be passed and should not be blank.")
|
||||
if text.startswith("https") or text.startswith("http"):
|
||||
return self.search_from_url(text)
|
||||
type = "video"
|
||||
max_results = config.app["services"]["youtube"]["max_results"]
|
||||
log.debug("Retrieving data from Youtube...")
|
||||
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)
|
||||
search_response = youtube.search().list(q=text, part="id,snippet", maxResults=max_results, type=type).execute()
|
||||
self.results = []
|
||||
ids = []
|
||||
for search_result in search_response.get("items", []):
|
||||
if search_result["id"]["kind"] == "youtube#video":
|
||||
s = base.song(self)
|
||||
s.title = search_result["snippet"]["title"]
|
||||
ids.append(search_result["id"]["videoId"])
|
||||
s.url = "https://www.youtube.com/watch?v="+search_result["id"]["videoId"]
|
||||
self.results.append(s)
|
||||
ssr = youtube.videos().list(id=",".join(ids), part="contentDetails", maxResults=1).execute()
|
||||
for i in range(len(self.results)):
|
||||
self.results[i].duration = seconds_to_string(isodate.parse_duration(ssr["items"][i]["contentDetails"]["duration"]).total_seconds())
|
||||
log.debug("{0} results found.".format(len(self.results)))
|
||||
|
||||
def search_from_url(self, url):
|
||||
log.debug("Getting download URL for {0}".format(url,))
|
||||
if "playlist?list=" in url:
|
||||
return self.search_from_playlist(url)
|
||||
ydl = youtube_dl.YoutubeDL({'quiet': True, 'no_warnings': True, 'logger': log, 'prefer-free-formats': True, 'format': 'bestaudio', 'outtmpl': u'%(id)s%(ext)s'})
|
||||
with ydl:
|
||||
result = ydl.extract_info(url, download=False)
|
||||
if 'entries' in result:
|
||||
videos = result['entries']
|
||||
else:
|
||||
videos = [result]
|
||||
for video in videos:
|
||||
s = baseFile.song(self)
|
||||
s.title = video["title"]
|
||||
s.url = video["webpage_url"] # Cannot use direct URL here cause Youtube URLS expire after a minute.
|
||||
s.duration = seconds_to_string(video["duration"])
|
||||
self.results.append(s)
|
||||
log.debug("{0} results found.".format(len(self.results)))
|
||||
|
||||
def search_from_playlist(self, url):
|
||||
id = url.split("=")[1]
|
||||
max_results = 50
|
||||
log.debug("Retrieving data from Youtube...")
|
||||
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)
|
||||
search_response = youtube.playlistItems().list(playlistId=id, part="id, status, snippet", maxResults=max_results).execute()
|
||||
self.results = []
|
||||
ids = []
|
||||
for search_result in search_response.get("items", []):
|
||||
if search_result["status"]["privacyStatus"] != "public":
|
||||
continue
|
||||
s = baseFile.song(self)
|
||||
s.title = search_result["snippet"]["title"]
|
||||
ids.append(search_result["snippet"]["resourceId"]["videoId"])
|
||||
s.url = "https://www.youtube.com/watch?v="+search_result["snippet"]["resourceId"]["videoId"]
|
||||
self.results.append(s)
|
||||
ssr = youtube.videos().list(id=",".join(ids), part="contentDetails", maxResults=50).execute()
|
||||
for i in range(len(self.results)):
|
||||
self.results[i].duration = seconds_to_string(isodate.parse_duration(ssr["items"][i]["contentDetails"]["duration"]).total_seconds())
|
||||
log.debug("{0} results found.".format(len(self.results)))
|
||||
|
||||
def get_download_url(self, url):
|
||||
log.debug("Getting download URL for {0}".format(url,))
|
||||
ydl = youtube_dl.YoutubeDL({'quiet': True, 'no_warnings': True, 'logger': log, 'format': 'bestaudio/best', 'outtmpl': u'%(id)s%(ext)s'})
|
||||
with ydl:
|
||||
result = ydl.extract_info(url, download=False)
|
||||
if 'entries' in result:
|
||||
video = result['entries'][0]
|
||||
else:
|
||||
video = result
|
||||
# From here we should extract the first format so it will contain audio only.
|
||||
log.debug("Download URL: {0}".format(video["formats"][0]["url"],))
|
||||
return video["formats"][0]["url"]
|
||||
|
||||
def format_track(self, item):
|
||||
return "{0} {1}".format(item.title, item.duration)
|
||||
|
||||
def transcoder_enabled(self):
|
||||
return config.app["services"]["youtube"]["transcode"]
|
||||
|
||||
class settings(base.baseSettings):
|
||||
name = _("Youtube Settings")
|
||||
config_section = "youtube"
|
||||
|
||||
def __init__(self, parent):
|
||||
super(settings, self).__init__(parent=parent)
|
||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||
self.enabled = wx.CheckBox(self, wx.NewId(), _("Enable this service"))
|
||||
self.enabled.Bind(wx.EVT_CHECKBOX, self.on_enabled)
|
||||
self.map.append(("enabled", self.enabled))
|
||||
sizer.Add(self.enabled, 0, wx.ALL, 5)
|
||||
max_results_label = wx.StaticText(self, wx.NewId(), _("Max results per page"))
|
||||
self.max_results = wx.SpinCtrl(self, wx.NewId())
|
||||
self.max_results.SetRange(1, 50)
|
||||
max_results_sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||
max_results_sizer.Add(max_results_label, 0, wx.ALL, 5)
|
||||
max_results_sizer.Add(self.max_results, 0, wx.ALL, 5)
|
||||
self.map.append(("max_results", self.max_results))
|
||||
# self.transcode = wx.CheckBox(self, wx.NewId(), _("Enable transcode when downloading"))
|
||||
# self.map.append(("transcode", self.transcode))
|
||||
# sizer.Add(self.transcode, 0, wx.ALL, 5)
|
||||
self.SetSizer(sizer)
|
||||
|
||||
def on_enabled(self, *args, **kwargs):
|
||||
for i in self.map:
|
||||
if i[1] != self.enabled:
|
||||
if self.enabled.GetValue() == True:
|
||||
i[1].Enable(True)
|
||||
else:
|
||||
i[1].Enable(False)
|
61
src/services/zaycev.py
Normal file
61
src/services/zaycev.py
Normal file
@@ -0,0 +1,61 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
import re
|
||||
import json
|
||||
import requests
|
||||
import logging
|
||||
import wx
|
||||
import config
|
||||
from bs4 import BeautifulSoup
|
||||
from . import base
|
||||
|
||||
log = logging.getLogger("extractors.zaycev.net")
|
||||
|
||||
class interface(base.baseInterface):
|
||||
name = "zaycev.net"
|
||||
enabled = config.app["services"]["zaycev"].get("enabled")
|
||||
|
||||
def search(self, text, page=1):
|
||||
if text == "" or text == None:
|
||||
raise ValueError("Text must be passed and should not be blank.")
|
||||
site = "http://zaycev.net/search.html?query_search=%s" % (text,)
|
||||
log.debug("Retrieving data from {0}...".format(site,))
|
||||
r = requests.get(site)
|
||||
soup = BeautifulSoup(r.text, 'html.parser')
|
||||
search_results = soup.find_all("div", {"class": "musicset-track__title track-geo__title"})
|
||||
self.results = []
|
||||
for i in search_results:
|
||||
# The easiest method to get artist and song names is to fetch links. There are only two links per result here.
|
||||
data = i.find_all("a")
|
||||
# from here, data[0] contains artist info and data[1] contains info of the retrieved song.
|
||||
s = base.song(self)
|
||||
s.title = data[1].text
|
||||
s.artist = data[0].text
|
||||
s.url = "http://zaycev.net%s" % (data[1].attrs["href"])
|
||||
# s.duration = self.hd[i]["duration"]
|
||||
# s.size = self.hd[i]["size"]
|
||||
# s.bitrate = self.hd[i]["bitrate"]
|
||||
self.results.append(s)
|
||||
log.debug("{0} results found.".format(len(self.results)))
|
||||
|
||||
def get_download_url(self, url):
|
||||
log.debug("Getting download URL for {0}".format(url,))
|
||||
soups = BeautifulSoup(requests.get(url).text, 'html.parser')
|
||||
data = json.loads(requests.get('http://zaycev.net' + soups.find('div', {'class':"musicset-track"}).get('data-url')).text)
|
||||
log.debug("Download URL: {0}".format(data["url"]))
|
||||
return data["url"]
|
||||
|
||||
def format_track(self, item):
|
||||
return "{0}. {1}. {2}".format(item.title, item.duration, item.size)
|
||||
|
||||
class settings(base.baseSettings):
|
||||
name = _("zaycev.net")
|
||||
config_section = "zaycev"
|
||||
|
||||
def __init__(self, parent):
|
||||
super(settings, self).__init__(parent=parent)
|
||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||
self.enabled = wx.CheckBox(self, wx.NewId(), _("Enable this service (works only in the Russian Federation)"))
|
||||
self.map.append(("enabled", self.enabled))
|
||||
sizer.Add(self.enabled, 0, wx.ALL, 5)
|
||||
self.SetSizer(sizer)
|
Reference in New Issue
Block a user