From 168c7e7a5d6ee02506da1564c76b62cb7cb3cf11 Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Mon, 28 Jun 2021 17:03:26 -0500 Subject: [PATCH 1/8] Initial test for supporting a subset of the Streaming API --- src/controller/mainController.py | 11 +++++++++-- src/sessions/twitter/session.py | 22 +++++++++++++++++++++- src/sessions/twitter/streaming.py | 18 ++++++++++++++++++ 3 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 src/sessions/twitter/streaming.py diff --git a/src/controller/mainController.py b/src/controller/mainController.py index b405b3b2..2c3d5836 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -126,6 +126,7 @@ class Controller(object): pub.subscribe(self.update_sent_dms, "sent-dms-updated") pub.subscribe(self.more_dms, "more-sent-dms") pub.subscribe(self.manage_sent_tweets, "sent-tweet") + pub.subscribe(self.manage_tweet_in_home, "tweet-in-home") pub.subscribe(self.manage_friend, "friend") pub.subscribe(self.manage_unfollowing, "unfollowing") pub.subscribe(self.manage_favourite, "favourite") @@ -266,6 +267,7 @@ class Controller(object): if sessions.sessions[i].is_logged == False: continue self.start_buffers(sessions.sessions[i]) self.set_buffer_positions(sessions.sessions[i]) + sessions.sessions[i].start_streaming() if config.app["app-settings"]["play_ready_sound"] == True: sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg") if config.app["app-settings"]["speak_ready_msg"] == True: @@ -1269,8 +1271,6 @@ class Controller(object): def manage_sent_tweets(self, data, user): buffer = self.search_buffer("sent_tweets", user) if buffer == None: return -# if "sent_tweets" not in buffer.session.settings["other_buffers"]["muted_buffers"]: -# self.notify(buffer.session, play_sound=play_sound) data = buffer.session.check_quoted_status(data) data = buffer.session.check_long_tweet(data) if data == False: # Long tweet deleted from twishort. @@ -1627,3 +1627,10 @@ class Controller(object): def save_data_in_db(self): for i in sessions.sessions: sessions.sessions[i].save_persistent_data() + + def manage_tweet_in_home(self, data, user): + buffer = self.search_buffer("home_timeline", user) + if buffer == None or buffer.session.db["user_name"] != user: return + buffer.add_new_item(data) + if "home_timeline" not in buffer.session.settings["other_buffers"]["muted_buffers"]: + self.notify(buffer.session, "tweet_received.ogg") \ No newline at end of file diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index e33d1019..ce02c888 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -17,7 +17,7 @@ from keys import keyring from sessions import base from sessions.twitter import utils, compose from sessions.twitter.long_tweets import tweets, twishort -from . import reduce +from . import reduce, streaming from .wxUI import authorisationDialog log = logging.getLogger("sessions.twitterSession") @@ -125,6 +125,7 @@ class Session(base.baseSession): # This will be especially useful because if the user reactivates their account later, TWblue will try to retrieve such user again at startup. # If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object. self.deleted_users = {} + pub.subscribe(self.handle_new_status, "newStatus") # @_require_configuration def login(self, verify_credentials=True): @@ -501,3 +502,22 @@ class Session(base.baseSession): if hasattr(i, "retweeted_status") and (i.retweeted_status.user.id_str in self.db["users"]) == False: users[i.retweeted_status.user.id_str] = i.retweeted_status.user self.db["users"] = users + + def start_streaming(self): + self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"]) + self.stream = tweepy.Stream(auth = self.auth, listener=self.stream_listener) + call_threaded(self.stream.filter, follow=self.stream_listener.users) + + def handle_new_status(self, status, user): + if self.db["user_name"] != user: + return + if hasattr(status, "retweeted_status") and status.retweeted_status.truncated: + status.retweeted_status._json["full_text"] = status.retweeted_status.extended_tweet["full_text"] + if hasattr(status, "quoted_status") and status.quoted_status.truncated: + status.quoted_status._json["full_text"] = status.quoted_status.extended_tweet["full_text"] + if status.truncated: + status._json["full_text"] = status.extended_tweet["full_text"] + num = self.order_buffer("home_timeline", [status]) + if num == 1: + status = reduce.reduce_tweet(status) + pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) \ No newline at end of file diff --git a/src/sessions/twitter/streaming.py b/src/sessions/twitter/streaming.py new file mode 100644 index 00000000..b40ccc61 --- /dev/null +++ b/src/sessions/twitter/streaming.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +import tweepy +from pubsub import pub + +class StreamListener(tweepy.StreamListener): + + def __init__(self, twitter_api, user, *args, **kwargs): + super(StreamListener, self).__init__(*args, **kwargs) + self.api = twitter_api + self.user = user + self.users = [str(id) for id in self.api.friends_ids()] + + def on_status(self, status): + """ Checks data arriving as a tweet. """ + if status.user.id_str in self.users: + pub.sendMessage("newStatus", status=status, user=self.user) +# print(status.text) + From bb5ead80dea5ab3af0871955a073a401a0aaf35d Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Tue, 29 Jun 2021 05:05:20 -0500 Subject: [PATCH 2/8] Parse correctly incoming tweets from Streaming API --- src/sessions/twitter/session.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index ce02c888..f74b8837 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -509,15 +509,27 @@ class Session(base.baseSession): call_threaded(self.stream.filter, follow=self.stream_listener.users) def handle_new_status(self, status, user): + """ Handles a new status present in the Streaming API. """ + # Discard processing the status if the streaming sends a tweet for another account. if self.db["user_name"] != user: return - if hasattr(status, "retweeted_status") and status.retweeted_status.truncated: - status.retweeted_status._json["full_text"] = status.retweeted_status.extended_tweet["full_text"] - if hasattr(status, "quoted_status") and status.quoted_status.truncated: - status.quoted_status._json["full_text"] = status.quoted_status.extended_tweet["full_text"] + # the Streaming API sends non-extended tweets with an optional parameter "extended_tweets" which contains full_text and other data. + # so we have to make sure we check it before processing the normal status. + # As usual, we handle also quotes and retweets at first. + if hasattr(status, "retweeted_status") and hasattr(status.retweeted_status, "extended_tweet"): + status.retweeted_status._json = {**status.retweeted_status._json, **status.retweeted_status._json["extended_tweet"]} + # compose.compose_tweet requires the parent tweet to have a full_text field, so we have to add it to retweets here. + status._json["full_text"] = status._json["text"] + if hasattr(status, "quoted_status") and hasattr(status.quoted_status, "extended_tweet"): + status.quoted_status._json = {**status.quoted_status._json, **status.quoted_status._json["extended_tweet"]} if status.truncated: - status._json["full_text"] = status.extended_tweet["full_text"] + status._json = {**status._json, **status._json["extended_tweet"]} + # Sends status to database, where it will be reduced and changed according to our needs. num = self.order_buffer("home_timeline", [status]) if num == 1: + # However, we have to do the "reduce and change" process here because the status we sent to the db is going to be a different object that the one sent to database. status = reduce.reduce_tweet(status) + status = self.check_quoted_status(status) + status = self.check_long_tweet(status) + # Send it to the main controller object. pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) \ No newline at end of file From 8fd3041efd2baa5ccac582c85832c7244b3befc3 Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Tue, 29 Jun 2021 17:16:53 -0500 Subject: [PATCH 3/8] Added some reconnection code and logging --- src/controller/mainController.py | 15 ++++++++++++++- src/sessions/twitter/session.py | 16 +++++++++++++--- src/sessions/twitter/streaming.py | 14 +++++++++++++- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/controller/mainController.py b/src/controller/mainController.py index 2c3d5836..c56a9f7c 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -273,6 +273,9 @@ class Controller(object): if config.app["app-settings"]["speak_ready_msg"] == True: output.speak(_(u"Ready")) self.started = True + self.streams_checker_function = RepeatingTimer(60, self.check_streams) + self.streams_checker_function.start() + def create_ignored_session_buffer(self, session): self.accounts.append(session.settings["twitter"]["user_name"]) @@ -1633,4 +1636,14 @@ class Controller(object): if buffer == None or buffer.session.db["user_name"] != user: return buffer.add_new_item(data) if "home_timeline" not in buffer.session.settings["other_buffers"]["muted_buffers"]: - self.notify(buffer.session, "tweet_received.ogg") \ No newline at end of file + self.notify(buffer.session, "tweet_received.ogg") + + def check_streams(self): + if self.started == False: + return + for i in sessions.sessions: + try: + if sessions.sessions[i].is_logged == False: continue + sessions.sessions[i].check_streams() + except TweepError: # We shouldn't allow this function to die. + pass diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index f74b8837..8dbb6333 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -126,6 +126,7 @@ class Session(base.baseSession): # If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object. self.deleted_users = {} pub.subscribe(self.handle_new_status, "newStatus") + pub.subscribe(self.handle_connected, "streamConnected") # @_require_configuration def login(self, verify_credentials=True): @@ -504,9 +505,9 @@ class Session(base.baseSession): self.db["users"] = users def start_streaming(self): - self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"]) + self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"], user_id=self.db["user_id"]) self.stream = tweepy.Stream(auth = self.auth, listener=self.stream_listener) - call_threaded(self.stream.filter, follow=self.stream_listener.users) + self.stream_thread = self.stream.filter(follow=self.stream_listener.users, is_async=True) def handle_new_status(self, status, user): """ Handles a new status present in the Streaming API. """ @@ -532,4 +533,13 @@ class Session(base.baseSession): status = self.check_quoted_status(status) status = self.check_long_tweet(status) # Send it to the main controller object. - pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) \ No newline at end of file + pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) + + def check_streams(self): + log.debug("Status of running stream for user {}: {}".format(self.db["user_name"], self.stream.running)) + if self.stream.running == False: + self.start_streaming() + + def handle_connected(self, user): + if user != self.db["user_name"]: + log.debug("Connected streaming endpoint on account {}".format(user)) \ No newline at end of file diff --git a/src/sessions/twitter/streaming.py b/src/sessions/twitter/streaming.py index b40ccc61..1dabf7f6 100644 --- a/src/sessions/twitter/streaming.py +++ b/src/sessions/twitter/streaming.py @@ -1,14 +1,26 @@ # -*- coding: utf-8 -*- import tweepy +import logging from pubsub import pub +log = logging.getLogger("sessions.twitter.streaming") + class StreamListener(tweepy.StreamListener): - def __init__(self, twitter_api, user, *args, **kwargs): + def __init__(self, twitter_api, user, user_id, *args, **kwargs): super(StreamListener, self).__init__(*args, **kwargs) self.api = twitter_api self.user = user + self.user_id = user_id self.users = [str(id) for id in self.api.friends_ids()] + self.users.append(str(self.user_id)) + log.debug("Started streaming object for user {}".format(self.user)) + + def on_connect(self): + pub.sendMessage("streamConnected", user=self.user) + + def on_exception(self, ex): + log.exception("Exception received on streaming endpoint for user {}".format(self.user)) def on_status(self, status): """ Checks data arriving as a tweet. """ From ba908421859707034884900045d444a93b43a75f Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Tue, 29 Jun 2021 17:55:36 -0500 Subject: [PATCH 4/8] Initial work to put tweets in mentions, sent and timelines --- src/controller/mainController.py | 21 ++++++++++++++------- src/sessions/twitter/session.py | 30 ++++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/controller/mainController.py b/src/controller/mainController.py index c56a9f7c..9e75b459 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -126,7 +126,7 @@ class Controller(object): pub.subscribe(self.update_sent_dms, "sent-dms-updated") pub.subscribe(self.more_dms, "more-sent-dms") pub.subscribe(self.manage_sent_tweets, "sent-tweet") - pub.subscribe(self.manage_tweet_in_home, "tweet-in-home") + pub.subscribe(self.manage_new_tweet, "newTweet") pub.subscribe(self.manage_friend, "friend") pub.subscribe(self.manage_unfollowing, "unfollowing") pub.subscribe(self.manage_favourite, "favourite") @@ -1631,12 +1631,19 @@ class Controller(object): for i in sessions.sessions: sessions.sessions[i].save_persistent_data() - def manage_tweet_in_home(self, data, user): - buffer = self.search_buffer("home_timeline", user) - if buffer == None or buffer.session.db["user_name"] != user: return - buffer.add_new_item(data) - if "home_timeline" not in buffer.session.settings["other_buffers"]["muted_buffers"]: - self.notify(buffer.session, "tweet_received.ogg") + def manage_new_tweet(self, data, user, _buffers): + sound_to_play = None + for buff in _buffers: + buffer = self.search_buffer(buff, user) + if buffer == None or buffer.session.db["user_name"] != user: return + buffer.add_new_item(data) + if buff == "home_timeline": sound_to_play = "tweet_received.ogg" + elif buff == "mentions_timeline": sound_to_play = "mention_received.ogg" + elif buff == "sent_tweets": sound_to_play = "tweet_send.ogg" + elif "timeline" in buff: sound_to_play = "tweet_timeline.ogg" + else: sound_to_play = None + if sound_to_play != None and buff not in buffer.session.settings["other_buffers"]["muted_buffers"]: + self.notify(buffer.session, sound_to_play) def check_streams(self): if self.started == False: diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index 8dbb6333..0bee2fd7 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -526,14 +526,28 @@ class Session(base.baseSession): if status.truncated: status._json = {**status._json, **status._json["extended_tweet"]} # Sends status to database, where it will be reduced and changed according to our needs. - num = self.order_buffer("home_timeline", [status]) - if num == 1: - # However, we have to do the "reduce and change" process here because the status we sent to the db is going to be a different object that the one sent to database. - status = reduce.reduce_tweet(status) - status = self.check_quoted_status(status) - status = self.check_long_tweet(status) - # Send it to the main controller object. - pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) + buffers_to_send = [] + if status.user.id_str in self.stream_listener.users: + buffers_to_send.append("home_timeline") + if status.user.id == self.db["user_id"]: + buffers_to_send.append("sent_tweets") + for user in status.entities["user_mentions"]: + if user["id"] == self.db["user_id"]: + buffers_to_send.append("mentions_timeline") + users_with_timeline = [user.split("-")[0] for user in self.db.keys() if user.endswith("-timeline")] + for user in users_with_timeline: + if status.user.id_str == user: + buffers_to_send.append("{}-timeline".format(user)) + for buffer in buffers_to_send[::]: + num = self.order_buffer(buffer, [status]) + if num == 0: + buffers_to_send.remove(buffer) + # However, we have to do the "reduce and change" process here because the status we sent to the db is going to be a different object that the one sent to database. + reduced_status = reduce.reduce_tweet(status) + status = self.check_quoted_status(status) + status = self.check_long_tweet(status) + # Send it to the main controller object. + pub.sendMessage("newTweet", data=status, user=self.db["user_name"], _buffers=buffers_to_send) def check_streams(self): log.debug("Status of running stream for user {}: {}".format(self.db["user_name"], self.stream.running)) From 55b1c7bdaeb20a177cbb3fbd218c7263c7fe3dd7 Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Fri, 2 Jul 2021 09:50:22 -0500 Subject: [PATCH 5/8] Integrates Tweepy's 68e19cc for preventing Urllib3 ProtocolError --- src/sessions/twitter/streaming.py | 79 +++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/src/sessions/twitter/streaming.py b/src/sessions/twitter/streaming.py index 1dabf7f6..613edc01 100644 --- a/src/sessions/twitter/streaming.py +++ b/src/sessions/twitter/streaming.py @@ -1,4 +1,10 @@ # -*- coding: utf-8 -*- +""" Streaming support for TWBlue. """ +import time +import six +import requests +import urllib3 +import ssl import tweepy import logging from pubsub import pub @@ -28,3 +34,76 @@ class StreamListener(tweepy.StreamListener): pub.sendMessage("newStatus", status=status, user=self.user) # print(status.text) + +class Stream(tweepy.Stream): + + def _run(self): + # Authenticate + url = "https://%s%s" % (self.host, self.url) + + # Connect and process the stream + error_counter = 0 + resp = None + exc_info = None + while self.running: + if self.retry_count is not None: + if error_counter > self.retry_count: + # quit if error count greater than retry count + break + try: + auth = self.auth.apply_auth() + resp = self.session.request('POST', + url, + data=self.body, + timeout=self.timeout, + stream=True, + auth=auth, + verify=self.verify, + proxies = self.proxies) + if resp.status_code != 200: + if self.listener.on_error(resp.status_code) is False: + break + error_counter += 1 + if resp.status_code == 420: + self.retry_time = max(self.retry_420_start, + self.retry_time) + time.sleep(self.retry_time) + self.retry_time = min(self.retry_time * 2, + self.retry_time_cap) + else: + error_counter = 0 + self.retry_time = self.retry_time_start + self.snooze_time = self.snooze_time_step + self.listener.on_connect() + self._read_loop(resp) + except (requests.ConnectionError, requests.Timeout, ssl.SSLError, urllib3.exceptions.ReadTimeoutError, urllib3.exceptions.ProtocolError) as exc: + # This is still necessary, as a SSLError can actually be + # thrown when using Requests + # If it's not time out treat it like any other exception + if isinstance(exc, ssl.SSLError): + if not (exc.args and 'timed out' in str(exc.args[0])): + exc_info = sys.exc_info() + break + if self.listener.on_timeout() is False: + break + if self.running is False: + break + time.sleep(self.snooze_time) + self.snooze_time = min(self.snooze_time + self.snooze_time_step, + self.snooze_time_cap) + except Exception as exc: + exc_info = sys.exc_info() + # any other exception is fatal, so kill loop + break + + # cleanup + self.running = False + if resp: + resp.close() + + self.new_session() + + if exc_info: + # call a handler first so that the exception can be logged. + self.listener.on_exception(exc_info[1]) + six.reraise(*exc_info) From 5f11467f2749fead5de6700401674812a62b05f3 Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Fri, 2 Jul 2021 09:52:21 -0500 Subject: [PATCH 6/8] Switched threads to our own facilities and attempts to improve thread management for streaming endpoints --- src/controller/mainController.py | 2 ++ src/sessions/twitter/session.py | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/controller/mainController.py b/src/controller/mainController.py index 9e75b459..d6021a78 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -659,6 +659,8 @@ class Controller(object): sessions.sessions[item].sound.cleaner.cancel() log.debug("Saving database for " + sessions.sessions[item].session_id) sessions.sessions[item].save_persistent_data() + log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) + sessions.sessions[item].stop_streaming() if system == "Windows": self.systrayIcon.RemoveIcon() pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index 0bee2fd7..e14916f1 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -506,8 +506,13 @@ class Session(base.baseSession): def start_streaming(self): self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"], user_id=self.db["user_id"]) - self.stream = tweepy.Stream(auth = self.auth, listener=self.stream_listener) - self.stream_thread = self.stream.filter(follow=self.stream_listener.users, is_async=True) + self.stream = streaming.Stream(auth = self.auth, listener=self.stream_listener, chunk_size=1025) + self.stream_thread = call_threaded(self.stream.filter, follow=self.stream_listener.users, stall_warnings=True) + + def stop_streaming(self): + if hasattr(self, "stream_thread"): + self.stream_thread.join() + log.debug("Stopping Streaming Endpoint...") def handle_new_status(self, status, user): """ Handles a new status present in the Streaming API. """ From 9053fcd5dec14e808acdef1ca133d48abf918408 Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Fri, 2 Jul 2021 10:11:50 -0500 Subject: [PATCH 7/8] Send Tweets to mentions properly --- src/controller/mainController.py | 2 +- src/sessions/twitter/session.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/controller/mainController.py b/src/controller/mainController.py index d6021a78..0dc5e29d 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -1640,7 +1640,7 @@ class Controller(object): if buffer == None or buffer.session.db["user_name"] != user: return buffer.add_new_item(data) if buff == "home_timeline": sound_to_play = "tweet_received.ogg" - elif buff == "mentions_timeline": sound_to_play = "mention_received.ogg" + elif buff == "mentions": sound_to_play = "mention_received.ogg" elif buff == "sent_tweets": sound_to_play = "tweet_send.ogg" elif "timeline" in buff: sound_to_play = "tweet_timeline.ogg" else: sound_to_play = None diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index e14916f1..6e01a9e8 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -538,7 +538,7 @@ class Session(base.baseSession): buffers_to_send.append("sent_tweets") for user in status.entities["user_mentions"]: if user["id"] == self.db["user_id"]: - buffers_to_send.append("mentions_timeline") + buffers_to_send.append("mentions") users_with_timeline = [user.split("-")[0] for user in self.db.keys() if user.endswith("-timeline")] for user in users_with_timeline: if status.user.id_str == user: From 77eadb42bbf1d2f99b3c8f825d564b2f363351e3 Mon Sep 17 00:00:00 2001 From: Manuel Cortez Date: Fri, 2 Jul 2021 10:35:20 -0500 Subject: [PATCH 8/8] Make sure to disconnect the streams as tweepy implements it --- src/controller/mainController.py | 7 +++++-- src/sessions/twitter/session.py | 5 ++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/controller/mainController.py b/src/controller/mainController.py index 0dc5e29d..340f6071 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -655,17 +655,20 @@ class Controller(object): log.debug("Saving global configuration...") for item in sessions.sessions: if sessions.sessions[item].logged == False: continue + log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) + sessions.sessions[item].stop_streaming() log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,)) sessions.sessions[item].sound.cleaner.cancel() log.debug("Saving database for " + sessions.sessions[item].session_id) sessions.sessions[item].save_persistent_data() - log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id) - sessions.sessions[item].stop_streaming() if system == "Windows": self.systrayIcon.RemoveIcon() pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) if os.path.exists(pidpath): os.remove(pidpath) + if hasattr(self, "streams_checker_function"): + log.debug("Stopping stream checker...") + self.streams_checker_function.cancel() widgetUtils.exit_application() def follow(self, *args, **kwargs): diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index 6e01a9e8..cdfd9684 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -510,9 +510,8 @@ class Session(base.baseSession): self.stream_thread = call_threaded(self.stream.filter, follow=self.stream_listener.users, stall_warnings=True) def stop_streaming(self): - if hasattr(self, "stream_thread"): - self.stream_thread.join() - log.debug("Stopping Streaming Endpoint...") + self.stream.running = False + log.debug("Stream stopped for accounr {}".format(self.db["user_name"])) def handle_new_status(self, status, user): """ Handles a new status present in the Streaming API. """