Update controller

This commit is contained in:
Manuel Cortez 2022-11-07 09:17:08 -06:00
commit 14a10989e5
No known key found for this signature in database
GPG Key ID: 9E0735CA15EFE790
37 changed files with 753 additions and 418 deletions

View File

@ -1,4 +1,6 @@
wxpython wxpython
pytest
coverage
wheel wheel
future future
configobj configobj
@ -29,6 +31,7 @@ backports.functools_lru_cache
cx_freeze cx_freeze
tweepy tweepy
twitter-text-parser twitter-text-parser
mastodon.py
pyenchant pyenchant
sqlitedict sqlitedict
cx-Logging cx-Logging

View File

@ -40,9 +40,9 @@ class BaseBuffer(base.Buffer):
super(BaseBuffer, self).__init__(parent, function, *args, **kwargs) super(BaseBuffer, self).__init__(parent, function, *args, **kwargs)
log.debug("Initializing buffer %s, account %s" % (name, account,)) log.debug("Initializing buffer %s, account %s" % (name, account,))
if bufferType != None: if bufferType != None:
self.buffer = getattr(buffers, bufferType)(parent, name) self.buffer = getattr(buffers.twitter, bufferType)(parent, name)
else: else:
self.buffer = buffers.basePanel(parent, name) self.buffer = buffers.twitter.basePanel(parent, name)
self.invisible = True self.invisible = True
self.name = name self.name = name
self.type = self.buffer.type self.type = self.buffer.type

View File

@ -26,7 +26,7 @@ class TrendsBuffer(base.Buffer):
self.session = sessionObject self.session = sessionObject
self.account = account self.account = account
self.invisible = True self.invisible = True
self.buffer = buffers.trendsPanel(parent, name) self.buffer = buffers.twitter.trendsPanel(parent, name)
self.buffer.account = account self.buffer.account = account
self.type = self.buffer.type self.type = self.buffer.type
self.bind_events() self.bind_events()

View File

