Improve Bluesky reply/quote accessibility and split test bootstrap workflow

- Fix Bluesky quoted post rendering across list output, screen-reader speech, and View Post by centralizing quote extraction.
- Add robust quote URL extraction (facets/embed/text), include quoted URLs in URL shortcuts, and append full quoted URLs when hidden/truncated.
- Improve reply context handling:
  - add and use `$reply_to` template variable,
  - hydrate missing reply target handles in home/feed items,
  - keep backward compatibility for templates that do not include `$reply_to`.
- Align Bluesky default/fallback post templates to include reply context (`$reply_to`).
- Add/extend focused Bluesky tests for quote text, quote URLs, reply context, and template fallback behavior.
- Refactor scripts:
  - add bootstrap-dev.ps1 for environment setup (submodules, venv, deps),
  - keep run-tests.ps1 focused on running tests only,
  - add PowerShell comment-based help in English.
- Update README with the new bootstrap/test workflow and examples.
This commit is contained in:
Juanjo M
2026-02-15 23:50:00 +00:00
parent abf4cb0df1
commit 6e56d94448
11 changed files with 919 additions and 35 deletions

View File

@@ -71,6 +71,50 @@ Now that you have installed all these packages, you can run TW Blue from source
If necessary, change the first part of the command to reflect the location of your python executable.
### Development bootstrap (Windows / PowerShell)
If you are starting fresh in this repository, run:
```powershell
./scripts/bootstrap-dev.ps1
```
This script initializes submodules, creates `.venv` (unless you opt out), and installs dependencies.
Useful options:
```powershell
# Recreate virtual environment from scratch
./scripts/bootstrap-dev.ps1 -RecreateVenv
# Upgrade pip tooling before installing requirements
./scripts/bootstrap-dev.ps1 -UpgradePip
# Use system Python instead of .venv
./scripts/bootstrap-dev.ps1 -SystemPython
```
### Running tests (Windows / PowerShell)
After bootstrap, run tests with:
```powershell
./scripts/run-tests.ps1
```
Useful options:
```powershell
# Run only Bluesky tests
./scripts/run-tests.ps1 -PytestTargets src/test/sessions/blueski -PytestArgs "-q"
# Run only a specific test file
./scripts/run-tests.ps1 -PytestTargets src/test/sessions/blueski/test_blueski_quotes.py -PytestArgs "-q"
# Use system Python instead of .venv
./scripts/run-tests.ps1 -SystemPython
```
### Generating the documentation
To generate the documentation in html format, navigate to the doc folder inside this repo. After that, run these commands:

176
scripts/bootstrap-dev.ps1 Normal file
View File

