diff --git a/src/controller/mainController.py b/src/controller/mainController.py index 2c3d5836..c56a9f7c 100644 --- a/src/controller/mainController.py +++ b/src/controller/mainController.py @@ -273,6 +273,9 @@ class Controller(object): if config.app["app-settings"]["speak_ready_msg"] == True: output.speak(_(u"Ready")) self.started = True + self.streams_checker_function = RepeatingTimer(60, self.check_streams) + self.streams_checker_function.start() + def create_ignored_session_buffer(self, session): self.accounts.append(session.settings["twitter"]["user_name"]) @@ -1633,4 +1636,14 @@ class Controller(object): if buffer == None or buffer.session.db["user_name"] != user: return buffer.add_new_item(data) if "home_timeline" not in buffer.session.settings["other_buffers"]["muted_buffers"]: - self.notify(buffer.session, "tweet_received.ogg") \ No newline at end of file + self.notify(buffer.session, "tweet_received.ogg") + + def check_streams(self): + if self.started == False: + return + for i in sessions.sessions: + try: + if sessions.sessions[i].is_logged == False: continue + sessions.sessions[i].check_streams() + except TweepError: # We shouldn't allow this function to die. + pass diff --git a/src/sessions/twitter/session.py b/src/sessions/twitter/session.py index f74b8837..8dbb6333 100644 --- a/src/sessions/twitter/session.py +++ b/src/sessions/twitter/session.py @@ -126,6 +126,7 @@ class Session(base.baseSession): # If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object. self.deleted_users = {} pub.subscribe(self.handle_new_status, "newStatus") + pub.subscribe(self.handle_connected, "streamConnected") # @_require_configuration def login(self, verify_credentials=True): @@ -504,9 +505,9 @@ class Session(base.baseSession): self.db["users"] = users def start_streaming(self): - self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"]) + self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"], user_id=self.db["user_id"]) self.stream = tweepy.Stream(auth = self.auth, listener=self.stream_listener) - call_threaded(self.stream.filter, follow=self.stream_listener.users) + self.stream_thread = self.stream.filter(follow=self.stream_listener.users, is_async=True) def handle_new_status(self, status, user): """ Handles a new status present in the Streaming API. """ @@ -532,4 +533,13 @@ class Session(base.baseSession): status = self.check_quoted_status(status) status = self.check_long_tweet(status) # Send it to the main controller object. - pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) \ No newline at end of file + pub.sendMessage("tweet-in-home", data=status, user=self.db["user_name"]) + + def check_streams(self): + log.debug("Status of running stream for user {}: {}".format(self.db["user_name"], self.stream.running)) + if self.stream.running == False: + self.start_streaming() + + def handle_connected(self, user): + if user != self.db["user_name"]: + log.debug("Connected streaming endpoint on account {}".format(user)) \ No newline at end of file diff --git a/src/sessions/twitter/streaming.py b/src/sessions/twitter/streaming.py index b40ccc61..1dabf7f6 100644 --- a/src/sessions/twitter/streaming.py +++ b/src/sessions/twitter/streaming.py @@ -1,14 +1,26 @@ # -*- coding: utf-8 -*- import tweepy +import logging from pubsub import pub +log = logging.getLogger("sessions.twitter.streaming") + class StreamListener(tweepy.StreamListener): - def __init__(self, twitter_api, user, *args, **kwargs): + def __init__(self, twitter_api, user, user_id, *args, **kwargs): super(StreamListener, self).__init__(*args, **kwargs) self.api = twitter_api self.user = user + self.user_id = user_id self.users = [str(id) for id in self.api.friends_ids()] + self.users.append(str(self.user_id)) + log.debug("Started streaming object for user {}".format(self.user)) + + def on_connect(self): + pub.sendMessage("streamConnected", user=self.user) + + def on_exception(self, ex): + log.exception("Exception received on streaming endpoint for user {}".format(self.user)) def on_status(self, status): """ Checks data arriving as a tweet. """