Remove most of Twitter code as Twitter's API access has been removed

This commit is contained in:
2023-04-03 13:35:05 -06:00
parent 74fe437684
commit 8acebc290b
95 changed files with 74 additions and 7643 deletions

View File

@@ -195,8 +195,6 @@ class Session(base.baseSession):
time.sleep(5)
if tries == 4 and finished == False:
raise e
else:
raise e
if report_success:
output.speak(_("%s succeeded.") % action)
if _sound != None:

View File

@@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View File

@@ -1,143 +0,0 @@
# -*- coding: utf-8 -*-
from . import utils
import re
import time
import output
import languageHandler
import arrow
import logging
import config
from .long_tweets import twishort, tweets
from .utils import StripChars
log = logging.getLogger("compose")
chars = "abcdefghijklmnopqrstuvwxyz"
def compose_tweet(tweet, db, relative_times, show_screen_names=False, session=None):
""" It receives a tweet and returns a list with the user, text for the tweet or message, date and the client where user is."""
original_date = arrow.get(tweet.created_at, locale="en")
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.curLang[:2])
if hasattr(tweet, "message"):
value = "message"
elif hasattr(tweet, "full_text"):
value = "full_text"
else:
value = "text"
if hasattr(tweet, "retweeted_status") and value != "message":
text = utils.clean_mentions(StripChars(getattr(tweet.retweeted_status, value)))
else:
text = utils.clean_mentions(StripChars(getattr(tweet, value)))
if show_screen_names:
user = session.get_user(tweet.user).screen_name
else:
user = session.get_user(tweet.user).name
source = re.sub(r"(?s)<.*?>", "", tweet.source)
if hasattr(tweet, "retweeted_status"):
if hasattr(tweet, "message") == False and hasattr(tweet.retweeted_status, "is_quote_status") == False:
text = "RT @%s: %s" % (session.get_user(tweet.retweeted_status.user).screen_name, text)
elif hasattr(tweet.retweeted_status, "is_quote_status"):
text = "%s" % (text)
else:
text = "RT @%s: %s" % (session.get_user(tweet.retweeted_status.user).screen_name, text)
if not hasattr(tweet, "message"):
if hasattr(tweet, "retweeted_status"):
if hasattr(tweet.retweeted_status, "entities"):
text = utils.expand_urls(text, tweet.retweeted_status.entities)
else:
if hasattr(tweet, "entities"):
text = utils.expand_urls(text, tweet.entities)
if config.app['app-settings']['handle_longtweets']: pass
return [user+", ", text, ts+", ", source]
def compose_direct_message(item, db, relative_times, show_screen_names=False, session=None):
# Let's remove the last 3 digits in the timestamp string.
# Twitter sends their "epoch" timestamp with 3 digits for milliseconds and arrow doesn't like it.
original_date = arrow.get(int(item.created_timestamp))
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.curLang[:2])
text = StripChars(item.message_create["message_data"]["text"])
source = "DM"
sender = session.get_user(item.message_create["sender_id"])
if db["user_name"] == sender.screen_name:
if show_screen_names:
user = _(u"Dm to %s ") % (session.get_user(item.message_create["target"]["recipient_id"]).screen_name)
else:
user = _(u"Dm to %s ") % (session.get_user(item.message_create["target"]["recipient_id"]).name)
else:
if show_screen_names:
user = sender.screen_name
else:
user = sender.name
if text[-1] in chars: text=text+"."
text = utils.expand_urls(text, item.message_create["message_data"]["entities"])
return [user+", ", text, ts+", ", source]
def compose_quoted_tweet(quoted_tweet, original_tweet, show_screen_names=False, session=None):
""" It receives a tweet and returns a list with the user, text for the tweet or message, date and the client where user is."""
if hasattr(quoted_tweet, "retweeted_status"):
if hasattr(quoted_tweet.retweeted_status, "full_text"):
value = "full_text"
else:
value = "text"
text = StripChars(getattr(quoted_tweet.retweeted_status, value))
else:
if hasattr(quoted_tweet, "full_text"):
value = "full_text"
else:
value = "text"
text = utils.clean_mentions(StripChars(getattr(quoted_tweet, value)))
if show_screen_names:
quoting_user = session.get_user(quoted_tweet.user).screen_name
else:
quoting_user = session.get_user(quoted_tweet.user).name
source = quoted_tweet.source
if hasattr(quoted_tweet, "retweeted_status"):
text = "rt @%s: %s" % (session.get_user(quoted_tweet.retweeted_status.user).screen_name, text)
if text[-1] in chars: text=text+"."
original_user = session.get_user(original_tweet.user).screen_name
if hasattr(original_tweet, "message"):
original_text = original_tweet.message
elif hasattr(original_tweet, "full_text"):
original_text = utils.clean_mentions(StripChars(original_tweet.full_text))
else:
original_text = utils.clean_mentions(StripChars(original_tweet.text))
quoted_tweet.message = _(u"{0}. Quoted tweet from @{1}: {2}").format( text, original_user, original_text)
quoted_tweet = tweets.clear_url(quoted_tweet)
if hasattr(original_tweet, "entities") and original_tweet.entities.get("urls"):
if hasattr(quoted_tweet, "entities") == False:
quoted_tweet.entities = {}
if quoted_tweet.entities.get("urls") == None:
quoted_tweet.entities["urls"] = []
quoted_tweet.entities["urls"].extend(original_tweet.entities["urls"])
return quoted_tweet
def compose_followers_list(tweet, db, relative_times=True, show_screen_names=False, session=None):
original_date = arrow.get(tweet.created_at, locale="en")
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.curLang[:2])
if hasattr(tweet, "status"):
original_date2 = arrow.get(tweet.status.created_at, locale="en")
if relative_times:
ts2 = original_date2.humanize(locale=languageHandler.curLang[:2])
else:
ts2 = original_date2.shift(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.curLang[:2])
else:
ts2 = _("Unavailable")
return [_(u"%s (@%s). %s followers, %s friends, %s tweets. Last tweeted %s. Joined Twitter %s") % (tweet.name, tweet.screen_name, tweet.followers_count, tweet.friends_count, tweet.statuses_count, ts2, ts)]
def compose_list(list):
name = list.name
if list.description == None: description = _(u"No description available")
else: description = list.description
user = list.user.name
members = str(list.member_count)
if list.mode == "private": status = _(u"private")
else: status = _(u"public")
return [name, description, user, members, status]

View File

@@ -1,2 +0,0 @@
# -*- coding: utf-8 -*-
""" this package holds different modules to extract information regarding long tweets. A long tweet contains more than one tweet (such a quoted tweet), or is made via services like twishort."""

View File

@@ -1,52 +0,0 @@
# -*- coding: utf-8 -*-
############################################################
# Copyright (c) 2015 Manuel Eduardo Cortéz Vallejo <manuel@manuelcortez.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
############################################################
from __future__ import unicode_literals
from sessions.twitter import utils
def is_long(tweet):
""" Check if the passed tweet contains a quote in its metadata.
tweet dict: a tweet dictionary.
returns True if a quote is detected, False otherwise."""
if hasattr(tweet, "quoted_status_id") and hasattr(tweet, "quoted_status"):
return tweet.quoted_status_id
elif hasattr(tweet, "retweeted_status") and hasattr(tweet.retweeted_status, "quoted_status_id") and hasattr(tweet.retweeted_status, "quoted_status"):
return tweet.retweeted_status.quoted_status_id
return False
def clear_url(tweet):
""" Reads data from a quoted tweet and removes the link to the Status from the tweet's text.
tweet dict: a tweet dictionary.
returns a tweet dictionary without the URL to the status ID in its text to display."""
if hasattr(tweet, "retweeted_status"):
if hasattr(tweet.retweeted_status, "full_text"):
value = "full_text"
else:
value = "text"
urls = utils.find_urls_in_text(getattr(tweet.retweeted_status, value))
try: tweet.message = tweet.message.replace(urls[-1], "")
except IndexError: pass
else:
if hasattr(tweet, "full_text"):
value = "full_text"
else:
value = "text"
urls = utils.find_urls_in_text(getattr(tweet, value))
try: tweet.message = tweet.message.replace(urls[-1], "")
except IndexError: pass
return tweet

