Migrated most of twitter util functions from sessions/twitter/utils.py

This commit is contained in:
Manuel Cortez 2021-01-04 11:13:20 -06:00
parent 7ceb806af2
commit bcd51d6259

View File

@ -29,24 +29,24 @@ def find_urls (tweet):
urls = []
# Let's add URLS from tweet entities.
if hasattr(tweet, "message_create"):
entities = tweet.message_create.message_data.entities
entities = tweet.message_create["message_data"]["entities"]
else:
entities = tweet.entities
for i in entities.urls:
if i.expanded_url not in urls:
urls.append(i.expanded_url)
for i in entities["urls"]:
if i["expanded_url"] not in urls:
urls.append(i["expanded_url"])
if hasattr(tweet, "quoted_status"):
for i in tweet.quoted_status.entities.urls:
if i.expanded_url not in urls:
urls.append(i.expanded_url)
for i in tweet.quoted_status.entities["urls"]:
if i["expanded_url"] not in urls:
urls.append(i["expanded_url"])
if hasattr(tweet, "retweeted_status"):
for i in tweet.retweeted_status.entities.urls:
if i.expanded_url not in urls:
urls.append(i.expanded_url)
for i in tweet.retweeted_status.entities["urls"]:
if i["expanded_url"] not in urls:
urls.append(i["expanded_url"])
if hasattr(tweet["retweeted_status"], "quoted_status"):
for i in tweet.retweeted_status.quoted_status.entities.urls:
if i.expanded_url not in urls:
urls.append(i.expanded_url)
for i in tweet.retweeted_status.quoted_status.entities["urls"]:
if i["expanded_url"] not in urls:
urls.append(i["expanded_url"])
if hasattr(tweet, "message"):
i = "message"
elif hasattr(tweet, "full_text"):
@ -54,7 +54,7 @@ def find_urls (tweet):
else:
i = "text"
if hasattr(tweet, "message_create"):
extracted_urls = find_urls_in_text(tweet.message_create.message_data.text)
extracted_urls = find_urls_in_text(tweet.message_create["message_data"]["text"])
else:
extracted_urls = find_urls_in_text(getattr(tweet, i))
# Don't include t.co links (mostly they are photos or shortened versions of already added URLS).
@ -77,15 +77,15 @@ def is_audio(tweet):
if len(find_urls(tweet)) < 1:
return False
if hasattr(tweet, "message_create"):
entities = tweet.message_create.message_data.entities
entities = tweet.message_create["message_data"]["entities"]
else:
entities = tweet.entities
if len(entities.hashtags) > 0:
for i in entities.hashtags:
if i.text == "audio":
if len(entities["hashtags"]) > 0:
for i in entities["hashtags"]:
if i["text"] == "audio":
return True
except IndexError:
print(tweet.entities.hashtags)
print(tweet.entities["hashtags"])
log.exception("Exception while executing is_audio hashtag algorithm")
def is_geocoded(tweet):
@ -94,23 +94,23 @@ def is_geocoded(tweet):
def is_media(tweet):
if hasattr(tweet, "message_create"):
entities = tweet.message_create.message_data.entities
entities = tweet.message_create["message_data"]["entities"]
else:
entities = tweet.entities
if not hasattr(entities, media):
if entities.get("media") == None:
return False
for i in entities.media:
if hasattr(i, "type") and i.type == "photo":
for i in entities["media"]:
if i.get("type") != None and i.get("type") == "photo":
return True
return False
def get_all_mentioned(tweet, conf, field="screen_name"):
""" Gets all users that have been mentioned."""
results = []
for i in tweet.entities.user_mentions:
if i.screen_name != conf["user_name"] and i.screen_name != tweet.user.screen_name:
if getattr(i, field) not in results:
results.append(getattr(i, field))
for i in tweet.entities["user_mentions"]:
if i["screen_name"] != conf["user_name"] and i["screen_name"] != tweet.user.screen_name:
if i.get(field) not in results:
results.append(i.get(field))
return results
def get_all_users(tweet, conf):
@ -123,14 +123,15 @@ def get_all_users(tweet, conf):
else:
if tweet.user.screen_name != conf["user_name"]:
string.append(tweet.user.screen_name)
for i in tweet.entities.user_mentions:
if i.screen_name != conf["user_name"] and i.screen_name != tweet.user.screen_name:
if i.screen_name not in string:
string.append(i.screen_name)
for i in tweet.entities["user_mentions"]:
if i["screen_name"] != conf["user_name"] and i["screen_name"] != tweet.user.screen_name:
if i["screen_name"] not in string:
string.append(i["screen_name"])
if len(string) == 0:
string.append(tweet.user.screen_name)
return string
# ToDo: implement this using tweepy
def if_user_exists(twitter, user):
try:
data = twitter.show_user(screen_name=user)
@ -154,7 +155,7 @@ def is_allowed(tweet, settings, buffer_name):
tweet_data["quote"] = True
if hasattr(tweet, "retweeted_status"):
tweet = tweet.retweeted_status
source = re.sub(r"(?s)<.*?>", "", tweet.source)
source = tweet.source
for i in clients:
if i.lower() == source.lower():
return False
@ -214,7 +215,7 @@ def twitter_error(error):
def expand_urls(text, entities):
""" Expand all URLS present in text with information found in entities"""
urls = find_urls_in_text(text)
for url in entities.urls:
if url.url in text:
text = text.replace(url.url, url.expanded_url)
for url in entities["urls"]:
if url["url"] in text:
text = text.replace(url["url"], url["expanded_url"])
return text