Move almost everything related to Twitter to handler

This commit is contained in:
Manuel Cortez 2022-11-07 15:55:28 -06:00
parent dfcd63b9b6
commit b6b81e2b36
No known key found for this signature in database
GPG Key ID: 9E0735CA15EFE790
3 changed files with 153 additions and 186 deletions

View File

@ -16,6 +16,7 @@ from mysc.thread_utils import call_threaded
from tweepy.errors import TweepyException
from tweepy.cursor import Cursor
from pubsub import pub
from extra import ocr
from sessions.twitter.long_tweets import twishort, tweets
from wxUI import buffers, dialogs, commonMessageDialogs, menus
from controller.twitter import user, messages
@ -606,4 +607,57 @@ class BaseBuffer(base.Buffer):
msg = messages.viewTweet(non_tweet, [], False, date=date)
else:
tweet, tweetsList = self.get_full_tweet()
msg = messages.viewTweet(tweet, tweetsList, utc_offset=self.session.db["utc_offset"], item_url=self.get_item_url())
msg = messages.viewTweet(tweet, tweetsList, utc_offset=self.session.db["utc_offset"], item_url=self.get_item_url())
def reverse_geocode(self, geocoder):
try:
tweet = self.get_tweet()
if tweet.coordinates != None:
x = tweet.coordinates["coordinates"][0]
y = tweet.coordinates["coordinates"][1]
address = geocoder.reverse_geocode(y, x, language = languageHandler.curLang)
return address
else:
output.speak(_("There are no coordinates in this tweet"))
# except GeocoderError:
# output.speak(_(u"There are no results for the coordinates in this tweet"))
except ValueError:
output.speak(_(u"Error decoding coordinates. Try again later."))
except KeyError:
pass
except AttributeError:
pass
def ocr_image(self):
tweet = self.get_tweet()
media_list = []
if hasattr(tweet, "entities") and tweet.entities.get("media") != None:
[media_list.append(i) for i in tweet.entities["media"] if i not in media_list]
elif hasattr(tweet, "retweeted_status") and tweet.retweeted_status.get("media") != None:
[media_list.append(i) for i in tweet.retweeted_status.entities["media"] if i not in media_list]
elif hasattr(tweet, "quoted_status") and tweet.quoted_status.entities.get("media") != None:
[media_list.append(i) for i in tweet.quoted_status.entities["media"] if i not in media_list]
if len(media_list) > 1:
image_list = [_(u"Picture {0}").format(i,) for i in range(0, len(media_list))]
dialog = dialogs.urlList.urlList(title=_(u"Select the picture"))
if dialog.get_response() == widgetUtils.OK:
img = media_list[dialog.get_item()]
else:
return
elif len(media_list) == 1:
img = media_list[0]
else:
output.speak(_(u"Invalid buffer"))
return
if self.session.settings["mysc"]["ocr_language"] != "":
ocr_lang = self.session.settings["mysc"]["ocr_language"]
else:
ocr_lang = ocr.OCRSpace.short_langs.index(tweet.lang)
ocr_lang = ocr.OCRSpace.OcrLangs[ocr_lang]
api = ocr.OCRSpace.OCRSpaceAPI()
try:
text = api.OCR_URL(img["media_url"], lang=ocr_lang)
except ocr.OCRSpace.APIError as er:
output.speak(_(u"Unable to extract text"))
return
msg = messages.viewTweet(text["ParsedText"], [], False)

View File

