mirror of
https://github.com/MCV-Software/TWBlue.git
synced 2026-03-06 09:27:33 +01:00
Terminando integración
This commit is contained in:
@@ -1,297 +1,64 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
"""
|
||||
Compose functions for Bluesky content display in TWBlue.
|
||||
|
||||
These functions format API data into user-readable strings for display in
|
||||
list controls. They follow the TWBlue compose function pattern:
|
||||
compose_function(item, db, relative_times, show_screen_names, session)
|
||||
Returns a list of strings for display columns.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from datetime import datetime
|
||||
import arrow
|
||||
import languageHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sessions.blueski.session import Session as BlueskiSession
|
||||
from atproto.xrpc_client import models # For type hinting ATProto models
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# For SUPPORTED_LANG_CHOICES in composeDialog.py
|
||||
SUPPORTED_LANG_CHOICES_COMPOSE = {
|
||||
_("English"): "en", _("Spanish"): "es", _("French"): "fr", _("German"): "de",
|
||||
_("Japanese"): "ja", _("Portuguese"): "pt", _("Russian"): "ru", _("Chinese"): "zh",
|
||||
}
|
||||
|
||||
|
||||
class BlueskiCompose:
|
||||
MAX_CHARS = 300
|
||||
MAX_MEDIA_ATTACHMENTS = 4
|
||||
MAX_LANGUAGES = 3
|
||||
MAX_IMAGE_SIZE_BYTES = 1_000_000
|
||||
|
||||
def __init__(self, session: BlueskiSession) -> None:
|
||||
self.session = session
|
||||
self.supported_media_types: list[str] = ["image/jpeg", "image/png"]
|
||||
self.max_image_size_bytes: int = self.MAX_IMAGE_SIZE_BYTES
|
||||
|
||||
def get_panel_configuration(self) -> dict[str, Any]:
|
||||
"""Returns configuration for the compose panel specific to Blueski."""
|
||||
return {
|
||||
"max_chars": self.MAX_CHARS,
|
||||
"max_media_attachments": self.MAX_MEDIA_ATTACHMENTS,
|
||||
"supports_content_warning": True,
|
||||
"supports_scheduled_posts": False,
|
||||
"supported_media_types": self.supported_media_types,
|
||||
"max_media_size_bytes": self.max_image_size_bytes,
|
||||
"supports_alternative_text": True,
|
||||
"sensitive_reasons_options": self.session.get_sensitive_reason_options(),
|
||||
"supports_language_selection": True,
|
||||
"max_languages": self.MAX_LANGUAGES,
|
||||
"supports_quoting": True,
|
||||
"supports_polls": False,
|
||||
}
|
||||
|
||||
async def get_quote_text(self, message_id: str, url: str) -> str | None:
|
||||
return ""
|
||||
|
||||
async def get_reply_text(self, message_id: str, author_handle: str) -> str | None:
|
||||
if not author_handle.startswith("@"):
|
||||
return f"@{author_handle} "
|
||||
return f"{author_handle} "
|
||||
|
||||
def get_text_formatting_rules(self) -> dict[str, Any]:
|
||||
return {
|
||||
"markdown_enabled": False,
|
||||
"custom_emojis_enabled": False,
|
||||
"max_length": self.MAX_CHARS,
|
||||
"line_break_char": "\n",
|
||||
"link_format": "Full URL (e.g., https://example.com)",
|
||||
"mention_format": "@handle.bsky.social",
|
||||
"tag_format": "#tag (becomes a facet link)",
|
||||
}
|
||||
|
||||
def is_media_type_supported(self, mime_type: str) -> bool:
|
||||
return mime_type.lower() in self.supported_media_types
|
||||
|
||||
def get_max_schedule_date(self) -> str | None:
|
||||
return None
|
||||
|
||||
def get_poll_configuration(self) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
def compose_post_for_display(self, post_data: dict[str, Any], session_settings: dict[str, Any] | None = None) -> str:
|
||||
"""
|
||||
Composes a string representation of a Bluesky post for display in UI timelines.
|
||||
"""
|
||||
if not post_data or not isinstance(post_data, dict):
|
||||
return _("Invalid post data.")
|
||||
|
||||
author_info = post_data.get("author", {})
|
||||
record = post_data.get("record", {})
|
||||
embed_data = post_data.get("embed")
|
||||
viewer_state = post_data.get("viewer", {})
|
||||
|
||||
display_name = author_info.get("displayName", "") or author_info.get("handle", _("Unknown User"))
|
||||
handle = author_info.get("handle", _("unknown.handle"))
|
||||
|
||||
post_text = getattr(record, 'text', '') if not isinstance(record, dict) else record.get('text', '')
|
||||
|
||||
reason = post_data.get("reason")
|
||||
if reason:
|
||||
rtype = getattr(reason, "$type", "") if not isinstance(reason, dict) else reason.get("$type", "")
|
||||
if not rtype and not isinstance(reason, dict):
|
||||
rtype = getattr(reason, "py_type", "")
|
||||
if rtype and "reasonRepost" in rtype:
|
||||
by = getattr(reason, "by", None) if not isinstance(reason, dict) else reason.get("by")
|
||||
by_handle = getattr(by, "handle", "") if by and not isinstance(by, dict) else (by.get("handle", "") if by else "")
|
||||
reason_line = _("Reposted by @{handle}").format(handle=by_handle) if by_handle else _("Reposted")
|
||||
post_text = f"{reason_line}\n{post_text}" if post_text else reason_line
|
||||
|
||||
created_at_str = getattr(record, 'createdAt', '') if not isinstance(record, dict) else record.get('createdAt', '')
|
||||
timestamp_str = ""
|
||||
if created_at_str:
|
||||
try:
|
||||
ts = arrow.get(created_at_str)
|
||||
timestamp_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not parse timestamp {created_at_str}: {e}")
|
||||
timestamp_str = created_at_str
|
||||
|
||||
header = f"{display_name} (@{handle}) - {timestamp_str}"
|
||||
|
||||
labels = post_data.get("labels", [])
|
||||
spoiler_text = None
|
||||
is_sensitive_post = False
|
||||
if labels:
|
||||
for label_obj in labels:
|
||||
label_val = getattr(label_obj, 'val', '') if not isinstance(label_obj, dict) else label_obj.get('val', '')
|
||||
if label_val == "!warn":
|
||||
is_sensitive_post = True
|
||||
elif label_val in ["porn", "sexual", "nudity", "gore", "graphic-media", "corpse", "self-harm", "hate", "spam", "impersonation"]:
|
||||
is_sensitive_post = True
|
||||
if not spoiler_text: spoiler_text = _("Sensitive Content: {label}").format(label=label_val)
|
||||
elif label_val.startswith("warn:") and len(label_val) > 5:
|
||||
spoiler_text = label_val.split("warn:", 1)[-1].strip()
|
||||
is_sensitive_post = True
|
||||
|
||||
post_text_display = post_text
|
||||
if spoiler_text:
|
||||
post_text_display = f"CW: {spoiler_text}\n\n{post_text}"
|
||||
elif is_sensitive_post and not spoiler_text:
|
||||
post_text_display = f"CW: {_('Sensitive Content')}\n\n{post_text}"
|
||||
|
||||
embed_display = ""
|
||||
if embed_data:
|
||||
embed_type = getattr(embed_data, '$type', '')
|
||||
if not embed_type and isinstance(embed_data, dict): embed_type = embed_data.get('$type', '')
|
||||
|
||||
if embed_type in ['app.bsky.embed.images#view', 'app.bsky.embed.images']:
|
||||
images = getattr(embed_data, 'images', []) if hasattr(embed_data, 'images') else embed_data.get('images', [])
|
||||
if images:
|
||||
img_count = len(images)
|
||||
alt_texts_present = any(getattr(img, 'alt', '') for img in images if hasattr(img, 'alt')) or \
|
||||
any(img_dict.get('alt', '') for img_dict in images if isinstance(img_dict, dict))
|
||||
embed_display += f"\n[{img_count} Image"
|
||||
if img_count > 1: embed_display += "s"
|
||||
if alt_texts_present: embed_display += _(" (Alt text available)")
|
||||
embed_display += "]"
|
||||
|
||||
elif embed_type in ['app.bsky.embed.record#view', 'app.bsky.embed.record', 'app.bsky.embed.recordWithMedia#view', 'app.bsky.embed.recordWithMedia']:
|
||||
record_embed_data = getattr(embed_data, 'record', None) if hasattr(embed_data, 'record') else embed_data.get('record', None)
|
||||
if record_embed_data and isinstance(record_embed_data, dict):
|
||||
record_embed_data = record_embed_data.get("record") or record_embed_data
|
||||
record_embed_type = getattr(record_embed_data, '$type', '')
|
||||
if not record_embed_type and isinstance(record_embed_data, dict): record_embed_type = record_embed_data.get('$type', '')
|
||||
|
||||
if record_embed_type == 'app.bsky.embed.record#viewNotFound':
|
||||
embed_display += f"\n[{_('Quoted post not found or unavailable')}]"
|
||||
elif record_embed_type == 'app.bsky.embed.record#viewBlocked':
|
||||
embed_display += f"\n[{_('Content from the quoted account is blocked')}]"
|
||||
elif record_embed_data and (isinstance(record_embed_data, dict) or hasattr(record_embed_data, 'author')):
|
||||
quote_author_info = getattr(record_embed_data, 'author', record_embed_data.get('author'))
|
||||
quote_value = getattr(record_embed_data, 'value', record_embed_data.get('value'))
|
||||
|
||||
if quote_author_info and quote_value:
|
||||
quote_author_handle = getattr(quote_author_info, 'handle', 'unknown')
|
||||
quote_text_content = getattr(quote_value, 'text', '') if not isinstance(quote_value, dict) else quote_value.get('text', '')
|
||||
quote_text_snippet = (quote_text_content[:75] + "...") if quote_text_content else _("post content")
|
||||
embed_display += f"\n[ {_('Quote by')} @{quote_author_handle}: \"{quote_text_snippet}\" ]"
|
||||
else:
|
||||
embed_display += f"\n[{_('Quoted Post')}]"
|
||||
|
||||
elif embed_type in ['app.bsky.embed.external#view', 'app.bsky.embed.external']:
|
||||
external_data = getattr(embed_data, 'external', None) if hasattr(embed_data, 'external') else embed_data.get('external', None)
|
||||
if external_data:
|
||||
ext_uri = getattr(external_data, 'uri', _('External Link'))
|
||||
ext_title = getattr(external_data, 'title', '') or ext_uri
|
||||
embed_display += f"\n[{_('Link')}: {ext_title}]"
|
||||
|
||||
reply_context_str = ""
|
||||
actual_record = post_data.get("record", {})
|
||||
reply_ref = getattr(actual_record, 'reply', None) if not isinstance(actual_record, dict) else actual_record.get('reply')
|
||||
|
||||
if reply_ref:
|
||||
reply_context_str = f"[{_('In reply to a post')}] "
|
||||
|
||||
counts_str_parts = []
|
||||
reply_count = post_data.get("replyCount", 0)
|
||||
repost_count = post_data.get("repostCount", 0)
|
||||
like_count = post_data.get("likeCount", 0)
|
||||
|
||||
if reply_count > 0: counts_str_parts.append(f"{_('Replies')}: {reply_count}")
|
||||
if repost_count > 0: counts_str_parts.append(f"{_('Reposts')}: {repost_count}")
|
||||
if like_count > 0: counts_str_parts.append(f"{_('Likes')}: {like_count}")
|
||||
|
||||
viewer_liked_uri = viewer_state.get("like") if isinstance(viewer_state, dict) else getattr(viewer_state, 'like', None)
|
||||
viewer_reposted_uri = viewer_state.get("repost") if isinstance(viewer_state, dict) else getattr(viewer_state, 'repost', None)
|
||||
|
||||
if viewer_liked_uri: counts_str_parts.append(f"({_('Liked by you')})")
|
||||
if viewer_reposted_uri: counts_str_parts.append(f"({_('Reposted by you')})")
|
||||
|
||||
counts_line = ""
|
||||
if counts_str_parts:
|
||||
counts_line = "\n" + " | ".join(counts_str_parts)
|
||||
|
||||
full_display = f"{header}\n{reply_context_str}{post_text_display}{embed_display}{counts_line}"
|
||||
return full_display.strip()
|
||||
|
||||
def compose_notification_for_display(self, notif_data: dict[str, Any]) -> str:
|
||||
"""
|
||||
Composes a string representation of a Bluesky notification for display.
|
||||
|
||||
Args:
|
||||
notif_data: A dictionary representing the notification,
|
||||
typically from BlueskiSession._handle_*_notification methods
|
||||
which create an approve.notifications.Notification object and then
|
||||
convert it to dict or pass relevant parts.
|
||||
Expected keys: 'title', 'body', 'author_name', 'timestamp_dt', 'kind'.
|
||||
The 'title' usually already contains the core action.
|
||||
Returns:
|
||||
A formatted string for display.
|
||||
"""
|
||||
if not notif_data or not isinstance(notif_data, dict):
|
||||
return _("Invalid notification data.")
|
||||
|
||||
title = notif_data.get('title', _("Notification"))
|
||||
body = notif_data.get('body', '')
|
||||
author_name = notif_data.get('author_name') # Author of the action (e.g. who liked)
|
||||
timestamp_dt = notif_data.get('timestamp_dt') # datetime object
|
||||
|
||||
timestamp_str = ""
|
||||
if timestamp_dt and isinstance(timestamp_dt, datetime):
|
||||
try:
|
||||
timestamp_str = timestamp_dt.strftime("%I:%M %p - %b %d, %Y")
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not format notification timestamp {timestamp_dt}: {e}")
|
||||
timestamp_str = str(timestamp_dt)
|
||||
|
||||
display_parts = []
|
||||
if timestamp_str:
|
||||
display_parts.append(f"[{timestamp_str}]")
|
||||
|
||||
# Title already contains good info like "UserX liked your post"
|
||||
display_parts.append(title)
|
||||
|
||||
if body: # Body might be text of a reply/mention/quote
|
||||
# Truncate body if too long for a list display
|
||||
body_snippet = (body[:100] + "...") if len(body) > 103 else body
|
||||
display_parts.append(f"\"{body_snippet}\"")
|
||||
|
||||
return " ".join(display_parts).strip()
|
||||
log = logging.getLogger("sessions.blueski.compose")
|
||||
|
||||
|
||||
def compose_post(post, db, settings, relative_times, show_screen_names=False, safe=True):
|
||||
"""
|
||||
Compose a Bluesky post into a list of strings [User, Text, Date, Source].
|
||||
post: dict or ATProto model object.
|
||||
Compose a Bluesky post into a list of strings for display.
|
||||
|
||||
Args:
|
||||
post: dict or ATProto model object (FeedViewPost or PostView)
|
||||
db: Session database dict
|
||||
settings: Session settings
|
||||
relative_times: If True, use relative time formatting
|
||||
show_screen_names: If True, show only @handle instead of display name
|
||||
safe: If True, handle exceptions gracefully
|
||||
|
||||
Returns:
|
||||
List of strings: [User, Text, Date, Source]
|
||||
"""
|
||||
# Extract data using getattr for models or .get for dicts
|
||||
def g(obj, key, default=None):
|
||||
"""Helper to get attribute from dict or object."""
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(key, default)
|
||||
return getattr(obj, key, default)
|
||||
|
||||
# Resolve Post View or Feed View structure
|
||||
# Feed items often have .post field. Direct post objects don't.
|
||||
actual_post = g(post, "post", post)
|
||||
|
||||
# Feed items have .post field, direct post objects don't
|
||||
actual_post = g(post, "post", post)
|
||||
|
||||
record = g(actual_post, "record", {})
|
||||
author = g(actual_post, "author", {})
|
||||
|
||||
|
||||
# Author
|
||||
handle = g(author, "handle", "")
|
||||
display_name = g(author, "displayName") or g(author, "display_name") or handle or "Unknown"
|
||||
|
||||
|
||||
if show_screen_names:
|
||||
user_str = f"@{handle}"
|
||||
else:
|
||||
# "Display Name (@handle)"
|
||||
if handle and display_name != handle:
|
||||
user_str = f"{display_name} (@{handle})"
|
||||
else:
|
||||
user_str = f"@{handle}"
|
||||
|
||||
|
||||
# Text
|
||||
text = g(record, "text", "")
|
||||
|
||||
# Repost reason (so users know why they see an unfamiliar post)
|
||||
# Repost reason
|
||||
reason = g(post, "reason", None)
|
||||
if reason:
|
||||
rtype = g(reason, "$type") or g(reason, "py_type")
|
||||
@@ -300,113 +67,196 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
|
||||
by_handle = g(by, "handle", "")
|
||||
reason_line = _("Reposted by @{handle}").format(handle=by_handle) if by_handle else _("Reposted")
|
||||
text = f"{reason_line}\n{text}" if text else reason_line
|
||||
|
||||
|
||||
# Labels / Content Warning
|
||||
labels = g(actual_post, "labels", [])
|
||||
cw_text = ""
|
||||
is_sensitive = False
|
||||
|
||||
|
||||
for label in labels:
|
||||
val = g(label, "val", "")
|
||||
if val in ["!warn", "porn", "sexual", "nudity", "gore", "graphic-media", "corpse", "self-harm", "hate", "spam", "impersonation"]:
|
||||
is_sensitive = True
|
||||
if not cw_text: cw_text = _("Sensitive Content")
|
||||
if not cw_text:
|
||||
cw_text = _("Sensitive Content")
|
||||
elif val.startswith("warn:"):
|
||||
is_sensitive = True
|
||||
cw_text = val.split("warn:", 1)[-1].strip()
|
||||
|
||||
if cw_text:
|
||||
text = f"CW: {cw_text}\n\n{text}"
|
||||
|
||||
# Embeds (Images, Quotes)
|
||||
|
||||
# Embeds (Images, Quotes, Links)
|
||||
embed = g(actual_post, "embed", None)
|
||||
if embed:
|
||||
etype = g(embed, "$type") or g(embed, "py_type")
|
||||
|
||||
# Images
|
||||
if etype and ("images" in etype):
|
||||
images = g(embed, "images", [])
|
||||
if images:
|
||||
text += f"\n[{len(images)} {_('Images')}]"
|
||||
|
||||
# Handle Record (Quote) or RecordWithMedia (Quote + Media)
|
||||
|
||||
# Quote posts
|
||||
quote_rec = None
|
||||
if etype and ("recordWithMedia" in etype):
|
||||
# Extract the nested record
|
||||
rec_embed = g(embed, "record", {})
|
||||
if rec_embed:
|
||||
quote_rec = g(rec_embed, "record", None) or rec_embed
|
||||
# Also check for media in the wrapper
|
||||
media = g(embed, "media", {})
|
||||
mtype = g(media, "$type") or g(media, "py_type")
|
||||
if mtype and "images" in mtype:
|
||||
images = g(media, "images", [])
|
||||
if images: text += f"\n[{len(images)} {_('Images')}]"
|
||||
|
||||
elif etype and ("record" in etype):
|
||||
# Direct quote
|
||||
quote_rec = g(embed, "record", {})
|
||||
if isinstance(quote_rec, dict):
|
||||
quote_rec = quote_rec.get("record") or quote_rec
|
||||
|
||||
if quote_rec:
|
||||
# It is likely a ViewRecord
|
||||
# Check type (ViewRecord, ViewNotFound, ViewBlocked, etc)
|
||||
qtype = g(quote_rec, "$type") or g(quote_rec, "py_type")
|
||||
|
||||
if qtype and "viewNotFound" in qtype:
|
||||
text += f"\n[{_('Quoted post not found')}]"
|
||||
elif qtype and "viewBlocked" in qtype:
|
||||
text += f"\n[{_('Quoted post blocked')}]"
|
||||
elif qtype and "generatorView" in qtype:
|
||||
# Feed generator
|
||||
gen = g(quote_rec, "displayName", "Feed")
|
||||
text += f"\n[{_('Quoting Feed')}: {gen}]"
|
||||
else:
|
||||
# Assume ViewRecord
|
||||
q_author = g(quote_rec, "author", {})
|
||||
q_handle = g(q_author, "handle", "unknown")
|
||||
|
||||
q_val = g(quote_rec, "value", {})
|
||||
q_text = g(q_val, "text", "")
|
||||
rec_embed = g(embed, "record", {})
|
||||
if rec_embed:
|
||||
quote_rec = g(rec_embed, "record", None) or rec_embed
|
||||
# Media in wrapper
|
||||
media = g(embed, "media", {})
|
||||
mtype = g(media, "$type") or g(media, "py_type")
|
||||
if mtype and "images" in mtype:
|
||||
images = g(media, "images", [])
|
||||
if images:
|
||||
text += f"\n[{len(images)} {_('Images')}]"
|
||||
|
||||
if q_text:
|
||||
text += f"\n[{_('Quoting')} @{q_handle}: {q_text}]"
|
||||
else:
|
||||
text += f"\n[{_('Quoting')} @{q_handle}]"
|
||||
elif etype and ("record" in etype):
|
||||
quote_rec = g(embed, "record", {})
|
||||
if isinstance(quote_rec, dict):
|
||||
quote_rec = quote_rec.get("record") or quote_rec
|
||||
|
||||
if quote_rec:
|
||||
qtype = g(quote_rec, "$type") or g(quote_rec, "py_type")
|
||||
|
||||
if qtype and "viewNotFound" in qtype:
|
||||
text += f"\n[{_('Quoted post not found')}]"
|
||||
elif qtype and "viewBlocked" in qtype:
|
||||
text += f"\n[{_('Quoted post blocked')}]"
|
||||
elif qtype and "generatorView" in qtype:
|
||||
gen = g(quote_rec, "displayName", "Feed")
|
||||
text += f"\n[{_('Quoting Feed')}: {gen}]"
|
||||
else:
|
||||
q_author = g(quote_rec, "author", {})
|
||||
q_handle = g(q_author, "handle", "unknown")
|
||||
q_val = g(quote_rec, "value", {})
|
||||
q_text = g(q_val, "text", "")
|
||||
|
||||
if q_text:
|
||||
text += f"\n[{_('Quoting')} @{q_handle}: {q_text}]"
|
||||
else:
|
||||
text += f"\n[{_('Quoting')} @{q_handle}]"
|
||||
|
||||
elif etype and ("external" in etype):
|
||||
ext = g(embed, "external", {})
|
||||
uri = g(ext, "uri", "")
|
||||
title = g(ext, "title", "")
|
||||
text += f"\n[{_('Link')}: {title}]"
|
||||
|
||||
# Date
|
||||
indexed_at = g(actual_post, "indexed_at", "")
|
||||
indexed_at = g(actual_post, "indexed_at", "") or g(actual_post, "indexedAt", "")
|
||||
ts_str = ""
|
||||
if indexed_at:
|
||||
try:
|
||||
# Try arrow parsing
|
||||
import arrow
|
||||
ts = arrow.get(indexed_at)
|
||||
if relative_times:
|
||||
ts_str = ts.humanize(locale=languageHandler.curLang[:2])
|
||||
else:
|
||||
ts_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
ts = arrow.get(indexed_at)
|
||||
if relative_times:
|
||||
ts_str = ts.humanize(locale=languageHandler.curLang[:2])
|
||||
else:
|
||||
ts_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
except Exception:
|
||||
ts_str = str(indexed_at)[:16].replace("T", " ")
|
||||
ts_str = str(indexed_at)[:16].replace("T", " ")
|
||||
|
||||
# Source (not always available in Bsky view, often just client)
|
||||
# We'll leave it empty or mock it if needed
|
||||
# Source / Client
|
||||
source = "Bluesky"
|
||||
|
||||
|
||||
# Viewer state (liked, reposted, etc.)
|
||||
viewer_indicators = []
|
||||
viewer = g(actual_post, "viewer") or g(post, "viewer")
|
||||
if viewer:
|
||||
if g(viewer, "like"):
|
||||
viewer_indicators.append("♥") # Liked
|
||||
if g(viewer, "repost"):
|
||||
viewer_indicators.append("🔁") # Reposted
|
||||
|
||||
# Add viewer indicators to the source column or create a prefix for text
|
||||
if viewer_indicators:
|
||||
indicator_str = " ".join(viewer_indicators)
|
||||
# Add to beginning of text for visibility
|
||||
text = f"{indicator_str} {text}"
|
||||
|
||||
return [user_str, text, ts_str, source]
|
||||
|
||||
|
||||
def compose_notification(notification, db, settings, relative_times, show_screen_names=False, safe=True):
|
||||
"""
|
||||
Compose a Bluesky notification into a list of strings for display.
|
||||
|
||||
Args:
|
||||
notification: ATProto notification object
|
||||
db: Session database dict
|
||||
settings: Session settings
|
||||
relative_times: If True, use relative time formatting
|
||||
show_screen_names: If True, show only @handle
|
||||
safe: If True, handle exceptions gracefully
|
||||
|
||||
Returns:
|
||||
List of strings: [User, Action/Text, Date]
|
||||
"""
|
||||
def g(obj, key, default=None):
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(key, default)
|
||||
return getattr(obj, key, default)
|
||||
|
||||
# Author of the notification (who performed the action)
|
||||
author = g(notification, "author", {})
|
||||
handle = g(author, "handle", "unknown")
|
||||
display_name = g(author, "displayName") or g(author, "display_name") or handle
|
||||
|
||||
if show_screen_names:
|
||||
user_str = f"@{handle}"
|
||||
else:
|
||||
user_str = f"{display_name} (@{handle})"
|
||||
|
||||
# Notification reason/type
|
||||
reason = g(notification, "reason", "unknown")
|
||||
|
||||
# Map reason to user-readable text
|
||||
reason_text_map = {
|
||||
"like": _("liked your post"),
|
||||
"repost": _("reposted your post"),
|
||||
"follow": _("followed you"),
|
||||
"mention": _("mentioned you"),
|
||||
"reply": _("replied to you"),
|
||||
"quote": _("quoted your post"),
|
||||
"starterpack-joined": _("joined your starter pack"),
|
||||
}
|
||||
|
||||
action_text = reason_text_map.get(reason, reason)
|
||||
|
||||
# For mentions/replies/quotes, include snippet of the text
|
||||
record = g(notification, "record", {})
|
||||
post_text = g(record, "text", "")
|
||||
if post_text and reason in ["mention", "reply", "quote"]:
|
||||
snippet = post_text[:100] + "..." if len(post_text) > 100 else post_text
|
||||
action_text = f"{action_text}: {snippet}"
|
||||
|
||||
# Date
|
||||
indexed_at = g(notification, "indexedAt", "") or g(notification, "indexed_at", "")
|
||||
ts_str = ""
|
||||
if indexed_at:
|
||||
try:
|
||||
ts = arrow.get(indexed_at)
|
||||
if relative_times:
|
||||
ts_str = ts.humanize(locale=languageHandler.curLang[:2])
|
||||
else:
|
||||
ts_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
except Exception:
|
||||
ts_str = str(indexed_at)[:16].replace("T", " ")
|
||||
|
||||
return [user_str, action_text, ts_str]
|
||||
|
||||
|
||||
def compose_user(user, db, settings, relative_times, show_screen_names=False, safe=True):
|
||||
"""
|
||||
Compose a Bluesky user for list display.
|
||||
Returns: [User summary string]
|
||||
Compose a Bluesky user profile for list display.
|
||||
|
||||
Args:
|
||||
user: User profile dict or ATProto model
|
||||
db: Session database dict
|
||||
settings: Session settings
|
||||
relative_times: If True, use relative time formatting
|
||||
show_screen_names: If True, show only @handle
|
||||
safe: If True, handle exceptions gracefully
|
||||
|
||||
Returns:
|
||||
List of strings: [User summary]
|
||||
"""
|
||||
# Extract data using getattr for models or .get for dicts
|
||||
def g(obj, key, default=None):
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(key, default)
|
||||
@@ -422,7 +272,6 @@ def compose_user(user, db, settings, relative_times, show_screen_names=False, sa
|
||||
ts = ""
|
||||
if created_at:
|
||||
try:
|
||||
import arrow
|
||||
original_date = arrow.get(created_at)
|
||||
if relative_times:
|
||||
ts = original_date.humanize(locale=languageHandler.curLang[:2])
|
||||
@@ -442,10 +291,21 @@ def compose_user(user, db, settings, relative_times, show_screen_names=False, sa
|
||||
|
||||
return [" ".join(parts).strip()]
|
||||
|
||||
|
||||
def compose_convo(convo, db, settings, relative_times, show_screen_names=False, safe=True):
|
||||
"""
|
||||
Compose a Bluesky chat conversation for list display.
|
||||
Returns: [Participants, Last Message, Date]
|
||||
|
||||
Args:
|
||||
convo: Conversation dict or ATProto model
|
||||
db: Session database dict
|
||||
settings: Session settings
|
||||
relative_times: If True, use relative time formatting
|
||||
show_screen_names: If True, show only @handle
|
||||
safe: If True, handle exceptions gracefully
|
||||
|
||||
Returns:
|
||||
List of strings: [Participants, Last Message, Date]
|
||||
"""
|
||||
def g(obj, key, default=None):
|
||||
if isinstance(obj, dict):
|
||||
@@ -454,6 +314,8 @@ def compose_convo(convo, db, settings, relative_times, show_screen_names=False,
|
||||
|
||||
members = g(convo, "members", [])
|
||||
self_did = db.get("user_id") if isinstance(db, dict) else None
|
||||
|
||||
# Get other participants (exclude self)
|
||||
others = []
|
||||
for m in members:
|
||||
did = g(m, "did", None)
|
||||
@@ -461,35 +323,36 @@ def compose_convo(convo, db, settings, relative_times, show_screen_names=False,
|
||||
continue
|
||||
label = g(m, "displayName") or g(m, "display_name") or g(m, "handle", "unknown")
|
||||
others.append(label)
|
||||
|
||||
if not others:
|
||||
others = [g(m, "displayName") or g(m, "display_name") or g(m, "handle", "unknown") for m in members]
|
||||
|
||||
participants = ", ".join(others)
|
||||
|
||||
|
||||
# Last message
|
||||
last_msg_obj = g(convo, "lastMessage") or g(convo, "last_message")
|
||||
last_text = ""
|
||||
last_sender = ""
|
||||
|
||||
if last_msg_obj:
|
||||
last_text = g(last_msg_obj, "text", "")
|
||||
sender = g(last_msg_obj, "sender", None)
|
||||
if sender:
|
||||
last_sender = g(sender, "displayName") or g(sender, "display_name") or g(sender, "handle", "")
|
||||
|
||||
# Date (using lastMessage.sentAt)
|
||||
|
||||
# Date
|
||||
date_str = ""
|
||||
sent_at = None
|
||||
if last_msg_obj:
|
||||
sent_at = g(last_msg_obj, "sentAt") or g(last_msg_obj, "sent_at")
|
||||
|
||||
if sent_at:
|
||||
try:
|
||||
import arrow
|
||||
ts = arrow.get(sent_at)
|
||||
if relative_times:
|
||||
date_str = ts.humanize(locale=languageHandler.curLang[:2])
|
||||
else:
|
||||
date_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
except:
|
||||
date_str = str(sent_at)[:16]
|
||||
if sent_at:
|
||||
try:
|
||||
ts = arrow.get(sent_at)
|
||||
if relative_times:
|
||||
date_str = ts.humanize(locale=languageHandler.curLang[:2])
|
||||
else:
|
||||
date_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
except Exception:
|
||||
date_str = str(sent_at)[:16]
|
||||
|
||||
if last_sender and last_text:
|
||||
last_text = _("Last message from {user}: {text}").format(user=last_sender, text=last_text)
|
||||
@@ -498,10 +361,21 @@ def compose_convo(convo, db, settings, relative_times, show_screen_names=False,
|
||||
|
||||
return [participants, last_text, date_str]
|
||||
|
||||
|
||||
def compose_chat_message(msg, db, settings, relative_times, show_screen_names=False, safe=True):
|
||||
"""
|
||||
Compose an individual chat message for display in a thread.
|
||||
Returns: [Sender, Text, Date]
|
||||
Compose an individual chat message for display.
|
||||
|
||||
Args:
|
||||
msg: Chat message dict or ATProto model
|
||||
db: Session database dict
|
||||
settings: Session settings
|
||||
relative_times: If True, use relative time formatting
|
||||
show_screen_names: If True, show only @handle
|
||||
safe: If True, handle exceptions gracefully
|
||||
|
||||
Returns:
|
||||
List of strings: [Sender, Text, Date]
|
||||
"""
|
||||
def g(obj, key, default=None):
|
||||
if isinstance(obj, dict):
|
||||
@@ -510,20 +384,20 @@ def compose_chat_message(msg, db, settings, relative_times, show_screen_names=Fa
|
||||
|
||||
sender = g(msg, "sender", {})
|
||||
handle = g(sender, "displayName") or g(sender, "display_name") or g(sender, "handle", "unknown")
|
||||
|
||||
|
||||
text = g(msg, "text", "")
|
||||
|
||||
|
||||
# Date
|
||||
sent_at = g(msg, "sentAt") or g(msg, "sent_at")
|
||||
date_str = ""
|
||||
if sent_at:
|
||||
try:
|
||||
import arrow
|
||||
ts = arrow.get(sent_at)
|
||||
if relative_times:
|
||||
date_str = ts.humanize(locale=languageHandler.curLang[:2])
|
||||
else:
|
||||
date_str = ts.format(_("dddd, MMMM D, YYYY H:m"), locale=languageHandler.curLang[:2])
|
||||
except:
|
||||
except Exception:
|
||||
date_str = str(sent_at)[:16]
|
||||
|
||||
|
||||
return [handle, text, date_str]
|
||||
|
||||
Reference in New Issue
Block a user