mirror of
https://github.com/MCV-Software/TWBlue.git
synced 2026-03-06 01:17:32 +01:00
Resolve sender DIDs to display names by building member maps from conversation data. Fix compose functions to prefer snake_case attributes (ATProto SDK convention). Ensure stable key comparison in dedup logic by converting ATProto objects to strings. Move Chats buffer to appear after Mentions and before Notifications. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1230 lines
50 KiB
Python
1230 lines
50 KiB
Python
# -*- coding: utf-8 -*-
|
|
import logging
|
|
import wx
|
|
import arrow
|
|
import output
|
|
import sound
|
|
import config
|
|
import widgetUtils
|
|
import languageHandler
|
|
from pubsub import pub
|
|
from controller.buffers.base import base
|
|
from controller.blueski import messages as blueski_messages
|
|
from sessions.blueski import compose, utils, templates
|
|
from mysc.thread_utils import call_threaded
|
|
from wxUI.buffers.blueski import panels as BlueskiPanels
|
|
from wxUI import commonMessageDialogs
|
|
from wxUI.dialogs.blueski import menus
|
|
|
|
log = logging.getLogger("controller.buffers.blueski.base")
|
|
|
|
class BaseBuffer(base.Buffer):
|
|
def __init__(self, parent=None, name=None, session=None, *args, **kwargs):
|
|
# Adapt params to BaseBuffer
|
|
# BaseBuffer expects (parent, function, name, sessionObject, account)
|
|
function = "timeline" # Dummy
|
|
sessionObject = session
|
|
account = session.get_name() if session else "Unknown"
|
|
|
|
super(BaseBuffer, self).__init__(parent, function, name=name, sessionObject=sessionObject, account=account, *args, **kwargs)
|
|
|
|
self.session = sessionObject
|
|
self.account = account
|
|
self.name = name
|
|
self.create_buffer(parent, name)
|
|
self.buffer.account = account
|
|
self.invisible = True
|
|
compose_func = kwargs.get("compose_func", "compose_post")
|
|
self.compose_function = getattr(compose, compose_func)
|
|
self.sound = kwargs.get("sound", None)
|
|
|
|
# Initialize DB list if needed
|
|
if self.name not in self.session.db:
|
|
self.session.db[self.name] = []
|
|
|
|
self.bind_events()
|
|
|
|
def get_max_items(self):
|
|
"""Get max items per call from settings."""
|
|
return self.session.settings["general"]["max_posts_per_call"]
|
|
|
|
def create_buffer(self, parent, name):
|
|
# Default to HomePanel, can be overridden
|
|
self.buffer = BlueskiPanels.HomePanel(parent, name, account=self.account)
|
|
self.buffer.session = self.session
|
|
|
|
def bind_events(self):
|
|
# Bind essential events
|
|
log.debug("Binding events for buffer %s" % self.name)
|
|
self.buffer.set_focus_function(self.onFocus)
|
|
widgetUtils.connect_event(self.buffer.list.list, widgetUtils.KEYPRESS, self.get_event)
|
|
widgetUtils.connect_event(self.buffer.list.list, wx.EVT_LIST_ITEM_RIGHT_CLICK, self.show_menu)
|
|
widgetUtils.connect_event(self.buffer.list.list, wx.EVT_LIST_KEY_DOWN, self.show_menu_by_key)
|
|
|
|
# Buttons
|
|
if hasattr(self.buffer, "post"):
|
|
self.buffer.post.Bind(wx.EVT_BUTTON, self.on_post)
|
|
if hasattr(self.buffer, "reply"):
|
|
self.buffer.reply.Bind(wx.EVT_BUTTON, self.on_reply)
|
|
if hasattr(self.buffer, "repost"):
|
|
self.buffer.repost.Bind(wx.EVT_BUTTON, self.on_repost)
|
|
if hasattr(self.buffer, "like"):
|
|
self.buffer.like.Bind(wx.EVT_BUTTON, self.on_like)
|
|
if hasattr(self.buffer, "dm"):
|
|
self.buffer.dm.Bind(wx.EVT_BUTTON, self.on_dm)
|
|
if hasattr(self.buffer, "actions"):
|
|
self.buffer.actions.Bind(wx.EVT_BUTTON, self.user_actions)
|
|
|
|
def get_buffer_name(self):
|
|
"""Get human-readable buffer name."""
|
|
basic_buffers = dict(
|
|
home_timeline=_("Home"),
|
|
notifications=_("Notifications"),
|
|
mentions=_("Mentions"),
|
|
sent=_("Sent"),
|
|
likes=_("Likes"),
|
|
chats=_("Chats"),
|
|
)
|
|
if self.name in basic_buffers:
|
|
return basic_buffers[self.name]
|
|
if hasattr(self, "username"):
|
|
if "timeline" in self.name.lower():
|
|
return _("{username}'s timeline").format(username=self.username)
|
|
if "followers" in self.name.lower():
|
|
return _("{username}'s followers").format(username=self.username)
|
|
if "following" in self.name.lower():
|
|
return _("{username}'s following").format(username=self.username)
|
|
return self.name
|
|
|
|
def onFocus(self, *args, **kwargs):
|
|
"""Handle focus event for accessibility features."""
|
|
post = self.get_item()
|
|
if not post:
|
|
return
|
|
|
|
# Update relative time display
|
|
if self.session.settings["general"].get("relative_times", False):
|
|
try:
|
|
index = self.buffer.list.get_selected()
|
|
if index < 0:
|
|
return
|
|
# Only update if the list has at least 3 columns (Author, Text, Date)
|
|
if self.buffer.list.list.GetColumnCount() < 3:
|
|
return
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
actual_post = g(post, "post", post)
|
|
indexed_at = g(actual_post, "indexed_at", "") or g(actual_post, "indexedAt", "")
|
|
if indexed_at:
|
|
original_date = arrow.get(indexed_at)
|
|
ts = original_date.humanize(locale=languageHandler.curLang[:2])
|
|
self.buffer.list.list.SetItem(index, 2, ts)
|
|
except Exception as e:
|
|
log.error("Error updating relative time on focus: %s", e)
|
|
|
|
# Read long posts in GUI
|
|
if config.app["app-settings"].get("read_long_posts_in_gui", False) and self.buffer.list.list.HasFocus():
|
|
wx.CallLater(40, output.speak, self.get_message(), interrupt=True)
|
|
|
|
# Audio/video indicator sound
|
|
if self.session.settings["sound"].get("indicate_audio", False) and utils.is_audio_or_video(post):
|
|
self.session.sound.play("audio.ogg")
|
|
|
|
# Image indicator sound
|
|
if self.session.settings["sound"].get("indicate_img", False) and utils.is_image(post):
|
|
self.session.sound.play("image.ogg")
|
|
|
|
def auto_read(self, number_of_items):
|
|
"""Automatically read new items for accessibility."""
|
|
if number_of_items == 0:
|
|
return
|
|
if self.name in self.session.settings["other_buffers"].get("muted_buffers", []):
|
|
return
|
|
if self.session.settings["sound"].get("session_mute", False):
|
|
return
|
|
if self.name not in self.session.settings["other_buffers"].get("autoread_buffers", []):
|
|
return
|
|
|
|
safe = True
|
|
relative_times = self.session.settings["general"].get("relative_times", False)
|
|
show_screen_names = self.session.settings["general"].get("show_screen_names", False)
|
|
|
|
if number_of_items == 1:
|
|
if self.session.settings["general"].get("reverse_timelines", False):
|
|
post = self.session.db[self.name][0]
|
|
else:
|
|
post = self.session.db[self.name][-1]
|
|
output.speak(_("New post in {0}").format(self.get_buffer_name()))
|
|
output.speak(" ".join(self.compose_function(post, self.session.db, self.session.settings, relative_times, show_screen_names, safe=safe)))
|
|
elif number_of_items > 1:
|
|
output.speak(_("{0} new posts in {1}.").format(number_of_items, self.get_buffer_name()))
|
|
|
|
def show_menu(self, ev, pos=0, *args, **kwargs):
|
|
"""Show context menu for current item."""
|
|
if self.buffer.list.get_count() == 0:
|
|
return
|
|
menu = menus.baseMenu()
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.reply, menuitem=menu.reply)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.share_item, menuitem=menu.repost)
|
|
if hasattr(menu, "quote"):
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.quote, menuitem=menu.quote)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.add_to_favorites, menuitem=menu.like)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.user_actions, menuitem=menu.userActions)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.url_, menuitem=menu.openUrl)
|
|
if hasattr(menu, "openInBrowser"):
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.open_in_browser, menuitem=menu.openInBrowser)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.view, menuitem=menu.view)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.copy, menuitem=menu.copy)
|
|
widgetUtils.connect_event(menu, widgetUtils.MENU, self.destroy_status, menuitem=menu.remove)
|
|
if pos != 0:
|
|
self.buffer.PopupMenu(menu, pos)
|
|
else:
|
|
self.buffer.PopupMenu(menu, self.buffer.list.list.GetPosition())
|
|
|
|
def show_menu_by_key(self, ev):
|
|
"""Show context menu when pressing menu key."""
|
|
if self.buffer.list.get_count() == 0:
|
|
return
|
|
if ev.GetKeyCode() == wx.WXK_WINDOWS_MENU:
|
|
self.show_menu(widgetUtils.MENU, pos=self.buffer.list.list.GetPosition())
|
|
|
|
def copy(self, *args, **kwargs):
|
|
"""Copy post to clipboard."""
|
|
pub.sendMessage("execute-action", action="copy_to_clipboard")
|
|
|
|
def on_post(self, evt):
|
|
dlg = blueski_messages.post(session=self.session, title=_("New Post"), caption=_("New Post"))
|
|
if dlg.message.ShowModal() == wx.ID_OK:
|
|
text, files, cw, langs = dlg.get_data()
|
|
self._send_post_async(
|
|
text=text,
|
|
files=files,
|
|
cw_text=cw,
|
|
langs=langs,
|
|
success_message=_("Sent."),
|
|
error_message=_("An error occurred while posting."),
|
|
sound="tweet_send.ogg",
|
|
refresh_args=(False, False),
|
|
)
|
|
dlg.message.Destroy()
|
|
|
|
def on_reply(self, evt):
|
|
item = self.get_item()
|
|
if not item: return
|
|
|
|
# item is a feed object or dict.
|
|
# We need its URI.
|
|
uri = self.get_selected_item_id()
|
|
if not uri:
|
|
uri = item.get("uri") if isinstance(item, dict) else getattr(item, "uri", None)
|
|
reply_cid = self.get_selected_item_cid()
|
|
# Attempt to get CID if present for consistency, though send_message handles it
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
author = g(item, "author")
|
|
if not author:
|
|
post = g(item, "post") or g(item, "record")
|
|
author = g(post, "author") if post else None
|
|
handle = g(author, "handle", "")
|
|
initial_text = f"@{handle} " if handle and not handle.startswith("@") else (f"{handle} " if handle else "")
|
|
|
|
dlg = blueski_messages.post(session=self.session, title=_("Reply"), caption=_("Reply"), text=initial_text)
|
|
if dlg.message.ShowModal() == wx.ID_OK:
|
|
text, files, cw, langs = dlg.get_data()
|
|
refresh_args = (True, False) if getattr(self, "type", "") == "conversation" else None
|
|
self._send_post_async(
|
|
text=text,
|
|
files=files,
|
|
cw_text=cw,
|
|
langs=langs,
|
|
reply_to=uri,
|
|
reply_to_cid=reply_cid,
|
|
success_message=_("Reply sent."),
|
|
error_message=_("An error occurred while replying."),
|
|
sound="reply_send.ogg",
|
|
refresh_args=refresh_args,
|
|
)
|
|
dlg.message.Destroy()
|
|
|
|
def _send_post_async(
|
|
self,
|
|
*,
|
|
text,
|
|
files,
|
|
cw_text,
|
|
langs,
|
|
reply_to=None,
|
|
reply_to_cid=None,
|
|
success_message="",
|
|
error_message="",
|
|
sound=None,
|
|
refresh_args=None,
|
|
):
|
|
if not text and not files:
|
|
return
|
|
|
|
def do_send():
|
|
try:
|
|
uri_resp = self.session.send_message(
|
|
message=text,
|
|
files=files,
|
|
reply_to=reply_to,
|
|
reply_to_cid=reply_to_cid,
|
|
cw_text=cw_text,
|
|
langs=langs,
|
|
)
|
|
if uri_resp:
|
|
if sound:
|
|
wx.CallAfter(self.session.sound.play, sound)
|
|
if success_message:
|
|
wx.CallAfter(output.speak, success_message)
|
|
if refresh_args and hasattr(self, "start_stream"):
|
|
try:
|
|
wx.CallAfter(self.start_stream, *refresh_args)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
wx.CallAfter(output.speak, _("Failed to send post."), True)
|
|
except Exception:
|
|
log.exception("Error sending Bluesky post")
|
|
if error_message:
|
|
wx.CallAfter(output.speak, error_message, True)
|
|
else:
|
|
wx.CallAfter(output.speak, _("An error occurred while posting."), True)
|
|
|
|
call_threaded(do_send)
|
|
|
|
def on_repost(self, evt):
|
|
self.share_item()
|
|
|
|
def share_item(self, event=None, item=None, *args, **kwargs):
|
|
if item is None:
|
|
item = self.get_item()
|
|
if not item:
|
|
return
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
# Get the URI for reposting
|
|
uri = g(item, "uri") or g(g(item, "post"), "uri")
|
|
cid = g(item, "cid") or g(g(item, "post"), "cid")
|
|
if not uri:
|
|
output.speak(_("Could not find post to repost."), True)
|
|
return
|
|
|
|
# Check boost_mode setting
|
|
boost_mode = self.session.settings["general"].get("boost_mode", "ask")
|
|
if boost_mode == "ask":
|
|
from wxUI.dialogs.blueski.postDialogs import repost_question
|
|
answer = repost_question()
|
|
if answer == 1:
|
|
self._direct_repost(uri)
|
|
elif answer == 2:
|
|
self.quote(item=item)
|
|
else:
|
|
self._direct_repost(uri)
|
|
|
|
def _direct_repost(self, uri):
|
|
try:
|
|
self.session.repost(uri)
|
|
self.session.sound.play("retweet_send.ogg")
|
|
output.speak(_("Reposted."))
|
|
except Exception as e:
|
|
log.error("Error reposting: %s", e)
|
|
output.speak(_("Failed to repost."), True)
|
|
|
|
def quote(self, event=None, item=None, *args, **kwargs):
|
|
if item is None:
|
|
item = self.get_item()
|
|
if not item:
|
|
return
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
uri = g(item, "uri") or g(g(item, "post"), "uri")
|
|
if not uri:
|
|
output.speak(_("Could not find post to quote."), True)
|
|
return
|
|
|
|
title = _("Quote post")
|
|
caption = _("Write your comment here")
|
|
dlg = blueski_messages.post(session=self.session, title=title, caption=caption)
|
|
if dlg.message.ShowModal() == wx.ID_OK:
|
|
text, files, cw, langs = dlg.get_data()
|
|
if text or files:
|
|
def do_quote():
|
|
try:
|
|
result = self.session.send_message(
|
|
message=text,
|
|
files=files,
|
|
cw_text=cw,
|
|
langs=langs,
|
|
quote_uri=uri,
|
|
)
|
|
if result:
|
|
wx.CallAfter(self.session.sound.play, "retweet_send.ogg")
|
|
wx.CallAfter(output.speak, _("Quote posted."))
|
|
else:
|
|
wx.CallAfter(output.speak, _("Failed to post quote."), True)
|
|
except Exception as e:
|
|
log.error("Error posting quote: %s", e)
|
|
wx.CallAfter(output.speak, _("Failed to post quote."), True)
|
|
call_threaded(do_quote)
|
|
dlg.message.Destroy()
|
|
|
|
def on_like(self, evt):
|
|
self.toggle_favorite(confirm=False)
|
|
|
|
def toggle_favorite(self, confirm=False, *args, **kwargs):
|
|
item = self.get_item()
|
|
if not item:
|
|
output.speak(_("No item to like."), True)
|
|
return
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
uri = g(item, "uri")
|
|
if not uri:
|
|
post = g(item, "post") or g(item, "record")
|
|
uri = g(post, "uri") if post else None
|
|
|
|
if not uri:
|
|
output.speak(_("Could not find post identifier."), True)
|
|
return
|
|
|
|
if confirm:
|
|
if wx.MessageBox(_("Like this post?"), _("Confirm"), wx.YES_NO | wx.ICON_QUESTION) != wx.YES:
|
|
return
|
|
|
|
# Check if already liked
|
|
viewer = g(item, "viewer")
|
|
already_liked = g(viewer, "like") if viewer else None
|
|
|
|
if already_liked:
|
|
output.speak(_("Already liked."), True)
|
|
return
|
|
|
|
# Perform the like
|
|
like_uri = self.session.like(uri)
|
|
if not like_uri:
|
|
output.speak(_("Failed to like post."), True)
|
|
return
|
|
|
|
self.session.sound.play("favourite.ogg")
|
|
output.speak(_("Liked."))
|
|
|
|
# Update the viewer state in the item
|
|
if isinstance(item, dict):
|
|
if "viewer" not in item:
|
|
item["viewer"] = {}
|
|
item["viewer"]["like"] = like_uri
|
|
else:
|
|
# For SDK models, create or update viewer
|
|
if not hasattr(item, "viewer") or item.viewer is None:
|
|
# Create a simple object to hold the like state
|
|
class Viewer:
|
|
def __init__(self):
|
|
self.like = None
|
|
item.viewer = Viewer()
|
|
item.viewer.like = like_uri
|
|
|
|
# Refresh the displayed item in the list
|
|
try:
|
|
index = self.buffer.list.get_selected()
|
|
if index > -1:
|
|
# Recompose and update the list item
|
|
safe = True
|
|
relative_times = self.session.settings["general"].get("relative_times", False)
|
|
show_screen_names = self.session.settings["general"].get("show_screen_names", False)
|
|
post_data = self.compose_function(item, self.session.db, self.session.settings,
|
|
relative_times=relative_times,
|
|
show_screen_names=show_screen_names,
|
|
safe=safe)
|
|
|
|
# Update the item in place (only 3 columns: Author, Post, Date)
|
|
self.buffer.list.list.SetItem(index, 0, post_data[0]) # Author
|
|
self.buffer.list.list.SetItem(index, 1, post_data[1]) # Text
|
|
self.buffer.list.list.SetItem(index, 2, post_data[2]) # Date
|
|
# Note: compose_post returns 4 items but list only has 3 columns
|
|
except Exception as e:
|
|
log.error("Error refreshing list item after like: %s", e)
|
|
|
|
def add_to_favorites(self, *args, **kwargs):
|
|
self.toggle_favorite(confirm=False)
|
|
|
|
def remove_from_favorites(self, *args, **kwargs):
|
|
# We need unlike support in session
|
|
pass
|
|
|
|
def on_dm(self, evt):
|
|
self.send_message()
|
|
|
|
def send_message(self, *args, **kwargs):
|
|
# Global shortcut for DM - similar to Mastodon's implementation
|
|
|
|
# Use the robust author extraction method
|
|
author_details = self.get_selected_item_author_details()
|
|
if not author_details:
|
|
output.speak(_("No item selected."), True)
|
|
return
|
|
|
|
did = author_details.get("did")
|
|
handle = author_details.get("handle") or "unknown"
|
|
|
|
if not did:
|
|
output.speak(_("Could not identify user to message."), True)
|
|
return
|
|
|
|
# Use full post dialog like Mastodon
|
|
title = _("Conversation with {0}").format(handle)
|
|
caption = _("Write your message here")
|
|
initial_text = "@{} ".format(handle)
|
|
|
|
post = blueski_messages.post(session=self.session, title=title, caption=caption, text=initial_text)
|
|
if post.message.ShowModal() == wx.ID_OK:
|
|
text, files, cw_text, langs = post.get_data()
|
|
if text:
|
|
def do_send():
|
|
try:
|
|
api = self.session._ensure_client()
|
|
dm_client = api.with_bsky_chat_proxy()
|
|
# Get or create conversation
|
|
res = dm_client.chat.bsky.convo.get_convo_for_members({"members": [did]})
|
|
convo_id = res.convo.id
|
|
self.session.send_chat_message(convo_id, text)
|
|
wx.CallAfter(self.session.sound.play, "dm_sent.ogg")
|
|
wx.CallAfter(output.speak, _("Message sent."))
|
|
except Exception as e:
|
|
log.error("Error sending Bluesky DM: %s", e)
|
|
wx.CallAfter(output.speak, _("Failed to send message."), True)
|
|
call_threaded(do_send)
|
|
if hasattr(post.message, "Destroy"):
|
|
post.message.Destroy()
|
|
|
|
def view(self, *args, **kwargs):
|
|
self.view_item()
|
|
|
|
def view_item(self, item=None):
|
|
if item is None:
|
|
item = self.get_item()
|
|
if not item:
|
|
return
|
|
if not blueski_messages.has_post_data(item):
|
|
pub.sendMessage("execute-action", action="user_details")
|
|
return
|
|
try:
|
|
blueski_messages.viewPost(self.session, item, controller=getattr(self, "controller", None))
|
|
except Exception as e:
|
|
log.error("Error opening Bluesky post viewer: %s", e)
|
|
|
|
def url_(self, *args, **kwargs):
|
|
self.url()
|
|
|
|
def url(self, announce=True, item=None, *args, **kwargs):
|
|
"""Open URLs found in the post content."""
|
|
if item is None:
|
|
item = self.get_item()
|
|
if not item:
|
|
return
|
|
|
|
import webbrowser
|
|
from wxUI.dialogs import urlList
|
|
|
|
urls = utils.find_urls(item)
|
|
url = ""
|
|
if len(urls) == 1:
|
|
url = urls[0]
|
|
elif len(urls) > 1:
|
|
urls_list = urlList.urlList()
|
|
urls_list.populate_list(urls)
|
|
if urls_list.get_response() == widgetUtils.OK:
|
|
url = urls_list.get_string()
|
|
if hasattr(urls_list, "destroy"):
|
|
urls_list.destroy()
|
|
if url != '':
|
|
if announce:
|
|
output.speak(_(u"Opening URL..."), True)
|
|
webbrowser.open_new_tab(url)
|
|
|
|
def user_actions(self, *args, **kwargs):
|
|
pub.sendMessage("execute-action", action="follow")
|
|
|
|
def view_chat_with_user(self, did, handle):
|
|
try:
|
|
api = self.session._ensure_client()
|
|
res = api.chat.bsky.convo.get_convo_for_members({"members": [did]})
|
|
convo_id = res.convo.id
|
|
|
|
title = _("Chat: {0}").format(handle)
|
|
self.controller.create_buffer(
|
|
buffer_type="chat_messages",
|
|
session_type="blueski",
|
|
buffer_title=title,
|
|
kwargs={"session": self.session, "convo_id": convo_id, "name": title},
|
|
start=True
|
|
)
|
|
except:
|
|
output.speak(_("Could not open chat."), True)
|
|
|
|
def block_user(self, *args, **kwargs):
|
|
item = self.get_item()
|
|
if not item: return
|
|
author = getattr(item, "author", None) or (item.get("post", {}).get("author") if isinstance(item, dict) else item)
|
|
did = getattr(author, "did", None) or (author.get("did") if isinstance(author, dict) else None)
|
|
handle = getattr(author, "handle", "unknown") or (author.get("handle") if isinstance(author, dict) else "unknown")
|
|
|
|
if wx.MessageBox(_("Are you sure you want to block {0}?").format(handle), _("Block"), wx.YES_NO | wx.ICON_WARNING) == wx.YES:
|
|
if self.session.block_user(did):
|
|
output.speak(_("User blocked."))
|
|
else:
|
|
output.speak(_("Failed to block user."))
|
|
|
|
def unblock_user(self, *args, **kwargs):
|
|
# Unblocking usually needs the block record URI.
|
|
# In a UserBuffer (Blocks), it might be present.
|
|
item = self.get_item()
|
|
if not item: return
|
|
|
|
# Check if item itself is a block record or user object with viewer.blocking
|
|
block_uri = None
|
|
if isinstance(item, dict):
|
|
block_uri = item.get("viewer", {}).get("blocking")
|
|
else:
|
|
viewer = getattr(item, "viewer", None)
|
|
block_uri = getattr(viewer, "blocking", None) if viewer else None
|
|
|
|
if not block_uri:
|
|
output.speak(_("Could not find block information for this user."), True)
|
|
return
|
|
|
|
if self.session.unblock_user(block_uri):
|
|
output.speak(_("User unblocked."))
|
|
else:
|
|
output.speak(_("Failed to unblock user."))
|
|
|
|
def put_items_on_list(self, number_of_items):
|
|
list_to_use = self.session.db[self.name]
|
|
count = self.buffer.list.get_count()
|
|
reverse = False
|
|
try:
|
|
reverse = self.session.settings["general"].get("reverse_timelines", False)
|
|
except: pass
|
|
|
|
if number_of_items == 0:
|
|
return
|
|
|
|
safe = True
|
|
relative_times = self.session.settings["general"].get("relative_times", False)
|
|
show_screen_names = self.session.settings["general"].get("show_screen_names", False)
|
|
|
|
if count == 0:
|
|
for i in list_to_use:
|
|
post = self.compose_function(i, self.session.db, self.session.settings, relative_times=relative_times, show_screen_names=show_screen_names, safe=safe)
|
|
self.buffer.list.insert_item(False, *post)
|
|
# Set selection
|
|
total = self.buffer.list.get_count()
|
|
if total > 0:
|
|
if not reverse:
|
|
self.buffer.list.select_item(total - 1) # Bottom
|
|
else:
|
|
self.buffer.list.select_item(0) # Top
|
|
|
|
elif count > 0 and number_of_items > 0:
|
|
if not reverse:
|
|
items = list_to_use[:number_of_items] # If we prepended items for normal (oldest first) timeline... wait.
|
|
# Standard flow: "New items" come from API.
|
|
# If standard timeline (oldest at top, newest at bottom): new items appended to DB.
|
|
# UI: append to bottom.
|
|
items = list_to_use[len(list_to_use)-number_of_items:]
|
|
for i in items:
|
|
post = self.compose_function(i, self.session.db, self.session.settings, relative_times=relative_times, show_screen_names=show_screen_names, safe=safe)
|
|
self.buffer.list.insert_item(False, *post)
|
|
else:
|
|
# Reverse timeline (Newest at top).
|
|
# New items appended to DB? Or inserted at 0?
|
|
# Mastodon BaseBuffer:
|
|
# if reverse_timelines == False: items_db.insert(0, i) (Wait, insert at 0?)
|
|
# Actually let's look at `get_more_items` in Mastodon BaseBuffer again.
|
|
# "if self.session.settings["general"]["reverse_timelines"] == False: items_db.insert(0, i)"
|
|
# This means for standard timeline, new items (newer time) go to index 0?
|
|
# No, standard timeline usually has oldest at top. Retrieve "more items" usually means "newer items" or "older items" depending on context (streaming vs styling).
|
|
|
|
# Let's trust that we just need to insert based on how we updated DB in start_stream.
|
|
|
|
# For now, simplistic approach:
|
|
items = list_to_use[0:number_of_items] # Assuming we inserted at 0 in DB
|
|
# items.reverse() if needed?
|
|
for i in items:
|
|
post = self.compose_function(i, self.session.db, self.session.settings, relative_times=relative_times, show_screen_names=show_screen_names, safe=safe)
|
|
self.buffer.list.insert_item(True, *post) # Insert at 0 (True)
|
|
|
|
def reply(self, *args, **kwargs):
|
|
self.on_reply(None)
|
|
|
|
def post_status(self, *args, **kwargs):
|
|
self.on_post(None)
|
|
|
|
def destroy_status(self, *args, **kwargs):
|
|
# Delete post
|
|
item = self.get_item()
|
|
if not item: return
|
|
uri = self.get_selected_item_id()
|
|
if not uri:
|
|
if isinstance(item, dict):
|
|
uri = item.get("uri") or item.get("post", {}).get("uri")
|
|
else:
|
|
post = getattr(item, "post", None)
|
|
uri = getattr(item, "uri", None) or getattr(post, "uri", None)
|
|
if not uri:
|
|
output.speak(_("Could not find the post identifier."), True)
|
|
return
|
|
|
|
# Check if author is self
|
|
# Implementation depends on parsing URI or checking active user DID vs author DID
|
|
# For now, just try and handle error
|
|
if wx.MessageBox(_("Delete this post?"), _("Confirm"), wx.YES_NO | wx.ICON_QUESTION) == wx.YES:
|
|
try:
|
|
ok = self.session.delete_post(uri)
|
|
if not ok:
|
|
output.speak(_("Could not delete."), True)
|
|
return
|
|
index = self.buffer.list.get_selected()
|
|
if index > -1 and self.session.db.get(self.name):
|
|
try:
|
|
self.session.db[self.name].pop(index)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.buffer.list.remove_item(index)
|
|
except Exception:
|
|
pass
|
|
output.speak(_("Deleted."))
|
|
except Exception as e:
|
|
log.error("Error deleting Bluesky post: %s", e)
|
|
output.speak(_("Could not delete."), True)
|
|
|
|
|
|
def audio(self, event=None, item=None, *args, **kwargs):
|
|
"""Play audio/video from the current post."""
|
|
if sound.URLPlayer.player.is_playing():
|
|
return sound.URLPlayer.stop_audio()
|
|
if item is None:
|
|
item = self.get_item()
|
|
if not item:
|
|
return
|
|
urls = utils.get_media_urls(item)
|
|
if not urls:
|
|
output.speak(_("This post has no playable media."), True)
|
|
return
|
|
url = ""
|
|
if len(urls) == 1:
|
|
url = urls[0]
|
|
elif len(urls) > 1:
|
|
from wxUI.dialogs import urlList
|
|
urls_list = urlList.urlList()
|
|
urls_list.populate_list(urls)
|
|
if urls_list.get_response() == widgetUtils.OK:
|
|
url = urls_list.get_string()
|
|
if hasattr(urls_list, "destroy"):
|
|
urls_list.destroy()
|
|
if url:
|
|
sound.URLPlayer.play(url, self.session.settings["sound"]["volume"])
|
|
|
|
def ocr_image(self, *args, **kwargs):
|
|
"""Perform OCR on images in the current post."""
|
|
post = self.get_item()
|
|
if not post:
|
|
return
|
|
|
|
image_list = utils.get_image_urls(post)
|
|
if not image_list:
|
|
return
|
|
|
|
if len(image_list) > 1:
|
|
from wxUI.dialogs import urlList
|
|
labels = [_("Picture {0}").format(i + 1) for i in range(len(image_list))]
|
|
dialog = urlList.urlList(title=_("Select the picture"))
|
|
dialog.populate_list(labels)
|
|
if dialog.get_response() != widgetUtils.OK:
|
|
return
|
|
img = image_list[dialog.get_item()]
|
|
else:
|
|
img = image_list[0]
|
|
|
|
url = img.get("url")
|
|
if not url:
|
|
return
|
|
|
|
from extra import ocr as ocr_module
|
|
api = ocr_module.OCRSpace.OCRSpaceAPI()
|
|
try:
|
|
text = api.OCR_URL(url)
|
|
except ocr_module.OCRSpace.APIError:
|
|
output.speak(_("Unable to extract text"), True)
|
|
return
|
|
except Exception as e:
|
|
log.error("OCR error: %s", e)
|
|
output.speak(_("Unable to extract text"), True)
|
|
return
|
|
|
|
viewer = blueski_messages.text(title=_("OCR Result"), text=text["ParsedText"])
|
|
viewer.message.ShowModal()
|
|
viewer.message.Destroy()
|
|
|
|
# Also implement "view_item" if standard keymap uses it
|
|
def get_formatted_message(self):
|
|
return self.compose_function(self.get_item(), self.session.db, self.session.settings, self.session.settings["general"].get("relative_times", False), self.session.settings["general"].get("show_screen_names", False))[1]
|
|
|
|
def get_message(self):
|
|
item = self.get_item()
|
|
if item is None:
|
|
return
|
|
relative_times = self.session.settings["general"].get("relative_times", False)
|
|
offset_hours = 0
|
|
if isinstance(self.session.db, dict):
|
|
offset_hours = self.session.db.get("utc_offset", 0) or 0
|
|
template_settings = self.session.settings.get("templates", {})
|
|
try:
|
|
if self.type == "notifications":
|
|
template = template_settings.get("notification", "$display_name $text, $date")
|
|
post_template = template_settings.get("post", "$display_name, $reply_to$safe_text $date.")
|
|
return templates.render_notification(item, template, post_template, self.session.settings, relative_times, offset_hours)
|
|
if self.type in ("user", "post_user_list"):
|
|
template = template_settings.get("person", "$display_name (@$screen_name). $followers followers, $following following, $posts posts. Joined $created_at.")
|
|
return templates.render_user(item, template, self.session.settings, relative_times, offset_hours)
|
|
template = template_settings.get("post", "$display_name, $reply_to$safe_text $date.")
|
|
return templates.render_post(item, template, self.session.settings, relative_times, offset_hours)
|
|
except Exception:
|
|
# Fallback to compose if any template render fails.
|
|
composed = self.compose_function(
|
|
item,
|
|
self.session.db,
|
|
self.session.settings,
|
|
relative_times,
|
|
self.session.settings["general"].get("show_screen_names", False),
|
|
)
|
|
return " ".join(composed)
|
|
|
|
def view_conversation(self, *args, **kwargs):
|
|
item = self.get_item()
|
|
if not item: return
|
|
|
|
uri = item.get("uri") if isinstance(item, dict) else getattr(item, "uri", None)
|
|
if not uri: return
|
|
|
|
controller = self.controller
|
|
|
|
details = self.get_selected_item_author_details()
|
|
handle = "Unknown"
|
|
if details:
|
|
handle = details.get("handle") or "Unknown"
|
|
title = _("Conversation: {0}").format(handle)
|
|
|
|
controller.create_buffer(
|
|
buffer_type="conversation",
|
|
session_type="blueski",
|
|
buffer_title=title,
|
|
kwargs={"session": self.session, "uri": uri, "name": title},
|
|
start=True
|
|
)
|
|
|
|
def get_item(self):
|
|
index = self.buffer.list.get_selected()
|
|
if index > -1 and self.session.db.get(self.name) is not None:
|
|
# Logic implies DB order matches UI order
|
|
return self.session.db[self.name][index]
|
|
|
|
def get_selected_item_id(self):
|
|
item = self.get_item()
|
|
if not item:
|
|
return None
|
|
|
|
if isinstance(item, dict):
|
|
uri = item.get("uri")
|
|
if uri:
|
|
return uri
|
|
post = item.get("post") or item.get("record")
|
|
if isinstance(post, dict):
|
|
return post.get("uri")
|
|
return getattr(post, "uri", None)
|
|
|
|
return getattr(item, "uri", None) or getattr(getattr(item, "post", None), "uri", None)
|
|
|
|
def get_selected_item_cid(self):
|
|
item = self.get_item()
|
|
if not item:
|
|
return None
|
|
|
|
if isinstance(item, dict):
|
|
cid = item.get("cid")
|
|
if cid:
|
|
return cid
|
|
post = item.get("post") or item.get("record")
|
|
if isinstance(post, dict):
|
|
return post.get("cid")
|
|
return getattr(post, "cid", None)
|
|
|
|
return getattr(item, "cid", None) or getattr(getattr(item, "post", None), "cid", None)
|
|
|
|
def get_selected_item_author_details(self):
|
|
item = self.get_item()
|
|
if not item:
|
|
return None
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
author = None
|
|
|
|
# Check if item itself is a user object (UserBuffer, FollowersBuffer, etc.)
|
|
if g(item, "did") or g(item, "handle"):
|
|
author = item
|
|
else:
|
|
# Use the same pattern as compose_post: get actual_post first
|
|
# This handles FeedViewPost (item.post) and PostView (item itself)
|
|
actual_post = g(item, "post", item)
|
|
author = g(actual_post, "author")
|
|
|
|
# If still no author, try other nested structures
|
|
if not author:
|
|
# Try record.author
|
|
record = g(item, "record")
|
|
if record:
|
|
author = g(record, "author")
|
|
|
|
# Try subject (for notifications)
|
|
if not author:
|
|
subject = g(item, "subject")
|
|
if subject:
|
|
actual_subject = g(subject, "post", subject)
|
|
author = g(actual_subject, "author")
|
|
|
|
if not author:
|
|
return None
|
|
|
|
did = g(author, "did")
|
|
handle = g(author, "handle")
|
|
|
|
# Only return if we have at least did or handle
|
|
if not did and not handle:
|
|
return None
|
|
|
|
return {
|
|
"did": did,
|
|
"handle": handle,
|
|
}
|
|
|
|
def _hydrate_reply_handles(self, items):
|
|
"""Populate _reply_to_handle on items when reply metadata lacks hydrated author."""
|
|
if not items:
|
|
return
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
def set_handle(obj, handle):
|
|
if not obj or not handle:
|
|
return
|
|
if isinstance(obj, dict):
|
|
obj["_reply_to_handle"] = handle
|
|
else:
|
|
try:
|
|
setattr(obj, "_reply_to_handle", handle)
|
|
except Exception:
|
|
pass
|
|
|
|
def get_parent_uri(item):
|
|
actual_post = g(item, "post", item)
|
|
record = g(actual_post, "record", {}) or {}
|
|
reply = g(record, "reply", None)
|
|
if not reply:
|
|
return None
|
|
parent = g(reply, "parent", None) or reply
|
|
return g(parent, "uri", None)
|
|
|
|
unresolved_by_parent_uri = {}
|
|
for item in items:
|
|
if utils.extract_reply_to_handle(item):
|
|
continue
|
|
parent_uri = get_parent_uri(item)
|
|
if not parent_uri:
|
|
continue
|
|
unresolved_by_parent_uri.setdefault(parent_uri, []).append(item)
|
|
|
|
if not unresolved_by_parent_uri:
|
|
return
|
|
|
|
api = self.session._ensure_client()
|
|
if not api:
|
|
return
|
|
|
|
uris = list(unresolved_by_parent_uri.keys())
|
|
uri_to_handle = {}
|
|
chunk_size = 25
|
|
|
|
for start in range(0, len(uris), chunk_size):
|
|
chunk = uris[start:start + chunk_size]
|
|
try:
|
|
try:
|
|
res = api.app.bsky.feed.get_posts({"uris": chunk})
|
|
except Exception:
|
|
res = api.app.bsky.feed.get_posts(uris=chunk)
|
|
posts = list(getattr(res, "posts", None) or [])
|
|
for parent_post in posts:
|
|
uri = g(parent_post, "uri", None)
|
|
author = g(parent_post, "author", None) or {}
|
|
handle = g(author, "handle", None)
|
|
if uri and handle:
|
|
uri_to_handle[uri] = handle
|
|
except Exception as e:
|
|
log.debug("Could not hydrate reply handles for chunk: %s", e)
|
|
|
|
if not uri_to_handle:
|
|
return
|
|
|
|
for parent_uri, item_list in unresolved_by_parent_uri.items():
|
|
handle = uri_to_handle.get(parent_uri)
|
|
if not handle:
|
|
continue
|
|
for item in item_list:
|
|
set_handle(item, handle)
|
|
actual_post = g(item, "post", item)
|
|
set_handle(actual_post, handle)
|
|
|
|
def process_items(self, items, play_sound=True, avoid_autoreading=False):
|
|
"""
|
|
Process list of items (FeedViewPost objects), update DB, and update UI.
|
|
Returns number of new items.
|
|
"""
|
|
if not items:
|
|
return 0
|
|
|
|
# Identify new items
|
|
new_items = []
|
|
current_uris = set()
|
|
# Create a set of keys from existing db to check duplicates
|
|
def get_key(it):
|
|
if isinstance(it, dict):
|
|
post = it.get("post")
|
|
if isinstance(post, dict) and post.get("uri"):
|
|
return post.get("uri")
|
|
if it.get("uri"):
|
|
return it.get("uri")
|
|
for key in ("id", "cid", "rev", "convoId", "convo_id", "messageId", "message_id", "msgId", "msg_id"):
|
|
if it.get(key):
|
|
return it.get(key)
|
|
nested = it.get("message") or it.get("record")
|
|
if isinstance(nested, dict):
|
|
for key in ("id", "cid", "rev", "messageId", "message_id", "msgId", "msg_id"):
|
|
if nested.get(key):
|
|
return nested.get(key)
|
|
if it.get("did"):
|
|
return it.get("did")
|
|
if it.get("handle"):
|
|
return it.get("handle")
|
|
author = it.get("author")
|
|
if isinstance(author, dict):
|
|
return author.get("did") or author.get("handle")
|
|
# Chat message fallback — use str() to ensure stable hash/comparison
|
|
sent_at = it.get("sentAt") or it.get("sent_at") or it.get("createdAt") or it.get("created_at")
|
|
sender = it.get("sender") or (nested.get("sender") if isinstance(nested, dict) else {}) or {}
|
|
sender_did = sender.get("did") if isinstance(sender, dict) else None
|
|
text = it.get("text") or (nested.get("text") if isinstance(nested, dict) else None)
|
|
if sent_at or sender_did or text:
|
|
return (str(sent_at) if sent_at else None, str(sender_did) if sender_did else None, str(text) if text else None)
|
|
return None
|
|
post = getattr(it, "post", None)
|
|
if post is not None:
|
|
return getattr(post, "uri", None)
|
|
for attr in ("uri", "id", "cid", "rev", "convoId", "convo_id", "messageId", "message_id", "msgId", "msg_id", "did", "handle"):
|
|
val = getattr(it, attr, None)
|
|
if val:
|
|
return val
|
|
nested = getattr(it, "message", None) or getattr(it, "record", None)
|
|
if nested is not None:
|
|
for attr in ("id", "cid", "rev", "messageId", "message_id", "msgId", "msg_id"):
|
|
val = getattr(nested, attr, None)
|
|
if val:
|
|
return val
|
|
author = getattr(it, "author", None)
|
|
if author is not None:
|
|
return getattr(author, "did", None) or getattr(author, "handle", None)
|
|
sent_at = getattr(it, "sentAt", None) or getattr(it, "sent_at", None) or getattr(it, "createdAt", None) or getattr(it, "created_at", None)
|
|
sender = getattr(it, "sender", None) or (getattr(nested, "sender", None) if nested is not None else None)
|
|
sender_did = getattr(sender, "did", None) if sender is not None else None
|
|
text = getattr(it, "text", None) or (getattr(nested, "text", None) if nested is not None else None)
|
|
if sent_at or sender_did or text:
|
|
return (str(sent_at) if sent_at else None, str(sender_did) if sender_did else None, str(text) if text else None)
|
|
return None
|
|
|
|
for item in self.session.db[self.name]:
|
|
key = get_key(item)
|
|
if key:
|
|
current_uris.add(key)
|
|
|
|
for item in items:
|
|
key = get_key(item)
|
|
if key:
|
|
if key in current_uris:
|
|
continue
|
|
current_uris.add(key)
|
|
new_items.append(item)
|
|
|
|
if not new_items:
|
|
return 0
|
|
|
|
self._hydrate_reply_handles(new_items)
|
|
|
|
# Add to DB
|
|
# Reverse timeline setting
|
|
reverse = False
|
|
try: reverse = self.session.settings["general"].get("reverse_timelines", False)
|
|
except: pass
|
|
|
|
# If reverse (newest at top), we insert new items at index 0?
|
|
# Typically API returns newest first.
|
|
# If DB is [Newest ... Oldest] (Reverse order)
|
|
# Then we insert new items at 0.
|
|
# If DB is [Oldest ... Newest] (Normal order)
|
|
# Then we append new items at end.
|
|
|
|
# But traditionally APIs return [Newest ... Oldest].
|
|
# So 'items' list is [Newest ... Oldest].
|
|
|
|
if reverse: # Newest at top
|
|
# DB: [Newest (Index 0) ... Oldest]
|
|
# We want to insert 'new_items' at 0.
|
|
# But 'new_items' are also [Newest...Oldest]
|
|
# So duplicates check handled.
|
|
# We insert the whole block at 0?
|
|
for it in reversed(new_items): # Insert oldest of new first, so newest ends up at 0
|
|
self.session.db[self.name].insert(0, it)
|
|
else: # Oldest at top
|
|
# DB: [Oldest ... Newest]
|
|
# APIs return [Newest ... Oldest]
|
|
# We want to append them.
|
|
# So we append reversed(new_items)?
|
|
for it in reversed(new_items):
|
|
self.session.db[self.name].append(it)
|
|
|
|
# Update UI
|
|
self.put_items_on_list(len(new_items))
|
|
|
|
# Play sound
|
|
if play_sound and self.sound and not self.session.settings["sound"]["session_mute"]:
|
|
self.session.sound.play(self.sound)
|
|
|
|
# Auto-read for accessibility
|
|
if not avoid_autoreading and len(new_items) > 0:
|
|
self.auto_read(len(new_items))
|
|
|
|
return len(new_items)
|
|
|
|
def add_new_item(self, item):
|
|
"""Add a single new item from streaming."""
|
|
self._hydrate_reply_handles([item])
|
|
safe = True
|
|
relative_times = self.session.settings["general"].get("relative_times", False)
|
|
show_screen_names = self.session.settings["general"].get("show_screen_names", False)
|
|
post = self.compose_function(item, self.session.db, self.session.settings, relative_times, show_screen_names, safe=safe)
|
|
|
|
if self.session.settings["general"].get("reverse_timelines", False):
|
|
self.buffer.list.insert_item(True, *post)
|
|
self.session.db[self.name].insert(0, item)
|
|
else:
|
|
self.buffer.list.insert_item(False, *post)
|
|
self.session.db[self.name].append(item)
|
|
|
|
# Auto-read single item
|
|
if self.name in self.session.settings["other_buffers"].get("autoread_buffers", []) and \
|
|
self.name not in self.session.settings["other_buffers"].get("muted_buffers", []) and \
|
|
not self.session.settings["sound"].get("session_mute", False):
|
|
output.speak(" ".join(post[:2]),
|
|
speech=self.session.settings["reporting"].get("speech_reporting", True),
|
|
braille=self.session.settings["reporting"].get("braille_reporting", True))
|
|
|
|
def update_item(self, item, position):
|
|
"""Update an existing item at the specified position."""
|
|
safe = True
|
|
relative_times = self.session.settings["general"].get("relative_times", False)
|
|
show_screen_names = self.session.settings["general"].get("show_screen_names", False)
|
|
post = self.compose_function(item, self.session.db, self.session.settings, relative_times, show_screen_names, safe=safe)
|
|
self.buffer.list.list.SetItem(position, 1, post[1])
|
|
|
|
def open_in_browser(self, *args, **kwargs):
|
|
"""Open the current post in web browser."""
|
|
item = self.get_item()
|
|
if not item:
|
|
return
|
|
|
|
import webbrowser
|
|
|
|
def g(obj, key, default=None):
|
|
if isinstance(obj, dict):
|
|
return obj.get(key, default)
|
|
return getattr(obj, key, default)
|
|
|
|
uri = g(item, "uri") or g(g(item, "post"), "uri")
|
|
author = g(item, "author") or g(g(item, "post"), "author")
|
|
handle = g(author, "handle")
|
|
|
|
if uri and handle:
|
|
if "app.bsky.feed.post" in uri:
|
|
rkey = uri.split("/")[-1]
|
|
url = f"https://bsky.app/profile/{handle}/post/{rkey}"
|
|
output.speak(_("Opening item in web browser..."))
|
|
webbrowser.open(url)
|
|
return
|
|
|
|
# Fallback to profile
|
|
if handle:
|
|
url = f"https://bsky.app/profile/{handle}"
|
|
output.speak(_("Opening item in web browser..."))
|
|
webbrowser.open(url)
|
|
|
|
def save_positions(self):
|
|
try:
|
|
self.session.db[self.name+"_pos"] = self.buffer.list.get_selected()
|
|
except: pass
|
|
|
|
def clear_list(self):
|
|
dlg = commonMessageDialogs.clear_list()
|
|
if dlg == widgetUtils.YES:
|
|
self.session.db[self.name] = []
|
|
self.buffer.list.clear()
|
|
|
|
def remove_buffer(self, force=False):
|
|
if self.type in ("conversation", "chat_messages") or self.name.lower().startswith("conversation"):
|
|
if not force:
|
|
dlg = commonMessageDialogs.remove_buffer()
|
|
if dlg != widgetUtils.YES:
|
|
return False
|
|
try:
|
|
self.session.db.pop(self.name, None)
|
|
except Exception:
|
|
pass
|
|
return True
|
|
return False
|
|
|