@ -1,48 +1,43 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import platform import os
system = platform.system() import logging
import application import webbrowser
import wx import wx
import requests import requests
from audio_services import youtube_utils
import arrow import arrow
if system == "Windows":
from update import updater
from wxUI import (view, dialogs, commonMessageDialogs, sysTrayIcon)
from . import settings
from extra import SoundsTutorial, ocr
import keystrokeEditor import keystrokeEditor
import sessions
import widgetUtils
import config
import languageHandler
import application
import sound
import output
from pubsub import pub
from tweepy.errors import TweepyException, Forbidden
from geopy.geocoders import Nominatim
from update import updater
from audio_services import youtube_utils
from extra import SoundsTutorial, ocr
from wxUI import view, dialogs, commonMessageDialogs, sysTrayIcon
from . import settings
from keyboard_handler.wx_handler import WXKeyboardHandler from keyboard_handler.wx_handler import WXKeyboardHandler
from . import userActionsController from . import userActionsController
from . import trendingTopics from . import trendingTopics
from . import user from . import user
from . import listsController from . import listsController
from . import filterController from . import filterController
from . import userSelector
elif system == "Linux":
from gtkUI import (view, commonMessageDialogs)
from sessions.twitter import utils, compose from sessions.twitter import utils, compose
from sessionmanager import manager, sessionManager from sessionmanager import manager, sessionManager
from controller import buffers from controller import buffers
from . import messages from . import messages
from . import userAliasController from . import userAliasController
import sessions
from sessions.twitter import session as session_ from sessions.twitter import session as session_
from pubsub import pub
import sound
import output
from tweepy.errors import TweepyException, Forbidden
from mysc.thread_utils import call_threaded from mysc.thread_utils import call_threaded
from mysc.repeating_timer import RepeatingTimer from mysc.repeating_timer import RepeatingTimer
from mysc import restart from mysc import restart
import config
import widgetUtils
import logging
import webbrowser
from geopy.geocoders import Nominatim
from mysc import localization from mysc import localization
import os from controller.twitter import handler as TwitterHandler
import languageHandler
log = logging.getLogger("mainController") log = logging.getLogger("mainController")
@ -136,8 +131,8 @@ class Controller(object):
pub.subscribe(self.create_buffer, "createBuffer") pub.subscribe(self.create_buffer, "createBuffer")
pub.subscribe(self.toggle_share_settings, "toggleShare") pub.subscribe(self.toggle_share_settings, "toggleShare")
pub.subscribe(self.restart_streaming, "restartStreaming") pub.subscribe(self.restart_streaming, "restartStreaming")
if system == "Windows":
pub.subscribe(self.invisible_shorcuts_changed, "invisible-shorcuts-changed") pub.subscribe(self.invisible_shorcuts_changed, "invisible-shorcuts-changed")
pub.subscribe(self.create_account_buffer, "core.create_account")
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.show_hide, menuitem=self.view.show_hide) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.show_hide, menuitem=self.view.show_hide)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.search, menuitem=self.view.menuitem_search) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.search, menuitem=self.view.menuitem_search)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.list_manager, menuitem=self.view.lists) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.list_manager, menuitem=self.view.lists)
@ -216,6 +211,14 @@ class Controller(object):
def taskbar_right_click(self, *args, **kwargs): def taskbar_right_click(self, *args, **kwargs):
self.systrayIcon.show_menu() self.systrayIcon.show_menu()
def get_handler(self, type):
handler = self.handlers.get(type)
if handler == None:
if type == "twitter":
handler = TwitterHandler.Handler()
self.handlers[type]=handler
return handler
def __init__(self): def __init__(self):
super(Controller, self).__init__() super(Controller, self).__init__()
# Visibility state. # Visibility state.
@ -229,14 +232,14 @@ class Controller(object):
self.accounts = [] self.accounts = []
# This saves the current account (important in invisible mode) # This saves the current account (important in invisible mode)
self.current_account = "" self.current_account = ""
# Handlers are special objects as they manage the mapping of available features and events in different social networks.
self.handlers = dict()
self.view.prepare() self.view.prepare()
self.bind_other_events() self.bind_other_events()
if system == "Windows":
self.set_systray_icon() self.set_systray_icon()
def check_invisible_at_startup(self): def check_invisible_at_startup(self):
# Visibility check. It does only work for windows. # Visibility check. It does only work for windows.
if system != "Windows": return
if config.app["app-settings"]["hide_gui"] == True: if config.app["app-settings"]["hide_gui"] == True:
self.show_hide() self.show_hide()
self.view.Show() self.view.Show()
@ -254,13 +257,9 @@ class Controller(object):
if sessions.sessions[i].is_logged == False: if sessions.sessions[i].is_logged == False:
self.create_ignored_session_buffer(sessions.sessions[i]) self.create_ignored_session_buffer(sessions.sessions[i])
continue continue
self.create_buffers(sessions.sessions[i]) if sessions.sessions[i].type == "twitter":
handler = self.get_handler(type="twitter")
# Connection checker executed each minute. handler.create_buffers(sessions.sessions[i], controller=self)
self.checker_function = RepeatingTimer(60, self.check_connection)
# self.checker_function.start()
# self.save_db = RepeatingTimer(300, self.save_data_in_db)
# self.save_db.start()
log.debug("Setting updates to buffers every %d seconds..." % (60*config.app["app-settings"]["update_period"],)) log.debug("Setting updates to buffers every %d seconds..." % (60*config.app["app-settings"]["update_period"],))
self.update_buffers_function = RepeatingTimer(60*config.app["app-settings"]["update_period"], self.update_buffers) self.update_buffers_function = RepeatingTimer(60*config.app["app-settings"]["update_period"], self.update_buffers)
self.update_buffers_function.start() self.update_buffers_function.start()
@ -271,6 +270,7 @@ class Controller(object):
if sessions.sessions[i].is_logged == False: continue if sessions.sessions[i].is_logged == False: continue
self.start_buffers(sessions.sessions[i]) self.start_buffers(sessions.sessions[i])
self.set_buffer_positions(sessions.sessions[i]) self.set_buffer_positions(sessions.sessions[i])
if hasattr(sessions.sessions[i], "start_streaming"):
sessions.sessions[i].start_streaming() sessions.sessions[i].start_streaming()
if config.app["app-settings"]["play_ready_sound"] == True: if config.app["app-settings"]["play_ready_sound"] == True:
sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg") sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg")
@ -280,7 +280,6 @@ class Controller(object):
self.streams_checker_function = RepeatingTimer(60, self.check_streams) self.streams_checker_function = RepeatingTimer(60, self.check_streams)
self.streams_checker_function.start() self.streams_checker_function.start()
def create_ignored_session_buffer(self, session): def create_ignored_session_buffer(self, session):
self.accounts.append(session.settings["twitter"]["user_name"]) self.accounts.append(session.settings["twitter"]["user_name"])
account = buffers.base.AccountBuffer(self.view.nb, session.settings["twitter"]["user_name"], session.settings["twitter"]["user_name"], session.session_id) account = buffers.base.AccountBuffer(self.view.nb, session.settings["twitter"]["user_name"], session.settings["twitter"]["user_name"], session.session_id)
@ -297,6 +296,13 @@ class Controller(object):
self.create_buffers(session, False) self.create_buffers(session, False)
self.start_buffers(session) self.start_buffers(session)
def create_account_buffer(self, name, session_id):
self.accounts.append(name)
account = buffers.base.AccountBuffer(self.view.nb, name, name, session_id)
account.setup_account()
self.buffers.append(account)
self.view.add_buffer(account.buffer , name=name)
def create_buffer(self, buffer_type="baseBuffer", session_type="twitter", buffer_title="", parent_tab=None, start=False, kwargs={}): def create_buffer(self, buffer_type="baseBuffer", session_type="twitter", buffer_title="", parent_tab=None, start=False, kwargs={}):
log.debug("Creating buffer of type {0} with parent_tab of {2} arguments {1}".format(buffer_type, kwargs, parent_tab)) log.debug("Creating buffer of type {0} with parent_tab of {2} arguments {1}".format(buffer_type, kwargs, parent_tab))
if not hasattr(buffers, session_type): if not hasattr(buffers, session_type):
@ -322,7 +328,7 @@ class Controller(object):
self.view.insert_buffer(buffer.buffer, buffer_title, parent_tab) self.view.insert_buffer(buffer.buffer, buffer_title, parent_tab)
log.debug("Inserting buffer {0} into control {1}".format(buffer, parent_tab)) log.debug("Inserting buffer {0} into control {1}".format(buffer, parent_tab))
def create_buffers(self, session, createAccounts=True): def create_mastodon_buffers(self, session, createAccounts=True):
""" Generates buffer objects for an user account. """ Generates buffer objects for an user account.
session SessionObject: a sessionmanager.session.Session Object""" session SessionObject: a sessionmanager.session.Session Object"""
session.get_user_info() session.get_user_info()
@ -333,53 +339,7 @@ class Controller(object):
self.buffers.append(account) self.buffers.append(account)
self.view.add_buffer(account.buffer , name=session.db["user_name"]) self.view.add_buffer(account.buffer , name=session.db["user_name"])
root_position =self.view.search(session.db["user_name"], session.db["user_name"]) root_position =self.view.search(session.db["user_name"], session.db["user_name"])
for i in session.settings['general']['buffer_order']:
if i == 'home':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Home"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="home_timeline", name="home_timeline", sessionObject=session, account=session.db["user_name"], sound="tweet_received.ogg", include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'mentions':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Mentions"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="mentions_timeline", name="mentions", sessionObject=session, account=session.db["user_name"], sound="mention_received.ogg", include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'dm':
pub.sendMessage("createBuffer", buffer_type="DirectMessagesBuffer", session_type=session.type, buffer_title=_("Direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_direct_messages", name="direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message", sound="dm_received.ogg"))
elif i == 'sent_dm':
pub.sendMessage("createBuffer", buffer_type="SentDirectMessagesBuffer", session_type=session.type, buffer_title=_("Sent direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function=None, name="sent_direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message"))
elif i == 'sent_tweets':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Sent tweets"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="user_timeline", name="sent_tweets", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"], include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'favorites':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_favorites", name="favourites", sessionObject=session, account=session.db["user_name"], sound="favourite.ogg", include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'followers':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_followers", name="followers", sessionObject=session, account=session.db["user_name"], sound="update_followers.ogg", screen_name=session.db["user_name"]))
elif i == 'friends':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Following"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_friends", name="friends", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"]))
elif i == 'blocks':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_blocks", name="blocked", sessionObject=session, account=session.db["user_name"]))
elif i == 'muted':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Muted users"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, function="get_mutes", name="muted", sessionObject=session, account=session.db["user_name"]))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="timelines", account=session.db["user_name"]))
timelines_position =self.view.search("timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["timelines"]:
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_(u"Timeline for {}").format(i,), parent_tab=timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="user_timeline", name="%s-timeline" % (i,), sessionObject=session, account=session.db["user_name"], sound="tweet_timeline.ogg", bufferType=None, user_id=i, include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Likes timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="favs_timelines", account=session.db["user_name"]))
favs_timelines_position =self.view.search("favs_timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["favourites_timelines"]:
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes for {}").format(i,), parent_tab=favs_timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="get_favorites", name="%s-favorite" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="favourites_timeline_updated.ogg", user_id=i, include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Followers timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="followers_timelines", account=session.db["user_name"]))
followers_timelines_position =self.view.search("followers_timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["followers_timelines"]:
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i,), parent_tab=followers_timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="get_followers", name="%s-followers" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Following timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="friends_timelines", account=session.db["user_name"]))
friends_timelines_position =self.view.search("friends_timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["friends_timelines"]:
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_(u"Friends for {}").format(i,), parent_tab=friends_timelines_position, start=False, kwargs=dict(parent=self.view.nb, function="get_friends", name="%s-friends" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Lists"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="lists", account=session.db["user_name"]))
lists_position =self.view.search("lists", session.db["user_name"])
for i in session.settings["other_buffers"]["lists"]:
pub.sendMessage("createBuffer", buffer_type="ListBuffer", session_type=session.type, buffer_title=_(u"List for {}").format(i), parent_tab=lists_position, start=False, kwargs=dict(parent=self.view.nb, function="list_timeline", name="%s-list" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="list_tweet.ogg", list_id=utils.find_list(i, session.db["lists"]), include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Searches"), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="searches", account=session.db["user_name"]))
searches_position =self.view.search("searches", session.db["user_name"])
for i in session.settings["other_buffers"]["tweet_searches"]:
pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_(u"Search for {}").format(i), parent_tab=searches_position, start=False, kwargs=dict(parent=self.view.nb, function="search_tweets", name="%s-searchterm" % (i,), sessionObject=session, account=session.db["user_name"], bufferType="searchPanel", sound="search_updated.ogg", q=i, include_ext_alt_text=True, tweet_mode="extended"))
for i in session.settings["other_buffers"]["trending_topic_buffers"]:
pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=self.view.nb, name="%s_tt" % (i,), sessionObject=session, account=session.db["user_name"], trendsFor=i, sound="trends_updated.ogg"))
def set_buffer_positions(self, session): def set_buffer_positions(self, session):
"Sets positions for buffers if values exist in the database." "Sets positions for buffers if values exist in the database."
@ -635,13 +595,13 @@ class Controller(object):
log.debug("Saving global configuration...") log.debug("Saving global configuration...")
for item in sessions.sessions: for item in sessions.sessions:
if sessions.sessions[item].logged == False: continue if sessions.sessions[item].logged == False: continue
if hasattr(sessions.sessions[item], "stop_streaming"):
log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id)
sessions.sessions[item].stop_streaming() sessions.sessions[item].stop_streaming()
log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,)) log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,))
sessions.sessions[item].sound.cleaner.cancel() sessions.sessions[item].sound.cleaner.cancel()
log.debug("Saving database for " + sessions.sessions[item].session_id) log.debug("Saving database for " + sessions.sessions[item].session_id)
sessions.sessions[item].save_persistent_data() sessions.sessions[item].save_persistent_data()
if system == "Windows":
self.systrayIcon.RemoveIcon() self.systrayIcon.RemoveIcon()
pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name))
if os.path.exists(pidpath): if os.path.exists(pidpath):

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
from pubsub import pub
from sessions.twitter import utils
from controller import buffers
class Handler(object):
def __init__(self):
super(Handler, self).__init__()
def create_buffers(self, session, createAccounts=True, controller=None):
session.get_user_info()
if createAccounts == True:
pub.sendMessage("core.create_account", name=session.db["user_name"], session_id=session.session_id)
root_position =controller.view.search(session.db["user_name"], session.db["user_name"])
for i in session.settings['general']['buffer_order']:
if i == 'home':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Home"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="home_timeline", name="home_timeline", sessionObject=session, account=session.db["user_name"], sound="tweet_received.ogg", include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'mentions':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Mentions"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="mentions_timeline", name="mentions", sessionObject=session, account=session.db["user_name"], sound="mention_received.ogg", include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'dm':
pub.sendMessage("createBuffer", buffer_type="DirectMessagesBuffer", session_type=session.type, buffer_title=_("Direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_direct_messages", name="direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message", sound="dm_received.ogg"))
elif i == 'sent_dm':
pub.sendMessage("createBuffer", buffer_type="SentDirectMessagesBuffer", session_type=session.type, buffer_title=_("Sent direct messages"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function=None, name="sent_direct_messages", sessionObject=session, account=session.db["user_name"], bufferType="dmPanel", compose_func="compose_direct_message"))
elif i == 'sent_tweets':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Sent tweets"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="user_timeline", name="sent_tweets", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"], include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'favorites':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_favorites", name="favourites", sessionObject=session, account=session.db["user_name"], sound="favourite.ogg", include_ext_alt_text=True, tweet_mode="extended"))
elif i == 'followers':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_followers", name="followers", sessionObject=session, account=session.db["user_name"], sound="update_followers.ogg", screen_name=session.db["user_name"]))
elif i == 'friends':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Following"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_friends", name="friends", sessionObject=session, account=session.db["user_name"], screen_name=session.db["user_name"]))
elif i == 'blocks':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_blocks", name="blocked", sessionObject=session, account=session.db["user_name"]))
elif i == 'muted':
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Muted users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_mutes", name="muted", sessionObject=session, account=session.db["user_name"]))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="timelines", account=session.db["user_name"]))
timelines_position =controller.view.search("timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["timelines"]:
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_(u"Timeline for {}").format(i,), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="user_timeline", name="%s-timeline" % (i,), sessionObject=session, account=session.db["user_name"], sound="tweet_timeline.ogg", bufferType=None, user_id=i, include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Likes timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="favs_timelines", account=session.db["user_name"]))
favs_timelines_position =controller.view.search("favs_timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["favourites_timelines"]:
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes for {}").format(i,), parent_tab=favs_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_favorites", name="%s-favorite" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="favourites_timeline_updated.ogg", user_id=i, include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Followers timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="followers_timelines", account=session.db["user_name"]))
followers_timelines_position =controller.view.search("followers_timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["followers_timelines"]:
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i,), parent_tab=followers_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_followers", name="%s-followers" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Following timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="friends_timelines", account=session.db["user_name"]))
friends_timelines_position =controller.view.search("friends_timelines", session.db["user_name"])
for i in session.settings["other_buffers"]["friends_timelines"]:
pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_(u"Friends for {}").format(i,), parent_tab=friends_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_friends", name="%s-friends" % (i,), sessionObject=session, account=session.db["user_name"], sound="new_event.ogg", user_id=i))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Lists"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="lists", account=session.db["user_name"]))
lists_position =controller.view.search("lists", session.db["user_name"])
for i in session.settings["other_buffers"]["lists"]:
pub.sendMessage("createBuffer", buffer_type="ListBuffer", session_type=session.type, buffer_title=_(u"List for {}").format(i), parent_tab=lists_position, start=False, kwargs=dict(parent=controller.view.nb, function="list_timeline", name="%s-list" % (i,), sessionObject=session, account=session.db["user_name"], bufferType=None, sound="list_tweet.ogg", list_id=utils.find_list(i, session.db["lists"]), include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Searches"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="searches", account=session.db["user_name"]))
searches_position =controller.view.search("searches", session.db["user_name"])
for i in session.settings["other_buffers"]["tweet_searches"]:
pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_(u"Search for {}").format(i), parent_tab=searches_position, start=False, kwargs=dict(parent=controller.view.nb, function="search_tweets", name="%s-searchterm" % (i,), sessionObject=session, account=session.db["user_name"], bufferType="searchPanel", sound="search_updated.ogg", q=i, include_ext_alt_text=True, tweet_mode="extended"))
for i in session.settings["other_buffers"]["trending_topic_buffers"]:
pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="%s_tt" % (i,), sessionObject=session, account=session.db["user_name"], trendsFor=i, sound="trends_updated.ogg"))

