Install Twython from a git repo instead of shipping it in the source code. #273

This commit is contained in:
Manuel Cortez 2018-11-22 12:18:38 -06:00
parent c5e9e97c84
commit 4391e3d3de
11 changed files with 1 additions and 2400 deletions

View File

@ -24,3 +24,4 @@ python-vlc
pywin32
certifi
backports.functools_lru_cache
git+https://github.com/manuelcortez/twython

View File

@ -1,29 +0,0 @@
# ______ __ __
# /_ __/_ __ __ __ / /_ / /_ ____ ____
# / / | | /| / // / / // __// __ \ / __ \ / __ \
# / / | |/ |/ // /_/ // /_ / / / // /_/ // / / /
# /_/ |__/|__/ \__, / \__//_/ /_/ \____//_/ /_/
# /____/
"""
Twython
-------
Twython is a library for Python that wraps the Twitter API.
It aims to abstract away all the API endpoints, so that
additions to the library and/or the Twitter API won't
cause any overall problems.
Questions, comments? ryan@venodesigns.net
"""
__author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '3.7.0'
from .api import Twython
from .streaming import TwythonStreamer
from .exceptions import (
TwythonError, TwythonRateLimitError, TwythonAuthError,
TwythonStreamError
)

View File

@ -1,22 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.advisory
~~~~~~~~~~~~~~~~
This module contains Warning classes for Twython to specifically
alert the user about.
This mainly is because Python 2.7 > mutes DeprecationWarning and when
we deprecate a method or variable in Twython, we want to use to see
the Warning but don't want ALL DeprecationWarnings to appear,
only TwythonDeprecationWarnings.
"""
class TwythonDeprecationWarning(DeprecationWarning):
"""Custom DeprecationWarning to be raised when methods/variables
are being deprecated in Twython. Python 2.7 > ignores DeprecationWarning
so we want to specifcally bubble up ONLY Twython Deprecation Warnings
"""
pass

View File

@ -1,708 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.api
~~~~~~~~~~~
This module contains functionality for access to core Twitter API calls,
Twitter Authentication, and miscellaneous methods that are useful when
dealing with the Twitter API
"""
import warnings
import re
import requests
from requests.auth import HTTPBasicAuth
from requests_oauthlib import OAuth1, OAuth2
from . import __version__
from .advisory import TwythonDeprecationWarning
from .compat import json, urlencode, parse_qsl, quote_plus, str, is_py2
from .compat import urlsplit
from .endpoints import EndpointsMixin
from .exceptions import TwythonError, TwythonAuthError, TwythonRateLimitError
from .helpers import _transparent_params
warnings.simplefilter('always', TwythonDeprecationWarning) # For Python 2.7 >
class Twython(EndpointsMixin, object):
def __init__(self, app_key=None, app_secret=None, oauth_token=None,
oauth_token_secret=None, access_token=None,
token_type='bearer', oauth_version=1, api_version='1.1',
client_args=None, auth_endpoint='authenticate'):
"""Instantiates an instance of Twython. Takes optional parameters for
authentication and such (see below).
:param app_key: (optional) Your applications key
:param app_secret: (optional) Your applications secret key
:param oauth_token: (optional) When using **OAuth 1**, combined with
oauth_token_secret to make authenticated calls
:param oauth_token_secret: (optional) When using **OAuth 1** combined
with oauth_token to make authenticated calls
:param access_token: (optional) When using **OAuth 2**, provide a
valid access token if you have one
:param token_type: (optional) When using **OAuth 2**, provide your
token type. Default: bearer
:param oauth_version: (optional) Choose which OAuth version to use.
Default: 1
:param api_version: (optional) Choose which Twitter API version to
use. Default: 1.1
:param client_args: (optional) Accepts some requests Session parameters
and some requests Request parameters.
See http://docs.python-requests.org/en/latest/api/#sessionapi
and requests section below it for details.
[ex. headers, proxies, verify(SSL verification)]
:param auth_endpoint: (optional) Lets you select which authentication
endpoint will use your application.
This will allow the application to have DM access
if the endpoint is 'authorize'.
Default: authenticate.
"""
# API urls, OAuth urls and API version; needed for hitting that there
# API.
self.api_version = api_version
self.api_url = 'https://api.twitter.com/%s'
self.app_key = app_key
self.app_secret = app_secret
self.oauth_token = oauth_token
self.oauth_token_secret = oauth_token_secret
self.access_token = access_token
# OAuth 1
self.request_token_url = self.api_url % 'oauth/request_token'
self.access_token_url = self.api_url % 'oauth/access_token'
self.authenticate_url = self.api_url % ('oauth/%s' % auth_endpoint)
if self.access_token: # If they pass an access token, force OAuth 2
oauth_version = 2
self.oauth_version = oauth_version
# OAuth 2
if oauth_version == 2:
self.request_token_url = self.api_url % 'oauth2/token'
self.client_args = client_args or {}
default_headers = {'User-Agent': 'Twython v' + __version__}
if 'headers' not in self.client_args:
# If they didn't set any headers, set our defaults for them
self.client_args['headers'] = default_headers
elif 'User-Agent' not in self.client_args['headers']:
# If they set headers, but didn't include User-Agent.. set
# it for them
self.client_args['headers'].update(default_headers)
# Generate OAuth authentication object for the request
# If no keys/tokens are passed to __init__, auth=None allows for
# unauthenticated requests, although I think all v1.1 requests
# need auth
auth = None
if oauth_version == 1:
# User Authentication is through OAuth 1
if self.app_key is not None and self.app_secret is not None:
auth = OAuth1(self.app_key, self.app_secret,
self.oauth_token, self.oauth_token_secret)
elif oauth_version == 2 and self.access_token:
# Application Authentication is through OAuth 2
token = {'token_type': token_type,
'access_token': self.access_token}
auth = OAuth2(self.app_key, token=token)
self.client = requests.Session()
self.client.auth = auth
# Make a copy of the client args and iterate over them
# Pop out all the acceptable args at this point because they will
# Never be used again.
client_args_copy = self.client_args.copy()
for k, v in client_args_copy.items():
if k in ('cert', 'hooks', 'max_redirects', 'proxies'):
setattr(self.client, k, v)
self.client_args.pop(k) # Pop, pop!
# Headers are always present, so we unconditionally pop them and merge
# them into the session headers.
self.client.headers.update(self.client_args.pop('headers'))
self._last_call = None
def __repr__(self):
return '<Twython: %s>' % (self.app_key)
def _request(self, url, method='GET', params=None, api_call=None, json_encoded=False):
"""Internal request method"""
method = method.lower()
params = params or {}
func = getattr(self.client, method)
if isinstance(params, dict) and json_encoded == False:
params, files = _transparent_params(params)
else:
params = params
files = list()
requests_args = {}
for k, v in self.client_args.items():
# Maybe this should be set as a class variable and only done once?
if k in ('timeout', 'allow_redirects', 'stream', 'verify'):
requests_args[k] = v
if method == 'get' or method == 'delete':
requests_args['params'] = params
else:
# Check for json_encoded so we will sent params as "data" or "json"
if json_encoded:
data_key = "json"
else:
data_key = "data"
requests_args.update({
data_key: params,
'files': files,
})
try:
response = func(url, **requests_args)
except requests.RequestException as e:
raise TwythonError(str(e))
# create stash for last function intel
self._last_call = {
'api_call': api_call,
'api_error': None,
'cookies': response.cookies,
'headers': response.headers,
'status_code': response.status_code,
'url': response.url,
'content': response.text,
}
# greater than 304 (not modified) is an error
if response.status_code > 304:
error_message = self._get_error_message(response)
self._last_call['api_error'] = error_message
ExceptionType = TwythonError
if response.status_code == 429:
# Twitter API 1.1, always return 429 when
# rate limit is exceeded
ExceptionType = TwythonRateLimitError
elif response.status_code == 401 or 'Bad Authentication data' \
in error_message:
# Twitter API 1.1, returns a 401 Unauthorized or
# a 400 "Bad Authentication data" for invalid/expired
# app keys/user tokens
ExceptionType = TwythonAuthError
raise ExceptionType(
error_message,
error_code=response.status_code,
retry_after=response.headers.get('X-Rate-Limit-Reset'))
content = ''
try:
if response.status_code == 204:
content = response.content
else:
content = response.json()
except ValueError:
if response.content != '':
raise TwythonError('Response was not valid JSON. \
Unable to decode.')
return content
def _get_error_message(self, response):
"""Parse and return the first error message"""
error_message = 'An error occurred processing your request.'
try:
content = response.json()
# {"errors":[{"code":34,"message":"Sorry,
# that page does not exist"}]}
error_message = content['errors'][0]['message']
except TypeError:
error_message = content['errors']
except ValueError:
# bad json data from Twitter for an error
pass
except (KeyError, IndexError):
# missing data so fallback to default message
pass
return error_message
def request(self, endpoint, method='GET', params=None, version='1.1', json_encoded=False):
"""Return dict of response received from Twitter's API
:param endpoint: (required) Full url or Twitter API endpoint
(e.g. search/tweets)
:type endpoint: string
:param method: (optional) Method of accessing data, either
GET, POST or DELETE. (default GET)
:type method: string
:param params: (optional) Dict of parameters (if any) accepted
the by Twitter API endpoint you are trying to
access (default None)
:type params: dict or None
:param version: (optional) Twitter API version to access
(default 1.1)
:type version: string
:param json_encoded: (optional) Flag to indicate if this method should send data encoded as json
(default False)
:type json_encoded: bool
:rtype: dict
"""
if endpoint.startswith('http://'):
raise TwythonError('api.twitter.com is restricted to SSL/TLS traffic.')
# In case they want to pass a full Twitter URL
# i.e. https://api.twitter.com/1.1/search/tweets.json
if endpoint.startswith('https://'):
url = endpoint
else:
url = '%s/%s.json' % (self.api_url % version, endpoint)
content = self._request(url, method=method, params=params,
api_call=url, json_encoded=json_encoded)
return content
def get(self, endpoint, params=None, version='1.1'):
"""Shortcut for GET requests via :class:`request`"""
return self.request(endpoint, params=params, version=version)
def post(self, endpoint, params=None, version='1.1', json_encoded=False):
"""Shortcut for POST requests via :class:`request`"""
return self.request(endpoint, 'POST', params=params, version=version, json_encoded=json_encoded)
def delete(self, endpoint, params=None, version='1.1', json_encoded=False):
"""Shortcut for delete requests via :class:`request`"""
return self.request(endpoint, 'DELETE', params=params, version=version, json_encoded=json_encoded)
def get_lastfunction_header(self, header, default_return_value=None):
"""Returns a specific header from the last API call
This will return None if the header is not present
:param header: (required) The name of the header you want to get
the value of
Most useful for the following header information:
x-rate-limit-limit,
x-rate-limit-remaining,
x-rate-limit-class,
x-rate-limit-reset
"""
if self._last_call is None:
raise TwythonError('This function must be called after an API call. \
It delivers header information.')
return self._last_call['headers'].get(header, default_return_value)
def get_authentication_tokens(self, callback_url=None, force_login=False,
screen_name=''):
"""Returns a dict including an authorization URL, ``auth_url``, to
direct a user to
:param callback_url: (optional) Url the user is returned to after
they authorize your app (web clients only)
:param force_login: (optional) Forces the user to enter their
credentials to ensure the correct users
account is authorized.
:param screen_name: (optional) If forced_login is set OR user is
not currently logged in, Prefills the username
input box of the OAuth login screen with the
given value
:rtype: dict
"""
if self.oauth_version != 1:
raise TwythonError('This method can only be called when your \
OAuth version is 1.0.')
request_args = {}
if callback_url:
request_args['oauth_callback'] = callback_url
response = self.client.get(self.request_token_url, params=request_args)
if response.status_code == 401:
raise TwythonAuthError(response.content,
error_code=response.status_code)
elif response.status_code != 200:
raise TwythonError(response.content,
error_code=response.status_code)
request_tokens = dict(parse_qsl(response.content.decode('utf-8')))
if not request_tokens:
raise TwythonError('Unable to decode request tokens.')
oauth_callback_confirmed = request_tokens.get('oauth_callback_confirmed') \
== 'true'
auth_url_params = {
'oauth_token': request_tokens['oauth_token'],
}
if force_login:
auth_url_params.update({
'force_login': force_login,
'screen_name': screen_name
})
# Use old-style callback argument if server didn't accept new-style
if callback_url and not oauth_callback_confirmed:
auth_url_params['oauth_callback'] = self.callback_url
request_tokens['auth_url'] = self.authenticate_url + \
'?' + urlencode(auth_url_params)
return request_tokens
def get_authorized_tokens(self, oauth_verifier):
"""Returns a dict of authorized tokens after they go through the
:class:`get_authentication_tokens` phase.
:param oauth_verifier: (required) The oauth_verifier (or a.k.a PIN
for non web apps) retrieved from the callback url querystring
:rtype: dict
"""
if self.oauth_version != 1:
raise TwythonError('This method can only be called when your \
OAuth version is 1.0.')
response = self.client.get(self.access_token_url,
params={'oauth_verifier': oauth_verifier},
headers={'Content-Type': 'application/\
json'})
if response.status_code == 401:
try:
try:
# try to get json
content = response.json()
except AttributeError: # pragma: no cover
# if unicode detected
content = json.loads(response.content)
except ValueError:
content = {}
raise TwythonError(content.get('error', 'Invalid / expired To \
ken'), error_code=response.status_code)
authorized_tokens = dict(parse_qsl(response.content.decode('utf-8')))
if not authorized_tokens:
raise TwythonError('Unable to decode authorized tokens.')
return authorized_tokens # pragma: no cover
def obtain_access_token(self):
"""Returns an OAuth 2 access token to make OAuth 2 authenticated
read-only calls.
:rtype: string
"""
if self.oauth_version != 2:
raise TwythonError('This method can only be called when your \
OAuth version is 2.0.')
data = {'grant_type': 'client_credentials'}
basic_auth = HTTPBasicAuth(self.app_key, self.app_secret)
try:
response = self.client.post(self.request_token_url,
data=data, auth=basic_auth)
content = response.content.decode('utf-8')
try:
content = content.json()
except AttributeError:
content = json.loads(content)
access_token = content['access_token']
except (KeyError, ValueError, requests.exceptions.RequestException):
raise TwythonAuthError('Unable to obtain OAuth 2 access token.')
else:
return access_token
@staticmethod
def construct_api_url(api_url, **params):
"""Construct a Twitter API url, encoded, with parameters
:param api_url: URL of the Twitter API endpoint you are attempting
to construct
:param \*\*params: Parameters that are accepted by Twitter for the
endpoint you're requesting
:rtype: string
Usage::
>>> from twython import Twython
>>> twitter = Twython()
>>> api_url = 'https://api.twitter.com/1.1/search/tweets.json'
>>> constructed_url = twitter.construct_api_url(api_url, q='python',
result_type='popular')
>>> print constructed_url
https://api.twitter.com/1.1/search/tweets.json?q=python&result_type=popular
"""
querystring = []
params, _ = _transparent_params(params or {})
params = requests.utils.to_key_val_list(params)
for (k, v) in params:
querystring.append(
'%s=%s' % (Twython.encode(k), quote_plus(Twython.encode(v)))
)
return '%s?%s' % (api_url, '&'.join(querystring))
def search_gen(self, search_query, **params): # pragma: no cover
warnings.warn(
'This method is deprecated. You should use Twython.cursor instead. \
[eg. Twython.cursor(Twython.search, q=\'your_query\')]',
TwythonDeprecationWarning,
stacklevel=2
)
return self.cursor(self.search, q=search_query, **params)
def cursor(self, function, return_pages=False, **params):
"""Returns a generator for results that match a specified query.
:param function: Instance of a Twython function
(Twython.get_home_timeline, Twython.search)
:param \*\*params: Extra parameters to send with your request
(usually parameters accepted by the Twitter API endpoint)
:rtype: generator
Usage::
>>> from twython import Twython
>>> twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN,
OAUTH_TOKEN_SECRET)
>>> results = twitter.cursor(twitter.search, q='python')
>>> for result in results:
>>> print result
"""
if not callable(function):
raise TypeError('.cursor() takes a Twython function as its first \
argument. Did you provide the result of a \
function call?')
if not hasattr(function, 'iter_mode'):
raise TwythonError('Unable to create generator for Twython \
method "%s"' % function.__name__)
while True:
content = function(**params)
if not content:
raise StopIteration
if hasattr(function, 'iter_key'):
results = content.get(function.iter_key)
else:
results = content
if return_pages:
yield results
else:
for result in results:
yield result
if function.iter_mode == 'cursor' and \
content['next_cursor_str'] == '0':
raise StopIteration
try:
if function.iter_mode == 'id':
# Set max_id in params to one less than lowest tweet id
if hasattr(function, 'iter_metadata'):
# Get supplied next max_id
metadata = content.get(function.iter_metadata)
if 'next_results' in metadata:
next_results = urlsplit(metadata['next_results'])
params = dict(parse_qsl(next_results.query))
else:
# No more results
raise StopIteration
else:
# Twitter gives tweets in reverse chronological order:
params['max_id'] = str(int(content[-1]['id_str']) - 1)
elif function.iter_mode == 'cursor':
params['cursor'] = content['next_cursor_str']
except (TypeError, ValueError): # pragma: no cover
raise TwythonError('Unable to generate next page of search \
results, `page` is not a number.')
except (KeyError, AttributeError): #pragma no cover
raise TwythonError('Unable to generate next page of search \
results, content has unexpected structure.')
@staticmethod
def unicode2utf8(text):
try:
if is_py2 and isinstance(text, str):
text = text.encode('utf-8')
except:
pass
return text
@staticmethod
def encode(text):
if is_py2 and isinstance(text, (str)):
return Twython.unicode2utf8(text)
return str(text)
@staticmethod
def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False, expand_quoted_status=False):
"""Return HTML for a tweet (urls, mentions, hashtags, symbols replaced with links)
:param tweet: Tweet object from received from Twitter API
:param use_display_url: Use display URL to represent link
(ex. google.com, github.com). Default: True
:param use_expanded_url: Use expanded URL to represent link
(e.g. http://google.com). Default False
If use_expanded_url is True, it overrides use_display_url.
If use_display_url and use_expanded_url is False, short url will
be used (t.co/xxxxx)
"""
if 'retweeted_status' in tweet:
tweet = tweet['retweeted_status']
if 'extended_tweet' in tweet:
tweet = tweet['extended_tweet']
orig_tweet_text = tweet.get('full_text') or tweet['text']
display_text_range = tweet.get('display_text_range') or [0, len(orig_tweet_text)]
display_text_start, display_text_end = display_text_range[0], display_text_range[1]
display_text = orig_tweet_text[display_text_start:display_text_end]
prefix_text = orig_tweet_text[0:display_text_start]
suffix_text = orig_tweet_text[display_text_end:len(orig_tweet_text)]
if 'entities' in tweet:
# We'll put all the bits of replacement HTML and their starts/ends
# in this list:
entities = []
# Mentions
if 'user_mentions' in tweet['entities']:
for entity in tweet['entities']['user_mentions']:
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
mention_html = '<a href="https://twitter.com/%(screen_name)s" class="twython-mention">@%(screen_name)s</a>' % {'screen_name': entity['screen_name']}
if display_text_start <= temp['start'] <= display_text_end:
temp['replacement'] = mention_html
temp['start'] -= display_text_start
temp['end'] -= display_text_start
entities.append(temp)
else:
# Make the '@username' at the start, before
# display_text, into a link:
sub_expr = r'(?<!>)' + orig_tweet_text[temp['start']:temp['end']] + '(?!</a>)'
prefix_text = re.sub(sub_expr, mention_html, prefix_text)
# Hashtags
if 'hashtags' in tweet['entities']:
for entity in tweet['entities']['hashtags']:
temp = {}
temp['start'] = entity['indices'][0] - display_text_start
temp['end'] = entity['indices'][1] - display_text_start
url_html = '<a href="https://twitter.com/search?q=%%23%(hashtag)s" class="twython-hashtag">#%(hashtag)s</a>' % {'hashtag': entity['text']}
temp['replacement'] = url_html
entities.append(temp)
# Symbols
if 'symbols' in tweet['entities']:
for entity in tweet['entities']['symbols']:
temp = {}
temp['start'] = entity['indices'][0] - display_text_start
temp['end'] = entity['indices'][1] - display_text_start
url_html = '<a href="https://twitter.com/search?q=%%24%(symbol)s" class="twython-symbol">$%(symbol)s</a>' % {'symbol': entity['text']}
temp['replacement'] = url_html
entities.append(temp)
# URLs
if 'urls' in tweet['entities']:
for entity in tweet['entities']['urls']:
temp = {}
temp['start'] = entity['indices'][0] - display_text_start
temp['end'] = entity['indices'][1] - display_text_start
if use_display_url and entity.get('display_url') and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
else:
shown_url = entity['url']
url_html = '<a href="%s" class="twython-url">%s</a>' % (entity['url'], shown_url)
if display_text_start <= temp['start'] <= display_text_end:
temp['replacement'] = url_html
entities.append(temp)
else:
suffix_text = suffix_text.replace(orig_tweet_text[temp['start']:temp['end']], url_html)
if 'media' in tweet['entities'] and len(tweet['entities']['media']) > 0:
# We just link to the overall URL for the tweet's media,
# rather than to each individual item.
# So, we get the URL from the first media item:
entity = tweet['entities']['media'][0]
temp = {}
temp['start'] = entity['indices'][0]
temp['end'] = entity['indices'][1]
if use_display_url and entity.get('display_url') and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
else:
shown_url = entity['url']
url_html = '<a href="%s" class="twython-media">%s</a>' % (entity['url'], shown_url)
if display_text_start <= temp['start'] <= display_text_end:
temp['replacement'] = url_html
entities.append(temp)
else:
suffix_text = suffix_text.replace(orig_tweet_text[temp['start']:temp['end']], url_html)
# Now do all the replacements, starting from the end, so that the
# start/end indices still work:
for entity in sorted(entities, key=lambda e: e['start'], reverse=True):
display_text = display_text[0:entity['start']] + entity['replacement'] + display_text[entity['end']:]
quote_text = ''
if expand_quoted_status and tweet.get('is_quote_status') and tweet.get('quoted_status'):
quoted_status = tweet['quoted_status']
quote_text += '<blockquote class="twython-quote">%(quote)s<cite><a href="%(quote_tweet_link)s">' \
'<span class="twython-quote-user-name">%(quote_user_name)s</span>' \
'<span class="twython-quote-user-screenname">@%(quote_user_screen_name)s</span></a>' \
'</cite></blockquote>' % \
{'quote': Twython.html_for_tweet(quoted_status, use_display_url, use_expanded_url, False),
'quote_tweet_link': 'https://twitter.com/%s/status/%s' %
(quoted_status['user']['screen_name'], quoted_status['id_str']),
'quote_user_name': quoted_status['user']['name'],
'quote_user_screen_name': quoted_status['user']['screen_name']}
return '%(prefix)s%(display)s%(suffix)s%(quote)s' % {
'prefix': '<span class="twython-tweet-prefix">%s</span>' % prefix_text if prefix_text else '',
'display': display_text,
'suffix': '<span class="twython-tweet-suffix">%s</span>' % suffix_text if suffix_text else '',
'quote': quote_text
}

