Updated Twython library to version 3.6.0, keeping our customizations

This commit is contained in:
Jose Manuel Delicado 2017-10-07 18:40:27 +02:00
parent 4d75f2db34
commit b3c8de9a1f
3 changed files with 158 additions and 54 deletions

View File

@ -19,7 +19,7 @@ Questions, comments? ryan@venodesigns.net
"""
__author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '3.3.0'
__version__ = '3.6.0'
from .api import Twython
from .streaming import TwythonStreamer

View File

@ -145,6 +145,7 @@ class Twython(EndpointsMixin, object):
else:
params = params
files = list()
requests_args = {}
for k, v in self.client_args.items():
# Maybe this should be set as a class variable and only done once?
@ -195,17 +196,16 @@ class Twython(EndpointsMixin, object):
error_message,
error_code=response.status_code,
retry_after=response.headers.get('X-Rate-Limit-Reset'))
content=""
try:
if response.status_code == 204:
content = response.content
else:
content = response.json()
except ValueError:
# Send the response as is for working with /media/metadata/create.json.
content = response.content
# raise TwythonError('Response was not valid JSON. \
# Unable to decode.')
if response.content!="":
raise TwythonError('Response was not valid JSON. \
Unable to decode.')
return content
@ -258,8 +258,10 @@ class Twython(EndpointsMixin, object):
url = endpoint
else:
url = '%s/%s.json' % (self.api_url % version, endpoint)
content = self._request(url, method=method, params=params,
api_call=url)
return content
def get(self, endpoint, params=None, version='1.1'):
@ -473,6 +475,11 @@ class Twython(EndpointsMixin, object):
>>> print result
"""
if not callable(function):
raise TypeError('.cursor() takes a Twython function as its first \
argument. Did you provide the result of a \
function call?')
if not hasattr(function, 'iter_mode'):
raise TwythonError('Unable to create generator for Twython \
method "%s"' % function.__name__)
@ -531,7 +538,7 @@ class Twython(EndpointsMixin, object):
@staticmethod
def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False, expand_quoted_status=False):
"""Return HTML for a tweet (urls, mentions, hashtags replaced with links)
"""Return HTML for a tweet (urls, mentions, hashtags, symbols replaced with links)
:param tweet: Tweet object from received from Twitter API
:param use_display_url: Use display URL to represent link
@ -547,62 +554,116 @@ class Twython(EndpointsMixin, object):
if 'retweeted_status' in tweet:
tweet = tweet['retweeted_status']
if 'extended_tweet' in tweet:
tweet = tweet['extended_tweet']
orig_tweet_text = tweet.get('full_text') or tweet['text']
display_text_range = tweet.get('display_text_range') or [0, len(orig_tweet_text)]
display_text_start, display_text_end = display_text_range[0], display_text_range[1]
display_text = orig_tweet_text[display_text_start:display_text_end]
prefix_text = orig_tweet_text[0:display_text_start]
suffix_text = orig_tweet_text[display_text_end:len(orig_tweet_text)]
if 'entities' in tweet:
text = tweet['text']
entities = tweet['entities']
# We'll put all the bits of replacement HTML and their starts/ends
# in this list:
entities = []
# Mentions
for entity in sorted(entities['user_mentions'],
key=lambda mention: len(mention['screen_name']), reverse=True):
start, end = entity['indices'][0], entity['indices'][1]
if 'user_mentions' in tweet['entities']:
for entity in tweet['entities']['user_mentions']:
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
mention_html = '<a href="https://twitter.com/%(screen_name)s" class="twython-mention">@%(screen_name)s</a>'
text = re.sub(r'(?<!>)' + tweet['text'][start:end] + '(?!</a>)',
mention_html % {'screen_name': entity['screen_name']}, text)
mention_html = '<a href="https://twitter.com/%(screen_name)s" class="twython-mention">@%(screen_name)s</a>' % {'screen_name': entity['screen_name']}
if display_text_start <= temp['start'] <= display_text_end:
temp['replacement'] = mention_html
entities.append(temp)
else:
# Make the '@username' at the start, before
# display_text, into a link:
sub_expr = r'(?<!>)' + orig_tweet_text[temp['start']:temp['end']] + '(?!</a>)'
prefix_text = re.sub(sub_expr, mention_html, prefix_text)
# Hashtags
for entity in sorted(entities['hashtags'],
key=lambda hashtag: len(hashtag['text']), reverse=True):
start, end = entity['indices'][0], entity['indices'][1]
if 'hashtags' in tweet['entities']:
for entity in tweet['entities']['hashtags']:
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
hashtag_html = '<a href="https://twitter.com/search?q=%%23%(hashtag)s" class="twython-hashtag">#%(hashtag)s</a>'
text = re.sub(r'(?<!>)' + tweet['text'][start:end] + '(?!</a>)',
hashtag_html % {'hashtag': entity['text']}, text)
url_html = '<a href="https://twitter.com/search?q=%%23%(hashtag)s" class="twython-hashtag">#%(hashtag)s</a>' % {'hashtag': entity['text']}
# Urls
for entity in entities['urls']:
start, end = entity['indices'][0], entity['indices'][1]
if use_display_url and entity.get('display_url') \
and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
else:
shown_url = entity['url']
temp['replacement'] = url_html
entities.append(temp)
url_html = '<a href="%s" class="twython-url">%s</a>'
text = text.replace(tweet['text'][start:end],
url_html % (entity['url'], shown_url))
# Symbols
if 'symbols' in tweet['entities']:
for entity in tweet['entities']['symbols']:
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
# Media
if 'media' in entities:
for entity in entities['media']:
start, end = entity['indices'][0], entity['indices'][1]
if use_display_url and entity.get('display_url') \
and not use_expanded_url:
url_html = '<a href="https://twitter.com/search?q=%%24%(symbol)s" class="twython-symbol">$%(symbol)s</a>' % {'symbol': entity['text']}
temp['replacement'] = url_html
entities.append(temp)
# URLs
if 'urls' in tweet['entities']:
for entity in tweet['entities']['urls']:
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
if use_display_url and entity.get('display_url') and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
else:
shown_url = entity['url']
url_html = '<a href="%s" class="twython-media">%s</a>'
text = text.replace(tweet['text'][start:end],
url_html % (entity['url'], shown_url))
url_html = '<a href="%s" class="twython-url">%s</a>' % (entity['url'], shown_url)
if expand_quoted_status and tweet.get('is_quote_status'):
if display_text_start <= temp['start'] <= display_text_end:
temp['replacement'] = url_html
entities.append(temp)
else:
suffix_text = suffix_text.replace(orig_tweet_text[temp['start']:temp['end']], url_html)
if 'media' in tweet['entities']:
for entity in tweet['entities']['media']:
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
if use_display_url and entity.get('display_url') and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
else:
shown_url = entity['url']
url_html = '<a href="%s" class="twython-media">%s</a>' % (entity['url'], shown_url)
if display_text_start <= temp['start'] <= display_text_end:
temp['replacement'] = url_html
entities.append(temp)
else:
suffix_text = suffix_text.replace(orig_tweet_text[temp['start']:temp['end']], url_html)
# Now do all the replacements, starting from the end, so that the
# start/end indices still work:
for entity in sorted(entities, key=lambda e: e['start'], reverse=True):
display_text = display_text[0:entity['start']] + entity['replacement'] + display_text[entity['end']:]
quote_text = ''
if expand_quoted_status and tweet.get('is_quote_status') and tweet.get('quoted_status'):
quoted_status = tweet['quoted_status']
text += '<blockquote class="twython-quote">%(quote)s<cite><a href="%(quote_tweet_link)s">' \
quote_text += '<blockquote class="twython-quote">%(quote)s<cite><a href="%(quote_tweet_link)s">' \
'<span class="twython-quote-user-name">%(quote_user_name)s</span>' \
'<span class="twython-quote-user-screenname">@%(quote_user_screen_name)s</span></a>' \
'</cite></blockquote>' % \
@ -612,4 +673,9 @@ class Twython(EndpointsMixin, object):
'quote_user_name': quoted_status['user']['name'],
'quote_user_screen_name': quoted_status['user']['screen_name']}
return text
return '%(prefix)s%(display)s%(suffix)s%(quote)s' % {
'prefix': '<span class="twython-tweet-prefix">%s</span>' % prefix_text if prefix_text else '',
'display': display_text,
'suffix': '<span class="twython-tweet-suffix">%s</span>' % suffix_text if suffix_text else '',
'quote': quote_text
}