View File

@ -21,7 +21,7 @@ def parse(s):
lst.remove(item) lst.remove(item)
#end if #end if
if len(lst) > 1: #more than one key, parse error if len(lst) > 1: #more than one key, parse error
raise ValueError, 'unknown modifier %s' % lst[0] raise ValueError('unknown modifier %s' % lst[0])
return (m, lst[0].lower()) return (m, lst[0].lower())
class AtspiThread(threading.Thread): class AtspiThread(threading.Thread):
def run(self): def run(self):

45
src/mastodon.defaults Normal file
View File

@ -0,0 +1,45 @@
[mastodon]
access_token = string(default="")
instance = string(default="")
user_name = string(default="")
ignored_clients = list(default=list())
[general]
relative_times = boolean(default=True)
max_toots_per_call = integer(default=40)
reverse_timelines = boolean(default=False)
persist_size = integer(default=0)
load_cache_in_memory=boolean(default=True)
show_screen_names = boolean(default=False)
[sound]
volume = float(default=1.0)
input_device = string(default="Default")
output_device = string(default="Default")
session_mute = boolean(default=False)
current_soundpack = string(default="default")
[other_buffers]
timelines = list(default=list())
searches = list(default=list())
lists = list(default=list())
favourites_timelines = list(default=list())
followers_timelines = list(default=list())
friends_timelines = list(default=list())
trending_topic_buffers = list(default=list())
muted_buffers = list(default=list())
autoread_buffers = list(default=list(mentions, direct_messages, events))
[mysc]
spelling_language = string(default="")
save_followers_in_autocompletion_db = boolean(default=False)
save_friends_in_autocompletion_db = boolean(default=False)
ocr_language = string(default="")
[reporting]
braille_reporting = boolean(default=True)
speech_reporting = boolean(default=True)
[filters]
[user-aliases]

View File

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
import unittest
testmodules = ["test.test_cache"]
suite = unittest.TestSuite()
for t in testmodules:
try:
# If the module defines a suite() function, call it to get the suite.
mod = __import__(t, globals(), locals(), ['suite'])
suitefn = getattr(mod, 'suite')
suite.addTest(suitefn())
except (ImportError, AttributeError):
# else, just load all the test cases from the module.
suite.addTest(unittest.defaultTestLoader.loadTestsFromName(t))
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@ -1,36 +1,32 @@
# -*- coding: cp1252 -*- # -*- coding: cp1252 -*-
#from config_utils import Configuration, ConfigurationResetException """ Lightweigth module that saves session position across global config and performs validation of config files. """
from __future__ import unicode_literals
from builtins import object
import config
import paths
import os import os
import logging import logging
import config
import paths
log = logging.getLogger("sessionmanager.manager") log = logging.getLogger("sessionmanager.manager")
from sessions import session_exceptions from sessions import session_exceptions
manager = None manager = None
def setup(): def setup():
""" Creates the singleton instance used within TWBlue to access this object. """
global manager global manager
if not manager: if not manager:
manager = sessionManager() manager = sessionManager()
class sessionManager(object): class sessionManager(object):
# def __init__(self):
# FILE = "sessions.conf"
# SPEC = "app-configuration.defaults"
# try:
# self.main = Configuration(paths.config_path(FILE), paths.app_path(SPEC))
# except ConfigurationResetException:
# pass
def get_current_session(self): def get_current_session(self):
""" Returns the currently focused session, if valid. """
if self.is_valid(config.app["sessions"]["current_session"]): if self.is_valid(config.app["sessions"]["current_session"]):
return config.app["sessions"]["current_session"] return config.app["sessions"]["current_session"]
else:
return False
def add_session(self, id): def add_session(self, id):
""" Adds a new session to the global config, so it will be taken into account for all operations.
:param id: Session identified.
:param id: str.
"""
log.debug("Adding a new session: %s" % (id,)) log.debug("Adding a new session: %s" % (id,))
path = os.path.join(paths.config_path(), id) path = os.path.join(paths.config_path(), id)
if not os.path.exists(path): if not os.path.exists(path):

View File