@@ -0,0 +1,176 @@
<#
.SYNOPSIS
Bootstraps a local TWBlue development environment on Windows.
.DESCRIPTION
This script initializes git submodules, creates/uses a virtual environment,
and installs Python dependencies from requirements.txt.
It is intended to be run once when starting on the repository (or whenever
you need to rebuild your local environment).
.PARAMETER RecreateVenv
Deletes and recreates the `.venv` folder before installing dependencies.
.PARAMETER UpgradePip
Upgrades `pip`, `setuptools`, and `wheel` before installing requirements.
.PARAMETER SkipSubmodules
Skips `git submodule init` and `git submodule update --recursive`.
.PARAMETER SystemPython
Uses a detected system Python instead of creating/using `.venv`.
.EXAMPLE
./scripts/bootstrap-dev.ps1
.EXAMPLE
./scripts/bootstrap-dev.ps1 -RecreateVenv -UpgradePip
.EXAMPLE
./scripts/bootstrap-dev.ps1 -SystemPython -SkipSubmodules
#>
param(
[switch]$RecreateVenv,
[switch]$UpgradePip,
[switch]$SkipSubmodules,
[switch]$SystemPython
)
$ErrorActionPreference = "Stop"
function Write-Step {
param([string]$Message)
Write-Host "==> $Message" -ForegroundColor Cyan
}
function Resolve-RepoRoot {
param([string]$ScriptDir)
return (Resolve-Path (Join-Path $ScriptDir "..")).Path
}
function Test-PythonCandidate {
param(
[string]$Exe,
[string[]]$BaseArgs = @()
)
try {
& $Exe @BaseArgs -c "import sys; print(sys.version)" | Out-Null
return ($LASTEXITCODE -eq 0)
}
catch {
return $false
}
}
function New-PythonSpec {
param(
[string]$Exe,
[string[]]$BaseArgs = @()
)
return [pscustomobject]@{
Exe = $Exe
BaseArgs = $BaseArgs
}
}
function Get-PythonSpec {
param([string]$RepoRoot, [bool]$UseSystemPython)
if (-not $UseSystemPython) {
$venvPython = Join-Path $RepoRoot ".venv\Scripts\python.exe"
if (Test-Path $venvPython) {
return (New-PythonSpec -Exe $venvPython)
}
}
$candidates = @()
$pythonCmd = Get-Command python -ErrorAction SilentlyContinue
if ($pythonCmd) {
$candidates += ,(New-PythonSpec -Exe "python")
}
$pyCmd = Get-Command py -ErrorAction SilentlyContinue
if ($pyCmd) {
$candidates += ,(New-PythonSpec -Exe "py" -BaseArgs @("-3.10"))
$candidates += ,(New-PythonSpec -Exe "py" -BaseArgs @("-3"))
}
foreach ($candidate in $candidates) {
if (Test-PythonCandidate -Exe $candidate.Exe -BaseArgs $candidate.BaseArgs) {
return $candidate
}
}
throw "Could not find a usable Python. Install Python 3.10+ and try again."
}
function Invoke-Python {
param(
[pscustomobject]$PythonSpec,
[string[]]$Arguments,
[string]$WorkingDirectory
)
Push-Location $WorkingDirectory
try {
& $PythonSpec.Exe @($PythonSpec.BaseArgs + $Arguments)
if ($LASTEXITCODE -ne 0) {
throw "Python command failed with exit code $LASTEXITCODE"
}
}
finally {
Pop-Location
}
}
$scriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path
$repoRoot = Resolve-RepoRoot -ScriptDir $scriptDir
Write-Step "Repository: $repoRoot"
if (-not $SkipSubmodules) {
$gitCmd = Get-Command git -ErrorAction SilentlyContinue
if ($gitCmd) {
Write-Step "Initializing/updating submodules"
Push-Location $repoRoot
try {
& git submodule init | Out-Host
& git submodule update --recursive | Out-Host
}
finally {
Pop-Location
}
}
else {
Write-Warning "Git is not available. Skipping submodule initialization."
}
}
$venvPath = Join-Path $repoRoot ".venv"
if ($RecreateVenv -and (Test-Path $venvPath)) {
Write-Step "Removing existing virtual environment"
Remove-Item -Recurse -Force $venvPath
}
if (-not $SystemPython -and -not (Test-Path (Join-Path $venvPath "Scripts\python.exe"))) {
Write-Step "Creating virtual environment in .venv"
$bootstrapPython = Get-PythonSpec -RepoRoot $repoRoot -UseSystemPython $true
Invoke-Python -PythonSpec $bootstrapPython -Arguments @("-m", "venv", ".venv") -WorkingDirectory $repoRoot
}
$python = Get-PythonSpec -RepoRoot $repoRoot -UseSystemPython $SystemPython.IsPresent
Write-Step "Using Python: $($python.Exe) $($python.BaseArgs -join ' ')"
if ($UpgradePip) {
Write-Step "Upgrading pip/setuptools/wheel"
Invoke-Python -PythonSpec $python -Arguments @("-m", "pip", "install", "--upgrade", "pip", "setuptools", "wheel") -WorkingDirectory $repoRoot
}
Write-Step "Installing dependencies"
Invoke-Python -PythonSpec $python -Arguments @("-m", "pip", "install", "-r", "requirements.txt") -WorkingDirectory $repoRoot
Write-Step "Bootstrap completed"

163
scripts/run-tests.ps1 Normal file
View File

