Refactor y eliminar patrones raros.

This commit is contained in:
Jesús Pavón Abián
2026-02-01 20:40:09 +01:00
parent 0d8395c6fc
commit 6f0514fd6a
29 changed files with 583 additions and 127 deletions

View File

@@ -6,3 +6,4 @@ from .users import UserBuffer
from .notifications import NotificationsBuffer
from .search import SearchBuffer
from .community import CommunityBuffer
from .announcements import AnnouncementsBuffer

View File

@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
import time
import logging
import arrow
import widgetUtils
import wx
import output
import languageHandler
import config
from pubsub import pub
from controller.buffers.mastodon.base import BaseBuffer
from sessions.mastodon import compose, templates
from wxUI import buffers
from wxUI.dialogs.mastodon import menus
from mysc.thread_utils import call_threaded
log = logging.getLogger("controller.buffers.mastodon.announcements")
class AnnouncementsBuffer(BaseBuffer):
def __init__(self, *args, **kwargs):
# We enforce compose_func="compose_announcement"
kwargs["compose_func"] = "compose_announcement"
super(AnnouncementsBuffer, self).__init__(*args, **kwargs)
self.type = "announcementsBuffer"
def create_buffer(self, parent, name):
self.buffer = buffers.mastodon.announcementsPanel(parent, name)
def get_buffer_name(self):
return _("Announcements")
def bind_events(self):
self.buffer.set_focus_function(self.onFocus)
widgetUtils.connect_event(self.buffer.list.list, widgetUtils.KEYPRESS, self.get_event)
widgetUtils.connect_event(self.buffer, widgetUtils.BUTTON_PRESSED, self.dismiss_announcement, self.buffer.dismiss)
widgetUtils.connect_event(self.buffer.list.list, wx.EVT_LIST_ITEM_RIGHT_CLICK, self.show_menu)
widgetUtils.connect_event(self.buffer.list.list, wx.EVT_LIST_KEY_DOWN, self.show_menu_by_key)
def dismiss_announcement(self, event=None, item=None, *args, **kwargs):
index = self.buffer.list.get_selected()
if index == -1: return
item = self.session.db[self.name][index]
# Optimistic UI update or wait for API? Let's wait for API to be safe, but run threaded.
# We need a custom call because 'announcements_dismiss' returns None on success usually.
def _do_dismiss():
try:
self.session.api_call(call_name="announcements_dismiss", id=str(item.id))
# If success, update UI in main thread
wx.CallAfter(self._on_dismiss_success, index)
except Exception as e:
log.exception("Error dismissing announcement")
self.session.sound.play("error.ogg")
call_threaded(_do_dismiss)
def _on_dismiss_success(self, index):
if index < len(self.session.db[self.name]):
self.session.db[self.name].pop(index)
self.buffer.list.remove_item(index)
output.speak(_("Announcement dismissed."))
def show_menu(self, ev, pos=0, *args, **kwargs):
if self.buffer.list.get_count() == 0:
return
# Create a simple menu
menu = wx.Menu()
dismiss_item = menu.Append(wx.ID_ANY, _("Dismiss"))
copy_item = menu.Append(wx.ID_ANY, _("Copy text"))
self.buffer.Bind(wx.EVT_MENU, self.dismiss_announcement, dismiss_item)
self.buffer.Bind(wx.EVT_MENU, self.copy, copy_item)
if pos != 0:
self.buffer.PopupMenu(menu, pos)
else:
self.buffer.PopupMenu(menu, self.buffer.list.list.GetPosition())
def url(self, *args, **kwargs):
self.dismiss_announcement()
def get_more_items(self): output.speak(_("This buffer does not support loading more items."), True)
# Disable social interactions not applicable to announcements
def reply(self, *args, **kwargs):
pass
def share_item(self, *args, **kwargs):
pass
def toggle_favorite(self, *args, **kwargs):
pass
def add_to_favorites(self, *args, **kwargs):
pass
def remove_from_favorites(self, *args, **kwargs):
pass
def toggle_bookmark(self, *args, **kwargs):
pass
def mute_conversation(self, *args, **kwargs):
pass
def vote(self, *args, **kwargs):
pass
def send_message(self, *args, **kwargs):
pass
def user_details(self, *args, **kwargs):
pass
def view_item(self, *args, **kwargs):
# We could implement a specific viewer for announcements if needed,
# but the default one expects a status object structure.
pass
def copy(self, event=None):
item = self.get_item()
if item:
pub.sendMessage("execute-action", action="copy_to_clipboard")
def onFocus(self, *args, **kwargs):
# Similar logic to BaseBuffer but adapted if needed.
# BaseBuffer.onFocus handles reading long posts.
if config.app["app-settings"]["read_long_posts_in_gui"] == True and self.buffer.list.list.HasFocus():
wx.CallLater(40, output.speak, self.get_message(), interrupt=True)
def get_message(self):
# Override to use announcement template
announcement = self.get_item()
if announcement == None:
return
template = self.session.settings.get("templates", {}).get("announcement", templates.announcement_default_template)
t = templates.render_announcement(announcement, template, self.session.settings, relative_times=self.session.settings["general"]["relative_times"], offset_hours=self.session.db["utc_offset"])
return t
def start_stream(self, mandatory=False, play_sound=True, avoid_autoreading=False):
current_time = time.time()
if self.execution_time == 0 or current_time-self.execution_time >= 300 or mandatory==True:
self.execution_time = current_time
log.debug("Starting stream for announcements buffer")
try:
# The announcements API does not accept min_id or limit parameters
results = self.session.api.announcements()
# Reverse the list so order_buffer processes them according to user preference
results.reverse()
except Exception as e:
log.exception("Error retrieving announcements: %s" % (str(e)))
return 0
# order_buffer handles duplication filtering by ID internally
number_of_items = self.session.order_buffer(self.name, results)
log.debug("Number of new announcements retrieved: %d" % (number_of_items,))
self.put_items_on_list(number_of_items)
if number_of_items > 0 and play_sound == True and self.sound != None and self.session.settings["sound"]["session_mute"] == False:
self.session.sound.play(self.sound)
return number_of_items
return 0