@ -1,42 +1,52 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Module to perform session actions such as addition, removal or display of the global settings dialogue. """
import shutil import shutil
import widgetUtils
import platform
import output
if platform.system() == "Windows":
from . import wxUI as view
from controller import settings
elif platform.system() == "Linux":
from . import gtkUI as view
import paths
import time import time
import os import os
import logging import logging
import widgetUtils
import sessions import sessions
from sessions.twitter import session import output
from . import manager import paths
import config_utils import config_utils
import config import config
from pubsub import pub
from tweepy.errors import TweepyException from tweepy.errors import TweepyException
from controller import settings
from sessions.twitter import session as TwitterSession
from sessions.mastodon import session as MastodonSession
from . import manager
from . import wxUI as view
log = logging.getLogger("sessionmanager.sessionManager") log = logging.getLogger("sessionmanager.sessionManager")
class sessionManagerController(object): class sessionManagerController(object):
def __init__(self, started=False): def __init__(self, started: bool = False):
""" Class constructor.
Creates the SessionManager class controller, responsible for the accounts within TWBlue. From this dialog, users can add/Remove accounts, or load the global settings dialog.
:param started: Indicates whether this object is being created during application startup (when no other controller has been instantiated) or not. It is important for us to know this, as we won't show the button to open global settings dialog if the application has been started. Users must choose the corresponding option in the menu bar.
:type started: bool
"""
super(sessionManagerController, self).__init__() super(sessionManagerController, self).__init__()
log.debug("Setting up the session manager.") log.debug("Setting up the session manager.")
self.started = started self.started = started
# Initialize the manager, responsible for storing session objects.
manager.setup() manager.setup()
self.view = view.sessionManagerWindow() self.view = view.sessionManagerWindow()
widgetUtils.connect_event(self.view.new, widgetUtils.BUTTON_PRESSED, self.manage_new_account) pub.subscribe(self.manage_new_account, "sessionmanager.new_account")
widgetUtils.connect_event(self.view.remove, widgetUtils.BUTTON_PRESSED, self.remove) pub.subscribe(self.remove_account, "sessionmanager.remove_account")
if self.started == False: if self.started == False:
widgetUtils.connect_event(self.view.configuration, widgetUtils.BUTTON_PRESSED, self.configuration) pub.subscribe(self.configuration, "sessionmanager.configuration")
else: else:
self.view.hide_configuration() self.view.hide_configuration()
# Store a temporary copy of new and removed sessions, so we will perform actions on them during call to on_ok.
self.new_sessions = {} self.new_sessions = {}
self.removed_sessions = [] self.removed_sessions = []
def fill_list(self): def fill_list(self):
""" Fills the session list with all valid sessions that could be found in config path. """
sessionsList = [] sessionsList = []
reserved_dirs = ["dicts"] reserved_dirs = ["dicts"]
log.debug("Filling the sessions list.") log.debug("Filling the sessions list.")
@ -55,10 +65,16 @@ class sessionManagerController(object):
output.speak("An exception was raised while attempting to clean malformed session data. See the error log for details. If this message persists, contact the developers.",True) output.speak("An exception was raised while attempting to clean malformed session data. See the error log for details. If this message persists, contact the developers.",True)
os.exception("Exception thrown while removing malformed session") os.exception("Exception thrown while removing malformed session")
continue continue
name = config_test["twitter"]["user_name"] if config_test.get("twitter") != None:
name = _("{account_name} (Twitter)").format(account_name=config_test["twitter"]["user_name"])
if config_test["twitter"]["user_key"] != "" and config_test["twitter"]["user_secret"] != "": if config_test["twitter"]["user_key"] != "" and config_test["twitter"]["user_secret"] != "":
sessionsList.append(name) sessionsList.append(name)
self.sessions.append(i) self.sessions.append(dict(type="twitter", id=i))
elif config_test.get("mastodon") != None:
name = _("{account_name} (Mastodon)").format(account_name=config_test["mastodon"]["user_name"])
if config_test["mastodon"]["instance"] != "" and config_test["mastodon"]["access_token"] != "":
sessionsList.append(name)
self.sessions.append(dict(type="mastodon", id=i))
else: else:
try: try:
log.debug("Deleting session %s" % (i,)) log.debug("Deleting session %s" % (i,))
@ -69,6 +85,7 @@ class sessionManagerController(object):
self.view.fill_list(sessionsList) self.view.fill_list(sessionsList)
def show(self): def show(self):
""" Displays the session manager dialog. """
if self.view.get_response() == widgetUtils.OK: if self.view.get_response() == widgetUtils.OK:
self.do_ok() self.do_ok()
# else: # else:
@ -77,49 +94,51 @@ class sessionManagerController(object):
def do_ok(self): def do_ok(self):
log.debug("Starting sessions...") log.debug("Starting sessions...")
for i in self.sessions: for i in self.sessions:
if (i in sessions.sessions) == True: continue # Skip already created sessions. Useful when session manager controller is not created during startup.
s = session.Session(i) if sessions.sessions.get(i.get("id")) != None:
continue
# Create the session object based in session type.
if i.get("type") == "twitter":
s = TwitterSession.Session(i.get("id"))
elif i.get("type") == "mastodon":
s = MastodonSession.Session(i.get("id"))
s.get_configuration() s.get_configuration()
if i not in config.app["sessions"]["ignored_sessions"]: if i.get("id") not in config.app["sessions"]["ignored_sessions"]:
try: try:
s.login() s.login()
except TweepyException: except TweepyException:
self.show_auth_error(s.settings["twitter"]["user_name"]) self.show_auth_error(s.settings["twitter"]["user_name"])
continue continue
sessions.sessions[i] = s sessions.sessions[i.get("id")] = s
self.new_sessions[i] = s self.new_sessions[i.get("id")] = s
# self.view.destroy() # self.view.destroy()
def show_auth_error(self, user_name): def show_auth_error(self, user_name):
error = view.auth_error(user_name) error = view.auth_error(user_name)
def manage_new_account(self, *args, **kwargs): def manage_new_account(self, type):
if self.view.new_account_dialog() == widgetUtils.YES: # Generic settings for all account types.
location = (str(time.time())[-6:]) location = (str(time.time())[-6:])
log.debug("Creating session in the %s path" % (location,)) log.debug("Creating %s session in the %s path" % (type, location))
s = session.Session(location) if type == "twitter":
s = TwitterSession.Session(location)
elif type == "mastodon":
s = MastodonSession.Session(location)
manager.manager.add_session(location) manager.manager.add_session(location)
s.get_configuration() s.get_configuration()
# try:
s.authorise() s.authorise()
self.sessions.append(location) self.sessions.append(dict(id=location, type=type))
self.view.add_new_session_to_list() self.view.add_new_session_to_list()
s.settings.write() s.settings.write()
# except:
# log.exception("Error authorising the session")
# self.view.show_unauthorised_error()
# return
def remove(self, *args, **kwargs): def remove_account(self, index):
if self.view.remove_account_dialog() == widgetUtils.YES: selected_account = self.sessions[index]
selected_account = self.sessions[self.view.get_selected()] self.view.remove_session(index)
self.view.remove_session(self.view.get_selected()) self.removed_sessions.append(selected_account.get("id"))
self.removed_sessions.append(selected_account)
self.sessions.remove(selected_account) self.sessions.remove(selected_account)
shutil.rmtree(path=os.path.join(paths.config_path(), selected_account), ignore_errors=True) shutil.rmtree(path=os.path.join(paths.config_path(), selected_account.get("id")), ignore_errors=True)
def configuration(self):
def configuration(self, *args, **kwargs):
""" Opens the global settings dialogue.""" """ Opens the global settings dialogue."""
d = settings.globalSettingsController() d = settings.globalSettingsController()
if d.response == widgetUtils.OK: if d.response == widgetUtils.OK:

View File

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import unicode_literals """ Base GUI (Wx) class for the Session manager module."""
import wx import wx
from pubsub import pub
from multiplatform_widgets import widgets from multiplatform_widgets import widgets
import application
class sessionManagerWindow(wx.Dialog): class sessionManagerWindow(wx.Dialog):
""" Dialog that displays all session managing capabilities to users. """
def __init__(self): def __init__(self):
super(sessionManagerWindow, self).__init__(parent=None, title=_(u"Session manager"), size=wx.DefaultSize) super(sessionManagerWindow, self).__init__(parent=None, title=_(u"Session manager"), size=wx.DefaultSize)
panel = wx.Panel(self) panel = wx.Panel(self)
@ -16,8 +17,11 @@ class sessionManagerWindow(wx.Dialog):
listSizer.Add(self.list.list, 0, wx.ALL, 5) listSizer.Add(self.list.list, 0, wx.ALL, 5)
sizer.Add(listSizer, 0, wx.ALL, 5) sizer.Add(listSizer, 0, wx.ALL, 5)
self.new = wx.Button(panel, -1, _(u"New account"), size=wx.DefaultSize) self.new = wx.Button(panel, -1, _(u"New account"), size=wx.DefaultSize)
self.new.Bind(wx.EVT_BUTTON, self.on_new_account)
self.remove = wx.Button(panel, -1, _(u"Remove account")) self.remove = wx.Button(panel, -1, _(u"Remove account"))
self.remove.Bind(wx.EVT_BUTTON, self.on_remove)
self.configuration = wx.Button(panel, -1, _(u"Global Settings")) self.configuration = wx.Button(panel, -1, _(u"Global Settings"))
self.configuration.Bind(wx.EVT_BUTTON, self.on_configuration)
ok = wx.Button(panel, wx.ID_OK, size=wx.DefaultSize) ok = wx.Button(panel, wx.ID_OK, size=wx.DefaultSize)
ok.SetDefault() ok.SetDefault()
cancel = wx.Button(panel, wx.ID_CANCEL, size=wx.DefaultSize) cancel = wx.Button(panel, wx.ID_CANCEL, size=wx.DefaultSize)
@ -42,11 +46,29 @@ class sessionManagerWindow(wx.Dialog):
if self.list.get_count() == 0: if self.list.get_count() == 0:
wx.MessageDialog(None, _(u"You need to configure an account."), _(u"Account Error"), wx.ICON_ERROR).ShowModal() wx.MessageDialog(None, _(u"You need to configure an account."), _(u"Account Error"), wx.ICON_ERROR).ShowModal()
return return
self.controller.do_ok()
self.EndModal(wx.ID_OK) self.EndModal(wx.ID_OK)
def new_account_dialog(self): def on_new_account(self, *args, **kwargs):
return wx.MessageDialog(self, _(u"The request to authorize your Twitter account will be opened in your browser. You only need to do this once. Would you like to continue?"), _(u"Authorization"), wx.YES_NO).ShowModal() menu = wx.Menu()
twitter = menu.Append(wx.ID_ANY, _("Twitter"))
mastodon = menu.Append(wx.ID_ANY, _("Mastodon"))
menu.Bind(wx.EVT_MENU, self.on_new_twitter_account, twitter)
menu.Bind(wx.EVT_MENU, self.on_new_mastodon_account, mastodon)
self.PopupMenu(menu, self.new.GetPosition())
def on_new_mastodon_account(self, *args, **kwargs):
dlg = wx.MessageDialog(self, _("You will be prompted for your Mastodon data (instance URL, email address and password) so we can authorise TWBlue in your instance. Would you like to authorise your account now?"), _(u"Authorization"), wx.YES_NO)
response = dlg.ShowModal()
dlg.Destroy()
if response == wx.ID_YES:
pub.sendMessage("sessionmanager.new_account", type="mastodon")
def on_new_twitter_account(self, *args, **kwargs):
dlg = wx.MessageDialog(self, _(u"The request to authorize your Twitter account will be opened in your browser. You only need to do this once. Would you like to continue?"), _(u"Authorization"), wx.YES_NO)
response = dlg.ShowModal()
dlg.Destroy()
if response == wx.ID_YES:
pub.sendMessage("sessionmanager.new_account", type="twitter")
def add_new_session_to_list(self): def add_new_session_to_list(self):
total = self.list.get_count() total = self.list.get_count()
@ -61,8 +83,16 @@ class sessionManagerWindow(wx.Dialog):
def get_response(self): def get_response(self):
return self.ShowModal() return self.ShowModal()
def remove_account_dialog(self): def on_remove(self, *args, **kwargs):
return wx.MessageDialog(self, _(u"Do you really want to delete this account?"), _(u"Remove account"), wx.YES_NO).ShowModal() dlg = wx.MessageDialog(self, _(u"Do you really want to delete this account?"), _(u"Remove account"), wx.YES_NO)
response = dlg.ShowModal()
dlg.Destroy()
if response == wx.ID_YES:
selected = self.list.get_selected()
pub.sendMessage("sessionmanager.remove_account", index=selected)
def on_configuration(self, *args, **kwargs):
pub.sendMessage("sessionmanager.configuration")
def get_selected(self): def get_selected(self):
return self.list.get_selected() return self.list.get_selected()

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" this package contains code related to Sessions. """ this package contains code related to Sessions.
In TWBlue, a session module defines everything a social network needs to be used in the program.""" In TWBlue, a session module defines everything a social network needs to be used in the program."""
from __future__ import unicode_literals
# let's define a global object for storing sessions across the program. # let's define a global object for storing sessions across the program.
sessions = {} sessions = {}

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
import os
import paths
import time
import logging
import wx
import mastodon
import config
import config_utils
import output
import application
from pubsub import pub
from mysc.thread_utils import call_threaded
from sessions import base
from .wxUI import authorisationDialog
log = logging.getLogger("sessions.mastodonSession")
class Session(base.baseSession):
def __init__(self, *args, **kwargs):
super(Session, self).__init__(*args, **kwargs)
self.config_spec = "mastodon.defaults"
self.supported_languages = []
self.type = "mastodon"
def login(self, verify_credentials=True):
if self.settings["mastodon"]["access_token"] != None and self.settings["mastodon"]["instance"] != None:
try:
log.debug("Logging in to Mastodon instance {}...".format(self.settings["mastodon"]["instance"]))
self.api = mastodon.Mastodon(access_token=self.settings["mastodon"]["access_token"], api_base_url=self.settings["mastodon"]["instance"])
if verify_credentials == True:
credentials = self.api.account_verify_credentials()
self.db["user_name"] = credentials["username"]
self.db["user_id"] = credentials["id"]
self.settings["mastodon"]["user_name"] = credentials["username"]
self.logged = True
log.debug("Logged.")
self.counter = 0
except IOError:
log.error("The login attempt failed.")
self.logged = False
else:
self.logged = False
raise Exceptions.RequireCredentialsSessionError
def authorise(self):
if self.logged == True:
raise Exceptions.AlreadyAuthorisedError("The authorisation process is not needed at this time.")
else:
self.authorisation_dialog = authorisationDialog()
answer = self.authorisation_dialog.ShowModal()
if answer == wx.ID_OK:
client_id, client_secret = mastodon.Mastodon.create_app("TWBlue", api_base_url=self.authorisation_dialog.instance.GetValue(), website="https://twblue.es")
temporary_api = mastodon.Mastodon(client_id=client_id, client_secret=client_secret, api_base_url=self.authorisation_dialog.instance.GetValue())
access_token = temporary_api.log_in(self.authorisation_dialog.email.GetValue(), self.authorisation_dialog.password.GetValue())
self.settings["mastodon"]["access_token"] = access_token
self.settings["mastodon"]["instance"] = self.authorisation_dialog.instance.GetValue()
self.settings.write()
def get_user_info(self):
""" Retrieves some information required by TWBlue for setup."""
# retrieve the current user's UTC offset so we can calculate dates properly.
offset = time.timezone if (time.localtime().tm_isdst == 0) else time.altzone
offset = offset / 60 / 60 * -1
self.db["utc_offset"] = offset
if len(self.supported_languages) == 0:
self.supported_languages = self.api.instance().languages
self.get_lists()
self.get_muted_users()
self.settings.write()
def get_lists(self):
""" Gets the lists that the user is subscribed to and stores them in the database. Returns None."""
self.db["lists"] = self.api.lists()
def get_muted_users(self):
### ToDo: Use a function to retrieve all muted users.
self.db["muted_users"] = self.api.mutes()
def get_user_alias(self, user):
aliases = self.settings.get("user-aliases")
if aliases == None:
log.error("Aliases are not defined for this config spec.")
return user.name
user_alias = aliases.get(user.id_str)
if user_alias != None:
return user_alias
return user.name