@@ -0,0 +1,163 @@
<#
.SYNOPSIS
Runs TWBlue tests with minimal runtime setup.
.DESCRIPTION
This script only executes pytest. It does not install dependencies,
create virtual environments, or initialize submodules.
Use `./scripts/bootstrap-dev.ps1` first to prepare a development environment.
.PARAMETER PytestTargets
One or more pytest target paths/files. Defaults to `src/test`.
.PARAMETER PytestArgs
Additional pytest arguments. Defaults to `-q`.
.PARAMETER SystemPython
Uses a detected system Python instead of `.venv`.
.EXAMPLE
./scripts/run-tests.ps1
.EXAMPLE
./scripts/run-tests.ps1 -PytestTargets src/test/sessions/blueski -PytestArgs "-q"
.EXAMPLE
./scripts/run-tests.ps1 -SystemPython
#>
param(
[string[]]$PytestTargets = @("src/test"),
[string[]]$PytestArgs = @("-q"),
[switch]$SystemPython
)
$ErrorActionPreference = "Stop"
function Write-Step {
param([string]$Message)
Write-Host "==> $Message" -ForegroundColor Cyan
}
function Resolve-RepoRoot {
param([string]$ScriptDir)
return (Resolve-Path (Join-Path $ScriptDir "..")).Path
}
function Test-PythonCandidate {
param(
[string]$Exe,
[string[]]$BaseArgs = @()
)
try {
& $Exe @BaseArgs -c "import sys; print(sys.version)" | Out-Null
return ($LASTEXITCODE -eq 0)
}
catch {
return $false
}
}
function New-PythonSpec {
param(
[string]$Exe,
[string[]]$BaseArgs = @()
)
return [pscustomobject]@{
Exe = $Exe
BaseArgs = $BaseArgs
}
}
function Get-PythonSpec {
param([string]$RepoRoot, [bool]$UseSystemPython)
if (-not $UseSystemPython) {
$venvPython = Join-Path $RepoRoot ".venv\Scripts\python.exe"
if (Test-Path $venvPython) {
return (New-PythonSpec -Exe $venvPython)
}
}
$candidates = @()
$pythonCmd = Get-Command python -ErrorAction SilentlyContinue
if ($pythonCmd) {
$candidates += ,(New-PythonSpec -Exe "python")
}
$pyCmd = Get-Command py -ErrorAction SilentlyContinue
if ($pyCmd) {
$candidates += ,(New-PythonSpec -Exe "py" -BaseArgs @("-3.10"))
$candidates += ,(New-PythonSpec -Exe "py" -BaseArgs @("-3"))
}
foreach ($candidate in $candidates) {
if (Test-PythonCandidate -Exe $candidate.Exe -BaseArgs $candidate.BaseArgs) {
return $candidate
}
}
throw "Could not find a usable Python. Install Python 3.10+ and try again."
}
function Invoke-Python {
param(
[pscustomobject]$PythonSpec,
[string[]]$Arguments,
[string]$WorkingDirectory
)
Push-Location $WorkingDirectory
try {
& $PythonSpec.Exe @($PythonSpec.BaseArgs + $Arguments)
if ($LASTEXITCODE -ne 0) {
throw "Python command failed with exit code $LASTEXITCODE"
}
}
finally {
Pop-Location
}
}
$scriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path
$repoRoot = Resolve-RepoRoot -ScriptDir $scriptDir
Write-Step "Repository: $repoRoot"
$venvPython = Join-Path $repoRoot ".venv\Scripts\python.exe"
if (-not $SystemPython -and -not (Test-Path $venvPython)) {
throw "No .venv Python found. Run ./scripts/bootstrap-dev.ps1 first, or pass -SystemPython."
}
$python = Get-PythonSpec -RepoRoot $repoRoot -UseSystemPython $SystemPython.IsPresent
Write-Step "Using Python: $($python.Exe) $($python.BaseArgs -join ' ')"
Write-Step "Detecting Python architecture"
$archOutput = & $python.Exe @($python.BaseArgs + @("-c", "import struct; print('x64' if struct.calcsize('P')*8 == 64 else 'x86')"))
if ($LASTEXITCODE -ne 0 -or -not $archOutput) {
throw "Could not determine Python architecture."
}
$arch = ($archOutput | Select-Object -Last 1).Trim()
if ($arch -ne "x86" -and $arch -ne "x64") {
throw "Could not determine Python architecture (result: '$arch')."
}
$vlcPath = Join-Path $repoRoot "windows-dependencies\$arch"
if (-not (Test-Path $vlcPath)) {
throw "Could not find '$vlcPath'. Run ./scripts/bootstrap-dev.ps1 first."
}
$env:PYTHON_VLC_MODULE_PATH = $vlcPath
if (-not ($env:PATH -split ';' | Where-Object { $_ -eq $vlcPath })) {
$env:PATH = "$vlcPath;$env:PATH"
}
Write-Step "PYTHON_VLC_MODULE_PATH=$($env:PYTHON_VLC_MODULE_PATH)"
Write-Step "Running tests"
$pytestCommandArgs = @("-m", "pytest") + $PytestTargets + $PytestArgs
Invoke-Python -PythonSpec $python -Arguments $pytestCommandArgs -WorkingDirectory $repoRoot

View File

@@ -44,7 +44,7 @@ braille_reporting = boolean(default=True)
speech_reporting = boolean(default=True)
[templates]
post = string(default="$display_name, $safe_text $date.")
post = string(default="$display_name, $reply_to$safe_text $date.")
person = string(default="$display_name (@$screen_name). $followers followers, $following following, $posts posts. Joined $created_at.")
notification = string(default="$display_name $text, $date")

