Merge pull request #385 from manuelcortez/streaming

Streaming API support
This commit is contained in:
Manuel Cortez 2021-07-02 17:29:10 -05:00 committed by GitHub
commit 062289a977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 204 additions and 3 deletions

View File

@ -125,6 +125,7 @@ class Controller(object):
pub.subscribe(self.update_sent_dms, "sent-dms-updated") pub.subscribe(self.update_sent_dms, "sent-dms-updated")
pub.subscribe(self.more_dms, "more-sent-dms") pub.subscribe(self.more_dms, "more-sent-dms")
pub.subscribe(self.manage_sent_tweets, "sent-tweet") pub.subscribe(self.manage_sent_tweets, "sent-tweet")
pub.subscribe(self.manage_new_tweet, "newTweet")
pub.subscribe(self.manage_friend, "friend") pub.subscribe(self.manage_friend, "friend")
pub.subscribe(self.manage_unfollowing, "unfollowing") pub.subscribe(self.manage_unfollowing, "unfollowing")
pub.subscribe(self.manage_favourite, "favourite") pub.subscribe(self.manage_favourite, "favourite")
@ -265,11 +266,15 @@ class Controller(object):
if sessions.sessions[i].is_logged == False: continue if sessions.sessions[i].is_logged == False: continue
self.start_buffers(sessions.sessions[i]) self.start_buffers(sessions.sessions[i])
self.set_buffer_positions(sessions.sessions[i]) self.set_buffer_positions(sessions.sessions[i])
sessions.sessions[i].start_streaming()
if config.app["app-settings"]["play_ready_sound"] == True: if config.app["app-settings"]["play_ready_sound"] == True:
sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg") sessions.sessions[list(sessions.sessions.keys())[0]].sound.play("ready.ogg")
if config.app["app-settings"]["speak_ready_msg"] == True: if config.app["app-settings"]["speak_ready_msg"] == True:
output.speak(_(u"Ready")) output.speak(_(u"Ready"))
self.started = True self.started = True
self.streams_checker_function = RepeatingTimer(60, self.check_streams)
self.streams_checker_function.start()
def create_ignored_session_buffer(self, session): def create_ignored_session_buffer(self, session):
self.accounts.append(session.settings["twitter"]["user_name"]) self.accounts.append(session.settings["twitter"]["user_name"])
@ -649,6 +654,8 @@ class Controller(object):
log.debug("Saving global configuration...") log.debug("Saving global configuration...")
for item in sessions.sessions: for item in sessions.sessions:
if sessions.sessions[item].logged == False: continue if sessions.sessions[item].logged == False: continue
log.debug("Disconnecting streaming endpoint for session" + sessions.sessions[item].session_id)
sessions.sessions[item].stop_streaming()
log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,)) log.debug("Disconnecting streams for %s session" % (sessions.sessions[item].session_id,))
sessions.sessions[item].sound.cleaner.cancel() sessions.sessions[item].sound.cleaner.cancel()
log.debug("Saving database for " + sessions.sessions[item].session_id) log.debug("Saving database for " + sessions.sessions[item].session_id)
@ -658,6 +665,9 @@ class Controller(object):
pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name)) pidpath = os.path.join(os.getenv("temp"), "{}.pid".format(application.name))
if os.path.exists(pidpath): if os.path.exists(pidpath):
os.remove(pidpath) os.remove(pidpath)
if hasattr(self, "streams_checker_function"):
log.debug("Stopping stream checker...")
self.streams_checker_function.cancel()
widgetUtils.exit_application() widgetUtils.exit_application()
def follow(self, *args, **kwargs): def follow(self, *args, **kwargs):
@ -1268,8 +1278,6 @@ class Controller(object):
def manage_sent_tweets(self, data, user): def manage_sent_tweets(self, data, user):
buffer = self.search_buffer("sent_tweets", user) buffer = self.search_buffer("sent_tweets", user)
if buffer == None: return if buffer == None: return
# if "sent_tweets" not in buffer.session.settings["other_buffers"]["muted_buffers"]:
# self.notify(buffer.session, play_sound=play_sound)
data = buffer.session.check_quoted_status(data) data = buffer.session.check_quoted_status(data)
data = buffer.session.check_long_tweet(data) data = buffer.session.check_long_tweet(data)
if data == False: # Long tweet deleted from twishort. if data == False: # Long tweet deleted from twishort.
@ -1622,3 +1630,27 @@ class Controller(object):
def save_data_in_db(self): def save_data_in_db(self):
for i in sessions.sessions: for i in sessions.sessions:
sessions.sessions[i].save_persistent_data() sessions.sessions[i].save_persistent_data()
def manage_new_tweet(self, data, user, _buffers):
sound_to_play = None
for buff in _buffers:
buffer = self.search_buffer(buff, user)
if buffer == None or buffer.session.db["user_name"] != user: return
buffer.add_new_item(data)
if buff == "home_timeline": sound_to_play = "tweet_received.ogg"
elif buff == "mentions": sound_to_play = "mention_received.ogg"
elif buff == "sent_tweets": sound_to_play = "tweet_send.ogg"
elif "timeline" in buff: sound_to_play = "tweet_timeline.ogg"
else: sound_to_play = None
if sound_to_play != None and buff not in buffer.session.settings["other_buffers"]["muted_buffers"]:
self.notify(buffer.session, sound_to_play)
def check_streams(self):
if self.started == False:
return
for i in sessions.sessions:
try:
if sessions.sessions[i].is_logged == False: continue
sessions.sessions[i].check_streams()
except TweepError: # We shouldn't allow this function to die.
pass