View File

@ -17,10 +17,12 @@ https://dev.twitter.com/docs/api/1.1
import json
import os
import warnings
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import BytesIO
from time import sleep
#try:
#from StringIO import StringIO
#except ImportError:
#from io import StringIO
from .advisory import TwythonDeprecationWarning
@ -143,6 +145,10 @@ class EndpointsMixin(object):
Docs:
https://dev.twitter.com/rest/reference/post/media/upload
"""
# https://dev.twitter.com/rest/reference/get/media/upload-status
if params and params.get('command', '') == 'STATUS':
return self.get('https://upload.twitter.com/1.1/media/upload.json', params=params)
return self.post('https://upload.twitter.com/1.1/media/upload.json', params=params)
def create_metadata(self, **params):
@ -152,7 +158,7 @@ class EndpointsMixin(object):
params = json.dumps(params)
return self.post("https://upload.twitter.com/1.1/media/metadata/create.json", params=params)
def upload_video(self, media, media_type, size=None):
def upload_video(self, media, media_type, media_category=None, size=None, check_progress=False):
"""Uploads video file to Twitter servers in chunks. The file will be available to be attached
to a status for 60 minutes. To attach to a update, pass a list of returned media ids
to the 'update_status' method using the 'media_ids' param.
@ -177,7 +183,8 @@ class EndpointsMixin(object):
params = {
'command': 'INIT',
'media_type': media_type,
'total_bytes': size
'total_bytes': size,
'media_category': media_category
}
response_init = self.post(upload_url, params=params)
media_id = response_init['media_id']
@ -188,7 +195,7 @@ class EndpointsMixin(object):
data = media.read(1*1024*1024)
if not data:
break
media_chunk = StringIO()
media_chunk = BytesIO()
media_chunk.write(data)
media_chunk.seek(0)
@ -206,7 +213,38 @@ class EndpointsMixin(object):
'command': 'FINALIZE',
'media_id': media_id
}
return self.post(upload_url, params=params)
response = self.post(upload_url, params=params)
# Only get the status if explicity asked to
# Default to False
if check_progress:
# Stage 4: STATUS call if still processing
params = {
'command': 'STATUS',
'media_id': media_id
}
# added code to handle if media_category is NOT set and check_progress=True
# the API will return a NoneType object in this case
try:
processing_state = response.get('processing_info').get('state')
except AttributeError:
return response
if processing_state:
while (processing_state == 'pending' or processing_state == 'in_progress') :
# get the secs to wait
check_after_secs = response.get('processing_info').get('check_after_secs')
if check_after_secs:
sleep(check_after_secs)
response = self.get(upload_url, params=params)
# get new state after waiting
processing_state = response.get('processing_info').get('state')
return response
def get_oembed_tweet(self, **params):
"""Returns information allowing the creation of an embedded