View File

@@ -1,98 +0,0 @@
# -*- coding: utf-8 -*-
############################################################
# Copyright (c) 2013, 2014 Manuel Eduardo Cortéz Vallejo <manuel@manuelcortez.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
############################################################
from __future__ import print_function
from __future__ import unicode_literals
from builtins import range
import logging
import requests
import keys
from requests_oauthlib import OAuth1Session
from sessions.twitter import utils
log = logging.getLogger("long_tweets.twishort")
def get_twishort_uri(url):
""" Takes A twishort URl and returns the twishort ID.
url str: an url like http://twishort.com/id.
returns a twishort ID if the URL is valid, False otherwise."""
try:
return url.split("twishort.com/")[1]
except ValueError:
return False
def is_long(tweet):
""" Check if the passed tweet is made with Twishort.
returns True if is a long tweet, False otherwise."""
long = False
if hasattr(tweet, "entities") and tweet.entities.get("urls"):
for url in range(0, len(tweet.entities["urls"])):
try:
if tweet.entities["urls"][url] != None and "twishort.com" in tweet.entities["urls"][url]["expanded_url"]:
long = get_twishort_uri(tweet.entities["urls"][url]["expanded_url"])
except IndexError:
pass
# sometimes Twitter returns URL's with None objects, so let's take it.
# see https://github.com/manuelcortez/TWBlue/issues/103
except TypeError:
pass
if long == False and hasattr(tweet, "retweeted_status"):
return is_long(tweet.retweeted_status)
return long
def get_full_text(uri):
""" Get Twishort's full text.
uri str: Twishort's identifier.
returns the contents of the tweet."""
try:
r = requests.get("http://api.twishort.com/1.1/get.json", params={"uri": uri, "api_key": keys.keyring.get("twishort_api_key")})
msg = r.json()["text"]
# Try to parse possible HTML entities.
from sessions.twitter.compose import StripChars
msg = StripChars(msg)
return msg
except:
return False
def create_tweet(user_token, user_secret, text, media=0):
""" Send a tweet to be extended by using Twishort.
user_token, user_secret str: Twitter user access key and secret, used by TWBlue to authorise against Twitter.
text str: Tweet text, max 10000 characters.
media int: Not used currently.
Returns text to be placed in the Tweet if the post has been succeeded, 0 otherwise."""
twitter = OAuth1Session(keys.keyring.get("api_key"), client_secret=keys.keyring.get("api_secret"), resource_owner_key=user_token, resource_owner_secret=user_secret)
twishort_key=keys.keyring.get("twishort_api_key")
x_auth_service_provider = "https://api.twitter.com/1.1/account/verify_credentials.json"
twishort_post_url = "http://api.twishort.com/1.1/post.json"
twishort_update_ids_url = "http://api.twishort.com/1.1/update_ids.json"
r=requests.Request('GET', x_auth_service_provider)
prep=twitter.prepare_request(r)
resp=twitter.send(prep)
twitter.headers={
'X-Auth-Service-Provider':x_auth_service_provider,
'X-Verify-Credentials-Authorization':prep.headers['Authorization'],
}
data = {'api_key':twishort_key,
"text": text.encode("utf-8"),
"media": media}
response = twitter.post(twishort_post_url, data=data)
try:
return response.json()["text_to_tweet"]
except:
print("There was a problem creating a long tweet")
return 0

View File

@@ -1,37 +0,0 @@
# -*- coding: utf-8 -*-
""" Strips unneeded tweet information in order to store tweet objects by using less memory. This is especially useful when buffers start to contain more than a certain amount of items. """
from tweepy.models import Status
def reduce_tweet(tweet):
""" generates a new Tweet model with the fields we currently need, excluding everything else including null values and empty collections. """
allowed_values = ["created_at", "id", "full_text", "text", "message", "in_reply_to_status_id", "in_reply_to_user_id", "is_quote_status", "lang", "source", "coordinates", "quoted_status_id", "extended_entities"]
allowed_entities = ["hashtags", "media", "urls", "user_mentions", "polls"]
status_dict = {}
for key in allowed_values:
if tweet._json.get(key):
status_dict[key] = tweet._json[key]
entities = dict()
for key in allowed_entities:
if tweet._json["entities"].get(key) and tweet._json["entities"].get(key) != None:
entities[key] = tweet._json["entities"][key]
status_dict["entities"] = entities
# If tweet comes from the cached database, it does not include an API, so we can pass None here as we do not use that reference to tweepy's API.
if hasattr(tweet, "_api"):
api = tweet._api
else:
api = None
status = Status().parse(api=api, json=status_dict)
# Quotes and retweets are different objects. So we parse a new tweet when we have a quoted or retweeted status here.
if tweet._json.get("quoted_status"):
quoted_tweet = reduce_tweet(tweet.quoted_status)
status.quoted_status = quoted_tweet
if tweet._json.get("retweeted_status"):
retweeted_tweet = reduce_tweet(tweet.retweeted_status)
status.retweeted_status = retweeted_tweet
# Adds user ID to here so we can reference it later.
# Sometimes, the conversations buffer would send an already reduced tweet here so we will need to return it as is.
if isinstance(tweet.user, str) == False:
status.user = tweet.user.id_str
else:
return tweet
return status

View File