View File

@ -17,7 +17,7 @@ from keys import keyring
from sessions import base from sessions import base
from sessions.twitter import utils, compose from sessions.twitter import utils, compose
from sessions.twitter.long_tweets import tweets, twishort from sessions.twitter.long_tweets import tweets, twishort
from . import reduce from . import reduce, streaming
from .wxUI import authorisationDialog from .wxUI import authorisationDialog
log = logging.getLogger("sessions.twitterSession") log = logging.getLogger("sessions.twitterSession")
@ -125,6 +125,8 @@ class Session(base.baseSession):
# This will be especially useful because if the user reactivates their account later, TWblue will try to retrieve such user again at startup. # This will be especially useful because if the user reactivates their account later, TWblue will try to retrieve such user again at startup.
# If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object. # If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object.
self.deleted_users = {} self.deleted_users = {}
pub.subscribe(self.handle_new_status, "newStatus")
pub.subscribe(self.handle_connected, "streamConnected")
# @_require_configuration # @_require_configuration
def login(self, verify_credentials=True): def login(self, verify_credentials=True):
@ -501,3 +503,61 @@ class Session(base.baseSession):
if hasattr(i, "retweeted_status") and (i.retweeted_status.user.id_str in self.db["users"]) == False: if hasattr(i, "retweeted_status") and (i.retweeted_status.user.id_str in self.db["users"]) == False:
users[i.retweeted_status.user.id_str] = i.retweeted_status.user users[i.retweeted_status.user.id_str] = i.retweeted_status.user
self.db["users"] = users self.db["users"] = users
def start_streaming(self):
self.stream_listener = streaming.StreamListener(twitter_api=self.twitter, user=self.db["user_name"], user_id=self.db["user_id"])
self.stream = streaming.Stream(auth = self.auth, listener=self.stream_listener, chunk_size=1025)
self.stream_thread = call_threaded(self.stream.filter, follow=self.stream_listener.users, stall_warnings=True)
def stop_streaming(self):
self.stream.running = False
log.debug("Stream stopped for accounr {}".format(self.db["user_name"]))
def handle_new_status(self, status, user):
""" Handles a new status present in the Streaming API. """
# Discard processing the status if the streaming sends a tweet for another account.
if self.db["user_name"] != user:
return
# the Streaming API sends non-extended tweets with an optional parameter "extended_tweets" which contains full_text and other data.
# so we have to make sure we check it before processing the normal status.
# As usual, we handle also quotes and retweets at first.
if hasattr(status, "retweeted_status") and hasattr(status.retweeted_status, "extended_tweet"):
status.retweeted_status._json = {**status.retweeted_status._json, **status.retweeted_status._json["extended_tweet"]}
# compose.compose_tweet requires the parent tweet to have a full_text field, so we have to add it to retweets here.
status._json["full_text"] = status._json["text"]
if hasattr(status, "quoted_status") and hasattr(status.quoted_status, "extended_tweet"):
status.quoted_status._json = {**status.quoted_status._json, **status.quoted_status._json["extended_tweet"]}
if status.truncated:
status._json = {**status._json, **status._json["extended_tweet"]}
# Sends status to database, where it will be reduced and changed according to our needs.
buffers_to_send = []
if status.user.id_str in self.stream_listener.users:
buffers_to_send.append("home_timeline")
if status.user.id == self.db["user_id"]:
buffers_to_send.append("sent_tweets")
for user in status.entities["user_mentions"]:
if user["id"] == self.db["user_id"]:
buffers_to_send.append("mentions")
users_with_timeline = [user.split("-")[0] for user in self.db.keys() if user.endswith("-timeline")]
for user in users_with_timeline:
if status.user.id_str == user:
buffers_to_send.append("{}-timeline".format(user))
for buffer in buffers_to_send[::]:
num = self.order_buffer(buffer, [status])
if num == 0:
buffers_to_send.remove(buffer)
# However, we have to do the "reduce and change" process here because the status we sent to the db is going to be a different object that the one sent to database.
reduced_status = reduce.reduce_tweet(status)
status = self.check_quoted_status(status)
status = self.check_long_tweet(status)
# Send it to the main controller object.
pub.sendMessage("newTweet", data=status, user=self.db["user_name"], _buffers=buffers_to_send)
def check_streams(self):
log.debug("Status of running stream for user {}: {}".format(self.db["user_name"], self.stream.running))
if self.stream.running == False:
self.start_streaming()
def handle_connected(self, user):
if user != self.db["user_name"]:
log.debug("Connected streaming endpoint on account {}".format(user))