@ -18,7 +18,7 @@ from tweepy.errors import TweepyException, Forbidden
from geopy.geocoders import Nominatim
from update import updater
from audio_services import youtube_utils
from extra import SoundsTutorial, ocr
from extra import SoundsTutorial
from wxUI import view, dialogs, commonMessageDialogs, sysTrayIcon
from . import settings
from keyboard_handler.wx_handler import WXKeyboardHandler
@ -113,7 +113,6 @@ class Controller(object):
widgetUtils.connect_event(self.view, widgetUtils.CLOSE_EVENT, self.exit_)
pub.subscribe(self.logout_account, "logout")
pub.subscribe(self.login_account, "login")
pub.subscribe(self.create_new_buffer, "create-new-buffer")
pub.subscribe(self.execute_action, "execute-action")
pub.subscribe(self.search_topic, "search")
pub.subscribe(self.update_sent_dms, "sent-dms-updated")
@ -154,7 +153,7 @@ class Controller(object):
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.add_to_favourites, self.view.fav)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.remove_from_favourites, self.view.unfav)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.view_item, self.view.view)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.reverse_geocode, menuitem=self.view.view_coordinates)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.view_reverse_geocode, menuitem=self.view.view_coordinates)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.delete, self.view.delete)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.follow, menuitem=self.view.follow)
widgetUtils.connect_event(self.view, widgetUtils.MENU, self.send_dm, self.view.dm)
@ -650,74 +649,37 @@ class Controller(object):
self.showing = True
def get_trending_topics(self, *args, **kwargs):
buff = self.get_best_buffer()
trends = trendingTopics.trendingTopicsController(buff.session)
if trends.dialog.get_response() == widgetUtils.OK:
woeid = trends.get_woeid()
if woeid in buff.session.settings["other_buffers"]["trending_topic_buffers"]: return
buffer = buffers.twitter.TrendsBuffer(self.view.nb, "%s_tt" % (woeid,), buff.session, buff.account, woeid, sound="trends_updated.ogg")
buffer.searchfunction = self.search
pos=self.view.search(buff.session.db["user_name"], buff.session.db["user_name"])
self.view.insert_buffer(buffer.buffer, name=_(u"Trending topics for %s") % (trends.get_string()), pos=pos)
self.buffers.append(buffer)
buffer.start_stream()
buffer.session.settings["other_buffers"]["trending_topic_buffers"].append(woeid)
buffer.session.settings.write()
def reverse_geocode(self, event=None):
try:
tweet = self.get_current_buffer().get_tweet()
if tweet.coordinates != None:
x = tweet.coordinates["coordinates"][0]
y = tweet.coordinates["coordinates"][1]
address = geocoder.reverse("{}, {}".format(y, x), language = languageHandler.curLang)
if event == None: output.speak(address.address)
else: self.view.show_address(address.address)
else:
output.speak(_(u"There are no coordinates in this tweet"))
except ValueError:
output.speak(_(u"Error decoding coordinates. Try again later."))
# except KeyError:
# pass
except AttributeError:
output.speak(_("Unable to find address in OpenStreetMap."))
buffer = self.get_best_buffer()
handler = self.get_handler(type=buffer.session.type)
if handler != None and hasattr(handler, "get_trending_topics"):
return handler.get_trending_topics(controller=self, session=buffer.session)
def view_reverse_geocode(self, event=None):
try:
tweet = self.get_current_buffer().get_right_tweet()
if tweet.coordinates != None:
x = tweet.coordinates["coordinates"][0]
y = tweet.coordinates["coordinates"][1]
address = geocoder.reverse_geocode(y, x, language = languageHandler.curLang)
dlg = commonMessageDialogs.view_geodata(address[0].__str__())
else:
output.speak(_(u"There are no coordinates in this tweet"))
except GeocoderError:
output.speak(_(u"There are no results for the coordinates in this tweet"))
except ValueError:
output.speak(_(u"Error decoding coordinates. Try again later."))
except KeyError:
pass
except AttributeError:
pass
buffer = self.get_current_buffer()
if hasattr(buffer, "reverse_geocode"):
address = buffer.reverse_geocode()
if address != None:
dlg = commonMessageDialogs.view_geodata(address[0].__str__())
def get_more_items(self, *args, **kwargs):
self.get_current_buffer().get_more_items()
buffer = self.get_current_buffer()
if hasattr(buffer, "get_more_items"):
return buffer.get_more_items()
def clear_buffer(self, *args, **kwargs):
self.get_current_buffer().clear_list()
buffer = self.get_current_buffer()
if hasattr(buffer, "clear"):
return buffer.clear_list()
def remove_buffer(self, *args, **kwargs):
buffer = self.get_current_buffer()
if not hasattr(buffer, "account"): return
if not hasattr(buffer, "account"):
return
buff = self.view.search(buffer.name, buffer.account)
answer = buffer.remove_buffer()
if answer == False: return
if answer == False:
return
log.debug("destroying buffer...")
if hasattr(buffer, "timer"):
log.debug("Stopping timer...")
buffer.timer.cancel()
log.debug("Timer cancelled.")
self.right()
self.view.delete_buffer(buff)
buffer.session.sound.play("delete_timeline.ogg")
@ -733,7 +695,8 @@ class Controller(object):
buffer = self.get_current_buffer()
if buffer.account != self.current_account:
self.current_account = buffer.account
if not hasattr(buffer, "session") or buffer.session == None: return
if not hasattr(buffer, "session") or buffer.session == None:
return
muted = autoread = False
if buffer.name in buffer.session.settings["other_buffers"]["muted_buffers"]:
muted = True
@ -747,7 +710,8 @@ class Controller(object):
if buf == None:
for i in self.accounts:
buffer = self.view.search("home_timeline", i)
if buffer != None: break
if buffer != None:
break
else:
buffer = self.view.search("home_timeline", buf.session.db["user_name"])
if buffer!=None:
@ -920,16 +884,23 @@ class Controller(object):
def url(self, *args, **kwargs):
buffer = self.get_current_buffer()
buffer.url()
if hasattr(buffer, "url"):
buffer.url()
def audio(self, *args, **kwargs):
self.get_current_buffer().audio()
buffer = self.get_current_buffer()
if hasattr(buffer, "audio"):
return buffer.audio()
def volume_down(self, *args, **kwargs):
self.get_current_buffer().volume_down()
buffer = self.get_current_buffer()
if hasattr(buffer, "volume_down"):
return buffer.volume_down()
def volume_up(self, *args, **kwargs):
self.get_current_buffer().volume_up()
buffer = self.get_current_buffer()
if hasattr(buffer, "volume_up"):
return buffer.volume_up()
def create_invisible_keyboard_shorcuts(self):
keymap = {}
@ -955,7 +926,8 @@ class Controller(object):
pass
def notify(self, session, play_sound=None, message=None, notification=False):
if session.settings["sound"]["session_mute"] == True: return
if session.settings["sound"]["session_mute"] == True:
return
if play_sound != None:
session.sound.play(play_sound)
if message != None:
@ -1019,32 +991,10 @@ class Controller(object):
def start_buffers(self, session):
log.debug("starting buffers... Session %s" % (session.session_id,))
handler = self.get_handler(type=session.type)
for i in self.buffers:
if i.session == session and i.needs_init == True:
if hasattr(i, "finished_timeline") and i.finished_timeline == False:
change_title = True
else:
change_title = False
try:
if "mentions" in i.name or "direct_messages" in i.name:
i.start_stream()
else:
i.start_stream(play_sound=False)
except TweepyException as err:
log.exception("Error %s starting buffer %s on account %s, with args %r and kwargs %r." % (str(err), i.name, i.account, i.args, i.kwargs))
# Determine if this error was caused by a block applied to the current user (IE permission errors).
if type(err) == Forbidden:
buff = self.view.search(i.name, i.account)
i.remove_buffer(force=True)
commonMessageDialogs.blocked_timeline()
if self.get_current_buffer() == i:
self.right()
self.view.delete_buffer(buff)
self.buffers.remove(i)
del i
continue
if change_title:
pub.sendMessage("buffer-title-changed", buffer=i)
handler.start_buffer(controller=self, buffer=i)
def set_positions(self):
for i in sessions.sessions:
@ -1055,54 +1005,12 @@ class Controller(object):
return
for i in sessions.sessions:
try:
if sessions.sessions[i].is_logged == False: continue
if sessions.sessions[i].is_logged == False:
continue
sessions.sessions[i].check_connection()
except TweepyException: # We shouldn't allow this function to die.
except: # We shouldn't allow this function to die.
pass
def create_new_buffer(self, buffer, account, create):
buff = self.search_buffer("home_timeline", account)
if create == True:
if buffer == "favourites":
favourites = buffers.twitter.BaseBuffer(self.view.nb, "get_favorites", "favourites", buff.session, buff.session.db["user_name"], include_ext_alt_text=True, tweet_mode="extended")
self.buffers.append(favourites)
self.view.insert_buffer(favourites.buffer, name=_(u"Likes"), pos=self.view.search(buff.session.db["user_name"], buff.session.db["user_name"]))
favourites.start_stream(play_sound=False)
if buffer == "followers":
followers = buffers.twitter.PeopleBuffer(self.view.nb, "get_followers", "followers", buff.session, buff.session.db["user_name"], screen_name=buff.session.db["user_name"])
self.buffers.append(followers)
self.view.insert_buffer(followers.buffer, name=_(u"Followers"), pos=self.view.search(buff.session.db["user_name"], buff.session.db["user_name"]))
followers.start_stream(play_sound=False)
elif buffer == "friends":
friends = buffers.twitter.PeopleBuffer(self.view.nb, "get_friends", "friends", buff.session, buff.session.db["user_name"], screen_name=buff.session.db["user_name"])
self.buffers.append(friends)
self.view.insert_buffer(friends.buffer, name=_(u"Friends"), pos=self.view.search(buff.session.db["user_name"], buff.session.db["user_name"]))
friends.start_stream(play_sound=False)
elif buffer == "blocked":
blocks = buffers.twitter.PeopleBuffer(self.view.nb, "get_blocks", "blocked", buff.session, buff.session.db["user_name"])
self.buffers.append(blocks)
self.view.insert_buffer(blocks.buffer, name=_(u"Blocked users"), pos=self.view.search(buff.session.db["user_name"], buff.session.db["user_name"]))
blocks.start_stream(play_sound=False)
elif buffer == "muted":
muted = buffers.twitter.PeopleBuffer(self.view.nb, "get_mutes", "muted", buff.session, buff.session.db["user_name"])
self.buffers.append(muted)
self.view.insert_buffer(muted.buffer, name=_(u"Muted users"), pos=self.view.search(buff.session.db["user_name"], buff.session.db["user_name"]))
muted.start_stream(play_sound=False)
elif create == False:
self.destroy_buffer(buffer, buff.session.db["user_name"])
elif buffer == "list":
if create in buff.session.settings["other_buffers"]["lists"]:
output.speak(_(u"This list is already opened"), True)
return
tl = buffers.twitter.ListBuffer(self.view.nb, "list_timeline", create+"-list", buff.session, buff.session.db["user_name"], bufferType=None, list_id=utils.find_list(create, buff.session.db["lists"]), include_ext_alt_text=True, tweet_mode="extended")
buff.session.lists.append(tl)
pos=self.view.search("lists", buff.session.db["user_name"])
self.insert_buffer(tl, pos)
self.view.insert_buffer(tl.buffer, name=_(u"List for {}").format(create), pos=self.view.search("lists", buff.session.db["user_name"]))
tl.start_stream(play_sound=False)
buff.session.settings["other_buffers"]["lists"].append(create)
buff.session.settings.write()
def invisible_shorcuts_changed(self, registered):
if registered == True:
km = self.create_invisible_keyboard_shorcuts()
@ -1161,17 +1069,20 @@ class Controller(object):
sessions.sessions.pop(i)
def update_profile(self, *args, **kwargs):
r = user.profileController(self.get_best_buffer().session)
buffer = self.get_best_buffer()
handler = self.get_handler(type=buffer.session.type)
if hasattr(handler, "update_profile"):
return handler.update_profile(session=buffer.session)
def user_details(self, *args, **kwargs):
buffer = self.get_current_buffer()
if not hasattr(buffer, "session") or buffer.session == None: return
if hasattr(buffer, "user_details"):
buffer.user_details()
def toggle_autoread(self, *args, **kwargs):
buffer = self.get_current_buffer()
if hasattr(buffer, "session") and buffer.session == None: return
if hasattr(buffer, "session") and buffer.session == None:
return
if buffer.name not in buffer.session.settings["other_buffers"]["autoread_buffers"]:
buffer.session.settings["other_buffers"]["autoread_buffers"].append(buffer.name)
output.speak(_(u"The auto-reading of new tweets is enabled for this buffer"), True)
@ -1192,7 +1103,8 @@ class Controller(object):
def toggle_buffer_mute(self, *args, **kwargs):
buffer = self.get_current_buffer()
if hasattr(buffer, "session") and buffer.session == None: return
if hasattr(buffer, "session") and buffer.session == None:
return
if buffer.name not in buffer.session.settings["other_buffers"]["muted_buffers"]:
buffer.session.settings["other_buffers"]["muted_buffers"].append(buffer.name)
output.speak(_(u"Buffer mute on"), True)
@ -1213,9 +1125,6 @@ class Controller(object):
webbrowser.open("changelog.html")
os.chdir("../../")
def insert_buffer(self, buffer, position):
self.buffers.insert(position, buffer)
def copy_to_clipboard(self, *args, **kwargs):
output.copy(self.get_current_buffer().get_message())
output.speak(_(u"Copied"))
@ -1232,7 +1141,7 @@ class Controller(object):
if i.session != None and i.session.is_logged == True:
try:
i.start_stream(mandatory=True)
except TweepyException as err:
except Exception as err:
log.exception("Error %s starting buffer %s on account %s, with args %r and kwargs %r." % (str(err), i.name, i.account, i.args, i.kwargs))
# Determine if this error was caused by a block applied to the current user (IE permission errors).
if type(err) == Forbidden:
@ -1250,11 +1159,10 @@ class Controller(object):
if not hasattr(bf, "start_stream"):
output.speak(_(u"Unable to update this buffer."))
return
else:
output.speak(_(u"Updating buffer..."))
n = bf.start_stream(mandatory=True, avoid_autoreading=True)
if n != None:
output.speak(_(u"{0} items retrieved").format(n,))
output.speak(_(u"Updating buffer..."))
n = bf.start_stream(mandatory=True, avoid_autoreading=True)
if n != None:
output.speak(_(u"{0} items retrieved").format(n,))
def buffer_title_changed(self, buffer):
if buffer.name.endswith("-timeline"):
@ -1272,41 +1180,8 @@ class Controller(object):
def ocr_image(self, *args, **kwargs):
buffer = self.get_current_buffer()
if hasattr(buffer, "get_right_tweet") == False:
output.speak(_(u"Invalid buffer"))
return
tweet = buffer.get_tweet()
media_list = []
if hasattr(tweet, "entities") and tweet.entities.get("media") != None:
[media_list.append(i) for i in tweet.entities["media"] if i not in media_list]
elif hasattr(tweet, "retweeted_status") and tweet.retweeted_status.get("media") != None:
[media_list.append(i) for i in tweet.retweeted_status.entities["media"] if i not in media_list]
elif hasattr(tweet, "quoted_status") and tweet.quoted_status.entities.get("media") != None:
[media_list.append(i) for i in tweet.quoted_status.entities["media"] if i not in media_list]
if len(media_list) > 1:
image_list = [_(u"Picture {0}").format(i,) for i in range(0, len(media_list))]
dialog = dialogs.urlList.urlList(title=_(u"Select the picture"))
if dialog.get_response() == widgetUtils.OK:
img = media_list[dialog.get_item()]
else:
return
elif len(media_list) == 1:
img = media_list[0]
else:
output.speak(_(u"Invalid buffer"))
return
if buffer.session.settings["mysc"]["ocr_language"] != "":
ocr_lang = buffer.session.settings["mysc"]["ocr_language"]
else:
ocr_lang = ocr.OCRSpace.short_langs.index(tweet.lang)
ocr_lang = ocr.OCRSpace.OcrLangs[ocr_lang]
api = ocr.OCRSpace.OCRSpaceAPI()
try:
text = api.OCR_URL(img["media_url"], lang=ocr_lang)
except ocr.OCRSpace.APIError as er:
output.speak(_(u"Unable to extract text"))
return
msg = messages.viewTweet(text["ParsedText"], [], False)
if hasattr(buffer, "ocr_image"):
return buffer.ocr_image()
def update_sent_dms(self, total, account):
sent_dms = self.search_buffer("sent_direct_messages", account)

