This commit is contained in:
Jesús Pavón Abián
2026-01-11 20:13:56 +01:00
parent 9d9d86160d
commit 932e44a9c9
391 changed files with 120828 additions and 1090 deletions

View File

@@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
import widgetUtils
from wxUI.dialogs.mastodon.filters import create_filter as dialog
from mastodon import MastodonAPIError
class CreateFilterController(object):
def __init__(self, session, filter_data=None):
super(CreateFilterController, self).__init__()
self.session = session
self.filter_data = filter_data
self.dialog = dialog.CreateFilterDialog(parent=None)
if self.filter_data is not None:
self.keywords = self.filter_data.get("keywords")
self.load_filter_data()
else:
self.keywords = []
widgetUtils.connect_event(self.dialog.keyword_panel.add_button, widgetUtils.BUTTON_PRESSED, self.on_add_keyword)
widgetUtils.connect_event(self.dialog.keyword_panel.remove_button, widgetUtils.BUTTON_PRESSED, self.on_remove_keyword)
def on_add_keyword(self, event):
""" Adds a keyword to the list. """
keyword = self.dialog.keyword_panel.keyword_text.GetValue().strip()
whole_word = self.dialog.keyword_panel.whole_word_checkbox.GetValue()
if keyword:
for idx, kw in enumerate(self.keywords):
if kw['keyword'] == keyword:
return
keyword_data = {
'keyword': keyword,
'whole_word': whole_word
}
self.keywords.append(keyword_data)
self.dialog.keyword_panel.add_keyword(keyword, whole_word)
def on_remove_keyword(self, event):
removed = self.dialog.keyword_panel.remove_keyword()
if removed is not None:
self.keywords.pop(removed)
def get_expires_in_seconds(self, selection, value):
if selection == 0:
return None
if selection == 1:
return value * 3600
elif selection == 2:
return value * 86400
elif selection == 3:
return value * 604800
elif selection == 4:
return value * 2592000
return None
def set_expires_in(self, seconds):
if seconds is None:
self.dialog.expiration_choice.SetSelection(0)
self.dialog.expiration_value.Enable(False)
return
if seconds % 2592000 == 0 and seconds >= 2592000:
self.dialog.expiration_choice.SetSelection(4)
self.dialog.expiration_value.SetValue(seconds // 2592000)
elif seconds % 604800 == 0 and seconds >= 604800:
self.dialog.expiration_choice.SetSelection(3)
self.dialog.expiration_value.SetValue(seconds // 604800)
elif seconds % 86400 == 0 and seconds >= 86400:
self.dialog.expiration_choice.SetSelection(2)
self.dialog.expiration_value.SetValue(seconds // 86400)
else:
self.dialog.expiration_choice.SetSelection(1)
self.dialog.expiration_value.SetValue(max(1, seconds // 3600))
self.dialog.expiration_value.Enable(True)
def load_filter_data(self):
if 'title' in self.filter_data:
self.dialog.name_ctrl.SetValue(self.filter_data['title'])
self.dialog.SetTitle(_("Update Filter: {}").format(self.filter_data['title']))
if 'context' in self.filter_data:
for context in self.filter_data['context']:
if context in self.dialog.context_checkboxes:
self.dialog.context_checkboxes[context].SetValue(True)
if 'filter_action' in self.filter_data:
action_index = self.dialog.actions.index(self.filter_data['filter_action']) if self.filter_data['filter_action'] in self.dialog.actions else 0
self.dialog.action_choice.SetSelection(action_index)
if 'expires_in' in self.filter_data:
self.set_expires_in(self.filter_data['expires_in'])
print(self.filter_data)
if 'keywords' in self.filter_data:
self.keywords = self.filter_data['keywords']
self.dialog.keyword_panel.set_keywords(self.filter_data['keywords'])
def get_filter_data(self):
filter_data = {
'title': self.dialog.name_ctrl.GetValue(),
'context': [],
'filter_action': self.dialog.actions[self.dialog.action_choice.GetSelection()],
'expires_in': self.get_expires_in_seconds(selection=self.dialog.expiration_choice.GetSelection(), value=self.dialog.expiration_value.GetValue()),
'keywords_attributes': self.keywords
}
for context, checkbox in self.dialog.context_checkboxes.items():
if checkbox.GetValue():
filter_data['context'].append(context)
return filter_data
def get_response(self):
response = self.dialog.ShowModal()
if response == widgetUtils.OK:
filter_data = self.get_filter_data()
if self.filter_data == None:
result = self.session.api.create_filter_v2(**filter_data)
else:
result = self.session.api.update_filter_v2(filter_id=self.filter_data['id'], **filter_data)
return result
return None

View File

@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
import datetime
import wx
import widgetUtils
from wxUI import commonMessageDialogs
from wxUI.dialogs.mastodon.filters import manage_filters as dialog
from . import create_filter
from mastodon import MastodonError
class ManageFiltersController(object):
def __init__(self, session):
super(ManageFiltersController, self).__init__()
self.session = session
self.selected_filter_idx = -1
self.error_loading = False
self.dialog = dialog.ManageFiltersDialog(parent=None)
self.dialog.filter_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_filter_selected)
self.dialog.filter_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_filter_deselected)
widgetUtils.connect_event(self.dialog.add_button, wx.EVT_BUTTON, self.on_add_filter)
widgetUtils.connect_event(self.dialog.edit_button, wx.EVT_BUTTON, self.on_edit_filter)
widgetUtils.connect_event(self.dialog.remove_button, wx.EVT_BUTTON, self.on_remove_filter)
self.load_filter_data()
def on_filter_selected(self, event):
"""Handle filter selection event."""
self.selected_filter_idx = event.GetIndex()
self.dialog.edit_button.Enable()
self.dialog.remove_button.Enable()
def on_filter_deselected(self, event):
"""Handle filter deselection event."""
self.selected_filter_idx = -1
self.dialog.edit_button.Disable()
self.dialog.remove_button.Disable()
def get_selected_filter_id(self):
"""Get the ID of the currently selected filter."""
if self.selected_filter_idx != -1:
return self.dialog.filter_list.GetItemData(self.selected_filter_idx)
return None
def load_filter_data(self):
try:
filters = self.session.api.filters_v2()
self.dialog.filter_list.DeleteAllItems()
self.on_filter_deselected(None)
for i, filter_obj in enumerate(filters):
index = self.dialog.filter_list.InsertItem(i, filter_obj.title)
keyword_count = len(filter_obj.keywords)
self.dialog.filter_list.SetItem(index, 1, str(keyword_count))
contexts = ", ".join(filter_obj.context)
self.dialog.filter_list.SetItem(index, 2, contexts)
self.dialog.filter_list.SetItem(index, 3, filter_obj.filter_action)
if filter_obj.expires_at:
expiry_str = filter_obj.expires_at.strftime("%Y-%m-%d %H:%M")
else:
expiry_str = _("Never")
self.dialog.filter_list.SetItem(index, 4, expiry_str)
self.dialog.filter_list.SetItemData(index, int(filter_obj.id) if isinstance(filter_obj.id, (int, str)) else 0)
except MastodonError as e:
commonMessageDialogs.error_loading_filters()
self.error_loading = True
def on_add_filter(self, *args, **kwargs):
filterController = create_filter.CreateFilterController(self.session)
try:
filter = filterController.get_response()
self.load_filter_data()
except MastodonError as error:
commonMessageDialogs.error_adding_filter()
return self.on_add_filter()
def on_edit_filter(self, *args, **kwargs):
filter_id = self.get_selected_filter_id()
if filter_id == None:
return
try:
filter_data = self.session.api.filter_v2(filter_id)
filterController = create_filter.CreateFilterController(self.session, filter_data=filter_data)
filterController.get_response()
self.load_filter_data()
except MastodonError as error:
commonMessageDialogs.error_adding_filter()
def on_remove_filter(self, *args, **kwargs):
filter_id = self.get_selected_filter_id()
if filter_id == None:
return
dlg = commonMessageDialogs.remove_filter()
if dlg == widgetUtils.NO:
return
try:
self.session.api.delete_filter_v2(filter_id)
self.load_filter_data()
except MastodonError as error:
commonMessageDialogs.error_removing_filter()
def get_response(self):
return self.dialog.ShowModal() == wx.ID_OK

View File

@@ -0,0 +1,422 @@
# -*- coding: utf-8 -*-
import wx
import logging
import mastodon
import output
from mastodon import MastodonError
from pubsub import pub
from mysc import restart
from mysc.thread_utils import call_threaded
from wxUI.dialogs.mastodon import search as search_dialogs
from wxUI.dialogs.mastodon import dialogs
from wxUI.dialogs import userAliasDialogs
from wxUI import commonMessageDialogs
from wxUI.dialogs.mastodon import updateProfile as update_profile_dialogs
from wxUI.dialogs.mastodon import showUserProfile, communityTimeline
from sessions.mastodon.utils import html_filter
from . import userActions, settings
from .filters import create_filter, manage_filters
log = logging.getLogger("controller.mastodon.handler")
class Handler(object):
def __init__(self):
super(Handler, self).__init__()
# Structure to hold names for menu bar items.
# empty names mean the item will be Disabled.
self.menus = dict(
# In application menu.
updateProfile=_("Update Profile"),
menuitem_search=_("&Search"),
lists=None,
manageAliases=_("Manage user aliases"),
# In item menu.
compose=_("&Post"),
reply=_("Re&ply"),
share=_("&Boost"),
fav=_("&Add to favorites"),
unfav=_("Remove from favorites"),
view=_("&Show post"),
view_conversation=_("View conversa&tion"),
ocr=_("Read text in picture"),
delete=_("&Delete"),
# In user menu.
follow=_("&Actions..."),
timeline=_("&View timeline..."),
dm=_("Direct me&ssage"),
addAlias=_("Add a&lias"),
addToList=None,
removeFromList=None,
details=_("S&how user profile"),
favs=None,
# In buffer Menu.
community_timeline =_("Create c&ommunity timeline"),
filter=_("Create a &filter"),
manage_filters=_("&Manage filters")
)
# Name for the "tweet" menu in the menu bar.
self.item_menu = _("&Post")
def create_buffers(self, session, createAccounts=True, controller=None):
session.get_user_info()
name = session.get_name()
controller.accounts.append(name)
if createAccounts == True:
pub.sendMessage("core.create_account", name=name, session_id=session.session_id, logged=True)
root_position =controller.view.search(name, name)
for i in session.settings['general']['buffer_order']:
if i == 'home':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Home"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="timeline_home", name="home_timeline", sessionObject=session, account=name, sound="tweet_received.ogg"))
elif i == 'local':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Local"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="timeline_local", name="local_timeline", sessionObject=session, account=name, sound="tweet_received.ogg"))
elif i == 'federated':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Federated"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="timeline_public", name="federated_timeline", sessionObject=session, account=name, sound="tweet_received.ogg"))
elif i == 'mentions':
pub.sendMessage("createBuffer", buffer_type="MentionsBuffer", session_type=session.type, buffer_title=_("Mentions"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="notifications", name="mentions", sessionObject=session, account=name, sound="mention_received.ogg"))
elif i == 'direct_messages':
pub.sendMessage("createBuffer", buffer_type="ConversationListBuffer", session_type=session.type, buffer_title=_("Direct messages"), parent_tab=root_position, start=False, kwargs=dict(compose_func="compose_conversation", parent=controller.view.nb, function="conversations", name="direct_messages", sessionObject=session, account=name, sound="dm_received.ogg"))
elif i == 'sent':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Sent"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="account_statuses", name="sent", sessionObject=session, account=name, sound="tweet_received.ogg", id=session.db["user_id"]))
elif i == 'favorites':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Favorites"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="favourites", name="favorites", sessionObject=session, account=name, sound="favourite.ogg"))
elif i == 'bookmarks':
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Bookmarks"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, function="bookmarks", name="bookmarks", sessionObject=session, account=name, sound="favourite.ogg"))
elif i == 'followers':
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Followers"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_followers", name="followers", sessionObject=session, account=name, sound="update_followers.ogg", id=session.db["user_id"]))
elif i == 'following':
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Following"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_following", name="following", sessionObject=session, account=name, sound="update_followers.ogg", id=session.db["user_id"]))
elif i == 'muted':
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Muted users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="mutes", name="muted", sessionObject=session, account=name))
elif i == 'blocked':
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="blocks", name="blocked", sessionObject=session, account=name))
elif i == 'notifications':
pub.sendMessage("createBuffer", buffer_type="NotificationsBuffer", session_type=session.type, buffer_title=_("Notifications"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_notification", function="notifications", name="notifications", sessionObject=session, account=name))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="timelines", account=name))
timelines_position =controller.view.search("timelines", name)
for i in session.settings["other_buffers"]["timelines"]:
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Timeline for {}").format(i), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="account_statuses", name="{}-timeline".format(i), sessionObject=session, account=name, sound="tweet_timeline.ogg", id=i))
for i in session.settings["other_buffers"]["followers_timelines"]:
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_followers", name="{}-followers".format(i,), sessionObject=session, account=name, sound="new_event.ogg", id=i))
for i in session.settings["other_buffers"]["following_timelines"]:
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Following for {}").format(i), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_following", name="{}-following".format(i,), sessionObject=session, account=name, sound="new_event.ogg", id=i))
# pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Lists"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="lists", name))
# lists_position =controller.view.search("lists", session.db["user_name"])
# for i in session.settings["other_buffers"]["lists"]:
# pub.sendMessage("createBuffer", buffer_type="ListBuffer", session_type=session.type, buffer_title=_(u"List for {}").format(i), parent_tab=lists_position, start=False, kwargs=dict(parent=controller.view.nb, function="list_timeline", name="%s-list" % (i,), sessionObject=session, name, bufferType=None, sound="list_tweet.ogg", list_id=utils.find_list(i, session.db["lists"]), include_ext_alt_text=True, tweet_mode="extended"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Searches"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="searches", account=name))
searches_position =controller.view.search("searches", name)
for term in session.settings["other_buffers"]["post_searches"]:
pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_("Search for {}").format(term), parent_tab=searches_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_post", function="search", name="%s-searchterm" % (term,), sessionObject=session, account=session.get_name(), sound="search_updated.ogg", q=term, result_type="statuses"))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Communities"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="communities", account=name))
communities_position =controller.view.search("communities", name)
for community in session.settings["other_buffers"]["communities"]:
bufftype = _("Local") if community.split("@")[0] == "local" else _("federated")
community_name = community.split("@")[1].replace("https://", "")
title = _(f"{bufftype} timeline for {community_name}")
pub.sendMessage("createBuffer", buffer_type="CommunityBuffer", session_type=session.type, buffer_title=title, parent_tab=communities_position, start=True, kwargs=dict(parent=controller.view.nb, function="timeline", compose_func="compose_post", name=community, sessionObject=session, community_url=community.split("@")[1], account=session.get_name(), sound="search_updated.ogg", timeline=community.split("@")[0]))
# for i in session.settings["other_buffers"]["trending_topic_buffers"]:
# pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="%s_tt" % (i,), sessionObject=session, name, trendsFor=i, sound="trends_updated.ogg"))
def start_buffer(self, controller, buffer):
if hasattr(buffer, "finished_timeline") and buffer.finished_timeline == False:
change_title = True
else:
change_title = False
try:
buffer.start_stream(play_sound=False)
except Exception as err:
log.exception("Error %s starting buffer %s on account %s, with args %r and kwargs %r." % (str(err), buffer.name, buffer.account, buffer.args, buffer.kwargs))
if change_title:
pub.sendMessage("buffer-title-changed", buffer=buffer)
def open_conversation(self, controller, buffer):
# detect if we are in a community buffer.
# Community buffers are special because we'll need to retrieve the object locally at first.
if hasattr(buffer, "community_url"):
post = buffer.get_item_from_instance()
else:
post = buffer.get_item()
if post.reblog != None:
post = post.reblog
conversations_position =controller.view.search("direct_messages", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="ConversationBuffer", session_type=buffer.session.type, buffer_title=_("Conversation with {0}").format(post.account.acct), parent_tab=conversations_position, start=True, kwargs=dict(parent=controller.view.nb, function="status_context", name="%s-conversation" % (post.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="search_updated.ogg", post=post, id=post.id))
def follow(self, buffer):
if not hasattr(buffer, "get_item"):
return
# Community buffers are special because we'll need to retrieve the object locally at first.
if hasattr(buffer, "community_url"):
item = buffer.get_item_from_instance()
else:
item = buffer.get_item()
if buffer.type == "user":
users = [item.acct]
elif buffer.type == "baseBuffer":
if item.reblog != None:
users = [user.acct for user in item.reblog.mentions if user.id != buffer.session.db["user_id"]]
if item.reblog.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.reblog.account.acct)
else:
users = [user.acct for user in item.mentions if user.id != buffer.session.db["user_id"]]
if item.account.acct not in users:
users.insert(0, item.account.acct)
elif buffer.type == "notificationsBuffer":
if buffer.is_post():
status = item.status
if status.reblog != None:
users = [user.acct for user in status.reblog.mentions if user.id != buffer.session.db["user_id"]]
if status.reblog.account.acct not in users and status.account.id != buffer.session.db["user_id"]:
users.insert(0, status.reblog.account.acct)
else:
users = [user.acct for user in status.mentions if user.id != buffer.session.db["user_id"]]
if hasattr(item, "account"):
acct = item.account.acct
else:
acct = item.acct
if acct not in users:
users.insert(0, item.account.acct)
u = userActions.userActions(buffer.session, users)
def search(self, controller, session, value):
log.debug("Creating a new search...")
dlg = search_dialogs.searchDialog(value)
if dlg.ShowModal() == wx.ID_OK and dlg.term.GetValue() != "":
term = dlg.term.GetValue()
searches_position =controller.view.search("searches", session.get_name())
if dlg.posts.GetValue() == True:
if term not in session.settings["other_buffers"]["post_searches"]:
session.settings["other_buffers"]["post_searches"].append(term)
session.settings.write()
pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_("Search for {}").format(term), parent_tab=searches_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_post", function="search", name="%s-searchterm" % (term,), sessionObject=session, account=session.get_name(), sound="search_updated.ogg", q=term, result_type="statuses"))
else:
log.error("A buffer for the %s search term is already created. You can't create a duplicate buffer." % (term,))
return
elif dlg.users.GetValue() == True:
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Search for {}").format(term), parent_tab=searches_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_search", name="%s-searchUser" % (term,), sessionObject=session, account=session.get_name(), sound="search_updated.ogg", q=term))
dlg.Destroy()
# ToDo: explore how to play sound & save config differently.
# currently, TWBlue will play the sound and save the config for the timeline even if the buffer did not load or something else.
def open_timeline(self, controller, buffer):
if not hasattr(buffer, "get_item"):
return
if hasattr(buffer, "community_url"):
item = buffer.get_item_from_instance()
else:
item = buffer.get_item()
if buffer.type == "user":
users = [item.acct]
elif buffer.type == "baseBuffer":
if item.reblog != None:
users = [user.acct for user in item.reblog.mentions if user.id != buffer.session.db["user_id"]]
if item.reblog.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.reblog.account.acct)
else:
users = [user.acct for user in item.mentions if user.id != buffer.session.db["user_id"]]
if item.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.account.acct)
u = userActions.UserTimeline(buffer.session, users)
if u.dialog.ShowModal() == wx.ID_OK:
action = u.process_action()
if action == None:
return
user = u.user
if action == "posts":
self.openPostTimeline(controller, buffer, user)
elif action == "followers":
self.openFollowersTimeline(controller, buffer, user)
elif action == "following":
self.openFollowingTimeline(controller, buffer, user)
def openPostTimeline(self, controller, buffer, user):
"""Opens post timeline for user"""
if user.statuses_count == 0:
dialogs.no_posts()
return
if user.id in buffer.session.settings["other_buffers"]["timelines"]:
commonMessageDialogs.timeline_exist()
return
timelines_position =controller.view.search("timelines", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=buffer.session.type, buffer_title=_("Timeline for {}").format(user.username,), parent_tab=timelines_position, start=True, kwargs=dict(parent=controller.view.nb, function="account_statuses", name="%s-timeline" % (user.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="tweet_timeline.ogg", id=user.id))
buffer.session.settings["other_buffers"]["timelines"].append(user.id)
buffer.session.sound.play("create_timeline.ogg")
buffer.session.settings.write()
def openFollowersTimeline(self, controller, buffer, user):
"""Open followers timeline for user"""
if user.followers_count == 0:
dialogs.no_followers()
return
if user.id in buffer.session.settings["other_buffers"]["followers_timelines"]:
commonMessageDialogs.timeline_exist()
return
timelines_position =controller.view.search("timelines", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=buffer.session.type, buffer_title=_("Followers for {}").format(user.username,), parent_tab=timelines_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_followers", name="%s-followers" % (user.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="new_event.ogg", id=user.id))
buffer.session.settings["other_buffers"]["followers_timelines"].append(user.id)
buffer.session.sound.play("create_timeline.ogg")
buffer.session.settings.write()
def openFollowingTimeline(self, controller, buffer, user):
"""Open following timeline for user"""
if user.following_count == 0:
dialogs.no_following()
return
if user.id in buffer.session.settings["other_buffers"]["following_timelines"]:
commonMessageDialogs.timeline_exist()
return
timelines_position =controller.view.search("timelines", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=buffer.session.type, buffer_title=_("Following for {}").format(user.username,), parent_tab=timelines_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_following", name="%s-followers" % (user.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="new_event.ogg", id=user.id))
buffer.session.settings["other_buffers"]["following_timelines"].append(user.id)
buffer.session.sound.play("create_timeline.ogg")
buffer.session.settings.write()
def account_settings(self, buffer, controller):
d = settings.accountSettingsController(buffer, controller)
if d.response == wx.ID_OK:
d.save_configuration()
if d.needs_restart == True:
commonMessageDialogs.needs_restart()
buffer.session.settings.write()
buffer.session.save_persistent_data()
restart.restart_program()
def add_alias(self, buffer):
if not hasattr(buffer, "get_item"):
return
item = buffer.get_item()
if buffer.type == "user":
users = [item.acct]
elif buffer.type == "baseBuffer":
if item.reblog != None:
users = [user.acct for user in item.reblog.mentions if user.id != buffer.session.db["user_id"]]
if item.reblog.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.reblog.account.acct)
else:
users = [user.acct for user in item.mentions if user.id != buffer.session.db["user_id"]]
if item.account.acct not in users:
users.insert(0, item.account.acct)
dlg = userAliasDialogs.addAliasDialog(_("Add an user alias"), users)
if dlg.get_response() == wx.ID_OK:
user, alias = dlg.get_user()
if user == "" or alias == "":
return
try:
full_user = buffer.session.api.account_lookup(user)
except Exception as e:
log.exception("Error adding alias to user {}.".format(user))
return
buffer.session.settings["user-aliases"][str(full_user.id)] = alias
buffer.session.settings.write()
output.speak(_("Alias has been set correctly for {}.").format(user))
pub.sendMessage("alias-added")
def update_profile(self, session):
"""Updates the users dialog"""
profile = session.api.me()
data = {
'display_name': profile.display_name,
'note': html_filter(profile.note),
'header': profile.header,
'avatar': profile.avatar,
'fields': [(field.name, html_filter(field.value)) for field in profile.fields],
'locked': profile.locked,
'bot': profile.bot,
# discoverable could be None, set it to False
'discoverable': profile.discoverable if profile.discoverable else False,
}
log.debug(f"Received data_ {data['fields']}")
dialog = update_profile_dialogs.UpdateProfileDialog(**data)
if dialog.ShowModal() != wx.ID_OK:
log.debug("User canceled dialog")
return
updated_data = dialog.data
if updated_data == data:
log.debug("No profile info was changed.")
return
# remove data that hasn't been updated
for key in data:
if data[key] == updated_data[key]:
del updated_data[key]
log.debug(f"Updating users profile with: {updated_data}")
call_threaded(session.api_call, "account_update_credentials", _("Update profile"), report_success=True, **updated_data)
def user_details(self, buffer):
"""Displays user profile in a dialog.
This works as long as the focused item hass a 'account' key."""
if not hasattr(buffer, 'get_item'):
return # Tell user?
item = buffer.get_item()
if not item:
return # empty buffer
log.debug(f"Opening user profile. dictionary: {item}")
mentionedUsers = list()
holdUser = item.account if item.get('account') else None
if hasattr(item, "type") and item.type in ["status", "mention", "reblog", "favourite", "update", "poll"]: # statuses in Notification buffers
item = item.status
if item.get('username'): # account dict
holdUser = item
elif isinstance(item.get('mentions'), list):
# mentions in statuses
if item.reblog:
item = item.reblog
mentionedUsers = [(user.acct, user.id) for user in item.mentions]
holdUser = item.account
if not holdUser:
dialogs.no_user()
return
if len(mentionedUsers) == 0:
user = holdUser
else:
mentionedUsers.insert(0, (holdUser.display_name, holdUser.username, holdUser.id))
mentionedUsers = list(set(mentionedUsers))
selectedUser = showUserProfile.selectUserDialog(mentionedUsers)
if not selectedUser:
return # Canceled selection
elif selectedUser[-1] == holdUser.id:
user = holdUser
else: # We don't have this user's dictionary, get it!
user = buffer.session.api.account(selectedUser[-1])
dlg = showUserProfile.ShowUserProfile(user)
dlg.ShowModal()
def community_timeline(self, controller, buffer):
dlg = communityTimeline.CommunityTimeline()
if dlg.ShowModal() != wx.ID_OK:
return
url = dlg.url.GetValue()
bufftype = dlg.get_action()
local_api = mastodon.Mastodon(api_base_url=url)
try:
instance = local_api.instance()
except MastodonError:
commonMessageDialogs.invalid_instance()
return
if bufftype == "local":
title = _(f"Local timeline for {url.replace('https://', '')}")
else:
title = _(f"Federated timeline for {url}")
bufftype = "public"
dlg.Destroy()
tl_info = f"{bufftype}@{url}"
if tl_info in buffer.session.settings["other_buffers"]["communities"]:
return # buffer already exists.
buffer.session.settings["other_buffers"]["communities"].append(tl_info)
buffer.session.settings.write()
communities_position =controller.view.search("communities", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="CommunityBuffer", session_type=buffer.session.type, buffer_title=title, parent_tab=communities_position, start=True, kwargs=dict(parent=controller.view.nb, function="timeline", name=tl_info, sessionObject=buffer.session, account=buffer.session.get_name(), sound="tweet_timeline.ogg", community_url=url, timeline=bufftype))
def create_filter(self, controller, buffer):
filterController = create_filter.CreateFilterController(buffer.session)
try:
filter = filterController.get_response()
except MastodonError as error:
log.exception("Error adding filter.")
commonMessageDialogs.error_adding_filter()
return self.create_filter(controller=controller, buffer=buffer)
def manage_filters(self, controller, buffer):
manageFiltersController = manage_filters.ManageFiltersController(buffer.session)
manageFiltersController.get_response()

View File

@@ -0,0 +1,462 @@
# -*- coding: utf-8 -*-
import os
import re
import wx
import logging
import widgetUtils
import config
import output
import languageHandler
from twitter_text import parse_tweet, config
from mastodon import MastodonError
from controller import messages
from sessions.mastodon import templates
from wxUI.dialogs.mastodon import postDialogs
from extra.autocompletionUsers import completion
from . import userList
log = logging.getLogger("controller.mastodon.messages")
def character_count(post_text, post_cw, character_limit=500):
# We will use text for counting character limit only.
full_text = post_text+post_cw
# find remote users as Mastodon doesn't count the domain in char limit.
users = re.findall("@[\w\.-]+@[\w\.-]+", full_text)
for user in users:
domain = user.split("@")[-1]
full_text = full_text.replace("@"+domain, "")
options = config.config.get("defaults")
options.update(max_weighted_tweet_length=character_limit, default_weight=100)
parsed = parse_tweet(full_text, options=options)
return parsed.weightedLength
class post(messages.basicMessage):
def __init__(self, session, title, caption, text="", *args, **kwargs):
# take max character limit from session as this might be different for some instances.
self.max = session.char_limit
self.title = title
self.session = session
langs = self.session.supported_languages
display_langs = [l.name for l in langs]
self.message = postDialogs.Post(caption=caption, text=text, languages=display_langs, *args, **kwargs)
self.message.SetTitle(title)
self.message.text.SetInsertionPoint(len(self.message.text.GetValue()))
self.set_language(self.session.default_language)
widgetUtils.connect_event(self.message.spellcheck, widgetUtils.BUTTON_PRESSED, self.spellcheck)
widgetUtils.connect_event(self.message.text, widgetUtils.ENTERED_TEXT, self.text_processor)
widgetUtils.connect_event(self.message.spoiler, widgetUtils.ENTERED_TEXT, self.text_processor)
widgetUtils.connect_event(self.message.translate, widgetUtils.BUTTON_PRESSED, self.translate)
widgetUtils.connect_event(self.message.add, widgetUtils.BUTTON_PRESSED, self.on_attach)
widgetUtils.connect_event(self.message.remove_attachment, widgetUtils.BUTTON_PRESSED, self.remove_attachment)
widgetUtils.connect_event(self.message.autocomplete_users, widgetUtils.BUTTON_PRESSED, self.autocomplete_users)
widgetUtils.connect_event(self.message.add_post, widgetUtils.BUTTON_PRESSED, self.add_post)
widgetUtils.connect_event(self.message.remove_post, widgetUtils.BUTTON_PRESSED, self.remove_post)
self.attachments = []
self.thread = []
self.text_processor()
def autocomplete_users(self, *args, **kwargs):
c = completion.autocompletionUsers(self.message, self.session.session_id)
c.show_menu()
def add_post(self, event, update_gui=True, *args, **kwargs):
text = self.message.text.GetValue()
attachments = self.attachments[::]
postdata = dict(text=text, attachments=attachments, sensitive=self.message.sensitive.GetValue(), spoiler_text=None)
if postdata.get("sensitive") == True:
postdata.update(spoiler_text=self.message.spoiler.GetValue())
# Check for scheduled post
if hasattr(self.message, 'get_scheduled_at'):
scheduled_at = self.message.get_scheduled_at()
if scheduled_at:
postdata['scheduled_at'] = scheduled_at
self.thread.append(postdata)
self.attachments = []
if update_gui:
self.message.reset_controls()
self.message.add_item(item=[text, len(attachments)], list_type="post")
self.message.text.SetFocus()
self.text_processor()
def get_post_data(self):
self.add_post(event=None, update_gui=False)
return self.thread
def set_language(self, language=None):
""" Attempt to set the default language for a post. """
# language can be provided in a post (replying or recovering from errors).
# Also it can be provided in user preferences (retrieved in the session).
# If no language is provided, let's fallback to TWBlue's user language.
if language != None:
language_code = language
else:
# Let's cut langcode_VARIANT to ISO-639 two letter code only.
language_code = languageHandler.curLang[:2]
for lang in self.session.supported_languages:
if lang.code == language_code:
self.message.language.SetStringSelection(lang.name)
def set_post_data(self, visibility, data, language):
if len(data) == 0:
return
if len(data) > 1:
self.thread = data[:-1]
for p in self.thread:
self.message.add_item(item=[p.get("text") or "", len(p.get("attachments") or [])], list_type="post")
post = data[-1]
self.attachments = post.get("attachments") or []
self.message.text.SetValue(post.get("text") or "")
self.message.sensitive.SetValue(post.get("sensitive") or False)
self.message.spoiler.SetValue(post.get("spoiler_text") or "")
visibility_settings = dict(public=0, unlisted=1, private=2, direct=3)
self.message.visibility.SetSelection(visibility_settings.get(visibility))
self.message.on_sensitivity_changed()
for attachment in self.attachments:
self.message.add_item(item=[attachment["file"], attachment["type"], attachment["description"]])
self.set_language(language)
self.text_processor()
def text_processor(self, *args, **kwargs):
text = self.message.text.GetValue()
cw = self.message.spoiler.GetValue()
results = character_count(text, cw, character_limit=self.max)
self.message.SetTitle(_("%s - %s of %d characters") % (self.title, results, self.max))
if results > self.max:
self.session.sound.play("max_length.ogg")
if len(self.thread) > 0:
if hasattr(self.message, "posts"):
self.message.posts.Enable(True)
self.message.remove_post.Enable(True)
else:
self.message.posts.Enable(False)
self.message.remove_post.Enable(False)
if len(self.attachments) > 0:
self.message.attachments.Enable(True)
self.message.remove_attachment.Enable(True)
else:
self.message.attachments.Enable(False)
self.message.remove_attachment.Enable(False)
if len(self.message.text.GetValue()) > 0 or len(self.attachments) > 0:
self.message.add_post.Enable(True)
else:
self.message.add_post.Enable(False)
def remove_post(self, *args, **kwargs):
post = self.message.posts.GetFocusedItem()
if post > -1 and len(self.thread) > post:
self.thread.pop(post)
self.message.remove_item(list_type="post")
self.text_processor()
self.message.text.SetFocus()
def can_attach(self):
if len(self.attachments) == 0:
return True
elif len(self.attachments) == 1 and (self.attachments[0]["type"] == "poll" or self.attachments[0]["type"] == "video" or self.attachments[0]["type"] == "audio"):
return False
elif len(self.attachments) < 4:
return True
return False
def on_attach(self, *args, **kwargs):
can_attach = self.can_attach()
menu = self.message.attach_menu(can_attach)
self.message.Bind(wx.EVT_MENU, self.on_attach_image, self.message.add_image)
self.message.Bind(wx.EVT_MENU, self.on_attach_video, self.message.add_video)
self.message.Bind(wx.EVT_MENU, self.on_attach_audio, self.message.add_audio)
self.message.Bind(wx.EVT_MENU, self.on_attach_poll, self.message.add_poll)
self.message.PopupMenu(menu, self.message.add.GetPosition())
def on_attach_image(self, *args, **kwargs):
can_attach = self.can_attach()
big_media_present = False
for a in self.attachments:
if a["type"] == "video" or a["type"] == "audio" or a["type"] == "poll":
big_media_present = True
break
if can_attach == False or big_media_present == True:
return self.message.unable_to_attach_file()
image, description = self.message.get_image()
if image != None:
if image.endswith("gif"):
image_type = "gif"
else:
image_type = "photo"
imageInfo = {"type": image_type, "file": image, "description": description}
if len(self.attachments) > 0 and image_type == "gif":
return self.message.unable_to_attach_file()
self.attachments.append(imageInfo)
self.message.add_item(item=[os.path.basename(imageInfo["file"]), imageInfo["type"], imageInfo["description"]])
self.text_processor()
def on_attach_video(self, *args, **kwargs):
if len(self.attachments) >= 4:
return self.message.unable_to_attach_file()
can_attach = self.can_attach()
big_media_present = False
for a in self.attachments:
if a["type"] == "video" or a["type"] == "audio" or a["type"] == "poll":
big_media_present = True
break
if can_attach == False or big_media_present == True:
return self.message.unable_to_attach_file()
video, description = self.message.get_video()
if video != None:
videoInfo = {"type": "video", "file": video, "description": description}
self.attachments.append(videoInfo)
self.message.add_item(item=[os.path.basename(videoInfo["file"]), videoInfo["type"], videoInfo["description"]])
self.text_processor()
def on_attach_audio(self, *args, **kwargs):
if len(self.attachments) >= 4:
return self.message.unable_to_attach_file()
can_attach = self.can_attach()
big_media_present = False
for a in self.attachments:
if a["type"] == "video" or a["type"] == "audio" or a["type"] == "poll":
big_media_present = True
break
if can_attach == False or big_media_present == True:
return self.message.unable_to_attach_file()
audio, description = self.message.get_audio()
if audio != None:
audioInfo = {"type": "audio", "file": audio, "description": description}
self.attachments.append(audioInfo)
self.message.add_item(item=[os.path.basename(audioInfo["file"]), audioInfo["type"], audioInfo["description"]])
self.text_processor()
def on_attach_poll(self, *args, **kwargs):
if len(self.attachments) > 0:
return self.message.unable_to_attach_poll()
can_attach = self.can_attach()
big_media_present = False
for a in self.attachments:
if a["type"] == "video" or a["type"] == "audio" or a["type"] == "poll":
big_media_present = True
break
if can_attach == False or big_media_present == True:
return self.message.unable_to_attach_file()
dlg = postDialogs.poll()
if dlg.ShowModal() == wx.ID_OK:
day = 86400
periods = [300, 1800, 3600, 21600, day, day*2, day*3, day*4, day*5, day*6, day*7]
period = periods[dlg.period.GetSelection()]
poll_options = dlg.get_options()
multiple = dlg.multiple.GetValue()
hide_totals = dlg.hide_votes.GetValue()
data = dict(type="poll", file="", description=_("Poll with {} options").format(len(poll_options)), options=poll_options, expires_in=period, multiple=multiple, hide_totals=hide_totals)
self.attachments.append(data)
self.message.add_item(item=[data["file"], data["type"], data["description"]])
self.text_processor()
dlg.Destroy()
def get_data(self):
self.add_post(event=None, update_gui=False)
return self.thread
def get_visibility(self):
visibility_settings = ["public", "unlisted", "private", "direct"]
return visibility_settings[self.message.visibility.GetSelection()]
def get_language(self):
langs = self.session.supported_languages
lang = self.message.language.GetSelection()
if lang >= 0:
return langs[lang].code
return None
def set_visibility(self, setting):
visibility_settings = ["public", "unlisted", "private", "direct"]
visibility_setting = visibility_settings.index(setting)
self.message.visibility.SetSelection(setting)
class editPost(post):
def __init__(self, session, item, title, caption, *args, **kwargs):
""" Initialize edit dialog with existing post data.
Note: Per Mastodon API, visibility and language cannot be changed when editing.
These fields will be displayed but disabled in the UI.
"""
# Extract text from post
if item.reblog != None:
item = item.reblog
text = item.content
# Remove HTML tags from content
import re
text = re.sub('<[^<]+?>', '', text)
# Initialize parent class
super(editPost, self).__init__(session, title, caption, text=text, *args, **kwargs)
# Store the post ID for editing
self.post_id = item.id
# Set visibility (read-only, cannot be changed)
visibility_settings = dict(public=0, unlisted=1, private=2, direct=3)
self.message.visibility.SetSelection(visibility_settings.get(item.visibility, 0))
self.message.visibility.Enable(False) # Disable as it cannot be edited
# Set language (read-only, cannot be changed)
if item.language:
self.set_language(item.language)
self.message.language.Enable(False) # Disable as it cannot be edited
# Set sensitive content and spoiler
if item.sensitive:
self.message.sensitive.SetValue(True)
if item.spoiler_text:
self.message.spoiler.ChangeValue(item.spoiler_text)
self.message.on_sensitivity_changed()
# Load existing poll (if any)
# Note: You cannot have both media and a poll, so check poll first
if hasattr(item, 'poll') and item.poll is not None:
log.debug("Loading existing poll for post {}".format(self.post_id))
poll = item.poll
# Extract poll options (just the text, not the votes)
poll_options = [option.title for option in poll.options]
# Calculate expires_in based on current time and expires_at
# For editing, we need to provide a new expiration time
# Since we can't get the original expires_in, use a default or let user configure
# For now, use 1 day (86400 seconds) as default
expires_in = 86400
if hasattr(poll, 'expires_at') and poll.expires_at and not poll.expired:
# Calculate remaining time if poll hasn't expired
from dateutil import parser as date_parser
import datetime
try:
expires_at = poll.expires_at
if isinstance(expires_at, str):
expires_at = date_parser.parse(expires_at)
now = datetime.datetime.now(datetime.timezone.utc)
remaining = (expires_at - now).total_seconds()
if remaining > 0:
expires_in = int(remaining)
except Exception as e:
log.warning("Could not calculate poll expiration: {}".format(e))
poll_info = {
"type": "poll",
"file": "",
"description": _("Poll with {} options").format(len(poll_options)),
"options": poll_options,
"expires_in": expires_in,
"multiple": poll.multiple if hasattr(poll, 'multiple') else False,
"hide_totals": poll.voters_count == 0 if hasattr(poll, 'voters_count') else False
}
self.attachments.append(poll_info)
self.message.add_item(item=[poll_info["file"], poll_info["type"], poll_info["description"]])
log.debug("Loaded poll with {} options. WARNING: Editing will reset all votes!".format(len(poll_options)))
# Load existing media attachments (only if no poll)
elif hasattr(item, 'media_attachments'):
log.debug("Loading existing media attachments for post {}".format(self.post_id))
log.debug("Item has media_attachments attribute, count: {}".format(len(item.media_attachments)))
if len(item.media_attachments) > 0:
for media in item.media_attachments:
log.debug("Processing media: id={}, type={}, url={}".format(media.id, media.type, media.url))
media_info = {
"id": media.id, # Keep the existing media ID
"type": media.type,
"file": media.url, # URL of existing media
"description": media.description or ""
}
# Include focus point if available
if hasattr(media, 'meta') and media.meta and 'focus' in media.meta:
focus = media.meta['focus']
media_info["focus"] = (focus.get('x'), focus.get('y'))
log.debug("Added focus point: {}".format(media_info["focus"]))
self.attachments.append(media_info)
# Display in the attachment list
display_name = media.url.split('/')[-1]
log.debug("Adding item to UI: name={}, type={}, desc={}".format(display_name, media.type, media.description or ""))
self.message.add_item(item=[display_name, media.type, media.description or ""])
log.debug("Total attachments loaded: {}".format(len(self.attachments)))
else:
log.debug("media_attachments list is empty")
else:
log.debug("Item has no poll or media attachments")
# Update text processor to reflect the loaded content
self.text_processor()
class viewPost(post):
def __init__(self, session, post, offset_hours=0, date="", item_url=""):
self.session = session
if post.reblog != None:
post = post.reblog
self.post_id = post.id
author = post.account.display_name if post.account.display_name != "" else post.account.username
title = _(u"Post from {}").format(author)
image_description = templates.process_image_descriptions(post.media_attachments)
text = templates.process_text(post, safe=False)
date = templates.process_date(post.created_at, relative_times=False, offset_hours=offset_hours)
privacy_settings = dict(public=_("Public"), unlisted=_("Not listed"), private=_("followers only"), direct=_("Direct"))
privacy = privacy_settings.get(post.visibility)
boost_count = str(post.reblogs_count)
favs_count = str(post.favourites_count)
# Gets the client from where this post was made.
source_obj = post.get("application")
if source_obj == None:
source = _("Remote instance")
else:
source = source_obj.get("name")
self.message = postDialogs.viewPost(text=text, boosts_count=boost_count, favs_count=favs_count, source=source, date=date, privacy=privacy)
participants = [post.account.id] + [account.id for account in post.mentions]
if self.session.db["user_id"] in participants:
self.message.mute.Enable(True)
if post.muted:
self.message.mute.SetLabel(_("Unmute conversation"))
widgetUtils.connect_event(self.message.mute, widgetUtils.BUTTON_PRESSED, self.mute_unmute)
self.message.SetTitle(title)
if image_description != "":
self.message.image_description.Enable(True)
self.message.image_description.ChangeValue(image_description)
widgetUtils.connect_event(self.message.spellcheck, widgetUtils.BUTTON_PRESSED, self.spellcheck)
if item_url != "":
self.message.enable_button("share")
widgetUtils.connect_event(self.message.share, widgetUtils.BUTTON_PRESSED, self.share)
self.item_url = item_url
widgetUtils.connect_event(self.message.translateButton, widgetUtils.BUTTON_PRESSED, self.translate)
widgetUtils.connect_event(self.message.boosts_button, widgetUtils.BUTTON_PRESSED, self.on_boosts)
widgetUtils.connect_event(self.message.favorites_button, widgetUtils.BUTTON_PRESSED, self.on_favorites)
self.message.ShowModal()
# We won't need text_processor in this dialog, so let's avoid it.
def text_processor(self):
pass
def mute_unmute(self, *args, **kwargs):
post = self.session.api.status(self.post_id)
if post.muted == True:
action = "status_unmute"
new_label = _("Mute conversation")
msg = _("Conversation unmuted.")
else:
action = "status_mute"
new_label = _("Unmute conversation")
msg = _("Conversation muted.")
try:
getattr(self.session.api, action)(self.post_id)
self.message.mute.SetLabel(new_label)
output.speak(msg)
except MastodonError:
return
def on_boosts(self, *args, **kwargs):
users = self.session.api.status_reblogged_by(self.post_id)
title = _("people who boosted this post")
user_list = userList.MastodonUserList(session=self.session, users=users, title=title)
def on_favorites(self, *args, **kwargs):
users = self.session.api.status_favourited_by(self.post_id)
title = _("people who favorited this post")
user_list = userList.MastodonUserList(session=self.session, users=users, title=title)
def share(self, *args, **kwargs):
if hasattr(self, "item_url"):
output.copy(self.item_url)
output.speak(_("Link copied to clipboard."))
class text(messages.basicMessage):
def __init__(self, title, text="", *args, **kwargs):
self.title = title
self.message = postDialogs.viewText(title=title, text=text, *args, **kwargs)
self.message.text.SetInsertionPoint(len(self.message.text.GetValue()))
widgetUtils.connect_event(self.message.spellcheck, widgetUtils.BUTTON_PRESSED, self.spellcheck)
widgetUtils.connect_event(self.message.translateButton, widgetUtils.BUTTON_PRESSED, self.translate)

View File

@@ -0,0 +1,224 @@
# -*- coding: utf-8 -*-
import os
import threading
import logging
import sound_lib
import paths
import widgetUtils
import output
from collections import OrderedDict
from wxUI import commonMessageDialogs
from wxUI.dialogs.mastodon import configuration
from extra.autocompletionUsers import manage
from extra.autocompletionUsers.mastodon import scan
from extra.ocr import OCRSpace
from controller.settings import globalSettingsController
from . templateEditor import EditTemplate
log = logging.getLogger("Settings")
class accountSettingsController(globalSettingsController):
def __init__(self, buffer, window):
self.user = buffer.session.db["user_name"]
self.buffer = buffer
self.window = window
self.config = buffer.session.settings
self.dialog = configuration.configurationDialog()
self.create_config()
self.needs_restart = False
self.is_started = True
def create_config(self):
self.dialog.create_general_account()
widgetUtils.connect_event(self.dialog.general.userAutocompletionScan, widgetUtils.BUTTON_PRESSED, self.on_autocompletion_scan)
widgetUtils.connect_event(self.dialog.general.userAutocompletionManage, widgetUtils.BUTTON_PRESSED, self.on_autocompletion_manage)
self.dialog.set_value("general", "disable_streaming", self.config["general"]["disable_streaming"])
self.dialog.set_value("general", "relative_time", self.config["general"]["relative_times"])
self.dialog.set_value("general", "read_preferences_from_instance", self.config["general"]["read_preferences_from_instance"])
self.dialog.set_value("general", "show_screen_names", self.config["general"]["show_screen_names"])
self.dialog.set_value("general", "hide_emojis", self.config["general"]["hide_emojis"])
self.dialog.set_value("general", "itemsPerApiCall", self.config["general"]["max_posts_per_call"])
self.dialog.set_value("general", "reverse_timelines", self.config["general"]["reverse_timelines"])
boost_mode = self.config["general"]["boost_mode"]
if boost_mode == "ask":
self.dialog.set_value("general", "ask_before_boost", True)
else:
self.dialog.set_value("general", "ask_before_boost", False)
self.dialog.set_value("general", "persist_size", str(self.config["general"]["persist_size"]))
self.dialog.set_value("general", "load_cache_in_memory", self.config["general"]["load_cache_in_memory"])
self.dialog.create_reporting()
self.dialog.set_value("reporting", "speech_reporting", self.config["reporting"]["speech_reporting"])
self.dialog.set_value("reporting", "braille_reporting", self.config["reporting"]["braille_reporting"])
post_template = self.config["templates"]["post"]
conversation_template = self.config["templates"]["conversation"]
person_template = self.config["templates"]["person"]
self.dialog.create_templates(post_template=post_template, conversation_template=conversation_template, person_template=person_template)
widgetUtils.connect_event(self.dialog.templates.post, widgetUtils.BUTTON_PRESSED, self.edit_post_template)
widgetUtils.connect_event(self.dialog.templates.conversation, widgetUtils.BUTTON_PRESSED, self.edit_conversation_template)
widgetUtils.connect_event(self.dialog.templates.person, widgetUtils.BUTTON_PRESSED, self.edit_person_template)
self.dialog.create_other_buffers()
buffer_values = self.get_buffers_list()
self.dialog.buffers.insert_buffers(buffer_values)
self.dialog.buffers.connect_hook_func(self.toggle_buffer_active)
widgetUtils.connect_event(self.dialog.buffers.toggle_state, widgetUtils.BUTTON_PRESSED, self.toggle_state)
widgetUtils.connect_event(self.dialog.buffers.up, widgetUtils.BUTTON_PRESSED, self.dialog.buffers.move_up)
widgetUtils.connect_event(self.dialog.buffers.down, widgetUtils.BUTTON_PRESSED, self.dialog.buffers.move_down)
self.input_devices = sound_lib.input.Input.get_device_names()
self.output_devices = sound_lib.output.Output.get_device_names()
self.soundpacks = []
[self.soundpacks.append(i) for i in os.listdir(paths.sound_path()) if os.path.isdir(os.path.join(paths.sound_path(), i)) == True ]
self.dialog.create_sound(self.input_devices, self.output_devices, self.soundpacks)
self.dialog.set_value("sound", "volumeCtrl", int(self.config["sound"]["volume"]*100))
self.dialog.set_value("sound", "input", self.config["sound"]["input_device"])
self.dialog.set_value("sound", "output", self.config["sound"]["output_device"])
self.dialog.set_value("sound", "session_mute", self.config["sound"]["session_mute"])
self.dialog.set_value("sound", "soundpack", self.config["sound"]["current_soundpack"])
self.dialog.set_value("sound", "indicate_audio", self.config["sound"]["indicate_audio"])
self.dialog.set_value("sound", "indicate_img", self.config["sound"]["indicate_img"])
self.dialog.create_extras(OCRSpace.translatable_langs)
language_index = OCRSpace.OcrLangs.index(self.config["mysc"]["ocr_language"])
self.dialog.extras.ocr_lang.SetSelection(language_index)
self.dialog.realize()
self.dialog.set_title(_("Account settings for %s") % (self.user,))
self.response = self.dialog.get_response()
def edit_post_template(self, *args, **kwargs):
template = self.config["templates"]["post"]
control = EditTemplate(template=template, type="post")
result = control.run_dialog()
if result != "": # Template has been saved.
self.config["templates"]["post"] = result
self.config.write()
self.dialog.templates.post.SetLabel(_("Edit template for posts. Current template: {}").format(result))
def edit_conversation_template(self, *args, **kwargs):
template = self.config["templates"]["conversation"]
control = EditTemplate(template=template, type="conversation")
result = control.run_dialog()
if result != "": # Template has been saved.
self.config["templates"]["conversation"] = result
self.config.write()
self.dialog.templates.conversation.SetLabel(_("Edit template for conversations. Current template: {}").format(result))
def edit_person_template(self, *args, **kwargs):
template = self.config["templates"]["person"]
control = EditTemplate(template=template, type="person")
result = control.run_dialog()
if result != "": # Template has been saved.
self.config["templates"]["person"] = result
self.config.write()
self.dialog.templates.person.SetLabel(_("Edit template for persons. Current template: {}").format(result))
def save_configuration(self):
if self.config["general"]["relative_times"] != self.dialog.get_value("general", "relative_time"):
self.needs_restart = True
log.debug("Triggered app restart due to change in relative times.")
self.config["general"]["relative_times"] = self.dialog.get_value("general", "relative_time")
if self.config["general"]["disable_streaming"] != self.dialog.get_value("general", "disable_streaming"):
self.needs_restart = True
log.debug("Triggered app restart due to change in streaming settings.")
self.config["general"]["disable_streaming"] = self.dialog.get_value("general", "disable_streaming")
self.config["general"]["read_preferences_from_instance"] = self.dialog.get_value("general", "read_preferences_from_instance")
self.config["general"]["show_screen_names"] = self.dialog.get_value("general", "show_screen_names")
self.config["general"]["hide_emojis"] = self.dialog.get_value("general", "hide_emojis")
self.config["general"]["max_posts_per_call"] = self.dialog.get_value("general", "itemsPerApiCall")
if self.config["general"]["load_cache_in_memory"] != self.dialog.get_value("general", "load_cache_in_memory"):
self.config["general"]["load_cache_in_memory"] = self.dialog.get_value("general", "load_cache_in_memory")
self.needs_restart = True
log.debug("Triggered app restart due to change in database strategy management.")
if self.config["general"]["persist_size"] != self.dialog.get_value("general", "persist_size"):
if self.dialog.get_value("general", "persist_size") == '':
self.config["general"]["persist_size"] =-1
else:
try:
self.config["general"]["persist_size"] = int(self.dialog.get_value("general", "persist_size"))
except ValueError:
output.speak("Invalid cache size, setting to default.",True)
self.config["general"]["persist_size"] =1764
if self.config["general"]["reverse_timelines"] != self.dialog.get_value("general", "reverse_timelines"):
self.needs_restart = True
log.debug("Triggered app restart due to change in timeline order.")
self.config["general"]["reverse_timelines"] = self.dialog.get_value("general", "reverse_timelines")
ask_before_boost = self.dialog.get_value("general", "ask_before_boost")
if ask_before_boost == True:
self.config["general"]["boost_mode"] = "ask"
else:
self.config["general"]["boost_mode"] = "direct"
buffers_list = self.dialog.buffers.get_list()
if buffers_list != self.config["general"]["buffer_order"]:
self.needs_restart = True
log.debug("Triggered app restart due to change in buffer ordering.")
self.config["general"]["buffer_order"] = buffers_list
self.config["reporting"]["speech_reporting"] = self.dialog.get_value("reporting", "speech_reporting")
self.config["reporting"]["braille_reporting"] = self.dialog.get_value("reporting", "braille_reporting")
self.config["mysc"]["ocr_language"] = OCRSpace.OcrLangs[self.dialog.extras.ocr_lang.GetSelection()]
if self.config["sound"]["input_device"] != self.dialog.sound.get("input"):
self.config["sound"]["input_device"] = self.dialog.sound.get("input")
try:
self.buffer.session.sound.input.set_device(self.buffer.session.sound.input.find_device_by_name(self.config["sound"]["input_device"]))
except:
self.config["sound"]["input_device"] = "default"
if self.config["sound"]["output_device"] != self.dialog.sound.get("output"):
self.config["sound"]["output_device"] = self.dialog.sound.get("output")
try:
self.buffer.session.sound.output.set_device(self.buffer.session.sound.output.find_device_by_name(self.config["sound"]["output_device"]))
except:
self.config["sound"]["output_device"] = "default"
self.config["sound"]["volume"] = self.dialog.get_value("sound", "volumeCtrl")/100.0
self.config["sound"]["session_mute"] = self.dialog.get_value("sound", "session_mute")
self.config["sound"]["current_soundpack"] = self.dialog.sound.get("soundpack")
self.config["sound"]["indicate_audio"] = self.dialog.get_value("sound", "indicate_audio")
self.config["sound"]["indicate_img"] = self.dialog.get_value("sound", "indicate_img")
self.buffer.session.sound.config = self.config["sound"]
self.buffer.session.sound.check_soundpack()
self.config.write()
def toggle_state(self,*args,**kwargs):
return self.dialog.buffers.change_selected_item()
def on_autocompletion_scan(self, *args, **kwargs):
configuration = scan.autocompletionScan(self.buffer.session.settings, self.buffer, self.window)
to_scan = configuration.show_dialog()
if to_scan == True:
configuration.prepare_progress_dialog()
t = threading.Thread(target=configuration.scan)
t.start()
def on_autocompletion_manage(self, *args, **kwargs):
configuration = manage.autocompletionManage(self.buffer.session)
configuration.show_settings()
def get_buffers_list(self):
all_buffers=OrderedDict()
all_buffers['home']=_("Home")
all_buffers['local'] = _("Local")
all_buffers['federated'] = _("Federated")
all_buffers['mentions']=_("Mentions")
all_buffers['direct_messages']=_("Direct Messages")
all_buffers['sent']=_("Sent")
all_buffers['favorites']=_("Favorites")
all_buffers['bookmarks']=_("Bookmarks")
all_buffers['followers']=_("Followers")
all_buffers['following']=_("Following")
all_buffers['blocked']=_("Blocked users")
all_buffers['muted']=_("Muted users")
all_buffers['notifications']=_("Notifications")
list_buffers = []
hidden_buffers=[]
all_buffers_keys = list(all_buffers.keys())
# Check buffers shown first.
for i in self.config["general"]["buffer_order"]:
if i in all_buffers_keys:
list_buffers.append((i, all_buffers[i], True))
# This second pass will retrieve all hidden buffers.
for i in all_buffers_keys:
if i not in self.config["general"]["buffer_order"]:
hidden_buffers.append((i, all_buffers[i], False))
list_buffers.extend(hidden_buffers)
return list_buffers
def toggle_buffer_active(self, ev):
change = self.dialog.buffers.get_event(ev)
if change == True:
self.dialog.buffers.change_selected_item()

View File

@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
import re
import wx
from typing import List
from sessions.mastodon.templates import post_variables, conversation_variables, person_variables
from wxUI.dialogs import templateDialogs
class EditTemplate(object):
def __init__(self, template: str, type: str) -> None:
super(EditTemplate, self).__init__()
self.default_template = template
if type == "post":
self.variables = post_variables
elif type == "conversation":
self.variables = conversation_variables
else:
self.variables = person_variables
self.template: str = template
def validate_template(self, template: str) -> bool:
used_variables: List[str] = re.findall("\$\w+", template)
validated: bool = True
for var in used_variables:
if var[1:] not in self.variables:
validated = False
return validated
def run_dialog(self) -> str:
dialog = templateDialogs.EditTemplateDialog(template=self.template, variables=self.variables, default_template=self.default_template)
response = dialog.ShowModal()
if response == wx.ID_SAVE:
validated: bool = self.validate_template(dialog.template.GetValue())
if validated == False:
templateDialogs.invalid_template()
self.template = dialog.template.GetValue()
return self.run_dialog()
else:
return dialog.template.GetValue()
else:
return ""

View File

@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
import logging
import widgetUtils
import output
from wxUI.dialogs.mastodon import userActions as userActionsDialog
from wxUI.dialogs.mastodon import userTimeline as userTimelineDialog
from pubsub import pub
from mastodon import MastodonError, MastodonNotFoundError
#from extra.autocompletionUsers import completion
log = logging.getLogger("controller.mastodon.userActions")
class BasicUserSelector(object):
def __init__(self, session, users=[]):
super(BasicUserSelector, self).__init__()
self.session = session
self.create_dialog(users=users)
def create_dialog(self, users):
pass
def autocomplete_users(self, *args, **kwargs):
c = completion.autocompletionUsers(self.dialog, self.session.session_id)
c.show_menu("dm")
def search_user(self, user):
try:
user = self.session.api.account_lookup(user)
return user
except MastodonError:
log.exception("Error searching for user %s.".format(user))
class userActions(BasicUserSelector):
def __init__(self, *args, **kwargs):
super(userActions, self).__init__(*args, **kwargs)
if self.dialog.get_response() == widgetUtils.OK:
self.process_action()
def create_dialog(self, users):
self.dialog = userActionsDialog.UserActionsDialog(users)
widgetUtils.connect_event(self.dialog.autocompletion, widgetUtils.BUTTON_PRESSED, self.autocomplete_users)
def process_action(self):
action = self.dialog.get_action()
user = self.dialog.get_user()
user = self.search_user(user)
if user == None:
return
getattr(self, action)(user)
def follow(self, user):
try:
self.session.api.account_follow(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def unfollow(self, user):
try:
result = self.session.api.account_unfollow(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def mute(self, user):
try:
id = self.session.api.account_mute(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def unmute(self, user):
try:
id = self.session.api.account_unmute(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def block(self, user):
try:
id = self.session.api.account_block(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def unblock(self, user):
try:
id = self.session.api.account_unblock(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
class UserTimeline(BasicUserSelector):
def create_dialog(self, users):
self.dialog = userTimelineDialog.UserTimeline(users)
widgetUtils.connect_event(self.dialog.autocompletion, widgetUtils.BUTTON_PRESSED, self.autocomplete_users)
def process_action(self):
action = self.dialog.get_action()
user = self.dialog.get_user()
user = self.search_user(user)
if user == None:
return
self.user = user
return action

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from mastodon import MastodonError
from wxUI.dialogs.mastodon import showUserProfile
from controller.userList import UserListController
from . import userActions
class MastodonUserList(UserListController):
def process_users(self, users):
return [dict(id=user.id, display_name=f"{user.display_name} (@{user.acct})", acct=user.acct) for user in users]
def on_actions(self, *args, **kwargs):
user = self.dialog.user_list.GetSelection()
user_account = self.users[user]
u = userActions.userActions(self.session, [user_account.get("acct")])
def on_details(self, *args, **kwargs):
user = self.dialog.user_list.GetSelection()
user_id = self.users[user].get("id")
try:
user_object = self.session.api.account(user_id)
except MastodonError:
return
dlg = showUserProfile.ShowUserProfile(user_object)
dlg.ShowModal()