View File

@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
""" Streaming support for TWBlue. """
import time
import six
import requests
import urllib3
import ssl
import tweepy
import logging
from pubsub import pub
log = logging.getLogger("sessions.twitter.streaming")
class StreamListener(tweepy.StreamListener):
def __init__(self, twitter_api, user, user_id, *args, **kwargs):
super(StreamListener, self).__init__(*args, **kwargs)
self.api = twitter_api
self.user = user
self.user_id = user_id
self.users = [str(id) for id in self.api.friends_ids()]
self.users.append(str(self.user_id))
log.debug("Started streaming object for user {}".format(self.user))
def on_connect(self):
pub.sendMessage("streamConnected", user=self.user)
def on_exception(self, ex):
log.exception("Exception received on streaming endpoint for user {}".format(self.user))
def on_status(self, status):
""" Checks data arriving as a tweet. """
if status.user.id_str in self.users:
pub.sendMessage("newStatus", status=status, user=self.user)
# print(status.text)
class Stream(tweepy.Stream):
def _run(self):
# Authenticate
url = "https://%s%s" % (self.host, self.url)
# Connect and process the stream
error_counter = 0
resp = None
exc_info = None
while self.running:
if self.retry_count is not None:
if error_counter > self.retry_count:
# quit if error count greater than retry count
break
try:
auth = self.auth.apply_auth()
resp = self.session.request('POST',
url,
data=self.body,
timeout=self.timeout,
stream=True,
auth=auth,
verify=self.verify,
proxies = self.proxies)
if resp.status_code != 200:
if self.listener.on_error(resp.status_code) is False:
break
error_counter += 1
if resp.status_code == 420:
self.retry_time = max(self.retry_420_start,
self.retry_time)
time.sleep(self.retry_time)
self.retry_time = min(self.retry_time * 2,
self.retry_time_cap)
else:
error_counter = 0
self.retry_time = self.retry_time_start
self.snooze_time = self.snooze_time_step
self.listener.on_connect()
self._read_loop(resp)
except (requests.ConnectionError, requests.Timeout, ssl.SSLError, urllib3.exceptions.ReadTimeoutError, urllib3.exceptions.ProtocolError) as exc:
# This is still necessary, as a SSLError can actually be
# thrown when using Requests
# If it's not time out treat it like any other exception
if isinstance(exc, ssl.SSLError):
if not (exc.args and 'timed out' in str(exc.args[0])):
exc_info = sys.exc_info()
break
if self.listener.on_timeout() is False:
break
if self.running is False:
break
time.sleep(self.snooze_time)
self.snooze_time = min(self.snooze_time + self.snooze_time_step,
self.snooze_time_cap)
except Exception as exc:
exc_info = sys.exc_info()
# any other exception is fatal, so kill loop
break
# cleanup
self.running = False
if resp:
resp.close()
self.new_session()
if exc_info:
# call a handler first so that the exception can be logged.
self.listener.on_exception(exc_info[1])
six.reraise(*exc_info)