View File

@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.compat
~~~~~~~~~~~~~~
This module contains imports and declarations for seamless Python 2 and
Python 3 compatibility.
"""
import sys
_ver = sys.version_info
#: Python 2.x?
is_py2 = (_ver[0] == 2)
#: Python 3.x?
is_py3 = (_ver[0] == 3)
try:
import simplejson as json
except ImportError:
import json
if is_py2:
from urllib import urlencode, quote_plus
from urlparse import parse_qsl, urlsplit
str = unicode
basestring = basestring
numeric_types = (int, long, float)
elif is_py3:
from urllib.parse import urlencode, quote_plus, parse_qsl, urlsplit
str = str
basestring = (str, bytes)
numeric_types = (int, float)

File diff suppressed because it is too large Load Diff

View File

@ -1,61 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.exceptions
~~~~~~~~~~~~~~~~~~
This module contains Twython specific Exception classes.
"""
from .endpoints import TWITTER_HTTP_STATUS_CODE
class TwythonError(Exception):
"""Generic error class, catch-all for most Twython issues.
Special cases are handled by TwythonAuthError & TwythonRateLimitError.
from twython import TwythonError, TwythonRateLimitError, TwythonAuthError
"""
def __init__(self, msg, error_code=None, retry_after=None):
self.error_code = error_code
if error_code is not None and error_code in TWITTER_HTTP_STATUS_CODE:
msg = 'Twitter API returned a %s (%s), %s' % \
(error_code,
TWITTER_HTTP_STATUS_CODE[error_code][0],
msg)
super(TwythonError, self).__init__(msg)
@property
def msg(self): # pragma: no cover
return self.args[0]
class TwythonAuthError(TwythonError):
"""Raised when you try to access a protected resource and it fails due to
some issue with your authentication.
"""
pass
class TwythonRateLimitError(TwythonError): # pragma: no cover
"""Raised when you've hit a rate limit.
The amount of seconds to retry your request in will be appended
to the message.
"""
def __init__(self, msg, error_code, retry_after=None):
if isinstance(retry_after, int):
msg = '%s (Retry after %d seconds)' % (msg, retry_after)
TwythonError.__init__(self, msg, error_code=error_code)
self.retry_after = retry_after
class TwythonStreamError(TwythonError):
"""Raised when an invalid response from the Stream API is received"""
pass