View File

@@ -357,7 +357,7 @@ class Handler:
buffer.session.settings["templates"] = {}
templates_cfg = buffer.session.settings.get("templates", {})
template_state = {
"post": templates_cfg.get("post", "$display_name, $safe_text $date."),
"post": templates_cfg.get("post", "$display_name, $reply_to$safe_text $date."),
"person": templates_cfg.get("person", "$display_name (@$screen_name). $followers followers, $following following, $posts posts. Joined $created_at."),
"notification": templates_cfg.get("notification", "$display_name $text, $date"),
}

View File

@@ -8,6 +8,7 @@ import languageHandler
import output
import widgetUtils
from controller import messages as base_messages
from sessions.blueski import utils as bluesky_utils
from wxUI.dialogs.blueski import postDialogs
from extra.autocompletionUsers import completion
@@ -258,6 +259,29 @@ def _extract_post_view_data(session: Any, item: Any) -> dict[str, Any] | None:
author_label = display_name
text = _g(record, "text", "") or ""
reply_to_handle = bluesky_utils.extract_reply_to_handle(item)
if reply_to_handle:
if text:
text = _("Replying to @{handle}: {text}").format(handle=reply_to_handle, text=text)
else:
text = _("Replying to @{handle}").format(handle=reply_to_handle)
quote_info = bluesky_utils.extract_quoted_post_info(item)
if quote_info:
if quote_info["kind"] == "not_found":
text += f" [{_('Quoted post not found')}]"
elif quote_info["kind"] == "blocked":
text += f" [{_('Quoted post blocked')}]"
elif quote_info["kind"] == "feed":
text += f" [{_('Quoting Feed')}: {quote_info.get('feed_name', 'Feed')}]"
else:
q_handle = quote_info.get("handle", "unknown")
q_text = quote_info.get("text", "")
if q_text:
text += " " + _("Quoting @{handle}: {text}").format(handle=q_handle, text=q_text)
else:
text += " " + _("Quoting @{handle}").format(handle=q_handle)
cw_text = _extract_cw_text(post, record)
if cw_text:
text = f"CW: {cw_text}\n\n{text}" if text else f"CW: {cw_text}"

View File

