Implemented user timelines (statuses, followers & following). Limited support due to API

This commit is contained in:
Manuel Cortez 2022-11-20 02:23:12 -06:00
parent 73de8d4f49
commit d3914a4e34
No known key found for this signature in database
GPG Key ID: 9E0735CA15EFE790
7 changed files with 216 additions and 64 deletions

View File

@ -37,6 +37,8 @@ class BaseBuffer(base.Buffer):
self.buffer.account = account
self.bind_events()
self.sound = sound
if "-timeline" in self.name or "-followers" in self.name or "-following" in self.name:
self.finished_timeline = False
def create_buffer(self, parent, name):
self.buffer = buffers.mastodon.basePanel(parent, name)
@ -51,12 +53,10 @@ class BaseBuffer(base.Buffer):
elif hasattr(self, "username"):
if "-timeline" in self.name:
return _(u"{username}'s timeline").format(username=self.username,)
elif "-favorite" in self.name:
return _(u"{username}'s likes").format(username=self.username,)
elif "-followers" in self.name:
return _(u"{username}'s followers").format(username=self.username,)
elif "-friends" in self.name:
return _(u"{username}'s friends").format(username=self.username,)
elif "-following" in self.name:
return _(u"{username}'s following").format(username=self.username,)
log.error("Error getting name for buffer %s" % (self.name,))
return _(u"Unknown buffer")
@ -102,6 +102,10 @@ class BaseBuffer(base.Buffer):
return
number_of_items = self.session.order_buffer(self.name, results)
log.debug("Number of items retrieved: %d" % (number_of_items,))
if hasattr(self, "finished_timeline") and self.finished_timeline == False:
if "-timeline" in self.name:
self.username = self.session.db[self.name][0]["account"].username
self.finished_timeline = True
self.put_items_on_list(number_of_items)
if number_of_items > 0 and self.name != "sent_posts" and self.name != "sent_direct_messages" and self.sound != None and self.session.settings["sound"]["session_mute"] == False and self.name not in self.session.settings["other_buffers"]["muted_buffers"] and play_sound == True:
self.session.sound.play(self.sound)
@ -161,28 +165,14 @@ class BaseBuffer(base.Buffer):
else:
dlg = widgetUtils.YES
if dlg == widgetUtils.YES:
if self.name[:-9] in self.session.settings["other_buffers"]["timelines"]:
self.session.settings["other_buffers"]["timelines"].remove(self.name[:-9])
if self.kwargs.get("id") in self.session.settings["other_buffers"]["timelines"]:
self.session.settings["other_buffers"]["timelines"].remove(self.kwargs.get("id"))
self.session.settings.write()
if self.name in self.session.db:
self.session.db.pop(self.name)
return True
elif dlg == widgetUtils.NO:
return False
elif "favorite" in self.name:
if force == False:
dlg = commonMessageDialogs.remove_buffer()
else:
dlg = widgetUtils.YES
if dlg == widgetUtils.YES:
if self.name[:-9] in self.session.settings["other_buffers"]["favourites_timelines"]:
self.session.settings["other_buffers"]["favourites_timelines"].remove(self.name[:-9])
if self.name in self.session.db:
self.session.db.pop(self.name)
self.session.settings.write()
return True
elif dlg == widgetUtils.NO:
return False
else:
output.speak(_(u"This buffer is not a timeline; it can't be deleted."), True)
return False

View File