View File

@ -8,7 +8,7 @@ from mysc import restart
from sessions.twitter import utils, compose
from controller import userSelector
from wxUI import dialogs, commonMessageDialogs
from . import filters, lists, settings, userActions
from . import filters, lists, settings, userActions, trendingTopics, user
log = logging.getLogger("controller.twitter.handler")
@ -279,4 +279,42 @@ class Handler(object):
user = buffer.session.get_user(tweet.user).screen_name
searches_position =controller.view.search("searches", buffer.session.db["user_name"])
pub.sendMessage("createBuffer", buffer_type="ConversationBuffer", session_type=buffer.session.type, buffer_title=_(u"Conversation with {0}").format(user), parent_tab=searches_position, start=True, kwargs=dict(tweet=tweet, parent=controller.view.nb, function="search_tweets", name="%s-searchterm" % (tweet.id,), sessionObject=buffer.session, account=buffer.session.db["user_name"], bufferType="searchPanel", sound="search_updated.ogg", since_id=tweet.id, q="@{0}".format(user)))
def get_trending_topics(self, controller, session):
trends = trendingTopics.trendingTopicsController(session)
if trends.dialog.get_response() == widgetUtils.OK:
woeid = trends.get_woeid()
if woeid in session.settings["other_buffers"]["trending_topic_buffers"]:
return
root_position =controller.view.search(session.db["user_name"], session.db["user_name"])
pub.sendMessage("createBuffer", buffer_type="TrendsBuffer", session_type=session.type, buffer_title=_("Trending topics for %s") % (trends.get_string()), parent_tab=root_position, start=True, kwargs=dict(parent=controller.view.nb, name="%s_tt" % (woeid,), sessionObject=session, account=session.db["user_name"], trendsFor=woeid, sound="trends_updated.ogg"))
session.settings["other_buffers"]["trending_topic_buffers"].append(woeid)
session.settings.write()
def start_buffer(self, controller, buffer):
if hasattr(buffer, "finished_timeline") and buffer.finished_timeline == False:
change_title = True
else:
change_title = False
try:
if "mentions" in buffer.name or "direct_messages" in buffer.name:
buffer.start_stream()
else:
buffer.start_stream(play_sound=False)
except TweepyException as err:
log.exception("Error %s starting buffer %s on account %s, with args %r and kwargs %r." % (str(err), buffer.name, buffer.account, buffer.args, buffer.kwargs))
# Determine if this error was caused by a block applied to the current user (IE permission errors).
if type(err) == Forbidden:
buff = controller.view.search(buffer.name, buffer.account)
buffer.remove_buffer(force=True)
commonMessageDialogs.blocked_timeline()
if controller.get_current_buffer() == buffer:
controller.right()
controller.view.delete_buffer(buff)
controller.buffers.remove(buffer)
del buffer
if change_title:
pub.sendMessage("buffer-title-changed", buffer=buffer)
def update_profile(self, session):
r = user.profileController(session)