View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
import wx
class authorisationDialog(wx.Dialog):
def __init__(self):
super(authorisationDialog, self).__init__(parent=None, title=_(u"Authorising account..."))
panel = wx.Panel(self)
sizer = wx.BoxSizer(wx.VERTICAL)
static1 = wx.StaticText(panel, wx.NewId(), _("URL of mastodon instance:"))
self.instance = wx.TextCtrl(panel, -1)
sizer1 = wx.BoxSizer(wx.HORIZONTAL)
sizer1.Add(static1, 0, wx.ALL, 5)
sizer1.Add(self.instance, 0, wx.ALL, 5)
sizer.Add(sizer1, 0, wx.ALL, 5)
static2 = wx.StaticText(panel, wx.NewId(), _("Email address:"))
self.email = wx.TextCtrl(panel, -1)
sizer2 = wx.BoxSizer(wx.HORIZONTAL)
sizer2.Add(static2, 0, wx.ALL, 5)
sizer2.Add(self.email, 0, wx.ALL, 5)
sizer.Add(sizer2, 0, wx.ALL, 5)
static3 = wx.StaticText(panel, wx.NewId(), _("Password:"))
self.password = wx.TextCtrl(panel, -1)
sizer3 = wx.BoxSizer(wx.HORIZONTAL)
sizer3.Add(static3, 0, wx.ALL, 5)
sizer3.Add(self.password, 0, wx.ALL, 5)
sizer.Add(sizer3, 0, wx.ALL, 5)
self.ok = wx.Button(panel, wx.ID_OK)
self.cancel = wx.Button(panel, wx.ID_CANCEL)
sizer4 = wx.BoxSizer(wx.HORIZONTAL)
sizer4.Add(self.ok, 0, wx.ALL, 5)
sizer4.Add(self.cancel, 0, wx.ALL, 5)
sizer.Add(sizer4, 0, wx.ALL, 5)
panel.SetSizer(sizer)
min = sizer.CalcMin()
self.SetClientSize(min)

View File