View File

@ -1,34 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.helpers
~~~~~~~~~~~~~~~
This module contains functions that are repeatedly used throughout
the Twython library.
"""
from .compat import basestring, numeric_types
def _transparent_params(_params):
params = {}
files = {}
for k, v in _params.items():
if hasattr(v, 'read') and callable(v.read):
files[k] = v # pragma: no cover
elif isinstance(v, bool):
if v:
params[k] = 'true'
else:
params[k] = 'false'
elif isinstance(v, basestring) or isinstance(v, numeric_types):
params[k] = v
elif isinstance(v, list):
try:
params[k] = ','.join(v)
except TypeError:
params[k] = ','.join(map(str, v))
else:
continue # pragma: no cover
return params, files

View File

@ -1 +0,0 @@
from .api import TwythonStreamer

View File

@ -1,201 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.streaming.api
~~~~~~~~~~~~~~~~~~~~~
This module contains functionality for interfacing with streaming
Twitter API calls.
"""
from .. import __version__
from ..compat import json, is_py3
from ..helpers import _transparent_params
from .types import TwythonStreamerTypes
import requests
from requests_oauthlib import OAuth1
import time
class TwythonStreamer(object):
def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
timeout=300, retry_count=None, retry_in=10, client_args=None,
handlers=None, chunk_size=1):
"""Streaming class for a friendly streaming user experience
Authentication IS required to use the Twitter Streaming API
:param app_key: (required) Your applications key
:param app_secret: (required) Your applications secret key
:param oauth_token: (required) Used with oauth_token_secret to make
authenticated calls
:param oauth_token_secret: (required) Used with oauth_token to make
authenticated calls
:param timeout: (optional) How long (in secs) the streamer should wait
for a response from Twitter Streaming API
:param retry_count: (optional) Number of times the API call should be
retired
:param retry_in: (optional) Amount of time (in secs) the previous
API call should be tried again
:param client_args: (optional) Accepts some requests Session
parameters and some requests Request parameters.
See
http://docs.python-requests.org/en/latest/api/#sessionapi
and requests section below it for details.
[ex. headers, proxies, verify(SSL verification)]
:param handlers: (optional) Array of message types for which
corresponding handlers will be called
:param chunk_size: (optional) Define the buffer size before data is
actually returned from the Streaming API. Default: 1
"""
self.auth = OAuth1(app_key, app_secret,
oauth_token, oauth_token_secret)
self.client_args = client_args or {}
default_headers = {'User-Agent': 'Twython Streaming v' + __version__}
if 'headers' not in self.client_args:
# If they didn't set any headers, set our defaults for them
self.client_args['headers'] = default_headers
elif 'User-Agent' not in self.client_args['headers']:
# If they set headers, but didn't include User-Agent..
# set it for them
self.client_args['headers'].update(default_headers)
self.client_args['timeout'] = timeout
self.client = requests.Session()
self.client.auth = self.auth
self.client.stream = True
# Make a copy of the client args and iterate over them
# Pop out all the acceptable args at this point because they will
# Never be used again.
client_args_copy = self.client_args.copy()
for k, v in client_args_copy.items():
if k in ('cert', 'headers', 'hooks', 'max_redirects', 'proxies'):
setattr(self.client, k, v)
self.client_args.pop(k) # Pop, pop!
self.api_version = '1.1'
self.retry_in = retry_in
self.retry_count = retry_count
# Set up type methods
StreamTypes = TwythonStreamerTypes(self)
self.statuses = StreamTypes.statuses
self.user = StreamTypes.user
self.site = StreamTypes.site
self.connected = False
self.handlers = handlers if handlers else \
['delete', 'limit', 'disconnect']
self.chunk_size = chunk_size
def _request(self, url, method='GET', params=None):
"""Internal stream request handling"""
self.connected = True
retry_counter = 0
method = method.lower()
func = getattr(self.client, method)
params, _ = _transparent_params(params)
def _send(retry_counter):
requests_args = {}
for k, v in self.client_args.items():
# Maybe this should be set as a class
# variable and only done once?
if k in ('timeout', 'allow_redirects', 'verify'):
requests_args[k] = v
while self.connected:
try:
if method == 'get':
requests_args['params'] = params
else:
requests_args['data'] = params
response = func(url, **requests_args)
except requests.exceptions.Timeout:
self.on_timeout()
else:
if response.status_code != 200:
self.on_error(response.status_code, response.content)
if self.retry_count and \
(self.retry_count - retry_counter) > 0:
time.sleep(self.retry_in)
retry_counter += 1
_send(retry_counter)
return response
while self.connected:
response = _send(retry_counter)
for line in response.iter_lines(self.chunk_size):
if not self.connected:
break
if line:
try:
if is_py3:
line = line.decode('utf-8')
data = json.loads(line)
except ValueError: # pragma: no cover
self.on_error(response.status_code,
'Unable to decode response, \
not valid JSON.')
else:
if self.on_success(data): # pragma: no cover
for message_type in self.handlers:
if message_type in data:
handler = getattr(self,
'on_' + message_type,
None)
if handler \
and callable(handler) \
and not handler(data.get(message_type)):
break
response.close()
def on_success(self, data): # pragma: no cover
"""Called when data has been successfully received from the stream.
Returns True if other handlers for this message should be invoked.
Feel free to override this to handle your streaming data how you
want it handled. See
https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
for messages sent along in stream responses.
:param data: data recieved from the stream
:type data: dict
"""
return True
def on_error(self, status_code, data): # pragma: no cover
"""Called when stream returns non-200 status code
Feel free to override this to handle your streaming data how you
want it handled.
:param status_code: Non-200 status code sent from stream
:type status_code: int
:param data: Error message sent from stream
:type data: dict
"""
return
def on_timeout(self): # pragma: no cover
""" Called when the request has timed out """
return
def disconnect(self):
"""Used to disconnect the streaming client manually"""
self.connected = False

