Terminando de refactorizar

This commit is contained in:
Jesús Pavón Abián
2026-02-01 18:58:38 +01:00
parent 5d4ac82c4d
commit 25ecd8b5fd
6 changed files with 607 additions and 123 deletions

View File

@@ -18,17 +18,7 @@ log = logging.getLogger("sessions.blueski.compose")
def compose_post(post, db, settings, relative_times, show_screen_names=False, safe=True):
"""
Compose a Bluesky post into a list of strings for display.
Args:
post: dict or ATProto model object (FeedViewPost or PostView)
db: Session database dict
settings: Session settings
relative_times: If True, use relative time formatting
show_screen_names: If True, show only @handle instead of display name
safe: If True, handle exceptions gracefully
Returns:
List of strings: [User, Text, Date, Source]
Format matches Mastodon: [user+", ", text, date+", ", source]
"""
def g(obj, key, default=None):
"""Helper to get attribute from dict or object."""
@@ -37,41 +27,58 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
return getattr(obj, key, default)
# Resolve Post View or Feed View structure
# Feed items have .post field, direct post objects don't
actual_post = g(post, "post", post)
record = g(actual_post, "record", {})
author = g(actual_post, "author", {})
# Author
handle = g(author, "handle", "")
display_name = g(author, "displayName") or g(author, "display_name") or handle or "Unknown"
# Original author info
original_handle = g(author, "handle", "")
original_display_name = g(author, "displayName") or g(author, "display_name") or original_handle or "Unknown"
if show_screen_names:
user_str = f"@{handle}"
else:
if handle and display_name != handle:
user_str = f"{display_name} (@{handle})"
else:
user_str = f"@{handle}"
# Text
text = g(record, "text", "")
# Repost reason
# Check if this is a repost
reason = g(post, "reason", None)
is_repost = False
reposter_handle = ""
reposter_display_name = ""
if reason:
rtype = g(reason, "$type") or g(reason, "py_type")
if rtype and "reasonRepost" in rtype:
is_repost = True
by = g(reason, "by", {})
by_handle = g(by, "handle", "")
reason_line = _("Reposted by @{handle}").format(handle=by_handle) if by_handle else _("Reposted")
text = f"{reason_line}\n{text}" if text else reason_line
reposter_handle = g(by, "handle", "")
reposter_display_name = g(by, "displayName") or g(by, "display_name") or reposter_handle
# User column: show reposter if repost, otherwise original author (like Mastodon)
if is_repost and reposter_handle:
if show_screen_names:
user_str = f"@{reposter_handle}"
else:
if reposter_display_name and reposter_display_name != reposter_handle:
user_str = f"{reposter_display_name} (@{reposter_handle})"
else:
user_str = f"@{reposter_handle}"
else:
if show_screen_names:
user_str = f"@{original_handle}"
else:
if original_display_name and original_display_name != original_handle:
user_str = f"{original_display_name} (@{original_handle})"
else:
user_str = f"@{original_handle}"
# Text
original_text = g(record, "text", "")
# Build text - if repost, format like Mastodon: "Reposted from @original: text"
if is_repost:
text = _("Reposted from @{}: {}").format(original_handle, original_text)
else:
text = original_text
# Labels / Content Warning
labels = g(actual_post, "labels", [])
cw_text = ""
for label in labels:
val = g(label, "val", "")
if val in ["!warn", "porn", "sexual", "nudity", "gore", "graphic-media", "corpse", "self-harm", "hate", "spam", "impersonation"]:
@@ -92,7 +99,7 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
if etype and ("images" in etype):
images = g(embed, "images", [])
if images:
text += f"\n[{len(images)} {_('Images')}]"
text += f" [{len(images)} {_('images')}]"
# Quote posts
quote_rec = None
@@ -100,13 +107,12 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
rec_embed = g(embed, "record", {})
if rec_embed:
quote_rec = g(rec_embed, "record", None) or rec_embed
# Media in wrapper
media = g(embed, "media", {})
mtype = g(media, "$type") or g(media, "py_type")
if mtype and "images" in mtype:
images = g(media, "images", [])
if images:
text += f"\n[{len(images)} {_('Images')}]"
text += f" [{len(images)} {_('images')}]"
elif etype and ("record" in etype):
quote_rec = g(embed, "record", {})
@@ -115,29 +121,28 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
if quote_rec:
qtype = g(quote_rec, "$type") or g(quote_rec, "py_type")
if qtype and "viewNotFound" in qtype:
text += f"\n[{_('Quoted post not found')}]"
text += f" [{_('Quoted post not found')}]"
elif qtype and "viewBlocked" in qtype:
text += f"\n[{_('Quoted post blocked')}]"
text += f" [{_('Quoted post blocked')}]"
elif qtype and "generatorView" in qtype:
gen = g(quote_rec, "displayName", "Feed")
text += f"\n[{_('Quoting Feed')}: {gen}]"
text += f" [{_('Quoting Feed')}: {gen}]"
else:
q_author = g(quote_rec, "author", {})
q_handle = g(q_author, "handle", "unknown")
q_val = g(quote_rec, "value", {})
q_text = g(q_val, "text", "")
if q_text:
text += f"\n[{_('Quoting')} @{q_handle}: {q_text}]"
text += " " + _("Quoting @{}: {}").format(q_handle, q_text)
else:
text += f"\n[{_('Quoting')} @{q_handle}]"
text += " " + _("Quoting @{}").format(q_handle)
elif etype and ("external" in etype):
ext = g(embed, "external", {})
title = g(ext, "title", "")
text += f"\n[{_('Link')}: {title}]"
if title:
text += f" [{_('Link')}: {title}]"
# Date
indexed_at = g(actual_post, "indexed_at", "") or g(actual_post, "indexedAt", "")
@@ -154,39 +159,15 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
# Source / Client
source = "Bluesky"
# Viewer state (liked, reposted, etc.)
viewer_indicators = []
viewer = g(actual_post, "viewer") or g(post, "viewer")
if viewer:
if g(viewer, "like"):
viewer_indicators.append("") # Liked
if g(viewer, "repost"):
viewer_indicators.append("🔁") # Reposted
# Add viewer indicators to the source column or create a prefix for text
if viewer_indicators:
indicator_str = " ".join(viewer_indicators)
# Add to beginning of text for visibility
text = f"{indicator_str} {text}"
return [user_str, text, ts_str, source]
# Format like Mastodon: add ", " after user and date
return [user_str + ", ", text, ts_str + ", ", source]
def compose_notification(notification, db, settings, relative_times, show_screen_names=False, safe=True):
"""
Compose a Bluesky notification into a list of strings for display.
Args:
notification: ATProto notification object
db: Session database dict
settings: Session settings
relative_times: If True, use relative time formatting
show_screen_names: If True, show only @handle
safe: If True, handle exceptions gracefully
Returns:
List of strings: [User, Action/Text, Date]
Format matches Mastodon: [user, text, date]
"""
def g(obj, key, default=None):
if isinstance(obj, dict):
@@ -201,30 +182,50 @@ def compose_notification(notification, db, settings, relative_times, show_screen
if show_screen_names:
user_str = f"@{handle}"
else:
user_str = f"{display_name} (@{handle})"
if display_name and display_name != handle:
user_str = f"{display_name} (@{handle})"
else:
user_str = f"@{handle}"
# Notification reason/type
reason = g(notification, "reason", "unknown")
# Map reason to user-readable text
reason_text_map = {
"like": _("liked your post"),
"repost": _("reposted your post"),
"follow": _("followed you"),
"mention": _("mentioned you"),
"reply": _("replied to you"),
"quote": _("quoted your post"),
"starterpack-joined": _("joined your starter pack"),
}
action_text = reason_text_map.get(reason, reason)
# For mentions/replies/quotes, include snippet of the text
# Get post text if available
record = g(notification, "record", {})
post_text = g(record, "text", "")
if post_text and reason in ["mention", "reply", "quote"]:
snippet = post_text[:100] + "..." if len(post_text) > 100 else post_text
action_text = f"{action_text}: {snippet}"
# Format like Mastodon: "{username} has action: {status}"
if reason == "like":
if post_text:
text = _("{username} has added to favorites: {status}").format(username=user_str, status=post_text)
else:
text = _("{username} has added to favorites").format(username=user_str)
elif reason == "repost":
if post_text:
text = _("{username} has reposted: {status}").format(username=user_str, status=post_text)
else:
text = _("{username} has reposted").format(username=user_str)
elif reason == "follow":
text = _("{username} has followed you.").format(username=user_str)
elif reason == "mention":
if post_text:
text = _("{username} has mentioned you: {status}").format(username=user_str, status=post_text)
else:
text = _("{username} has mentioned you").format(username=user_str)
elif reason == "reply":
if post_text:
text = _("{username} has replied: {status}").format(username=user_str, status=post_text)
else:
text = _("{username} has replied").format(username=user_str)
elif reason == "quote":
if post_text:
text = _("{username} has quoted your post: {status}").format(username=user_str, status=post_text)
else:
text = _("{username} has quoted your post").format(username=user_str)
elif reason == "starterpack-joined":
text = _("{username} has joined your starter pack.").format(username=user_str)
else:
text = f"{user_str}: {reason}"
# Date
indexed_at = g(notification, "indexedAt", "") or g(notification, "indexed_at", "")
@@ -239,23 +240,13 @@ def compose_notification(notification, db, settings, relative_times, show_screen
except Exception:
ts_str = str(indexed_at)[:16].replace("T", " ")
return [user_str, action_text, ts_str]
return [user_str, text, ts_str]
def compose_user(user, db, settings, relative_times, show_screen_names=False, safe=True):
"""
Compose a Bluesky user profile for list display.
Args:
user: User profile dict or ATProto model
db: Session database dict
settings: Session settings
relative_times: If True, use relative time formatting
show_screen_names: If True, show only @handle
safe: If True, handle exceptions gracefully
Returns:
List of strings: [User summary]
Format matches Mastodon: single string with all info.
"""
def g(obj, key, default=None):
if isinstance(obj, dict):
@@ -264,9 +255,9 @@ def compose_user(user, db, settings, relative_times, show_screen_names=False, sa
handle = g(user, "handle", "unknown")
display_name = g(user, "displayName") or g(user, "display_name") or handle
followers = g(user, "followersCount", None)
following = g(user, "followsCount", None)
posts = g(user, "postsCount", None)
followers = g(user, "followersCount", 0) or 0
following = g(user, "followsCount", 0) or 0
posts = g(user, "postsCount", 0) or 0
created_at = g(user, "createdAt", None)
ts = ""
@@ -279,17 +270,13 @@ def compose_user(user, db, settings, relative_times, show_screen_names=False, sa
offset = db.get("utc_offset", 0) if isinstance(db, dict) else 0
ts = original_date.shift(hours=offset).format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
except Exception:
ts = str(created_at)
ts = ""
parts = [f"{display_name} (@{handle})."]
if followers is not None and following is not None and posts is not None:
parts.append(_("{followers} followers, {following} following, {posts} posts.").format(
followers=followers, following=following, posts=posts
))
# Format like Mastodon: "Name (@handle). X followers, Y following, Z posts. Joined date"
if ts:
parts.append(_("Joined {date}").format(date=ts))
return [" ".join(parts).strip()]
return [_("%s (@%s). %s followers, %s following, %s posts. Joined %s") % (display_name, handle, followers, following, posts, ts)]
else:
return [_("%s (@%s). %s followers, %s following, %s posts.") % (display_name, handle, followers, following, posts)]
def compose_convo(convo, db, settings, relative_times, show_screen_names=False, safe=True):

View File

@@ -0,0 +1,189 @@
# -*- coding: utf-8 -*-
"""
Utility functions for Bluesky session.
"""
import logging
log = logging.getLogger("sessions.blueski.utils")
def g(obj, key, default=None):
"""Helper to get attribute from dict or object."""
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
def is_audio_or_video(post):
"""
Check if post contains audio or video content.
Args:
post: Bluesky post object (FeedViewPost or PostView)
Returns:
bool: True if post has audio/video media
"""
actual_post = g(post, "post", post)
embed = g(actual_post, "embed", None)
if not embed:
return False
etype = g(embed, "$type") or g(embed, "py_type")
# Check for video embed
if etype and "video" in etype.lower():
return True
# Check for external link that might be video (YouTube, etc.)
if etype and "external" in etype:
ext = g(embed, "external", {})
uri = g(ext, "uri", "")
# Common video hosting sites
video_hosts = ["youtube.com", "youtu.be", "vimeo.com", "twitch.tv", "dailymotion.com"]
for host in video_hosts:
if host in uri.lower():
return True
# Check in recordWithMedia wrapper
if etype and "recordWithMedia" in etype:
media = g(embed, "media", {})
mtype = g(media, "$type") or g(media, "py_type")
if mtype and "video" in mtype.lower():
return True
return False
def is_image(post):
"""
Check if post contains image content.
Args:
post: Bluesky post object (FeedViewPost or PostView)
Returns:
bool: True if post has image media
"""
actual_post = g(post, "post", post)
embed = g(actual_post, "embed", None)
if not embed:
return False
etype = g(embed, "$type") or g(embed, "py_type")
# Direct images embed
if etype and "images" in etype:
images = g(embed, "images", [])
return len(images) > 0
# Check in recordWithMedia wrapper
if etype and "recordWithMedia" in etype:
media = g(embed, "media", {})
mtype = g(media, "$type") or g(media, "py_type")
if mtype and "images" in mtype:
images = g(media, "images", [])
return len(images) > 0
return False
def get_media_urls(post):
"""
Get URLs for media attachments (video/audio) from post.
Args:
post: Bluesky post object
Returns:
list: List of media URLs
"""
urls = []
actual_post = g(post, "post", post)
embed = g(actual_post, "embed", None)
if not embed:
return urls
etype = g(embed, "$type") or g(embed, "py_type")
# Video embed
if etype and "video" in etype.lower():
playlist = g(embed, "playlist", None)
if playlist:
urls.append(playlist)
# Alternative URL fields
for key in ["url", "uri", "thumb"]:
val = g(embed, key)
if val and val not in urls:
urls.append(val)
# External links (YouTube, etc.)
if etype and "external" in etype:
ext = g(embed, "external", {})
uri = g(ext, "uri", "")
if uri:
urls.append(uri)
return urls
def find_urls(post):
"""
Find all URLs in post content.
Args:
post: Bluesky post object
Returns:
list: List of URLs found
"""
urls = []
actual_post = g(post, "post", post)
record = g(actual_post, "record", {})
# Check facets for link annotations
facets = g(record, "facets", [])
for facet in facets:
features = g(facet, "features", [])
for feature in features:
ftype = g(feature, "$type") or g(feature, "py_type")
if ftype and "link" in ftype:
uri = g(feature, "uri", "")
if uri and uri not in urls:
urls.append(uri)
# Check embed for external links
embed = g(actual_post, "embed", None)
if embed:
etype = g(embed, "$type") or g(embed, "py_type")
if etype and "external" in etype:
ext = g(embed, "external", {})
uri = g(ext, "uri", "")
if uri and uri not in urls:
urls.append(uri)
return urls
def find_item(item, items_list):
"""
Find item index in list by URI.
Args:
item: Item to find
items_list: List to search
Returns:
int or None: Index if found, None otherwise
"""
item_uri = g(item, "uri") or g(g(item, "post"), "uri")
if not item_uri:
return None
for i, existing in enumerate(items_list):
existing_uri = g(existing, "uri") or g(g(existing, "post"), "uri")
if existing_uri == item_uri:
return i
return None