View File

@@ -126,14 +126,16 @@ class BaseBuffer(base.Buffer):
min_id = None
# toDo: Implement reverse timelines properly here.
if (self.name != "favorites" and self.name != "bookmarks") and self.name in self.session.db and len(self.session.db[self.name]) > 0:
if self.session.settings["general"]["reverse_timelines"]:
min_id = self.session.db[self.name][0].id
else:
min_id = self.session.db[self.name][-1].id
# We use the maximum ID present in the buffer to ensure we only request posts
# that are newer than our most recent chronological post.
# This prevents old pinned posts from pulling in hundreds of previous statuses.
min_id = max(item.id for item in self.session.db[self.name])
# loads pinned posts from user accounts.
# Load those posts only when there are no items previously loaded.
if "-timeline" in self.name and "account_statuses" in self.function and len(self.session.db.get(self.name, [])) == 0:
pinned_posts = self.session.api.account_statuses(pinned=True, limit=count, *self.args, **self.kwargs)
for p in pinned_posts:
p["pinned"] = True
pinned_posts.reverse()
else:
pinned_posts = None
@@ -182,10 +184,17 @@ class BaseBuffer(base.Buffer):
def get_more_items(self):
elements = []
if self.session.settings["general"]["reverse_timelines"] == False:
max_id = self.session.db[self.name][0].id
if len(self.session.db[self.name]) == 0:
return
# We use the minimum ID in the buffer to correctly request the next page of older items.
# This prevents old pinned posts from causing us to skip chronological posts.
# We try to exclude pinned posts from this calculation as they are usually outliers at the top.
unpinned_ids = [item.id for item in self.session.db[self.name] if not getattr(item, "pinned", False)]
if unpinned_ids:
max_id = min(unpinned_ids)
else:
max_id = self.session.db[self.name][-1].id
max_id = min(item.id for item in self.session.db[self.name])
try:
items = getattr(self.session.api, self.function)(max_id=max_id, limit=self.session.settings["general"]["max_posts_per_call"], *self.args, **self.kwargs)
except Exception as e:
@@ -311,8 +320,10 @@ class BaseBuffer(base.Buffer):
widgetUtils.connect_event(menu, widgetUtils.MENU, self.user_actions, menuitem=menu.userActions)
if self.can_share() == True:
widgetUtils.connect_event(menu, widgetUtils.MENU, self.share_item, menuitem=menu.boost)
widgetUtils.connect_event(menu, widgetUtils.MENU, self.quote, menuitem=menu.quote)
else:
menu.boost.Enable(False)
menu.quote.Enable(False)
widgetUtils.connect_event(menu, widgetUtils.MENU, self.fav, menuitem=menu.fav)
widgetUtils.connect_event(menu, widgetUtils.MENU, self.unfav, menuitem=menu.unfav)
widgetUtils.connect_event(menu, widgetUtils.MENU, self.mute_conversation, menuitem=menu.mute)
@@ -442,11 +453,30 @@ class BaseBuffer(base.Buffer):
id = item.id
if self.session.settings["general"]["boost_mode"] == "ask":
answer = mastodon_dialogs.boost_question()
if answer == True:
if answer == 1:
self._direct_boost(id)
elif answer == 2:
self.quote(item=item)
else:
self._direct_boost(id)
def quote(self, event=None, item=None, *args, **kwargs):
if item == None:
item = self.get_item()
if self.can_share(item=item) == False:
return output.speak(_("This action is not supported on conversations."))
title = _("Quote post")
caption = _("Write your comment here")
post = messages.post(session=self.session, title=title, caption=caption)
response = post.message.ShowModal()
if response == wx.ID_OK:
post_data = post.get_data()
call_threaded(self.session.send_post, quote_id=item.id, posts=post_data, visibility=post.get_visibility(), language=post.get_language(), **kwargs)
if hasattr(post.message, "destroy"):
post.message.destroy()
def _direct_boost(self, id):
item = self.session.api_call(call_name="status_reblog", _sound="retweet_send.ogg", id=id)