@ -137,8 +137,7 @@ class Session(base.baseSession):
if self.settings["twitter"]["user_key"] != None and self.settings["twitter"]["user_secret"] != None: if self.settings["twitter"]["user_key"] != None and self.settings["twitter"]["user_secret"] != None:
try: try:
log.debug("Logging in to twitter...") log.debug("Logging in to twitter...")
self.auth = tweepy.OAuth1UserHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret) self.auth = tweepy.OAuth1UserHandler(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"])
self.auth.set_access_token(self.settings["twitter"]["user_key"], self.settings["twitter"]["user_secret"])
self.twitter = tweepy.API(self.auth) self.twitter = tweepy.API(self.auth)
self.twitter_v2 = tweepy.Client(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"]) self.twitter_v2 = tweepy.Client(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"])
if verify_credentials == True: if verify_credentials == True:

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import unicode_literals
import wx import wx
class authorisationDialog(wx.Dialog): class authorisationDialog(wx.Dialog):

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
import sys
import types
import pytest
import os
import sqlitedict
import shutil
from unittest import mock
# Mock sound module, so LibVLc won't complain.
sound_module = types.ModuleType("sound")
sys.modules["sound"] = sound_module
sound_module.soundManager = mock.MagicMock(name="sound.soundManager")
from sessions import base
# path where we will save our test config, as we can't rely on paths module due to pytest's paths being different.
session_path = os.path.join(os.getcwd(), "config", "testing")
@pytest.fixture
def session():
""" Configures a fake base session from where we can test things. """
global session_path
s = base.baseSession("testing")
if os.path.exists(session_path) == False:
os.mkdir(session_path)
# Patches paths.app_path and paths.config_path, so we will not have issues during session configuration.
with mock.patch("paths.app_path", return_value=os.getcwd()) as app_path:
with mock.patch("paths.config_path", return_value=os.path.join(os.getcwd(), "config")) as config_path:
s.get_configuration()
yield s
# Session's cleanup code.
if os.path.exists(session_path):
shutil.rmtree(session_path)
del s
@pytest.fixture
def dataset():
""" Generates a sample dataset"""
dataset = dict(home_timeline=["message" for i in range(10000)], mentions_timeline=["mention" for i in range(20000)])
yield dataset
### Testing database being read from disk.
def test_cache_in_disk_unlimited_size(session, dataset):
""" Tests cache database being read from disk, storing the whole datasets. """
session.settings["general"]["load_cache_in_memory"] = False
session.settings["general"]["persist_size"] = -1
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
session.save_persistent_data()
assert isinstance(session.db, sqlitedict.SqliteDict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert len(session.db.get("home_timeline")) == 10000
assert len(session.db.get("mentions_timeline")) == 20000
session.db.close()
def test_cache_in_disk_limited_dataset(session, dataset):
""" Tests wether the cache stores only the amount of items we ask it to store. """
session.settings["general"]["load_cache_in_memory"] = False
session.settings["general"]["persist_size"] = 100
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
# We need to save and load the db again because we cannot modify buffers' size while the database is opened.
# As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db
# Might cause an out of sync error between the GUI lists and the database.
# So we perform the changes to buffer size when loading data during app startup if the DB is read from disk.
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, sqlitedict.SqliteDict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert len(session.db.get("home_timeline")) == 100
assert len(session.db.get("mentions_timeline")) == 100
session.db.close()
def test_cache_in_disk_limited_dataset_unreversed(session):
"""Test if the cache is saved properly in unreversed buffers, when newest items are at the end of the list. """
dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)])
session.settings["general"]["load_cache_in_memory"] = False
session.settings["general"]["persist_size"] = 10
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
# We need to save and load the db again because we cannot modify buffers' size while the database is opened.
# As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db
# Might cause an out of sync error between the GUI lists and the database.
# So we perform the changes to buffer size when loading data during app startup if the DB is read from disk.
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, sqlitedict.SqliteDict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert session.db.get("home_timeline")[0] == 10
assert session.db.get("mentions_timeline")[0] == 10
assert session.db.get("home_timeline")[-1] == 19
assert session.db.get("mentions_timeline")[-1] == 19
session.db.close()
def test_cache_in_disk_limited_dataset_reversed(session):
"""Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. """
dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)])
session.settings["general"]["load_cache_in_memory"] = False
session.settings["general"]["persist_size"] = 10
session.settings["general"]["reverse_timelines"] = True
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
# We need to save and load the db again because we cannot modify buffers' size while the database is opened.
# As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db
# Might cause an out of sync error between the GUI lists and the database.
# So we perform the changes to buffer size when loading data during app startup if the DB is read from disk.
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, sqlitedict.SqliteDict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert session.db.get("home_timeline")[0] == 19
assert session.db.get("mentions_timeline")[0] == 19
assert session.db.get("home_timeline")[-1] == 10
assert session.db.get("mentions_timeline")[-1] == 10
session.db.close()
### Testing database being loaded into memory. Those tests should give the same results than before
### but as we have different code depending whether we load db into memory or read it from disk,
### We need to test this anyways.
def test_cache_in_memory_unlimited_size(session, dataset):
""" Tests cache database being loaded in memory, storing the whole datasets. """
session.settings["general"]["load_cache_in_memory"] = True
session.settings["general"]["persist_size"] = -1
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, dict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert len(session.db.get("home_timeline")) == 10000
assert len(session.db.get("mentions_timeline")) == 20000
def test_cache_in_memory_limited_dataset(session, dataset):
""" Tests wether the cache stores only the amount of items we ask it to store, when loaded in memory. """
session.settings["general"]["load_cache_in_memory"] = True
session.settings["general"]["persist_size"] = 100
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, dict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert len(session.db.get("home_timeline")) == 100
assert len(session.db.get("mentions_timeline")) == 100
def test_cache_in_memory_limited_dataset_unreversed(session):
"""Test if the cache is saved properly when loaded in memory in unreversed buffers, when newest items are at the end of the list. """
dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)])
session.settings["general"]["load_cache_in_memory"] = True
session.settings["general"]["persist_size"] = 10
session.load_persistent_data()
assert len(session.db)==1
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, dict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert session.db.get("home_timeline")[0] == 10
assert session.db.get("mentions_timeline")[0] == 10
assert session.db.get("home_timeline")[-1] == 19
assert session.db.get("mentions_timeline")[-1] == 19
def test_cache_in_memory_limited_dataset_reversed(session):
"""Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. This test if for db read into memory. """
dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)])
session.settings["general"]["load_cache_in_memory"] = True
session.settings["general"]["persist_size"] = 10
session.settings["general"]["reverse_timelines"] = True
session.load_persistent_data()
session.db["home_timeline"] = dataset["home_timeline"]
session.db["mentions_timeline"] = dataset["mentions_timeline"]
session.save_persistent_data()
session.db = dict()
session.load_persistent_data()
assert isinstance(session.db, dict)
assert session.db.get("home_timeline") != None
assert session.db.get("mentions_timeline") != None
assert session.db.get("home_timeline")[0] == 19
assert session.db.get("mentions_timeline")[0] == 19
assert session.db.get("home_timeline")[-1] == 10
assert session.db.get("mentions_timeline")[-1] == 10

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
import pytest
from tweepy.models import Status
@pytest.fixture
def basic_tweet():
data = {'created_at': 'Mon Jan 03 15:03:36 +0000 2022', 'id': 1478019218884857856, 'id_str': '1478019218884857856', 'full_text': 'Changes in projects for next year https://t.co/nW3GS9RmHd', 'truncated': False, 'display_text_range': [0, 57], 'entities': {'hashtags': [], 'symbols': [], 'user_mentions': [], 'urls': [{'url': 'https://t.co/nW3GS9RmHd', 'expanded_url': 'https://manuelcortez.net/blog/changes-in-projects-for-next-year/#.YdMQQU6t1FI.twitter', 'display_url': 'manuelcortez.net/blog/changes-i…', 'indices': [34, 57]}]}, 'source': '<a href="https://mobile.twitter.com" rel="nofollow">Twitter Web App</a>', 'in_reply_to_status_id': None, 'in_reply_to_status_id_str': None, 'in_reply_to_user_id': None, 'in_reply_to_user_id_str': None, 'in_reply_to_screen_name': None, 'user': {'id': 258677951, 'id_str': '258677951', 'name': 'Manuel Cortez', 'screen_name': 'manuelcortez00', 'location': 'Nuevo León, México', 'description': 'Python developer, , interested in reading, accessibility, astronomy, physics and science. Я учу русский.', 'url': 'https://t.co/JFRKRA73ZV', 'entities': {'url': {'urls': [{'url': 'https://t.co/JFRKRA73ZV', 'expanded_url': 'https://manuelcortez.net', 'display_url': 'manuelcortez.net', 'indices': [0, 23]}]}, 'description': {'urls': []}}, 'protected': False, 'followers_count': 1453, 'friends_count': 568, 'listed_count': 45, 'created_at': 'Mon Feb 28 06:52:48 +0000 2011', 'favourites_count': 283, 'utc_offset': None, 'time_zone': None, 'geo_enabled': True, 'verified': False, 'statuses_count': 43371, 'lang': None, 'contributors_enabled': False, 'is_translator': False, 'is_translation_enabled': False, 'profile_background_color': 'C0DEED', 'profile_background_image_url': 'http://abs.twimg.com/images/themes/theme1/bg.png', 'profile_background_image_url_https': 'https://abs.twimg.com/images/themes/theme1/bg.png', 'profile_background_tile': False, 'profile_image_url': 'http://pbs.twimg.com/profile_images/442466677645508608/3EBBC-OX_normal.jpeg', 'profile_image_url_https': 'https://pbs.twimg.com/profile_images/442466677645508608/3EBBC-OX_normal.jpeg', 'profile_image_extensions_alt_text': None, 'profile_link_color': '1DA1F2', 'profile_sidebar_border_color': 'C0DEED', 'profile_sidebar_fill_color': 'DDEEF6', 'profile_text_color': '333333', 'profile_use_background_image': True, 'has_extended_profile': False, 'default_profile': True, 'default_profile_image': False, 'following': False, 'follow_request_sent': False, 'notifications': False, 'translator_type': 'regular', 'withheld_in_countries': []}, 'geo': None, 'coordinates': None, 'place': None, 'contributors': None, 'is_quote_status': False, 'retweet_count': 6, 'favorite_count': 2, 'favorited': False, 'retweeted': False, 'possibly_sensitive': False, 'possibly_sensitive_appealable': False, 'lang': 'en'}
yield Status().parse(api=None, json=data)
@pytest.fixture
def basic_tweet_multiple_mentions():
data = {'created_at': 'Mon Dec 27 21:21:25 +0000 2021', 'id': 1475577584947707909, 'id_str': '1475577584947707909', 'full_text': '@tamaranatalia9 @Darkstrings @Chris88171572 @manuelcortez00 Well done, thanks Tamara', 'truncated': False, 'display_text_range': [60, 84], 'entities': {'hashtags': [], 'symbols': [], 'user_mentions': [{'screen_name': 'tamaranatalia9', 'name': 'Tamara', 'id': 914114584591597568, 'id_str': '914114584591597568', 'indices': [0, 15]}, {'screen_name': 'Darkstrings', 'name': 'Luc', 'id': 1374154151115042823, 'id_str': '1374154151115042823', 'indices': [16, 28]}, {'screen_name': 'Chris88171572', 'name': 'Chris', 'id': 1323980014799495168, 'id_str': '1323980014799495168', 'indices': [29, 43]}, {'screen_name': 'manuelcortez00', 'name': 'Manuel Cortez', 'id': 258677951, 'id_str': '258677951', 'indices': [44, 59]}], 'urls': []}, 'source': '<a href="http://twitter.com/download/android" rel="nofollow">Twitter for Android</a>', 'in_reply_to_status_id': 1475550502083563526, 'in_reply_to_status_id_str': '1475550502083563526', 'in_reply_to_user_id': 914114584591597568, 'in_reply_to_user_id_str': '914114584591597568', 'in_reply_to_screen_name': 'tamaranatalia9', 'user': {'id': 784837522157436929, 'id_str': '784837522157436929', 'name': 'Paulus', 'screen_name': 'PauloPer01', 'location': '', 'description': '', 'url': None, 'entities': {'description': {'urls': []}}, 'protected': False, 'followers_count': 1082, 'friends_count': 3029, 'listed_count': 2, 'created_at': 'Sat Oct 08 19:27:01 +0000 2016', 'favourites_count': 78862, 'utc_offset': None, 'time_zone': None, 'geo_enabled': False, 'verified': False, 'statuses_count': 4976, 'lang': None, 'contributors_enabled': False, 'is_translator': False, 'is_translation_enabled': False, 'profile_background_color': 'F5F8FA', 'profile_background_image_url': None, 'profile_background_image_url_https': None, 'profile_background_tile': False, 'profile_image_url': 'http://pbs.twimg.com/profile_images/1464572633014587395/246oPPLa_normal.jpg', 'profile_image_url_https': 'https://pbs.twimg.com/profile_images/1464572633014587395/246oPPLa_normal.jpg', 'profile_image_extensions_alt_text': None, 'profile_link_color': '1DA1F2', 'profile_sidebar_border_color': 'C0DEED', 'profile_sidebar_fill_color': 'DDEEF6', 'profile_text_color': '333333', 'profile_use_background_image': True, 'has_extended_profile': True, 'default_profile': True, 'default_profile_image': False, 'following': False, 'follow_request_sent': False, 'notifications': False, 'translator_type': 'none', 'withheld_in_countries': []}, 'geo': None, 'coordinates': None, 'place': None, 'contributors': None, 'is_quote_status': False, 'retweet_count': 1, 'favorite_count': 2, 'favorited': False, 'retweeted': False, 'lang': 'en'}
yield Status().parse(api=None, json=data)

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import pytest
import gettext
import datetime
gettext.install("test")
from unittest import mock
from sessions.twitter import templates
def test_default_values():
""" Tests wheter default values are the expected ones.
This might be useful so we will have this failing when we update anything from those values.
As TWBlue might be using those from other dialogs.
"""
assert templates.tweet_variables == ["date", "display_name", "screen_name", "source", "lang", "text", "image_descriptions"]
assert templates.dm_variables == ["date", "sender_display_name", "sender_screen_name", "recipient_display_name", "recipient_display_name", "text"]
assert templates.person_variables == ["display_name", "screen_name", "location", "description", "followers", "following", "listed", "likes", "tweets", "created_at"]
@pytest.mark.parametrize("offset, language, expected_result", [
(0, "en_US", "Wednesday, October 10, 2018 20:19:24"),
(-21600, "en_US", "Wednesday, October 10, 2018 14:19:24"),
(7200, "en_US", "Wednesday, October 10, 2018 22:19:24"),
(0, "es_ES", "miércoles, octubre 10, 2018 20:19:24"),
(-21600, "es_ES", "miércoles, octubre 10, 2018 14:19:24"),
(7200, "es_ES", "miércoles, octubre 10, 2018 22:19:24"),
(18000, "es_ES", "jueves, octubre 11, 2018 1:19:24"),
])
def test_process_date_absolute_time(offset, language, expected_result):
""" Tests date processing function for tweets, when relative_times is set to False. """
# Date representation used by twitter, converted to datetime object, as tweepy already does this.
# Original date was Wed Oct 10 20:19:24 +0000 2018
date_field = datetime.datetime(2018, 10, 10, 20, 19, 24)
with mock.patch("languageHandler.curLang", new=language):
processed_date = templates.process_date(date_field, relative_times=False, offset_seconds=offset)
assert processed_date == expected_result
def test_process_date_relative_time():
date_field = datetime.datetime(2018, 10, 10, 20, 19, 24)
with mock.patch("languageHandler.curLang", new="es_ES"):
processed_date = templates.process_date(date_field, relative_times=True, offset_seconds=7200)
# As this depends in relative times and this is subject to change, let's do some light checks here and hope the string is going to be valid.
assert isinstance(processed_date, str)
assert "hace" in processed_date and "años" in processed_date
def test_process_text_basic_tweet(basic_tweet):
expected_result = "Changes in projects for next year https://manuelcortez.net/blog/changes-in-projects-for-next-year/#.YdMQQU6t1FI.twitter"
text = templates.process_text(basic_tweet)
assert text == expected_result
def test_process_text_basic_tweet_multiple_mentions(basic_tweet_multiple_mentions):
expected_result = "@tamaranatalia9, @Darkstrings and 2 more: Well done, thanks Tamara"
text = templates.process_text(basic_tweet_multiple_mentions)
assert text == expected_result