@@ -805,12 +805,12 @@ class BaseBuffer(base.Buffer):
try:
if self.type == "notifications":
template = template_settings.get("notification", "$display_name $text, $date")
post_template = template_settings.get("post", "$display_name, $safe_text $date.")
post_template = template_settings.get("post", "$display_name, $reply_to$safe_text $date.")
return templates.render_notification(item, template, post_template, self.session.settings, relative_times, offset_hours)
if self.type in ("user", "post_user_list"):
template = template_settings.get("person", "$display_name (@$screen_name). $followers followers, $following following, $posts posts. Joined $created_at.")
return templates.render_user(item, template, self.session.settings, relative_times, offset_hours)
template = template_settings.get("post", "$display_name, $safe_text $date.")
template = template_settings.get("post", "$display_name, $reply_to$safe_text $date.")
return templates.render_post(item, template, self.session.settings, relative_times, offset_hours)
except Exception:
# Fallback to compose if any template render fails.
@@ -934,6 +934,85 @@ class BaseBuffer(base.Buffer):
"did": did,
"handle": handle,
}
def _hydrate_reply_handles(self, items):
"""Populate _reply_to_handle on items when reply metadata lacks hydrated author."""
if not items:
return
def g(obj, key, default=None):
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
def set_handle(obj, handle):
if not obj or not handle:
return
if isinstance(obj, dict):
obj["_reply_to_handle"] = handle
else:
try:
setattr(obj, "_reply_to_handle", handle)
except Exception:
pass
def get_parent_uri(item):
actual_post = g(item, "post", item)
record = g(actual_post, "record", {}) or {}
reply = g(record, "reply", None)
if not reply:
return None
parent = g(reply, "parent", None) or reply
return g(parent, "uri", None)
unresolved_by_parent_uri = {}
for item in items:
if utils.extract_reply_to_handle(item):
continue
parent_uri = get_parent_uri(item)
if not parent_uri:
continue
unresolved_by_parent_uri.setdefault(parent_uri, []).append(item)
if not unresolved_by_parent_uri:
return
api = self.session._ensure_client()
if not api:
return
uris = list(unresolved_by_parent_uri.keys())
uri_to_handle = {}
chunk_size = 25
for start in range(0, len(uris), chunk_size):
chunk = uris[start:start + chunk_size]
try:
try:
res = api.app.bsky.feed.get_posts({"uris": chunk})
except Exception:
res = api.app.bsky.feed.get_posts(uris=chunk)
posts = list(getattr(res, "posts", None) or [])
for parent_post in posts:
uri = g(parent_post, "uri", None)
author = g(parent_post, "author", None) or {}
handle = g(author, "handle", None)
if uri and handle:
uri_to_handle[uri] = handle
except Exception as e:
log.debug("Could not hydrate reply handles for chunk: %s", e)
if not uri_to_handle:
return
for parent_uri, item_list in unresolved_by_parent_uri.items():
handle = uri_to_handle.get(parent_uri)
if not handle:
continue
for item in item_list:
set_handle(item, handle)
actual_post = g(item, "post", item)
set_handle(actual_post, handle)
def process_items(self, items, play_sound=True, avoid_autoreading=False):
"""
@@ -1016,6 +1095,8 @@ class BaseBuffer(base.Buffer):
if not new_items:
return 0
self._hydrate_reply_handles(new_items)
# Add to DB
# Reverse timeline setting
@@ -1064,6 +1145,7 @@ class BaseBuffer(base.Buffer):
def add_new_item(self, item):
"""Add a single new item from streaming."""
self._hydrate_reply_handles([item])
safe = True
relative_times = self.session.settings["general"].get("relative_times", False)
show_screen_names = self.session.settings["general"].get("show_screen_names", False)

View File

@@ -11,6 +11,7 @@ list controls. They follow the TWBlue compose function pattern:
import logging
import arrow
import languageHandler
from sessions.blueski import utils
log = logging.getLogger("sessions.blueski.compose")
@@ -76,6 +77,13 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
else:
text = original_text
reply_to_handle = utils.extract_reply_to_handle(post)
if reply_to_handle:
if text:
text = _("Replying to @{}: {}").format(reply_to_handle, text)
else:
text = _("Replying to @{}").format(reply_to_handle)
# Check facets for links not visible in text and append them
facets = g(record, "facets", []) or []
hidden_urls = []
@@ -117,7 +125,7 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
if cw_text:
text = f"CW: {cw_text}\n\n{text}"
# Embeds (Images, Quotes, Links)
# Embeds (Images, Links)
embed = g(actual_post, "embed", None)
if embed:
etype = g(embed, "$type") or g(embed, "py_type")
@@ -128,12 +136,7 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
if images:
text += f" [{len(images)} {_('images')}]"
# Quote posts
quote_rec = None
if etype and ("recordWithMedia" in etype):
rec_embed = g(embed, "record", {})
if rec_embed:
quote_rec = g(rec_embed, "record", None) or rec_embed
media = g(embed, "media", {})
mtype = g(media, "$type") or g(media, "py_type")
if mtype and "images" in mtype:
@@ -145,37 +148,33 @@ def compose_post(post, db, settings, relative_times, show_screen_names=False, sa
title = g(ext, "title", "")
if title:
text += f" [{_('Link')}: {title}]"
elif etype and ("record" in etype):
quote_rec = g(embed, "record", {})
if isinstance(quote_rec, dict):
quote_rec = quote_rec.get("record") or quote_rec
if quote_rec:
qtype = g(quote_rec, "$type") or g(quote_rec, "py_type")
if qtype and "viewNotFound" in qtype:
text += f" [{_('Quoted post not found')}]"
elif qtype and "viewBlocked" in qtype:
text += f" [{_('Quoted post blocked')}]"
elif qtype and "generatorView" in qtype:
gen = g(quote_rec, "displayName", "Feed")
text += f" [{_('Quoting Feed')}: {gen}]"
else:
q_author = g(quote_rec, "author", {})
q_handle = g(q_author, "handle", "unknown")
q_val = g(quote_rec, "value", {})
q_text = g(q_val, "text", "")
if q_text:
text += " " + _("Quoting @{}: {}").format(q_handle, q_text)
else:
text += " " + _("Quoting @{}").format(q_handle)
elif etype and ("external" in etype):
ext = g(embed, "external", {})
title = g(ext, "title", "")
if title:
text += f" [{_('Link')}: {title}]"
quote_info = utils.extract_quoted_post_info(post)
if quote_info:
if quote_info["kind"] == "not_found":
text += f" [{_('Quoted post not found')}]"
elif quote_info["kind"] == "blocked":
text += f" [{_('Quoted post blocked')}]"
elif quote_info["kind"] == "feed":
text += f" [{_('Quoting Feed')}: {quote_info.get('feed_name', 'Feed')}]"
else:
q_handle = quote_info.get("handle", "unknown")
q_text = quote_info.get("text", "")
if q_text:
text += " " + _("Quoting @{}: {}").format(q_handle, q_text)
else:
text += " " + _("Quoting @{}").format(q_handle)
# Add full URLs from quoted content when they are not visible in text.
for uri in quote_info.get("urls", []):
if uri and uri not in text:
text += f" [{uri}]"
# Date
indexed_at = g(actual_post, "indexed_at", "") or g(actual_post, "indexedAt", "")
ts_str = ""

View File

@@ -2,12 +2,14 @@
import arrow
import languageHandler
from string import Template
from sessions.blueski import utils
post_variables = [
"date",
"display_name",
"screen_name",
"reply_to",
"source",
"lang",
"safe_text",
@@ -146,11 +148,32 @@ def render_post(post, template, settings, relative_times=False, offset_hours=0):
original_handle = _g(author, "handle", "")
text = _("Reposted from @{handle}: {text}").format(handle=original_handle, text=text)
quote_info = utils.extract_quoted_post_info(post)
if quote_info:
if quote_info["kind"] == "not_found":
text += f" [{_('Quoted post not found')}]"
elif quote_info["kind"] == "blocked":
text += f" [{_('Quoted post blocked')}]"
elif quote_info["kind"] == "feed":
text += f" [{_('Quoting Feed')}: {quote_info.get('feed_name', 'Feed')}]"
else:
q_handle = quote_info.get("handle", "unknown")
q_text = quote_info.get("text", "")
if q_text:
text += " " + _("Quoting @{handle}: {text}").format(handle=q_handle, text=q_text)
else:
text += " " + _("Quoting @{handle}").format(handle=q_handle)
# Add link indicator for external embeds
link_title = _extract_link_info(actual_post, record)
if link_title:
text += f" [{_('Link')}: {link_title}]"
reply_to_handle = utils.extract_reply_to_handle(post)
reply_to = ""
if reply_to_handle:
reply_to = _("Replying to @{handle}. ").format(handle=reply_to_handle)
cw_text = _extract_cw_text(actual_post, record)
safe_text = text
if cw_text:
@@ -160,6 +183,13 @@ def render_post(post, template, settings, relative_times=False, offset_hours=0):
else:
safe_text = _("Content warning: {cw}").format(cw=cw_text)
# Backward compatibility: older user templates may not include $reply_to.
# In that case, prepend the reply marker directly so users still get context.
if reply_to and "$reply_to" not in template:
text = reply_to + text
safe_text = reply_to + safe_text
reply_to = ""
created_at = _g(record, "createdAt") or _g(record, "created_at")
indexed_at = _g(actual_post, "indexedAt") or _g(actual_post, "indexed_at")
date_field = created_at or indexed_at
@@ -174,6 +204,7 @@ def render_post(post, template, settings, relative_times=False, offset_hours=0):
date=date,
display_name=display_name,
screen_name=screen_name,
reply_to=reply_to,
source="Bluesky",
lang=lang,
safe_text=safe_text,

View File

@@ -265,6 +265,13 @@ def find_urls(post):
if u not in urls:
urls.append(u)
# Include URLs from quoted post, if present.
quote_info = extract_quoted_post_info(post)
if quote_info and quote_info.get("kind") == "post":
for uri in quote_info.get("urls", []):
if uri and uri not in urls:
urls.append(uri)
return urls
@@ -289,3 +296,149 @@ def find_item(item, items_list):
return i
return None
def _resolve_quoted_record_from_embed(embed):
"""Resolve quoted record payload from a Bluesky embed structure."""
if not embed:
return None
etype = (g(embed, "$type") or g(embed, "py_type") or "").lower()
candidate = None
if "recordwithmedia" in etype:
record_view = g(embed, "record")
candidate = g(record_view, "record") or record_view
elif "record" in etype:
candidate = g(embed, "record") or embed
else:
record_view = g(embed, "record")
if record_view is not None:
candidate = g(record_view, "record") or record_view
if not candidate:
return None
# Unwrap one extra layer if still wrapped in a record-view container.
nested = g(candidate, "record")
nested_type = (g(nested, "$type") or g(nested, "py_type") or "").lower() if nested else ""
if nested and ("view" in nested_type or "record" in nested_type):
return nested
return candidate
def extract_reply_to_handle(post):
"""
Best-effort extraction of the replied-to handle for a Bluesky post.
Returns:
str | None: Handle (without @) when available.
"""
actual_post = g(post, "post", post)
# Fast path: pre-hydrated by buffers/session.
cached = g(post, "_reply_to_handle", None) or g(actual_post, "_reply_to_handle", None)
if cached:
return cached
# Feed views frequently include hydrated reply context.
reply_view = g(post, "reply", None) or g(actual_post, "reply", None)
if reply_view:
parent = g(reply_view, "parent", None) or g(reply_view, "post", None) or reply_view
parent_post = g(parent, "post", None) or parent
parent_author = g(parent_post, "author", None) or g(parent, "author", None)
handle = g(parent_author, "handle", None)
if handle:
return handle
# Some payloads include parent author directly under record.reply.parent.
record = g(actual_post, "record", {}) or {}
record_reply = g(record, "reply", None)
if record_reply:
parent = g(record_reply, "parent", None) or record_reply
parent_post = g(parent, "post", None) or parent
parent_author = g(parent_post, "author", None) or g(parent, "author", None)
handle = g(parent_author, "handle", None)
if handle:
return handle
# When only record.reply is available, we generally only have strong refs.
# No handle can be resolved here without extra API calls.
return None
def extract_quoted_post_info(post):
"""
Extract quoted content metadata from a Bluesky post.
Returns:
dict | None: one of:
- {"kind": "not_found"}
- {"kind": "blocked"}
- {"kind": "feed", "feed_name": "..."}
- {"kind": "post", "handle": "...", "text": "...", "urls": ["..."]}
"""
actual_post = g(post, "post", post)
record = g(actual_post, "record", {}) or {}
embed = g(actual_post, "embed", None) or g(record, "embed", None)
quote_rec = _resolve_quoted_record_from_embed(embed)
if not quote_rec:
return None
qtype = (g(quote_rec, "$type") or g(quote_rec, "py_type") or "").lower()
if "viewnotfound" in qtype:
return {"kind": "not_found"}
if "viewblocked" in qtype:
return {"kind": "blocked"}
if "generatorview" in qtype:
return {"kind": "feed", "feed_name": g(quote_rec, "displayName", "Feed")}
q_author = g(quote_rec, "author", {}) or {}
q_handle = g(q_author, "handle", "unknown") or "unknown"
q_value = g(quote_rec, "value") or g(quote_rec, "record") or {}
q_text = g(q_value, "text", "") or g(quote_rec, "text", "")
if not q_text:
nested_value = g(q_value, "value") or {}
q_text = g(nested_value, "text", "")
q_urls = []
q_facets = g(q_value, "facets", []) or []
for facet in q_facets:
features = g(facet, "features", []) or []
for feature in features:
ftype = (g(feature, "$type") or g(feature, "py_type") or "").lower()
if "link" in ftype:
uri = g(feature, "uri", "")
if uri and uri not in q_urls:
q_urls.append(uri)
q_embed = g(quote_rec, "embed", None) or g(q_value, "embed", None)
if q_embed:
q_etype = (g(q_embed, "$type") or g(q_embed, "py_type") or "").lower()
if "external" in q_etype:
ext = g(q_embed, "external", {})
uri = g(ext, "uri", "")
if uri and uri not in q_urls:
q_urls.append(uri)
if "recordwithmedia" in q_etype:
media = g(q_embed, "media", {})
mtype = (g(media, "$type") or g(media, "py_type") or "").lower()
if "external" in mtype:
ext = g(media, "external", {})
uri = g(ext, "uri", "")
if uri and uri not in q_urls:
q_urls.append(uri)
for uri in url_re.findall(q_text or ""):
if uri not in q_urls:
q_urls.append(uri)
return {
"kind": "post",
"handle": q_handle,
"text": q_text or "",
"urls": q_urls,
}

View File

@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
import builtins
import unittest
from sessions.blueski import compose, templates, utils
class TestBlueskyQuotedPosts(unittest.TestCase):
def setUp(self):
if not hasattr(builtins, "_"):
builtins._ = lambda s: s
def _build_quoted_post(self):
return {
"post": {
"author": {"handle": "alice.bsky.social", "displayName": "Alice"},
"record": {"text": "Main post text"},
"embed": {
"$type": "app.bsky.embed.recordWithMedia#view",
"record": {
"$type": "app.bsky.embed.record#view",
"record": {
"$type": "app.bsky.embed.record#viewRecord",
"author": {"handle": "bob.bsky.social"},
"value": {"text": "Quoted post text"},
},
},
"media": {
"$type": "app.bsky.embed.images#view",
"images": [],
},
},
"indexedAt": "2026-02-15T10:00:00Z",
}
}
def _with_reply_context(self, item, reply_to_handle="carol.bsky.social"):
if isinstance(item, dict):
item["reply"] = {
"parent": {
"author": {
"handle": reply_to_handle,
}
}
}
return item
def test_extract_quoted_post_info_with_text(self):
item = self._build_quoted_post()
info = utils.extract_quoted_post_info(item)
self.assertIsNotNone(info)
self.assertEqual(info["kind"], "post")
self.assertEqual(info["handle"], "bob.bsky.social")
self.assertEqual(info["text"], "Quoted post text")
def test_compose_post_includes_quoted_text(self):
item = self._build_quoted_post()
result = compose.compose_post(
item,
db={},
settings={"general": {}},
relative_times=True,
show_screen_names=False,
safe=True,
)
self.assertIn("Quoting @bob.bsky.social: Quoted post text", result[1])
def test_template_render_post_includes_quoted_text(self):
item = self._build_quoted_post()
rendered = templates.render_post(
item,
template="$display_name, $safe_text $date.",
settings={"general": {}},
relative_times=True,
offset_hours=0,
)
self.assertIn("Quoting @bob.bsky.social: Quoted post text", rendered)
def test_extract_quoted_post_info_includes_urls(self):
item = self._build_quoted_post()
quoted = item["post"]["embed"]["record"]["record"]
quoted["value"]["facets"] = [
{
"features": [
{
"$type": "app.bsky.richtext.facet#link",
"uri": "https://example.com/full-link",
}
]
}
]
info = utils.extract_quoted_post_info(item)
self.assertIn("https://example.com/full-link", info["urls"])
def test_find_urls_includes_quoted_urls(self):
item = self._build_quoted_post()
quoted = item["post"]["embed"]["record"]["record"]
quoted["value"]["facets"] = [
{
"features": [
{
"$type": "app.bsky.richtext.facet#link",
"uri": "https://example.com/quoted-only",
}
]
}
]
urls = utils.find_urls(item)
self.assertIn("https://example.com/quoted-only", urls)
def test_compose_post_appends_full_quoted_url(self):
item = self._build_quoted_post()
quoted = item["post"]["embed"]["record"]["record"]
quoted["value"]["text"] = "Mira example.com/..."
quoted["value"]["facets"] = [
{
"features": [
{
"$type": "app.bsky.richtext.facet#link",
"uri": "https://example.com/full-target",
}
]
}
]
result = compose.compose_post(
item,
db={},
settings={"general": {}},
relative_times=True,
show_screen_names=False,
safe=True,
)
self.assertIn("[https://example.com/full-target]", result[1])
def test_extract_reply_to_handle_from_record_parent_author(self):
item = {
"post": {
"record": {
"text": "Reply text",
"reply": {
"parent": {
"uri": "at://did:plc:parent/app.bsky.feed.post/abc",
"author": {"handle": "parent.user"},
}
},
}
}
}
handle = utils.extract_reply_to_handle(item)
self.assertEqual(handle, "parent.user")
def test_template_render_post_reply_fallback_without_reply_to_variable(self):
item = {
"post": {
"author": {"handle": "alice.bsky.social", "displayName": "Alice"},
"record": {
"text": "Reply body",
"reply": {
"parent": {
"uri": "at://did:plc:parent/app.bsky.feed.post/abc",
"author": {"handle": "parent.user"},
}
},
},
"indexedAt": "2026-02-15T10:00:00Z",
}
}
rendered = templates.render_post(
item,
template="$display_name, $safe_text $date.",
settings={"general": {}},
relative_times=True,
offset_hours=0,
)
self.assertIn("Replying to @parent.user.", rendered)
def test_extract_reply_to_handle(self):
item = self._with_reply_context(self._build_quoted_post())
handle = utils.extract_reply_to_handle(item)
self.assertEqual(handle, "carol.bsky.social")
def test_compose_post_includes_reply_context(self):
item = self._with_reply_context(self._build_quoted_post())
result = compose.compose_post(
item,
db={},
settings={"general": {}},
relative_times=True,
show_screen_names=False,
safe=True,
)
self.assertIn("Replying to @carol.bsky.social:", result[1])
def test_template_render_post_exposes_reply_to_variable(self):
item = self._with_reply_context(self._build_quoted_post())
rendered = templates.render_post(
item,
template="$reply_to$text",
settings={"general": {}},
relative_times=True,
offset_hours=0,
)
self.assertIn("Replying to @carol.bsky.social.", rendered)
if __name__ == "__main__":
unittest.main()