@ -86,6 +86,10 @@ class UserBuffer(BaseBuffer):
return
number_of_items = self.session.order_buffer(self.name, results)
log.debug("Number of items retrieved: %d" % (number_of_items,))
if hasattr(self, "finished_timeline") and self.finished_timeline == False:
if "-followers" in self.name or "-following" in self.name:
self.username = self.session.api.account(id=self.kwargs.get("id")).username
self.finished_timeline = True
self.put_items_on_list(number_of_items)
if number_of_items > 0 and self.name != "sent_posts" and self.name != "sent_direct_messages" and self.sound != None and self.session.settings["sound"]["session_mute"] == False and self.name not in self.session.settings["other_buffers"]["muted_buffers"] and play_sound == True:
self.session.sound.play(self.sound)
@ -154,12 +158,29 @@ class UserBuffer(BaseBuffer):
pass
def remove_buffer(self, force=False):
if "-searchUser" in self.name:
if "-followers" in self.name:
if force == False:
dlg = commonMessageDialogs.remove_buffer()
else:
dlg = widgetUtils.YES
if dlg == widgetUtils.YES:
if self.kwargs.get("id") in self.session.settings["other_buffers"]["followers_timelines"]:
self.session.settings["other_buffers"]["followers_timelines"].remove(self.kwargs.get("id"))
self.session.settings.write()
if self.name in self.session.db:
self.session.db.pop(self.name)
return True
elif dlg == widgetUtils.NO:
return False
if "-following" in self.name:
if force == False:
dlg = commonMessageDialogs.remove_buffer()
else:
dlg = widgetUtils.YES
if dlg == widgetUtils.YES:
if self.kwargs.get("id") in self.session.settings["other_buffers"]["following_timelines"]:
self.session.settings["other_buffers"]["following_timelines"].remove(self.kwargs.get("id"))
self.session.settings.write()
if self.name in self.session.db:
self.session.db.pop(self.name)
return True

View File

@ -351,6 +351,7 @@ class Controller(object):
for i in delete_buffers:
self.destroy_buffer(i, name)
session.db = None
session.logged = False
def destroy_buffer(self, buffer_name, session_name):
buffer = self.search_buffer(buffer_name, session_name)
@ -1164,6 +1165,8 @@ class Controller(object):
title = _(u"Followers for {}").format(buffer.username,)
elif buffer.name.endswith("-friends"):
title = _(u"Friends for {}").format(buffer.username,)
elif buffer.name.endswith("-following"):
title = _(u"Following for {}").format(buffer.username,)
elif buffer.name.endswith("_tt"):
title = _("Trending topics for %s") % (buffer.name_)
buffer_index = self.view.search(buffer.name, buffer.account)

View File