View File

@@ -33,10 +33,7 @@ class CommunityBuffer(base.BaseBuffer):
min_id = None
# toDo: Implement reverse timelines properly here.
if self.name in self.session.db and len(self.session.db[self.name]) > 0:
if self.session.settings["general"]["reverse_timelines"]:
min_id = self.session.db[self.name][0].id
else:
min_id = self.session.db[self.name][-1].id
min_id = max(item.id for item in self.session.db[self.name])
try:
results = self.community_api.timeline(timeline=self.timeline, min_id=min_id, limit=count, *self.args, **self.kwargs)
results.reverse()
@@ -55,10 +52,15 @@ class CommunityBuffer(base.BaseBuffer):
def get_more_items(self):
elements = []
if self.session.settings["general"]["reverse_timelines"] == False:
max_id = self.session.db[self.name][0].id
if len(self.session.db[self.name]) == 0:
return
unpinned_ids = [item.id for item in self.session.db[self.name] if not getattr(item, "pinned", False)]
if unpinned_ids:
max_id = min(unpinned_ids)
else:
max_id = self.session.db[self.name][-1].id
max_id = min(item.id for item in self.session.db[self.name])
try:
items = self.community_api.timeline(timeline=self.timeline, max_id=max_id, limit=self.session.settings["general"]["max_posts_per_call"], *self.args, **self.kwargs)
except Exception as e:

View File

@@ -42,10 +42,22 @@ class MentionsBuffer(BaseBuffer):
def get_more_items(self):
elements = []
if self.session.settings["general"]["reverse_timelines"] == False:
max_id = self.session.db[self.name][0].id
else:
max_id = self.session.db[self.name][-1].id
if len(self.session.db[self.name]) == 0:
return
# In mentions buffer, items are notification objects which don't have 'pinned' attribute directly.
# But we check the status attached to the notification if it exists.
# However, notifications are strictly chronological usually. Pinned mentions don't exist?
# But let's stick to the safe ID extraction.
# The logic here is tricky because self.session.db stores notification objects, but sometimes just dicts?
# Let's assume they are objects with 'id' attribute.
# Notifications don't have 'pinned', so we just take the min ID.
# But wait, did I change this file previously to use min()? Yes.
# Is there any case where a notification ID is "pinned" (old)? No.
# So min() should be fine here. But for consistency with other buffers if any weird logic exists...
# Actually, let's keep min() as notifications don't support pinning.
max_id = min(item.id for item in self.session.db[self.name])
try:
items = getattr(self.session.api, self.function)(max_id=max_id, limit=self.session.settings["general"]["max_posts_per_call"], types=["mention"], *self.args, **self.kwargs)
except Exception as e:

View File

@@ -33,10 +33,7 @@ class SearchBuffer(BaseBuffer):
self.execution_time = current_time
min_id = None
if self.name in self.session.db and len(self.session.db[self.name]) > 0:
if self.session.settings["general"]["reverse_timelines"]:
min_id = self.session.db[self.name][0].id
else:
min_id = self.session.db[self.name][-1].id
min_id = max(item.id for item in self.session.db[self.name])
try:
results = getattr(self.session.api, self.function)(min_id=min_id, **self.kwargs)
except Exception as mess:

View File