@@ -1,671 +0,0 @@
# -*- coding: utf-8 -*-
""" This is the main session needed to access all Twitter Features."""
import os
import time
import logging
import webbrowser
import wx
import tweepy
import demoji
import config
import output
import application
import appkeys
from pubsub import pub
from tweepy.errors import TweepyException, Forbidden, NotFound
from tweepy.models import User as UserModel
from mysc.thread_utils import call_threaded
from keys import keyring
from sessions import base
from sessions.twitter import utils, compose
from sessions.twitter.long_tweets import tweets, twishort
from . import reduce, streaming
from .wxUI import authorisationDialog
log = logging.getLogger("sessions.twitterSession")
class Session(base.baseSession):
""" A session object where we will save configuration, the twitter object and a local storage for saving the items retrieved through the Twitter API methods"""
def order_buffer(self, name, data, ignore_older=True):
""" Put new items in the local database.
name str: The name for the buffer stored in the dictionary.
data list: A list with tweets.
ignore_older bool: if set to True, items older than the first element on the list will be ignored.
returns the number of items that have been added in this execution"""
if name == "direct_messages":
return self.order_direct_messages(data)
num = 0
last_id = None
if self.db.get(name) == None:
self.db[name] = []
if self.db.get("users") == None:
self.db["users"] = {}
objects = self.db[name]
if ignore_older and len(self.db[name]) > 0:
if self.settings["general"]["reverse_timelines"] == False:
last_id = self.db[name][0].id
else:
last_id = self.db[name][-1].id
self.add_users_from_results(data)
for i in data:
if ignore_older and last_id != None:
if i.id < last_id:
log.error("Ignoring an older tweet... Last id: {0}, tweet id: {1}".format(last_id, i.id))
continue
if utils.find_item(i, self.db[name]) == None and utils.is_allowed(i, self.settings, name) == True:
if i == False: continue
reduced_object = reduce.reduce_tweet(i)
reduced_object = self.check_quoted_status(reduced_object)
reduced_object = self.check_long_tweet(reduced_object)
if self.settings["general"]["reverse_timelines"] == False: objects.append(reduced_object)
else: objects.insert(0, reduced_object)
num = num+1
self.db[name] = objects
return num
def order_people(self, name, data):
""" Put new items on the local database. Useful for cursored buffers (followers, friends, users of a list and searches)
name str: The name for the buffer stored in the dictionary.
data list: A list with items and some information about cursors.
returns the number of items that have been added in this execution"""
num = 0
if (name in self.db) == False:
self.db[name] = []
objects = self.db[name]
for i in data:
if utils.find_item(i, self.db[name]) == None:
if self.settings["general"]["reverse_timelines"] == False: objects.append(i)
else: objects.insert(0, i)
num = num+1
self.db[name] = objects
return num
def order_direct_messages(self, data):
""" Add incoming and sent direct messages to their corresponding database items.
data list: A list of direct messages to add.
returns the number of incoming messages processed in this execution, and sends an event with data regarding amount of sent direct messages added."""
incoming = 0
sent = 0
if ("direct_messages" in self.db) == False:
self.db["direct_messages"] = []
if ("sent_direct_messages" in self.db) == False:
self.db["sent_direct_messages"] = []
objects = self.db["direct_messages"]
sent_objects = self.db["sent_direct_messages"]
for i in data:
if i.message_create["sender_id"] == self.db["user_id"]:
if "sent_direct_messages" in self.db and utils.find_item(i, self.db["sent_direct_messages"]) == None:
if self.settings["general"]["reverse_timelines"] == False: sent_objects.append(i)
else: sent_objects.insert(0, i)
sent = sent+1
else:
if utils.find_item(i, self.db["direct_messages"]) == None:
if self.settings["general"]["reverse_timelines"] == False: objects.append(i)
else: objects.insert(0, i)
incoming = incoming+1
self.db["direct_messages"] = objects
self.db["sent_direct_messages"] = sent_objects
pub.sendMessage("twitter.sent_dms_updated", total=sent, session_name=self.get_name())
return incoming
def __init__(self, *args, **kwargs):
super(Session, self).__init__(*args, **kwargs)
# Adds here the optional cursors objects.
cursors = dict(direct_messages=-1)
self.db["cursors"] = cursors
self.reconnection_function_active = False
self.counter = 0
self.lists = []
# As users are cached for accessing them with not too many twitter calls,
# there could be a weird situation where a deleted user who sent direct messages to the current account will not be able to be retrieved at twitter.
# So we need to store an "user deleted" object in the cache, but have the ID of the deleted user in a local reference.
# This will be especially useful because if the user reactivates their account later, TWblue will try to retrieve such user again at startup.
# If we wouldn't implement this approach, TWBlue would save permanently the "deleted user" object.
self.deleted_users = {}
self.type = "twitter"
pub.subscribe(self.handle_new_status, "twitter.new_status")
pub.subscribe(self.handle_connected, "twitter.stream_connected")
# @_require_configuration
def login(self, verify_credentials=True):
""" Log into twitter using credentials from settings.
if the user account isn't authorised, it needs to call self.authorise() before login."""
if self.settings["twitter"]["user_key"] != None and self.settings["twitter"]["user_secret"] != None:
try:
log.debug("Logging in to twitter...")
self.auth = tweepy.OAuth1UserHandler(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"])
self.twitter = tweepy.API(self.auth)
self.twitter_v2 = tweepy.Client(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"])
if verify_credentials == True:
self.credentials = self.twitter.verify_credentials()
self.settings["twitter"]["user_name"] = self.credentials.screen_name
self.db["user_name"] = self.credentials.screen_name
self.db["user_id"] = self.credentials.id_str
self.logged = True
log.debug("Logged.")
self.counter = 0
except Exception as e:
log.exception("The login attempt failed.")
self.logged = False
else:
self.logged = False
raise Exceptions.RequireCredentialsSessionError
def authorise(self):
""" Authorises a Twitter account. This function needs to be called for each new session, after self.get_configuration() and before self.login()"""
if self.logged == True:
raise Exceptions.AlreadyAuthorisedError("The authorisation process is not needed at this time.")
auth = tweepy.OAuth1UserHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret)
redirect_url = auth.get_authorization_url()
webbrowser.open_new_tab(redirect_url)
verification_dialog = wx.TextEntryDialog(None, _("Enter your PIN code here"), _("Authorising account..."))
answer = verification_dialog.ShowModal()
code = verification_dialog.GetValue()
verification_dialog.Destroy()
if answer != wx.ID_OK:
return
try:
auth.get_access_token(code)
except TweepyException:
dlg = wx.MessageDialog(None, _("We could not authorice your Twitter account to be used in TWBlue. This might be caused due to an incorrect verification code. Please try to add the session again."), _("Authorization error"), wx.ICON_ERROR)
dlg.ShowModal()
dlg.Destroy()
return False
self.create_session_folder()
self.get_configuration()
self.settings["twitter"]["user_key"] = auth.access_token
self.settings["twitter"]["user_secret"] = auth.access_token_secret
self.settings.write()
return True
def api_call(self, call_name, action="", _sound=None, report_success=False, report_failure=True, preexec_message="", *args, **kwargs):
""" Make a call to the Twitter API. If there is a connectionError or another exception not related to Twitter, It will call the method again at least 25 times, waiting a while between calls. Useful for post methods.
If twitter returns an error, it will not call the method anymore.
call_name str: The method to call
action str: What you are doing on twitter, it will be reported to the user if report_success is set to True.
for example "following @tw_blue2" will be reported as "following @tw_blue2 succeeded".
_sound str: a sound to play if the call is executed properly.
report_success and report_failure bool: These are self explanatory. True or False.
preexec_message str: A message to speak to the user while the method is running, example: "trying to follow x user"."""
finished = False
tries = 0
if preexec_message:
output.speak(preexec_message, True)
while finished==False and tries < 25:
try:
val = getattr(self.twitter, call_name)(*args, **kwargs)
finished = True
except TweepyException as e:
output.speak(str(e))
val = None
if type(e) != NotFound and type(e) != Forbidden:
tries = tries+1
time.sleep(5)
elif report_failure:
output.speak(_("%s failed. Reason: %s") % (action, str(e)))
finished = True
# except:
# tries = tries + 1
# time.sleep(5)
if report_success:
output.speak(_("%s succeeded.") % action)
if _sound != None: self.sound.play(_sound)
return val
def api_call_v2(self, call_name, action="", _sound=None, report_success=False, report_failure=True, preexec_message="", *args, **kwargs):
finished = False
tries = 0
if preexec_message:
output.speak(preexec_message, True)
while finished==False and tries < 25:
try:
val = getattr(self.twitter_v2, call_name)(*args, **kwargs)
finished = True
except TweepyException as e:
log.exception("Error sending the tweet.")
output.speak(str(e))
val = None
if type(e) != NotFound and type(e) != Forbidden:
tries = tries+1
time.sleep(5)
elif report_failure:
output.speak(_("%s failed. Reason: %s") % (action, str(e)))
finished = True
if report_success:
output.speak(_("%s succeeded.") % action)
if _sound != None: self.sound.play(_sound)
return val
def search(self, name, *args, **kwargs):
""" Search in twitter, passing args and kwargs as arguments to the Twython function."""
tl = self.twitter.search_tweets(*args, **kwargs)
tl.reverse()
return tl
# @_require_login
def get_favourites_timeline(self, name, *args, **kwargs):
""" Gets favourites for the authenticated user or a friend or follower.
name str: Name for storage in the database.
args and kwargs are passed directly to the Twython function."""
tl = self.call_paged("favorites", *args, **kwargs)
return self.order_buffer(name, tl)
def call_paged(self, update_function, name, *args, **kwargs):
""" Makes a call to the Twitter API methods several times. Useful for get methods.
this function is needed for retrieving more than 200 items.
update_function str: The function to call. This function must be child of self.twitter
args and kwargs are passed to update_function.
returns a list with all items retrieved."""
results = []
if self.db.get(name) == None or self.db.get(name) == []:
since_id = None
else:
if self.settings["general"]["reverse_timelines"] == False:
since_id = self.db[name][-1].id
else:
since_id = self.db[name][0].id
data = getattr(self.twitter, update_function)(count=self.settings["general"]["max_tweets_per_call"], since_id=since_id, *args, **kwargs)
results.extend(data)
results.reverse()
return results
# @_require_login
def get_user_info(self):
""" Retrieves some information required by TWBlue for setup."""
offset = time.timezone if (time.localtime().tm_isdst == 0) else time.altzone
offset = offset*-1
self.db["utc_offset"] = offset
# Get twitter's supported languages and save them in a global variable
#so we won't call to this method once per session.
if len(application.supported_languages) == 0:
application.supported_languages = self.twitter.supported_languages()
self.get_lists()
self.get_muted_users()
self.settings.write()
# @_require_login
def get_lists(self):
""" Gets the lists that the user is subscribed to and stores them in the database. Returns None."""
self.db["lists"] = self.twitter.get_lists(reverse=True)
# @_require_login
def get_muted_users(self):
""" Gets muted users (oh really?)."""
self.db["muted_users"] = self.twitter.get_muted_ids()
# @_require_login
def get_stream(self, name, function, *args, **kwargs):
""" Retrieves the items for a regular stream.
name str: Name to save items to the database.
function str: A function to get the items."""
last_id = -1
if name in self.db:
try:
if self.db[name][0]["id"] > self.db[name][-1]["id"]:
last_id = self.db[name][0]["id"]
else:
last_id = self.db[name][-1]["id"]
except IndexError:
pass
tl = self.call_paged(function, sinze_id=last_id, *args, **kwargs)
self.order_buffer(name, tl)
def get_cursored_stream(self, name, function, items="users", get_previous=False, *args, **kwargs):
""" Gets items for API calls that require using cursors to paginate the results.
name str: Name to save it in the database.
function str: Function that provides the items.
items: When the function returns the list with results, items will tell how the order function should be look. for example get_followers_list returns a list and users are under list["users"], here the items should point to "users".
get_previous bool: wether this function will be used to get previous items in a buffer or load the buffer from scratch.
returns number of items retrieved."""
items_ = []
try:
if "cursor" in self.db[name] and get_previous:
cursor = self.db[name]["cursor"]
else:
cursor = -1
except KeyError:
cursor = -1
if cursor != -1:
tl = getattr(self.twitter, function)(cursor=cursor, count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
else:
tl = getattr(self.twitter, function)(count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
tl[items].reverse()
num = self.order_cursored_buffer(name, tl[items])
# Recently, Twitter's new endpoints have cursor if there are more results.
if "next_cursor" in tl:
self.db[name]["cursor"] = tl["next_cursor"]
else:
self.db[name]["cursor"] = 0
return num
def check_connection(self):
""" Restart the Twitter object every 5 executions. It is useful for dealing with requests timeout and other oddities."""
log.debug("Executing check connection...")
self.counter += 1
if self.counter >= 4:
log.debug("Restarting connection after 5 minutes.")
del self.twitter
self.logged = False
self.login(False)
self.counter = 0
def check_quoted_status(self, tweet):
""" Helper for get_quoted_tweet. Get a quoted status inside a tweet and create a special tweet with all info available.
tweet dict: A tweet dictionary.
Returns a quoted tweet or the original tweet if is not a quote"""
status = tweets.is_long(tweet)
if status != False and config.app["app-settings"]["handle_longtweets"]:
quoted_tweet = self.get_quoted_tweet(tweet)
return quoted_tweet
return tweet
def get_quoted_tweet(self, tweet):
""" Process a tweet and extract all information related to the quote. """
quoted_tweet = tweet
if hasattr(tweet, "full_text"):
value = "full_text"
else:
value = "text"
if hasattr(quoted_tweet, "entities"):
setattr(quoted_tweet, value, utils.expand_urls(getattr(quoted_tweet, value), quoted_tweet.entities))
if hasattr(quoted_tweet, "is_quote_status") == True and hasattr(quoted_tweet, "quoted_status"):
original_tweet = quoted_tweet.quoted_status
elif hasattr(quoted_tweet, "retweeted_status") and hasattr(quoted_tweet.retweeted_status, "is_quote_status") == True and hasattr(quoted_tweet.retweeted_status, "quoted_status"):
original_tweet = quoted_tweet.retweeted_status.quoted_status
else:
return quoted_tweet
original_tweet = self.check_long_tweet(original_tweet)
if hasattr(original_tweet, "full_text"):
value = "full_text"
elif hasattr(original_tweet, "message"):
value = "message"
else:
value = "text"
if hasattr(original_tweet, "entities"):
setattr(original_tweet, value, utils.expand_urls(getattr(original_tweet, value), original_tweet.entities))
# ToDo: Shall we check whether we should add show_screen_names here?
return compose.compose_quoted_tweet(quoted_tweet, original_tweet, session=self)
def check_long_tweet(self, tweet):
""" Process a tweet and add extra info if it's a long tweet made with Twyshort.
tweet dict: a tweet object.
returns a tweet with a new argument message, or original tweet if it's not a long tweet."""
long = False
if hasattr(tweet, "entities") and tweet.entities.get("urls"):
long = twishort.is_long(tweet)
if long != False and config.app["app-settings"]["handle_longtweets"]:
message = twishort.get_full_text(long)
if hasattr(tweet, "quoted_status"):
tweet.quoted_status.message = message
if tweet.quoted_status.message == False: return False
tweet.quoted_status.twishort = True
if hasattr(tweet.quoted_status, "entities") and tweet.quoted_status.entities.get("user_mentions"):
for i in tweet.quoted_status.entities["user_mentions"]:
if "@%s" % (i["screen_name"]) not in tweet.quoted_status.message and i["screen_name"] != self.get_user(tweet.user).screen_name:
if hasattr(tweet.quoted_status, "retweeted_status") and self.get_user(tweet.retweeted_status.user).screen_name == i["screen_name"]:
continue
tweet.quoted_status.message = u"@%s %s" % (i["screen_name"], tweet.message)
else:
tweet.message = message
if tweet.message == False: return False
tweet.twishort = True
if hasattr(tweet, "entities") and tweet.entities.get("user_mentions"):
for i in tweet.entities["user_mentions"]:
if "@%s" % (i["screen_name"]) not in tweet.message and i["screen_name"] != self.get_user(tweet.user).screen_name:
if hasattr(tweet, "retweeted_status") and self.get_user(tweet.retweeted_status.user).screen_name == i["screen_name"]:
continue
tweet.message = u"@%s %s" % (i["screen_name"], tweet.message)
return tweet
def get_user(self, id):
""" Returns an user object associated with an ID.
id str: User identifier, provided by Twitter.
returns a tweepy user object."""
if hasattr(id, "id_str"):
log.error("Called get_user function by passing a full user id as a parameter.")
id = id.id_str
# Check if the user has been added to the list of deleted users previously.
if id in self.deleted_users:
log.debug("Returning user {} from the list of deleted users.".format(id))
return self.deleted_users[id]
if ("users" in self.db) == False or (str(id) in self.db["users"]) == False:
log.debug("Requesting user id {} as it is not present in the users database.".format(id))
try:
user = self.twitter.get_user(id=id)
except TweepyException as err:
user = UserModel(None)
user.screen_name = "deleted_user"
user.id = id
user.name = _("Deleted account")
if type(err) == NotFound:
self.deleted_users[id] = user
return user
else:
log.exception("Error when attempting to retrieve an user from Twitter.")
return user
users = self.db["users"]
users[user.id_str] = user
self.db["users"] = users
user.name = self.get_user_alias(user)
return user
else:
user = self.db["users"][str(id)]
user.name = self.get_user_alias(user)
return user
def get_user_alias(self, user):
""" Retrieves an alias for the passed user model, if exists.
@ user Tweepy.models.user: An user object.
"""
aliases = self.settings.get("user-aliases")
if aliases == None:
log.error("Aliases are not defined for this config spec.")
return self.demoji_user(user.name)
user_alias = aliases.get(user.id_str)
if user_alias != None:
return user_alias
return self.demoji_user(user.name)
def demoji_user(self, name):
if self.settings["general"]["hide_emojis"] == True:
return demoji.replace(name, "")
return name
def get_user_by_screen_name(self, screen_name):
""" Returns an user identifier associated with a screen_name.
screen_name str: User name, such as tw_blue2, provided by Twitter.
returns an user ID."""
if ("users" in self.db) == False:
user = utils.if_user_exists(self.twitter, screen_name)
users = self.db["users"]
users[user["id"]] = user
self.db["users"] = users
return user["id"]
else:
for i in list(self.db["users"].keys()):
if self.db["users"][i].screen_name == screen_name:
return self.db["users"][i].id
user = utils.if_user_exists(self.twitter, screen_name)
users = self.db["users"]
users[user.id] = user
self.db["users"] = users
return user.id
def save_users(self, user_ids):
""" Adds all new users to the users database. """
if len(user_ids) == 0:
return
log.debug("Received %d user IDS to be added in the database." % (len(user_ids)))
users_to_retrieve = [user_id for user_id in user_ids if (user_id not in self.db["users"] and user_id not in self.deleted_users)]
# Remove duplicates
users_to_retrieve = list(dict.fromkeys(users_to_retrieve))
if len(users_to_retrieve) == 0:
return
log.debug("TWBlue will get %d new users from Twitter." % (len(users_to_retrieve)))
try:
users = self.twitter.lookup_users(user_id=users_to_retrieve, tweet_mode="extended")
users_db = self.db["users"]
for user in users:
users_db[user.id_str] = user
log.debug("Added %d new users" % (len(users)))
self.db["users"] = users_db
except TweepyException as err:
if type(err) == NotFound: # User not found.
log.error("The specified users {} were not found in twitter.".format(user_ids))
# Creates a deleted user object for every user_id not found here.
# This will make TWBlue to not waste Twitter API calls when attempting to retrieve those users again.
# As deleted_users is not saved across restarts, when restarting TWBlue, it will retrieve the correct users if they enabled their accounts.
for id in users_to_retrieve:
user = UserModel(None)
user.screen_name = "deleted_user"
user.id = id
user.name = _("Deleted account")
self.deleted_users[id] = user
else:
log.exception("An exception happened while attempting to retrieve a list of users from direct messages in Twitter.")
def add_users_from_results(self, data):
users = self.db["users"]
for i in data:
if hasattr(i, "user"):
if isinstance(i.user, str):
log.warning("A String was passed to be added as an user. This is normal only if TWBlue tried to load a conversation.")
continue
if (i.user.id_str in self.db["users"]) == False:
users[i.user.id_str] = i.user
if hasattr(i, "quoted_status") and (i.quoted_status.user.id_str in self.db["users"]) == False:
users[i.quoted_status.user.id_str] = i.quoted_status.user
if hasattr(i, "retweeted_status") and (i.retweeted_status.user.id_str in self.db["users"]) == False:
users[i.retweeted_status.user.id_str] = i.retweeted_status.user
self.db["users"] = users
def start_streaming(self):
if config.app["app-settings"]["no_streaming"]:
return
self.stream = streaming.Stream(twitter_api=self.twitter, session_name=self.get_name(), user_id=self.db["user_id"], muted_users=self.db["muted_users"], consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"], chunk_size=1025)
self.stream_thread = call_threaded(self.stream.filter, follow=self.stream.users, stall_warnings=True)
def stop_streaming(self):
if config.app["app-settings"]["no_streaming"]:
return
if hasattr(self, "stream"):
self.stream.running = False
log.debug("Stream stopped for accounr {}".format(self.db["user_name"]))
def handle_new_status(self, status, session_name):
""" Handles a new status present in the Streaming API. """
if self.logged == False:
return
# Discard processing the status if the streaming sends a tweet for another account.
if self.get_name() != session_name:
return
# the Streaming API sends non-extended tweets with an optional parameter "extended_tweets" which contains full_text and other data.
# so we have to make sure we check it before processing the normal status.
# As usual, we handle also quotes and retweets at first.
if hasattr(status, "retweeted_status") and hasattr(status.retweeted_status, "quoted_status") and status.retweeted_status.quoted_status.truncated:
status.retweeted_status.quoted_status._json = {**status.retweeted_status.quoted_status._json, **status.retweeted_status.quoted_status._json["extended_tweet"]}
if hasattr(status, "retweeted_status") and hasattr(status.retweeted_status, "extended_tweet"):
status.retweeted_status._json = {**status.retweeted_status._json, **status.retweeted_status._json["extended_tweet"]}
# compose.compose_tweet requires the parent tweet to have a full_text field, so we have to add it to retweets here.
status._json["full_text"] = status._json["text"]
elif hasattr(status, "quoted_status") and hasattr(status.quoted_status, "extended_tweet"):
status.quoted_status._json = {**status.quoted_status._json, **status.quoted_status._json["extended_tweet"]}
if status.truncated:
status._json = {**status._json, **status._json["extended_tweet"]}
# Sends status to database, where it will be reduced and changed according to our needs.
buffers_to_send = []
if status.user.id_str in self.stream.users:
buffers_to_send.append("home_timeline")
if status.user.id == self.db["user_id"]:
buffers_to_send.append("sent_tweets")
for user in status.entities["user_mentions"]:
if user["id"] == self.db["user_id"]:
buffers_to_send.append("mentions")
users_with_timeline = [user.split("-")[0] for user in self.db.keys() if user.endswith("-timeline")]
for user in users_with_timeline:
if status.user.id_str == user:
buffers_to_send.append("{}-timeline".format(user))
for buffer in buffers_to_send[::]:
num = self.order_buffer(buffer, [status])
if num == 0:
buffers_to_send.remove(buffer)
# However, we have to do the "reduce and change" process here because the status we sent to the db is going to be a different object that the one sent to controller.
status = reduce.reduce_tweet(status)
status = self.check_quoted_status(status)
status = self.check_long_tweet(status)
# Send it to the main controller object.
pub.sendMessage("twitter.new_tweet", data=status, session_name=self.get_name(), _buffers=buffers_to_send)
def check_streams(self):
if config.app["app-settings"]["no_streaming"]:
return
if not hasattr(self, "stream"):
return
log.debug("Status of running stream for user {}: {}".format(self.db["user_name"], self.stream.running))
if self.stream.running == False:
self.start_streaming()
def handle_connected(self, session_name):
if self.logged == False:
return
if session_name != self.get_name():
log.debug("Connected streaming endpoint on session {}".format(session_name))
def send_tweet(self, *tweets):
""" Convenience function to send a thread. """
in_reply_to_status_id = None
for obj in tweets:
# When quoting a tweet, the tweet_data dict might contain a parameter called quote_tweet_id. Let's add it, or None, so quotes will be posted successfully.
if len(obj["attachments"]) == 0:
item = self.api_call_v2(call_name="create_tweet", text=obj["text"], _sound="tweet_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, poll_duration_minutes=obj["poll_period"], poll_options=obj["poll_options"], quote_tweet_id=obj.get("quote_tweet_id"))
in_reply_to_status_id = item.data["id"]
else:
media_ids = []
for i in obj["attachments"]:
img = self.api_call("media_upload", filename=i["file"])
if i["type"] == "photo":
self.api_call(call_name="create_media_metadata", media_id=img.media_id, alt_text=i["description"])
media_ids.append(img.media_id)
item = self.api_call_v2(call_name="create_tweet", text=obj["text"], _sound="tweet_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, media_ids=media_ids, poll_duration_minutes=obj["poll_period"], poll_options=obj["poll_options"], quote_tweet_id=obj.get("quote_tweet_id"))
in_reply_to_status_id = item.data["id"]
def reply(self, text="", in_reply_to_status_id=None, attachments=[], *args, **kwargs):
if len(attachments) == 0:
item = self.api_call_v2(call_name="create_tweet", text=text, _sound="reply_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, *args, **kwargs)
else:
media_ids = []
for i in attachments:
img = self.api_call("media_upload", filename=i["file"])
if i["type"] == "photo":
self.api_call(call_name="create_media_metadata", media_id=img.media_id, alt_text=i["description"])
media_ids.append(img.media_id)
item = self.api_call_v2(call_name="create_tweet", text=text, _sound="reply_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, media_ids=media_ids, *args, **kwargs)
def direct_message(self, text, recipient, attachment=None, *args, **kwargs):
if attachment == None:
item = self.api_call(call_name="send_direct_message", recipient_id=recipient, text=text)
else:
if attachment["type"] == "photo":
media_category = "DmImage"
elif attachment["type"] == "gif":
media_category = "DmGif"
elif attachment["type"] == "video":
media_category = "DmVideo"
media = self.api_call("media_upload", filename=attachment["file"], media_category=media_category)
item = self.api_call(call_name="send_direct_message", recipient_id=recipient, text=text, attachment_type="media", attachment_media_id=media.media_id)
if item != None:
sent_dms = self.db["sent_direct_messages"]
if self.settings["general"]["reverse_timelines"] == False:
sent_dms.append(item)
else:
sent_dms.insert(0, item)
self.db["sent_direct_messages"] = sent_dms
pub.sendMessage("twitter.sent_dm", data=item, session_name=self.get_name())
def get_name(self):
if self.logged:
return "Twitter: {}".format(self.db["user_name"])
else:
return "Twitter: {}".format(self.settings["twitter"]["user_name"])

View File

@@ -1,47 +0,0 @@
# -*- coding: utf-8 -*-
""" Streaming support for TWBlue. """
import time
import sys
import six
import requests
import urllib3
import ssl
import tweepy
import logging
from pubsub import pub
log = logging.getLogger("sessions.twitter.streaming")
class Stream(tweepy.Stream):
def __init__(self, twitter_api, session_name, user_id, muted_users=[], *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
log.debug("Starting streaming listener for session {}".format(session_name))
self.started = False
self.users = []
self.api = twitter_api
self.session_name = session_name
self.user_id = user_id
friends = self.api.get_friend_ids()
log.debug("Retrieved {} friends to add to the streaming listener.".format(len(friends)))
self.users.append(str(self.user_id))
log.debug("Got {} muted users.".format(len(muted_users)))
for user in friends:
if user not in muted_users:
self.users.append(str(user))
self.started = True
log.debug("Streaming listener started with {} users to follow.".format(len(self.users)))
def on_connect(self):
pub.sendMessage("twitter.stream_connected", session_name=self.session_name)
def on_exception(self, ex):
log.exception("Exception received on streaming endpoint for session {}".format(self.session_name))
def on_status(self, status):
""" Checks data arriving as a tweet. """
# Hide replies to users not followed by current account.
if status.in_reply_to_user_id_str != None and status.in_reply_to_user_id_str not in self.users and status.user.id != self.user_id:
return
if status.user.id_str in self.users:
pub.sendMessage("twitter.new_status", status=status, session_name=self.session_name)

View File

@@ -1,152 +0,0 @@
# -*- coding: utf-8 -*-
import re
import arrow
import languageHandler
from string import Template
from . import utils
# Define variables that would be available for all template objects.
# This will be used for the edit template dialog.
# Available variables for tweet objects.
tweet_variables = ["date", "display_name", "screen_name", "source", "lang", "text", "image_descriptions"]
dm_variables = ["date", "sender_display_name", "sender_screen_name", "recipient_display_name", "recipient_display_name", "text"]
person_variables = ["display_name", "screen_name", "location", "description", "followers", "following", "listed", "likes", "tweets", "created_at"]
# Default, translatable templates.
tweet_default_template = _("$display_name, $text $image_descriptions $date. $source")
dm_default_template = _("$sender_display_name, $text $date")
dm_sent_default_template = _("Dm to $recipient_display_name, $text $date")
person_default_template = _("$display_name (@$screen_name). $followers followers, $following following, $tweets tweets. Joined Twitter $created_at.")
def process_date(field, relative_times=True, offset_seconds=0):
original_date = arrow.get(field, locale="en")
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=offset_seconds).format(_("dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.curLang[:2])
return ts
def process_text(tweet):
if hasattr(tweet, "full_text"):
text = tweet.full_text
elif hasattr(tweet, "text"):
text = tweet.text
# Cleanup mentions, so we'll remove more than 2 mentions to make the tweet easier to read.
text = utils.clean_mentions(utils.StripChars(text))
# Replace URLS for extended version of those.
if hasattr(tweet, "entities"):
text = utils.expand_urls(text, tweet.entities)
text = re.sub(r"https://twitter.com/\w+/status/\S+", "", text)
return text
def process_image_descriptions(entities):
""" Attempt to extract information for image descriptions. """
image_descriptions = []
for media in entities["media"]:
if media.get("ext_alt_text") != None:
image_descriptions.append(media.get("ext_alt_text"))
# Tweets retrieved via the Streaming API have a description field in media photos with image description available.
elif media.get("description") != None:
image_descriptions.append(media.get("description"))
idescriptions = ""
for image in image_descriptions:
idescriptions += _("Image description: {}.").format(image)
return idescriptions
def render_tweet(tweet, template, session, relative_times=False, offset_seconds=0):
""" Renders any given Tweet according to the passed template.
Available data for tweets will be stored in the following variables:
$date: Creation date.
$display_name: User profile name.
$screen_name: User screen name, this is the same name used to reference the user in Twitter.
$ source: Source client from where the current tweet was sent.
$lang: Two letter code for the automatically detected language for the tweet. This detection is performed by Twitter.
$text: Tweet text.
$image_descriptions: Information regarding image descriptions added by twitter users.
"""
global tweet_variables
available_data = dict()
created_at = process_date(tweet.created_at, relative_times, offset_seconds)
available_data.update(date=created_at)
# user.
available_data.update(display_name=session.get_user(tweet.user).name, screen_name=session.get_user(tweet.user).screen_name)
# Source client from where tweet was originated.
available_data.update(source=tweet.source)
if hasattr(tweet, "retweeted_status"):
if hasattr(tweet.retweeted_status, "quoted_status"):
text = _("RT @{}: {} Quote from @{}: {}").format(session.get_user(tweet.retweeted_status.user).screen_name, process_text(tweet.retweeted_status), session.get_user(tweet.retweeted_status.quoted_status.user).screen_name, process_text(tweet.retweeted_status.quoted_status))
else:
text = _("RT @{}: {}").format(session.get_user(tweet.retweeted_status.user).screen_name, process_text(tweet.retweeted_status))
elif hasattr(tweet, "quoted_status"):
text = _("{} Quote from @{}: {}").format(process_text(tweet), session.get_user(tweet.quoted_status.user).screen_name, process_text(tweet.quoted_status))
else:
text = process_text(tweet)
available_data.update(lang=tweet.lang, text=text)
# process image descriptions
image_descriptions = ""
if hasattr(tweet, "quoted_status") and hasattr(tweet.quoted_status, "extended_entities"):
image_descriptions = process_image_descriptions(tweet.quoted_status.extended_entities)
elif hasattr(tweet, "retweeted_status") and hasattr(tweet.retweeted_status, "quoted_status") and hasattr(tweet.retweeted_status.quoted_status, "extended_entities"):
image_descriptions = process_image_descriptions(tweet.retweeted_status.quoted_status.extended_entities)
elif hasattr(tweet, "extended_entities"):
image_descriptions = process_image_descriptions(tweet.extended_entities)
available_data.update(image_descriptions=image_descriptions)
result = Template(_(template)).safe_substitute(**available_data)
return result
def render_dm(dm, template, session, relative_times=False, offset_seconds=0):
""" Renders direct messages by using the provided template.
Available data will be stored in the following variables:
$date: Creation date.
$sender_display_name: User profile name for user who sent the dm.
$sender_screen_name: User screen name for user sending the dm, this is the same name used to reference the user in Twitter.
$recipient_display_name: User profile name for user who received the dm.
$recipient_screen_name: User screen name for user receiving the dm, this is the same name used to reference the user in Twitter.
$text: Text of the direct message.
"""
global dm_variables
available_data = dict()
available_data.update(text=utils.expand_urls(utils.StripChars(dm.message_create["message_data"]["text"]), dm.message_create["message_data"]["entities"]))
# Let's remove the last 3 digits in the timestamp string.
# Twitter sends their "epoch" timestamp with 3 digits for milliseconds and arrow doesn't like it.
original_date = arrow.get(int(dm.created_timestamp))
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=offset_seconds)
available_data.update(date=ts)
sender = session.get_user(dm.message_create["sender_id"])
recipient = session.get_user(dm.message_create["target"]["recipient_id"])
available_data.update(sender_display_name=sender.name, sender_screen_name=sender.screen_name, recipient_display_name=recipient.name, recipient_screen_name=recipient.screen_name)
result = Template(_(template)).safe_substitute(**available_data)
return result
# Sesion object is not used in this function but we keep compatibility across all rendering functions.
def render_person(user, template, session=None, relative_times=True, offset_seconds=0):
""" Renders persons (any Twitter user) by using the provided template.
Available data will be stored in the following variables:
$display_name: The name of the user, as theyve defined it. Not necessarily a persons name. Typically capped at 50 characters, but subject to change.
$screen_name: The screen name, handle, or alias that this user identifies themselves with.
$location: The user-defined location for this accounts profile. Not necessarily a location, nor machine-parseable.
$description: The user-defined UTF-8 string describing their account.
$followers: The number of followers this account currently has. This value might be inaccurate.
$following: The number of users this account is following (AKA their “followings”). This value might be inaccurate.
$listed: The number of public lists that this user is a member of. This value might be inaccurate.
$likes: The number of Tweets this user has liked in the accounts lifetime. This value might be inaccurate.
$tweets: The number of Tweets (including retweets) issued by the user. This value might be inaccurate.
$created_at: The date and time that the user account was created on Twitter.
"""
global person_variables
available_data = dict(display_name=user.name, screen_name=user.screen_name, followers=user.followers_count, following=user.friends_count, likes=user.favourites_count, listed=user.listed_count, tweets=user.statuses_count)
# Nullable values.
nullables = ["location", "description"]
for nullable in nullables:
if hasattr(user, nullable) and getattr(user, nullable) != None:
available_data[nullable] = getattr(user, nullable)
else:
available_data[nullable] = ""
created_at = process_date(user.created_at, relative_times=relative_times, offset_seconds=offset_seconds)
available_data.update(created_at=created_at)
result = Template(_(template)).safe_substitute(**available_data)
return result

View File

@@ -1,275 +0,0 @@
# -*- coding: utf-8 -*-
import re
import html.entities
import output
import logging
import requests
import time
from tweepy.errors import TweepyException, NotFound, Forbidden
log = logging.getLogger("twitter.utils")
""" Some utilities for the twitter interface."""
__version__ = 0.1
__doc__ = "Find urls in tweets and #audio hashtag."
url_re = re.compile(r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))")
url_re2 = re.compile("(?:\w+://|www\.)[^ ,.?!#%=+][^ \\n\\t]*")
bad_chars = '\'\\\n.,[](){}:;"'
def StripChars(s):
"""Converts any html entities in s to their unicode-decoded equivalents and returns a string."""
entity_re = re.compile(r"&(#\d+|\w+);")
def matchFunc(match):
"""Nested function to handle a match object.
If we match &blah; and it's not found, &blah; will be returned.
if we match #\d+, unichr(digits) will be returned.
Else, a unicode string will be returned."""
if match.group(1).startswith('#'): return chr(int(match.group(1)[1:]))
replacement = html.entities.entitydefs.get(match.group(1), "&%s;" % match.group(1))
return replacement
return str(entity_re.sub(matchFunc, s))
def find_urls_in_text(text):
return url_re2.findall(text)
def find_urls (tweet, twitter_media=False):
urls = []
if twitter_media and hasattr(tweet, "extended_entities"):
for mediaItem in tweet.extended_entities["media"]:
if mediaItem["type"] == "video":
for variant in mediaItem["video_info"]["variants"]:
if variant["content_type"] == "video/mp4":
urls.append(variant["url"])
break
# Let's add URLS from tweet entities.
if hasattr(tweet, "message_create"):
entities = tweet.message_create["message_data"]["entities"]
else:
if hasattr(tweet, "entities") == True:
entities = tweet.entities
if entities.get("urls") != None:
for i in entities["urls"]:
if i["expanded_url"] not in urls:
urls.append(i["expanded_url"])
if hasattr(tweet, "quoted_status"):
urls.extend(find_urls(tweet.quoted_status, twitter_media))
if hasattr(tweet, "retweeted_status"):
urls.extend(find_urls(tweet.retweeted_status, twitter_media))
if hasattr(tweet, "message"):
i = "message"
elif hasattr(tweet, "full_text"):
i = "full_text"
else:
i = "text"
if hasattr(tweet, "message_create"):
extracted_urls = find_urls_in_text(tweet.message_create["message_data"]["text"])
else:
extracted_urls = find_urls_in_text(getattr(tweet, i))
# Don't include t.co links (mostly they are photos or shortened versions of already added URLS).
for i in extracted_urls:
if i not in urls and "https://t.co" not in i:
urls.append(i)
return urls
def find_item(item, listItems):
for i in range(0, len(listItems)):
if listItems[i].id == item.id:
return i
# Check also retweets.
if hasattr(item, "retweeted_status") and item.retweeted_status.id == listItems[i].id:
return i
return None
def find_list(name, lists):
for i in range(0, len(lists)):
if lists[i].name == name: return lists[i].id
def is_audio(tweet):
if hasattr(tweet, "quoted_status") and hasattr(tweet.quoted_status, "extended_entities"):
result = is_audio(tweet.quoted_status)
if result != None:
return result
if hasattr(tweet, "retweeted_status") and hasattr(tweet.retweeted_status, "extended_entities"):
result = is_audio(tweet.retweeted_status)
if result == True:
return result
# Checks firstly for Twitter videos and audios.
if hasattr(tweet, "extended_entities"):
for mediaItem in tweet.extended_entities["media"]:
if mediaItem["type"] == "video":
return True
try:
if len(find_urls(tweet)) < 1:
return False
if hasattr(tweet, "message_create"):
entities = tweet.message_create["message_data"]["entities"]
else:
if hasattr(tweet, "entities") == False or tweet.entities.get("hashtags") == None:
return False
entities = tweet.entities
if len(entities["hashtags"]) > 0:
for i in entities["hashtags"]:
if i["text"] == "audio":
return True
except IndexError:
log.exception("Exception while executing is_audio hashtag algorithm")
def is_geocoded(tweet):
if hasattr(tweet, "coordinates") and tweet.coordinates != None:
return True
def is_media(tweet):
if hasattr(tweet, "message_create"):
entities = tweet.message_create["message_data"]["entities"]
else:
if hasattr(tweet, "entities") == False or tweet.entities.get("hashtags") == None:
return False
entities = tweet.entities
if entities.get("media") == None:
return False
for i in entities["media"]:
if i.get("type") != None and i.get("type") == "photo":
return True
return False
def get_all_mentioned(tweet, conf, field="screen_name"):
""" Gets all users that have been mentioned."""
results = []
if hasattr(tweet, "retweeted_status"):
results.extend(get_all_mentioned(tweet.retweeted_status, conf, field))
if hasattr(tweet, "quoted_status"):
results.extend(get_all_mentioned(tweet.quoted_status, conf, field))
if hasattr(tweet, "entities") and tweet.entities.get("user_mentions"):
for i in tweet.entities["user_mentions"]:
if i["screen_name"] != conf["user_name"] and i["id_str"] != tweet.user:
if i.get(field) not in results:
results.append(i.get(field))
return results
def get_all_users(tweet, session):
string = []
user = session.get_user(tweet.user)
if user.screen_name != session.db["user_name"]:
string.append(user.screen_name)
if hasattr(tweet, "retweeted_status"):
string.extend(get_all_users(tweet.retweeted_status, session))
if hasattr(tweet, "quoted_status"):
string.extend(get_all_users(tweet.quoted_status, session))
if hasattr(tweet, "entities") and tweet.entities.get("user_mentions"):
for i in tweet.entities["user_mentions"]:
if i["screen_name"] != session.db["user_name"] and i["screen_name"] != user.screen_name:
if i["screen_name"] not in string:
string.append(i["screen_name"])
# Attempt to remove duplicates, tipically caused by nested tweets.
string = list(dict.fromkeys(string))
if len(string) == 0:
string.append(user.screen_name)
return string
def if_user_exists(twitter, user):
try:
data = twitter.get_user(screen_name=user)
return data
except TweepyException as err:
if type(err) == NotFound:
return None
else:
return user
def is_allowed(tweet, settings, buffer_name):
clients = settings["twitter"]["ignored_clients"]
if hasattr(tweet, "sender"): return True
allowed = True
tweet_data = {}
if hasattr(tweet, "retweeted_status"):
tweet_data["retweet"] = True
if hasattr(tweet, "in_reply_to_status_id"):
tweet_data["reply"] = True
if hasattr(tweet, "quoted_status"):
tweet_data["quote"] = True
if hasattr(tweet, "retweeted_status"):
tweet = tweet.retweeted_status
source = tweet.source
for i in clients:
if i.lower() == source.lower():
return False
return filter_tweet(tweet, tweet_data, settings, buffer_name)
def filter_tweet(tweet, tweet_data, settings, buffer_name):
if hasattr(tweet, "full_text"):
value = "full_text"
else:
value = "text"
for i in settings["filters"]:
if settings["filters"][i]["in_buffer"] == buffer_name:
regexp = settings["filters"][i]["regexp"]
word = settings["filters"][i]["word"]
# Added if/else for compatibility reasons.
if "allow_rts" in settings["filters"][i]:
allow_rts = settings["filters"][i]["allow_rts"]
else:
allow_rts = "True"
if "allow_quotes" in settings["filters"][i]:
allow_quotes = settings["filters"][i]["allow_quotes"]
else:
allow_quotes = "True"
if "allow_replies" in settings["filters"][i]:
allow_replies = settings["filters"][i]["allow_replies"]
else:
allow_replies = "True"
if allow_rts == "False" and "retweet" in tweet_data:
return False
if allow_quotes == "False" and "quote" in tweet_data:
return False
if allow_replies == "False" and "reply" in tweet_data:
return False
if word != "" and settings["filters"][i]["if_word_exists"]:
if word in getattr(tweet, value):
return False
elif word != "" and settings["filters"][i]["if_word_exists"] == False:
if word not in getattr(tweet, value):
return False
if settings["filters"][i]["in_lang"] == "True":
if getattr(tweet, "lang") not in settings["filters"][i]["languages"]:
return False
elif settings["filters"][i]["in_lang"] == "False":
if tweet.lang in settings["filters"][i]["languages"]:
return False
return True
def twitter_error(error):
if type(error) == Forbidden:
msg = _(u"Sorry, you are not authorised to see this status.")
elif type(error) == NotFound:
msg = _(u"No status found with that ID")
else:
msg = _(u"Error {0}").format(str(error),)
output.speak(msg)
def expand_urls(text, entities):
""" Expand all URLS present in text with information found in entities"""
if entities.get("urls") == None:
return text
urls = find_urls_in_text(text)
for url in entities["urls"]:
if url["url"] in text:
text = text.replace(url["url"], url["expanded_url"])
return text
def clean_mentions(text):
new_text = text
mentionned_people = [u for u in re.finditer("(?<=^|(?<=[^a-zA-Z0-9-\.]))@([A-Za-z0-9_]+)", text)]
if len(mentionned_people) <= 2:
return text
end = -2
total_users = 0
for user in mentionned_people:
if abs(user.start()-end) < 3:
new_text = new_text.replace(user.group(0), "", 1)
total_users = total_users+1
end = user.end()
if total_users-2 < 1:
return text
new_text = _("{user_1}, {user_2} and {all_users} more: {text}").format(user_1=mentionned_people[0].group(0), user_2=mentionned_people[1].group(0), all_users=total_users-2, text=new_text)
return new_text

View File

@@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
import wx
class authorisationDialog(wx.Dialog):
def __init__(self):
super(authorisationDialog, self).__init__(parent=None, title=_(u"Authorising account..."))
panel = wx.Panel(self)
sizer = wx.BoxSizer(wx.VERTICAL)
static = wx.StaticText(panel, wx.NewId(), _(u"Enter your PIN code here"))
self.text = wx.TextCtrl(panel, -1)
self.ok = wx.Button(panel, wx.ID_OK)
self.cancel = wx.Button(panel, wx.ID_CANCEL)
sizer.Add(self.text, 0, wx.ALL, 5)
sizer.Add(self.cancel, 0, wx.ALL, 5)
panel.SetSizer(sizer)
min = sizer.CalcMin()
self.SetClientSize(min)