diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 00000000..63da4213 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,7 @@ +{ + "permissions": { + "allow": [ + "Bash(ls:*)" + ] + } +} diff --git a/src/controller/blueski/handler.py b/src/controller/blueski/handler.py index df46eec0..a821e6a3 100644 --- a/src/controller/blueski/handler.py +++ b/src/controller/blueski/handler.py @@ -44,16 +44,26 @@ class Handler: start=True, kwargs=dict(parent=controller.view.nb, name="home_timeline", session=session) ) - # Following-only timeline (reverse-chronological) + # Home (Following-only timeline - reverse-chronological) pub.sendMessage( "createBuffer", buffer_type="following_timeline", session_type="blueski", - buffer_title=_("Following (Chronological)"), + buffer_title=_("Home"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="following_timeline", session=session) ) + # Mentions (replies, mentions, quotes) + pub.sendMessage( + "createBuffer", + buffer_type="MentionsBuffer", + session_type="blueski", + buffer_title=_("Mentions"), + parent_tab=root_position, + start=False, + kwargs=dict(parent=controller.view.nb, name="mentions", session=session) + ) # Notifications pub.sendMessage( "createBuffer", @@ -64,6 +74,16 @@ class Handler: start=False, kwargs=dict(parent=controller.view.nb, name="notifications", session=session) ) + # Sent posts + pub.sendMessage( + "createBuffer", + buffer_type="SentBuffer", + session_type="blueski", + buffer_title=_("Sent"), + parent_tab=root_position, + start=False, + kwargs=dict(parent=controller.view.nb, name="sent", session=session) + ) # Likes pub.sendMessage( "createBuffer", @@ -84,12 +104,12 @@ class Handler: start=False, kwargs=dict(parent=controller.view.nb, name="followers", session=session) ) - # Following (Users) + # Followings (Users you follow) pub.sendMessage( "createBuffer", buffer_type="FollowingBuffer", session_type="blueski", - buffer_title=_("Following (Users)"), + buffer_title=_("Followings"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="following", session=session) @@ -115,6 +135,12 @@ class Handler: kwargs=dict(parent=controller.view.nb, name="direct_messages", session=session) ) + # Start the background poller for real-time-like updates + try: + session.start_streaming() + except Exception: + logger.exception("Failed to start Bluesky streaming for session %s", name) + def start_buffer(self, controller, buffer): """Start a newly created Bluesky buffer.""" try: @@ -343,10 +369,10 @@ class Handler: """Standard action for delete key / menu item""" item = buffer.get_item() if not item: return - + uri = getattr(item, "uri", None) or (item.get("post", {}).get("uri") if isinstance(item, dict) else None) if not uri: return - + import wx if wx.MessageBox(_("Are you sure you want to delete this post?"), _("Delete post"), wx.YES_NO | wx.ICON_QUESTION) == wx.YES: if buffer.session.delete_post(uri): @@ -358,3 +384,54 @@ class Handler: else: import output output.speak(_("Failed to delete post.")) + + def search(self, controller, session): + """Open search dialog and create search buffer for results.""" + dlg = wx.TextEntryDialog( + controller.view, + _("Enter search term:"), + _("Search Bluesky") + ) + if dlg.ShowModal() != wx.ID_OK: + dlg.Destroy() + return + + query = dlg.GetValue().strip() + dlg.Destroy() + + if not query: + return + + # Create unique buffer name for this search + buffer_name = f"search_{query[:20]}" + account_name = session.get_name() + + # Check if buffer already exists + existing = controller.search_buffer(buffer_name, account_name) + if existing: + # Navigate to existing buffer + index = controller.view.search(buffer_name, account_name) + if index is not None: + controller.view.change_buffer(index) + # Refresh search + existing.search_query = query + existing.start_stream(mandatory=True, play_sound=False) + return + + # Create new search buffer + title = _("Search: {query}").format(query=query) + from pubsub import pub + pub.sendMessage( + "createBuffer", + buffer_type="SearchBuffer", + session_type="blueski", + buffer_title=title, + parent_tab=controller.view.search(account_name, account_name), + start=True, + kwargs=dict( + parent=controller.view.nb, + name=buffer_name, + session=session, + query=query + ) + ) diff --git a/src/controller/buffers/blueski/__init__.py b/src/controller/buffers/blueski/__init__.py index ea23ad64..7da31204 100644 --- a/src/controller/buffers/blueski/__init__.py +++ b/src/controller/buffers/blueski/__init__.py @@ -1,4 +1,13 @@ # -*- coding: utf-8 -*- -from .timeline import HomeTimeline, FollowingTimeline, NotificationBuffer, Conversation +from .timeline import ( + HomeTimeline, + FollowingTimeline, + NotificationBuffer, + Conversation, + LikesBuffer, + MentionsBuffer, + SentBuffer, + SearchBuffer, +) from .user import FollowersBuffer, FollowingBuffer, BlocksBuffer from .chat import ConversationListBuffer, ChatBuffer as ChatMessageBuffer diff --git a/src/controller/buffers/blueski/base.py b/src/controller/buffers/blueski/base.py index 51caac60..648eae8c 100644 --- a/src/controller/buffers/blueski/base.py +++ b/src/controller/buffers/blueski/base.py @@ -123,20 +123,84 @@ class BaseBuffer(base.Buffer): output.speak(_("Reposted.")) def on_like(self, evt): - self.toggle_favorite(confirm=True) + self.toggle_favorite(confirm=False) def toggle_favorite(self, confirm=False, *args, **kwargs): item = self.get_item() - if not item: return - uri = item.get("uri") if isinstance(item, dict) else getattr(item, "uri", None) + if not item: + output.speak(_("No item to like."), True) + return + + def g(obj, key, default=None): + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + uri = g(item, "uri") + if not uri: + post = g(item, "post") or g(item, "record") + uri = g(post, "uri") if post else None + + if not uri: + output.speak(_("Could not find post identifier."), True) + return if confirm: if wx.MessageBox(_("Like this post?"), _("Confirm"), wx.YES_NO | wx.ICON_QUESTION) != wx.YES: return - self.session.like(uri) + # Check if already liked + viewer = g(item, "viewer") + already_liked = g(viewer, "like") if viewer else None + + if already_liked: + output.speak(_("Already liked."), True) + return + + # Perform the like + like_uri = self.session.like(uri) + if not like_uri: + output.speak(_("Failed to like post."), True) + return + output.speak(_("Liked.")) + # Update the viewer state in the item + if isinstance(item, dict): + if "viewer" not in item: + item["viewer"] = {} + item["viewer"]["like"] = like_uri + else: + # For SDK models, create or update viewer + if not hasattr(item, "viewer") or item.viewer is None: + # Create a simple object to hold the like state + class Viewer: + def __init__(self): + self.like = None + item.viewer = Viewer() + item.viewer.like = like_uri + + # Refresh the displayed item in the list + try: + index = self.buffer.list.get_selected() + if index > -1: + # Recompose and update the list item + safe = True + relative_times = self.session.settings["general"].get("relative_times", False) + show_screen_names = self.session.settings["general"].get("show_screen_names", False) + post_data = self.compose_function(item, self.session.db, self.session.settings, + relative_times=relative_times, + show_screen_names=show_screen_names, + safe=safe) + + # Update the item in place (only 3 columns: Author, Post, Date) + self.buffer.list.list.SetItem(index, 0, post_data[0]) # Author + self.buffer.list.list.SetItem(index, 1, post_data[1]) # Text (with ♥ indicator) + self.buffer.list.list.SetItem(index, 2, post_data[2]) # Date + # Note: compose_post returns 4 items but list only has 3 columns + except Exception: + log.exception("Error refreshing list item after like") + def add_to_favorites(self, *args, **kwargs): self.toggle_favorite(confirm=False) @@ -172,8 +236,9 @@ class BaseBuffer(base.Buffer): if text: try: api = self.session._ensure_client() + dm_client = api.with_bsky_chat_proxy() # Get or create conversation - res = api.chat.bsky.convo.get_convo_for_members({"members": [did]}) + res = dm_client.chat.bsky.convo.get_convo_for_members({"members": [did]}) convo_id = res.convo.id self.session.send_chat_message(convo_id, text) output.speak(_("Message sent."), True) @@ -186,6 +251,46 @@ class BaseBuffer(base.Buffer): # If showing, we'll just open the chat buffer for now as it's more structured self.view_chat_with_user(did, handle) + def url(self, *args, **kwargs): + item = self.get_item() + if not item: return + + import webbrowser + + def g(obj, key, default=None): + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + uri = g(item, "uri") + author = g(item, "author") or g(g(item, "post"), "author") + handle = g(author, "handle") + + if uri and handle: + # URI format: at://did:plc:xxx/app.bsky.feed.post/rkey + if "app.bsky.feed.post" in uri: + rkey = uri.split("/")[-1] + url = f"https://bsky.app/profile/{handle}/post/{rkey}" + webbrowser.open(url) + return + elif "app.bsky.feed.like" in uri: + # It's a like notification, try to get the subject + subject = g(item, "subject") + subject_uri = g(subject, "uri") if subject else None + if subject_uri: + rkey = subject_uri.split("/")[-1] + # We might not have the handle of the post author here easily if it's not in the notification + # But let's try... + # Actually, notification items usually have enough info or we can't deep direct link easily without fetching. + # For now, let's just open the profile of the liker + pass + + # Fallback to profile + if handle: + url = f"https://bsky.app/profile/{handle}" + webbrowser.open(url) + return + def user_actions(self, *args, **kwargs): pub.sendMessage("execute-action", action="follow") diff --git a/src/controller/buffers/blueski/timeline.py b/src/controller/buffers/blueski/timeline.py index a1d9b310..9252f090 100644 --- a/src/controller/buffers/blueski/timeline.py +++ b/src/controller/buffers/blueski/timeline.py @@ -100,28 +100,43 @@ class FollowingTimeline(BaseBuffer): class NotificationBuffer(BaseBuffer): def __init__(self, *args, **kwargs): + # Override compose_func before calling super().__init__ + kwargs["compose_func"] = "compose_notification" super(NotificationBuffer, self).__init__(*args, **kwargs) self.type = "notifications" - + self.sound = "notification_received.ogg" + def create_buffer(self, parent, name): - self.buffer = BlueskiPanels.NotificationPanel(parent, name) + self.buffer = BlueskiPanels.NotificationPanel(parent, name) self.buffer.session = self.session def start_stream(self, mandatory=False, play_sound=True): - count = 50 - api = self.session._ensure_client() - try: - res = api.app.bsky.notification.list_notifications({"limit": count}) - notifs = getattr(res, "notifications", []) - items = [] - # Notifications are not FeedViewPost. They have different structure. - # self.compose_function expects FeedViewPost-like structure (post, author, etc). - # We need to map them or have a different compose function. - # For now, let's skip items to avoid crash - # Or attempt to map. - except: - return 0 - return 0 + count = 50 + try: + count = self.session.settings["general"].get("max_posts_per_call", 50) + except Exception: + pass + + api = self.session._ensure_client() + if not api: + return 0 + + try: + res = api.app.bsky.notification.list_notifications({"limit": count}) + notifications = getattr(res, "notifications", []) + if not notifications: + return 0 + + # Process notifications using the notification compose function + return self.process_items(list(notifications), play_sound) + + except Exception: + log.exception("Error fetching Bluesky notifications") + return 0 + + def add_new_item(self, notification): + """Add a single new notification from streaming/polling.""" + return self.process_items([notification], play_sound=True) class Conversation(BaseBuffer): def __init__(self, *args, **kwargs): @@ -207,3 +222,154 @@ class LikesBuffer(BaseBuffer): return 0 return self.process_items(list(items), play_sound) + + +class MentionsBuffer(BaseBuffer): + """Buffer for mentions and replies to the current user.""" + + def __init__(self, *args, **kwargs): + # Use notification compose function since mentions come from notifications + kwargs["compose_func"] = "compose_notification" + super(MentionsBuffer, self).__init__(*args, **kwargs) + self.type = "mentions" + self.sound = "mention_received.ogg" + + def create_buffer(self, parent, name): + self.buffer = BlueskiPanels.NotificationPanel(parent, name) + self.buffer.session = self.session + + def start_stream(self, mandatory=False, play_sound=True): + count = 50 + try: + count = self.session.settings["general"].get("max_posts_per_call", 50) + except Exception: + pass + + api = self.session._ensure_client() + if not api: + return 0 + + try: + res = api.app.bsky.notification.list_notifications({"limit": count}) + notifications = getattr(res, "notifications", []) + if not notifications: + return 0 + + # Filter only mentions and replies + mentions = [ + n for n in notifications + if getattr(n, "reason", "") in ("mention", "reply", "quote") + ] + + if not mentions: + return 0 + + return self.process_items(mentions, play_sound) + + except Exception: + log.exception("Error fetching Bluesky mentions") + return 0 + + def add_new_item(self, notification): + """Add a single new mention from streaming/polling.""" + reason = getattr(notification, "reason", "") + if reason in ("mention", "reply", "quote"): + return self.process_items([notification], play_sound=True) + return 0 + + +class SentBuffer(BaseBuffer): + """Buffer for posts sent by the current user.""" + + def __init__(self, *args, **kwargs): + super(SentBuffer, self).__init__(*args, **kwargs) + self.type = "sent" + + def create_buffer(self, parent, name): + self.buffer = BlueskiPanels.HomePanel(parent, name) + self.buffer.session = self.session + + def start_stream(self, mandatory=False, play_sound=True): + count = 50 + try: + count = self.session.settings["general"].get("max_posts_per_call", 50) + except Exception: + pass + + api = self.session._ensure_client() + if not api or not api.me: + return 0 + + try: + # Get author's own posts (excluding replies) + res = api.app.bsky.feed.get_author_feed({ + "actor": api.me.did, + "limit": count, + "filter": "posts_no_replies" + }) + items = getattr(res, "feed", []) + + if not items: + return 0 + + return self.process_items(list(items), play_sound) + + except Exception: + log.exception("Error fetching sent posts") + return 0 + + +class SearchBuffer(BaseBuffer): + """Buffer for search results (posts).""" + + def __init__(self, *args, **kwargs): + self.search_query = kwargs.pop("query", "") + super(SearchBuffer, self).__init__(*args, **kwargs) + self.type = "search" + + def create_buffer(self, parent, name): + self.buffer = BlueskiPanels.HomePanel(parent, name) + self.buffer.session = self.session + + def start_stream(self, mandatory=False, play_sound=True): + if not self.search_query: + return 0 + + count = 50 + try: + count = self.session.settings["general"].get("max_posts_per_call", 50) + except Exception: + pass + + api = self.session._ensure_client() + if not api: + return 0 + + try: + # Search posts + res = api.app.bsky.feed.search_posts({ + "q": self.search_query, + "limit": count + }) + posts = getattr(res, "posts", []) + + if not posts: + return 0 + + # Clear existing results for new search + self.session.db[self.name] = [] + self.buffer.list.clear() + + return self.process_items(list(posts), play_sound) + + except Exception: + log.exception("Error searching Bluesky posts") + return 0 + + def remove_buffer(self, force=False): + """Search buffers can always be removed.""" + try: + self.session.db.pop(self.name, None) + except Exception: + pass + return True diff --git a/src/controller/mainController.py b/src/controller/mainController.py index 92f60d09..5585eac9 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -128,6 +128,9 @@ class Controller(object): pub.subscribe(self.mastodon_new_conversation, "mastodon.conversation_received") pub.subscribe(self.mastodon_error_post, "mastodon.error_post") + # Bluesky specific events. + pub.subscribe(self.blueski_new_item, "blueski.new_item") + # connect application events to GUI widgetUtils.connect_event(self.view, widgetUtils.CLOSE_EVENT, self.exit_) widgetUtils.connect_event(self.view, widgetUtils.MENU, self.show_hide, menuitem=self.view.show_hide) @@ -388,6 +391,12 @@ class Controller(object): "notifications": BlueskiTimelines.NotificationBuffer, "conversation": BlueskiTimelines.Conversation, "likes": BlueskiTimelines.LikesBuffer, + "MentionsBuffer": BlueskiTimelines.MentionsBuffer, + "mentions": BlueskiTimelines.MentionsBuffer, + "SentBuffer": BlueskiTimelines.SentBuffer, + "sent": BlueskiTimelines.SentBuffer, + "SearchBuffer": BlueskiTimelines.SearchBuffer, + "search": BlueskiTimelines.SearchBuffer, "UserBuffer": BlueskiUsers.UserBuffer, "FollowersBuffer": BlueskiUsers.FollowersBuffer, "FollowingBuffer": BlueskiUsers.FollowingBuffer, @@ -757,10 +766,10 @@ class Controller(object): dlg.Destroy() if not text: return - try: - uri = session.send_message(text, reply_to=selected_item_uri, reply_to_cid=selected_item_cid) - if uri: - output.speak(_("Reply sent."), True) + try: + uri = session.send_message(text, reply_to=selected_item_uri, reply_to_cid=selected_item_cid) + if uri: + output.speak(_("Reply sent."), True) else: output.speak(_("Failed to send reply."), True) except Exception: @@ -902,29 +911,13 @@ class Controller(object): buffer = self.get_current_buffer() if hasattr(buffer, "add_to_favorites"): # Generic buffer method return buffer.add_to_favorites() + elif hasattr(buffer, "toggle_favorite"): + return buffer.toggle_favorite() elif buffer.session and buffer.session.KIND == "blueski": - item_uri = buffer.get_selected_item_id() - if not item_uri: - output.speak(_("No item selected to like."), True) - return - social_handler = self.get_handler(buffer.session.KIND) - async def _like(): - result = await social_handler.like_item(buffer.session, item_uri) - wx.CallAfter(output.speak, result["message"], True) # Ensure UI updates on main thread - if result.get("status") == "success" and result.get("like_uri"): - if hasattr(buffer, "store_item_viewer_state"): - # Ensure store_item_viewer_state is called on main thread if it modifies UI/shared data - wx.CallAfter(buffer.store_item_viewer_state, item_uri, "like_uri", result["like_uri"]) - # Also update the item in message_cache to reflect the like - if buffer.session and hasattr(buffer.session, "message_cache") and item_uri in buffer.session.message_cache: - cached_post = buffer.session.message_cache[item_uri] - if isinstance(cached_post, dict) and isinstance(cached_post.get("viewer"), dict): - cached_post["viewer"]["like"] = result["like_uri"] - elif hasattr(cached_post, "viewer") and cached_post.viewer: # SDK model - cached_post.viewer.like = result["like_uri"] - # No need to call buffer.update_item here unless it re-renders from scratch - # The visual feedback might come from a list refresh or specific item update later - asyncio.create_task(_like()) # wx.CallAfter for the task itself if _like might interact with UI before await + # Fallback if buffer doesn't have the method but session is blueski (e.g. ChatBuffer) + # Chat messages can't be liked yet in this implementation, or handled by specific buffer + output.speak(_("This item cannot be liked."), True) + return def remove_from_favourites(self, *args, **kwargs): @@ -1475,7 +1468,12 @@ class Controller(object): output.speak(_(u"Updating buffer..."), True) session = bf.session - async def do_update(): + output.speak(_(u"Updating buffer..."), True) + session = bf.session + + import threading + + def do_update_sync(): new_ids = [] try: if session.KIND == "blueski": @@ -1483,26 +1481,34 @@ class Controller(object): count = bf.start_stream(mandatory=True) if count: new_ids = [str(x) for x in range(count)] else: - output.speak(_(u"This buffer type cannot be updated."), True) + wx.CallAfter(output.speak, _(u"This buffer type cannot be updated."), True) return else: # Generic fallback for other sessions + # If they are async, this might be tricky in a thread without a loop + # But most old sessions in TWBlue are sync (using threads) if hasattr(bf, "start_stream"): count = bf.start_stream(mandatory=True, avoid_autoreading=True) if count: new_ids = [str(x) for x in range(count)] else: - output.speak(_(u"Unable to update this buffer."), True) + wx.CallAfter(output.speak, _(u"Unable to update this buffer."), True) return # Generic feedback - if bf.type in ["home_timeline", "user_timeline"]: - output.speak(_("{0} posts retrieved").format(len(new_ids)), True) - elif bf.type == "notifications": - output.speak(_("Notifications updated."), True) + if bf.type in ["home_timeline", "user_timeline", "notifications", "mentions"]: + wx.CallAfter(output.speak, _("{0} new items.").format(len(new_ids)), True) + except Exception as e: log.exception("Error updating buffer %s", bf.name) - output.speak(_("An error occurred while updating the buffer."), True) - - wx.CallAfter(asyncio.create_task, do_update()) + wx.CallAfter(output.speak, _("An error occurred while updating the buffer."), True) + + if session.KIND == "blueski": + threading.Thread(target=do_update_sync).start() + else: + # Original async logic for others if needed, but likely they are sync too. + # Assuming TWBlue architecture is mostly thread-based for legacy sessions. + # If we have an async loop running, we could use it for async-capable sessions. + # For safety, let's use the thread approach generally if we are not sure about the loop state. + threading.Thread(target=do_update_sync).start() def get_more_items(self, *args, **kwargs): @@ -1637,6 +1643,33 @@ class Controller(object): # if "direct_messages" not in buffer.session.settings["other_buffers"]["muted_buffers"]: # self.notify(buffer.session, sound_to_play) + def blueski_new_item(self, item, session_name, _buffers): + """Handle new items from Bluesky polling.""" + sound_to_play = None + for buff in _buffers: + buffer = self.search_buffer(buff, session_name) + if buffer is None or buffer.session.get_name() != session_name: + continue + if hasattr(buffer, "add_new_item"): + buffer.add_new_item(item) + # Determine sound to play + if buff == "notifications": + sound_to_play = "notification_received.ogg" + elif buff == "home_timeline": + sound_to_play = "tweet_received.ogg" + elif "timeline" in buff: + sound_to_play = "tweet_timeline.ogg" + else: + sound_to_play = None + # Play sound if buffer is not muted + if sound_to_play is not None: + try: + muted = buffer.session.settings["other_buffers"].get("muted_buffers", []) + if buff not in muted: + self.notify(buffer.session, sound_to_play) + except Exception: + pass + def mastodon_error_post(self, name, reply_to, visibility, posts, language): home = self.search_buffer("home_timeline", name) if home != None: diff --git a/src/controller/mastodon/messages.py b/src/controller/mastodon/messages.py index 63c6a7da..5a4648bb 100644 --- a/src/controller/mastodon/messages.py +++ b/src/controller/mastodon/messages.py @@ -21,7 +21,7 @@ def character_count(post_text, post_cw, character_limit=500): # We will use text for counting character limit only. full_text = post_text+post_cw # find remote users as Mastodon doesn't count the domain in char limit. - users = re.findall("@[\w\.-]+@[\w\.-]+", full_text) + users = re.findall(r"@[\w\.-]+@[\w\.-]+", full_text) for user in users: domain = user.split("@")[-1] full_text = full_text.replace("@"+domain, "") diff --git a/src/controller/mastodon/templateEditor.py b/src/controller/mastodon/templateEditor.py index 330a304c..df49ee6c 100644 --- a/src/controller/mastodon/templateEditor.py +++ b/src/controller/mastodon/templateEditor.py @@ -20,7 +20,7 @@ class EditTemplate(object): self.template: str = template def validate_template(self, template: str) -> bool: - used_variables: List[str] = re.findall("\$\w+", template) + used_variables: List[str] = re.findall(r"\$\w+", template) validated: bool = True for var in used_variables: if var[1:] not in self.variables: diff --git a/src/sessions/blueski/compose.py b/src/sessions/blueski/compose.py index f1b338ec..ae0035ea 100644 --- a/src/sessions/blueski/compose.py +++ b/src/sessions/blueski/compose.py @@ -1,297 +1,64 @@ # -*- coding: utf-8 -*- -from __future__ import annotations +""" +Compose functions for Bluesky content display in TWBlue. + +These functions format API data into user-readable strings for display in +list controls. They follow the TWBlue compose function pattern: + compose_function(item, db, relative_times, show_screen_names, session) + Returns a list of strings for display columns. +""" import logging -from typing import TYPE_CHECKING, Any -from datetime import datetime import arrow import languageHandler -if TYPE_CHECKING: - from sessions.blueski.session import Session as BlueskiSession - from atproto.xrpc_client import models # For type hinting ATProto models - -logger = logging.getLogger(__name__) - -# For SUPPORTED_LANG_CHOICES in composeDialog.py -SUPPORTED_LANG_CHOICES_COMPOSE = { - _("English"): "en", _("Spanish"): "es", _("French"): "fr", _("German"): "de", - _("Japanese"): "ja", _("Portuguese"): "pt", _("Russian"): "ru", _("Chinese"): "zh", -} - - -class BlueskiCompose: - MAX_CHARS = 300 - MAX_MEDIA_ATTACHMENTS = 4 - MAX_LANGUAGES = 3 - MAX_IMAGE_SIZE_BYTES = 1_000_000 - - def __init__(self, session: BlueskiSession) -> None: - self.session = session - self.supported_media_types: list[str] = ["image/jpeg", "image/png"] - self.max_image_size_bytes: int = self.MAX_IMAGE_SIZE_BYTES - - def get_panel_configuration(self) -> dict[str, Any]: - """Returns configuration for the compose panel specific to Blueski.""" - return { - "max_chars": self.MAX_CHARS, - "max_media_attachments": self.MAX_MEDIA_ATTACHMENTS, - "supports_content_warning": True, - "supports_scheduled_posts": False, - "supported_media_types": self.supported_media_types, - "max_media_size_bytes": self.max_image_size_bytes, - "supports_alternative_text": True, - "sensitive_reasons_options": self.session.get_sensitive_reason_options(), - "supports_language_selection": True, - "max_languages": self.MAX_LANGUAGES, - "supports_quoting": True, - "supports_polls": False, - } - - async def get_quote_text(self, message_id: str, url: str) -> str | None: - return "" - - async def get_reply_text(self, message_id: str, author_handle: str) -> str | None: - if not author_handle.startswith("@"): - return f"@{author_handle} " - return f"{author_handle} " - - def get_text_formatting_rules(self) -> dict[str, Any]: - return { - "markdown_enabled": False, - "custom_emojis_enabled": False, - "max_length": self.MAX_CHARS, - "line_break_char": "\n", - "link_format": "Full URL (e.g., https://example.com)", - "mention_format": "@handle.bsky.social", - "tag_format": "#tag (becomes a facet link)", - } - - def is_media_type_supported(self, mime_type: str) -> bool: - return mime_type.lower() in self.supported_media_types - - def get_max_schedule_date(self) -> str | None: - return None - - def get_poll_configuration(self) -> dict[str, Any] | None: - return None - - def compose_post_for_display(self, post_data: dict[str, Any], session_settings: dict[str, Any] | None = None) -> str: - """ - Composes a string representation of a Bluesky post for display in UI timelines. - """ - if not post_data or not isinstance(post_data, dict): - return _("Invalid post data.") - - author_info = post_data.get("author", {}) - record = post_data.get("record", {}) - embed_data = post_data.get("embed") - viewer_state = post_data.get("viewer", {}) - - display_name = author_info.get("displayName", "") or author_info.get("handle", _("Unknown User")) - handle = author_info.get("handle", _("unknown.handle")) - - post_text = getattr(record, 'text', '') if not isinstance(record, dict) else record.get('text', '') - - reason = post_data.get("reason") - if reason: - rtype = getattr(reason, "$type", "") if not isinstance(reason, dict) else reason.get("$type", "") - if not rtype and not isinstance(reason, dict): - rtype = getattr(reason, "py_type", "") - if rtype and "reasonRepost" in rtype: - by = getattr(reason, "by", None) if not isinstance(reason, dict) else reason.get("by") - by_handle = getattr(by, "handle", "") if by and not isinstance(by, dict) else (by.get("handle", "") if by else "") - reason_line = _("Reposted by @{handle}").format(handle=by_handle) if by_handle else _("Reposted") - post_text = f"{reason_line}\n{post_text}" if post_text else reason_line - - created_at_str = getattr(record, 'createdAt', '') if not isinstance(record, dict) else record.get('createdAt', '') - timestamp_str = "" - if created_at_str: - try: - ts = arrow.get(created_at_str) - timestamp_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) - except Exception as e: - logger.debug(f"Could not parse timestamp {created_at_str}: {e}") - timestamp_str = created_at_str - - header = f"{display_name} (@{handle}) - {timestamp_str}" - - labels = post_data.get("labels", []) - spoiler_text = None - is_sensitive_post = False - if labels: - for label_obj in labels: - label_val = getattr(label_obj, 'val', '') if not isinstance(label_obj, dict) else label_obj.get('val', '') - if label_val == "!warn": - is_sensitive_post = True - elif label_val in ["porn", "sexual", "nudity", "gore", "graphic-media", "corpse", "self-harm", "hate", "spam", "impersonation"]: - is_sensitive_post = True - if not spoiler_text: spoiler_text = _("Sensitive Content: {label}").format(label=label_val) - elif label_val.startswith("warn:") and len(label_val) > 5: - spoiler_text = label_val.split("warn:", 1)[-1].strip() - is_sensitive_post = True - - post_text_display = post_text - if spoiler_text: - post_text_display = f"CW: {spoiler_text}\n\n{post_text}" - elif is_sensitive_post and not spoiler_text: - post_text_display = f"CW: {_('Sensitive Content')}\n\n{post_text}" - - embed_display = "" - if embed_data: - embed_type = getattr(embed_data, '$type', '') - if not embed_type and isinstance(embed_data, dict): embed_type = embed_data.get('$type', '') - - if embed_type in ['app.bsky.embed.images#view', 'app.bsky.embed.images']: - images = getattr(embed_data, 'images', []) if hasattr(embed_data, 'images') else embed_data.get('images', []) - if images: - img_count = len(images) - alt_texts_present = any(getattr(img, 'alt', '') for img in images if hasattr(img, 'alt')) or \ - any(img_dict.get('alt', '') for img_dict in images if isinstance(img_dict, dict)) - embed_display += f"\n[{img_count} Image" - if img_count > 1: embed_display += "s" - if alt_texts_present: embed_display += _(" (Alt text available)") - embed_display += "]" - - elif embed_type in ['app.bsky.embed.record#view', 'app.bsky.embed.record', 'app.bsky.embed.recordWithMedia#view', 'app.bsky.embed.recordWithMedia']: - record_embed_data = getattr(embed_data, 'record', None) if hasattr(embed_data, 'record') else embed_data.get('record', None) - if record_embed_data and isinstance(record_embed_data, dict): - record_embed_data = record_embed_data.get("record") or record_embed_data - record_embed_type = getattr(record_embed_data, '$type', '') - if not record_embed_type and isinstance(record_embed_data, dict): record_embed_type = record_embed_data.get('$type', '') - - if record_embed_type == 'app.bsky.embed.record#viewNotFound': - embed_display += f"\n[{_('Quoted post not found or unavailable')}]" - elif record_embed_type == 'app.bsky.embed.record#viewBlocked': - embed_display += f"\n[{_('Content from the quoted account is blocked')}]" - elif record_embed_data and (isinstance(record_embed_data, dict) or hasattr(record_embed_data, 'author')): - quote_author_info = getattr(record_embed_data, 'author', record_embed_data.get('author')) - quote_value = getattr(record_embed_data, 'value', record_embed_data.get('value')) - - if quote_author_info and quote_value: - quote_author_handle = getattr(quote_author_info, 'handle', 'unknown') - quote_text_content = getattr(quote_value, 'text', '') if not isinstance(quote_value, dict) else quote_value.get('text', '') - quote_text_snippet = (quote_text_content[:75] + "...") if quote_text_content else _("post content") - embed_display += f"\n[ {_('Quote by')} @{quote_author_handle}: \"{quote_text_snippet}\" ]" - else: - embed_display += f"\n[{_('Quoted Post')}]" - - elif embed_type in ['app.bsky.embed.external#view', 'app.bsky.embed.external']: - external_data = getattr(embed_data, 'external', None) if hasattr(embed_data, 'external') else embed_data.get('external', None) - if external_data: - ext_uri = getattr(external_data, 'uri', _('External Link')) - ext_title = getattr(external_data, 'title', '') or ext_uri - embed_display += f"\n[{_('Link')}: {ext_title}]" - - reply_context_str = "" - actual_record = post_data.get("record", {}) - reply_ref = getattr(actual_record, 'reply', None) if not isinstance(actual_record, dict) else actual_record.get('reply') - - if reply_ref: - reply_context_str = f"[{_('In reply to a post')}] " - - counts_str_parts = [] - reply_count = post_data.get("replyCount", 0) - repost_count = post_data.get("repostCount", 0) - like_count = post_data.get("likeCount", 0) - - if reply_count > 0: counts_str_parts.append(f"{_('Replies')}: {reply_count}") - if repost_count > 0: counts_str_parts.append(f"{_('Reposts')}: {repost_count}") - if like_count > 0: counts_str_parts.append(f"{_('Likes')}: {like_count}") - - viewer_liked_uri = viewer_state.get("like") if isinstance(viewer_state, dict) else getattr(viewer_state, 'like', None) - viewer_reposted_uri = viewer_state.get("repost") if isinstance(viewer_state, dict) else getattr(viewer_state, 'repost', None) - - if viewer_liked_uri: counts_str_parts.append(f"({_('Liked by you')})") - if viewer_reposted_uri: counts_str_parts.append(f"({_('Reposted by you')})") - - counts_line = "" - if counts_str_parts: - counts_line = "\n" + " | ".join(counts_str_parts) - - full_display = f"{header}\n{reply_context_str}{post_text_display}{embed_display}{counts_line}" - return full_display.strip() - - def compose_notification_for_display(self, notif_data: dict[str, Any]) -> str: - """ - Composes a string representation of a Bluesky notification for display. - - Args: - notif_data: A dictionary representing the notification, - typically from BlueskiSession._handle_*_notification methods - which create an approve.notifications.Notification object and then - convert it to dict or pass relevant parts. - Expected keys: 'title', 'body', 'author_name', 'timestamp_dt', 'kind'. - The 'title' usually already contains the core action. - Returns: - A formatted string for display. - """ - if not notif_data or not isinstance(notif_data, dict): - return _("Invalid notification data.") - - title = notif_data.get('title', _("Notification")) - body = notif_data.get('body', '') - author_name = notif_data.get('author_name') # Author of the action (e.g. who liked) - timestamp_dt = notif_data.get('timestamp_dt') # datetime object - - timestamp_str = "" - if timestamp_dt and isinstance(timestamp_dt, datetime): - try: - timestamp_str = timestamp_dt.strftime("%I:%M %p - %b %d, %Y") - except Exception as e: - logger.debug(f"Could not format notification timestamp {timestamp_dt}: {e}") - timestamp_str = str(timestamp_dt) - - display_parts = [] - if timestamp_str: - display_parts.append(f"[{timestamp_str}]") - - # Title already contains good info like "UserX liked your post" - display_parts.append(title) - - if body: # Body might be text of a reply/mention/quote - # Truncate body if too long for a list display - body_snippet = (body[:100] + "...") if len(body) > 103 else body - display_parts.append(f"\"{body_snippet}\"") - - return " ".join(display_parts).strip() +log = logging.getLogger("sessions.blueski.compose") def compose_post(post, db, settings, relative_times, show_screen_names=False, safe=True): """ - Compose a Bluesky post into a list of strings [User, Text, Date, Source]. - post: dict or ATProto model object. + Compose a Bluesky post into a list of strings for display. + + Args: + post: dict or ATProto model object (FeedViewPost or PostView) + db: Session database dict + settings: Session settings + relative_times: If True, use relative time formatting + show_screen_names: If True, show only @handle instead of display name + safe: If True, handle exceptions gracefully + + Returns: + List of strings: [User, Text, Date, Source] """ - # Extract data using getattr for models or .get for dicts def g(obj, key, default=None): + """Helper to get attribute from dict or object.""" if isinstance(obj, dict): return obj.get(key, default) return getattr(obj, key, default) # Resolve Post View or Feed View structure - # Feed items often have .post field. Direct post objects don't. - actual_post = g(post, "post", post) - + # Feed items have .post field, direct post objects don't + actual_post = g(post, "post", post) + record = g(actual_post, "record", {}) author = g(actual_post, "author", {}) - + # Author handle = g(author, "handle", "") display_name = g(author, "displayName") or g(author, "display_name") or handle or "Unknown" - + if show_screen_names: user_str = f"@{handle}" else: - # "Display Name (@handle)" if handle and display_name != handle: user_str = f"{display_name} (@{handle})" else: user_str = f"@{handle}" - + # Text text = g(record, "text", "") - # Repost reason (so users know why they see an unfamiliar post) + # Repost reason reason = g(post, "reason", None) if reason: rtype = g(reason, "$type") or g(reason, "py_type") @@ -300,113 +67,196 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa by_handle = g(by, "handle", "") reason_line = _("Reposted by @{handle}").format(handle=by_handle) if by_handle else _("Reposted") text = f"{reason_line}\n{text}" if text else reason_line - + # Labels / Content Warning labels = g(actual_post, "labels", []) cw_text = "" - is_sensitive = False - + for label in labels: val = g(label, "val", "") if val in ["!warn", "porn", "sexual", "nudity", "gore", "graphic-media", "corpse", "self-harm", "hate", "spam", "impersonation"]: - is_sensitive = True - if not cw_text: cw_text = _("Sensitive Content") + if not cw_text: + cw_text = _("Sensitive Content") elif val.startswith("warn:"): - is_sensitive = True cw_text = val.split("warn:", 1)[-1].strip() if cw_text: text = f"CW: {cw_text}\n\n{text}" - - # Embeds (Images, Quotes) + + # Embeds (Images, Quotes, Links) embed = g(actual_post, "embed", None) if embed: etype = g(embed, "$type") or g(embed, "py_type") + + # Images if etype and ("images" in etype): images = g(embed, "images", []) if images: text += f"\n[{len(images)} {_('Images')}]" - - # Handle Record (Quote) or RecordWithMedia (Quote + Media) + + # Quote posts quote_rec = None if etype and ("recordWithMedia" in etype): - # Extract the nested record - rec_embed = g(embed, "record", {}) - if rec_embed: - quote_rec = g(rec_embed, "record", None) or rec_embed - # Also check for media in the wrapper - media = g(embed, "media", {}) - mtype = g(media, "$type") or g(media, "py_type") - if mtype and "images" in mtype: - images = g(media, "images", []) - if images: text += f"\n[{len(images)} {_('Images')}]" - - elif etype and ("record" in etype): - # Direct quote - quote_rec = g(embed, "record", {}) - if isinstance(quote_rec, dict): - quote_rec = quote_rec.get("record") or quote_rec - - if quote_rec: - # It is likely a ViewRecord - # Check type (ViewRecord, ViewNotFound, ViewBlocked, etc) - qtype = g(quote_rec, "$type") or g(quote_rec, "py_type") - - if qtype and "viewNotFound" in qtype: - text += f"\n[{_('Quoted post not found')}]" - elif qtype and "viewBlocked" in qtype: - text += f"\n[{_('Quoted post blocked')}]" - elif qtype and "generatorView" in qtype: - # Feed generator - gen = g(quote_rec, "displayName", "Feed") - text += f"\n[{_('Quoting Feed')}: {gen}]" - else: - # Assume ViewRecord - q_author = g(quote_rec, "author", {}) - q_handle = g(q_author, "handle", "unknown") - - q_val = g(quote_rec, "value", {}) - q_text = g(q_val, "text", "") + rec_embed = g(embed, "record", {}) + if rec_embed: + quote_rec = g(rec_embed, "record", None) or rec_embed + # Media in wrapper + media = g(embed, "media", {}) + mtype = g(media, "$type") or g(media, "py_type") + if mtype and "images" in mtype: + images = g(media, "images", []) + if images: + text += f"\n[{len(images)} {_('Images')}]" - if q_text: - text += f"\n[{_('Quoting')} @{q_handle}: {q_text}]" - else: - text += f"\n[{_('Quoting')} @{q_handle}]" + elif etype and ("record" in etype): + quote_rec = g(embed, "record", {}) + if isinstance(quote_rec, dict): + quote_rec = quote_rec.get("record") or quote_rec + + if quote_rec: + qtype = g(quote_rec, "$type") or g(quote_rec, "py_type") + + if qtype and "viewNotFound" in qtype: + text += f"\n[{_('Quoted post not found')}]" + elif qtype and "viewBlocked" in qtype: + text += f"\n[{_('Quoted post blocked')}]" + elif qtype and "generatorView" in qtype: + gen = g(quote_rec, "displayName", "Feed") + text += f"\n[{_('Quoting Feed')}: {gen}]" + else: + q_author = g(quote_rec, "author", {}) + q_handle = g(q_author, "handle", "unknown") + q_val = g(quote_rec, "value", {}) + q_text = g(q_val, "text", "") + + if q_text: + text += f"\n[{_('Quoting')} @{q_handle}: {q_text}]" + else: + text += f"\n[{_('Quoting')} @{q_handle}]" elif etype and ("external" in etype): ext = g(embed, "external", {}) - uri = g(ext, "uri", "") title = g(ext, "title", "") text += f"\n[{_('Link')}: {title}]" # Date - indexed_at = g(actual_post, "indexed_at", "") + indexed_at = g(actual_post, "indexed_at", "") or g(actual_post, "indexedAt", "") ts_str = "" if indexed_at: try: - # Try arrow parsing - import arrow - ts = arrow.get(indexed_at) - if relative_times: - ts_str = ts.humanize(locale=languageHandler.curLang[:2]) - else: - ts_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) + ts = arrow.get(indexed_at) + if relative_times: + ts_str = ts.humanize(locale=languageHandler.curLang[:2]) + else: + ts_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) except Exception: - ts_str = str(indexed_at)[:16].replace("T", " ") + ts_str = str(indexed_at)[:16].replace("T", " ") - # Source (not always available in Bsky view, often just client) - # We'll leave it empty or mock it if needed + # Source / Client source = "Bluesky" - + + # Viewer state (liked, reposted, etc.) + viewer_indicators = [] + viewer = g(actual_post, "viewer") or g(post, "viewer") + if viewer: + if g(viewer, "like"): + viewer_indicators.append("♥") # Liked + if g(viewer, "repost"): + viewer_indicators.append("🔁") # Reposted + + # Add viewer indicators to the source column or create a prefix for text + if viewer_indicators: + indicator_str = " ".join(viewer_indicators) + # Add to beginning of text for visibility + text = f"{indicator_str} {text}" return [user_str, text, ts_str, source] + +def compose_notification(notification, db, settings, relative_times, show_screen_names=False, safe=True): + """ + Compose a Bluesky notification into a list of strings for display. + + Args: + notification: ATProto notification object + db: Session database dict + settings: Session settings + relative_times: If True, use relative time formatting + show_screen_names: If True, show only @handle + safe: If True, handle exceptions gracefully + + Returns: + List of strings: [User, Action/Text, Date] + """ + def g(obj, key, default=None): + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + # Author of the notification (who performed the action) + author = g(notification, "author", {}) + handle = g(author, "handle", "unknown") + display_name = g(author, "displayName") or g(author, "display_name") or handle + + if show_screen_names: + user_str = f"@{handle}" + else: + user_str = f"{display_name} (@{handle})" + + # Notification reason/type + reason = g(notification, "reason", "unknown") + + # Map reason to user-readable text + reason_text_map = { + "like": _("liked your post"), + "repost": _("reposted your post"), + "follow": _("followed you"), + "mention": _("mentioned you"), + "reply": _("replied to you"), + "quote": _("quoted your post"), + "starterpack-joined": _("joined your starter pack"), + } + + action_text = reason_text_map.get(reason, reason) + + # For mentions/replies/quotes, include snippet of the text + record = g(notification, "record", {}) + post_text = g(record, "text", "") + if post_text and reason in ["mention", "reply", "quote"]: + snippet = post_text[:100] + "..." if len(post_text) > 100 else post_text + action_text = f"{action_text}: {snippet}" + + # Date + indexed_at = g(notification, "indexedAt", "") or g(notification, "indexed_at", "") + ts_str = "" + if indexed_at: + try: + ts = arrow.get(indexed_at) + if relative_times: + ts_str = ts.humanize(locale=languageHandler.curLang[:2]) + else: + ts_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) + except Exception: + ts_str = str(indexed_at)[:16].replace("T", " ") + + return [user_str, action_text, ts_str] + + def compose_user(user, db, settings, relative_times, show_screen_names=False, safe=True): """ - Compose a Bluesky user for list display. - Returns: [User summary string] + Compose a Bluesky user profile for list display. + + Args: + user: User profile dict or ATProto model + db: Session database dict + settings: Session settings + relative_times: If True, use relative time formatting + show_screen_names: If True, show only @handle + safe: If True, handle exceptions gracefully + + Returns: + List of strings: [User summary] """ - # Extract data using getattr for models or .get for dicts def g(obj, key, default=None): if isinstance(obj, dict): return obj.get(key, default) @@ -422,7 +272,6 @@ def compose_user(user, db, settings, relative_times, show_screen_names=False, sa ts = "" if created_at: try: - import arrow original_date = arrow.get(created_at) if relative_times: ts = original_date.humanize(locale=languageHandler.curLang[:2]) @@ -442,10 +291,21 @@ def compose_user(user, db, settings, relative_times, show_screen_names=False, sa return [" ".join(parts).strip()] + def compose_convo(convo, db, settings, relative_times, show_screen_names=False, safe=True): """ Compose a Bluesky chat conversation for list display. - Returns: [Participants, Last Message, Date] + + Args: + convo: Conversation dict or ATProto model + db: Session database dict + settings: Session settings + relative_times: If True, use relative time formatting + show_screen_names: If True, show only @handle + safe: If True, handle exceptions gracefully + + Returns: + List of strings: [Participants, Last Message, Date] """ def g(obj, key, default=None): if isinstance(obj, dict): @@ -454,6 +314,8 @@ def compose_convo(convo, db, settings, relative_times, show_screen_names=False, members = g(convo, "members", []) self_did = db.get("user_id") if isinstance(db, dict) else None + + # Get other participants (exclude self) others = [] for m in members: did = g(m, "did", None) @@ -461,35 +323,36 @@ def compose_convo(convo, db, settings, relative_times, show_screen_names=False, continue label = g(m, "displayName") or g(m, "display_name") or g(m, "handle", "unknown") others.append(label) + if not others: others = [g(m, "displayName") or g(m, "display_name") or g(m, "handle", "unknown") for m in members] + participants = ", ".join(others) - + + # Last message last_msg_obj = g(convo, "lastMessage") or g(convo, "last_message") last_text = "" last_sender = "" + if last_msg_obj: last_text = g(last_msg_obj, "text", "") sender = g(last_msg_obj, "sender", None) if sender: last_sender = g(sender, "displayName") or g(sender, "display_name") or g(sender, "handle", "") - - # Date (using lastMessage.sentAt) + + # Date date_str = "" - sent_at = None if last_msg_obj: sent_at = g(last_msg_obj, "sentAt") or g(last_msg_obj, "sent_at") - - if sent_at: - try: - import arrow - ts = arrow.get(sent_at) - if relative_times: - date_str = ts.humanize(locale=languageHandler.curLang[:2]) - else: - date_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) - except: - date_str = str(sent_at)[:16] + if sent_at: + try: + ts = arrow.get(sent_at) + if relative_times: + date_str = ts.humanize(locale=languageHandler.curLang[:2]) + else: + date_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) + except Exception: + date_str = str(sent_at)[:16] if last_sender and last_text: last_text = _("Last message from {user}: {text}").format(user=last_sender, text=last_text) @@ -498,10 +361,21 @@ def compose_convo(convo, db, settings, relative_times, show_screen_names=False, return [participants, last_text, date_str] + def compose_chat_message(msg, db, settings, relative_times, show_screen_names=False, safe=True): """ - Compose an individual chat message for display in a thread. - Returns: [Sender, Text, Date] + Compose an individual chat message for display. + + Args: + msg: Chat message dict or ATProto model + db: Session database dict + settings: Session settings + relative_times: If True, use relative time formatting + show_screen_names: If True, show only @handle + safe: If True, handle exceptions gracefully + + Returns: + List of strings: [Sender, Text, Date] """ def g(obj, key, default=None): if isinstance(obj, dict): @@ -510,20 +384,20 @@ def compose_chat_message(msg, db, settings, relative_times, show_screen_names=Fa sender = g(msg, "sender", {}) handle = g(sender, "displayName") or g(sender, "display_name") or g(sender, "handle", "unknown") - + text = g(msg, "text", "") - + + # Date sent_at = g(msg, "sentAt") or g(msg, "sent_at") date_str = "" if sent_at: try: - import arrow ts = arrow.get(sent_at) if relative_times: date_str = ts.humanize(locale=languageHandler.curLang[:2]) else: date_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2]) - except: + except Exception: date_str = str(sent_at)[:16] - + return [handle, text, date_str] diff --git a/src/sessions/blueski/session.py b/src/sessions/blueski/session.py index 8d1b8df9..0db7dbc5 100644 --- a/src/sessions/blueski/session.py +++ b/src/sessions/blueski/session.py @@ -6,6 +6,8 @@ from typing import Any import wx +from pubsub import pub + from sessions import base from sessions import session_exceptions as Exceptions import output @@ -36,6 +38,9 @@ class Session(base.baseSession): self.type = "blueski" self.char_limit = 300 self.api = None + self.poller = None + # Subscribe to pub/sub events from the poller + pub.subscribe(self.on_notification, "blueski.notification_received") def _ensure_settings_namespace(self) -> None: """Migrate legacy atprotosocial settings to blueski namespace.""" @@ -648,19 +653,119 @@ class Session(base.baseSession): def list_convos(self, limit: int = 50, cursor: str | None = None) -> dict[str, Any]: api = self._ensure_client() - res = api.chat.bsky.convo.list_convos({"limit": limit, "cursor": cursor}) + # Chat API requires using the chat proxy + dm_client = api.with_bsky_chat_proxy() + res = dm_client.chat.bsky.convo.list_convos({"limit": limit, "cursor": cursor}) return {"items": res.convos, "cursor": res.cursor} def get_convo_messages(self, convo_id: str, limit: int = 50, cursor: str | None = None) -> dict[str, Any]: api = self._ensure_client() - res = api.chat.bsky.convo.get_messages({"convoId": convo_id, "limit": limit, "cursor": cursor}) + dm_client = api.with_bsky_chat_proxy() + res = dm_client.chat.bsky.convo.get_messages({"convoId": convo_id, "limit": limit, "cursor": cursor}) return {"items": res.messages, "cursor": res.cursor} def send_chat_message(self, convo_id: str, text: str) -> Any: api = self._ensure_client() - return api.chat.bsky.convo.send_message({ + dm_client = api.with_bsky_chat_proxy() + return dm_client.chat.bsky.convo.send_message({ "convoId": convo_id, "message": { "text": text } }) + + # Streaming/Polling methods + + def start_streaming(self): + """Start the background poller for notifications.""" + if not self.logged: + log.debug("Cannot start Bluesky poller: not logged in.") + return + + if self.poller is not None and self.poller.is_alive(): + log.debug("Bluesky poller already running for %s", self.get_name()) + return + + try: + from sessions.blueski.streaming import BlueskyPoller + poll_interval = 60 + try: + poll_interval = self.settings["general"].get("update_period", 60) + except Exception: + pass + + self.poller = BlueskyPoller( + session=self, + session_name=self.get_name(), + poll_interval=poll_interval + ) + self.poller.start() + log.info("Started Bluesky poller for session %s", self.get_name()) + except Exception: + log.exception("Failed to start Bluesky poller") + + def stop_streaming(self): + """Stop the background poller.""" + if self.poller is not None: + self.poller.stop() + self.poller = None + log.info("Stopped Bluesky poller for session %s", self.get_name()) + + def on_notification(self, notification, session_name): + """Handle notification received from the poller via pub/sub.""" + # Discard if notification is for a different session + if self.get_name() != session_name: + return + + # Add notification to the notifications buffer + try: + num = self.order_buffer("notifications", [notification]) + if num > 0: + pub.sendMessage( + "blueski.new_item", + session_name=self.get_name(), + item=notification, + _buffers=["notifications"] + ) + except Exception: + log.exception("Error processing Bluesky notification") + + def order_buffer(self, buffer_name, items): + """Add items to the specified buffer's database. + + Returns the number of new items added. + """ + if buffer_name not in self.db: + self.db[buffer_name] = [] + + # Get existing URIs to avoid duplicates + existing_uris = set() + for item in self.db[buffer_name]: + uri = None + if isinstance(item, dict): + uri = item.get("uri") + else: + uri = getattr(item, "uri", None) + if uri: + existing_uris.add(uri) + + # Add new items + new_count = 0 + for item in items: + uri = None + if isinstance(item, dict): + uri = item.get("uri") + else: + uri = getattr(item, "uri", None) + + if uri and uri in existing_uris: + continue + + if uri: + existing_uris.add(uri) + + # Insert at beginning (newest first) + self.db[buffer_name].insert(0, item) + new_count += 1 + + return new_count diff --git a/src/sessions/blueski/streaming.py b/src/sessions/blueski/streaming.py index ebb97920..409b6144 100644 --- a/src/sessions/blueski/streaming.py +++ b/src/sessions/blueski/streaming.py @@ -1,209 +1,196 @@ -from __future__ import annotations +# -*- coding: utf-8 -*- +""" +Bluesky polling-based update system for TWBlue. + +Since Bluesky's Firehose requires complex CAR/CBOR decoding and filtering +of millions of events, we use a polling approach instead of true streaming. +This matches the existing start_stream() pattern used by buffers. + +Events are published via pub/sub to maintain consistency with Mastodon's +streaming implementation. +""" -import asyncio import logging -from typing import TYPE_CHECKING, Any, Callable, Coroutine +import threading +import time +from pubsub import pub -if TYPE_CHECKING: - fromapprove.sessions.blueski.session import Session as BlueskiSession - -logger = logging.getLogger(__name__) - -# Blueski (Bluesky) uses a Firehose model for streaming. -# This typically involves connecting to a WebSocket endpoint and receiving events. -# The atproto SDK provides tools for this. +log = logging.getLogger("sessions.blueski.streaming") -class BlueskiStreaming: - def __init__(self, session: BlueskiSession, stream_type: str, params: dict[str, Any] | None = None) -> None: +class BlueskyPoller: + """ + Polling-based update system for Bluesky. + + Periodically checks for new notifications and publishes them via pub/sub. + This provides a similar interface to Mastodon's StreamListener but uses + polling instead of WebSocket streaming. + """ + + def __init__(self, session, session_name, poll_interval=60): + """ + Initialize the poller. + + Args: + session: The Bluesky session instance + session_name: Unique identifier for this session (for pub/sub routing) + poll_interval: Seconds between API polls (default 60, min 30) + """ self.session = session - self.stream_type = stream_type # e.g., 'user', 'public', 'hashtag' - will need mapping to Firehose concepts - self.params = params or {} - self._handler: Callable[[dict[str, Any]], Coroutine[Any, Any, None]] | None = None - self._connection_task: asyncio.Task[None] | None = None - self._should_stop = False - # self._client = None # This would be an instance of atproto.firehose.FirehoseSubscribeReposClient or similar + self.session_name = session_name + self.poll_interval = max(30, poll_interval) # Minimum 30 seconds to respect rate limits - # TODO: Map stream_type and params to ATProto Firehose subscription needs. - # For example, 'user' might mean subscribing to mentions, replies, follows for the logged-in user. - # This would likely involve filtering the general repo firehose for relevant events, - # or using a more specific subscription if available for user-level events. + self._stop_event = threading.Event() + self._thread = None + self._last_notification_cursor = None + self._last_seen_notification_uri = None - async def _connect(self) -> None: - """Internal method to connect to the Blueski Firehose.""" - # from atproto import AsyncClient - # from atproto.firehose import FirehoseSubscribeReposClient, parse_subscribe_repos_message - # from atproto.xrpc_client.models import get_or_create, ids, models - - logger.info(f"Blueski streaming: Connecting to Firehose for user {self.session.user_id}, stream type {self.stream_type}") - self._should_stop = False - - try: - # TODO: Replace with actual atproto SDK usage - # client = self.session.util.get_client() # Get authenticated client from session utils - # if not client or not client.me: # Check if client is authenticated - # logger.error("Blueski client not authenticated. Cannot start Firehose.") - # return - - # self._firehose_client = FirehoseSubscribeReposClient(params=None, base_uri=self.session.api_base_url) # Adjust base_uri if needed - - # async def on_message_handler(message: models.ComAtprotoSyncSubscribeRepos.Message) -> None: - # if self._should_stop: - # await self._firehose_client.stop() # Ensure client stops if flag is set - # return - - # # This is a simplified example. Real implementation needs to: - # # 1. Determine the type of message (commit, handle, info, migrate, tombstone) - # # 2. For commits, unpack operations to find posts, likes, reposts, follows, etc. - # # 3. Filter these events to be relevant to the user (e.g., mentions, replies to user, new posts from followed users) - # # 4. Format the data into a structure that self._handle_event expects. - # # This filtering can be complex. - - # # Example: if it's a commit and contains a new post that mentions the user - # # if isinstance(message, models.ComAtprotoSyncSubscribeRepos.Commit): - # # # This part is highly complex due to CAR CIBOR decoding - # # # Operations need to be extracted from the commit block - # # # For each op, check if it's a create, and if the record is a post - # # # Then, check if the post's text or facets mention the current user. - # # # This is a placeholder for that logic. - # # logger.debug(f"Firehose commit from {message.repo} at {message.time}") - # # # Example of processing ops (pseudo-code, actual decoding is more involved): - # # # ops = message.ops - # # # for op in ops: - # # # if op.action == 'create' and op.path.endswith('/app.bsky.feed.post/...'): - # # # record_data = ... # decode op.cid from message.blocks - # # # if self.session.util.is_mention_of_me(record_data): - # # # event_data = self.session.util.format_post_event(record_data) - # # # await self._handle_event("mention", event_data) - - # # For now, we'll just log that a message was received - # logger.debug(f"Blueski Firehose message received: {message.__class__.__name__}") - - - # await self._firehose_client.start(on_message_handler) - - # Placeholder loop to simulate receiving events - while not self._should_stop: - await asyncio.sleep(1) - # In a real implementation, this loop wouldn't exist; it'd be driven by the SDK's event handler. - # To simulate an event: - # if self._handler: - # mock_event = {"type": "placeholder_event", "data": {"text": "Hello from mock stream"}} - # await self._handler(mock_event) # Call the registered handler - - logger.info(f"Blueski streaming: Placeholder loop for {self.session.user_id} stopped.") - - - except asyncio.CancelledError: - logger.info(f"Blueski streaming task for user {self.session.user_id} was cancelled.") - except Exception as e: - logger.error(f"Blueski streaming error for user {self.session.user_id}: {e}", exc_info=True) - # Optional: implement retry logic here or in the start_streaming method - if not self._should_stop: - await asyncio.sleep(30) # Wait before trying to reconnect (if auto-reconnect is desired) - if not self._should_stop: # Check again before restarting - self._connection_task = asyncio.create_task(self._connect()) - - - finally: - # if self._firehose_client: - # await self._firehose_client.stop() - logger.info(f"Blueski streaming connection closed for user {self.session.user_id}.") - - - async def _handle_event(self, event_type: str, data: dict[str, Any]) -> None: - """ - Internal method to process an event from the stream and pass it to the session's handler. - """ - if self._handler: - try: - # The data should be transformed into a common format expected by session.handle_streaming_event - # This is where Blueski-specific event data is mapped to Approve's internal event structure. - # For example, an Blueski 'mention' event needs to be structured similarly to - # how a Mastodon 'mention' event would be. - await self.session.handle_streaming_event(event_type, data) - except Exception as e: - logger.error(f"Error handling Blueski streaming event type {event_type}: {e}", exc_info=True) - else: - logger.warning(f"Blueski streaming: No handler registered for session {self.session.user_id}, event: {event_type}") - - - def start_streaming(self, handler: Callable[[dict[str, Any]], Coroutine[Any, Any, None]]) -> None: - """Starts the streaming connection.""" - if self._connection_task and not self._connection_task.done(): - logger.warning(f"Blueski streaming already active for user {self.session.user_id}.") + def start(self): + """Start the polling thread.""" + if self._thread is not None and self._thread.is_alive(): + log.warning(f"Bluesky poller for {self.session_name} is already running.") return - self._handler = handler # This handler is what session.py's handle_streaming_event calls - self._should_stop = False - logger.info(f"Blueski streaming: Starting for user {self.session.user_id}, type: {self.stream_type}") - self._connection_task = asyncio.create_task(self._connect()) + self._stop_event.clear() + self._thread = threading.Thread( + target=self._poll_loop, + name=f"BlueskyPoller-{self.session_name}", + daemon=True + ) + self._thread.start() + log.info(f"Bluesky poller started for {self.session_name} (interval: {self.poll_interval}s)") + def stop(self): + """Stop the polling thread.""" + if self._thread is None: + return - async def stop_streaming(self) -> None: - """Stops the streaming connection.""" - logger.info(f"Blueski streaming: Stopping for user {self.session.user_id}") - self._should_stop = True - # if self._firehose_client: # Assuming the SDK has a stop method - # await self._firehose_client.stop() + self._stop_event.set() + self._thread.join(timeout=5) + self._thread = None + log.info(f"Bluesky poller stopped for {self.session_name}") - if self._connection_task: - if not self._connection_task.done(): - self._connection_task.cancel() + def is_alive(self): + """Check if the polling thread is running.""" + return self._thread is not None and self._thread.is_alive() + + def _poll_loop(self): + """Main polling loop running in background thread.""" + log.debug(f"Polling loop started for {self.session_name}") + + # Initial delay to let the app fully initialize + time.sleep(5) + + while not self._stop_event.is_set(): try: - await self._connection_task - except asyncio.CancelledError: - logger.info(f"Blueski streaming task successfully cancelled for {self.session.user_id}.") - self._connection_task = None - self._handler = None - logger.info(f"Blueski streaming stopped for user {self.session.user_id}.") + self._check_notifications() + except Exception as e: + log.exception(f"Error in Bluesky polling loop for {self.session_name}: {e}") - def is_alive(self) -> bool: - """Checks if the streaming connection is currently active.""" - # return self._connection_task is not None and not self._connection_task.done() and self._firehose_client and self._firehose_client.is_connected - return self._connection_task is not None and not self._connection_task.done() # Simplified check + # Wait for next poll interval, checking stop event periodically + for _ in range(self.poll_interval): + if self._stop_event.is_set(): + break + time.sleep(1) - def get_stream_type(self) -> str: - return self.stream_type + log.debug(f"Polling loop ended for {self.session_name}") - def get_params(self) -> dict[str, Any]: - return self.params + def _check_notifications(self): + """Check for new notifications and publish events.""" + if not self.session.logged: + return - # TODO: Add methods specific to Blueski streaming if necessary, - # e.g., methods to modify subscription details on the fly if the API supports it. - # For Bluesky Firehose, this might not be applicable as you usually connect and filter client-side. - # However, if there were different Firehose endpoints (e.g., one for public posts, one for user-specific events), - # this class might manage multiple connections or re-establish with new parameters. + try: + api = self.session._ensure_client() + if not api: + return - # Example of how events might be processed (highly simplified): - # This would be called by the on_message_handler in _connect - # async def _process_firehose_message(self, message: models.ComAtprotoSyncSubscribeRepos.Message): - # if isinstance(message, models.ComAtprotoSyncSubscribeRepos.Commit): - # # Decode CAR file in message.blocks to get ops - # # For each op (create, update, delete of a record): - # # record = get_record_from_blocks(message.blocks, op.cid) - # # if op.path.startswith("app.bsky.feed.post"): # It's a post - # # # Check if it's a new post, a reply, a quote, etc. - # # # Check for mentions of the current user - # # # Example: - # # if self.session.util.is_mention_of_me(record): - # # formatted_event = self.session.util.format_post_as_notification(record, "mention") - # # await self._handle_event("mention", formatted_event) - # # elif op.path.startswith("app.bsky.graph.follow"): - # # # Check if it's a follow of the current user - # # if record.subject == self.session.util.get_my_did(): # Assuming get_my_did() exists - # # formatted_event = self.session.util.format_follow_as_notification(record) - # # await self._handle_event("follow", formatted_event) - # # # Handle likes (app.bsky.feed.like), reposts (app.bsky.feed.repost), etc. - # pass - # elif isinstance(message, models.ComAtprotoSyncSubscribeRepos.Handle): - # # Handle DID to handle mapping updates if necessary - # logger.debug(f"Handle update: {message.handle} now points to {message.did} at {message.time}") - # elif isinstance(message, models.ComAtprotoSyncSubscribeRepos.Migrate): - # logger.info(f"Repo migration: {message.did} migrating from {message.migrateTo} at {message.time}") - # elif isinstance(message, models.ComAtprotoSyncSubscribeRepos.Tombstone): - # logger.info(f"Repo tombstone: {message.did} at {message.time}") - # elif isinstance(message, models.ComAtprotoSyncSubscribeRepos.Info): - # logger.info(f"Firehose info: {message.name} - {message.message}") - # else: - # logger.debug(f"Unknown Firehose message type: {message.__class__.__name__}") + # Fetch recent notifications + res = api.app.bsky.notification.list_notifications({"limit": 20}) + notifications = getattr(res, "notifications", []) + + if not notifications: + return + + # Track which notifications are new + new_notifications = [] + newest_uri = None + + for notif in notifications: + uri = getattr(notif, "uri", None) + if not uri: + continue + + # First time running - just record the newest and don't flood + if self._last_seen_notification_uri is None: + newest_uri = uri + break + + # Check if we've seen this notification before + if uri == self._last_seen_notification_uri: + break + + new_notifications.append(notif) + if newest_uri is None: + newest_uri = uri + + # Update last seen + if newest_uri: + self._last_seen_notification_uri = newest_uri + + # Publish new notifications (in reverse order so oldest first) + for notif in reversed(new_notifications): + self._publish_notification(notif) + + except Exception as e: + log.debug(f"Error checking notifications for {self.session_name}: {e}") + + def _publish_notification(self, notification): + """Publish a notification event via pub/sub.""" + try: + reason = getattr(notification, "reason", "unknown") + log.debug(f"Publishing Bluesky notification: {reason} for {self.session_name}") + + pub.sendMessage( + "blueski.notification_received", + notification=notification, + session_name=self.session_name + ) + + # Also publish specific events for certain notification types + if reason == "mention": + pub.sendMessage( + "blueski.mention_received", + notification=notification, + session_name=self.session_name + ) + elif reason == "reply": + pub.sendMessage( + "blueski.reply_received", + notification=notification, + session_name=self.session_name + ) + elif reason == "follow": + pub.sendMessage( + "blueski.follow_received", + notification=notification, + session_name=self.session_name + ) + + except Exception as e: + log.exception(f"Error publishing notification event: {e}") + + +def create_poller(session, session_name, poll_interval=60): + """ + Factory function to create a BlueskyPoller instance. + + Args: + session: The Bluesky session instance + session_name: Unique identifier for this session + poll_interval: Seconds between polls (default 60) + + Returns: + BlueskyPoller instance + """ + return BlueskyPoller(session, session_name, poll_interval) diff --git a/src/sessions/blueski/templates.py b/src/sessions/blueski/templates.py deleted file mode 100644 index ce4f81c8..00000000 --- a/src/sessions/blueski/templates.py +++ /dev/null @@ -1,123 +0,0 @@ -from __future__ import annotations - -import logging -from typing import TYPE_CHECKING, Any - -fromapprove.translation import translate as _ - -if TYPE_CHECKING: - fromapprove.sessions.blueski.session import Session as BlueskiSession - -logger = logging.getLogger(__name__) - - -class BlueskiTemplates: - def __init__(self, session: BlueskiSession) -> None: - self.session = session - - def get_template_data(self, template_name: str, context: dict[str, Any] | None = None) -> dict[str, Any]: - """ - Returns data required for rendering a specific template for Blueski. - This method would populate template variables based on the template name and context. - """ - base_data = { - "session_kind": self.session.kind, - "session_label": self.session.label, - "user_id": self.session.user_id, - # Add any other common data needed by Blueski templates - } - if context: - base_data.update(context) - - # TODO: Implement specific data fetching for different Blueski templates - # Example: - # if template_name == "profile_summary.html": - # # profile_info = await self.session.util.get_my_profile_info() # Assuming such a method exists - # # base_data["profile"] = profile_info - # base_data["profile"] = {"display_name": "User", "handle": "user.bsky.social"} # Placeholder - # elif template_name == "post_details.html": - # # post_id = context.get("post_id") - # # post_details = await self.session.util.get_post_by_id(post_id) - # # base_data["post"] = post_details - # base_data["post"] = {"text": "A sample post", "author_handle": "author.bsky.social"} # Placeholder - - return base_data - - def get_message_card_template(self) -> str: - """Returns the path to the message card template for Blueski.""" - # This template would define how a single Blueski post (or other message type) - # is rendered in a list (e.g., in a timeline or search results). - # return "sessions/blueski/cards/message.html" # Example path - return "sessions/generic/cards/message_generic.html" # Placeholder, use generic if no specific yet - - def get_notification_template_map(self) -> dict[str, str]: - """ - Returns a map of Blueski notification types to their respective template paths. - """ - # TODO: Define templates for different Blueski notification types - # (e.g., mention, reply, new follower, like, repost). - # The keys should match the notification types used internally by Approve - # when processing Blueski events. - # Example: - # return { - # "mention": "sessions/blueski/notifications/mention.html", - # "reply": "sessions/blueski/notifications/reply.html", - # "follow": "sessions/blueski/notifications/follow.html", - # "like": "sessions/blueski/notifications/like.html", # Bluesky uses 'like' - # "repost": "sessions/blueski/notifications/repost.html", # Bluesky uses 'repost' - # # ... other notification types - # } - # Using generic templates as placeholders: - return { - "mention": "sessions/generic/notifications/mention.html", - "reply": "sessions/generic/notifications/reply.html", - "follow": "sessions/generic/notifications/follow.html", - "like": "sessions/generic/notifications/favourite.html", # Map to favourite if generic expects that - "repost": "sessions/generic/notifications/reblog.html", # Map to reblog if generic expects that - } - - def get_settings_template(self) -> str | None: - """Returns the path to the settings template for Blueski, if any.""" - # This template would be used to render Blueski-specific settings in the UI. - # return "sessions/blueski/settings.html" - return "sessions/generic/settings_auth_password.html" # If using simple handle/password auth - - def get_user_action_templates(self) -> dict[str, str] | None: - """ - Returns a map of user action identifiers to their template paths for Blueski. - User actions are typically buttons or forms displayed on a user's profile. - """ - # TODO: Define templates for Blueski user actions - # Example: - # return { - # "view_profile_on_bsky": "sessions/blueski/actions/view_profile_button.html", - # "send_direct_message": "sessions/blueski/actions/send_dm_form.html", # If DMs are supported - # } - return None # Placeholder - - def get_user_list_action_templates(self) -> dict[str, str] | None: - """ - Returns a map of user list action identifiers to their template paths for Blueski. - These actions might appear on lists of users (e.g., followers, following). - """ - # TODO: Define templates for Blueski user list actions - # Example: - # return { - # "follow_all_visible": "sessions/blueski/list_actions/follow_all_button.html", - # } - return None # Placeholder - - # Add any other template-related helper methods specific to Blueski. - # For example, methods to get templates for specific types of content (images, polls) - # if they need special rendering. - - def get_template_for_message_type(self, message_type: str) -> str | None: - """ - Returns a specific template path for a given message type (e.g., post, reply, quote). - This can be useful if different types of messages need distinct rendering beyond the standard card. - """ - # TODO: Define specific templates if Blueski messages have varied structures - # that require different display logic. - # if message_type == "quote_post": - # return "sessions/blueski/cards/quote_post.html" - return None # Default to standard message card if not specified diff --git a/src/sessions/blueski/utils.py b/src/sessions/blueski/utils.py deleted file mode 100644 index b7c0e92c..00000000 --- a/src/sessions/blueski/utils.py +++ /dev/null @@ -1,1168 +0,0 @@ -from __future__ import annotations - -import logging -from typing import TYPE_CHECKING, Any - -from atproto import AsyncClient # Removed Client as AsyncClient is used -from atproto.exceptions import AtProtocolError, NetworkError, RequestException # Specific exceptions -from atproto.xrpc_client import models -from atproto.lexicon import models as lexicon_models # For lexicon definitions like com.atproto.moderation.defs -from atproto.xrpc_client.models import ids # For collection IDs like ids.AppBskyFeedPost - - -fromapprove.util import GenerateID, get_http_client, httpx_max_retries_hook -fromapprove.translation import translate as _ -fromapprove.notifications import NotificationError - - -if TYPE_CHECKING: - fromapprove.sessions.blueski.session import Session as BlueskiSession - # Define common type aliases if needed - ATUserProfile = models.AppBskyActorDefs.ProfileViewDetailed - ATPost = models.AppBskyFeedDefs.PostView - ATNotification = models.AppBskyNotificationListNotifications.Notification # Item in notification list - # StrongRef might be models.ComAtprotoRepoStrongRef.Main or similar - - -logger = logging.getLogger(__name__) - - -class BlueskiUtils: - def __init__(self, session: BlueskiSession) -> None: - self.session = session - # _own_did and _own_handle are now set by Session.login upon successful authentication - # and directly on the util instance. - self._own_did: str | None = self.session.db.get("did") or self.session.config_get("did") - self._own_handle: str | None = self.session.db.get("handle") or self.session.config_get("handle") - - # --- Client Initialization and Management --- - - async def _get_client(self) -> AsyncClient | None: - """Returns the authenticated ATProto AsyncClient from the session.""" - if self.session.client and self.session.is_ready(): # is_ready checks if client is authenticated - # Ensure internal DID/handle are in sync if possible, though session.login should handle this. - if not self._own_did and self.session.client.me: - self._own_did = self.session.client.me.did - if not self._own_handle and self.session.client.me: - self._own_handle = self.session.client.me.handle - return self.session.client - - logger.warning("BlueskiUtils: Client not available or not authenticated.") - # Optionally, try to trigger re-authentication if appropriate, - # but generally, the caller should ensure session is ready. - # For example, by calling session.start() or session.authorise() - # if await self.session.authorise(): # This could trigger UI prompts, be careful - # return self.session.client - return None - - async def get_own_profile_info(self) -> ATUserProfile | None: - """Retrieves the authenticated user's profile information.""" - client = await self._get_client() - if not client or not self.get_own_did(): # Use getter for _own_did - logger.warning("Blueski client not available or user DID not known.") - return None - try: - # client.me should be populated after login by the SDK - if client.me: # client.me is ProfileViewDetailed after login - # To get the most detailed, up-to-date profile: - response = await client.app.bsky.actor.get_profile(models.AppBskyActorGetProfile.Params(actor=self.get_own_did())) - if isinstance(response, models.AppBskyActorDefs.ProfileViewDetailed): - return response # This is already the correct type - else: # Should not happen if DID is correct - logger.error(f"Unexpected response type from get_profile: {type(response)}") - return None - else: # Fallback if client.me is somehow not populated - logger.info("client.me not populated, attempting get_profile for own DID.") - response = await client.app.bsky.actor.get_profile(models.AppBskyActorGetProfile.Params(actor=self.get_own_did())) - if isinstance(response, models.AppBskyActorDefs.ProfileViewDetailed): - return response - return None - except AtProtocolError as e: - logger.error(f"Error fetching own Blueski profile: {e}") - return None - - def get_own_did(self) -> str | None: - """Returns the authenticated user's DID.""" - if not self._own_did: # If not set during init (e.g. session not fully loaded yet) - self._own_did = self.session.db.get("did") or self.session.config_get("did") - - # Fallback: try to get from client if it's alive and has .me property - if not self._own_did and self.session.client and self.session.client.me: - self._own_did = self.session.client.me.did - return self._own_did - - def get_own_username(self) -> str | None: # "username" here means handle - """Returns the authenticated user's handle.""" - if not self._own_handle: - self._own_handle = self.session.db.get("handle") or self.session.config_get("handle") - - if not self._own_handle and self.session.client and self.session.client.me: - self._own_handle = self.session.client.me.handle - return self._own_handle - - - # --- Post / Status Operations --- - - async def post_status( - self, - text: str, - media_ids: list[str] | None = None, # Here media_ids would be blob refs from upload_media - reply_to_uri: str | None = None, # ATURI of the post being replied to - quote_uri: str | None = None, # ATURI of the post being quoted - cw_text: str | None = None, # For content warning (labels) - is_sensitive: bool = False, # General sensitivity flag - langs: list[str] | None = None, # List of language codes e.g. ['en', 'ja'] - tags: list[str] | None = None, # Hashtags - **kwargs: Any - ) -> str | None: # Returns the ATURI of the new post, or None on failure - """ - Posts a status (skeet) to Blueski. - Handles text, images, replies, quotes, and content warnings (labels). - """ - client = await self._get_client() - if not client: - logger.error("Blueski client not available for posting.") - raise NotificationError(_("Not connected to Blueski. Please check your connection settings or log in.")) - - if not self.get_own_did(): - logger.error("Cannot post status: User DID not available.") - raise NotificationError(_("User identity not found. Cannot create post.")) - - try: - # Prepare core post record - post_record_data = {'text': text, 'createdAt': client.get_current_time_iso()} # SDK handles datetime format - - if langs: - post_record_data['langs'] = langs - - # Facets (mentions, links, tags) should be processed before other embeds - # as they are part of the main post record. - facets = await self._extract_facets(text, tags) # Pass client for potential resolutions - if facets: - post_record_data['facets'] = facets - - # Embeds: images, quote posts, external links - # Note: Bluesky typically allows one main embed type (images OR record OR external) - embed_to_add: models.AppBskyFeedPost.Embed | None = None - - # Embeds: images, quote posts, external links - # ATProto allows one main embed type: app.bsky.embed.images, app.bsky.embed.record (quote/post embed), - # or app.bsky.embed.external. - # Priority: 1. Quote, 2. Images. External embeds are not handled in this example. - # If both quote and images are provided, quote takes precedence. - - embed_to_add: models.AppBskyFeedPost.Embed | None = None - - if quote_uri: - logger.info(f"Attempting to add quote embed for URI: {quote_uri}") - quoted_post_strong_ref = await self._get_strong_ref_for_uri(quote_uri) - if quoted_post_strong_ref: - embed_to_add = models.AppBskyEmbedRecord.Main(record=quoted_post_strong_ref) - if media_ids: - logger.warning(f"Quote URI provided ({quote_uri}), images will be ignored due to embed priority.") - else: - logger.warning(f"Could not create strong reference for quote URI: {quote_uri}. Quote will be omitted.") - - # Handle media attachments (images) only if no quote embed was successfully created - if not embed_to_add and media_ids: - logger.info(f"Attempting to add image embed with {len(media_ids)} media items.") - images_for_embed = [] - for media_info in media_ids: - if isinstance(media_info, dict) and media_info.get("blob_ref"): - images_for_embed.append( - models.AppBskyEmbedImages.Image( - image=media_info["blob_ref"], # This should be a BlobRef instance - alt=media_info.get("alt_text", "") - ) - ) - if images_for_embed: - embed_to_add = models.AppBskyEmbedImages.Main(images=images_for_embed) - - if embed_to_add: - post_record_data['embed'] = embed_to_add - - # Handle replies - if reply_to_uri: - parent_strong_ref = await self._get_strong_ref_for_uri(reply_to_uri) - if parent_strong_ref: - # Determine root. If parent is also a reply, its root should be used. - # This requires fetching the parent post and checking its .reply.root - # For simplicity now, assume direct parent is root, or parent_strong_ref itself is enough. - # A robust solution fetches parent post: parent_post_details = await client.app.bsky.feed.get_posts([reply_to_uri]) - # then uses parent_post_details.posts[0].record.reply.root if present. - # Placeholder: use parent as root for now - post_record_data['reply'] = models.AppBskyFeedPost.ReplyRef( - root=parent_strong_ref, # Simplified: SDK might need full root post ref - parent=parent_strong_ref - ) - else: - logger.warning(f"Could not create strong reference for reply URI: {reply_to_uri}. Reply info will be omitted.") - - - # Handle content warnings (labels) - # Bluesky uses self-labeling. - post_labels: list[models.ComAtprotoLabelDefs.SelfLabel] = [] - if cw_text: # Custom label for "content warning" text itself - # Ensure cw_text is not too long for a label value (max 64 chars for label value, but this is for the *value*, not a predefined category) - # Using a generic "!warn" or a custom scheme like "cw:..." - # The SDK might have specific ways to add !warn or other standard labels. - # For a simple text CW, it's often just part of the text or a client-side convention. - # If mapping to official labels: - post_labels.append(models.ComAtprotoLabelDefs.SelfLabel(val="!warn")) # Generic warning - # Or if cw_text is a specific category that maps to a label: - # post_labels.append(models.ComAtprotoLabelDefs.SelfLabel(val=cw_text)) # if cw_text is like "nudity" - if is_sensitive and not any(l.val == "!warn" for l in post_labels): # Add generic !warn if sensitive and not already added by cw_text - post_labels.append(models.ComAtprotoLabelDefs.SelfLabel(val="!warn")) - - if post_labels: - post_record_data['labels'] = models.ComAtprotoLabelDefs.SelfLabels(values=post_labels) - - # Create the post record object - final_post_record = models.AppBskyFeedPost.Main(**post_record_data) - - response = await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), # Must be own DID - collection=ids.AppBskyFeedPost, # e.g., "app.bsky.feed.post" - record=final_post_record, - ) - ) - logger.info(f"Successfully posted to Blueski. URI: {response.uri}") - return response.uri - except AtProtocolError as e: - logger.error(f"Error posting status to Blueski: {e.error} - {e.message}", exc_info=True) - raise NotificationError(_("Failed to post: {error} - {message}").format(error=e.error or "Error", message=e.message or "Protocol error")) from e - except Exception as e: # Catch any other unexpected errors - logger.error(f"Unexpected error posting status to Blueski: {e}", exc_info=True) - raise NotificationError(_("An unexpected error occurred while posting: {error}").format(error=str(e))) from e - - - async def delete_status(self, post_uri: str) -> bool: - """Deletes a status (post) given its AT URI.""" - client = await self._get_client() - if not client: - logger.error("Blueski client not available for deleting post.") - return False - if not self.get_own_did(): - logger.error("Cannot delete status: User DID not available.") - return False - - try: - # Extract rkey from URI. URI format: at://// - uri_parts = post_uri.replace("at://", "").split("/") - if len(uri_parts) != 3: - logger.error(f"Invalid AT URI format for deletion: {post_uri}") - return False - # repo_did = uri_parts[0] # Should match self.get_own_did() - collection = uri_parts[1] - rkey = uri_parts[2] - - if collection != ids.AppBskyFeedPost: # Ensure it's the correct collection - logger.error(f"Attempting to delete from incorrect collection '{collection}'. Expected '{ids.AppBskyFeedPost}'.") - return False - - await client.com.atproto.repo.delete_record( - models.ComAtprotoRepoDeleteRecord.Input( - repo=self.get_own_did(), # Must be own DID - collection=collection, - rkey=rkey, - ) - ) - logger.info(f"Successfully deleted post {post_uri} from Blueski.") - return True - except AtProtocolError as e: - logger.error(f"Error deleting post {post_uri} from Blueski: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error deleting post {post_uri}: {e}", exc_info=True) - return False - - - async def upload_media(self, file_path: str, mime_type: str, alt_text: str | None = None) -> dict[str, Any] | None: - """ - Uploads media (image) to Blueski. - Returns a dictionary containing the SDK's BlobRef object and alt_text, or None on failure. - """ - client = await self._get_client() - if not client: - logger.error("Blueski client not available for media upload.") - return None - try: - with open(file_path, "rb") as f: - image_data = f.read() - - # The SDK's upload_blob takes bytes directly. - response = await client.com.atproto.repo.upload_blob(image_data, mime_type=mime_type) - if response and response.blob: - logger.info(f"Media uploaded successfully: {file_path}, Blob CID: {response.blob.cid}") - # Return the actual blob object from the SDK, as it's needed for post creation. - return { - "blob_ref": response.blob, # This is models.ComAtprotoRepoStrongRef.Blob - "alt_text": alt_text or "", - } - else: - logger.error(f"Media upload failed for {file_path}, no blob in response.") - return None - except AtProtocolError as e: - logger.error(f"Error uploading media {file_path} to Blueski: {e.error} - {e.message}", exc_info=True) - except Exception as e: - logger.error(f"Unexpected error uploading media {file_path}: {e}", exc_info=True) - return None - - - # --- User Profile and Interaction --- - - async def get_profile_by_handle(self, handle: str) -> ATUserProfile | None: - """Fetches a user's profile by their handle.""" - client = await self._get_client() - if not client: return None - try: - response = await client.app.bsky.actor.get_profile(models.AppBskyActorGetProfile.Params(actor=handle)) - if isinstance(response, models.AppBskyActorDefs.ProfileViewDetailed): - return response - return None # Should not happen if handle is valid - except AtProtocolError as e: - logger.error(f"Error fetching profile for {handle}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching profile for {handle}: {e}", exc_info=True) - return None - - - async def follow_user(self, user_did: str) -> bool: - """Follows a user by their DID.""" - client = await self._get_client() - if not client: return False - if not self.get_own_did(): - logger.error("Cannot follow user: Own DID not available.") - return False - - try: - await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyGraphFollow, # "app.bsky.graph.follow" - record=models.AppBskyGraphFollow.Main(subject=user_did, createdAt=client.get_current_time_iso()), - ) - ) - logger.info(f"Successfully followed user {user_did}.") - return True - except AtProtocolError as e: - logger.error(f"Error following user {user_did}: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error following user {user_did}: {e}", exc_info=True) - return False - - - async def unfollow_user(self, user_did: str) -> bool: - """Unfollows a user by their DID. Requires finding the follow record's URI (rkey).""" - client = await self._get_client() - if not client: return False - if not self.get_own_did(): - logger.error("Cannot unfollow user: Own DID not available.") - return False - - try: - # Find the URI of the follow record. This is the tricky part. - # We need the rkey of the follow record. - follow_rkey = await self._find_follow_record_rkey(user_did) - if not follow_rkey: - logger.warning(f"Could not find follow record for user {user_did} to unfollow.") - return False - - await client.com.atproto.repo.delete_record( - models.ComAtprotoRepoDeleteRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyGraphFollow, - rkey=follow_rkey, - ) - ) - logger.info(f"Successfully unfollowed user {user_did}.") - return True - except AtProtocolError as e: - logger.error(f"Error unfollowing user {user_did}: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error unfollowing user {user_did}: {e}", exc_info=True) - return False - - - # --- Notifications and Timelines (Illustrative - actual implementation is complex) --- - - async def get_notifications(self, limit: int = 20, cursor: str | None = None) -> tuple[list[ATNotification], str | None] | None: - """Fetches notifications for the authenticated user. Returns (notifications, cursor) or None.""" - client = await self._get_client() - if not client: return None - try: - response = await client.app.bsky.notification.list_notifications( - models.AppBskyNotificationListNotifications.Params(limit=limit, cursor=cursor) - ) - # No need to format further if ATNotification type hint is used and callers expect SDK models - return response.notifications, response.cursor - except AtProtocolError as e: - logger.error(f"Error fetching notifications: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching notifications: {e}", exc_info=True) - return None - - - async def get_timeline(self, algorithm: str | None = None, limit: int = 20, cursor: str | None = None) -> tuple[list[models.AppBskyFeedDefs.FeedViewPost], str | None] | None: - """ - Fetches a timeline (feed) for the authenticated user. - Returns (feed_items, cursor) or None. - 'algorithm' can be None for default "Following" or a custom feed URI (at://did/app.bsky.feed.generator/uri). - """ - client = await self._get_client() - if not client: return None - try: - params = models.AppBskyFeedGetTimeline.Params(limit=limit, cursor=cursor) - if algorithm: # Only add algorithm if it's specified, SDK might default to 'following' - params.algorithm = algorithm - - response = await client.app.bsky.feed.get_timeline(params) - # response.feed is a list of FeedViewPost items - return response.feed, response.cursor - except AtProtocolError as e: - logger.error(f"Error fetching timeline (algorithm: {algorithm}): {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching timeline (algorithm: {algorithm}): {e}", exc_info=True) - return None - - - async def get_author_feed(self, actor_did: str, limit: int = 20, cursor: str | None = None, filter: str = "posts_with_replies") -> tuple[list[models.AppBskyFeedDefs.FeedViewPost], str | None] | None: - """ - Fetches a specific user's timeline (feed). - Returns (feed_items, cursor) or None. - filter can be: "posts_with_replies", "posts_no_replies", "posts_with_media". - The default "posts_with_replies" includes user's posts and their replies. - To get only original posts (no replies): "posts_no_replies". - To get posts and reposts: This is not a direct filter in getAuthorFeed. Reposts are part of the feed if the PDS includes them. - The `filter` parameter in `getAuthorFeed` is actually `filter: Literal['posts_with_replies', 'posts_no_replies', 'posts_and_author_threads', 'posts_with_media']` - Let's use a sensible default or make it configurable if needed by the session. - For "posts and reposts", you typically just get the author feed and it includes reposts. - """ - client = await self._get_client() - if not client: return None - try: - # Ensure filter is a valid choice for the SDK if it uses an Enum or Literal - # For this example, we assume the string is passed directly. - # Valid filters are 'posts_with_replies', 'posts_no_replies', 'posts_and_author_threads', 'posts_with_media'. - # Defaulting to 'posts_and_author_threads' to include posts, replies, and reposts by the author. - # Actually, getAuthorFeed's `filter` param does not directly control inclusion of reposts in a way that - # "posts_and_reposts" would imply. Reposts by the author of things *they reposted* are part of their feed. - # A common default is 'posts_with_replies'. If we want to see their reposts, that's typically included by default. - - current_filter_value = filter - if current_filter_value not in ['posts_with_replies', 'posts_no_replies', 'posts_and_author_threads', 'posts_with_media']: - logger.warning(f"Invalid filter '{current_filter_value}' for getAuthorFeed. Defaulting to 'posts_with_replies'.") - current_filter_value = 'posts_with_replies' - - - params = models.AppBskyFeedGetAuthorFeed.Params(actor=actor_did, limit=limit, cursor=cursor, filter=current_filter_value) - response = await client.app.bsky.feed.get_author_feed(params) - return response.feed, response.cursor - except AtProtocolError as e: - logger.error(f"Error fetching author feed for {actor_did}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching author feed for {actor_did}: {e}", exc_info=True) - return None - - async def get_user_profile(self, user_ident: str) -> ATUserProfile | None: - """Fetches a detailed user profile by DID or handle.""" - client = await self._get_client() - if not client: - logger.error(f"Cannot get profile for {user_ident}: ATProto client not available.") - return None - try: - response = await client.app.bsky.actor.get_profile(models.AppBskyActorGetProfile.Params(actor=user_ident)) - if isinstance(response, models.AppBskyActorDefs.ProfileViewDetailed): - return response - logger.error(f"Unexpected response type when fetching profile for {user_ident}: {type(response)}") - return None - except AtProtocolError as e: - logger.error(f"Error fetching profile for {user_ident}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching profile for {user_ident}: {e}", exc_info=True) - return None - - - async def get_followers(self, user_did: str, limit: int = 30, cursor: str | None = None) -> tuple[list[models.AppBskyActorDefs.ProfileView], str | None] | None: - """Fetches followers for a given user DID.""" - client = await self._get_client() - if not client: - logger.error(f"Cannot get followers for {user_did}: ATProto client not available.") - return None - try: - response = await client.app.bsky.graph.get_followers( - models.AppBskyGraphGetFollowers.Params(actor=user_did, limit=limit, cursor=cursor) - ) - return response.followers, response.cursor - except AtProtocolError as e: - logger.error(f"Error fetching followers for {user_did}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching followers for {user_did}: {e}", exc_info=True) - return None - - async def get_following(self, user_did: str, limit: int = 30, cursor: str | None = None) -> tuple[list[models.AppBskyActorDefs.ProfileView], str | None] | None: - """Fetches accounts followed by a given user DID.""" - client = await self._get_client() - if not client: - logger.error(f"Cannot get following for {user_did}: ATProto client not available.") - return None - try: - response = await client.app.bsky.graph.get_follows( # Correct endpoint is get_follows - models.AppBskyGraphGetFollows.Params(actor=user_did, limit=limit, cursor=cursor) - ) - return response.follows, response.cursor - except AtProtocolError as e: - logger.error(f"Error fetching accounts followed by {user_did}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error fetching accounts followed by {user_did}: {e}", exc_info=True) - return None - - async def search_users(self, term: str, limit: int = 20, cursor: str | None = None) -> tuple[list[models.AppBskyActorDefs.ProfileView], str | None] | None: - """Searches for users based on a term.""" - client = await self._get_client() - if not client: - logger.error(f"Cannot search users for '{term}': ATProto client not available.") - return None - try: - response = await client.app.bsky.actor.search_actors( - models.AppBskyActorSearchActors.Params(term=term, limit=limit, cursor=cursor) - ) - return response.actors, response.cursor - except AtProtocolError as e: - logger.error(f"Error searching users for '{term}': {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error searching users for '{term}': {e}", exc_info=True) - return None - - - async def mute_user(self, user_did: str) -> bool: - """Mutes a user by their DID.""" - client = await self._get_client() - if not client: - logger.error("Cannot mute user: ATProto client not available.") - return False - try: - await client.app.bsky.graph.mute_actor(models.AppBskyGraphMuteActor.Input(actor=user_did)) - logger.info(f"Successfully muted user {user_did}.") - return True - except AtProtocolError as e: - # Check if already muted - SDK might throw specific error or general one. - # For now, log and return False. A more specific error could be returned to UI. - logger.error(f"Error muting user {user_did}: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error muting user {user_did}: {e}", exc_info=True) - return False - - async def unmute_user(self, user_did: str) -> bool: - """Unmutes a user by their DID.""" - client = await self._get_client() - if not client: - logger.error("Cannot unmute user: ATProto client not available.") - return False - try: - await client.app.bsky.graph.unmute_actor(models.AppBskyGraphUnmuteActor.Input(actor=user_did)) - logger.info(f"Successfully unmuted user {user_did}.") - return True - except AtProtocolError as e: - # Check if already unmuted / not muted. - logger.error(f"Error unmuting user {user_did}: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error unmuting user {user_did}: {e}", exc_info=True) - return False - - async def block_user(self, user_did: str) -> str | None: - """ - Blocks a user by their DID. - Returns the AT URI of the block record on success, None on failure. - """ - client = await self._get_client() - if not client: - logger.error("Cannot block user: ATProto client not available.") - return None - if not self.get_own_did(): - logger.error("Cannot block user: Own DID not available.") - return None - - try: - response = await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyGraphBlock, # "app.bsky.graph.block" - record=models.AppBskyGraphBlock.Main(subject=user_did, createdAt=client.get_current_time_iso()), - ) - ) - logger.info(f"Successfully blocked user {user_did}. Block record URI: {response.uri}") - return response.uri - except AtProtocolError as e: - # Handle specific errors, e.g., if user is already blocked. - # The SDK might raise an error like "already exists" or a generic one. - logger.error(f"Error blocking user {user_did}: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error blocking user {user_did}: {e}", exc_info=True) - return None - - async def _find_block_record_rkey(self, target_did: str) -> str | None: - """Helper to find the rkey of a block record for a given target DID.""" - client = await self._get_client() - own_did = self.get_own_did() - if not client or not own_did: return None - - cursor = None - try: - while True: - response = await client.com.atproto.repo.list_records( - models.ComAtprotoRepoListRecords.Params( - repo=own_did, - collection=ids.AppBskyGraphBlock, - limit=100, - cursor=cursor, - ) - ) - if not response or not response.records: - break - - for record_item in response.records: - if record_item.value and isinstance(record_item.value, models.AppBskyGraphBlock.Main): - if record_item.value.subject == target_did: - return record_item.uri.split("/")[-1] # Extract rkey from URI - - cursor = response.cursor - if not cursor: - break - logger.info(f"No active block record found for user {target_did} by {own_did}.") - return None - except AtProtocolError as e: - logger.error(f"Error listing block records for {own_did} to find {target_did}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error finding block rkey for {target_did}: {e}", exc_info=True) - return None - - async def repost_post(self, post_uri: str, post_cid: str | None = None) -> str | None: - """Creates a repost for a given post URI and CID. Returns URI of the repost record or None.""" - client = await self._get_client() - if not client or not self.get_own_did(): - logger.error("Cannot repost: client or own DID not available.") - return None - - if not post_cid: # If CID is not provided, try to get it - strong_ref = await self._get_strong_ref_for_uri(post_uri) - if not strong_ref: - logger.error(f"Could not get strong reference for post {post_uri} to repost.") - return None - post_cid = strong_ref.cid # type: ignore - - try: - response = await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyFeedRepost, - record=models.AppBskyFeedRepost.Main( - subject=models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=post_cid), - createdAt=client.get_current_time_iso() - ) - ) - ) - logger.info(f"Successfully reposted {post_uri}. Repost URI: {response.uri}") - return response.uri - except AtProtocolError as e: - logger.error(f"Error reposting {post_uri}: {e.error} - {e.message}") - # Consider raising NotificationError for specific, user-understandable errors - if "already exists" in str(e.message).lower() or "duplicate" in str(e.message).lower(): - raise NotificationError(_("You have already reposted this post.")) from e - except Exception as e: - logger.error(f"Unexpected error reposting {post_uri}: {e}", exc_info=True) - return None - - async def like_post(self, post_uri: str, post_cid: str | None = None) -> str | None: - """Likes a given post URI and CID. Returns URI of the like record or None.""" - client = await self._get_client() - if not client or not self.get_own_did(): - logger.error("Cannot like post: client or own DID not available.") - return None - - if not post_cid: # If CID is not provided, try to get it - strong_ref = await self._get_strong_ref_for_uri(post_uri) - if not strong_ref: - logger.error(f"Could not get strong reference for post {post_uri} to like.") - return None - post_cid = strong_ref.cid # type: ignore - - try: - response = await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyFeedLike, - record=models.AppBskyFeedLike.Main( - subject=models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=post_cid), - createdAt=client.get_current_time_iso() - ) - ) - ) - logger.info(f"Successfully liked {post_uri}. Like URI: {response.uri}") - return response.uri - except AtProtocolError as e: - logger.error(f"Error liking {post_uri}: {e.error} - {e.message}") - if "already exists" in str(e.message).lower() or "duplicate" in str(e.message).lower(): - raise NotificationError(_("You have already liked this post.")) from e - except Exception as e: - logger.error(f"Unexpected error liking {post_uri}: {e}", exc_info=True) - return None - - async def delete_like(self, like_uri: str) -> bool: - """Deletes a like given the URI of the like record itself.""" - client = await self._get_client() - if not client or not self.get_own_did(): - logger.error("Cannot delete like: client or own DID not available.") - return False - - try: - # Extract rkey from like_uri - # Format: at:///app.bsky.feed.like/ - uri_parts = like_uri.replace("at://", "").split("/") - if len(uri_parts) != 3 or uri_parts[1] != ids.AppBskyFeedLike: - logger.error(f"Invalid like URI format for deletion: {like_uri}") - return False - - rkey = uri_parts[2] - - await client.com.atproto.repo.delete_record( - models.ComAtprotoRepoDeleteRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyFeedLike, - rkey=rkey - ) - ) - logger.info(f"Successfully deleted like {like_uri}.") - return True - except AtProtocolError as e: - logger.error(f"Error deleting like {like_uri}: {e.error} - {e.message}") - # Could check for "not found" type errors if user tries to unlike something not liked - except Exception as e: - logger.error(f"Unexpected error deleting like {like_uri}: {e}", exc_info=True) - return False - - async def repost_post(self, post_uri: str, post_cid: str | None = None) -> str | None: - """Creates a repost for a given post URI and CID. Returns URI of the repost record or None.""" - client = await self._get_client() - if not client or not self.get_own_did(): - logger.error("Cannot repost: client or own DID not available.") - # raise NotificationError(_("Session not ready. Please log in.")) # Alternative - return None - - if not post_cid: # If CID is not provided, try to get it from the URI - strong_ref_to_post = await self._get_strong_ref_for_uri(post_uri) - if not strong_ref_to_post: - logger.error(f"Could not get strong reference for post {post_uri} to repost it.") - raise NotificationError(_("Could not find the post to repost.")) - post_cid = strong_ref_to_post.cid # type: ignore # SDK uses .cid - - try: - response = await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), # Must be own DID - collection=ids.AppBskyFeedRepost, - record=models.AppBskyFeedRepost.Main( - subject=models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=post_cid), - createdAt=client.get_current_time_iso() # SDK helper for current time - ) - ) - ) - logger.info(f"Successfully reposted {post_uri}. Repost URI: {response.uri}") - return response.uri - except AtProtocolError as e: - logger.error(f"Error reposting {post_uri}: {e.error} - {e.message}") - if e.error == "DuplicateRecord": # Or similar error code for existing repost - raise NotificationError(_("You have already reposted this post.")) from e - raise NotificationError(_("Failed to repost: {error}").format(error=e.message or e.error)) from e - except Exception as e: - logger.error(f"Unexpected error reposting {post_uri}: {e}", exc_info=True) - raise NotificationError(_("An unexpected error occurred while reposting.")) - - - async def like_post(self, post_uri: str, post_cid: str | None = None) -> str | None: - """Likes a given post URI and CID. Returns URI of the like record or None.""" - client = await self._get_client() - if not client or not self.get_own_did(): - logger.error("Cannot like post: client or own DID not available.") - return None # Or raise NotificationError - - if not post_cid: # If CID is not provided, try to get it from the URI - strong_ref_to_post = await self._get_strong_ref_for_uri(post_uri) - if not strong_ref_to_post: - logger.error(f"Could not get strong reference for post {post_uri} to like it.") - raise NotificationError(_("Could not find the post to like.")) - post_cid = strong_ref_to_post.cid # type: ignore - - try: - response = await client.com.atproto.repo.create_record( - models.ComAtprotoRepoCreateRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyFeedLike, # "app.bsky.feed.like" - record=models.AppBskyFeedLike.Main( - subject=models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=post_cid), - createdAt=client.get_current_time_iso() - ) - ) - ) - logger.info(f"Successfully liked {post_uri}. Like URI: {response.uri}") - return response.uri - except AtProtocolError as e: - logger.error(f"Error liking {post_uri}: {e.error} - {e.message}") - if e.error == "DuplicateRecord": # Or similar error code for existing like - raise NotificationError(_("You have already liked this post.")) from e - raise NotificationError(_("Failed to like post: {error}").format(error=e.message or e.error)) from e - except Exception as e: - logger.error(f"Unexpected error liking {post_uri}: {e}", exc_info=True) - raise NotificationError(_("An unexpected error occurred while liking the post.")) - - - async def delete_like(self, like_uri: str) -> bool: - """Deletes a like given the URI of the like record itself.""" - client = await self._get_client() - if not client or not self.get_own_did(): - logger.error("Cannot delete like: client or own DID not available.") - return False - - try: - # Extract rkey from like_uri - # Format: at:///app.bsky.feed.like/ - uri_parts = like_uri.replace("at://", "").split("/") - if len(uri_parts) != 3 or uri_parts[1] != ids.AppBskyFeedLike: # Check collection is correct - logger.error(f"Invalid like URI format for deletion: {like_uri}") - return False # Or raise error - - # own_did_from_uri = uri_parts[0] # This should match self.get_own_did() - # if own_did_from_uri != self.get_own_did(): - # logger.error(f"Attempting to delete a like not owned by the current user: {like_uri}") - # return False - - rkey = uri_parts[2] - - await client.com.atproto.repo.delete_record( - models.ComAtprotoRepoDeleteRecord.Input( - repo=self.get_own_did(), # Must be own DID - collection=ids.AppBskyFeedLike, - rkey=rkey - ) - ) - logger.info(f"Successfully deleted like {like_uri}.") - return True - except AtProtocolError as e: - logger.error(f"Error deleting like {like_uri}: {e.error} - {e.message}") - # If error indicates "not found", it means it was already unliked or never existed. - # We could return True for idempotency or False for strict "did I delete it now?" - # For now, let's return False on any API error. - return False - except Exception as e: - logger.error(f"Unexpected error deleting like {like_uri}: {e}", exc_info=True) - return False - - - async def unblock_user(self, user_did: str) -> bool: - """Unblocks a user by their DID. Requires finding the block record's rkey.""" - client = await self._get_client() - if not client: - logger.error("Cannot unblock user: ATProto client not available.") - return False - if not self.get_own_did(): - logger.error("Cannot unblock user: Own DID not available.") - return False - - try: - block_rkey = await self._find_block_record_rkey(user_did) - if not block_rkey: - logger.warning(f"Could not find block record for user {user_did} to unblock. User might not be blocked.") - # Depending on desired UX, this could be True (idempotency) or False (strict "not found") - return False - - await client.com.atproto.repo.delete_record( - models.ComAtprotoRepoDeleteRecord.Input( - repo=self.get_own_did(), - collection=ids.AppBskyGraphBlock, - rkey=block_rkey, - ) - ) - logger.info(f"Successfully unblocked user {user_did}.") - return True - except AtProtocolError as e: - logger.error(f"Error unblocking user {user_did}: {e.error} - {e.message}") - except Exception as e: - logger.error(f"Unexpected error unblocking user {user_did}: {e}", exc_info=True) - return False - - - # --- Helper Methods for Formatting and URI/DID manipulation --- - - def _format_profile_data(self, profile_model: models.AppBskyActorDefs.ProfileViewDetailed | models.AppBskyActorDefs.ProfileView | models.AppBskyActorDefs.ProfileViewBasic) -> dict[str, Any]: - """Converts an ATProto profile model to a standardized dictionary for Approve internal use if needed.""" - # This is an example if Approve needs its own dict format instead of using SDK models directly. - # For now, many methods return SDK models directly (ATUserProfile, ATPost types). - return { - "did": profile_model.did, - "handle": profile_model.handle, - "displayName": getattr(profile_model, 'displayName', None) or profile_model.handle, - "description": getattr(profile_model, 'description', None), - "avatar": getattr(profile_model, 'avatar', None), - "banner": getattr(profile_model, 'banner', None) if hasattr(profile_model, 'banner') else None, - "followersCount": getattr(profile_model, 'followersCount', 0) if hasattr(profile_model, 'followersCount') else None, - "followsCount": getattr(profile_model, 'followsCount', 0) if hasattr(profile_model, 'followsCount') else None, - "postsCount": getattr(profile_model, 'postsCount', 0) if hasattr(profile_model, 'postsCount') else None, - "indexedAt": getattr(profile_model, 'indexedAt', None), # For ProfileView, not Basic - "labels": getattr(profile_model, 'labels', []), # For ProfileView, not Basic - "viewer": getattr(profile_model, 'viewer', None), # For ProfileView, not Basic (viewer state like muted/following) - } - - def _format_post_data(self, post_view_model: models.AppBskyFeedDefs.PostView) -> dict[str, Any]: - """Converts an ATProto PostView model to a standardized dictionary if needed.""" - # record_data = post_view_model.record # This is the actual app.bsky.feed.post record - # if not isinstance(record_data, models.AppBskyFeedPost.Main()): # Check it's the expected type - # logger.warning(f"Post record is not of type Main, actual: {type(record_data)}") - # text_content = "Unsupported post record type" - # else: - # text_content = record_data.text - - return { - "uri": post_view_model.uri, - "cid": post_view_model.cid, - "author": self._format_profile_data(post_view_model.author) if post_view_model.author else None, - "record": post_view_model.record, # Keep the raw record for full data - # "text": text_content, # Extracted text - "embed": post_view_model.embed, # Raw embed - "replyCount": post_view_model.replyCount if post_view_model.replyCount is not None else 0, - "repostCount": post_view_model.repostCount if post_view_model.repostCount is not None else 0, - "likeCount": post_view_model.likeCount if post_view_model.likeCount is not None else 0, - "indexedAt": post_view_model.indexedAt, - "labels": post_view_model.labels or [], - "viewer": post_view_model.viewer, # Viewer state (e.g. like URI, repost URI) - } - - def _format_notification_data(self, notification_model: ATNotification) -> dict[str,Any]: - # This is an example if Approve needs its own dict format. - return { - "uri": notification_model.uri, - "cid": notification_model.cid, - "author": self._format_profile_data(notification_model.author) if notification_model.author else None, - "reason": notification_model.reason, # e.g., "like", "repost", "follow", "mention", "reply", "quote" - "reasonSubject": notification_model.reasonSubject, # AT URI of the subject of the notification (e.g. URI of the like record) - "record": notification_model.record, # The record that actioned this notification (e.g. the like record itself) - "isRead": notification_model.isRead, - "indexedAt": notification_model.indexedAt, - "labels": notification_model.labels or [], - } - - async def _get_strong_ref_for_uri(self, at_uri: str) -> models.ComAtprotoRepoStrongRef.Main | None: - """ - Given an AT URI, describe the record to get its CID and create a strong reference. - This is needed for replies, quotes, etc. - Alternatively, SDK's client.com.atproto.repo.create_strong_ref can be used if available. - """ - client = await self._get_client() - if not client: return None - try: - # URI format: at://// - parts = at_uri.replace("at://", "").split("/") - if len(parts) != 3: - logger.error(f"Invalid AT URI for strong ref: {at_uri}") - return None - - repo_did, collection, rkey = parts - - # This is one way to get the CID if not already known. - # If the CID is known, models.ComAtprotoRepoStrongRef.Main(uri=at_uri, cid=known_cid) is simpler. - # However, for replies/quotes, the record must exist and be resolvable. - described_record = await client.com.atproto.repo.describe_record( - models.ComAtprotoRepoDescribeRecord.Params(repo=repo_did, collection=collection, rkey=rkey) - ) - if described_record and described_record.cid: - return models.ComAtprotoRepoStrongRef.Main(uri=described_record.uri, cid=described_record.cid) - else: - logger.error(f"Could not describe record for URI {at_uri} to create strong ref.") - return None - except AtProtocolError as e: - logger.error(f"Could not get strong reference for URI {at_uri}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error getting strong ref for {at_uri}: {e}", exc_info=True) - return None - - - async def _find_follow_record_rkey(self, target_did: str) -> str | None: - """Helper to find the rkey of a follow record for a given target DID.""" - client = await self._get_client() - own_did = self.get_own_did() - if not client or not own_did: return None - - cursor = None - try: - while True: - response = await client.com.atproto.repo.list_records( - models.ComAtprotoRepoListRecords.Params( - repo=own_did, - collection=ids.AppBskyGraphFollow, # "app.bsky.graph.follow" - limit=100, - cursor=cursor, - ) - ) - if not response or not response.records: - break - - for record_item in response.records: - # record_item.value is the actual follow record (AppBskyGraphFollow.Main) - if record_item.value and isinstance(record_item.value, models.AppBskyGraphFollow.Main): - if record_item.value.subject == target_did: - # The rkey is part of the URI: at:///app.bsky.graph.follow/ - return record_item.uri.split("/")[-1] - - cursor = response.cursor - if not cursor: - break - return None # Follow record not found - except AtProtocolError as e: - logger.error(f"Error listing follow records for {own_did} to find {target_did}: {e.error} - {e.message}") - return None - except Exception as e: - logger.error(f"Unexpected error finding follow rkey for {target_did}: {e}", exc_info=True) - return None - - - async def _extract_facets(self, text: str, tags: list[str] | None = None) -> list[models.AppBskyRichtextFacet.Main] | None: - """ - Detects mentions, links, and tags in text and creates facet objects. - Uses the atproto SDK's `detect_facets` via the client if available and resolves mentions. - """ - client = await self._get_client() - if not client: - logger.warning("Cannot extract facets: ATProto client not available.") - return None - - try: - # The SDK's detect_facets is a utility function, not a client method. - # from atproto. conhecido.facets import detect_facets # Path may vary - # For now, assume a simplified version or that client might expose it. - # A full implementation needs to handle byte offsets correctly. - # This is a complex part of posting. - - # Placeholder for actual facet detection logic. - # This would involve regex for mentions (@handle.bsky.social), links (http://...), and tags (#tag). - # For mentions, DIDs need to be resolved. For links, URI needs to be validated. - # Example (very simplified, not for production): - # facets = [] - # import re - # # Mentions - # for match in re.finditer(r'@([a-zA-Z0-9.-]+)', text): - # handle = match.group(1) - # try: - # # profile = await client.app.bsky.actor.get_profile(models.AppBskyActorGetProfile.Params(actor=handle)) # This is a network call per mention! - # # if profile: - # # facets.append(models.AppBskyRichtextFacet.Main( - # # index=models.AppBskyRichtextFacet.ByteSlice(byteStart=match.start(), byteEnd=match.end()), - # # features=[models.AppBskyRichtextFacet.Mention(did=profile.did)] - # # )) - # pass # Proper implementation needed - # except Exception: # Handle resolution failure - # logger.warning(f"Could not resolve DID for mention @{handle}") - # # Links - # # Tags - # if tags: - # for tag in tags: - # # find occurrences of #tag in text and add facet - # pass - - # If the SDK has a robust way to do this (even if it's a static method you import) use it. - # e.g. from atproto. अमीर_text import RichText - # rt = RichText(text) - # await rt.resolve_facets(client) # if it needs async client for resolution - # return rt.facets - - logger.debug("Facet extraction is currently a placeholder and may not correctly identify all rich text features.") - return None - except Exception as e: - logger.error(f"Error during facet extraction: {e}", exc_info=True) - return None - - - async def report_post(self, post_uri: str, reason_type: str, reason_text: str | None = None) -> bool: - """ - Reports a post (skeet) to the PDS/moderation service. - reason_type should be one of com.atproto.moderation.defs#reasonType - (e.g., lexicon_models.COM_ATPROTO_MODERATION_DEFS_REASONSPAM -> "com.atproto.moderation.defs#reasonSpam") - """ - client = await self._get_client() - if not client: - logger.error("Blueski client not available for reporting.") - return False - - try: - # We need a strong reference to the post being reported. - subject_strong_ref = await self._get_strong_ref_for_uri(post_uri) - if not subject_strong_ref: - logger.error(f"Could not get strong reference for post URI {post_uri} to report.") - return False - - # The 'subject' for reporting a record is ComAtprotoRepoStrongRef.Main - report_subject = models.ComAtprotoRepoStrongRef.Main(uri=subject_strong_ref.uri, cid=subject_strong_ref.cid) - - # For reporting an account, it would be ComAtprotoAdminDefs.RepoRef(did=...) - - await client.com.atproto.moderation.create_report( - models.ComAtprotoModerationCreateReport.Input( - reasonType=reason_type, # e.g. lexicon_models.COM_ATPROTO_MODERATION_DEFS_REASONSPAM - reason=reason_text if reason_text else None, - subject=report_subject - ) - ) - logger.info(f"Successfully reported post {post_uri} for reason {reason_type}.") - return True - except AtProtocolError as e: - logger.error(f"Error reporting post {post_uri}: {e.error} - {e.message}", exc_info=True) - except Exception as e: - logger.error(f"Unexpected error reporting post {post_uri}: {e}", exc_info=True) - return False - - - def is_mention_of_me(self, post_data: models.AppBskyFeedPost.Main | dict) -> bool: - """ - Checks if a post record (not post view) mentions the authenticated user. - This requires parsing facets from the post's record. - `post_data` should be the `record` field of a `PostView` or a `FeedViewPost`. - """ - my_did = self.get_own_did() - if not my_did: - return False - - facets_to_check = None - if isinstance(post_data, models.AppBskyFeedPost.Main): - facets_to_check = post_data.facets - elif isinstance(post_data, dict) and 'facets' in post_data: # If raw dict from JSON - # Need to parse dict facets into SDK models or handle dict structure - # This simplified check assumes facets are already models.AppBskyRichtextFacet.Main objects - # For robustness, parse dict to models.AppBskyRichtextFacet.Main if necessary. - facets_to_check = post_data['facets'] - - - if not facets_to_check: - return False - - for facet_item_model in facets_to_check: - # Ensure facet_item_model is the correct SDK model type if it came from dict - if isinstance(facet_item_model, models.AppBskyRichtextFacet.Main): - for feature in facet_item_model.features: - if isinstance(feature, models.AppBskyRichtextFacet.Mention) and feature.did == my_did: - return True - # Add handling for dict-based facet items if not pre-parsed - elif isinstance(facet_item_model, dict) and 'features' in facet_item_model: - for feature_dict in facet_item_model['features']: - if isinstance(feature_dict, dict) and feature_dict.get('$type') == ids.AppBskyRichtextFacetMention and feature_dict.get('did') == my_did: - return True - return False - - # Add more utility functions as needed, e.g., for specific API calls, data transformations, etc.