twblue/src/sessions/twitter/session.py

453 lines
19 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
""" This is the main session needed to access all Twitter Features."""
import os
import time
import logging
import webbrowser
import wx
import config
import output
import application
2015-02-01 21:13:18 -06:00
from pubsub import pub
import tweepy
from tweepy.error import TweepError
from tweepy.models import User as UserModel
from mysc.thread_utils import call_threaded
from keys import keyring
from sessions import base
from sessions.twitter import utils, compose
from sessions.twitter.long_tweets import tweets, twishort
from .wxUI import authorisationDialog
log = logging.getLogger("sessions.twitterSession")
class Session(base.baseSession):
""" A session object where we will save configuration, the twitter object and a local storage for saving the items retrieved through the Twitter API methods"""
def order_buffer(self, name, data, ignore_older=True):
""" Put new items in the local database.
name str: The name for the buffer stored in the dictionary.
data list: A list with tweets.
ignore_older bool: if set to True, items older than the first element on the list will be ignored.
2015-04-12 19:43:19 -04:00
returns the number of items that have been added in this execution"""
if name == "direct_messages":
return self.order_direct_messages(data)
num = 0
last_id = None
if (name in self.db) == False:
self.db[name] = []
if ("users" in self.db) == False:
self.db["users"] = {}
if ignore_older and len(self.db[name]) > 0:
if self.settings["general"]["reverse_timelines"] == False:
last_id = self.db[name][0].id
else:
last_id = self.db[name][-1].id
for i in data:
if ignore_older and last_id != None:
if i.id < last_id:
log.error("Ignoring an older tweet... Last id: {0}, tweet id: {1}".format(last_id, i.id))
continue
if utils.find_item(i.id, self.db[name]) == None and utils.is_allowed(i, self.settings, name) == True:
i = self.check_quoted_status(i)
2016-04-28 13:54:06 -05:00
i = self.check_long_tweet(i)
if i == False: continue
if self.settings["general"]["reverse_timelines"] == False: self.db[name].append(i)
else: self.db[name].insert(0, i)
num = num+1
if hasattr(i, "user"):
if (i.user.id in self.db["users"]) == False:
self.db["users"][i.user.id] = i.user
return num
def order_people(self, name, data):
""" Put new items on the local database. Useful for cursored buffers (followers, friends, users of a list and searches)
name str: The name for the buffer stored in the dictionary.
data list: A list with items and some information about cursors.
2015-04-12 19:43:19 -04:00
returns the number of items that have been added in this execution"""
num = 0
if (name in self.db) == False:
self.db[name] = []
for i in data:
if utils.find_item(i.id, self.db[name]) == None:
if self.settings["general"]["reverse_timelines"] == False: self.db[name].append(i)
else: self.db[name].insert(0, i)
num = num+1
return num
def order_direct_messages(self, data):
""" Add incoming and sent direct messages to their corresponding database items.
data list: A list of direct messages to add.
returns the number of incoming messages processed in this execution, and sends an event with data regarding amount of sent direct messages added."""
incoming = 0
sent = 0
if ("direct_messages" in self.db) == False:
self.db["direct_messages"] = []
for i in data:
# Twitter returns sender_id as str, which must be converted to int in order to match to our user_id object.
if int(i.message_create["sender_id"]) == self.db["user_id"]:
if "sent_direct_messages" in self.db and utils.find_item(i.id, self.db["sent_direct_messages"]) == None:
if self.settings["general"]["reverse_timelines"] == False: self.db["sent_direct_messages"].append(i)
else: self.db["sent_direct_messages"].insert(0, i)
sent = sent+1
else:
if utils.find_item(i.id, self.db["direct_messages"]) == None:
if self.settings["general"]["reverse_timelines"] == False: self.db["direct_messages"].append(i)
else: self.db["direct_messages"].insert(0, i)
incoming = incoming+1
pub.sendMessage("sent-dms-updated", total=sent, account=self.db["user_name"])
return incoming
def __init__(self, *args, **kwargs):
super(Session, self).__init__(*args, **kwargs)
# Adds here the optional cursors objects.
cursors = dict(direct_messages=-1)
self.db["cursors"] = cursors
2015-01-13 12:31:37 -06:00
self.reconnection_function_active = False
self.counter = 0
self.lists = []
# @_require_configuration
2015-03-08 00:36:41 -06:00
def login(self, verify_credentials=True):
2015-04-12 19:43:19 -04:00
""" Log into twitter using credentials from settings.
if the user account isn't authorised, it needs to call self.authorise() before login."""
if self.settings["twitter"]["user_key"] != None and self.settings["twitter"]["user_secret"] != None:
try:
log.debug("Logging in to twitter...")
self.auth = tweepy.OAuthHandler(keyring.get("api_key"), keyring.get("api_secret"))
self.auth.set_access_token(self.settings["twitter"]["user_key"], self.settings["twitter"]["user_secret"])
self.twitter = tweepy.API(self.auth)
if verify_credentials == True:
self.credentials = self.twitter.verify_credentials()
self.logged = True
log.debug("Logged.")
self.counter = 0
except IOError:
log.error("The login attempt failed.")
self.logged = False
else:
self.logged = False
raise Exceptions.RequireCredentialsSessionError
# @_require_configuration
def authorise(self):
2015-04-12 19:43:19 -04:00
""" Authorises a Twitter account. This function needs to be called for each new session, after self.get_configuration() and before self.login()"""
if self.logged == True:
raise Exceptions.AlreadyAuthorisedError("The authorisation process is not needed at this time.")
else:
self.auth = tweepy.OAuthHandler(keyring.get("api_key"), keyring.get("api_secret"))
redirect_url = self.auth.get_authorization_url()
webbrowser.open_new_tab(redirect_url)
self.authorisation_dialog = authorisationDialog()
self.authorisation_dialog.cancel.Bind(wx.EVT_BUTTON, self.authorisation_cancelled)
self.authorisation_dialog.ok.Bind(wx.EVT_BUTTON, self.authorisation_accepted)
self.authorisation_dialog.ShowModal()
def verify_authorisation(self, pincode):
self.auth.get_access_token(pincode)
self.settings["twitter"]["user_key"] = self.auth.access_token
self.settings["twitter"]["user_secret"] = self.auth.access_token_secret
self.settings.write()
del self.auth
def authorisation_cancelled(self, *args, **kwargs):
""" Destroy the authorization dialog. """
self.authorisation_dialog.Destroy()
del self.authorisation_dialog
def authorisation_accepted(self, *args, **kwargs):
""" Gets the PIN code entered by user and validate it through Twitter."""
pincode = self.authorisation_dialog.text.GetValue()
self.verify_authorisation(pincode)
self.authorisation_dialog.Destroy()
def get_more_items(self, update_function, users=False, dm=False, name=None, *args, **kwargs):
""" Get more items for twitter objects.
update_function str: function to call for getting more items. Must be member of self.twitter.
users, dm bool: If any of these is set to True, the function will treat items as users or dm (they need different handling).
name str: name of the database item to put new element in."""
2015-02-03 09:59:18 -06:00
results = []
if "cursor" in kwargs and kwargs["cursor"] == 0:
output.speak(_(u"There are no more items to retrieve in this buffer."))
return
data = getattr(self.twitter, update_function)(*args, **kwargs)
2015-02-03 09:59:18 -06:00
if users == True:
if type(data) == dict and "next_cursor" in data:
if "next_cursor" in data: # There are more objects to retrieve.
self.db[name]["cursor"] = data["next_cursor"]
else: # Set cursor to 0, wich means no more items available.
self.db[name]["cursor"] = 0
for i in data["users"]: results.append(i)
elif type(data) == list:
results.extend(data[1:])
elif dm == True:
if "next_cursor" in data: # There are more objects to retrieve.
self.db[name]["cursor"] = data["next_cursor"]
else: # Set cursor to 0, wich means no more items available.
self.db[name]["cursor"] = 0
for i in data["events"]: results.append(i)
2015-02-03 09:59:18 -06:00
else:
results.extend(data[1:])
return results
def api_call(self, call_name, action="", _sound=None, report_success=False, report_failure=True, preexec_message="", *args, **kwargs):
2015-04-12 19:43:19 -04:00
""" Make a call to the Twitter API. If there is a connectionError or another exception not related to Twitter, It will call the method again at least 25 times, waiting a while between calls. Useful for post methods.
If twitter returns an error, it will not call the method anymore.
call_name str: The method to call
2015-04-12 19:43:19 -04:00
action str: What you are doing on twitter, it will be reported to the user if report_success is set to True.
for example "following @tw_blue2" will be reported as "following @tw_blue2 succeeded".
_sound str: a sound to play if the call is executed properly.
2015-04-12 19:43:19 -04:00
report_success and report_failure bool: These are self explanatory. True or False.
preexec_message str: A message to speak to the user while the method is running, example: "trying to follow x user"."""
finished = False
tries = 0
if preexec_message:
output.speak(preexec_message, True)
while finished==False and tries < 25:
try:
val = getattr(self.twitter, call_name)(*args, **kwargs)
finished = True
except TweepError as e:
output.speak(e.reason)
val = None
2015-03-01 20:41:02 -06:00
if e.error_code != 403 and e.error_code != 404:
tries = tries+1
time.sleep(5)
elif report_failure and hasattr(e, 'reason'):
output.speak(_("%s failed. Reason: %s") % (action, e.reason))
finished = True
# except:
# tries = tries + 1
# time.sleep(5)
if report_success:
output.speak(_("%s succeeded.") % action)
if _sound != None: self.sound.play(_sound)
return val
def search(self, name, *args, **kwargs):
""" Search in twitter, passing args and kwargs as arguments to the Twython function."""
tl = self.twitter.search(*args, **kwargs)
tl.reverse()
return tl
# @_require_login
def get_favourites_timeline(self, name, *args, **kwargs):
2015-04-12 19:43:19 -04:00
""" Gets favourites for the authenticated user or a friend or follower.
name str: Name for storage in the database.
args and kwargs are passed directly to the Twython function."""
tl = self.call_paged("favorites", *args, **kwargs)
return self.order_buffer(name, tl)
def call_paged(self, update_function, *args, **kwargs):
""" Makes a call to the Twitter API methods several times. Useful for get methods.
this function is needed for retrieving more than 200 items.
update_function str: The function to call. This function must be child of self.twitter
args and kwargs are passed to update_function.
2015-04-12 19:43:19 -04:00
returns a list with all items retrieved."""
max = 0
results = []
data = getattr(self.twitter, update_function)(count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
results.extend(data)
for i in range(0, max):
if i == 0: max_id = results[-1].id
else: max_id = results[0].id
data = getattr(self.twitter, update_function)(max_id=max_id, count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
results.extend(data)
results.reverse()
return results
# @_require_login
def get_user_info(self):
""" Retrieves some information required by TWBlue for setup."""
f = self.twitter.get_settings()
sn = f["screen_name"]
self.settings["twitter"]["user_name"] = sn
self.db["user_name"] = sn
self.db["user_id"] = self.twitter.get_user(screen_name=sn).id
try:
self.db["utc_offset"] = f["time_zone"]["utc_offset"]
except KeyError:
self.db["utc_offset"] = -time.timezone
# Get twitter's supported languages and save them in a global variable
#so we won't call to this method once per session.
if len(application.supported_languages) == 0:
2021-01-04 12:23:04 -06:00
application.supported_languages = self.twitter.supported_languages()
self.get_lists()
self.get_muted_users()
self.settings.write()
# @_require_login
def get_lists(self):
2015-04-12 19:43:19 -04:00
""" Gets the lists that the user is subscribed to and stores them in the database. Returns None."""
self.db["lists"] = self.twitter.lists_all(reverse=True)
# @_require_login
def get_muted_users(self):
""" Gets muted users (oh really?)."""
self.db["muted_users"] = self.twitter.mutes_ids()
2015-05-26 20:23:02 -05:00
# @_require_login
def get_stream(self, name, function, *args, **kwargs):
""" Retrieves the items for a regular stream.
2015-04-12 19:43:19 -04:00
name str: Name to save items to the database.
function str: A function to get the items."""
last_id = -1
if name in self.db:
try:
if self.db[name][0]["id"] > self.db[name][-1]["id"]:
last_id = self.db[name][0]["id"]
else:
last_id = self.db[name][-1]["id"]
except IndexError:
pass
tl = self.call_paged(function, sinze_id=last_id, *args, **kwargs)
self.order_buffer(name, tl)
2015-12-26 23:28:46 -06:00
def get_cursored_stream(self, name, function, items="users", get_previous=False, *args, **kwargs):
2015-04-12 19:43:19 -04:00
""" Gets items for API calls that require using cursors to paginate the results.
name str: Name to save it in the database.
function str: Function that provides the items.
items: When the function returns the list with results, items will tell how the order function should be look. for example get_followers_list returns a list and users are under list["users"], here the items should point to "users".
get_previous bool: wether this function will be used to get previous items in a buffer or load the buffer from scratch.
returns number of items retrieved."""
items_ = []
try:
if "cursor" in self.db[name] and get_previous:
2015-02-03 09:59:18 -06:00
cursor = self.db[name]["cursor"]
else:
2015-02-03 09:59:18 -06:00
cursor = -1
except KeyError:
2015-02-03 09:59:18 -06:00
cursor = -1
if cursor != -1:
tl = getattr(self.twitter, function)(cursor=cursor, count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
else:
tl = getattr(self.twitter, function)(count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
tl[items].reverse()
num = self.order_cursored_buffer(name, tl[items])
# Recently, Twitter's new endpoints have cursor if there are more results.
if "next_cursor" in tl:
self.db[name]["cursor"] = tl["next_cursor"]
else:
self.db[name]["cursor"] = 0
return num
2015-01-13 12:31:37 -06:00
def check_connection(self):
""" Restart the Twitter object every 5 executions. It is useful for dealing with requests timeout and other oddities."""
log.debug("Executing check connection...")
self.counter += 1
if self.counter >= 4:
log.debug("Restarting connection after 5 minutes.")
del self.twitter
self.logged = False
2015-03-08 00:36:41 -06:00
self.login(False)
self.counter = 0
2015-09-29 08:38:05 -05:00
def check_quoted_status(self, tweet):
""" Helper for get_quoted_tweet. Get a quoted status inside a tweet and create a special tweet with all info available.
tweet dict: A tweet dictionary.
Returns a quoted tweet or the original tweet if is not a quote"""
2015-09-29 08:38:05 -05:00
status = tweets.is_long(tweet)
if status != False and config.app["app-settings"]["handle_longtweets"]:
quoted_tweet = self.get_quoted_tweet(tweet)
return quoted_tweet
2015-09-29 08:38:05 -05:00
return tweet
def get_quoted_tweet(self, tweet):
""" Process a tweet and extract all information related to the quote. """
2016-04-28 13:54:06 -05:00
quoted_tweet = tweet
if hasattr(tweet, "full_text"):
value = "full_text"
else:
value = "text"
setattr(quoted_tweet, value, utils.expand_urls(getattr(quoted_tweet, value), quoted_tweet.entities))
if quoted_tweet.is_quote_status == True and hasattr(quoted_tweet, "quoted_status"):
original_tweet = quoted_tweet.quoted_status
elif hasattr(quoted_tweet, "retweeted_status") and quoted_tweet.retweeted_status.is_quote_status == True and hasattr(quoted_tweet.retweeted_status, "quoted_status"):
original_tweet = quoted_tweet.retweeted_status.quoted_status
else:
return quoted_tweet
original_tweet = self.check_long_tweet(original_tweet)
if hasattr(original_tweet, "full_text"):
value = "full_text"
elif hasattr(original_tweet, "message"):
value = "message"
else:
value = "text"
setattr(original_tweet, value, utils.expand_urls(getattr(original_tweet, value), original_tweet.entities))
2015-09-29 08:38:05 -05:00
return compose.compose_quoted_tweet(quoted_tweet, original_tweet)
2016-04-28 13:54:06 -05:00
def check_long_tweet(self, tweet):
""" Process a tweet and add extra info if it's a long tweet made with Twyshort.
tweet dict: a tweet object.
returns a tweet with a new argument message, or original tweet if it's not a long tweet."""
2016-04-28 13:54:06 -05:00
long = twishort.is_long(tweet)
if long != False and config.app["app-settings"]["handle_longtweets"]:
message = twishort.get_full_text(long)
if hasattr(tweet, "quoted_status"):
tweet.quoted_status.message = message
if tweet.quoted_status.message == False: return False
tweet.quoted_status.twishort = True
2021-01-04 12:23:04 -06:00
for i in tweet.quoted_status.entities["user_mentions"]:
if "@%s" % (i["screen_name"]) not in tweet.quoted_status.message and i["screen_name"] != tweet.user.screen_name:
2021-03-09 11:35:52 -06:00
if hasattr(tweet.quoted_status, "retweeted_status") and tweet.retweeted_status.user.screen_name == i["screen_name"]:
continue
2021-01-04 12:23:04 -06:00
tweet.quoted_status.message = u"@%s %s" % (i["screen_name"], tweet.message)
else:
tweet.message = message
if tweet.message == False: return False
tweet.twishort = True
2021-01-04 12:23:04 -06:00
for i in tweet.entities["user_mentions"]:
if "@%s" % (i["screen_name"]) not in tweet.message and i["screen_name"] != tweet.user.screen_name:
if hasattr(tweet, "retweeted_status") and tweet.retweeted_status.user.screen_name == i["screen_name"]:
continue
return tweet
def get_user(self, id):
""" Returns an user object associated with an ID.
id str: User identifier, provided by Twitter.
returns a tweepy user object."""
if ("users" in self.db) == False or (id in self.db["users"]) == False:
try:
user = self.twitter.get_user(id=id)
except TweepError as err:
user = UserModel(None)
user.screen_name = "deleted_user"
user.id = id
user.name = _("Deleted account")
user.id_str = id
self.db["users"][user.id_str] = user
return user
else:
return self.db["users"][id]
def get_user_by_screen_name(self, screen_name):
""" Returns an user identifier associated with a screen_name.
screen_name str: User name, such as tw_blue2, provided by Twitter.
returns an user ID."""
if ("users" in self.db) == False:
user = utils.if_user_exists(self.twitter, screen_name)
self.db["users"][user["id_str"]] = user
return user["id_str"]
else:
2019-06-06 11:52:23 -05:00
for i in list(self.db["users"].keys()):
if self.db["users"][i].screen_name == screen_name:
return self.db["users"][i].id_str
user = utils.if_user_exists(self.twitter, screen_name)
self.db["users"][user.id_str] = user
return user.id_str
def save_users(self, user_ids):
""" Adds all new users to the users database. """
if len(user_ids) == 0:
return
log.debug("Received %d user IDS to be added in the database." % (len(user_ids)))
users_to_retrieve = [user_id for user_id in user_ids if user_id not in self.db["users"]]
# Remove duplicates
users_to_retrieve = list(dict.fromkeys(users_to_retrieve))
if len(users_to_retrieve) == 0:
return
log.debug("TWBlue will get %d new users from Twitter." % (len(users_to_retrieve)))
users = self.twitter.lookup_users(user_ids=users_to_retrieve, tweet_mode="extended")
for user in users:
self.db["users"][user.id_str] = user
log.debug("Added %d new users" % (len(users)))