mirror of
https://github.com/MCV-Software/TWBlue.git
synced 2025-01-19 00:40:42 -06:00
Removed hold modules from code. Now everything is in sessions
This commit is contained in:
parent
0966739296
commit
76db14360a
@ -1 +0,0 @@
|
||||
import utils, compose, twitter
|
@ -1,32 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import BaseHTTPServer
|
||||
import application
|
||||
from urlparse import urlparse, parse_qs
|
||||
from pubsub import pub
|
||||
|
||||
logged = False
|
||||
verifier = None
|
||||
|
||||
class handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
|
||||
|
||||
def do_GET(self):
|
||||
global logged
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
logged = True
|
||||
params = parse_qs(urlparse(self.path).query)
|
||||
global verifier
|
||||
verifier = params.get('oauth_verifier', [None])[0]
|
||||
self.wfile.write(u"You have successfully logged into Twitter with {0}. You can close this window now.".format(application.name))
|
||||
pub.sendMessage("authorisation-accepted")
|
||||
pub.unsubscribe(self.cancelled, "authorisation-cancelled")
|
||||
self.finish()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
pub.subscribe(self.cancelled, "authorisation-cancelled")
|
||||
super(handler, self).__init__(*args, **kwargs)
|
||||
|
||||
def cancelled(self):
|
||||
pub.unsubscribe(self.cancelled, "authorisation-cancelled")
|
||||
self.finish()
|
@ -1,175 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import platform
|
||||
system = platform.system()
|
||||
import utils
|
||||
import re
|
||||
import htmlentitydefs
|
||||
import time
|
||||
import output
|
||||
import languageHandler
|
||||
import arrow
|
||||
import logging
|
||||
import config
|
||||
from long_tweets import twishort, tweets
|
||||
log = logging.getLogger("compose")
|
||||
|
||||
def StripChars(s):
|
||||
"""Converts any html entities in s to their unicode-decoded equivalents and returns a string."""
|
||||
entity_re = re.compile(r"&(#\d+|\w+);")
|
||||
def matchFunc(match):
|
||||
"""Nested function to handle a match object.
|
||||
If we match &blah; and it's not found, &blah; will be returned.
|
||||
if we match #\d+, unichr(digits) will be returned.
|
||||
Else, a unicode string will be returned."""
|
||||
if match.group(1).startswith('#'): return unichr(int(match.group(1)[1:]))
|
||||
replacement = htmlentitydefs.entitydefs.get(match.group(1), "&%s;" % match.group(1))
|
||||
return replacement.decode('iso-8859-1')
|
||||
return unicode(entity_re.sub(matchFunc, s))
|
||||
|
||||
chars = "abcdefghijklmnopqrstuvwxyz"
|
||||
|
||||
def compose_tweet(tweet, db, relative_times, show_screen_names=False, session=None):
|
||||
""" It receives a tweet and returns a list with the user, text for the tweet or message, date and the client where user is."""
|
||||
if system == "Windows":
|
||||
original_date = arrow.get(tweet["created_at"], "ddd MMM DD H:m:s Z YYYY", locale="en")
|
||||
if relative_times == True:
|
||||
ts = original_date.humanize(locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts = original_date.replace(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts = tweet["created_at"]
|
||||
if tweet.has_key("message"):
|
||||
value = "message"
|
||||
elif tweet.has_key("full_text"):
|
||||
value = "full_text"
|
||||
else:
|
||||
value = "text"
|
||||
if tweet.has_key("retweeted_status") and value != "message":
|
||||
text = StripChars(tweet["retweeted_status"][value])
|
||||
else:
|
||||
text = StripChars(tweet[value])
|
||||
if show_screen_names:
|
||||
user = tweet["user"]["screen_name"]
|
||||
else:
|
||||
user = tweet["user"]["name"]
|
||||
source = re.sub(r"(?s)<.*?>", "", tweet["source"])
|
||||
if tweet.has_key("retweeted_status"):
|
||||
if tweet.has_key("message") == False and tweet["retweeted_status"]["is_quote_status"] == False:
|
||||
text = "RT @%s: %s" % (tweet["retweeted_status"]["user"]["screen_name"], text)
|
||||
elif tweet["retweeted_status"]["is_quote_status"]:
|
||||
text = "%s" % (text)
|
||||
else:
|
||||
text = "RT @%s: %s" % (tweet["retweeted_status"]["user"]["screen_name"], text)
|
||||
if tweet.has_key("message") == False:
|
||||
urls = utils.find_urls_in_text(text)
|
||||
if tweet.has_key("retweeted_status"):
|
||||
for url in range(0, len(urls)):
|
||||
try:
|
||||
text = text.replace(urls[url], tweet["retweeted_status"]["entities"]["urls"][url]["expanded_url"])
|
||||
except: pass
|
||||
else:
|
||||
for url in range(0, len(urls)):
|
||||
try:
|
||||
text = text.replace(urls[url], tweet["entities"]["urls"][url]["expanded_url"])
|
||||
except: pass
|
||||
if config.app['app-settings']['handle_longtweets']: pass
|
||||
return [user+", ", text, ts+", ", source]
|
||||
|
||||
def compose_direct_message(item, db, relative_times, show_screen_names=False, session=None):
|
||||
# for a while this function will be together with compose_dm.
|
||||
# this one composes direct messages based on events (new API Endpoints).
|
||||
if system == "Windows":
|
||||
# Let's remove the last 3 digits in the timestamp string.
|
||||
# Twitter sends their "epoch" timestamp with 3 digits for milliseconds and arrow doesn't like it.
|
||||
original_date = arrow.get(item["created_timestamp"][:-3])
|
||||
if relative_times == True:
|
||||
ts = original_date.humanize(locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts = original_date.replace(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts = item["created_timestamp"]
|
||||
text = StripChars(item["message_create"]["message_data"]["text"])
|
||||
source = "DM"
|
||||
sender = session.get_user(item["message_create"]["sender_id"])
|
||||
if db["user_name"] == sender["screen_name"]:
|
||||
if show_screen_names:
|
||||
user = _(u"Dm to %s ") % (session.get_user(item["message_create"]["target"]["recipient_id"])["screen_name"])
|
||||
else:
|
||||
user = _(u"Dm to %s ") % (session.get_user(item["message_create"]["target"]["recipient_id"])["name"])
|
||||
else:
|
||||
if show_screen_names:
|
||||
user = sender["screen_name"]
|
||||
else:
|
||||
user = sender["name"]
|
||||
if text[-1] in chars: text=text+"."
|
||||
urls = utils.find_urls_in_text(text)
|
||||
for url in range(0, len(urls)):
|
||||
try: text = text.replace(urls[url], item["message_create"]["message_data"]["entities"]["urls"][url]["expanded_url"])
|
||||
except IndexError: pass
|
||||
return [user+", ", text, ts+", ", source]
|
||||
|
||||
def compose_quoted_tweet(quoted_tweet, original_tweet, show_screen_names=False, session=None):
|
||||
""" It receives a tweet and returns a list with the user, text for the tweet or message, date and the client where user is."""
|
||||
if quoted_tweet.has_key("retweeted_status"):
|
||||
if quoted_tweet["retweeted_status"].has_key("full_text"):
|
||||
value = "full_text"
|
||||
else:
|
||||
value = "text"
|
||||
text = StripChars(quoted_tweet["retweeted_status"][value])
|
||||
else:
|
||||
if quoted_tweet.has_key("full_text"):
|
||||
value = "full_text"
|
||||
else:
|
||||
value = "text"
|
||||
text = StripChars(quoted_tweet[value])
|
||||
if show_screen_names:
|
||||
quoting_user = quoted_tweet["user"]["screen_name"]
|
||||
else:
|
||||
quoting_user = quoted_tweet["user"]["name"]
|
||||
source = re.sub(r"(?s)<.*?>", "", quoted_tweet["source"])
|
||||
if quoted_tweet.has_key("retweeted_status"):
|
||||
text = "rt @%s: %s" % (quoted_tweet["retweeted_status"]["user"]["screen_name"], text)
|
||||
if text[-1] in chars: text=text+"."
|
||||
original_user = original_tweet["user"]["screen_name"]
|
||||
if original_tweet.has_key("message"):
|
||||
original_text = original_tweet["message"]
|
||||
elif original_tweet.has_key("full_text"):
|
||||
original_text = StripChars(original_tweet["full_text"])
|
||||
else:
|
||||
original_text = StripChars(original_tweet["text"])
|
||||
quoted_tweet["message"] = _(u"{0}. Quoted tweet from @{1}: {2}").format( text, original_user, original_text)
|
||||
quoted_tweet = tweets.clear_url(quoted_tweet)
|
||||
quoted_tweet["entities"]["urls"].extend(original_tweet["entities"]["urls"])
|
||||
return quoted_tweet
|
||||
|
||||
def compose_followers_list(tweet, db, relative_times=True, show_screen_names=False, session=None):
|
||||
if system == "Windows":
|
||||
original_date = arrow.get(tweet["created_at"], "ddd MMM D H:m:s Z YYYY", locale="en")
|
||||
if relative_times == True:
|
||||
ts = original_date.humanize(locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts = original_date.replace(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts = tweet["created_at"]
|
||||
if tweet.has_key("status"):
|
||||
if len(tweet["status"]) > 4 and system == "Windows":
|
||||
original_date2 = arrow.get(tweet["status"]["created_at"], "ddd MMM D H:m:s Z YYYY", locale="en")
|
||||
if relative_times:
|
||||
ts2 = original_date2.humanize(locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts2 = original_date2.replace(seconds=db["utc_offset"]).format(_(u"dddd, MMMM D, YYYY H:m:s"), locale=languageHandler.getLanguage())
|
||||
else:
|
||||
ts2 = _("Unavailable")
|
||||
else:
|
||||
ts2 = _("Unavailable")
|
||||
return [_(u"%s (@%s). %s followers, %s friends, %s tweets. Last tweeted %s. Joined Twitter %s") % (tweet["name"], tweet["screen_name"], tweet["followers_count"], tweet["friends_count"], tweet["statuses_count"], ts2, ts)]
|
||||
|
||||
def compose_list(list):
|
||||
name = list["name"]
|
||||
if list["description"] == None: description = _(u"No description available")
|
||||
else: description = list["description"]
|
||||
user = list["user"]["name"]
|
||||
members = str(list["member_count"])
|
||||
if list["mode"] == "private": status = _(u"private")
|
||||
else: status = _(u"public")
|
||||
return [name, description, user, members, status]
|
@ -1,31 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import config
|
||||
import random
|
||||
import webbrowser
|
||||
from twython import Twython, TwythonError
|
||||
from keys import keyring
|
||||
from requests import certs
|
||||
import logging
|
||||
log = logging.getLogger("sessionTwitter")
|
||||
|
||||
class twitter(object):
|
||||
|
||||
def login(self, user_key, user_secret, verify_credentials):
|
||||
self.twitter = Twython(keyring.get("api_key"), keyring.get("api_secret"), user_key, user_secret)
|
||||
if verify_credentials == True:
|
||||
self.credentials = self.twitter.verify_credentials()
|
||||
|
||||
def authorise(self):
|
||||
twitter = Twython(keyring.get("api_key"), keyring.get("api_secret"))
|
||||
self.auth = twitter.get_authentication_tokens(callback_url="oob")
|
||||
webbrowser.open_new_tab(self.auth['auth_url'])
|
||||
|
||||
def verify_authorisation(self, settings, pincode):
|
||||
self.twitter = Twython(keyring.get("api_key"), keyring.get("api_secret"), self.auth['oauth_token'], self.auth['oauth_token_secret'])
|
||||
final = self.twitter.get_authorized_tokens(pincode)
|
||||
self.save_configuration(settings, final["oauth_token"], final["oauth_token_secret"])
|
||||
|
||||
def save_configuration(self, settings, user_key, user_secret):
|
||||
settings["twitter"]["user_key"] = user_key
|
||||
settings["twitter"]["user_secret"] = user_secret
|
||||
settings.write()
|
@ -1,229 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import url_shortener, re
|
||||
import output
|
||||
from twython import TwythonError
|
||||
import config
|
||||
import logging
|
||||
import requests
|
||||
import time
|
||||
import sound
|
||||
log = logging.getLogger("twitter.utils")
|
||||
""" Some utilities for the twitter interface."""
|
||||
|
||||
__version__ = 0.1
|
||||
__doc__ = "Find urls in tweets and #audio hashtag."
|
||||
|
||||
url_re = re.compile(r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))")
|
||||
|
||||
url_re2 = re.compile("(?:\w+://|www\.)[^ ,.?!#%=+][^ ]*")
|
||||
bad_chars = '\'\\.,[](){}:;"'
|
||||
|
||||
def find_urls_in_text(text):
|
||||
return [s.strip(bad_chars) for s in url_re2.findall(text)]
|
||||
|
||||
def find_urls (tweet):
|
||||
urls = []
|
||||
# Let's add URLS from tweet entities.
|
||||
if tweet.has_key("message_create"):
|
||||
entities = tweet["message_create"]["message_data"]["entities"]
|
||||
else:
|
||||
entities = tweet["entities"]
|
||||
for i in entities["urls"]:
|
||||
if i["expanded_url"] not in urls:
|
||||
urls.append(i["expanded_url"])
|
||||
if tweet.has_key("quoted_status"):
|
||||
for i in tweet["quoted_status"]["entities"]["urls"]:
|
||||
if i["expanded_url"] not in urls:
|
||||
urls.append(i["expanded_url"])
|
||||
if tweet.has_key("retweeted_status"):
|
||||
for i in tweet["retweeted_status"]["entities"]["urls"]:
|
||||
if i["expanded_url"] not in urls:
|
||||
urls.append(i["expanded_url"])
|
||||
if tweet["retweeted_status"].has_key("quoted_status"):
|
||||
for i in tweet["retweeted_status"]["quoted_status"]["entities"]["urls"]:
|
||||
if i["expanded_url"] not in urls:
|
||||
urls.append(i["expanded_url"])
|
||||
if tweet.has_key("message"):
|
||||
i = "message"
|
||||
elif tweet.has_key("full_text"):
|
||||
i = "full_text"
|
||||
else:
|
||||
i = "text"
|
||||
if tweet.has_key("message_create"):
|
||||
extracted_urls = find_urls_in_text(tweet["message_create"]["message_data"]["text"])
|
||||
else:
|
||||
extracted_urls = find_urls_in_text(tweet[i])
|
||||
# Don't include t.co links (mostly they are photos or shortened versions of already added URLS).
|
||||
for i in extracted_urls:
|
||||
if i not in urls and "https://t.co" not in i:
|
||||
urls.append(i)
|
||||
return urls
|
||||
|
||||
def find_item(id, listItem):
|
||||
for i in range(0, len(listItem)):
|
||||
if listItem[i]["id"] == id: return i
|
||||
return None
|
||||
|
||||
def find_list(name, lists):
|
||||
for i in range(0, len(lists)):
|
||||
if lists[i]["name"] == name: return lists[i]["id"]
|
||||
|
||||
def find_previous_reply(id, listItem):
|
||||
for i in range(0, len(listItem)):
|
||||
if listItem[i]["id_str"] == str(id): return i
|
||||
return None
|
||||
|
||||
def find_next_reply(id, listItem):
|
||||
for i in range(0, len(listItem)):
|
||||
if listItem[i]["in_reply_to_status_id_str"] == str(id): return i
|
||||
return None
|
||||
|
||||
def is_audio(tweet):
|
||||
try:
|
||||
if len(find_urls(tweet)) < 1:
|
||||
return False
|
||||
if tweet.has_key("message_create"):
|
||||
entities = tweet["message_create"]["message_data"]["entities"]
|
||||
else:
|
||||
entities = tweet["entities"]
|
||||
if len(entities["hashtags"]) > 0:
|
||||
for i in entities["hashtags"]:
|
||||
if i["text"] == "audio":
|
||||
return True
|
||||
except IndexError:
|
||||
print tweet["entities"]["hashtags"]
|
||||
log.exception("Exception while executing is_audio hashtag algorithm")
|
||||
|
||||
def is_geocoded(tweet):
|
||||
if tweet.has_key("coordinates") and tweet["coordinates"] != None:
|
||||
return True
|
||||
|
||||
def is_media(tweet):
|
||||
if tweet.has_key("message_create"):
|
||||
entities = tweet["message_create"]["message_data"]["entities"]
|
||||
else:
|
||||
entities = tweet["entities"]
|
||||
if entities.has_key("media") == False:
|
||||
return False
|
||||
for i in entities["media"]:
|
||||
if i.has_key("type") and i["type"] == "photo":
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_all_mentioned(tweet, conf, field="screen_name"):
|
||||
""" Gets all users that has been mentioned."""
|
||||
results = []
|
||||
for i in tweet["entities"]["user_mentions"]:
|
||||
if i["screen_name"] != conf["user_name"] and i["screen_name"] != tweet["user"]["screen_name"]:
|
||||
if i[field] not in results:
|
||||
results.append(i[field])
|
||||
return results
|
||||
|
||||
def get_all_users(tweet, conf):
|
||||
string = []
|
||||
if tweet.has_key("retweeted_status"):
|
||||
string.append(tweet["user"]["screen_name"])
|
||||
tweet = tweet["retweeted_status"]
|
||||
if tweet.has_key("sender"):
|
||||
string.append(tweet["sender"]["screen_name"])
|
||||
else:
|
||||
if tweet["user"]["screen_name"] != conf["user_name"]:
|
||||
string.append(tweet["user"]["screen_name"])
|
||||
for i in tweet["entities"]["user_mentions"]:
|
||||
if i["screen_name"] != conf["user_name"] and i["screen_name"] != tweet["user"]["screen_name"]:
|
||||
if i["screen_name"] not in string:
|
||||
string.append(i["screen_name"])
|
||||
if len(string) == 0:
|
||||
string.append(tweet["user"]["screen_name"])
|
||||
return string
|
||||
|
||||
def if_user_exists(twitter, user):
|
||||
try:
|
||||
data = twitter.show_user(screen_name=user)
|
||||
return data
|
||||
except TwythonError as err:
|
||||
if err.error_code == 404:
|
||||
return None
|
||||
else:
|
||||
return user
|
||||
|
||||
def api_call(parent=None, call_name=None, preexec_message="", success="", success_snd="", *args, **kwargs):
|
||||
if preexec_message:
|
||||
output.speak(preexec_message, True)
|
||||
try:
|
||||
val = getattr(parent.twitter.twitter, call_name)(*args, **kwargs)
|
||||
output.speak(success)
|
||||
parent.parent.sound.play(success_snd)
|
||||
except TwythonError as e:
|
||||
output.speak("Error %s: %s" % (e.error_code, e.msg), True)
|
||||
parent.parent.sound.play("error.ogg")
|
||||
return val
|
||||
|
||||
def is_allowed(tweet, settings, buffer_name):
|
||||
clients = settings["twitter"]["ignored_clients"]
|
||||
if tweet.has_key("sender"): return True
|
||||
allowed = True
|
||||
tweet_data = {}
|
||||
if tweet.has_key("retweeted_status"):
|
||||
tweet_data["retweet"] = True
|
||||
if tweet["in_reply_to_status_id_str"] != None:
|
||||
tweet_data["reply"] = True
|
||||
if tweet.has_key("quoted_status"):
|
||||
tweet_data["quote"] = True
|
||||
if tweet.has_key("retweeted_status"): tweet = tweet["retweeted_status"]
|
||||
source = re.sub(r"(?s)<.*?>", "", tweet["source"])
|
||||
for i in clients:
|
||||
if i.lower() == source.lower():
|
||||
return False
|
||||
return filter_tweet(tweet, tweet_data, settings, buffer_name)
|
||||
|
||||
def filter_tweet(tweet, tweet_data, settings, buffer_name):
|
||||
if tweet.has_key("full_text"):
|
||||
value = "full_text"
|
||||
else:
|
||||
value = "text"
|
||||
for i in settings["filters"]:
|
||||
if settings["filters"][i]["in_buffer"] == buffer_name:
|
||||
regexp = settings["filters"][i]["regexp"]
|
||||
word = settings["filters"][i]["word"]
|
||||
# Added if/else for compatibility reasons.
|
||||
if settings["filters"][i].has_key("allow_rts"):
|
||||
allow_rts = settings["filters"][i]["allow_rts"]
|
||||
else:
|
||||
allow_rts = "True"
|
||||
if settings["filters"][i].has_key("allow_quotes"):
|
||||
allow_quotes = settings["filters"][i]["allow_quotes"]
|
||||
else:
|
||||
allow_quotes = "True"
|
||||
if settings["filters"][i].has_key("allow_replies"):
|
||||
allow_replies = settings["filters"][i]["allow_replies"]
|
||||
else:
|
||||
allow_replies = "True"
|
||||
if allow_rts == "False" and tweet_data.has_key("retweet"):
|
||||
return False
|
||||
if allow_quotes == "False" and tweet_data.has_key("quote"):
|
||||
return False
|
||||
if allow_replies == "False" and tweet_data.has_key("reply"):
|
||||
return False
|
||||
if word != "" and settings["filters"][i]["if_word_exists"]:
|
||||
if word in tweet[value]:
|
||||
return False
|
||||
elif word != "" and settings["filters"][i]["if_word_exists"] == False:
|
||||
if word not in tweet[value]:
|
||||
return False
|
||||
if settings["filters"][i]["in_lang"] == "True":
|
||||
if tweet["lang"] not in settings["filters"][i]["languages"]:
|
||||
return False
|
||||
elif settings["filters"][i]["in_lang"] == "False":
|
||||
if tweet["lang"] in settings["filters"][i]["languages"]:
|
||||
return False
|
||||
return True
|
||||
|
||||
def twitter_error(error):
|
||||
if error.error_code == 403:
|
||||
msg = _(u"Sorry, you are not authorised to see this status.")
|
||||
elif error.error_code == 404:
|
||||
msg = _(u"No status found with that ID")
|
||||
else:
|
||||
msg = _(u"Error code {0}").format(error.error_code,)
|
||||
output.speak(msg)
|
Loading…
x
Reference in New Issue
Block a user