@ -2,7 +2,10 @@
import wx
import logging
from pubsub import pub
from wxUI.dialogs.mastodon import dialogs
from wxUI.dialogs.mastodon import search as search_dialogs
from wxUI.dialogs.mastodon import dialogs
from wxUI import commonMessageDialogs
from sessions.twitter import utils
from . import userActions
@ -44,22 +47,14 @@ class Handler(object):
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Muted users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="mutes", name="muted", sessionObject=session, account=name))
elif i == 'blocked':
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Blocked users"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="blocks", name="blocked", sessionObject=session, account=name))
# pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="timelines", name))
# timelines_position =controller.view.search("timelines", session.db["user_name"])
# for i in session.settings["other_buffers"]["timelines"]:
# pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_(u"Timeline for {}").format(i,), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="user_timeline", name="%s-timeline" % (i,), sessionObject=session, name, sound="tweet_timeline.ogg", bufferType=None, user_id=i, include_ext_alt_text=True, tweet_mode="extended"))
# pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Likes timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="favs_timelines", name))
# favs_timelines_position =controller.view.search("favs_timelines", session.db["user_name"])
# for i in session.settings["other_buffers"]["favourites_timelines"]:
# pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=_("Likes for {}").format(i,), parent_tab=favs_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_favorites", name="%s-favorite" % (i,), sessionObject=session, name, bufferType=None, sound="favourites_timeline_updated.ogg", user_id=i, include_ext_alt_text=True, tweet_mode="extended"))
# pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Followers timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="followers_timelines", name))
# followers_timelines_position =controller.view.search("followers_timelines", session.db["user_name"])
# for i in session.settings["other_buffers"]["followers_timelines"]:
# pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i,), parent_tab=followers_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_followers", name="%s-followers" % (i,), sessionObject=session, name, sound="new_event.ogg", user_id=i))
# pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Following timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="friends_timelines", name))
# friends_timelines_position =controller.view.search("friends_timelines", session.db["user_name"])
# for i in session.settings["other_buffers"]["friends_timelines"]:
# pub.sendMessage("createBuffer", buffer_type="PeopleBuffer", session_type=session.type, buffer_title=_(u"Friends for {}").format(i,), parent_tab=friends_timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="get_friends", name="%s-friends" % (i,), sessionObject=session, name, sound="new_event.ogg", user_id=i))
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Timelines"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="timelines", account=name))
timelines_position =controller.view.search("timelines", name)
for i in session.settings["other_buffers"]["timelines"]:
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=session.type, buffer_title=i, parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, function="account_statuses", name="%s-timeline".format(i), sessionObject=session, account=name, sound="tweet_timeline.ogg", id=i))
for i in session.settings["other_buffers"]["followers_timelines"]:
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Followers for {}").format(i), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_followers", name="%s-followers" % (i,), sessionObject=session, account=name, sound="new_event.ogg", id=i))
for i in session.settings["other_buffers"]["following_timelines"]:
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Following for {}").format(i), parent_tab=timelines_position, start=False, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_following", name="%s-following" % (i,), sessionObject=session, account=name, sound="new_event.ogg", id=i))
# pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Lists"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="lists", name))
# lists_position =controller.view.search("lists", session.db["user_name"])
# for i in session.settings["other_buffers"]["lists"]:
@ -72,10 +67,16 @@ class Handler(object):
# pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="%s_tt" % (i,), sessionObject=session, name, trendsFor=i, sound="trends_updated.ogg"))
def start_buffer(self, controller, buffer):
if hasattr(buffer, "finished_timeline") and buffer.finished_timeline == False:
change_title = True
else:
change_title = False
try:
buffer.start_stream(play_sound=False)
except Exception as err:
log.exception("Error %s starting buffer %s on account %s, with args %r and kwargs %r." % (str(err), buffer.name, buffer.account, buffer.args, buffer.kwargs))
if change_title:
pub.sendMessage("buffer-title-changed", buffer=buffer)
def open_conversation(self, controller, buffer):
post = buffer.get_item()
@ -99,7 +100,7 @@ class Handler(object):
users = [user.acct for user in item.mentions if user.id != buffer.session.db["user_id"]]
if item.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.account.acct)
u = userActions.userActionsController(buffer.session, users)
u = userActions.userActions(buffer.session, users)
def search(self, controller, session, value):
log.debug("Creating a new search...")
@ -118,3 +119,61 @@ class Handler(object):
elif dlg.users.GetValue() == True:
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=session.type, buffer_title=_("Search for {}").format(term), parent_tab=searches_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_search", name="%s-searchUser" % (term,), sessionObject=session, account=session.get_name(), sound="search_updated.ogg", q=term))
dlg.Destroy()
# ToDo: explore how to play sound & save config differently.
# currently, TWBlue will play the sound and save the config for the timeline even if the buffer did not load or something else.
def open_timeline(self, controller, buffer):
if not hasattr(buffer, "get_item"):
return
item = buffer.get_item()
if buffer.type == "user":
users = [item.acct]
elif buffer.type == "baseBuffer":
if item.reblog != None:
users = [user.acct for user in item.reblog.mentions if user.id != buffer.session.db["user_id"]]
if item.reblog.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.reblog.account.acct)
else:
users = [user.acct for user in item.mentions if user.id != buffer.session.db["user_id"]]
if item.account.acct not in users and item.account.id != buffer.session.db["user_id"]:
users.insert(0, item.account.acct)
u = userActions.UserTimeline(buffer.session, users)
if u.dialog.ShowModal() == wx.ID_OK:
action = u.process_action()
if action == None:
return
user = u.user
if action == "posts":
if user.statuses_count == 0:
dialogs.no_posts()
return
if user.id in buffer.session.settings["other_buffers"]["timelines"]:
commonMessageDialogs.timeline_exist()
return
timelines_position =controller.view.search("timelines", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="BaseBuffer", session_type=buffer.session.type, buffer_title=_("Timeline for {}").format(user.username,), parent_tab=timelines_position, start=True, kwargs=dict(parent=controller.view.nb, function="account_statuses", name="%s-timeline" % (user.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="tweet_timeline.ogg", id=user.id))
buffer.session.settings["other_buffers"]["timelines"].append(user.id)
buffer.session.sound.play("create_timeline.ogg")
elif action == "followers":
if user.followers_count == 0:
dialogs.no_followers()
return
if user.id in buffer.session.settings["other_buffers"]["followers_timelines"]:
commonMessageDialogs.timeline_exist()
return
timelines_position =controller.view.search("timelines", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=buffer.session.type, buffer_title=_("Followers for {}").format(user.username,), parent_tab=timelines_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_followers", name="%s-followers" % (user.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="new_event.ogg", id=user.id))
buffer.session.settings["other_buffers"]["followers_timelines"].append(user.id)
buffer.session.sound.play("create_timeline.ogg")
elif action == "following":
if user.following_count == 0:
dialogs.no_following()
return
if user.id in buffer.session.settings["other_buffers"]["following_timelines"]:
commonMessageDialogs.timeline_exist()
return
timelines_position =controller.view.search("timelines", buffer.session.get_name())
pub.sendMessage("createBuffer", buffer_type="UserBuffer", session_type=buffer.session.type, buffer_title=_("Following for {}").format(user.username,), parent_tab=timelines_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_user", function="account_following", name="%s-followers" % (user.id,), sessionObject=buffer.session, account=buffer.session.get_name(), sound="new_event.ogg", id=user.id))
buffer.session.settings["other_buffers"]["following_timelines"].append(user.id)
buffer.session.sound.play("create_timeline.ogg")
buffer.session.settings.write()

View File

@ -2,34 +2,27 @@
import logging
import widgetUtils
import output
from wxUI.dialogs.mastodon import userActions
from wxUI.dialogs.mastodon import userActions as userActionsDialog
from wxUI.dialogs.mastodon import userTimeline as userTimelineDialog
from pubsub import pub
from mastodon import MastodonError, MastodonNotFoundError
from extra.autocompletionUsers import completion
log = logging.getLogger("controller.mastodon.userActions")
class userActionsController(object):
def __init__(self, session, users=[], default="follow"):
super(userActionsController, self).__init__()
class BasicUserSelector(object):
def __init__(self, session, users=[]):
super(BasicUserSelector, self).__init__()
self.session = session
self.dialog = userActions.UserActionsDialog(users, default)
widgetUtils.connect_event(self.dialog.autocompletion, widgetUtils.BUTTON_PRESSED, self.autocomplete_users)
if self.dialog.get_response() == widgetUtils.OK:
self.process_action()
self.create_dialog(users=users)
def create_dialog(self, users):
pass
def autocomplete_users(self, *args, **kwargs):
c = completion.autocompletionUsers(self.dialog, self.session.session_id)
c.show_menu("dm")
def process_action(self):
action = self.dialog.get_action()
user = self.dialog.get_user()
user = self.search_user(user)
if user == None:
return
getattr(self, action)(user)
def search_user(self, user):
try:
users = self.session.api.account_search(user)
@ -38,38 +31,52 @@ class userActionsController(object):
except MastodonError:
log.exception("Error searching for user %s.".format(user))
class userActions(BasicUserSelector):
def __init__(self, *args, **kwargs):
super(userActions, self).__init__(*args, **kwargs)
if self.dialog.get_response() == widgetUtils.OK:
self.process_action()
def create_dialog(self, users):
self.dialog = userActionsDialog.UserActionsDialog(users)
widgetUtils.connect_event(self.dialog.autocompletion, widgetUtils.BUTTON_PRESSED, self.autocomplete_users)
def process_action(self):
action = self.dialog.get_action()
user = self.dialog.get_user()
user = self.search_user(user)
if user == None:
return
getattr(self, action)(user)
def follow(self, user):
try:
self.session.api.account_follow(user.id)
pub.sendMessage("restartStreaming", session=self.session.session_id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def unfollow(self, user):
try:
result = self.session.api.account_unfollow(user.id)
pub.sendMessage("restartStreaming", session=self.session.session_id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def mute(self, user):
try:
id = self.session.api.account_mute(user.id)
pub.sendMessage("restartStreaming", session=self.session.session_id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def unmute(self, user):
try:
id = self.session.api.account_unmute(user.id)
pub.sendMessage("restartStreaming", session=self.session.session_id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
def block(self, user):
try:
id = self.session.api.account_block(user.id)
pub.sendMessage("restartStreaming", session=self.session.session_id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
@ -78,3 +85,18 @@ class userActionsController(object):
id = self.session.api.account_unblock(user.id)
except MastodonError as err:
output.speak("Error %s" % (str(err)), True)
class UserTimeline(BasicUserSelector):
def create_dialog(self, users):
self.dialog = userTimelineDialog.UserTimeline(users)
widgetUtils.connect_event(self.dialog.autocompletion, widgetUtils.BUTTON_PRESSED, self.autocomplete_users)
def process_action(self):
action = self.dialog.get_action()
user = self.dialog.get_user()
user = self.search_user(user)
if user == None:
return
self.user = user
return action

View File

@ -28,7 +28,7 @@ timelines = list(default=list())
searches = list(default=list())
lists = list(default=list())
followers_timelines = list(default=list())
friends_timelines = list(default=list())
following_timelines = list(default=list())
trending_topic_buffers = list(default=list())
muted_buffers = list(default=list())
autoread_buffers = list(default=list(mentions, direct_messages, events))

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
import wx
class UserTimeline(wx.Dialog):
def __init__(self, users=[], default="posts", *args, **kwargs):
super(UserTimeline, self).__init__(parent=None, *args, **kwargs)
panel = wx.Panel(self)
userSizer = wx.BoxSizer()
self.SetTitle(_("Timeline for %s") % (users[0]))
userLabel = wx.StaticText(panel, -1, _(u"User"))
self.cb = wx.ComboBox(panel, -1, choices=users, value=users[0])
self.cb.SetFocus()
self.autocompletion = wx.Button(panel, -1, _(u"&Autocomplete users"))
userSizer.Add(userLabel, 0, wx.ALL, 5)
userSizer.Add(self.cb, 0, wx.ALL, 5)
userSizer.Add(self.autocompletion, 0, wx.ALL, 5)
actionSizer = wx.BoxSizer(wx.VERTICAL)
label2 = wx.StaticText(panel, -1, _(u"Buffer type"))
self.posts = wx.RadioButton(panel, -1, _(u"&Posts"), style=wx.RB_GROUP)
self.followers = wx.RadioButton(panel, -1, _(u"&Followers"))
self.following = wx.RadioButton(panel, -1, _(u"F&ollowing"))
self.setup_default(default)
hSizer = wx.BoxSizer(wx.HORIZONTAL)
hSizer.Add(label2, 0, wx.ALL, 5)
actionSizer.Add(self.posts, 0, wx.ALL, 5)
actionSizer.Add(self.followers, 0, wx.ALL, 5)
actionSizer.Add(self.following, 0, wx.ALL, 5)
hSizer.Add(actionSizer, 0, wx.ALL, 5)
sizer = wx.BoxSizer(wx.VERTICAL)
ok = wx.Button(panel, wx.ID_OK, _(u"&OK"))
ok.SetDefault()
cancel = wx.Button(panel, wx.ID_CANCEL, _(u"&Close"))
btnsizer = wx.BoxSizer()
btnsizer.Add(ok)
btnsizer.Add(cancel)
sizer.Add(userSizer)
sizer.Add(hSizer, 0, wx.ALL, 5)
sizer.Add(btnsizer)
panel.SetSizer(sizer)
def get_action(self):
if self.posts.GetValue() == True: return "posts"
elif self.followers.GetValue() == True: return "followers"
elif self.following.GetValue() == True: return "following"
def setup_default(self, default):
if default == "posts":
self.posts.SetValue(True)
def get_user(self):
return self.cb.GetValue()
def get_position(self):
return self.cb.GetPosition()
def popup_menu(self, menu):
self.PopupMenu(menu, self.cb.GetPosition())