@@ -35,6 +35,7 @@ class Handler(object):
compose=_("&Post"),
reply=_("Re&ply"),
share=_("&Boost"),
quote=_("&Quote"),
fav=_("&Add to favorites"),
unfav=_("Remove from favorites"),
view=_("&Show post"),
@@ -92,6 +93,8 @@ class Handler(object):
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="blocks", name="blocked", sessionObject=session, account=name))
elif i == 'notifications':
pub.sendMessage("createBuffer", buffer_type="NotificationsBuffer", session_type=session.type, buffer_title=_("Notifications"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_notification", function="notifications", name="notifications", sessionObject=session, account=name))
elif i == 'announcements':
pub.sendMessage("createBuffer", buffer_type="AnnouncementsBuffer", session_type=session.type, buffer_title=_("Announcements"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="announcements", name="announcements", sessionObject=session, account=name, sound="new_event.ogg"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="timelines", account=name))
timelines_position =controller.view.search("timelines", name)
for i in session.settings["other_buffers"]["timelines"]:

View File

@@ -10,7 +10,7 @@ import languageHandler
from twitter_text import parse_tweet, config
from mastodon import MastodonError
from controller import messages
from sessions.mastodon import templates
from sessions.mastodon import templates, utils
from wxUI.dialogs.mastodon import postDialogs
from extra.autocompletionUsers import completion
from . import userList
@@ -282,10 +282,7 @@ class editPost(post):
# Extract text from post
if item.reblog != None:
item = item.reblog
text = item.content
# Remove HTML tags from content
import re
text = re.sub('<[^<]+?>', '', text)
text = utils.html_filter(item.content)
# Initialize parent class
super(editPost, self).__init__(session, title, caption, text=text, *args, **kwargs)
# Store the post ID for editing

View File

@@ -52,10 +52,12 @@ class accountSettingsController(globalSettingsController):
post_template = self.config["templates"]["post"]
conversation_template = self.config["templates"]["conversation"]
person_template = self.config["templates"]["person"]
self.dialog.create_templates(post_template=post_template, conversation_template=conversation_template, person_template=person_template)
announcement_template = self.config.get("templates", {}).get("announcement", "$text. Published $published_at. $read")
self.dialog.create_templates(post_template=post_template, conversation_template=conversation_template, person_template=person_template, announcement_template=announcement_template)
widgetUtils.connect_event(self.dialog.templates.post, widgetUtils.BUTTON_PRESSED, self.edit_post_template)
widgetUtils.connect_event(self.dialog.templates.conversation, widgetUtils.BUTTON_PRESSED, self.edit_conversation_template)
widgetUtils.connect_event(self.dialog.templates.person, widgetUtils.BUTTON_PRESSED, self.edit_person_template)
widgetUtils.connect_event(self.dialog.templates.announcement, widgetUtils.BUTTON_PRESSED, self.edit_announcement_template)
self.dialog.create_other_buffers()
buffer_values = self.get_buffers_list()
self.dialog.buffers.insert_buffers(buffer_values)
@@ -109,6 +111,15 @@ class accountSettingsController(globalSettingsController):
self.config.write()
self.dialog.templates.person.SetLabel(_("Edit template for persons. Current template: {}").format(result))
def edit_announcement_template(self, *args, **kwargs):
template = self.config.get("templates", {}).get("announcement", "$text. Published $published_at. $read")
control = EditTemplate(template=template, type="announcement")
result = control.run_dialog()
if result != "": # Template has been saved.
self.config["templates"]["announcement"] = result
self.config.write()
self.dialog.templates.announcement.SetLabel(_("Edit template for announcements. Current template: {}").format(result))
def save_configuration(self):
if self.config["general"]["relative_times"] != self.dialog.get_value("general", "relative_time"):
self.needs_restart = True
@@ -204,6 +215,7 @@ class accountSettingsController(globalSettingsController):
all_buffers['blocked']=_("Blocked users")
all_buffers['muted']=_("Muted users")
all_buffers['notifications']=_("Notifications")
all_buffers['announcements']=_("Announcements")
list_buffers = []
hidden_buffers=[]
all_buffers_keys = list(all_buffers.keys())

View File

@@ -2,7 +2,7 @@
import re
import wx
from typing import List
from sessions.mastodon.templates import post_variables, conversation_variables, person_variables
from sessions.mastodon.templates import post_variables, conversation_variables, person_variables, announcement_variables
from wxUI.dialogs import templateDialogs
class EditTemplate(object):
@@ -13,6 +13,8 @@ class EditTemplate(object):
self.variables = post_variables
elif type == "conversation":
self.variables = conversation_variables
elif type == "announcement":
self.variables = announcement_variables
else:
self.variables = person_variables
self.template: str = template