diff --git a/requirements.txt b/requirements.txt index 0e74d9a9..587e8ae6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ wxpython +pytest +coverage wheel future configobj @@ -29,6 +31,7 @@ backports.functools_lru_cache cx_freeze tweepy twitter-text-parser +mastodon.py pyenchant sqlitedict cx-Logging diff --git a/src/controller/buffers/twitter/base.py b/src/controller/buffers/twitter/base.py index 7ebea416..9429aee1 100644 --- a/src/controller/buffers/twitter/base.py +++ b/src/controller/buffers/twitter/base.py @@ -40,9 +40,9 @@ class BaseBuffer(base.Buffer): super(BaseBuffer, self).__init__(parent, function, *args, **kwargs) log.debug("Initializing buffer %s, account %s" % (name, account,)) if bufferType != None: - self.buffer = getattr(buffers, bufferType)(parent, name) + self.buffer = getattr(buffers.twitter, bufferType)(parent, name) else: - self.buffer = buffers.basePanel(parent, name) + self.buffer = buffers.twitter.basePanel(parent, name) self.invisible = True self.name = name self.type = self.buffer.type diff --git a/src/controller/buffers/twitter/trends.py b/src/controller/buffers/twitter/trends.py index 39809df1..16455971 100644 --- a/src/controller/buffers/twitter/trends.py +++ b/src/controller/buffers/twitter/trends.py @@ -26,7 +26,7 @@ class TrendsBuffer(base.Buffer): self.session = sessionObject self.account = account self.invisible = True - self.buffer = buffers.trendsPanel(parent, name) + self.buffer = buffers.twitter.trendsPanel(parent, name) self.buffer.account = account self.type = self.buffer.type self.bind_events() diff --git a/src/controller/mainController.py b/src/controller/mainController.py index f31fde71..14413fa5 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -1,48 +1,43 @@ # -*- coding: utf-8 -*- -import platform -system = platform.system() -import application +import os +import logging +import webbrowser import wx import requests -from audio_services import youtube_utils import arrow -if system == "Windows": - from update import updater - from wxUI import (view, dialogs, commonMessageDialogs, sysTrayIcon) - from . import settings - from extra import SoundsTutorial, ocr - import keystrokeEditor - from keyboard_handler.wx_handler import WXKeyboardHandler - from . import userActionsController - from . import trendingTopics - from . import user - from . import listsController - from . import filterController - from . import userSelector -elif system == "Linux": - from gtkUI import (view, commonMessageDialogs) +import keystrokeEditor +import sessions +import widgetUtils +import config +import languageHandler +import application +import sound +import output +from pubsub import pub +from tweepy.errors import TweepyException, Forbidden +from geopy.geocoders import Nominatim +from update import updater +from audio_services import youtube_utils +from extra import SoundsTutorial, ocr +from wxUI import view, dialogs, commonMessageDialogs, sysTrayIcon +from . import settings +from keyboard_handler.wx_handler import WXKeyboardHandler +from . import userActionsController +from . import trendingTopics +from . import user +from . import listsController +from . import filterController from sessions.twitter import utils, compose from sessionmanager import manager, sessionManager from controller import buffers from . import messages from . import userAliasController -import sessions from sessions.twitter import session as session_ -from pubsub import pub -import sound -import output -from tweepy.errors import TweepyException, Forbidden from mysc.thread_utils import call_threaded from mysc.repeating_timer import RepeatingTimer from mysc import restart -import config -import widgetUtils -import logging -import webbrowser -from geopy.geocoders import Nominatim from mysc import localization -import os -import languageHandler +from controller.twitter import handler as TwitterHandler log = logging.getLogger("mainController") @@ -136,18 +131,18 @@ class Controller(object): pub.subscribe(self.create_buffer, "createBuffer") pub.subscribe(self.toggle_share_settings, "toggleShare") pub.subscribe(self.restart_streaming, "restartStreaming") - if system == "Windows": - pub.subscribe(self.invisible_shorcuts_changed, "invisible-shorcuts-changed") - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.show_hide, menuitem=self.view.show_hide) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.search, menuitem=self.view.menuitem_search) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.list_manager, menuitem=self.view.lists) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.get_trending_topics, menuitem=self.view.trends) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.filter, menuitem=self.view.filter) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.manage_filters, menuitem=self.view.manage_filters) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.find, menuitem=self.view.find) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.accountConfiguration, menuitem=self.view.account_settings) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.configuration, menuitem=self.view.prefs) - widgetUtils.connect_event(self.view, widgetUtils.MENU, self.ocr_image, menuitem=self.view.ocr) + pub.subscribe(self.invisible_shorcuts_changed, "invisible-shorcuts-changed") + pub.subscribe(self.create_account_buffer, "core.create_account") + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.show_hide, menuitem=self.view.show_hide) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.search, menuitem=self.view.menuitem_search) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.list_manager, menuitem=self.view.lists) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.get_trending_topics, menuitem=self.view.trends) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.filter, menuitem=self.view.filter) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.manage_filters, menuitem=self.view.manage_filters) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.find, menuitem=self.view.find) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.accountConfiguration, menuitem=self.view.account_settings) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.configuration, menuitem=self.view.prefs) + widgetUtils.connect_event(self.view, widgetUtils.MENU, self.ocr_image, menuitem=self.view.ocr) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.learn_sounds, menuitem=self.view.sounds_tutorial) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.exit, menuitem=self.view.close) widgetUtils.connect_event(self.view, widgetUtils.CLOSE_EVENT, self.exit) @@ -216,6 +211,14 @@ class Controller(object): def taskbar_right_click(self, *args, **kwargs): self.systrayIcon.show_menu() + def get_handler(self, type): + handler = self.handlers.get(type) + if handler == None: + if type == "twitter": + handler = TwitterHandler.Handler() + self.handlers[type]=handler + return handler + def __init__(self): super(Controller, self).__init__() # Visibility state. @@ -229,14 +232,14 @@ class Controller(object): self.accounts = [] # This saves the current account (important in invisible mode) self.current_account = "" + # Handlers are special objects as they manage the mapping of available features and events in different social networks. + self.handlers = dict() self.view.prepare() self.bind_other_events() - if system == "Windows": - self.set_systray_icon() + self.set_systray_icon() def check_invisible_at_startup(self): # Visibility check. It does only work for windows. - if system != "Windows": return if config.app["app-settings"]["hide_gui"] == True: self.show_hide() self.view.Show() @@ -254,13 +257,9 @@ class Controller(object): if sessions.sessions[i].is_logged == False: self.create_ignored_session_buffer(sessions.sessions[i]) continue - self.create_buffers(sessions.sessions[i]) - - # Connection checker executed each minute. - self.checker_function = RepeatingTimer(60, self.check_connection) -# self.checker_function.start() -# self.save_db = RepeatingTimer(300, self.save_data_in_db) -# self.save_db.start() + if sessions.sessions[i].type == "twitter": + handler = self.get_handler(type="twitter") + handler.create_buffers(sessions.sessions[i], controller=self) log.debug("Setting updates to buffers every %d seconds..." % (60*config.app["app-settings"]["update_period"],)) self.update_buffers_function = RepeatingTimer(60*config.app["app-settings"]["update_period"], self.update_buffers) self.update_buffers_function.start() @@ -271,7 +270,8 @@ class Controller(object): if sessions.sessions[i].is_logged == False: continue self.start_buffers(sessions.sessions[i]) self.set_buffer_positions(sessions.sessions[i]) - sessions.sessions[i].start_streaming() + if hasattr(sessions.sessions[i], "start_streaming"): + sessions.sessions[i].start_streaming() if config.app["app-settings"]["play_ready_sound"] == True: sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg") if config.app["app-settings"]["speak_ready_msg"] == True: @@ -280,7 +280,6 @@ class Controller(object): self.streams_checker_function = RepeatingTimer(60, self.check_streams) self.streams_checker_function.start() - def create_ignored_session_buffer(self, session): self.accounts.append(session.settings["twitter"]["user_name"]) account = buffers.base.AccountBuffer(self.view.nb, session.settings["twitter"]["user_name"], session.settings["twitter"]["user_name"], session.session_id) @@ -297,6 +296,13 @@ class Controller(object): self.create_buffers(session, False) self.start_buffers(session) + def create_account_buffer(self, name, session_id): + self.accounts.append(name) + account = buffers.base.AccountBuffer(self.view.nb, name, name, session_id) + account.setup_account() + self.buffers.append(account) + self.view.add_buffer(account.buffer , name=name) + def create_buffer(self, buffer_type="baseBuffer", session_type="twitter", buffer_title="", parent_tab=None, start=False, kwargs={}): log.debug("Creating buffer of type {0} with parent_tab of {2} arguments {1}".format(buffer_type, kwargs, parent_tab)) if not hasattr(buffers, session_type): @@ -322,7 +328,7 @@ class Controller(object): self.view.insert_buffer(buffer.buffer, buffer_title, parent_tab) log.debug("Inserting buffer {0} into control {1}".format(buffer, parent_tab)) - def create_buffers(self, session, createAccounts=True): + def create_mastodon_buffers(self, session, createAccounts=True): """ Generates buffer objects for an user account. session SessionObject: a sessionmanager.session.Session Object""" session.get_user_info() @@ -333,53 +339,7 @@ class Controller(object): self.buffers.append(account) self.view.add_buffer(account.buffer , name=session.db["user_name"]) root_position =self.view.search(session.db["user_name"], session.db["user_name"]) - for i in session.settings['general']['buffer_order']: - if i == 'home': - pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Home"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="home_timeline", name="home_timeline", sessionObject=session, account=session.db["user_name"], sound="tweet_received.ogg", include_ext_alt_text=True, tweet_mode="extended")) - elif i == 'mentions': - pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Mentions"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="mentions_timeline", name="mentions", sessionObject=session, account=session.db["user_name"], sound="mention_received.ogg", include_ext_alt_text=True, tweet_mode="extended")) - elif i == 'dm': - pub.sendMessage("createBuffer", buffer_type="DirectMessagesBuffer", session_type=session.type, buffer_title=_("Direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_direct_messages", name="direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message", sound="dm_received.ogg")) - elif i == 'sent_dm': - pub.sendMessage("createBuffer", buffer_type="SentDirectMessagesBuffer", session_type=session.type, buffer_title=_("Sent direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function=None, name="sent_direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message")) - elif i == 'sent_tweets': - pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Sent tweets"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="user_timeline", name="sent_tweets", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"], include_ext_alt_text=True, tweet_mode="extended")) - elif i == 'favorites': - pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_favorites", name="favourites", sessionObject=session, account=session.db["user_name"], sound="favourite.ogg", include_ext_alt_text=True, tweet_mode="extended")) - elif i == 'followers': - pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_followers", name="followers", sessionObject=session, account=session.db["user_name"], sound="update_followers.ogg", screen_name=session.db["user_name"])) - elif i == 'friends': - pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Following"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_friends", name="friends", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"])) - elif i == 'blocks': - pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_blocks", name="blocked", sessionObject=session, account=session.db["user_name"])) - elif i == 'muted': - pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Muted users"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_mutes", name="muted", sessionObject=session, account=session.db["user_name"])) - pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="timelines", account=session.db["user_name"])) - timelines_position =self.view.search("timelines", session.db["user_name"]) - for i in session.settings["other_buffers"]["timelines"]: - pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_(u"Timeline for {}").format(i,), parent_tab=timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="user_timeline", name="%s-timeline" % (i,), sessionObject=session, account=session.db["user_name"], sound="tweet_timeline.ogg", bufferType=None, user_id=i, include_ext_alt_text=True, tweet_mode="extended")) - pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Likes timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="favs_timelines", account=session.db["user_name"])) - favs_timelines_position =self.view.search("favs_timelines", session.db["user_name"]) - for i in session.settings["other_buffers"]["favourites_timelines"]: - pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes for {}").format(i,), parent_tab=favs_timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="get_favorites", name="%s-favorite" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="favourites_timeline_updated.ogg", user_id=i, include_ext_alt_text=True, tweet_mode="extended")) - pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Followers timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="followers_timelines", account=session.db["user_name"])) - followers_timelines_position =self.view.search("followers_timelines", session.db["user_name"]) - for i in session.settings["other_buffers"]["followers_timelines"]: - pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i,), parent_tab=followers_timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="get_followers", name="%s-followers" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i)) - pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Following timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="friends_timelines", account=session.db["user_name"])) - friends_timelines_position =self.view.search("friends_timelines", session.db["user_name"]) - for i in session.settings["other_buffers"]["friends_timelines"]: - pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_(u"Friends for {}").format(i,), parent_tab=friends_timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="get_friends", name="%s-friends" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i)) - pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Lists"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="lists", account=session.db["user_name"])) - lists_position =self.view.search("lists", session.db["user_name"]) - for i in session.settings["other_buffers"]["lists"]: - pub.sendMessage("createBuffer", buffer_type="ListBuffer", session_type=session.type, buffer_title=_(u"List for {}").format(i), parent_tab=lists_position, start=False, kwargs=dict(parent=self.view.nb, function="list_timeline", name="%s-list" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="list_tweet.ogg", list_id=utils.find_list(i, session.db["lists"]), include_ext_alt_text=True, tweet_mode="extended")) - pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Searches"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="searches", account=session.db["user_name"])) - searches_position =self.view.search("searches", session.db["user_name"]) - for i in session.settings["other_buffers"]["tweet_searches"]: - pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_(u"Search for {}").format(i), parent_tab=searches_position, start=False, kwargs=dict(parent=self.view.nb, function="search_tweets", name="%s-searchterm" % (i,), sessionObject=session, account=session.db["user_name"], bufferType="searchPanel", sound="search_updated.ogg", q=i, include_ext_alt_text=True, tweet_mode="extended")) - for i in session.settings["other_buffers"]["trending_topic_buffers"]: - pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="%s_tt" % (i,), sessionObject=session, account=session.db["user_name"], trendsFor=i, sound="trends_updated.ogg")) + def set_buffer_positions(self, session): "Sets positions for buffers if values exist in the database." @@ -635,17 +595,17 @@ class Controller(object): log.debug("Saving global configuration...") for item in sessions.sessions: if sessions.sessions[item].logged == False: continue - log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) - sessions.sessions[item].stop_streaming() - log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,)) + if hasattr(sessions.sessions[item], "stop_streaming"): + log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) + sessions.sessions[item].stop_streaming() + log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,)) sessions.sessions[item].sound.cleaner.cancel() log.debug("Saving database for " + sessions.sessions[item].session_id) sessions.sessions[item].save_persistent_data() - if system == "Windows": - self.systrayIcon.RemoveIcon() - pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) - if os.path.exists(pidpath): - os.remove(pidpath) + self.systrayIcon.RemoveIcon() + pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) + if os.path.exists(pidpath): + os.remove(pidpath) if hasattr(self, "streams_checker_function"): log.debug("Stopping stream checker...") self.streams_checker_function.cancel() diff --git a/src/controller/twitter/__init__.py b/src/controller/twitter/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/controller/twitter/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/controller/twitter/handler.py b/src/controller/twitter/handler.py new file mode 100644 index 00000000..a7b75d0b --- /dev/null +++ b/src/controller/twitter/handler.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +from pubsub import pub +from sessions.twitter import utils +from controller import buffers + +class Handler(object): + + def __init__(self): + super(Handler, self).__init__() + + def create_buffers(self, session, createAccounts=True, controller=None): + session.get_user_info() + if createAccounts == True: + pub.sendMessage("core.create_account", name=session.db["user_name"], session_id=session.session_id) + root_position =controller.view.search(session.db["user_name"], session.db["user_name"]) + for i in session.settings['general']['buffer_order']: + if i == 'home': + pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Home"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="home_timeline", name="home_timeline", sessionObject=session, account=session.db["user_name"], sound="tweet_received.ogg", include_ext_alt_text=True, tweet_mode="extended")) + elif i == 'mentions': + pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Mentions"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="mentions_timeline", name="mentions", sessionObject=session, account=session.db["user_name"], sound="mention_received.ogg", include_ext_alt_text=True, tweet_mode="extended")) + elif i == 'dm': + pub.sendMessage("createBuffer", buffer_type="DirectMessagesBuffer", session_type=session.type, buffer_title=_("Direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_direct_messages", name="direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message", sound="dm_received.ogg")) + elif i == 'sent_dm': + pub.sendMessage("createBuffer", buffer_type="SentDirectMessagesBuffer", session_type=session.type, buffer_title=_("Sent direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function=None, name="sent_direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message")) + elif i == 'sent_tweets': + pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Sent tweets"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="user_timeline", name="sent_tweets", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"], include_ext_alt_text=True, tweet_mode="extended")) + elif i == 'favorites': + pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_favorites", name="favourites", sessionObject=session, account=session.db["user_name"], sound="favourite.ogg", include_ext_alt_text=True, tweet_mode="extended")) + elif i == 'followers': + pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_followers", name="followers", sessionObject=session, account=session.db["user_name"], sound="update_followers.ogg", screen_name=session.db["user_name"])) + elif i == 'friends': + pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Following"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_friends", name="friends", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"])) + elif i == 'blocks': + pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_blocks", name="blocked", sessionObject=session, account=session.db["user_name"])) + elif i == 'muted': + pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Muted users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_mutes", name="muted", sessionObject=session, account=session.db["user_name"])) + pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="timelines", account=session.db["user_name"])) + timelines_position =controller.view.search("timelines", session.db["user_name"]) + for i in session.settings["other_buffers"]["timelines"]: + pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_(u"Timeline for {}").format(i,), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="user_timeline", name="%s-timeline" % (i,), sessionObject=session, account=session.db["user_name"], sound="tweet_timeline.ogg", bufferType=None, user_id=i, include_ext_alt_text=True, tweet_mode="extended")) + pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Likes timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="favs_timelines", account=session.db["user_name"])) + favs_timelines_position =controller.view.search("favs_timelines", session.db["user_name"]) + for i in session.settings["other_buffers"]["favourites_timelines"]: + pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes for {}").format(i,), parent_tab=favs_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_favorites", name="%s-favorite" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="favourites_timeline_updated.ogg", user_id=i, include_ext_alt_text=True, tweet_mode="extended")) + pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Followers timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="followers_timelines", account=session.db["user_name"])) + followers_timelines_position =controller.view.search("followers_timelines", session.db["user_name"]) + for i in session.settings["other_buffers"]["followers_timelines"]: + pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i,), parent_tab=followers_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_followers", name="%s-followers" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i)) + pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Following timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="friends_timelines", account=session.db["user_name"])) + friends_timelines_position =controller.view.search("friends_timelines", session.db["user_name"]) + for i in session.settings["other_buffers"]["friends_timelines"]: + pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_(u"Friends for {}").format(i,), parent_tab=friends_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_friends", name="%s-friends" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i)) + pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Lists"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="lists", account=session.db["user_name"])) + lists_position =controller.view.search("lists", session.db["user_name"]) + for i in session.settings["other_buffers"]["lists"]: + pub.sendMessage("createBuffer", buffer_type="ListBuffer", session_type=session.type, buffer_title=_(u"List for {}").format(i), parent_tab=lists_position, start=False, kwargs=dict(parent=controller.view.nb, function="list_timeline", name="%s-list" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="list_tweet.ogg", list_id=utils.find_list(i, session.db["lists"]), include_ext_alt_text=True, tweet_mode="extended")) + pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Searches"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="searches", account=session.db["user_name"])) + searches_position =controller.view.search("searches", session.db["user_name"]) + for i in session.settings["other_buffers"]["tweet_searches"]: + pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_(u"Search for {}").format(i), parent_tab=searches_position, start=False, kwargs=dict(parent=controller.view.nb, function="search_tweets", name="%s-searchterm" % (i,), sessionObject=session, account=session.db["user_name"], bufferType="searchPanel", sound="search_updated.ogg", q=i, include_ext_alt_text=True, tweet_mode="extended")) + for i in session.settings["other_buffers"]["trending_topic_buffers"]: + pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="%s_tt" % (i,), sessionObject=session, account=session.db["user_name"], trendsFor=i, sound="trends_updated.ogg")) diff --git a/src/keyboard_handler/linux.py b/src/keyboard_handler/linux.py index 15f6ff7b..378f3a25 100644 --- a/src/keyboard_handler/linux.py +++ b/src/keyboard_handler/linux.py @@ -21,7 +21,7 @@ def parse(s): lst.remove(item) #end if if len(lst) > 1: #more than one key, parse error - raise ValueError, 'unknown modifier %s' % lst[0] + raise ValueError('unknown modifier %s' % lst[0]) return (m, lst[0].lower()) class AtspiThread(threading.Thread): def run(self): diff --git a/src/mastodon.defaults b/src/mastodon.defaults new file mode 100644 index 00000000..566ec49f --- /dev/null +++ b/src/mastodon.defaults @@ -0,0 +1,45 @@ +[mastodon] +access_token = string(default="") +instance = string(default="") +user_name = string(default="") +ignored_clients = list(default=list()) + +[general] +relative_times = boolean(default=True) +max_toots_per_call = integer(default=40) +reverse_timelines = boolean(default=False) +persist_size = integer(default=0) +load_cache_in_memory=boolean(default=True) +show_screen_names = boolean(default=False) + +[sound] +volume = float(default=1.0) +input_device = string(default="Default") +output_device = string(default="Default") +session_mute = boolean(default=False) +current_soundpack = string(default="default") + +[other_buffers] +timelines = list(default=list()) +searches = list(default=list()) +lists = list(default=list()) +favourites_timelines = list(default=list()) +followers_timelines = list(default=list()) +friends_timelines = list(default=list()) +trending_topic_buffers = list(default=list()) +muted_buffers = list(default=list()) +autoread_buffers = list(default=list(mentions, direct_messages, events)) + +[mysc] +spelling_language = string(default="") +save_followers_in_autocompletion_db = boolean(default=False) +save_friends_in_autocompletion_db = boolean(default=False) +ocr_language = string(default="") + +[reporting] +braille_reporting = boolean(default=True) +speech_reporting = boolean(default=True) + +[filters] + +[user-aliases] \ No newline at end of file diff --git a/src/run_tests.py b/src/run_tests.py deleted file mode 100644 index 41842347..00000000 --- a/src/run_tests.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- -import unittest - -testmodules = ["test.test_cache"] - -suite = unittest.TestSuite() - -for t in testmodules: - try: - # If the module defines a suite() function, call it to get the suite. - mod = __import__(t, globals(), locals(), ['suite']) - suitefn = getattr(mod, 'suite') - suite.addTest(suitefn()) - except (ImportError, AttributeError): - # else, just load all the test cases from the module. - suite.addTest(unittest.defaultTestLoader.loadTestsFromName(t)) - -unittest.TextTestRunner(verbosity=2).run(suite) \ No newline at end of file diff --git a/src/sessionmanager/manager.py b/src/sessionmanager/manager.py index 3ec251fe..57d58156 100644 --- a/src/sessionmanager/manager.py +++ b/src/sessionmanager/manager.py @@ -1,36 +1,32 @@ # -*- coding: cp1252 -*- -#from config_utils import Configuration, ConfigurationResetException -from __future__ import unicode_literals -from builtins import object -import config -import paths +""" Lightweigth module that saves session position across global config and performs validation of config files. """ import os import logging +import config +import paths log = logging.getLogger("sessionmanager.manager") from sessions import session_exceptions manager = None def setup(): + """ Creates the singleton instance used within TWBlue to access this object. """ global manager if not manager: manager = sessionManager() class sessionManager(object): - # def __init__(self): - # FILE = "sessions.conf" - # SPEC = "app-configuration.defaults" - # try: - # self.main = Configuration(paths.config_path(FILE), paths.app_path(SPEC)) - # except ConfigurationResetException: - # pass def get_current_session(self): + """ Returns the currently focused session, if valid. """ if self.is_valid(config.app["sessions"]["current_session"]): return config.app["sessions"]["current_session"] - else: - return False def add_session(self, id): + """ Adds a new session to the global config, so it will be taken into account for all operations. + + :param id: Session identified. + :param id: str. + """ log.debug("Adding a new session: %s" % (id,)) path = os.path.join(paths.config_path(), id) if not os.path.exists(path): diff --git a/src/sessionmanager/sessionManager.py b/src/sessionmanager/sessionManager.py index d4255862..711f032f 100644 --- a/src/sessionmanager/sessionManager.py +++ b/src/sessionmanager/sessionManager.py @@ -1,42 +1,52 @@ # -*- coding: utf-8 -*- +""" Module to perform session actions such as addition, removal or display of the global settings dialogue. """ import shutil -import widgetUtils -import platform -import output -if platform.system() == "Windows": - from . import wxUI as view - from controller import settings -elif platform.system() == "Linux": - from . import gtkUI as view -import paths import time import os import logging +import widgetUtils import sessions -from sessions.twitter import session -from . import manager +import output +import paths import config_utils import config +from pubsub import pub from tweepy.errors import TweepyException +from controller import settings +from sessions.twitter import session as TwitterSession +from sessions.mastodon import session as MastodonSession +from . import manager +from . import wxUI as view + log = logging.getLogger("sessionmanager.sessionManager") class sessionManagerController(object): - def __init__(self, started=False): + def __init__(self, started: bool = False): + """ Class constructor. + + Creates the SessionManager class controller, responsible for the accounts within TWBlue. From this dialog, users can add/Remove accounts, or load the global settings dialog. + + :param started: Indicates whether this object is being created during application startup (when no other controller has been instantiated) or not. It is important for us to know this, as we won't show the button to open global settings dialog if the application has been started. Users must choose the corresponding option in the menu bar. + :type started: bool + """ super(sessionManagerController, self).__init__() log.debug("Setting up the session manager.") self.started = started + # Initialize the manager, responsible for storing session objects. manager.setup() self.view = view.sessionManagerWindow() - widgetUtils.connect_event(self.view.new, widgetUtils.BUTTON_PRESSED, self.manage_new_account) - widgetUtils.connect_event(self.view.remove, widgetUtils.BUTTON_PRESSED, self.remove) + pub.subscribe(self.manage_new_account, "sessionmanager.new_account") + pub.subscribe(self.remove_account, "sessionmanager.remove_account") if self.started == False: - widgetUtils.connect_event(self.view.configuration, widgetUtils.BUTTON_PRESSED, self.configuration) + pub.subscribe(self.configuration, "sessionmanager.configuration") else: self.view.hide_configuration() + # Store a temporary copy of new and removed sessions, so we will perform actions on them during call to on_ok. self.new_sessions = {} self.removed_sessions = [] def fill_list(self): + """ Fills the session list with all valid sessions that could be found in config path. """ sessionsList = [] reserved_dirs = ["dicts"] log.debug("Filling the sessions list.") @@ -55,10 +65,16 @@ class sessionManagerController(object): output.speak("An exception was raised while attempting to clean malformed session data. See the error log for details. If this message persists, contact the developers.",True) os.exception("Exception thrown while removing malformed session") continue - name = config_test["twitter"]["user_name"] - if config_test["twitter"]["user_key"] != "" and config_test["twitter"]["user_secret"] != "": - sessionsList.append(name) - self.sessions.append(i) + if config_test.get("twitter") != None: + name = _("{account_name} (Twitter)").format(account_name=config_test["twitter"]["user_name"]) + if config_test["twitter"]["user_key"] != "" and config_test["twitter"]["user_secret"] != "": + sessionsList.append(name) + self.sessions.append(dict(type="twitter", id=i)) + elif config_test.get("mastodon") != None: + name = _("{account_name} (Mastodon)").format(account_name=config_test["mastodon"]["user_name"]) + if config_test["mastodon"]["instance"] != "" and config_test["mastodon"]["access_token"] != "": + sessionsList.append(name) + self.sessions.append(dict(type="mastodon", id=i)) else: try: log.debug("Deleting session %s" % (i,)) @@ -69,6 +85,7 @@ class sessionManagerController(object): self.view.fill_list(sessionsList) def show(self): + """ Displays the session manager dialog. """ if self.view.get_response() == widgetUtils.OK: self.do_ok() # else: @@ -77,49 +94,51 @@ class sessionManagerController(object): def do_ok(self): log.debug("Starting sessions...") for i in self.sessions: - if (i in sessions.sessions) == True: continue - s = session.Session(i) + # Skip already created sessions. Useful when session manager controller is not created during startup. + if sessions.sessions.get(i.get("id")) != None: + continue + # Create the session object based in session type. + if i.get("type") == "twitter": + s = TwitterSession.Session(i.get("id")) + elif i.get("type") == "mastodon": + s = MastodonSession.Session(i.get("id")) s.get_configuration() - if i not in config.app["sessions"]["ignored_sessions"]: + if i.get("id") not in config.app["sessions"]["ignored_sessions"]: try: s.login() except TweepyException: self.show_auth_error(s.settings["twitter"]["user_name"]) continue - sessions.sessions[i] = s - self.new_sessions[i] = s + sessions.sessions[i.get("id")] = s + self.new_sessions[i.get("id")] = s # self.view.destroy() def show_auth_error(self, user_name): error = view.auth_error(user_name) - def manage_new_account(self, *args, **kwargs): - if self.view.new_account_dialog() == widgetUtils.YES: - location = (str(time.time())[-6:]) - log.debug("Creating session in the %s path" % (location,)) - s = session.Session(location) - manager.manager.add_session(location) - s.get_configuration() -# try: - s.authorise() - self.sessions.append(location) - self.view.add_new_session_to_list() - s.settings.write() -# except: -# log.exception("Error authorising the session") -# self.view.show_unauthorised_error() -# return + def manage_new_account(self, type): + # Generic settings for all account types. + location = (str(time.time())[-6:]) + log.debug("Creating %s session in the %s path" % (type, location)) + if type == "twitter": + s = TwitterSession.Session(location) + elif type == "mastodon": + s = MastodonSession.Session(location) + manager.manager.add_session(location) + s.get_configuration() + s.authorise() + self.sessions.append(dict(id=location, type=type)) + self.view.add_new_session_to_list() + s.settings.write() - def remove(self, *args, **kwargs): - if self.view.remove_account_dialog() == widgetUtils.YES: - selected_account = self.sessions[self.view.get_selected()] - self.view.remove_session(self.view.get_selected()) - self.removed_sessions.append(selected_account) - self.sessions.remove(selected_account) - shutil.rmtree(path=os.path.join(paths.config_path(), selected_account), ignore_errors=True) + def remove_account(self, index): + selected_account = self.sessions[index] + self.view.remove_session(index) + self.removed_sessions.append(selected_account.get("id")) + self.sessions.remove(selected_account) + shutil.rmtree(path=os.path.join(paths.config_path(), selected_account.get("id")), ignore_errors=True) - - def configuration(self, *args, **kwargs): + def configuration(self): """ Opens the global settings dialogue.""" d = settings.globalSettingsController() if d.response == widgetUtils.OK: diff --git a/src/sessionmanager/wxUI.py b/src/sessionmanager/wxUI.py index 6441f6e8..9b636259 100644 --- a/src/sessionmanager/wxUI.py +++ b/src/sessionmanager/wxUI.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- -from __future__ import unicode_literals +""" Base GUI (Wx) class for the Session manager module.""" import wx +from pubsub import pub from multiplatform_widgets import widgets -import application class sessionManagerWindow(wx.Dialog): + """ Dialog that displays all session managing capabilities to users. """ def __init__(self): super(sessionManagerWindow, self).__init__(parent=None, title=_(u"Session manager"), size=wx.DefaultSize) panel = wx.Panel(self) @@ -16,8 +17,11 @@ class sessionManagerWindow(wx.Dialog): listSizer.Add(self.list.list, 0, wx.ALL, 5) sizer.Add(listSizer, 0, wx.ALL, 5) self.new = wx.Button(panel, -1, _(u"New account"), size=wx.DefaultSize) + self.new.Bind(wx.EVT_BUTTON, self.on_new_account) self.remove = wx.Button(panel, -1, _(u"Remove account")) + self.remove.Bind(wx.EVT_BUTTON, self.on_remove) self.configuration = wx.Button(panel, -1, _(u"Global Settings")) + self.configuration.Bind(wx.EVT_BUTTON, self.on_configuration) ok = wx.Button(panel, wx.ID_OK, size=wx.DefaultSize) ok.SetDefault() cancel = wx.Button(panel, wx.ID_CANCEL, size=wx.DefaultSize) @@ -42,11 +46,29 @@ class sessionManagerWindow(wx.Dialog): if self.list.get_count() == 0: wx.MessageDialog(None, _(u"You need to configure an account."), _(u"Account Error"), wx.ICON_ERROR).ShowModal() return - self.controller.do_ok() self.EndModal(wx.ID_OK) - def new_account_dialog(self): - return wx.MessageDialog(self, _(u"The request to authorize your Twitter account will be opened in your browser. You only need to do this once. Would you like to continue?"), _(u"Authorization"), wx.YES_NO).ShowModal() + def on_new_account(self, *args, **kwargs): + menu = wx.Menu() + twitter = menu.Append(wx.ID_ANY, _("Twitter")) + mastodon = menu.Append(wx.ID_ANY, _("Mastodon")) + menu.Bind(wx.EVT_MENU, self.on_new_twitter_account, twitter) + menu.Bind(wx.EVT_MENU, self.on_new_mastodon_account, mastodon) + self.PopupMenu(menu, self.new.GetPosition()) + + def on_new_mastodon_account(self, *args, **kwargs): + dlg = wx.MessageDialog(self, _("You will be prompted for your Mastodon data (instance URL, email address and password) so we can authorise TWBlue in your instance. Would you like to authorise your account now?"), _(u"Authorization"), wx.YES_NO) + response = dlg.ShowModal() + dlg.Destroy() + if response == wx.ID_YES: + pub.sendMessage("sessionmanager.new_account", type="mastodon") + + def on_new_twitter_account(self, *args, **kwargs): + dlg = wx.MessageDialog(self, _(u"The request to authorize your Twitter account will be opened in your browser. You only need to do this once. Would you like to continue?"), _(u"Authorization"), wx.YES_NO) + response = dlg.ShowModal() + dlg.Destroy() + if response == wx.ID_YES: + pub.sendMessage("sessionmanager.new_account", type="twitter") def add_new_session_to_list(self): total = self.list.get_count() @@ -61,8 +83,16 @@ class sessionManagerWindow(wx.Dialog): def get_response(self): return self.ShowModal() - def remove_account_dialog(self): - return wx.MessageDialog(self, _(u"Do you really want to delete this account?"), _(u"Remove account"), wx.YES_NO).ShowModal() + def on_remove(self, *args, **kwargs): + dlg = wx.MessageDialog(self, _(u"Do you really want to delete this account?"), _(u"Remove account"), wx.YES_NO) + response = dlg.ShowModal() + dlg.Destroy() + if response == wx.ID_YES: + selected = self.list.get_selected() + pub.sendMessage("sessionmanager.remove_account", index=selected) + + def on_configuration(self, *args, **kwargs): + pub.sendMessage("sessionmanager.configuration") def get_selected(self): return self.list.get_selected() diff --git a/src/sessions/__init__.py b/src/sessions/__init__.py index a9341e79..318104d8 100644 --- a/src/sessions/__init__.py +++ b/src/sessions/__init__.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- """ this package contains code related to Sessions. In TWBlue, a session module defines everything a social network needs to be used in the program.""" -from __future__ import unicode_literals # let's define a global object for storing sessions across the program. sessions = {} diff --git a/src/sessions/mastodon/__init__.py b/src/sessions/mastodon/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/sessions/mastodon/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/sessions/mastodon/session.py b/src/sessions/mastodon/session.py new file mode 100644 index 00000000..ce4a2e5a --- /dev/null +++ b/src/sessions/mastodon/session.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +import os +import paths +import time +import logging +import wx +import mastodon +import config +import config_utils +import output +import application +from pubsub import pub +from mysc.thread_utils import call_threaded +from sessions import base +from .wxUI import authorisationDialog + +log = logging.getLogger("sessions.mastodonSession") + +class Session(base.baseSession): + + def __init__(self, *args, **kwargs): + super(Session, self).__init__(*args, **kwargs) + self.config_spec = "mastodon.defaults" + self.supported_languages = [] + self.type = "mastodon" + + def login(self, verify_credentials=True): + if self.settings["mastodon"]["access_token"] != None and self.settings["mastodon"]["instance"] != None: + try: + log.debug("Logging in to Mastodon instance {}...".format(self.settings["mastodon"]["instance"])) + self.api = mastodon.Mastodon(access_token=self.settings["mastodon"]["access_token"], api_base_url=self.settings["mastodon"]["instance"]) + if verify_credentials == True: + credentials = self.api.account_verify_credentials() + self.db["user_name"] = credentials["username"] + self.db["user_id"] = credentials["id"] + self.settings["mastodon"]["user_name"] = credentials["username"] + self.logged = True + log.debug("Logged.") + self.counter = 0 + except IOError: + log.error("The login attempt failed.") + self.logged = False + else: + self.logged = False + raise Exceptions.RequireCredentialsSessionError + + def authorise(self): + if self.logged == True: + raise Exceptions.AlreadyAuthorisedError("The authorisation process is not needed at this time.") + else: + self.authorisation_dialog = authorisationDialog() + answer = self.authorisation_dialog.ShowModal() + if answer == wx.ID_OK: + client_id, client_secret = mastodon.Mastodon.create_app("TWBlue", api_base_url=self.authorisation_dialog.instance.GetValue(), website="https://twblue.es") + temporary_api = mastodon.Mastodon(client_id=client_id, client_secret=client_secret, api_base_url=self.authorisation_dialog.instance.GetValue()) + access_token = temporary_api.log_in(self.authorisation_dialog.email.GetValue(), self.authorisation_dialog.password.GetValue()) + self.settings["mastodon"]["access_token"] = access_token + self.settings["mastodon"]["instance"] = self.authorisation_dialog.instance.GetValue() + self.settings.write() + + def get_user_info(self): + """ Retrieves some information required by TWBlue for setup.""" + # retrieve the current user's UTC offset so we can calculate dates properly. + offset = time.timezone if (time.localtime().tm_isdst == 0) else time.altzone + offset = offset / 60 / 60 * -1 + self.db["utc_offset"] = offset + if len(self.supported_languages) == 0: + self.supported_languages = self.api.instance().languages + self.get_lists() + self.get_muted_users() + self.settings.write() + + def get_lists(self): + """ Gets the lists that the user is subscribed to and stores them in the database. Returns None.""" + self.db["lists"] = self.api.lists() + + def get_muted_users(self): + ### ToDo: Use a function to retrieve all muted users. + self.db["muted_users"] = self.api.mutes() + + def get_user_alias(self, user): + aliases = self.settings.get("user-aliases") + if aliases == None: + log.error("Aliases are not defined for this config spec.") + return user.name + user_alias = aliases.get(user.id_str) + if user_alias != None: + return user_alias + return user.name \ No newline at end of file diff --git a/src/sessions/mastodon/wxUI.py b/src/sessions/mastodon/wxUI.py new file mode 100644 index 00000000..009d1e55 --- /dev/null +++ b/src/sessions/mastodon/wxUI.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +import wx + +class authorisationDialog(wx.Dialog): + def __init__(self): + super(authorisationDialog, self).__init__(parent=None, title=_(u"Authorising account...")) + panel = wx.Panel(self) + sizer = wx.BoxSizer(wx.VERTICAL) + static1 = wx.StaticText(panel, wx.NewId(), _("URL of mastodon instance:")) + self.instance = wx.TextCtrl(panel, -1) + sizer1 = wx.BoxSizer(wx.HORIZONTAL) + sizer1.Add(static1, 0, wx.ALL, 5) + sizer1.Add(self.instance, 0, wx.ALL, 5) + sizer.Add(sizer1, 0, wx.ALL, 5) + static2 = wx.StaticText(panel, wx.NewId(), _("Email address:")) + self.email = wx.TextCtrl(panel, -1) + sizer2 = wx.BoxSizer(wx.HORIZONTAL) + sizer2.Add(static2, 0, wx.ALL, 5) + sizer2.Add(self.email, 0, wx.ALL, 5) + sizer.Add(sizer2, 0, wx.ALL, 5) + static3 = wx.StaticText(panel, wx.NewId(), _("Password:")) + self.password = wx.TextCtrl(panel, -1) + sizer3 = wx.BoxSizer(wx.HORIZONTAL) + sizer3.Add(static3, 0, wx.ALL, 5) + sizer3.Add(self.password, 0, wx.ALL, 5) + sizer.Add(sizer3, 0, wx.ALL, 5) + self.ok = wx.Button(panel, wx.ID_OK) + self.cancel = wx.Button(panel, wx.ID_CANCEL) + sizer4 = wx.BoxSizer(wx.HORIZONTAL) + sizer4.Add(self.ok, 0, wx.ALL, 5) + sizer4.Add(self.cancel, 0, wx.ALL, 5) + sizer.Add(sizer4, 0, wx.ALL, 5) + panel.SetSizer(sizer) + min = sizer.CalcMin() + self.SetClientSize(min) diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index 94bf3ae3..c906cb94 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -137,8 +137,7 @@ class Session(base.baseSession): if self.settings["twitter"]["user_key"] != None and self.settings["twitter"]["user_secret"] != None: try: log.debug("Logging in to twitter...") - self.auth = tweepy.OAuth1UserHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret) - self.auth.set_access_token(self.settings["twitter"]["user_key"], self.settings["twitter"]["user_secret"]) + self.auth = tweepy.OAuth1UserHandler(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"]) self.twitter = tweepy.API(self.auth) self.twitter_v2 = tweepy.Client(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"]) if verify_credentials == True: diff --git a/src/sessions/twitter/wxUI.py b/src/sessions/twitter/wxUI.py index a765e72c..e4df1e4d 100644 --- a/src/sessions/twitter/wxUI.py +++ b/src/sessions/twitter/wxUI.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -from __future__ import unicode_literals import wx class authorisationDialog(wx.Dialog): diff --git a/src/test/sessions/__init__.py b/src/test/sessions/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/test/sessions/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/test/sessions/test_base_session.py b/src/test/sessions/test_base_session.py new file mode 100644 index 00000000..45756f49 --- /dev/null +++ b/src/test/sessions/test_base_session.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +import sys +import types +import pytest +import os +import sqlitedict +import shutil +from unittest import mock + +# Mock sound module, so LibVLc won't complain. +sound_module = types.ModuleType("sound") +sys.modules["sound"] = sound_module +sound_module.soundManager = mock.MagicMock(name="sound.soundManager") +from sessions import base + +# path where we will save our test config, as we can't rely on paths module due to pytest's paths being different. +session_path = os.path.join(os.getcwd(), "config", "testing") + +@pytest.fixture +def session(): + """ Configures a fake base session from where we can test things. """ + global session_path + s = base.baseSession("testing") + if os.path.exists(session_path) == False: + os.mkdir(session_path) + # Patches paths.app_path and paths.config_path, so we will not have issues during session configuration. + with mock.patch("paths.app_path", return_value=os.getcwd()) as app_path: + with mock.patch("paths.config_path", return_value=os.path.join(os.getcwd(), "config")) as config_path: + s.get_configuration() + yield s + # Session's cleanup code. + if os.path.exists(session_path): + shutil.rmtree(session_path) + del s + +@pytest.fixture +def dataset(): + """ Generates a sample dataset""" + dataset = dict(home_timeline=["message" for i in range(10000)], mentions_timeline=["mention" for i in range(20000)]) + yield dataset + +### Testing database being read from disk. +def test_cache_in_disk_unlimited_size(session, dataset): + """ Tests cache database being read from disk, storing the whole datasets. """ + session.settings["general"]["load_cache_in_memory"] = False + session.settings["general"]["persist_size"] = -1 + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + session.save_persistent_data() + assert isinstance(session.db, sqlitedict.SqliteDict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert len(session.db.get("home_timeline")) == 10000 + assert len(session.db.get("mentions_timeline")) == 20000 + session.db.close() + +def test_cache_in_disk_limited_dataset(session, dataset): + """ Tests wether the cache stores only the amount of items we ask it to store. """ + session.settings["general"]["load_cache_in_memory"] = False + session.settings["general"]["persist_size"] = 100 + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + # We need to save and load the db again because we cannot modify buffers' size while the database is opened. + # As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db + # Might cause an out of sync error between the GUI lists and the database. + # So we perform the changes to buffer size when loading data during app startup if the DB is read from disk. + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, sqlitedict.SqliteDict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert len(session.db.get("home_timeline")) == 100 + assert len(session.db.get("mentions_timeline")) == 100 + session.db.close() + +def test_cache_in_disk_limited_dataset_unreversed(session): + """Test if the cache is saved properly in unreversed buffers, when newest items are at the end of the list. """ + dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)]) + session.settings["general"]["load_cache_in_memory"] = False + session.settings["general"]["persist_size"] = 10 + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + # We need to save and load the db again because we cannot modify buffers' size while the database is opened. + # As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db + # Might cause an out of sync error between the GUI lists and the database. + # So we perform the changes to buffer size when loading data during app startup if the DB is read from disk. + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, sqlitedict.SqliteDict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert session.db.get("home_timeline")[0] == 10 + assert session.db.get("mentions_timeline")[0] == 10 + assert session.db.get("home_timeline")[-1] == 19 + assert session.db.get("mentions_timeline")[-1] == 19 + session.db.close() + +def test_cache_in_disk_limited_dataset_reversed(session): + """Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. """ + dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)]) + session.settings["general"]["load_cache_in_memory"] = False + session.settings["general"]["persist_size"] = 10 + session.settings["general"]["reverse_timelines"] = True + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + # We need to save and load the db again because we cannot modify buffers' size while the database is opened. + # As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db + # Might cause an out of sync error between the GUI lists and the database. + # So we perform the changes to buffer size when loading data during app startup if the DB is read from disk. + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, sqlitedict.SqliteDict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert session.db.get("home_timeline")[0] == 19 + assert session.db.get("mentions_timeline")[0] == 19 + assert session.db.get("home_timeline")[-1] == 10 + assert session.db.get("mentions_timeline")[-1] == 10 + session.db.close() + +### Testing database being loaded into memory. Those tests should give the same results than before +### but as we have different code depending whether we load db into memory or read it from disk, +### We need to test this anyways. +def test_cache_in_memory_unlimited_size(session, dataset): + """ Tests cache database being loaded in memory, storing the whole datasets. """ + session.settings["general"]["load_cache_in_memory"] = True + session.settings["general"]["persist_size"] = -1 + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, dict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert len(session.db.get("home_timeline")) == 10000 + assert len(session.db.get("mentions_timeline")) == 20000 + +def test_cache_in_memory_limited_dataset(session, dataset): + """ Tests wether the cache stores only the amount of items we ask it to store, when loaded in memory. """ + session.settings["general"]["load_cache_in_memory"] = True + session.settings["general"]["persist_size"] = 100 + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, dict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert len(session.db.get("home_timeline")) == 100 + assert len(session.db.get("mentions_timeline")) == 100 + +def test_cache_in_memory_limited_dataset_unreversed(session): + """Test if the cache is saved properly when loaded in memory in unreversed buffers, when newest items are at the end of the list. """ + dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)]) + session.settings["general"]["load_cache_in_memory"] = True + session.settings["general"]["persist_size"] = 10 + session.load_persistent_data() + assert len(session.db)==1 + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, dict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert session.db.get("home_timeline")[0] == 10 + assert session.db.get("mentions_timeline")[0] == 10 + assert session.db.get("home_timeline")[-1] == 19 + assert session.db.get("mentions_timeline")[-1] == 19 + +def test_cache_in_memory_limited_dataset_reversed(session): + """Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. This test if for db read into memory. """ + dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)]) + session.settings["general"]["load_cache_in_memory"] = True + session.settings["general"]["persist_size"] = 10 + session.settings["general"]["reverse_timelines"] = True + session.load_persistent_data() + session.db["home_timeline"] = dataset["home_timeline"] + session.db["mentions_timeline"] = dataset["mentions_timeline"] + session.save_persistent_data() + session.db = dict() + session.load_persistent_data() + assert isinstance(session.db, dict) + assert session.db.get("home_timeline") != None + assert session.db.get("mentions_timeline") != None + assert session.db.get("home_timeline")[0] == 19 + assert session.db.get("mentions_timeline")[0] == 19 + assert session.db.get("home_timeline")[-1] == 10 + assert session.db.get("mentions_timeline")[-1] == 10 diff --git a/src/test/sessions/twitter/__init__.py b/src/test/sessions/twitter/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/src/test/sessions/twitter/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/test/sessions/twitter/conftest.py b/src/test/sessions/twitter/conftest.py new file mode 100644 index 00000000..14abe1d5 --- /dev/null +++ b/src/test/sessions/twitter/conftest.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +import pytest +from tweepy.models import Status + +@pytest.fixture +def basic_tweet(): + data = {'created_at': 'Mon Jan 03 15:03:36 +0000 2022', 'id': 1478019218884857856, 'id_str': '1478019218884857856', 'full_text': 'Changes in projects for next year https://t.co/nW3GS9RmHd', 'truncated': False, 'display_text_range': [0, 57], 'entities': {'hashtags': [], 'symbols': [], 'user_mentions': [], 'urls': [{'url': 'https://t.co/nW3GS9RmHd', 'expanded_url': 'https://manuelcortez.net/blog/changes-in-projects-for-next-year/#.YdMQQU6t1FI.twitter', 'display_url': 'manuelcortez.net/blog/changes-i…', 'indices': [34, 57]}]}, 'source': 'Twitter Web App', 'in_reply_to_status_id': None, 'in_reply_to_status_id_str': None, 'in_reply_to_user_id': None, 'in_reply_to_user_id_str': None, 'in_reply_to_screen_name': None, 'user': {'id': 258677951, 'id_str': '258677951', 'name': 'Manuel Cortez', 'screen_name': 'manuelcortez00', 'location': 'Nuevo León, México', 'description': 'Python developer, , interested in reading, accessibility, astronomy, physics and science. Я учу русский.', 'url': 'https://t.co/JFRKRA73ZV', 'entities': {'url': {'urls': [{'url': 'https://t.co/JFRKRA73ZV', 'expanded_url': 'https://manuelcortez.net', 'display_url': 'manuelcortez.net', 'indices': [0, 23]}]}, 'description': {'urls': []}}, 'protected': False, 'followers_count': 1453, 'friends_count': 568, 'listed_count': 45, 'created_at': 'Mon Feb 28 06:52:48 +0000 2011', 'favourites_count': 283, 'utc_offset': None, 'time_zone': None, 'geo_enabled': True, 'verified': False, 'statuses_count': 43371, 'lang': None, 'contributors_enabled': False, 'is_translator': False, 'is_translation_enabled': False, 'profile_background_color': 'C0DEED', 'profile_background_image_url': 'http://abs.twimg.com/images/themes/theme1/bg.png', 'profile_background_image_url_https': 'https://abs.twimg.com/images/themes/theme1/bg.png', 'profile_background_tile': False, 'profile_image_url': 'http://pbs.twimg.com/profile_images/442466677645508608/3EBBC-OX_normal.jpeg', 'profile_image_url_https': 'https://pbs.twimg.com/profile_images/442466677645508608/3EBBC-OX_normal.jpeg', 'profile_image_extensions_alt_text': None, 'profile_link_color': '1DA1F2', 'profile_sidebar_border_color': 'C0DEED', 'profile_sidebar_fill_color': 'DDEEF6', 'profile_text_color': '333333', 'profile_use_background_image': True, 'has_extended_profile': False, 'default_profile': True, 'default_profile_image': False, 'following': False, 'follow_request_sent': False, 'notifications': False, 'translator_type': 'regular', 'withheld_in_countries': []}, 'geo': None, 'coordinates': None, 'place': None, 'contributors': None, 'is_quote_status': False, 'retweet_count': 6, 'favorite_count': 2, 'favorited': False, 'retweeted': False, 'possibly_sensitive': False, 'possibly_sensitive_appealable': False, 'lang': 'en'} + yield Status().parse(api=None, json=data) + +@pytest.fixture +def basic_tweet_multiple_mentions(): + data = {'created_at': 'Mon Dec 27 21:21:25 +0000 2021', 'id': 1475577584947707909, 'id_str': '1475577584947707909', 'full_text': '@tamaranatalia9 @Darkstrings @Chris88171572 @manuelcortez00 Well done, thanks Tamara', 'truncated': False, 'display_text_range': [60, 84], 'entities': {'hashtags': [], 'symbols': [], 'user_mentions': [{'screen_name': 'tamaranatalia9', 'name': 'Tamara', 'id': 914114584591597568, 'id_str': '914114584591597568', 'indices': [0, 15]}, {'screen_name': 'Darkstrings', 'name': 'Luc', 'id': 1374154151115042823, 'id_str': '1374154151115042823', 'indices': [16, 28]}, {'screen_name': 'Chris88171572', 'name': 'Chris', 'id': 1323980014799495168, 'id_str': '1323980014799495168', 'indices': [29, 43]}, {'screen_name': 'manuelcortez00', 'name': 'Manuel Cortez', 'id': 258677951, 'id_str': '258677951', 'indices': [44, 59]}], 'urls': []}, 'source': 'Twitter for Android', 'in_reply_to_status_id': 1475550502083563526, 'in_reply_to_status_id_str': '1475550502083563526', 'in_reply_to_user_id': 914114584591597568, 'in_reply_to_user_id_str': '914114584591597568', 'in_reply_to_screen_name': 'tamaranatalia9', 'user': {'id': 784837522157436929, 'id_str': '784837522157436929', 'name': 'Paulus', 'screen_name': 'PauloPer01', 'location': '', 'description': '', 'url': None, 'entities': {'description': {'urls': []}}, 'protected': False, 'followers_count': 1082, 'friends_count': 3029, 'listed_count': 2, 'created_at': 'Sat Oct 08 19:27:01 +0000 2016', 'favourites_count': 78862, 'utc_offset': None, 'time_zone': None, 'geo_enabled': False, 'verified': False, 'statuses_count': 4976, 'lang': None, 'contributors_enabled': False, 'is_translator': False, 'is_translation_enabled': False, 'profile_background_color': 'F5F8FA', 'profile_background_image_url': None, 'profile_background_image_url_https': None, 'profile_background_tile': False, 'profile_image_url': 'http://pbs.twimg.com/profile_images/1464572633014587395/246oPPLa_normal.jpg', 'profile_image_url_https': 'https://pbs.twimg.com/profile_images/1464572633014587395/246oPPLa_normal.jpg', 'profile_image_extensions_alt_text': None, 'profile_link_color': '1DA1F2', 'profile_sidebar_border_color': 'C0DEED', 'profile_sidebar_fill_color': 'DDEEF6', 'profile_text_color': '333333', 'profile_use_background_image': True, 'has_extended_profile': True, 'default_profile': True, 'default_profile_image': False, 'following': False, 'follow_request_sent': False, 'notifications': False, 'translator_type': 'none', 'withheld_in_countries': []}, 'geo': None, 'coordinates': None, 'place': None, 'contributors': None, 'is_quote_status': False, 'retweet_count': 1, 'favorite_count': 2, 'favorited': False, 'retweeted': False, 'lang': 'en'} + yield Status().parse(api=None, json=data) \ No newline at end of file diff --git a/src/test/sessions/twitter/test_twitter_templates.py b/src/test/sessions/twitter/test_twitter_templates.py new file mode 100644 index 00000000..4b66c8d5 --- /dev/null +++ b/src/test/sessions/twitter/test_twitter_templates.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +import pytest +import gettext +import datetime +gettext.install("test") +from unittest import mock +from sessions.twitter import templates + +def test_default_values(): + """ Tests wheter default values are the expected ones. + This might be useful so we will have this failing when we update anything from those values. + As TWBlue might be using those from other dialogs. + """ + assert templates.tweet_variables == ["date", "display_name", "screen_name", "source", "lang", "text", "image_descriptions"] + assert templates.dm_variables == ["date", "sender_display_name", "sender_screen_name", "recipient_display_name", "recipient_display_name", "text"] + assert templates.person_variables == ["display_name", "screen_name", "location", "description", "followers", "following", "listed", "likes", "tweets", "created_at"] + +@pytest.mark.parametrize("offset, language, expected_result", [ + (0, "en_US", "Wednesday, October 10, 2018 20:19:24"), + (-21600, "en_US", "Wednesday, October 10, 2018 14:19:24"), + (7200, "en_US", "Wednesday, October 10, 2018 22:19:24"), + (0, "es_ES", "miércoles, octubre 10, 2018 20:19:24"), + (-21600, "es_ES", "miércoles, octubre 10, 2018 14:19:24"), + (7200, "es_ES", "miércoles, octubre 10, 2018 22:19:24"), + (18000, "es_ES", "jueves, octubre 11, 2018 1:19:24"), +]) +def test_process_date_absolute_time(offset, language, expected_result): + """ Tests date processing function for tweets, when relative_times is set to False. """ + # Date representation used by twitter, converted to datetime object, as tweepy already does this. + # Original date was Wed Oct 10 20:19:24 +0000 2018 + date_field = datetime.datetime(2018, 10, 10, 20, 19, 24) + with mock.patch("languageHandler.curLang", new=language): + processed_date = templates.process_date(date_field, relative_times=False, offset_seconds=offset) + assert processed_date == expected_result + +def test_process_date_relative_time(): + date_field = datetime.datetime(2018, 10, 10, 20, 19, 24) + with mock.patch("languageHandler.curLang", new="es_ES"): + processed_date = templates.process_date(date_field, relative_times=True, offset_seconds=7200) + # As this depends in relative times and this is subject to change, let's do some light checks here and hope the string is going to be valid. + assert isinstance(processed_date, str) + assert "hace" in processed_date and "años" in processed_date + +def test_process_text_basic_tweet(basic_tweet): + expected_result = "Changes in projects for next year https://manuelcortez.net/blog/changes-in-projects-for-next-year/#.YdMQQU6t1FI.twitter" + text = templates.process_text(basic_tweet) + assert text == expected_result + +def test_process_text_basic_tweet_multiple_mentions(basic_tweet_multiple_mentions): + expected_result = "@tamaranatalia9, @Darkstrings and 2 more: Well done, thanks Tamara" + text = templates.process_text(basic_tweet_multiple_mentions) + assert text == expected_result \ No newline at end of file diff --git a/src/test/test_cache.py b/src/test/test_cache.py deleted file mode 100644 index 6337c3b2..00000000 --- a/src/test/test_cache.py +++ /dev/null @@ -1,200 +0,0 @@ -# -*- coding: utf-8 -*- -""" Test case to check some of the scenarios we might face when storing tweets in cache, both loading into memory or rreading from disk. """ -import unittest -import os -import paths -import sqlitedict -import shutil -# The base session module requires sound as a dependency, and this needs libVLC to be locatable. -os.environ['PYTHON_VLC_MODULE_PATH']=os.path.abspath(os.path.join(paths.app_path(), "..", "windows-dependencies", "x86")) -os.environ['PYTHON_VLC_LIB_PATH']=os.path.abspath(os.path.join(paths.app_path(), "..", "windows-dependencies", "x86", "libvlc.dll")) -from sessions import base - -class cacheTestCase(unittest.TestCase): - - def setUp(self): - """ Configures a fake session to check caching objects here. """ - self.session = base.baseSession("testing") - if os.path.exists(os.path.join(paths.config_path(), "testing")) == False: - os.mkdir(os.path.join(paths.config_path(), "testing")) - self.session.get_configuration() - - def tearDown(self): - """ Removes the previously configured session. """ - session_folder = os.path.join(paths.config_path(), "testing") - if os.path.exists(session_folder): - shutil.rmtree(session_folder) - - def generate_dataset(self): - """ Generates a sample dataset""" - dataset = dict(home_timeline=["message" for i in range(10000)], mentions_timeline=["mention" for i in range(20000)]) - return dataset - - ### Testing database being read from disk. - - def test_cache_in_disk_unlimited_size(self): - """ Tests cache database being read from disk, storing the whole datasets. """ - dataset = self.generate_dataset() - self.session.settings["general"]["load_cache_in_memory"] = False - self.session.settings["general"]["persist_size"] = -1 - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - self.session.save_persistent_data() - self.assertIsInstance(self.session.db, sqlitedict.SqliteDict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(len(self.session.db.get("home_timeline")), 10000) - self.assertEquals(len(self.session.db.get("mentions_timeline")), 20000) - self.session.db.close() - - def test_cache_in_disk_limited_dataset(self): - """ Tests wether the cache stores only the amount of items we ask it to store. """ - dataset = self.generate_dataset() - self.session.settings["general"]["load_cache_in_memory"] = False - self.session.settings["general"]["persist_size"] = 100 - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - # We need to save and load the db again because we cannot modify buffers' size while the database is opened. - # As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db - # Might cause an out of sync error between the GUI lists and the database. - # So we perform the changes to buffer size when loading data during app startup if the DB is read from disk. - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, sqlitedict.SqliteDict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(len(self.session.db.get("home_timeline")), 100) - self.assertEquals(len(self.session.db.get("mentions_timeline")), 100) - self.session.db.close() - - def test_cache_in_disk_limited_dataset_unreversed(self): - """Test if the cache is saved properly in unreversed buffers, when newest items are at the end of the list. """ - dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)]) - self.session.settings["general"]["load_cache_in_memory"] = False - self.session.settings["general"]["persist_size"] = 10 - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - # We need to save and load the db again because we cannot modify buffers' size while the database is opened. - # As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db - # Might cause an out of sync error between the GUI lists and the database. - # So we perform the changes to buffer size when loading data during app startup if the DB is read from disk. - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, sqlitedict.SqliteDict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(self.session.db.get("home_timeline")[0], 10) - self.assertEquals(self.session.db.get("mentions_timeline")[0], 10) - self.assertEquals(self.session.db.get("home_timeline")[-1], 19) - self.assertEquals(self.session.db.get("mentions_timeline")[-1], 19) - self.session.db.close() - - def test_cache_in_disk_limited_dataset_reversed(self): - """Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. """ - dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)]) - self.session.settings["general"]["load_cache_in_memory"] = False - self.session.settings["general"]["persist_size"] = 10 - self.session.settings["general"]["reverse_timelines"] = True - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - # We need to save and load the db again because we cannot modify buffers' size while the database is opened. - # As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db - # Might cause an out of sync error between the GUI lists and the database. - # So we perform the changes to buffer size when loading data during app startup if the DB is read from disk. - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, sqlitedict.SqliteDict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(self.session.db.get("home_timeline")[0], 19) - self.assertEquals(self.session.db.get("mentions_timeline")[0], 19) - self.assertEquals(self.session.db.get("home_timeline")[-1], 10) - self.assertEquals(self.session.db.get("mentions_timeline")[-1], 10) - self.session.db.close() - - ### Testing database being loaded into memory. Those tests should give the same results than before - ### but as we have different code depending whether we load db into memory or read it from disk, - ### We need to test this anyways. - def test_cache_in_memory_unlimited_size(self): - """ Tests cache database being loaded in memory, storing the whole datasets. """ - dataset = self.generate_dataset() - self.session.settings["general"]["load_cache_in_memory"] = True - self.session.settings["general"]["persist_size"] = -1 - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, dict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(len(self.session.db.get("home_timeline")), 10000) - self.assertEquals(len(self.session.db.get("mentions_timeline")), 20000) - - def test_cache_in_memory_limited_dataset(self): - """ Tests wether the cache stores only the amount of items we ask it to store, when loaded in memory. """ - dataset = self.generate_dataset() - self.session.settings["general"]["load_cache_in_memory"] = True - self.session.settings["general"]["persist_size"] = 100 - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, dict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(len(self.session.db.get("home_timeline")), 100) - self.assertEquals(len(self.session.db.get("mentions_timeline")), 100) - - def test_cache_in_memory_limited_dataset_unreversed(self): - """Test if the cache is saved properly when loaded in memory in unreversed buffers, when newest items are at the end of the list. """ - dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)]) - self.session.settings["general"]["load_cache_in_memory"] = True - self.session.settings["general"]["persist_size"] = 10 - self.session.load_persistent_data() - self.assertTrue(len(self.session.db)==1) - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, dict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(self.session.db.get("home_timeline")[0], 10) - self.assertEquals(self.session.db.get("mentions_timeline")[0], 10) - self.assertEquals(self.session.db.get("home_timeline")[-1], 19) - self.assertEquals(self.session.db.get("mentions_timeline")[-1], 19) - - def test_cache_in_memory_limited_dataset_reversed(self): - """Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. This test if for db read into memory. """ - dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)]) - self.session.settings["general"]["load_cache_in_memory"] = True - self.session.settings["general"]["persist_size"] = 10 - self.session.settings["general"]["reverse_timelines"] = True - self.session.load_persistent_data() - self.session.db["home_timeline"] = dataset["home_timeline"] - self.session.db["mentions_timeline"] = dataset["mentions_timeline"] - self.session.save_persistent_data() - self.session.db = dict() - self.session.load_persistent_data() - self.assertIsInstance(self.session.db, dict) - self.assertTrue(self.session.db.get("home_timeline") != None) - self.assertTrue(self.session.db.get("mentions_timeline") != None) - self.assertEquals(self.session.db.get("home_timeline")[0], 19) - self.assertEquals(self.session.db.get("mentions_timeline")[0], 19) - self.assertEquals(self.session.db.get("home_timeline")[-1], 10) - self.assertEquals(self.session.db.get("mentions_timeline")[-1], 10) - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/src/wxUI/buffers/__init__.py b/src/wxUI/buffers/__init__.py index 2825b5fb..702251d8 100644 --- a/src/wxUI/buffers/__init__.py +++ b/src/wxUI/buffers/__init__.py @@ -1,13 +1,3 @@ # -*- coding: utf-8 -*- -from __future__ import absolute_import -from __future__ import unicode_literals -from .base import basePanel -from .dm import dmPanel -from .events import eventsPanel -from .favourites import favsPanel -from .lists import listPanel +from . import twitter, mastodon from .panels import accountPanel, emptyPanel -from .people import peoplePanel -from .trends import trendsPanel -from .tweet_searches import searchPanel -from .user_searches import searchUsersPanel diff --git a/src/wxUI/buffers/mastodon/__init__.py b/src/wxUI/buffers/mastodon/__init__.py new file mode 100644 index 00000000..b3020ba6 --- /dev/null +++ b/src/wxUI/buffers/mastodon/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +from .base import basePanel diff --git a/src/wxUI/buffers/mastodon/base.py b/src/wxUI/buffers/mastodon/base.py new file mode 100644 index 00000000..b23c031c --- /dev/null +++ b/src/wxUI/buffers/mastodon/base.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +import wx +from multiplatform_widgets import widgets + +class basePanel(wx.Panel): + + def set_focus_function(self, f): + self.list.list.Bind(wx.EVT_LIST_ITEM_FOCUSED, f) + + def create_list(self): + self.list = widgets.list(self, _(u"User"), _(u"Text"), _(u"Date"), _(u"Client"), style=wx.LC_REPORT|wx.LC_SINGLE_SEL|wx.LC_VRULES) + self.list.set_windows_size(0, 60) + self.list.set_windows_size(1, 320) + self.list.set_windows_size(2, 110) + self.list.set_windows_size(3, 84) + self.list.set_size() + + def __init__(self, parent, name): + super(basePanel, self).__init__(parent) + self.name = name + self.type = "baseBuffer" + self.sizer = wx.BoxSizer(wx.VERTICAL) + self.create_list() + self.toot = wx.Button(self, -1, _("Toot")) + self.retoot = wx.Button(self, -1, _("Retoot")) + self.reply = wx.Button(self, -1, _(u"Reply")) + self.dm = wx.Button(self, -1, _(u"Direct message")) + btnSizer = wx.BoxSizer(wx.HORIZONTAL) + btnSizer.Add(self.tweet, 0, wx.ALL, 5) + btnSizer.Add(self.retweet, 0, wx.ALL, 5) + btnSizer.Add(self.reply, 0, wx.ALL, 5) + btnSizer.Add(self.dm, 0, wx.ALL, 5) + self.sizer.Add(btnSizer, 0, wx.ALL, 5) + self.sizer.Add(self.list.list, 0, wx.ALL|wx.EXPAND, 5) + self.SetSizer(self.sizer) + self.SetClientSize(self.sizer.CalcMin()) + + def set_position(self, reversed=False): + if reversed == False: + self.list.select_item(self.list.get_count()-1) + else: + self.list.select_item(0) + + def set_focus_in_list(self): + self.list.list.SetFocus() diff --git a/src/wxUI/buffers/twitter/__init__.py b/src/wxUI/buffers/twitter/__init__.py new file mode 100644 index 00000000..91fb04c8 --- /dev/null +++ b/src/wxUI/buffers/twitter/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +from .base import basePanel +from .dm import dmPanel +from .events import eventsPanel +from .favourites import favsPanel +from .lists import listPanel +from .people import peoplePanel +from .trends import trendsPanel +from .tweet_searches import searchPanel +from .user_searches import searchUsersPanel diff --git a/src/wxUI/buffers/base.py b/src/wxUI/buffers/twitter/base.py similarity index 100% rename from src/wxUI/buffers/base.py rename to src/wxUI/buffers/twitter/base.py diff --git a/src/wxUI/buffers/dm.py b/src/wxUI/buffers/twitter/dm.py similarity index 100% rename from src/wxUI/buffers/dm.py rename to src/wxUI/buffers/twitter/dm.py diff --git a/src/wxUI/buffers/events.py b/src/wxUI/buffers/twitter/events.py similarity index 100% rename from src/wxUI/buffers/events.py rename to src/wxUI/buffers/twitter/events.py diff --git a/src/wxUI/buffers/favourites.py b/src/wxUI/buffers/twitter/favourites.py similarity index 100% rename from src/wxUI/buffers/favourites.py rename to src/wxUI/buffers/twitter/favourites.py diff --git a/src/wxUI/buffers/lists.py b/src/wxUI/buffers/twitter/lists.py similarity index 100% rename from src/wxUI/buffers/lists.py rename to src/wxUI/buffers/twitter/lists.py diff --git a/src/wxUI/buffers/people.py b/src/wxUI/buffers/twitter/people.py similarity index 100% rename from src/wxUI/buffers/people.py rename to src/wxUI/buffers/twitter/people.py diff --git a/src/wxUI/buffers/trends.py b/src/wxUI/buffers/twitter/trends.py similarity index 100% rename from src/wxUI/buffers/trends.py rename to src/wxUI/buffers/twitter/trends.py diff --git a/src/wxUI/buffers/tweet_searches.py b/src/wxUI/buffers/twitter/tweet_searches.py similarity index 100% rename from src/wxUI/buffers/tweet_searches.py rename to src/wxUI/buffers/twitter/tweet_searches.py diff --git a/src/wxUI/buffers/user_searches.py b/src/wxUI/buffers/twitter/user_searches.py similarity index 100% rename from src/wxUI/buffers/user_searches.py rename to src/wxUI/buffers/twitter/user_searches.py