# -*- coding: utf-8 -*- import os import sys import logging import webbrowser import wx import requests import keystrokeEditor import sessions import widgetUtils import config import languageHandler import application import sound import output from pubsub import pub from extra import SoundsTutorial from update import updater from wxUI import view, dialogs, commonMessageDialogs, sysTrayIcon from keyboard_handler.wx_handler import WXKeyboardHandler from sessionmanager import manager, sessionManager from controller import buffers from mysc import restart from mysc import localization from mysc.thread_utils import call_threaded from mysc.repeating_timer import RepeatingTimer from controller.mastodon import handler as MastodonHandler from controller.atprotosocial import handler as ATProtoSocialHandler # Added import from . import settings, userAlias log = logging.getLogger("mainController") class Controller(object): """ Main Controller for TWBlue. It manages the main window and sessions.""" def search_buffer(self, name_, user): """ Searches a buffer. name_ str: The name for the buffer user str: The account for the buffer. for example you may want to search the home_timeline buffer for the tw_blue2 user. Return type: buffers.buffer object.""" for i in self.buffers: if i.name == name_ and i.account == user: return i def get_current_buffer(self): """ Get the current focused bufferObject. Return type: buffers.buffer object.""" buffer = self.view.get_current_buffer() if hasattr(buffer, "account"): buffer = self.search_buffer(buffer.name, buffer.account) return buffer def get_best_buffer(self): """ Get the best buffer for doing something using the session object. This function is useful when you need to open a timeline or post a tweet, and the user is in a buffer without a session, for example the events buffer. Return type: twitterBuffers.buffer object.""" # Gets the parent buffer to know what account is doing an action view_buffer = self.view.get_current_buffer() # If the account has no session attached, we will need to search the first available non-empty buffer for that account to use its session. if view_buffer.type == "account" or view_buffer.type == "empty": buffer = self.get_first_buffer(view_buffer.account) else: buffer = self.search_buffer(view_buffer.name, view_buffer.account) if buffer != None: return buffer def get_first_buffer(self, account): """ Gets the first valid buffer for an account. account str: A twitter username. The first valid buffer is the home timeline.""" for i in self.buffers: if i.account == account and i.invisible == True and i.session != None: return i def get_last_buffer(self, account): """ Gets the last valid buffer for an account. account str: A twitter username. The last valid buffer is the last buffer that contains a session object assigned.""" results = self.get_buffers_for_account(account) return results[-1] def get_first_buffer_index(self, account): buff = self.get_first_buffer(account) return self.view.search(buff.name, buff.account) def get_last_buffer_index(self, account): buff = self.get_last_buffer(account) return self.view.search(buff.name, buff.account) def get_buffers_for_account(self, account): results = [] buffers = self.view.get_buffers() [results.append(self.search_buffer(i.name, i.account)) for i in buffers if i.account == account and (i.type != "account")] return results def bind_other_events(self): """ Binds the local application events with their functions.""" log.debug("Binding other application events...") # Core application pubsub events. pub.subscribe(self.logout_account, "logout") pub.subscribe(self.login_account, "login") pub.subscribe(self.execute_action, "execute-action") pub.subscribe(self.search_topic, "search") pub.subscribe(self.create_buffer, "createBuffer") pub.subscribe(self.toggle_share_settings, "toggleShare") pub.subscribe(self.invisible_shorcuts_changed, "invisible-shorcuts-changed") pub.subscribe(self.create_account_buffer, "core.create_account") pub.subscribe(self.change_buffer_title, "core.change_buffer_title") # Mastodon specific events. pub.subscribe(self.mastodon_new_item, "mastodon.new_item") pub.subscribe(self.mastodon_updated_item, "mastodon.updated_item") pub.subscribe(self.mastodon_new_conversation, "mastodon.conversation_received") pub.subscribe(self.mastodon_error_post, "mastodon.error_post") # connect application events to GUI widgetUtils.connect_event(self.view, widgetUtils.CLOSE_EVENT, self.exit_) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.show_hide, menuitem=self.view.show_hide) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.update_profile, menuitem=self.view.updateProfile) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.search, menuitem=self.view.menuitem_search) # widgetUtils.connect_event(self.view, widgetUtils.MENU, self.list_manager, menuitem=self.view.lists) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.find, menuitem=self.view.find) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.accountConfiguration, menuitem=self.view.account_settings) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.configuration, menuitem=self.view.prefs) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.ocr_image, menuitem=self.view.ocr) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.learn_sounds, menuitem=self.view.sounds_tutorial) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.exit, menuitem=self.view.close) widgetUtils.connect_event(self.view, widgetUtils.CLOSE_EVENT, self.exit) if widgetUtils.toolkit == "wx": log.debug("Binding the exit function...") widgetUtils.connectExitFunction(self.exit_) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.edit_keystrokes, menuitem=self.view.keystroke_editor) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.post_tweet, self.view.compose) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.post_reply, self.view.reply) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.post_retweet, self.view.share) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.add_to_favourites, self.view.fav) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.remove_from_favourites, self.view.unfav) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.view_item, self.view.view) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.delete, self.view.delete) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.follow, menuitem=self.view.follow) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.send_dm, self.view.dm) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.user_details, self.view.details) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.get_more_items, menuitem=self.view.load_previous_items) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.clear_buffer, menuitem=self.view.clear) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.remove_buffer, self.view.deleteTl) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.check_for_updates, self.view.check_for_updates) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.about, menuitem=self.view.about) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.visit_website, menuitem=self.view.visit_website) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.get_soundpacks, menuitem=self.view.get_soundpacks) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.manage_accounts, self.view.manage_accounts) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.toggle_autoread, menuitem=self.view.autoread) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.toggle_buffer_mute, self.view.mute_buffer) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.open_timeline, self.view.timeline) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.open_favs_timeline, self.view.favs) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.community_timeline, self.view.community_timeline) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.open_conversation, menuitem=self.view.view_conversation) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.seekLeft, menuitem=self.view.seekLeft) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.seekRight, menuitem=self.view.seekRight) if widgetUtils.toolkit == "wx": widgetUtils.connect_event(self.view.nb, widgetUtils.NOTEBOOK_PAGE_CHANGED, self.buffer_changed) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.view_documentation, self.view.doc) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.view_changelog, self.view.changelog) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.add_alias, self.view.addAlias) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.update_buffer, self.view.update_buffer) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.manage_aliases, self.view.manageAliases) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.report_error, self.view.reportError) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.create_filter, self.view.filter) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.manage_filters, self.view.manage_filters) def set_systray_icon(self): self.systrayIcon = sysTrayIcon.SysTrayIcon() widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.post_tweet, menuitem=self.systrayIcon.post) widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.configuration, menuitem=self.systrayIcon.global_settings) widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.accountConfiguration, menuitem=self.systrayIcon.account_settings) widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.show_hide, menuitem=self.systrayIcon.show_hide) widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.check_for_updates, menuitem=self.systrayIcon.check_for_updates) widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.view_documentation, menuitem=self.systrayIcon.doc) widgetUtils.connect_event(self.systrayIcon, widgetUtils.MENU, self.exit, menuitem=self.systrayIcon.exit) widgetUtils.connect_event(self.systrayIcon, widgetUtils.TASKBAR_LEFT_CLICK, self.taskbar_left_click) widgetUtils.connect_event(self.systrayIcon, widgetUtils.TASKBAR_RIGHT_CLICK, self.taskbar_right_click) def taskbar_left_click(self, *args, **kwargs): if self.showing == True: self.view.set_focus() else: self.show_hide() def taskbar_right_click(self, *args, **kwargs): self.systrayIcon.show_menu() def get_handler(self, type): handler = self.handlers.get(type) if handler == None: if type == "mastodon": handler = MastodonHandler.Handler() elif type == "atprotosocial": # Added case for atprotosocial # Assuming session_store and config_proxy are accessible or passed if needed by Handler constructor # For now, let's assume constructor is similar or adapted to not require them, # or that they can be accessed via self if mainController has them. # Based on atprotosocial.Handler, it needs session_store and config. # mainController doesn't seem to store these directly for passing. # This might indicate Handler init needs to be simplified or these need to be plumbed. # For now, proceeding with a simplified instantiation, assuming it can get what it needs # or its __init__ will be adapted. # A common pattern is self.session_store and self.config from a base controller class if mainController inherits one. # Let's assume for now they are not strictly needed for just getting menu labels or simple actions. # This part might need refinement based on Handler's actual dependencies for menu updates. # Looking at atprotosocial/handler.py, it takes session_store and config. # mainController itself doesn't seem to have these as direct attributes to pass on. # This implies a potential refactor need or that these handlers are simpler than thought for menu updates. # For now, let's assume a simplified handler for menu updates or that it gets these elsewhere. # This needs to be compatible with how MastodonHandler is instantiated and used. # MastodonHandler() is called without params here. handler = ATProtoSocialHandler.Handler(session_store=sessions.sessions, config=config.app) # Adjusted: Pass global sessions and config self.handlers[type]=handler return handler def __init__(self): super(Controller, self).__init__() # Visibility state. self.showing = True # main window self.view = view.mainFrame() # buffers list. self.buffers = [] self.started = False # accounts list. self.accounts = [] # This saves the current account (important in invisible mode) self.current_account = "" # this saves current menu bar layout. self.menubar_current_handler = "" # Handlers are special objects as they manage the mapping of available features and events in different social networks. self.handlers = dict() self.view.prepare() self.bind_other_events() self.set_systray_icon() def check_invisible_at_startup(self): # Visibility check. if config.app["app-settings"]["hide_gui"] == True: self.show_hide() self.view.Show() self.view.Hide() # Invisible keyboard Shorcuts check. if config.app["app-settings"]["use_invisible_keyboard_shorcuts"] == True: km = self.create_invisible_keyboard_shorcuts() self.register_invisible_keyboard_shorcuts(km) def do_work(self): """ Creates the buffer objects for all accounts. This does not starts the buffer streams, only creates the objects.""" log.debug("Creating buffers for all sessions...") for i in sessions.sessions: log.debug("Working on session %s" % (i,)) if sessions.sessions[i].is_logged == False: self.create_ignored_session_buffer(sessions.sessions[i]) continue # Valid types currently are mastodon (Work in progress) # More can be added later. valid_session_types = ["mastodon"] if sessions.sessions[i].type in valid_session_types: handler = self.get_handler(type=sessions.sessions[i].type) handler.create_buffers(sessions.sessions[i], controller=self) log.debug("Setting updates to buffers every %d seconds..." % (60*config.app["app-settings"]["update_period"],)) self.update_buffers_function = RepeatingTimer(60*config.app["app-settings"]["update_period"], self.update_buffers) self.update_buffers_function.start() def start(self): """ Starts all buffer objects. Loads their items.""" for i in sessions.sessions: if sessions.sessions[i].is_logged == False: continue self.start_buffers(sessions.sessions[i]) self.set_buffer_positions(sessions.sessions[i]) if hasattr(sessions.sessions[i], "start_streaming"): sessions.sessions[i].start_streaming() if config.app["app-settings"]["play_ready_sound"] == True: sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg") if config.app["app-settings"]["speak_ready_msg"] == True: output.speak(_(u"Ready")) self.started = True if len(self.accounts) > 0: b = self.get_first_buffer(self.accounts[0]) self.update_menus(handler=self.get_handler(b.session.type)) def create_ignored_session_buffer(self, session): pub.sendMessage("core.create_account", name=session.get_name(), session_id=session.session_id) def login_account(self, session_id): for i in sessions.sessions: if sessions.sessions[i].session_id == session_id: session = sessions.sessions[i] session.login() handler = self.get_handler(type=session.type) if handler != None and hasattr(handler, "create_buffers"): handler.create_buffers(session=session, controller=self, createAccounts=False) self.start_buffers(session) if hasattr(session, "start_streaming"): session.start_streaming() def create_account_buffer(self, name, session_id, logged=False): account = buffers.base.AccountBuffer(self.view.nb, name, name, session_id) if logged == False: account.logged = logged account.setup_account() self.buffers.append(account) self.view.add_buffer(account.buffer , name=name) def create_buffer(self, buffer_type="baseBuffer", session_type="twitter", buffer_title="", parent_tab=None, start=False, kwargs={}): log.debug("Creating buffer of type {0} with parent_tab of {2} arguments {1}".format(buffer_type, kwargs, parent_tab)) if kwargs.get("parent") == None: kwargs["parent"] = self.view.nb if not hasattr(buffers, session_type): raise AttributeError("Session type %s does not exist yet." % (session_type)) available_buffers = getattr(buffers, session_type) if not hasattr(available_buffers, buffer_type): raise AttributeError("Specified buffer type does not exist: %s" % (buffer_type,)) buffer = getattr(available_buffers, buffer_type)(**kwargs) if start: if kwargs.get("function") == "user_timeline": try: buffer.start_stream(play_sound=False) except ValueError: commonMessageDialogs.unauthorized() return else: call_threaded(buffer.start_stream) self.buffers.append(buffer) if parent_tab == None: log.debug("Appending buffer {}...".format(buffer,)) self.view.add_buffer(buffer.buffer, buffer_title) else: self.view.insert_buffer(buffer.buffer, buffer_title, parent_tab) log.debug("Inserting buffer {0} into control {1}".format(buffer, parent_tab)) def set_buffer_positions(self, session): "Sets positions for buffers if values exist in the database." for i in self.buffers: if i.account == session.get_name() and i.name+"_pos" in session.db and hasattr(i.buffer,'list'): i.buffer.list.select_item(session.db[str(i.name+"_pos")]) def logout_account(self, session_id): for i in sessions.sessions: if sessions.sessions[i].session_id == session_id: session = sessions.sessions[i] name =session.get_name() delete_buffers = [] for i in self.buffers: if i.account == name and i.name != name: delete_buffers.append(i.name) for i in delete_buffers: self.destroy_buffer(i, name) session.db = None session.logged = False def destroy_buffer(self, buffer_name, session_name): buffer = self.search_buffer(buffer_name, session_name) if buffer == None: return buff = self.view.search(buffer.name, session_name) if buff == None: return self.view.delete_buffer(buff) self.buffers.remove(buffer) del buffer def search_topic(self, term): self.search(value=term) def search(self, event=None, value="", *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "search"): return handler.search(controller=self, session=buffer.session, value=value) def find(self, *args, **kwargs): if 'string' in kwargs: string=kwargs['string'] else: string='' dlg = dialogs.find.findDialog(string) if dlg.get_response() == widgetUtils.OK and dlg.get("string") != "": string = dlg.get("string") #If we still have an empty string for some reason (I.E. user clicked cancel, etc), return here. if string == '': log.debug("Find canceled.") return page = self.get_current_buffer() if not hasattr(page.buffer, "list"): output.speak(_(u"No session is currently in focus. Focus a session with the next or previous session shortcut."), True) return count = page.buffer.list.get_count() if count < 1: output.speak(_(u"Empty buffer."), True) return start = page.buffer.list.get_selected() for i in range(start, count): if string.lower() in page.buffer.list.get_text_column(i, 1).lower(): page.buffer.list.select_item(i) return output.speak(page.get_message(), True) output.speak(_(u"{0} not found.").format(string,), True) def seekLeft(self, *args, **kwargs): try: sound.URLPlayer.seek(-5000) except: output.speak("Unable to seek.",True) def seekRight(self, *args, **kwargs): try: sound.URLPlayer.seek(5000) except: output.speak("Unable to seek.",True) def edit_keystrokes(self, *args, **kwargs): buffer = self.get_best_buffer() editor = keystrokeEditor.KeystrokeEditor(buffer.session.type) if editor.changed == True: config.keymap.write() register = False # determines if we need to reassign the keymap. if self.showing == False: register = True elif config.app["app-settings"]["use_invisible_keyboard_shorcuts"] == True: register = True # If there is a keyboard handler instance we need unregister all old keystrokes before register the new ones. if hasattr(self, "keyboard_handler"): keymap = {} for i in editor.hold_map: if hasattr(self, i): keymap[editor.hold_map[i]] = getattr(self, i) self.unregister_invisible_keyboard_shorcuts(keymap) self.invisible_shorcuts_changed(registered=register) def learn_sounds(self, *args, **kwargs): """ Opens the sounds tutorial for the current account.""" buffer = self.get_best_buffer() SoundsTutorial.soundsTutorial(buffer.session) def configuration(self, *args, **kwargs): """ Opens the global settings dialogue.""" d = settings.globalSettingsController() if d.response == widgetUtils.OK: d.save_configuration() if d.needs_restart == True: commonMessageDialogs.needs_restart() restart.restart_program() def accountConfiguration(self, *args, **kwargs): """ Opens the account settings dialogue for the current account.""" buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "account_settings"): manager.manager.set_current_session(buffer.session.session_id) return handler.account_settings(buffer=buffer, controller=self) def check_for_updates(self, *args, **kwargs): if not getattr(sys, 'frozen', False): log.debug("Running from source, can't update") commonMessageDialogs.cant_update_source() return update = updater.do_update() if update == False: view.no_update_available() def delete(self, *args, **kwargs): """ Deletes an item in the current buffer. Users can only remove their tweets and direct messages, other users' tweets and people (followers, friends, blocked, etc) can not be removed using this method.""" buffer = self.view.get_current_buffer() if hasattr(buffer, "account"): buffer = self.search_buffer(buffer.name, buffer.account) buffer.destroy_status() def exit(self, *args, **kwargs): if config.app["app-settings"]["ask_at_exit"] == True: answer = commonMessageDialogs.exit_dialog(self.view) if answer == widgetUtils.YES: self.exit_() else: self.exit_() def exit_(self, *args, **kwargs): for i in self.buffers: i.save_positions() log.debug("Exiting...") log.debug("Saving global configuration...") for item in sessions.sessions: if sessions.sessions[item].logged == False: continue sessions.sessions[item].sound.cleaner.cancel() log.debug("Saving database for " + sessions.sessions[item].session_id) sessions.sessions[item].save_persistent_data() self.systrayIcon.RemoveIcon() pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) if os.path.exists(pidpath): os.remove(pidpath) widgetUtils.exit_application() def follow(self, *args, **kwargs): buffer = self.get_current_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "follow"): return handler.follow(buffer=buffer) def add_alias(self, *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "add_alias"): return handler.add_alias(buffer=buffer) def manage_aliases(self, *args, **kwargs): buffer = self.get_best_buffer() alias_controller = userAlias.userAliasController(buffer.session.settings) def post_tweet(self, event=None): buffer = self.get_best_buffer() if hasattr(buffer, "post_status"): buffer.post_status() def post_reply(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "reply"): return buffer.reply() def send_dm(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "send_message"): buffer.send_message() def post_retweet(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "share_item"): # Generic buffer method return buffer.share_item() # This likely calls back to a session/handler method # If direct handling is needed for ATProtoSocial: elif buffer.session and buffer.session.KIND == "atprotosocial": item_uri = buffer.get_selected_item_id() # Assuming this gets the AT URI if not item_uri: output.speak(_("No item selected to repost."), True) return social_handler = self.get_handler(buffer.session.KIND) async def _repost(): result = await social_handler.repost_item(buffer.session, item_uri) output.speak(result["message"], True) wx.CallAfter(asyncio.create_task, _repost()) def add_to_favourites(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "add_to_favorites"): # Generic buffer method return buffer.add_to_favorites() elif buffer.session and buffer.session.KIND == "atprotosocial": item_uri = buffer.get_selected_item_id() if not item_uri: output.speak(_("No item selected to like."), True) return social_handler = self.get_handler(buffer.session.KIND) async def _like(): result = await social_handler.like_item(buffer.session, item_uri) output.speak(result["message"], True) if result.get("status") == "success" and result.get("like_uri"): # Store the like URI if the buffer supports it, for unliking if hasattr(buffer, "store_item_viewer_state"): buffer.store_item_viewer_state(item_uri, "like_uri", result["like_uri"]) wx.CallAfter(asyncio.create_task, _like()) def remove_from_favourites(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "remove_from_favorites"): # Generic buffer method return buffer.remove_from_favorites() elif buffer.session and buffer.session.KIND == "atprotosocial": item_uri = buffer.get_selected_item_id() # URI of the post if not item_uri: output.speak(_("No item selected to unlike."), True) return like_uri = None if hasattr(buffer, "get_item_viewer_state"): like_uri = buffer.get_item_viewer_state(item_uri, "like_uri") if not like_uri: output.speak(_("Could not find the original like record for this post. You might need to unlike it from the Bluesky app directly or refresh your timeline."), True) # As a fallback, one could try to *find* the like record by listing likes for the post, # but this is complex and slow for a quick action. # For now, we rely on having the like_uri stored. # Alternatively, some platforms allow unliking by post URI directly if the like exists. # ATProto delete_like requires the like record URI. logger.warning(f"Attempted to unlike post {item_uri} but its like_uri was not found in buffer's viewer_state.") return social_handler = self.get_handler(buffer.session.KIND) async def _unlike(): result = await social_handler.unlike_item(buffer.session, like_uri) # Pass the like's own URI output.speak(result["message"], True) if result.get("status") == "success": if hasattr(buffer, "store_item_viewer_state"): buffer.store_item_viewer_state(item_uri, "like_uri", None) # Clear stored like URI wx.CallAfter(asyncio.create_task, _unlike()) def toggle_like(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "toggle_favorite"): return buffer.toggle_favorite() def vote(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "vote"): return buffer.vote() def view_item(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "view_item"): return buffer.view_item() def open_in_browser(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "open_in_browser"): buffer.open_in_browser() def open_favs_timeline(self, *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "open_timeline"): return handler.open_timeline(controller=self, buffer=buffer, default="favorites") def open_timeline(self, *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "open_timeline"): return handler.open_timeline(controller=self, buffer=buffer) def open_conversation(self, *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler != None and hasattr(handler, "open_conversation"): return handler.open_conversation(controller=self, buffer=buffer) def show_hide(self, *args, **kwargs): km = self.create_invisible_keyboard_shorcuts() if self.showing == True: if config.app["app-settings"]["use_invisible_keyboard_shorcuts"] == False: self.register_invisible_keyboard_shorcuts(km) self.view.Hide() self.fix_wrong_buffer() self.showing = False else: if config.app["app-settings"]["use_invisible_keyboard_shorcuts"] == False: self.unregister_invisible_keyboard_shorcuts(km) self.view.Show() self.showing = True def get_more_items(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "get_more_items"): return buffer.get_more_items() def clear_buffer(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "clear_list"): return buffer.clear_list() def remove_buffer(self, *args, **kwargs): buffer = self.get_current_buffer() if not hasattr(buffer, "account"): return buff = self.view.search(buffer.name, buffer.account) answer = buffer.remove_buffer() if answer == False: return log.debug("destroying buffer...") self.right() self.view.delete_buffer(buff) buffer.session.sound.play("delete_timeline.ogg") self.buffers.remove(buffer) del buffer def skip_buffer(self, forward=True): buff = self.get_current_buffer() if buff.invisible == False: self.view.advance_selection(forward) def buffer_changed(self, *args, **kwargs): buffer = self.get_current_buffer() old_account = self.current_account new_account = buffer.account if new_account != old_account: self.current_account = buffer.account new_first_buffer = self.get_first_buffer(new_account) if new_first_buffer != None and new_first_buffer.session.type != self.menubar_current_handler: handler = self.get_handler(new_first_buffer.session.type) self.menubar_current_handler = new_first_buffer.session.type self.update_menus(handler) if not hasattr(buffer, "session") or buffer.session == None: return muted = autoread = False if buffer.name in buffer.session.settings["other_buffers"]["muted_buffers"]: muted = True elif buffer.name in buffer.session.settings["other_buffers"]["autoread_buffers"]: autoread = True self.view.check_menuitem("mute_buffer", muted) self.view.check_menuitem("autoread", autoread) def update_menus(self, handler): if hasattr(handler, "menus"): for m in list(handler.menus.keys()): if hasattr(self.view, m): menu_item = getattr(self.view, m) if handler.menus[m] == None: menu_item.Enable(False) else: menu_item.Enable(True) menu_item.SetItemLabel(handler.menus[m]) if hasattr(handler, "item_menu"): self.view.menubar.SetMenuLabel(1, handler.item_menu) def fix_wrong_buffer(self): buf = self.get_best_buffer() if buf == None: for i in self.accounts: buffer = self.view.search("home_timeline", i) if buffer != None: break else: buffer = self.view.search("home_timeline", buf.session.get_name()) if buffer!=None: self.view.change_buffer(buffer) def up(self, *args, **kwargs): page = self.get_current_buffer() if not hasattr(page.buffer, "list"): output.speak(_(u"No session is currently in focus. Focus a session with the next or previous session shortcut."), True) return position = page.buffer.list.get_selected() index = position-1 try: page.buffer.list.select_item(index) except: pass if position == page.buffer.list.get_selected(): page.session.sound.play("limit.ogg") # try: output.speak(page.get_message(), True) # except: # pass def down(self, *args, **kwargs): page = self.get_current_buffer() if not hasattr(page.buffer, "list"): output.speak(_(u"No session is currently in focus. Focus a session with the next or previous session shortcut."), True) return position = page.buffer.list.get_selected() index = position+1 # try: page.buffer.list.select_item(index) # except: # pass if position == page.buffer.list.get_selected(): page.session.sound.play("limit.ogg") # try: output.speak(page.get_message(), True) # except: # pass def left(self, *args, **kwargs): buff = self.view.get_current_buffer_pos() buffer = self.get_current_buffer() if not hasattr(buffer.buffer, "list"): output.speak(_(u"No session is currently in focus. Focus a session with the next or previous session shortcut."), True) return if buff == self.get_first_buffer_index(buffer.account) or buff == 0: self.view.change_buffer(self.get_last_buffer_index(buffer.account)) else: self.view.change_buffer(buff-1) while self.get_current_buffer().invisible == False: self.skip_buffer(False) buffer = self.get_current_buffer() if self.showing == True: buffer.buffer.set_focus_in_list() try: msg = _(u"%s, %s of %s") % (self.view.get_buffer_text(), buffer.buffer.list.get_selected()+1, buffer.buffer.list.get_count()) except: msg = _(u"%s. Empty") % (self.view.get_buffer_text(),) output.speak(msg, True) def right(self, *args, **kwargs): buff = self.view.get_current_buffer_pos() buffer = self.get_current_buffer() if not hasattr(buffer.buffer, "list"): output.speak(_(u"No session is currently in focus. Focus a session with the next or previous session shortcut."), True) return if buff == self.get_last_buffer_index(buffer.account) or buff+1 == self.view.get_buffer_count(): self.view.change_buffer(self.get_first_buffer_index(buffer.account)) else: self.view.change_buffer(buff+1) while self.get_current_buffer().invisible == False: self.skip_buffer(True) buffer = self.get_current_buffer() if self.showing == True: buffer.buffer.set_focus_in_list() try: msg = _(u"%s, %s of %s") % (self.view.get_buffer_text(), buffer.buffer.list.get_selected()+1, buffer.buffer.list.get_count()) except: msg = _(u"%s. Empty") % (self.view.get_buffer_text(),) output.speak(msg, True) def next_account(self, *args, **kwargs): try: index = self.accounts.index(self.current_account) except ValueError: index = -1 if index+1 == len(self.accounts): index = 0 else: index = index+1 account = self.accounts[index] self.current_account = account buffer_object = self.get_first_buffer(account) if buffer_object == None: output.speak(_(u"{0}: This account is not logged into Twitter.").format(account), True) return buff = self.view.search(buffer_object.name, account) if buff == None: output.speak(_(u"{0}: This account is not logged into Twitter.").format(account), True) return self.view.change_buffer(buff) buffer = self.get_current_buffer() if self.showing == True: buffer.buffer.set_focus_in_list() try: msg = _(u"%s. %s, %s of %s") % (buffer.account, self.view.get_buffer_text(), buffer.buffer.list.get_selected()+1, buffer.buffer.list.get_count()) except: msg = _(u"%s. Empty") % (self.view.get_buffer_text(),) output.speak(msg, True) def previous_account(self, *args, **kwargs): try: index = self.accounts.index(self.current_account) except ValueError: index = 0 if index-1 < 0: index = len(self.accounts)-1 else: index = index-1 account = self.accounts[index] self.current_account = account buffer_object = self.get_first_buffer(account) if buffer_object == None: output.speak(_(u"{0}: This account is not logged into Twitter.").format(account), True) return buff = self.view.search(buffer_object.name, account) if buff == None: output.speak(_(u"{0}: This account is not logged into twitter.").format(account), True) return self.view.change_buffer(buff) buffer = self.get_current_buffer() if self.showing == True: buffer.buffer.set_focus_in_list() try: msg = _(u"%s. %s, %s of %s") % (buffer.account, self.view.get_buffer_text(), buffer.buffer.list.get_selected()+1, buffer.buffer.list.get_count()) except: msg = _(u"%s. Empty") % (self.view.get_buffer_text(),) output.speak(msg, True) def go_home(self): buffer = self.get_current_buffer() buffer.buffer.list.select_item(0) # try: output.speak(buffer.get_message(), True) # except: # pass def go_end(self): buffer = self.get_current_buffer() buffer.buffer.list.select_item(buffer.buffer.list.get_count()-1) # try: output.speak(buffer.get_message(), True) # except: # pass def go_page_up(self): buffer = self.get_current_buffer() if buffer.buffer.list.get_selected() <= 20: index = 0 else: index = buffer.buffer.list.get_selected() - 20 buffer.buffer.list.select_item(index) # try: output.speak(buffer.get_message(), True) # except: # pass def go_page_down(self): buffer = self.get_current_buffer() if buffer.buffer.list.get_selected() >= buffer.buffer.list.get_count() - 20: index = buffer.buffer.list.get_count()-1 else: index = buffer.buffer.list.get_selected() + 20 buffer.buffer.list.select_item(index) # try: output.speak(buffer.get_message(), True) # except: # pass def url(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "url"): buffer.url() def audio(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "audio"): return buffer.audio() def volume_down(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "volume_down"): return buffer.volume_down() def volume_up(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "volume_up"): return buffer.volume_up() def create_invisible_keyboard_shorcuts(self): keymap = {} for i in config.keymap["keymap"]: if hasattr(self, i): if config.keymap["keymap"][i] != "": keymap[config.keymap["keymap"][i]] = getattr(self, i) return keymap def register_invisible_keyboard_shorcuts(self, keymap): if config.changed_keymap: build_number = sys.getwindowsversion().build if build_number > 22000: system = "Windows 11" keystroke_editor_shortcut = "Control+Win+Alt+K" else: system = "Windows 10" keystroke_editor_shortcut = "Win+Alt+K" commonMessageDialogs.changed_keymap(system, keystroke_editor_shortcut) # Make sure we pass a keymap without undefined keystrokes. new_keymap = {key: keymap[key] for key in keymap.keys() if keymap[key] != ""} self.keyboard_handler = WXKeyboardHandler(self.view) self.keyboard_handler.register_keys(new_keymap) def unregister_invisible_keyboard_shorcuts(self, keymap): try: self.keyboard_handler.unregister_keys(keymap) del self.keyboard_handler except AttributeError: pass def notify(self, session, play_sound=None, message=None, notification=False): if session.settings["sound"]["session_mute"] == True: return if play_sound != None: session.sound.play(play_sound) if message != None: output.speak(message, speech=session.settings["reporting"]["speech_reporting"], braille=session.settings["reporting"]["braille_reporting"]) def start_buffers(self, session): log.debug("starting buffers... Session %s" % (session.session_id,)) handler = self.get_handler(type=session.type) for i in self.buffers: if i.session == session and i.needs_init == True: handler.start_buffer(controller=self, buffer=i) def set_positions(self): for i in sessions.sessions: self.set_buffer_positions(i) def invisible_shorcuts_changed(self, registered): if registered == True: km = self.create_invisible_keyboard_shorcuts() self.register_invisible_keyboard_shorcuts(km) elif registered == False: km = self.create_invisible_keyboard_shorcuts() self.unregister_invisible_keyboard_shorcuts(km) def about(self, *args, **kwargs): self.view.about_dialog() def get_soundpacks(self, *args, **kwargs): # This should redirect users of other languages to the right version of the TWBlue website. lang = languageHandler.curLang[:2] url = application.url final_url = "{0}/{1}/soundpacks".format(url, lang) try: response = requests.get(final_url) except: output.speak(_(u"An error happened while trying to connect to the server. Please try later.")) return # There is no twblue.mcvsoftware.com/en, so if English is the language used this should be False anyway. if response.status_code == 200 and lang != "en": webbrowser.open_new_tab(final_url) else: webbrowser.open_new_tab(application.url+"/soundpacks") def visit_website(self, *args, **kwargs): # This should redirect users of other languages to the right version of the TWBlue website. lang = languageHandler.curLang[:2] url = application.url final_url = "{0}/{1}".format(url, lang) try: response = requests.get(final_url) except: output.speak(_(u"An error happened while trying to connect to the server. Please try later.")) return # There is no twblue.mcvsoftware.com/en, so if English is the language used this should be False anyway. if response.status_code == 200 and lang != "en": webbrowser.open_new_tab(final_url) else: webbrowser.open_new_tab(application.url) def manage_accounts(self, *args, **kwargs): sm = sessionManager.sessionManagerController(started=True) sm.fill_list() sm.show() for i in sm.new_sessions: handler = self.get_handler(type=sessions.sessions[i].type) if handler != None and hasattr(handler, "create_buffers"): handler.create_buffers(controller=self, session=sessions.sessions[i]) call_threaded(self.start_buffers, sessions.sessions[i]) for i in sm.removed_sessions: if sessions.sessions[i].logged == True: self.logout_account(sessions.sessions[i].session_id) self.destroy_buffer(sessions.sessions[i].get_name(), sessions.sessions[i].get_name()) if sessions.sessions[i].get_name() in self.accounts: self.accounts.remove(sessions.sessions[i].get_name()) sessions.sessions.pop(i) def toggle_autoread(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "session") and buffer.session == None: return if buffer.name not in buffer.session.settings["other_buffers"]["autoread_buffers"]: buffer.session.settings["other_buffers"]["autoread_buffers"].append(buffer.name) output.speak(_(u"The auto-reading of new tweets is enabled for this buffer"), True) elif buffer.name in buffer.session.settings["other_buffers"]["autoread_buffers"]: buffer.session.settings["other_buffers"]["autoread_buffers"].remove(buffer.name) output.speak(_(u"The auto-reading of new tweets is disabled for this buffer"), True) buffer.session.settings.write() def toggle_session_mute(self, *args, **kwargs): buffer = self.get_best_buffer() if buffer.session.settings["sound"]["session_mute"] == False: buffer.session.settings["sound"]["session_mute"] = True output.speak(_(u"Session mute on"), True) elif buffer.session.settings["sound"]["session_mute"] == True: buffer.session.settings["sound"]["session_mute"] = False output.speak(_(u"Session mute off"), True) buffer.session.settings.write() def toggle_buffer_mute(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "session") and buffer.session == None: return if buffer.name not in buffer.session.settings["other_buffers"]["muted_buffers"]: buffer.session.settings["other_buffers"]["muted_buffers"].append(buffer.name) output.speak(_(u"Buffer mute on"), True) elif buffer.name in buffer.session.settings["other_buffers"]["muted_buffers"]: buffer.session.settings["other_buffers"]["muted_buffers"].remove(buffer.name) output.speak(_(u"Buffer mute off"), True) buffer.session.settings.write() def view_documentation(self, *args, **kwargs): lang = localization.get("documentation") os.chdir("documentation/%s" % (lang,)) webbrowser.open("manual.html") os.chdir("../../") def view_changelog(self, *args, **kwargs): lang = localization.get("documentation") os.chdir("documentation/%s" % (lang,)) webbrowser.open("changelog.html") os.chdir("../../") def copy_to_clipboard(self, *args, **kwargs): output.copy(self.get_current_buffer().get_message()) output.speak(_(u"Copied")) def repeat_item(self, *args, **kwargs): output.speak(self.get_current_buffer().get_message()) def execute_action(self, action, kwargs={}): if hasattr(self, action): getattr(self, action)(**kwargs) def update_buffers(self): for i in self.buffers[:]: if i.session != None and i.session.is_logged == True: # For ATProtoSocial, initial load is in session.start() or manual. # Periodic updates would need a separate timer or manual refresh via update_buffer. if i.session.KIND != "atprotosocial": try: i.start_stream(mandatory=True) # This is likely for streaming connections or timed polling within buffer except Exception as err: log.exception("Error %s starting buffer %s on account %s, with args %r and kwargs %r." % (str(err), i.name, i.account, i.args, i.kwargs)) def update_buffer(self, *args, **kwargs): """Handles the 'Update buffer' menu command to fetch newest items.""" bf = self.get_current_buffer() # bf is the buffer panel instance if not bf or not hasattr(bf, "session") or not bf.session: output.speak(_(u"No active session for this buffer."), True) return output.speak(_(u"Updating buffer..."), True) session = bf.session async def do_update(): new_ids = [] try: if session.KIND == "atprotosocial": if bf.name == f"{session.label} Home": # Assuming buffer name indicates type # ATProtoSocial home timeline uses new_only=True for fetching newest new_ids, _ = await session.fetch_home_timeline(limit=config.app["app-settings"].get("items_per_request", 20), new_only=True) elif bf.name == f"{session.label} Notifications": _, _ = await session.fetch_notifications(limit=config.app["app-settings"].get("items_per_request", 20)) # new_only implied by unread # fetch_notifications itself handles UI updates via send_notification_to_channel # so new_ids might not be directly applicable here unless fetch_notifications returns them # For simplicity, we'll assume it updates the buffer internally or via pubsub. # The count 'n' below might not be accurate for notifications this way. # Add other ATProtoSocial buffer types here (e.g., user timeline, mentions) # elif bf.name.startswith(f"{session.label} User Feed"): # Example for a user feed buffer # target_user_did = getattr(bf, 'target_user_did', None) # Panel needs to store this # if target_user_did: # new_ids, _ = await session.fetch_user_timeline(user_did=target_user_did, limit=config.app["app-settings"].get("items_per_request", 20), new_only=True) else: # Fallback to original buffer's start_stream if it's not an ATProtoSocial specific buffer we handle here if hasattr(bf, "start_stream"): count = bf.start_stream(mandatory=True, avoid_autoreading=True) if count is not None: new_ids = [str(x) for x in range(count)] # Dummy IDs for count else: output.speak(_(u"This buffer type cannot be updated in this way."), True) return else: # For other session types (e.g. Mastodon) if hasattr(bf, "start_stream"): count = bf.start_stream(mandatory=True, avoid_autoreading=True) if count is not None: new_ids = [str(x) for x in range(count)] # Dummy IDs for count else: output.speak(_(u"Unable to update this buffer."), True) return # Generic feedback based on new_ids for timelines if bf.name == f"{session.label} Home" or bf.name.startswith(f"{session.label} User Feed"): # Refine condition output.speak(_("{0} items retrieved").format(len(new_ids)), True) elif bf.name == f"{session.label} Notifications": output.speak(_("Notifications updated."), True) # Or specific count if fetch_notifications returns it except NotificationError as e: output.speak(str(e), True) except Exception as e_general: logger.error(f"Error updating buffer {bf.name}: {e_general}", exc_info=True) output.speak(_("An error occurred while updating the buffer."), True) wx.CallAfter(asyncio.create_task, do_update()) def get_more_items(self, *args, **kwargs): """Handles 'Load previous items' menu command.""" bf = self.get_current_buffer() # bf is the buffer panel instance if not bf or not hasattr(bf, "session") or not bf.session: output.speak(_(u"No active session for this buffer."), True) return session = bf.session # The buffer panel (bf) needs to store its own cursor for pagination of older items # e.g., bf.pagination_cursor or bf.older_items_cursor # This cursor should be set by the result of previous fetch_..._timeline(new_only=False) calls. # For ATProtoSocial, session methods like fetch_home_timeline store their own cursor (e.g., session.home_timeline_cursor) # The panel might need to get this initial cursor or manage its own if it's for a dynamic list (user feed). current_cursor = None if session.KIND == "atprotosocial": if bf.name == f"{session.label} Home": current_cursor = session.home_timeline_cursor # elif bf.name.startswith(f"{session.label} User Feed"): # current_cursor = getattr(bf, 'pagination_cursor', None) # Panel specific cursor # elif bf.name == f"{session.label} Notifications": # current_cursor = getattr(bf, 'pagination_cursor', None) # Panel specific cursor for notifications else: # Fallback or other buffer types if hasattr(bf, "get_more_items"): # Try generic buffer method return bf.get_more_items() else: output.speak(_(u"This buffer does not support loading more items in this way."), True) return else: # For other session types if hasattr(bf, "get_more_items"): return bf.get_more_items() else: output.speak(_(u"This buffer does not support loading more items."), True) return output.speak(_(u"Loading more items..."), True) async def do_load_more(): loaded_ids = [] try: if session.KIND == "atprotosocial": if bf.name == f"{session.label} Home": loaded_ids, _ = await session.fetch_home_timeline(cursor=current_cursor, limit=config.app["app-settings"].get("items_per_request", 20), new_only=False) # elif bf.name.startswith(f"{session.label} User Feed"): # target_user_did = getattr(bf, 'target_user_did', None) # if target_user_did: # loaded_ids, new_cursor = await session.fetch_user_timeline(user_did=target_user_did, cursor=current_cursor, limit=config.app["app-settings"].get("items_per_request", 20), new_only=False) # if hasattr(bf, "pagination_cursor"): bf.pagination_cursor = new_cursor # elif bf.name == f"{session.label} Notifications": # new_cursor = await session.fetch_notifications(cursor=current_cursor, limit=config.app["app-settings"].get("items_per_request", 20)) # if hasattr(bf, "pagination_cursor"): bf.pagination_cursor = new_cursor # fetch_notifications updates UI itself. loaded_ids might not be directly applicable. # For now, only home timeline "load more" is fully wired via session cursor. if loaded_ids: # Check if any IDs were actually loaded output.speak(_("{0} more items retrieved").format(len(loaded_ids)), True) else: output.speak(_("No more items found or loaded."), True) except NotificationError as e: output.speak(str(e), True) except Exception as e_general: logger.error(f"Error loading more items for buffer {bf.name}: {e_general}", exc_info=True) output.speak(_("An error occurred while loading more items."), True) wx.CallAfter(asyncio.create_task, do_load_more()) def buffer_title_changed(self, buffer): if buffer.name.endswith("-timeline"): title = _(u"Timeline for {}").format(buffer.username,) elif buffer.name.endswith("-followers"): title = _(u"Followers for {}").format(buffer.username,) elif buffer.name.endswith("-friends"): title = _(u"Friends for {}").format(buffer.username,) elif buffer.name.endswith("-following"): title = _(u"Following for {}").format(buffer.username,) buffer_index = self.view.search(buffer.name, buffer.account) self.view.set_page_title(buffer_index, title) def ocr_image(self, *args, **kwargs): buffer = self.get_current_buffer() if hasattr(buffer, "ocr_image"): return buffer.ocr_image() def save_data_in_db(self): for i in sessions.sessions: sessions.sessions[i].save_persistent_data() def toggle_share_settings(self, shareable=True): self.view.share.Enable(shareable) def mastodon_new_item(self, item, session_name, _buffers): sound_to_play = None for buff in _buffers: buffer = self.search_buffer(buff, session_name) if buffer == None or buffer.session.get_name() != session_name: return buffer.add_new_item(item) if buff == "home_timeline": sound_to_play = "tweet_received.ogg" elif buff == "mentions": sound_to_play = "mention_received.ogg" elif buff == "direct_messages": sound_to_play = "dm_received.ogg" elif buff == "sent": sound_to_play = "tweet_send.ogg" elif buff == "followers" or buff == "following": sound_to_play = "update_followers.ogg" elif buff == "notifications": sound_to_play = "new_event.ogg" elif "timeline" in buff: sound_to_play = "tweet_timeline.ogg" else: sound_to_play = None if sound_to_play != None and buff not in buffer.session.settings["other_buffers"]["muted_buffers"]: self.notify(buffer.session, sound_to_play) def mastodon_updated_item(self, item, session_name, _buffers): sound_to_play = None for buff in _buffers.keys(): buffer = self.search_buffer(buff, session_name) if buffer == None or buffer.session.get_name() != session_name: return buffer.update_item(item, _buffers[buff]) # Normally, we'd define this function on mastodon's session, but we need to access conversationListBuffer and here is the best place to do so. def mastodon_new_conversation(self, conversation, session_name): buffer = self.search_buffer("direct_messages", session_name) if buffer == None: log.error("Buffer not found: direct_messages on {}".format(session_name)) return # Direct messages buffer is hidden new_position, number_of_items = buffer.order_buffer([conversation]) buffer.put_items_on_list(number_of_items) if new_position > -1: buffer.buffer.list.select_item(new_position) # if number_of_items > 0: # sound_to_play = "dm_received.ogg" # if "direct_messages" not in buffer.session.settings["other_buffers"]["muted_buffers"]: # self.notify(buffer.session, sound_to_play) def mastodon_error_post(self, name, reply_to, visibility, posts, language): home = self.search_buffer("home_timeline", name) if home != None: wx.CallAfter(home.post_from_error, visibility=visibility, reply_to=reply_to, data=posts, lang=language) def change_buffer_title(self, name, buffer, title): buffer_index = self.view.search(buffer, name) if buffer_index != None and buffer_index > -1: self.view.set_page_title(buffer_index, title) def report_error(self, *args, **kwargs): """Redirects the user to the issue page on github""" log.debug("Redirecting the user to report an error...") webbrowser.open_new_tab(application.report_bugs_url) def update_profile(self, *args): """Updates the users profile""" log.debug("Update profile") buffer = self.get_best_buffer() handler = self.get_handler(buffer.session.type) if handler: handler.update_profile(buffer.session) def user_details(self, *args): """Displays a user's profile.""" log.debug("Showing user profile...") buffer = self.get_current_buffer() # Use current buffer to get context if item is selected if not buffer or not buffer.session: buffer = self.get_best_buffer() # Fallback if current buffer has no session if not buffer or not buffer.session: output.speak(_("No active session to view user details."), True) return handler = self.get_handler(type=buffer.session.type) if handler and hasattr(handler, 'user_details'): # user_details handler in Mastodon takes the buffer directly, which then extracts item/user # For ATProtoSocial, we might need to pass the user DID or handle if available from selected item # This part assumes the buffer has a way to provide the target user's identifier handler.user_details(buffer) else: output.speak(_("This session type does not support viewing user details in this way."), True) def openPostTimeline(self, *args, user=None): # "user" here is often the user object from selected item """Opens selected user's posts timeline Parameters: args: Other argument. Useful when binding to widgets. user: if specified, open this user timeline. It is currently mandatory, but could be optional when user selection is implemented in handler. `user` is typically a dict or object with user info from the selected post/item. """ current_buffer = self.get_current_buffer() # Get context from current buffer first if not current_buffer or not current_buffer.session: current_buffer = self.get_best_buffer() if not current_buffer or not current_buffer.session: output.speak(_("No active session available."), True) return session_to_use = current_buffer.session handler = self.get_handler(type=session_to_use.type) if handler and hasattr(handler, 'open_user_timeline'): # Changed to a more generic name # The handler's open_user_timeline should extract user_id (DID for ATProto) # from the 'user' object or prompt if 'user' is None. # 'user' object is often derived from the selected item in the current buffer. if user is None and hasattr(current_buffer, 'get_selected_item_author_details'): # Try to get author from selected item in current buffer if 'user' not passed directly author_details = current_buffer.get_selected_item_author_details() if author_details: user = author_details # This would be a dict/object the handler can parse # Call the handler method. It will be responsible for creating the new buffer. # The handler's open_user_timeline will need access to 'self' (mainController) to call add_buffer. async def _open_timeline(): await handler.open_user_timeline(main_controller=self, session=session_to_use, user_payload=user) wx.CallAfter(asyncio.create_task, _open_timeline()) elif handler and hasattr(handler, 'openPostTimeline'): # Fallback for older handler structure handler.openPostTimeline(self, current_buffer, user) else: output.speak(_("This session type does not support opening user timelines directly."), True) def openFollowersTimeline(self, *args, user=None): """Opens selected user's followers timeline Parameters: args: Other argument. Useful when binding to widgets. user: if specified, open this user timeline. It is currently mandatory, but could be optional when user selection is implemented in handler """ current_buffer = self.get_current_buffer() if not current_buffer or not current_buffer.session: current_buffer = self.get_best_buffer() if not current_buffer or not current_buffer.session: output.speak(_("No active session available."), True) return session_to_use = current_buffer.session handler = self.get_handler(type=session_to_use.type) if user is None and hasattr(current_buffer, 'get_selected_item_author_details'): author_details = current_buffer.get_selected_item_author_details() if author_details: user = author_details if handler and hasattr(handler, 'open_followers_timeline'): async def _open_followers(): await handler.open_followers_timeline(main_controller=self, session=session_to_use, user_payload=user) wx.CallAfter(asyncio.create_task, _open_followers()) elif handler and hasattr(handler, 'openFollowersTimeline'): # Fallback handler.openFollowersTimeline(self, current_buffer, user) else: output.speak(_("This session type does not support opening followers list."), True) def openFollowingTimeline(self, *args, user=None): """Opens selected user's following timeline Parameters: args: Other argument. Useful when binding to widgets. user: if specified, open this user timeline. It is currently mandatory, but could be optional when user selection is implemented in handler """ current_buffer = self.get_current_buffer() if not current_buffer or not current_buffer.session: current_buffer = self.get_best_buffer() if not current_buffer or not current_buffer.session: output.speak(_("No active session available."), True) return session_to_use = current_buffer.session handler = self.get_handler(type=session_to_use.type) if user is None and hasattr(current_buffer, 'get_selected_item_author_details'): author_details = current_buffer.get_selected_item_author_details() if author_details: user = author_details if handler and hasattr(handler, 'open_following_timeline'): async def _open_following(): await handler.open_following_timeline(main_controller=self, session=session_to_use, user_payload=user) wx.CallAfter(asyncio.create_task, _open_following()) elif handler and hasattr(handler, 'openFollowingTimeline'): # Fallback handler.openFollowingTimeline(self, current_buffer, user) else: output.speak(_("This session type does not support opening following list."), True) def community_timeline(self, *args, user=None): # user param seems unused here based on mastodon impl buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler and hasattr(handler, 'community_timeline'): handler.community_timeline(self, buffer) def create_filter(self, *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler and hasattr(handler, 'create_filter'): handler.create_filter(self, buffer) def manage_filters(self, *args, **kwargs): buffer = self.get_best_buffer() handler = self.get_handler(type=buffer.session.type) if handler and hasattr(handler, 'manage_filters'): handler.manage_filters(self, buffer)