diff --git a/src/controller/mainController.py b/src/controller/mainController.py index 6f6e21ea..4f415f8c 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -125,6 +125,7 @@ class Controller(object): pub.subscribe(self.update_sent_dms, "sent-dms-updated") pub.subscribe(self.more_dms, "more-sent-dms") pub.subscribe(self.manage_sent_tweets, "sent-tweet") + pub.subscribe(self.manage_new_tweet, "newTweet") pub.subscribe(self.manage_friend, "friend") pub.subscribe(self.manage_unfollowing, "unfollowing") pub.subscribe(self.manage_favourite, "favourite") @@ -265,11 +266,15 @@ class Controller(object): if sessions.sessions[i].is_logged == False: continue self.start_buffers(sessions.sessions[i]) self.set_buffer_positions(sessions.sessions[i]) + sessions.sessions[i].start_streaming() if config.app["app-settings"]["play_ready_sound"] == True: sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg") if config.app["app-settings"]["speak_ready_msg"] == True: output.speak(_(u"Ready")) self.started = True + self.streams_checker_function = RepeatingTimer(60, self.check_streams) + self.streams_checker_function.start() + def create_ignored_session_buffer(self, session): self.accounts.append(session.settings["twitter"]["user_name"]) @@ -649,6 +654,8 @@ class Controller(object): log.debug("Saving global configuration...") for item in sessions.sessions: if sessions.sessions[item].logged == False: continue + log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) + sessions.sessions[item].stop_streaming() log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,)) sessions.sessions[item].sound.cleaner.cancel() log.debug("Saving database for " + sessions.sessions[item].session_id) @@ -658,6 +665,9 @@ class Controller(object): pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) if os.path.exists(pidpath): os.remove(pidpath) + if hasattr(self, "streams_checker_function"): + log.debug("Stopping stream checker...") + self.streams_checker_function.cancel() widgetUtils.exit_application() def follow(self, *args, **kwargs): @@ -1268,8 +1278,6 @@ class Controller(object): def manage_sent_tweets(self, data, user): buffer = self.search_buffer("sent_tweets", user) if buffer == None: return -# if "sent_tweets" not in buffer.session.settings["other_buffers"]["muted_buffers"]: -# self.notify(buffer.session, play_sound=play_sound) data = buffer.session.check_quoted_status(data) data = buffer.session.check_long_tweet(data) if data == False: # Long tweet deleted from twishort. @@ -1622,3 +1630,27 @@ class Controller(object): def save_data_in_db(self): for i in sessions.sessions: sessions.sessions[i].save_persistent_data() + + def manage_new_tweet(self, data, user, _buffers): + sound_to_play = None + for buff in _buffers: + buffer = self.search_buffer(buff, user) + if buffer == None or buffer.session.db["user_name"] != user: return + buffer.add_new_item(data) + if buff == "home_timeline": sound_to_play = "tweet_received.ogg" + elif buff == "mentions": sound_to_play = "mention_received.ogg" + elif buff == "sent_tweets": sound_to_play = "tweet_send.ogg" + elif "timeline" in buff: sound_to_play = "tweet_timeline.ogg" + else: sound_to_play = None + if sound_to_play != None and buff not in buffer.session.settings["other_buffers"]["muted_buffers"]: + self.notify(buffer.session, sound_to_play) + + def check_streams(self): + if self.started == False: + return + for i in sessions.sessions: + try: + if sessions.sessions[i].is_logged == False: continue + sessions.sessions[i].check_streams() + except TweepError: # We shouldn't allow this function to die. + pass diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index e33d1019..cdfd9684 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -17,7 +17,7 @@ from keys import keyring from sessions import base from sessions.twitter import utils, compose from sessions.twitter.long_tweets import tweets, twishort -from . import reduce +from . import reduce, streaming from .wxUI import authorisationDialog log = logging.getLogger("sessions.twitterSession") @@ -125,6 +125,8 @@ class Session(base.baseSession): # This will be especially useful because if the user reactivates their account later, TWblue will try to retrieve such user again at startup. # If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object. self.deleted_users = {} + pub.subscribe(self.handle_new_status, "newStatus") + pub.subscribe(self.handle_connected, "streamConnected") # @_require_configuration def login(self, verify_credentials=True): @@ -501,3 +503,61 @@ class Session(base.baseSession): if hasattr(i, "retweeted_status") and (i.retweeted_status.user.id_str in self.db["users"]) == False: users[i.retweeted_status.user.id_str] = i.retweeted_status.user self.db["users"] = users + + def start_streaming(self): + self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"], user_id=self.db["user_id"]) + self.stream = streaming.Stream(auth = self.auth, listener=self.stream_listener, chunk_size=1025) + self.stream_thread = call_threaded(self.stream.filter, follow=self.stream_listener.users, stall_warnings=True) + + def stop_streaming(self): + self.stream.running = False + log.debug("Stream stopped for accounr {}".format(self.db["user_name"])) + + def handle_new_status(self, status, user): + """ Handles a new status present in the Streaming API. """ + # Discard processing the status if the streaming sends a tweet for another account. + if self.db["user_name"] != user: + return + # the Streaming API sends non-extended tweets with an optional parameter "extended_tweets" which contains full_text and other data. + # so we have to make sure we check it before processing the normal status. + # As usual, we handle also quotes and retweets at first. + if hasattr(status, "retweeted_status") and hasattr(status.retweeted_status, "extended_tweet"): + status.retweeted_status._json = {**status.retweeted_status._json, **status.retweeted_status._json["extended_tweet"]} + # compose.compose_tweet requires the parent tweet to have a full_text field, so we have to add it to retweets here. + status._json["full_text"] = status._json["text"] + if hasattr(status, "quoted_status") and hasattr(status.quoted_status, "extended_tweet"): + status.quoted_status._json = {**status.quoted_status._json, **status.quoted_status._json["extended_tweet"]} + if status.truncated: + status._json = {**status._json, **status._json["extended_tweet"]} + # Sends status to database, where it will be reduced and changed according to our needs. + buffers_to_send = [] + if status.user.id_str in self.stream_listener.users: + buffers_to_send.append("home_timeline") + if status.user.id == self.db["user_id"]: + buffers_to_send.append("sent_tweets") + for user in status.entities["user_mentions"]: + if user["id"] == self.db["user_id"]: + buffers_to_send.append("mentions") + users_with_timeline = [user.split("-")[0] for user in self.db.keys() if user.endswith("-timeline")] + for user in users_with_timeline: + if status.user.id_str == user: + buffers_to_send.append("{}-timeline".format(user)) + for buffer in buffers_to_send[::]: + num = self.order_buffer(buffer, [status]) + if num == 0: + buffers_to_send.remove(buffer) + # However, we have to do the "reduce and change" process here because the status we sent to the db is going to be a different object that the one sent to database. + reduced_status = reduce.reduce_tweet(status) + status = self.check_quoted_status(status) + status = self.check_long_tweet(status) + # Send it to the main controller object. + pub.sendMessage("newTweet", data=status, user=self.db["user_name"], _buffers=buffers_to_send) + + def check_streams(self): + log.debug("Status of running stream for user {}: {}".format(self.db["user_name"], self.stream.running)) + if self.stream.running == False: + self.start_streaming() + + def handle_connected(self, user): + if user != self.db["user_name"]: + log.debug("Connected streaming endpoint on account {}".format(user)) \ No newline at end of file diff --git a/src/sessions/twitter/streaming.py b/src/sessions/twitter/streaming.py new file mode 100644 index 00000000..613edc01 --- /dev/null +++ b/src/sessions/twitter/streaming.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +""" Streaming support for TWBlue. """ +import time +import six +import requests +import urllib3 +import ssl +import tweepy +import logging +from pubsub import pub + +log = logging.getLogger("sessions.twitter.streaming") + +class StreamListener(tweepy.StreamListener): + + def __init__(self, twitter_api, user, user_id, *args, **kwargs): + super(StreamListener, self).__init__(*args, **kwargs) + self.api = twitter_api + self.user = user + self.user_id = user_id + self.users = [str(id) for id in self.api.friends_ids()] + self.users.append(str(self.user_id)) + log.debug("Started streaming object for user {}".format(self.user)) + + def on_connect(self): + pub.sendMessage("streamConnected", user=self.user) + + def on_exception(self, ex): + log.exception("Exception received on streaming endpoint for user {}".format(self.user)) + + def on_status(self, status): + """ Checks data arriving as a tweet. """ + if status.user.id_str in self.users: + pub.sendMessage("newStatus", status=status, user=self.user) +# print(status.text) + + +class Stream(tweepy.Stream): + + def _run(self): + # Authenticate + url = "https://%s%s" % (self.host, self.url) + + # Connect and process the stream + error_counter = 0 + resp = None + exc_info = None + while self.running: + if self.retry_count is not None: + if error_counter > self.retry_count: + # quit if error count greater than retry count + break + try: + auth = self.auth.apply_auth() + resp = self.session.request('POST', + url, + data=self.body, + timeout=self.timeout, + stream=True, + auth=auth, + verify=self.verify, + proxies = self.proxies) + if resp.status_code != 200: + if self.listener.on_error(resp.status_code) is False: + break + error_counter += 1 + if resp.status_code == 420: + self.retry_time = max(self.retry_420_start, + self.retry_time) + time.sleep(self.retry_time) + self.retry_time = min(self.retry_time * 2, + self.retry_time_cap) + else: + error_counter = 0 + self.retry_time = self.retry_time_start + self.snooze_time = self.snooze_time_step + self.listener.on_connect() + self._read_loop(resp) + except (requests.ConnectionError, requests.Timeout, ssl.SSLError, urllib3.exceptions.ReadTimeoutError, urllib3.exceptions.ProtocolError) as exc: + # This is still necessary, as a SSLError can actually be + # thrown when using Requests + # If it's not time out treat it like any other exception + if isinstance(exc, ssl.SSLError): + if not (exc.args and 'timed out' in str(exc.args[0])): + exc_info = sys.exc_info() + break + if self.listener.on_timeout() is False: + break + if self.running is False: + break + time.sleep(self.snooze_time) + self.snooze_time = min(self.snooze_time + self.snooze_time_step, + self.snooze_time_cap) + except Exception as exc: + exc_info = sys.exc_info() + # any other exception is fatal, so kill loop + break + + # cleanup + self.running = False + if resp: + resp.close() + + self.new_session() + + if exc_info: + # call a handler first so that the exception can be logged. + self.listener.on_exception(exc_info[1]) + six.reraise(*exc_info)