View File

@ -1,108 +0,0 @@
# -*- coding: utf-8 -*-
"""
twython.streaming.types
~~~~~~~~~~~~~~~~~~~~~~~
This module contains classes and methods for :class:`TwythonStreamer` to use.
"""
class TwythonStreamerTypes(object):
"""Class for different stream endpoints
Not all streaming endpoints have nested endpoints.
User Streams and Site Streams are single streams with no nested endpoints
Status Streams include filter, sample and firehose endpoints
"""
def __init__(self, streamer):
self.streamer = streamer
self.statuses = TwythonStreamerTypesStatuses(streamer)
def user(self, **params):
"""Stream user
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/user
"""
url = 'https://userstream.twitter.com/%s/user.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def site(self, **params):
"""Stream site
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/site
"""
url = 'https://sitestream.twitter.com/%s/site.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
class TwythonStreamerTypesStatuses(object):
"""Class for different statuses endpoints
Available so :meth:`TwythonStreamer.statuses.filter()` is available.
Just a bit cleaner than :meth:`TwythonStreamer.statuses_filter()`,
:meth:`statuses_sample()`, etc. all being single methods in
:class:`TwythonStreamer`.
"""
def __init__(self, streamer):
self.streamer = streamer
self.params = None
def filter(self, **params):
"""Stream statuses/filter
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
"""
url = 'https://stream.twitter.com/%s/statuses/filter.json' \
% self.streamer.api_version
self.streamer._request(url, 'POST', params=params)
def sample(self, **params):
"""Stream statuses/sample
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://developer.twitter.com/en/docs/tweets/sample-realtime/api-reference/get-statuses-sample
"""
url = 'https://stream.twitter.com/%s/statuses/sample.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def firehose(self, **params):
"""Stream statuses/firehose
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/firehose
"""
url = 'https://stream.twitter.com/%s/statuses/firehose.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def set_dynamic_filter(self, **params):
"""Set/update statuses/filter
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
"""
self.params = params
def dynamic_filter(self):
"""Stream statuses/filter with dynamic parameters"""
url = 'https://stream.twitter.com/%s/statuses/filter.json' \
% self.streamer.api_version
self.streamer._request(url, 'POST', params=self.params)