View File

@ -1,200 +0,0 @@
# -*- coding: utf-8 -*-
""" Test case to check some of the scenarios we might face when storing tweets in cache, both loading into memory or rreading from disk. """
import unittest
import os
import paths
import sqlitedict
import shutil
# The base session module requires sound as a dependency, and this needs libVLC to be locatable.
os.environ['PYTHON_VLC_MODULE_PATH']=os.path.abspath(os.path.join(paths.app_path(), "..", "windows-dependencies", "x86"))
os.environ['PYTHON_VLC_LIB_PATH']=os.path.abspath(os.path.join(paths.app_path(), "..", "windows-dependencies", "x86", "libvlc.dll"))
from sessions import base
class cacheTestCase(unittest.TestCase):
def setUp(self):
""" Configures a fake session to check caching objects here. """
self.session = base.baseSession("testing")
if os.path.exists(os.path.join(paths.config_path(), "testing")) == False:
os.mkdir(os.path.join(paths.config_path(), "testing"))
self.session.get_configuration()
def tearDown(self):
""" Removes the previously configured session. """
session_folder = os.path.join(paths.config_path(), "testing")
if os.path.exists(session_folder):
shutil.rmtree(session_folder)
def generate_dataset(self):
""" Generates a sample dataset"""
dataset = dict(home_timeline=["message" for i in range(10000)], mentions_timeline=["mention" for i in range(20000)])
return dataset
### Testing database being read from disk.
def test_cache_in_disk_unlimited_size(self):
""" Tests cache database being read from disk, storing the whole datasets. """
dataset = self.generate_dataset()
self.session.settings["general"]["load_cache_in_memory"] = False
self.session.settings["general"]["persist_size"] = -1
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
self.session.save_persistent_data()
self.assertIsInstance(self.session.db, sqlitedict.SqliteDict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(len(self.session.db.get("home_timeline")), 10000)
self.assertEquals(len(self.session.db.get("mentions_timeline")), 20000)
self.session.db.close()
def test_cache_in_disk_limited_dataset(self):
""" Tests wether the cache stores only the amount of items we ask it to store. """
dataset = self.generate_dataset()
self.session.settings["general"]["load_cache_in_memory"] = False
self.session.settings["general"]["persist_size"] = 100
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
# We need to save and load the db again because we cannot modify buffers' size while the database is opened.
# As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db
# Might cause an out of sync error between the GUI lists and the database.
# So we perform the changes to buffer size when loading data during app startup if the DB is read from disk.
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, sqlitedict.SqliteDict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(len(self.session.db.get("home_timeline")), 100)
self.assertEquals(len(self.session.db.get("mentions_timeline")), 100)
self.session.db.close()
def test_cache_in_disk_limited_dataset_unreversed(self):
"""Test if the cache is saved properly in unreversed buffers, when newest items are at the end of the list. """
dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)])
self.session.settings["general"]["load_cache_in_memory"] = False
self.session.settings["general"]["persist_size"] = 10
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
# We need to save and load the db again because we cannot modify buffers' size while the database is opened.
# As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db
# Might cause an out of sync error between the GUI lists and the database.
# So we perform the changes to buffer size when loading data during app startup if the DB is read from disk.
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, sqlitedict.SqliteDict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(self.session.db.get("home_timeline")[0], 10)
self.assertEquals(self.session.db.get("mentions_timeline")[0], 10)
self.assertEquals(self.session.db.get("home_timeline")[-1], 19)
self.assertEquals(self.session.db.get("mentions_timeline")[-1], 19)
self.session.db.close()
def test_cache_in_disk_limited_dataset_reversed(self):
"""Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. """
dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)])
self.session.settings["general"]["load_cache_in_memory"] = False
self.session.settings["general"]["persist_size"] = 10
self.session.settings["general"]["reverse_timelines"] = True
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
# We need to save and load the db again because we cannot modify buffers' size while the database is opened.
# As TWBlue reads directly from db when reading from disk, an attempt to modify buffers size while Blue is reading the db
# Might cause an out of sync error between the GUI lists and the database.
# So we perform the changes to buffer size when loading data during app startup if the DB is read from disk.
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, sqlitedict.SqliteDict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(self.session.db.get("home_timeline")[0], 19)
self.assertEquals(self.session.db.get("mentions_timeline")[0], 19)
self.assertEquals(self.session.db.get("home_timeline")[-1], 10)
self.assertEquals(self.session.db.get("mentions_timeline")[-1], 10)
self.session.db.close()
### Testing database being loaded into memory. Those tests should give the same results than before
### but as we have different code depending whether we load db into memory or read it from disk,
### We need to test this anyways.
def test_cache_in_memory_unlimited_size(self):
""" Tests cache database being loaded in memory, storing the whole datasets. """
dataset = self.generate_dataset()
self.session.settings["general"]["load_cache_in_memory"] = True
self.session.settings["general"]["persist_size"] = -1
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, dict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(len(self.session.db.get("home_timeline")), 10000)
self.assertEquals(len(self.session.db.get("mentions_timeline")), 20000)
def test_cache_in_memory_limited_dataset(self):
""" Tests wether the cache stores only the amount of items we ask it to store, when loaded in memory. """
dataset = self.generate_dataset()
self.session.settings["general"]["load_cache_in_memory"] = True
self.session.settings["general"]["persist_size"] = 100
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, dict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(len(self.session.db.get("home_timeline")), 100)
self.assertEquals(len(self.session.db.get("mentions_timeline")), 100)
def test_cache_in_memory_limited_dataset_unreversed(self):
"""Test if the cache is saved properly when loaded in memory in unreversed buffers, when newest items are at the end of the list. """
dataset = dict(home_timeline=[i for i in range(20)], mentions_timeline=[i for i in range(20)])
self.session.settings["general"]["load_cache_in_memory"] = True
self.session.settings["general"]["persist_size"] = 10
self.session.load_persistent_data()
self.assertTrue(len(self.session.db)==1)
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, dict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(self.session.db.get("home_timeline")[0], 10)
self.assertEquals(self.session.db.get("mentions_timeline")[0], 10)
self.assertEquals(self.session.db.get("home_timeline")[-1], 19)
self.assertEquals(self.session.db.get("mentions_timeline")[-1], 19)
def test_cache_in_memory_limited_dataset_reversed(self):
"""Test if the cache is saved properly in reversed buffers, when newest items are at the start of the list. This test if for db read into memory. """
dataset = dict(home_timeline=[i for i in range(19, -1, -1)], mentions_timeline=[i for i in range(19, -1, -1)])
self.session.settings["general"]["load_cache_in_memory"] = True
self.session.settings["general"]["persist_size"] = 10
self.session.settings["general"]["reverse_timelines"] = True
self.session.load_persistent_data()
self.session.db["home_timeline"] = dataset["home_timeline"]
self.session.db["mentions_timeline"] = dataset["mentions_timeline"]
self.session.save_persistent_data()
self.session.db = dict()
self.session.load_persistent_data()
self.assertIsInstance(self.session.db, dict)
self.assertTrue(self.session.db.get("home_timeline") != None)
self.assertTrue(self.session.db.get("mentions_timeline") != None)
self.assertEquals(self.session.db.get("home_timeline")[0], 19)
self.assertEquals(self.session.db.get("mentions_timeline")[0], 19)
self.assertEquals(self.session.db.get("home_timeline")[-1], 10)
self.assertEquals(self.session.db.get("mentions_timeline")[-1], 10)
if __name__ == "__main__":
unittest.main()

