mirror of
https://github.com/MCV-Software/TWBlue.git
synced 2025-07-17 13:46:07 -04:00
Feat: Added support to display local and public timelines for remote instances
This commit is contained in:
@@ -5,3 +5,4 @@ from .conversations import ConversationBuffer, ConversationListBuffer
|
||||
from .users import UserBuffer
|
||||
from .notifications import NotificationsBuffer
|
||||
from .search import SearchBuffer
|
||||
from .community import CommunityBuffer
|
||||
|
@@ -591,8 +591,11 @@ class BaseBuffer(base.Buffer):
|
||||
response = viewer.message.ShowModal()
|
||||
viewer.message.Destroy()
|
||||
|
||||
def vote(self):
|
||||
post = self.get_item()
|
||||
def vote(self, item=None):
|
||||
if item == None:
|
||||
post = self.get_item()
|
||||
else:
|
||||
post = item
|
||||
if not hasattr(post, "poll") or post.poll == None:
|
||||
return
|
||||
poll = post.poll
|
||||
|
160
src/controller/buffers/mastodon/community.py
Normal file
160
src/controller/buffers/mastodon/community.py
Normal file
@@ -0,0 +1,160 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import time
|
||||
import logging
|
||||
import mastodon
|
||||
import widgetUtils
|
||||
import output
|
||||
from wxUI import commonMessageDialogs
|
||||
from sessions.mastodon import utils
|
||||
from . import base
|
||||
|
||||
log = logging.getLogger("controller.buffers.mastodon.community")
|
||||
|
||||
class CommunityBuffer(base.BaseBuffer):
|
||||
def __init__(self, community_url, *args, **kwargs):
|
||||
super(CommunityBuffer, self).__init__(*args, **kwargs)
|
||||
self.community_url = community_url
|
||||
self.community_api = mastodon.Mastodon(api_base_url=self.community_url)
|
||||
self.timeline = kwargs.get("timeline", "local")
|
||||
self.kwargs.pop("timeline")
|
||||
|
||||
def get_buffer_name(self):
|
||||
type = _("Local") if self.timeline == "local" else _("Federated")
|
||||
instance = self.community_url.replace("https://", "")
|
||||
return _(f"{type} timeline for {instance}")
|
||||
|
||||
def start_stream(self, mandatory=False, play_sound=True, avoid_autoreading=False):
|
||||
current_time = time.time()
|
||||
if self.execution_time == 0 or current_time-self.execution_time >= 180 or mandatory==True:
|
||||
self.execution_time = current_time
|
||||
log.debug("Starting stream for buffer %s, account %s and type %s" % (self.name, self.account, self.type))
|
||||
log.debug("args: %s, kwargs: %s" % (self.args, self.kwargs))
|
||||
count = self.session.settings["general"]["max_posts_per_call"]
|
||||
min_id = None
|
||||
# toDo: Implement reverse timelines properly here.
|
||||
if self.name in self.session.db and len(self.session.db[self.name]) > 0:
|
||||
if self.session.settings["general"]["reverse_timelines"]:
|
||||
min_id = self.session.db[self.name][0].id
|
||||
else:
|
||||
min_id = self.session.db[self.name][-1].id
|
||||
try:
|
||||
results = self.community_api.timeline(timeline=self.timeline, min_id=min_id, limit=count, *self.args, **self.kwargs)
|
||||
results.reverse()
|
||||
except Exception as e:
|
||||
log.exception("Error %s" % (str(e)))
|
||||
return
|
||||
number_of_items = self.session.order_buffer(self.name, results)
|
||||
log.debug("Number of items retrieved: %d" % (number_of_items,))
|
||||
self.put_items_on_list(number_of_items)
|
||||
if number_of_items > 0 and self.sound != None and self.session.settings["sound"]["session_mute"] == False and self.name not in self.session.settings["other_buffers"]["muted_buffers"] and play_sound == True:
|
||||
self.session.sound.play(self.sound)
|
||||
# Autoread settings
|
||||
if avoid_autoreading == False and mandatory == True and number_of_items > 0 and self.name in self.session.settings["other_buffers"]["autoread_buffers"]:
|
||||
self.auto_read(number_of_items)
|
||||
return number_of_items
|
||||
|
||||
def get_more_items(self):
|
||||
elements = []
|
||||
if self.session.settings["general"]["reverse_timelines"] == False:
|
||||
max_id = self.session.db[self.name][0].id
|
||||
else:
|
||||
max_id = self.session.db[self.name][-1].id
|
||||
try:
|
||||
items = self.community_api.timeline(timeline=self.timeline, max_id=max_id, limit=self.session.settings["general"]["max_posts_per_call"], *self.args, **self.kwargs)
|
||||
except Exception as e:
|
||||
log.exception("Error %s" % (str(e)))
|
||||
return
|
||||
items_db = self.session.db[self.name]
|
||||
for i in items:
|
||||
if utils.find_item(i, self.session.db[self.name]) == None:
|
||||
elements.append(i)
|
||||
if self.session.settings["general"]["reverse_timelines"] == False:
|
||||
items_db.insert(0, i)
|
||||
else:
|
||||
items_db.append(i)
|
||||
self.session.db[self.name] = items_db
|
||||
selection = self.buffer.list.get_selected()
|
||||
log.debug("Retrieved %d items from cursored search in function %s." % (len(elements), self.function))
|
||||
safe = True
|
||||
if self.session.settings["general"]["read_preferences_from_instance"]:
|
||||
safe = self.session.expand_spoilers == False
|
||||
if self.session.settings["general"]["reverse_timelines"] == False:
|
||||
for i in elements:
|
||||
post = self.compose_function(i, self.session.db, self.session.settings, self.session.settings["general"]["relative_times"], self.session.settings["general"]["show_screen_names"], safe=safe)
|
||||
self.buffer.list.insert_item(True, *post)
|
||||
else:
|
||||
for i in elements:
|
||||
post = self.compose_function(i, self.session.db, self.session.settings, self.session.settings["general"]["relative_times"], self.session.settings["general"]["show_screen_names"], safe=safe)
|
||||
self.buffer.list.insert_item(False, *post)
|
||||
self.buffer.list.select_item(selection)
|
||||
output.speak(_(u"%s items retrieved") % (str(len(elements))), True)
|
||||
|
||||
def remove_buffer(self, force=False):
|
||||
if force == False:
|
||||
dlg = commonMessageDialogs.remove_buffer()
|
||||
else:
|
||||
dlg = widgetUtils.YES
|
||||
if dlg == widgetUtils.YES:
|
||||
tl_info = f"{self.timeline}@{self.community_url}"
|
||||
self.session.settings["other_buffers"]["communities"].remove(tl_info)
|
||||
self.session.settings.write()
|
||||
if self.name in self.session.db:
|
||||
self.session.db.pop(self.name)
|
||||
return True
|
||||
elif dlg == widgetUtils.NO:
|
||||
return False
|
||||
|
||||
def get_item_from_instance(self, *args, **kwargs):
|
||||
item = self.get_item()
|
||||
try:
|
||||
results = self.session.api.search(q=item.url, resolve=True, result_type="statuses")
|
||||
except Exception as e:
|
||||
log.exception("Error when searching for remote post.")
|
||||
return None
|
||||
item = results["statuses"][0]
|
||||
return item
|
||||
|
||||
def reply(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).reply(item=item)
|
||||
|
||||
def send_message(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).send_message(item=item)
|
||||
|
||||
def share_item(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).share_item(item=item)
|
||||
|
||||
def add_to_favorites(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).add_to_favorite(item=item)
|
||||
|
||||
def remove_from_favorites(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).remove_from_favorites(item=item)
|
||||
|
||||
def toggle_favorite(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).toggle_favorite(item=item)
|
||||
|
||||
def toggle_bookmark(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).toggle_bookmark(item=item)
|
||||
|
||||
def vote(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).vote(item=item)
|
||||
|
||||
def view_item(self, *args, **kwargs):
|
||||
item = self.get_item_from_instance()
|
||||
if item != None:
|
||||
super(CommunityBuffer, self).view_item(item=item)
|
@@ -107,6 +107,9 @@ class Handler(object):
|
||||
pub.sendMessage("createBuffer", buffer_type="SearchBuffer", session_type=session.type, buffer_title=_("Search for {}").format(term), parent_tab=searches_position, start=True, kwargs=dict(parent=controller.view.nb, compose_func="compose_post", function="search", name="%s-searchterm" % (term,), sessionObject=session, account=session.get_name(), sound="search_updated.ogg", q=term, result_type="statuses"))
|
||||
pub.sendMessage("createBuffer", buffer_type="EmptyBuffer", session_type="base", buffer_title=_("Communities"), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="communities", account=name))
|
||||
communities_position =controller.view.search("communities", name)
|
||||
for community in session.settings["other_buffers"]["communities"]:
|
||||
pub.sendMessage("createBuffer", buffer_type="CommunityBuffer", session_type=session.type, buffer_title=_("Community for {}").format(community.split("@")[1].replace("https://", "")), parent_tab=communities_position, start=True, kwargs=dict(parent=controller.view.nb, function="timeline", compose_func="compose_post", name=community, sessionObject=session, community_url=community.split("@")[1], account=session.get_name(), sound="search_updated.ogg", timeline=community.split("@")[0]))
|
||||
|
||||
# for i in session.settings["other_buffers"]["trending_topic_buffers"]:
|
||||
# pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (i), parent_tab=root_position, start=False, kwargs=dict(parent=controller.view.nb, name="%s_tt" % (i,), sessionObject=session, name, trendsFor=i, sound="trends_updated.ogg"))
|
||||
|
||||
|
Reference in New Issue
Block a user