Merge branch 'next-gen' into mastodon

This commit is contained in:
2022-02-24 10:24:12 -06:00
59 changed files with 6001 additions and 5095 deletions

View File

@@ -3,7 +3,6 @@ import platform
system = platform.system()
from . import utils
import re
import html.entities
import time
import output
import languageHandler
@@ -11,21 +10,9 @@ import arrow
import logging
import config
from .long_tweets import twishort, tweets
from .utils import StripChars
log = logging.getLogger("compose")
def StripChars(s):
"""Converts any html entities in s to their unicode-decoded equivalents and returns a string."""
entity_re = re.compile(r"&(#\d+|\w+);")
def matchFunc(match):
"""Nested function to handle a match object.
If we match &blah; and it's not found, &blah; will be returned.
if we match #\d+, unichr(digits) will be returned.
Else, a unicode string will be returned."""
if match.group(1).startswith('#'): return chr(int(match.group(1)[1:]))
replacement = html.entities.entitydefs.get(match.group(1), "&%s;" % match.group(1))
return replacement
return str(entity_re.sub(matchFunc, s))
chars = "abcdefghijklmnopqrstuvwxyz"
def compose_tweet(tweet, db, relative_times, show_screen_names=False, session=None):

View File

@@ -36,9 +36,9 @@ class Session(base.baseSession):
return self.order_direct_messages(data)
num = 0
last_id = None
if (name in self.db) == False:
if self.db.get(name) == None:
self.db[name] = []
if ("users" in self.db) == False:
if self.db.get("users") == None:
self.db["users"] = {}
objects = self.db[name]
if ignore_older and len(self.db[name]) > 0:
@@ -136,7 +136,7 @@ class Session(base.baseSession):
if self.settings["twitter"]["user_key"] != None and self.settings["twitter"]["user_secret"] != None:
try:
log.debug("Logging in to twitter...")
self.auth = tweepy.OAuthHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret)
self.auth = tweepy.OAuth1UserHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret)
self.auth.set_access_token(self.settings["twitter"]["user_key"], self.settings["twitter"]["user_secret"])
self.twitter = tweepy.API(self.auth)
self.twitter_v2 = tweepy.Client(consumer_key=appkeys.twitter_api_key, consumer_secret=appkeys.twitter_api_secret, access_token=self.settings["twitter"]["user_key"], access_token_secret=self.settings["twitter"]["user_secret"])
@@ -158,7 +158,7 @@ class Session(base.baseSession):
if self.logged == True:
raise Exceptions.AlreadyAuthorisedError("The authorisation process is not needed at this time.")
else:
self.auth = tweepy.OAuthHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret)
self.auth = tweepy.OAuth1UserHandler(appkeys.twitter_api_key, appkeys.twitter_api_secret)
redirect_url = self.auth.get_authorization_url()
webbrowser.open_new_tab(redirect_url)
self.authorisation_dialog = authorisationDialog()
@@ -204,7 +204,7 @@ class Session(base.baseSession):
except TweepyException as e:
output.speak(str(e))
val = None
if type(e) != NotFound and type(e) != Forvidden:
if type(e) != NotFound and type(e) != Forbidden:
tries = tries+1
time.sleep(5)
elif report_failure:
@@ -218,6 +218,30 @@ class Session(base.baseSession):
if _sound != None: self.sound.play(_sound)
return val
def api_call_v2(self, call_name, action="", _sound=None, report_success=False, report_failure=True, preexec_message="", *args, **kwargs):
finished = False
tries = 0
if preexec_message:
output.speak(preexec_message, True)
while finished==False and tries < 25:
try:
val = getattr(self.twitter_v2, call_name)(*args, **kwargs)
finished = True
except TweepyException as e:
log.exception("Error sending the tweet.")
output.speak(str(e))
val = None
if type(e) != NotFound and type(e) != Forbidden:
tries = tries+1
time.sleep(5)
elif report_failure:
output.speak(_("%s failed. Reason: %s") % (action, str(e)))
finished = True
if report_success:
output.speak(_("%s succeeded.") % action)
if _sound != None: self.sound.play(_sound)
return val
def search(self, name, *args, **kwargs):
""" Search in twitter, passing args and kwargs as arguments to the Twython function."""
tl = self.twitter.search_tweets(*args, **kwargs)
@@ -232,21 +256,22 @@ class Session(base.baseSession):
tl = self.call_paged("favorites", *args, **kwargs)
return self.order_buffer(name, tl)
def call_paged(self, update_function, *args, **kwargs):
def call_paged(self, update_function, name, *args, **kwargs):
""" Makes a call to the Twitter API methods several times. Useful for get methods.
this function is needed for retrieving more than 200 items.
update_function str: The function to call. This function must be child of self.twitter
args and kwargs are passed to update_function.
returns a list with all items retrieved."""
max = 0
results = []
data = getattr(self.twitter, update_function)(count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
if self.db.get(name) == None or self.db.get(name) == []:
since_id = None
else:
if self.settings["general"]["reverse_timelines"] == False:
since_id = self.db[name][-1].id
else:
since_id = self.db[name][0].id
data = getattr(self.twitter, update_function)(count=self.settings["general"]["max_tweets_per_call"], since_id=since_id, *args, **kwargs)
results.extend(data)
for i in range(0, max):
if i == 0: max_id = results[-1].id
else: max_id = results[0].id
data = getattr(self.twitter, update_function)(max_id=max_id, count=self.settings["general"]["max_tweets_per_call"], *args, **kwargs)
results.extend(data)
results.reverse()
return results
@@ -590,4 +615,55 @@ class Session(base.baseSession):
if self.logged == False:
return
if user != self.db["user_name"]:
log.debug("Connected streaming endpoint on account {}".format(user))
log.debug("Connected streaming endpoint on account {}".format(user))
def send_tweet(self, *tweets):
""" Convenience function to send a thread. """
in_reply_to_status_id = None
for obj in tweets:
# When quoting a tweet, the tweet_data dict might contain a parameter called quote_tweet_id. Let's add it, or None, so quotes will be posted successfully.
if len(obj["attachments"]) == 0:
item = self.api_call_v2(call_name="create_tweet", text=obj["text"], _sound="tweet_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, poll_duration_minutes=obj["poll_period"], poll_options=obj["poll_options"], quote_tweet_id=obj.get("quote_tweet_id"))
in_reply_to_status_id = item.data["id"]
else:
media_ids = []
for i in obj["attachments"]:
img = self.api_call("media_upload", filename=i["file"])
if i["type"] == "photo":
self.api_call(call_name="create_media_metadata", media_id=img.media_id, alt_text=i["description"])
media_ids.append(img.media_id)
item = self.api_call_v2(call_name="create_tweet", text=obj["text"], _sound="tweet_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, media_ids=media_ids, poll_duration_minutes=obj["poll_period"], poll_options=obj["poll_options"], quote_tweet_id=obj.get("quote_tweet_id"))
in_reply_to_status_id = item.data["id"]
def reply(self, text="", in_reply_to_status_id=None, attachments=[], *args, **kwargs):
if len(attachments) == 0:
item = self.api_call_v2(call_name="create_tweet", text=text, _sound="reply_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, *args, **kwargs)
else:
media_ids = []
for i in attachments:
img = self.api_call("media_upload", filename=i["file"])
if i["type"] == "photo":
self.api_call(call_name="create_media_metadata", media_id=img.media_id, alt_text=i["description"])
media_ids.append(img.media_id)
item = self.api_call_v2(call_name="create_tweet", text=text, _sound="reply_send.ogg", in_reply_to_tweet_id=in_reply_to_status_id, media_ids=media_ids, *args, **kwargs)
def direct_message(self, text, recipient, attachment=None, *args, **kwargs):
if attachment == None:
item = self.api_call(call_name="send_direct_message", recipient_id=recipient, text=text)
else:
if attachment["type"] == "photo":
media_category = "DmImage"
elif attachment["type"] == "gif":
media_category = "DmGif"
elif attachment["type"] == "video":
media_category = "DmVideo"
media = self.api_call("media_upload", filename=attachment["file"], media_category=media_category)
item = self.api_call(call_name="send_direct_message", recipient_id=recipient, text=text, attachment_type="media", attachment_media_id=media.media_id)
if item != None:
sent_dms = self.db["sent_direct_messages"]
if self.settings["general"]["reverse_timelines"] == False:
sent_dms.append(item)
else:
sent_dms.insert(0, item)
self.db["sent_direct_messages"] = sent_dms
pub.sendMessage("sent-dm", data=item, user=self.db["user_name"])

View File

@@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
import re
import arrow
import languageHandler
from string import Template
from . import utils
# Define variables that would be available for all template objects.
# This will be used for the edit template dialog.
# Available variables for tweet objects.
tweet_variables = ["date", "display_name", "screen_name", "source", "lang", "text", "image_descriptions"]
dm_variables = ["date", "sender_display_name", "sender_screen_name", "recipient_display_name", "recipient_display_name", "text"]
person_variables = ["display_name", "screen_name", "location", "description", "followers", "following", "listed", "likes", "tweets", "created_at"]
# Default, translatable templates.
tweet_default_template = _("$display_name, $text $image_descriptions $date. $source")
dm_default_template = _("$sender_display_name, $text $date")
dm_sent_default_template = _("Dm to $recipient_display_name, $text $date")
person_default_template = _("$display_name (@$screen_name). $followers followers, $following following, $tweets tweets. Joined Twitter $created_at.")
def process_date(field, relative_times=True, offset_seconds=0):
original_date = arrow.get(field, locale="en")
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=offset_seconds).format(_("dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.curLang[:2])
return ts
def process_text(tweet):
if hasattr(tweet, "full_text"):
text = tweet.full_text
elif hasattr(tweet, "text"):
text = tweet.text
# Cleanup mentions, so we'll remove more than 2 mentions to make the tweet easier to read.
text = utils.clean_mentions(utils.StripChars(text))
# Replace URLS for extended version of those.
if hasattr(tweet, "entities"):
text = utils.expand_urls(text, tweet.entities)
text = re.sub(r"https://twitter.com/\w+/status/\d+", "", text)
return text
def process_image_descriptions(entities):
""" Attempt to extract information for image descriptions. """
image_descriptions = []
for media in entities["media"]:
if media.get("ext_alt_text") != None:
image_descriptions.append(media.get("ext_alt_text"))
# Tweets retrieved via the Streaming API have a description field in media photos with image description available.
elif media.get("description") != None:
image_descriptions.append(media.get("description"))
idescriptions = ""
for image in image_descriptions:
idescriptions += _("Image description: {}.").format(image)
return idescriptions
def remove_unneeded_variables(template, variables):
for variable in variables:
template = re.sub("\$"+variable, "", template)
return template
def render_tweet(tweet, template, session, relative_times=False, offset_seconds=0):
""" Renders any given Tweet according to the passed template.
Available data for tweets will be stored in the following variables:
$date: Creation date.
$display_name: User profile name.
$screen_name: User screen name, this is the same name used to reference the user in Twitter.
$ source: Source client from where the current tweet was sent.
$lang: Two letter code for the automatically detected language for the tweet. This detection is performed by Twitter.
$text: Tweet text.
$image_descriptions: Information regarding image descriptions added by twitter users.
"""
global tweet_variables
available_data = dict()
created_at = process_date(tweet.created_at, relative_times, offset_seconds)
available_data.update(date=created_at)
# user.
available_data.update(display_name=session.get_user(tweet.user).name, screen_name=session.get_user(tweet.user).screen_name)
# Source client from where tweet was originated.
available_data.update(source=tweet.source)
if hasattr(tweet, "retweeted_status"):
if hasattr(tweet.retweeted_status, "quoted_status"):
text = "RT @{}: {} Quote from @{}: {}".format(session.get_user(tweet.retweeted_status.user).screen_name, process_text(tweet.retweeted_status), session.get_user(tweet.retweeted_status.quoted_status.user).screen_name, process_text(tweet.retweeted_status.quoted_status))
else:
text = "RT @{}: {}".format(session.get_user(tweet.retweeted_status.user).screen_name, process_text(tweet.retweeted_status))
elif hasattr(tweet, "quoted_status"):
text = "{} Quote from @{}: {}".format(process_text(tweet), session.get_user(tweet.quoted_status.user).screen_name, process_text(tweet.quoted_status))
else:
text = process_text(tweet)
available_data.update(lang=tweet.lang, text=text)
# process image descriptions
image_descriptions = ""
if hasattr(tweet, "quoted_status") and hasattr(tweet.quoted_status, "extended_entities"):
image_descriptions = process_image_descriptions(tweet.quoted_status.extended_entities)
elif hasattr(tweet, "retweeted_status") and hasattr(tweet.retweeted_status, "quoted_status") and hasattr(tweet.retweeted_status.quoted_status, "extended_entities"):
image_descriptions = process_image_descriptions(tweet.retweeted_status.quoted_status.extended_entities)
elif hasattr(tweet, "extended_entities"):
image_descriptions = process_image_descriptions(tweet.extended_entities)
if image_descriptions != "":
available_data.update(image_descriptions=image_descriptions)
result = Template(_(template)).safe_substitute(**available_data)
result = remove_unneeded_variables(result, tweet_variables)
return result
def render_dm(dm, template, session, relative_times=False, offset_seconds=0):
""" Renders direct messages by using the provided template.
Available data will be stored in the following variables:
$date: Creation date.
$sender_display_name: User profile name for user who sent the dm.
$sender_screen_name: User screen name for user sending the dm, this is the same name used to reference the user in Twitter.
$recipient_display_name: User profile name for user who received the dm.
$recipient_screen_name: User screen name for user receiving the dm, this is the same name used to reference the user in Twitter.
$text: Text of the direct message.
"""
global dm_variables
available_data = dict()
available_data.update(text=utils.expand_urls(dm.message_create["message_data"]["text"], dm.message_create["message_data"]["entities"]))
# Let's remove the last 3 digits in the timestamp string.
# Twitter sends their "epoch" timestamp with 3 digits for milliseconds and arrow doesn't like it.
original_date = arrow.get(int(dm.created_timestamp))
if relative_times == True:
ts = original_date.humanize(locale=languageHandler.curLang[:2])
else:
ts = original_date.shift(seconds=offset_seconds)
available_data.update(date=ts)
sender = session.get_user(dm.message_create["sender_id"])
recipient = session.get_user(dm.message_create["target"]["recipient_id"])
available_data.update(sender_display_name=sender.name, sender_screen_name=sender.screen_name, recipient_display_name=recipient.name, recipient_screen_name=recipient.screen_name)
result = Template(_(template)).safe_substitute(**available_data)
result = remove_unneeded_variables(result, dm_variables)
return result
# Sesion object is not used in this function but we keep compatibility across all rendering functions.
def render_person(user, template, session=None, relative_times=True, offset_seconds=0):
""" Renders persons (any Twitter user) by using the provided template.
Available data will be stored in the following variables:
$display_name: The name of the user, as theyve defined it. Not necessarily a persons name. Typically capped at 50 characters, but subject to change.
$screen_name: The screen name, handle, or alias that this user identifies themselves with.
$location: The user-defined location for this accounts profile. Not necessarily a location, nor machine-parseable.
$description: The user-defined UTF-8 string describing their account.
$followers: The number of followers this account currently has. This value might be inaccurate.
$following: The number of users this account is following (AKA their “followings”). This value might be inaccurate.
$listed: The number of public lists that this user is a member of. This value might be inaccurate.
$likes: The number of Tweets this user has liked in the accounts lifetime. This value might be inaccurate.
$tweets: The number of Tweets (including retweets) issued by the user. This value might be inaccurate.
$created_at: The date and time that the user account was created on Twitter.
"""
global person_variables
available_data = dict(display_name=user.name, screen_name=user.screen_name, followers=user.followers_count, following=user.friends_count, likes=user.favourites_count, listed=user.listed_count, tweets=user.statuses_count)
# Nullable values.
nullables = ["location", "description"]
for nullable in nullables:
if hasattr(user, nullable) and getattr(user, nullable) != None:
available_data[nullable] = getattr(user, nullable)
created_at = process_date(user.created_at, relative_times=relative_times, offset_seconds=offset_seconds)
available_data.update(created_at=created_at)
result = Template(_(template)).safe_substitute(**available_data)
result = remove_unneeded_variables(result, person_variables)
return result

View File

@@ -1,11 +1,10 @@
# -*- coding: utf-8 -*-
import url_shortener, re
import re
import html.entities
import output
import config
import logging
import requests
import time
import sound
from tweepy.errors import TweepyException, NotFound, Forbidden
log = logging.getLogger("twitter.utils")
""" Some utilities for the twitter interface."""
@@ -18,6 +17,19 @@ url_re = re.compile(r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4
url_re2 = re.compile("(?:\w+://|www\.)[^ ,.?!#%=+][^ \\n\\t]*")
bad_chars = '\'\\\n.,[](){}:;"'
def StripChars(s):
"""Converts any html entities in s to their unicode-decoded equivalents and returns a string."""
entity_re = re.compile(r"&(#\d+|\w+);")
def matchFunc(match):
"""Nested function to handle a match object.
If we match &blah; and it's not found, &blah; will be returned.
if we match #\d+, unichr(digits) will be returned.
Else, a unicode string will be returned."""
if match.group(1).startswith('#'): return chr(int(match.group(1)[1:]))
replacement = html.entities.entitydefs.get(match.group(1), "&%s;" % match.group(1))
return replacement
return str(entity_re.sub(matchFunc, s))
def find_urls_in_text(text):
return url_re2.findall(text)