View File

@ -1,13 +1,3 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import from . import twitter, mastodon
from __future__ import unicode_literals
from .base import basePanel
from .dm import dmPanel
from .events import eventsPanel
from .favourites import favsPanel
from .lists import listPanel
from .panels import accountPanel, emptyPanel from .panels import accountPanel, emptyPanel
from .people import peoplePanel
from .trends import trendsPanel
from .tweet_searches import searchPanel
from .user_searches import searchUsersPanel

View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from .base import basePanel

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import wx
from multiplatform_widgets import widgets
class basePanel(wx.Panel):
def set_focus_function(self, f):
self.list.list.Bind(wx.EVT_LIST_ITEM_FOCUSED, f)
def create_list(self):
self.list = widgets.list(self, _(u"User"), _(u"Text"), _(u"Date"), _(u"Client"), style=wx.LC_REPORT|wx.LC_SINGLE_SEL|wx.LC_VRULES)
self.list.set_windows_size(0, 60)
self.list.set_windows_size(1, 320)
self.list.set_windows_size(2, 110)
self.list.set_windows_size(3, 84)
self.list.set_size()
def __init__(self, parent, name):
super(basePanel, self).__init__(parent)
self.name = name
self.type = "baseBuffer"
self.sizer = wx.BoxSizer(wx.VERTICAL)
self.create_list()
self.toot = wx.Button(self, -1, _("Toot"))
self.retoot = wx.Button(self, -1, _("Retoot"))
self.reply = wx.Button(self, -1, _(u"Reply"))
self.dm = wx.Button(self, -1, _(u"Direct message"))
btnSizer = wx.BoxSizer(wx.HORIZONTAL)
btnSizer.Add(self.tweet, 0, wx.ALL, 5)
btnSizer.Add(self.retweet, 0, wx.ALL, 5)
btnSizer.Add(self.reply, 0, wx.ALL, 5)
btnSizer.Add(self.dm, 0, wx.ALL, 5)
self.sizer.Add(btnSizer, 0, wx.ALL, 5)
self.sizer.Add(self.list.list, 0, wx.ALL|wx.EXPAND, 5)
self.SetSizer(self.sizer)
self.SetClientSize(self.sizer.CalcMin())
def set_position(self, reversed=False):
if reversed == False:
self.list.select_item(self.list.get_count()-1)
else:
self.list.select_item(0)
def set_focus_in_list(self):
self.list.list.SetFocus()

View File

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from .base import basePanel
from .dm import dmPanel
from .events import eventsPanel
from .favourites import favsPanel
from .lists import listPanel
from .people import peoplePanel
from .trends import trendsPanel
from .tweet_searches import searchPanel
from .user_searches import searchUsersPanel