Putting all the code from the current master branch of TWBlue

This commit is contained in:
2014-10-27 16:29:04 -06:00
parent 58c82e5486
commit 1af4a8b291
284 changed files with 58760 additions and 0 deletions

29
src/twython/__init__.py Normal file
View File

@@ -0,0 +1,29 @@
# ______ __ __
# /_ __/_ __ __ __ / /_ / /_ ____ ____
# / / | | /| / // / / // __// __ \ / __ \ / __ \
# / / | |/ |/ // /_/ // /_ / / / // /_/ // / / /
# /_/ |__/|__/ \__, / \__//_/ /_/ \____//_/ /_/
# /____/
"""
Twython
-------
Twython is a library for Python that wraps the Twitter API.
It aims to abstract away all the API endpoints, so that
additions to the library and/or the Twitter API won't
cause any overall problems.
Questions, comments? ryan@venodesigns.net
"""
__author__ = 'Ryan McGrath <ryan@venodesigns.net>'
__version__ = '3.1.2'
from .api import Twython
from .streaming import TwythonStreamer
from .exceptions import (
TwythonError, TwythonRateLimitError, TwythonAuthError,
TwythonStreamError
)

22
src/twython/advisory.py Normal file
View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
twython.advisory
~~~~~~~~~~~~~~~~
This module contains Warning classes for Twython to specifically
alert the user about.
This mainly is because Python 2.7 > mutes DeprecationWarning and when
we deprecate a method or variable in Twython, we want to use to see
the Warning but don't want ALL DeprecationWarnings to appear,
only TwythonDeprecationWarnings.
"""
class TwythonDeprecationWarning(DeprecationWarning):
"""Custom DeprecationWarning to be raised when methods/variables
are being deprecated in Twython. Python 2.7 > ignores DeprecationWarning
so we want to specifcally bubble up ONLY Twython Deprecation Warnings
"""
pass

579
src/twython/api.py Normal file
View File

@@ -0,0 +1,579 @@
# -*- coding: utf-8 -*-
"""
twython.api
~~~~~~~~~~~
This module contains functionality for access to core Twitter API calls,
Twitter Authentication, and miscellaneous methods that are useful when
dealing with the Twitter API
"""
import requests
from requests.auth import HTTPBasicAuth
from requests_oauthlib import OAuth1, OAuth2
from . import __version__
from .advisory import TwythonDeprecationWarning
from .compat import json, urlencode, parse_qsl, quote_plus, str, is_py2
from .endpoints import EndpointsMixin
from .exceptions import TwythonError, TwythonAuthError, TwythonRateLimitError
from .helpers import _transparent_params
import warnings
warnings.simplefilter('always', TwythonDeprecationWarning) # For Python 2.7 >
class Twython(EndpointsMixin, object):
def __init__(self, app_key=None, app_secret=None, oauth_token=None,
oauth_token_secret=None, access_token=None,
token_type='bearer', oauth_version=1, api_version='1.1',
client_args=None, auth_endpoint='authenticate'):
"""Instantiates an instance of Twython. Takes optional parameters for
authentication and such (see below).
:param app_key: (optional) Your applications key
:param app_secret: (optional) Your applications secret key
:param oauth_token: (optional) When using **OAuth 1**, combined with
oauth_token_secret to make authenticated calls
:param oauth_token_secret: (optional) When using **OAuth 1** combined
with oauth_token to make authenticated calls
:param access_token: (optional) When using **OAuth 2**, provide a
valid access token if you have one
:param token_type: (optional) When using **OAuth 2**, provide your
token type. Default: bearer
:param oauth_version: (optional) Choose which OAuth version to use.
Default: 1
:param api_version: (optional) Choose which Twitter API version to
use. Default: 1.1
:param client_args: (optional) Accepts some requests Session parameters
and some requests Request parameters.
See http://docs.python-requests.org/en/latest/api/#sessionapi
and requests section below it for details.
[ex. headers, proxies, verify(SSL verification)]
:param auth_endpoint: (optional) Lets you select which authentication
endpoint will use your application.
This will allow the application to have DM access
if the endpoint is 'authorize'.
Default: authenticate.
"""
# API urls, OAuth urls and API version; needed for hitting that there
# API.
self.api_version = api_version
self.api_url = 'https://api.twitter.com/%s'
self.app_key = app_key
self.app_secret = app_secret
self.oauth_token = oauth_token
self.oauth_token_secret = oauth_token_secret
self.access_token = access_token
# OAuth 1
self.request_token_url = self.api_url % 'oauth/request_token'
self.access_token_url = self.api_url % 'oauth/access_token'
self.authenticate_url = self.api_url % ('oauth/%s' % auth_endpoint)
if self.access_token: # If they pass an access token, force OAuth 2
oauth_version = 2
self.oauth_version = oauth_version
# OAuth 2
if oauth_version == 2:
self.request_token_url = self.api_url % 'oauth2/token'
self.client_args = client_args or {}
default_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0'}
# 'Twython v' + __version__}
if 'headers' not in self.client_args:
# If they didn't set any headers, set our defaults for them
self.client_args['headers'] = default_headers
elif 'User-Agent' not in self.client_args['headers']:
# If they set headers, but didn't include User-Agent.. set
# it for them
self.client_args['headers'].update(default_headers)
# Generate OAuth authentication object for the request
# If no keys/tokens are passed to __init__, auth=None allows for
# unauthenticated requests, although I think all v1.1 requests
# need auth
auth = None
if oauth_version == 1:
# User Authentication is through OAuth 1
if self.app_key is not None and self.app_secret is not None and \
self.oauth_token is None and self.oauth_token_secret is None:
auth = OAuth1(self.app_key, self.app_secret)
if self.app_key is not None and self.app_secret is not None and \
self.oauth_token is not None and self.oauth_token_secret is \
not None:
auth = OAuth1(self.app_key, self.app_secret,
self.oauth_token, self.oauth_token_secret)
elif oauth_version == 2 and self.access_token:
# Application Authentication is through OAuth 2
token = {'token_type': token_type,
'access_token': self.access_token}
auth = OAuth2(self.app_key, token=token)
self.client = requests.Session()
self.client.auth = auth
# Make a copy of the client args and iterate over them
# Pop out all the acceptable args at this point because they will
# Never be used again.
client_args_copy = self.client_args.copy()
for k, v in client_args_copy.items():
if k in ('cert', 'hooks', 'max_redirects', 'proxies'):
setattr(self.client, k, v)
self.client_args.pop(k) # Pop, pop!
# Headers are always present, so we unconditionally pop them and merge
# them into the session headers.
self.client.headers.update(self.client_args.pop('headers'))
self._last_call = None
def __repr__(self):
return '<Twython: %s>' % (self.app_key)
def _request(self, url, method='GET', params=None, api_call=None):
"""Internal request method"""
method = method.lower()
params = params or {}
func = getattr(self.client, method)
params, files = _transparent_params(params)
requests_args = {}
for k, v in self.client_args.items():
# Maybe this should be set as a class variable and only done once?
if k in ('timeout', 'allow_redirects', 'stream', 'verify'):
requests_args[k] = v
if method == 'get':
requests_args['params'] = params
else:
requests_args.update({
'data': params,
'files': files,
})
try:
response = func(url, **requests_args)
except requests.RequestException as e:
raise TwythonError(str(e))
# create stash for last function intel
self._last_call = {
'api_call': api_call,
'api_error': None,
'cookies': response.cookies,
'headers': response.headers,
'status_code': response.status_code,
'url': response.url,
'content': response.text,
}
# greater than 304 (not modified) is an error
if response.status_code > 304:
error_message = self._get_error_message(response)
self._last_call['api_error'] = error_message
ExceptionType = TwythonError
if response.status_code == 429:
# Twitter API 1.1, always return 429 when
# rate limit is exceeded
ExceptionType = TwythonRateLimitError
elif response.status_code == 401 or 'Bad Authentication data' \
in error_message:
# Twitter API 1.1, returns a 401 Unauthorized or
# a 400 "Bad Authentication data" for invalid/expired
# app keys/user tokens
ExceptionType = TwythonAuthError
raise ExceptionType(error_message,
error_code=response.status_code,
retry_after=response.headers.get('retry-\
after'))
try:
content = response.json()
except ValueError:
raise TwythonError('Response was not valid JSON. \
Unable to decode.')
return content
def _get_error_message(self, response):
"""Parse and return the first error message"""
error_message = 'An error occurred processing your request.'
try:
content = response.json()
# {"errors":[{"code":34,"message":"Sorry,
# that page does not exist"}]}
error_message = content['errors'][0]['message']
except ValueError:
# bad json data from Twitter for an error
pass
except (KeyError, IndexError):
# missing data so fallback to default message
pass
return error_message
def request(self, endpoint, method='GET', params=None, version='1.1'):
"""Return dict of response received from Twitter's API
:param endpoint: (required) Full url or Twitter API endpoint
(e.g. search/tweets)
:type endpoint: string
:param method: (optional) Method of accessing data, either
GET or POST. (default GET)
:type method: string
:param params: (optional) Dict of parameters (if any) accepted
the by Twitter API endpoint you are trying to
access (default None)
:type params: dict or None
:param version: (optional) Twitter API version to access
(default 1.1)
:type version: string
:rtype: dict
"""
# In case they want to pass a full Twitter URL
# i.e. https://api.twitter.com/1.1/search/tweets.json
if endpoint.startswith('http://') or endpoint.startswith('https://'):
url = endpoint
else:
url = '%s/%s.json' % (self.api_url % version, endpoint)
content = self._request(url, method=method, params=params,
api_call=url)
return content
def get(self, endpoint, params=None, version='1.1'):
"""Shortcut for GET requests via :class:`request`"""
return self.request(endpoint, params=params, version=version)
def post(self, endpoint, params=None, version='1.1'):
"""Shortcut for POST requests via :class:`request`"""
return self.request(endpoint, 'POST', params=params, version=version)
def get_lastfunction_header(self, header, default_return_value=None):
"""Returns a specific header from the last API call
This will return None if the header is not present
:param header: (required) The name of the header you want to get
the value of
Most useful for the following header information:
x-rate-limit-limit,
x-rate-limit-remaining,
x-rate-limit-class,
x-rate-limit-reset
"""
if self._last_call is None:
raise TwythonError('This function must be called after an API call. \
It delivers header information.')
return self._last_call['headers'].get(header, default_return_value)
def get_authentication_tokens(self, callback_url=None, force_login=False,
screen_name=''):
"""Returns a dict including an authorization URL, ``auth_url``, to
direct a user to
:param callback_url: (optional) Url the user is returned to after
they authorize your app (web clients only)
:param force_login: (optional) Forces the user to enter their
credentials to ensure the correct users
account is authorized.
:param screen_name: (optional) If forced_login is set OR user is
not currently logged in, Prefills the username
input box of the OAuth login screen with the
given value
:rtype: dict
"""
if self.oauth_version != 1:
raise TwythonError('This method can only be called when your \
OAuth version is 1.0.')
request_args = {}
if callback_url:
request_args['oauth_callback'] = callback_url
response = self.client.get(self.request_token_url, params=request_args)
if response.status_code == 401:
raise TwythonAuthError(response.content,
error_code=response.status_code)
elif response.status_code != 200:
raise TwythonError(response.content,
error_code=response.status_code)
request_tokens = dict(parse_qsl(response.content.decode('utf-8')))
if not request_tokens:
raise TwythonError('Unable to decode request tokens.')
oauth_callback_confirmed = request_tokens.get('oauth_callback_confirmed') \
== 'true'
auth_url_params = {
'oauth_token': request_tokens['oauth_token'],
}
if force_login:
auth_url_params.update({
'force_login': force_login,
'screen_name': screen_name
})
# Use old-style callback argument if server didn't accept new-style
if callback_url and not oauth_callback_confirmed:
auth_url_params['oauth_callback'] = self.callback_url
request_tokens['auth_url'] = self.authenticate_url + \
'?' + urlencode(auth_url_params)
return request_tokens
def get_authorized_tokens(self, oauth_verifier):
"""Returns a dict of authorized tokens after they go through the
:class:`get_authentication_tokens` phase.
:param oauth_verifier: (required) The oauth_verifier (or a.k.a PIN
for non web apps) retrieved from the callback url querystring
:rtype: dict
"""
if self.oauth_version != 1:
raise TwythonError('This method can only be called when your \
OAuth version is 1.0.')
response = self.client.get(self.access_token_url,
params={'oauth_verifier': oauth_verifier},
headers={'Content-Type': 'application/\
json'})
if response.status_code == 401:
try:
try:
# try to get json
content = response.json()
except AttributeError: # pragma: no cover
# if unicode detected
content = json.loads(response.content)
except ValueError:
content = {}
raise TwythonError(content.get('error', 'Invalid / expired To \
ken'), error_code=response.status_code)
authorized_tokens = dict(parse_qsl(response.content.decode('utf-8')))
if not authorized_tokens:
raise TwythonError('Unable to decode authorized tokens.')
return authorized_tokens # pragma: no cover
def obtain_access_token(self):
"""Returns an OAuth 2 access token to make OAuth 2 authenticated
read-only calls.
:rtype: string
"""
if self.oauth_version != 2:
raise TwythonError('This method can only be called when your \
OAuth version is 2.0.')
data = {'grant_type': 'client_credentials'}
basic_auth = HTTPBasicAuth(self.app_key, self.app_secret)
try:
response = self.client.post(self.request_token_url,
data=data, auth=basic_auth)
content = response.content.decode('utf-8')
try:
content = content.json()
except AttributeError:
content = json.loads(content)
access_token = content['access_token']
except (KeyError, ValueError, requests.exceptions.RequestException):
raise TwythonAuthError('Unable to obtain OAuth 2 access token.')
else:
return access_token
@staticmethod
def construct_api_url(api_url, **params):
"""Construct a Twitter API url, encoded, with parameters
:param api_url: URL of the Twitter API endpoint you are attempting
to construct
:param \*\*params: Parameters that are accepted by Twitter for the
endpoint you're requesting
:rtype: string
Usage::
>>> from twython import Twython
>>> twitter = Twython()
>>> api_url = 'https://api.twitter.com/1.1/search/tweets.json'
>>> constructed_url = twitter.construct_api_url(api_url, q='python',
result_type='popular')
>>> print constructed_url
https://api.twitter.com/1.1/search/tweets.json?q=python&result_type=popular
"""
querystring = []
params, _ = _transparent_params(params or {})
params = requests.utils.to_key_val_list(params)
for (k, v) in params:
querystring.append(
'%s=%s' % (Twython.encode(k), quote_plus(Twython.encode(v)))
)
return '%s?%s' % (api_url, '&'.join(querystring))
def search_gen(self, search_query, **params): # pragma: no cover
warnings.warn(
'This method is deprecated. You should use Twython.cursor instead. \
[eg. Twython.cursor(Twython.search, q=\'your_query\')]',
TwythonDeprecationWarning,
stacklevel=2
)
return self.cursor(self.search, q=search_query, **params)
def cursor(self, function, return_pages=False, **params):
"""Returns a generator for results that match a specified query.
:param function: Instance of a Twython function
(Twython.get_home_timeline, Twython.search)
:param \*\*params: Extra parameters to send with your request
(usually parameters accepted by the Twitter API endpoint)
:rtype: generator
Usage::
>>> from twython import Twython
>>> twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN,
OAUTH_TOKEN_SECRET)
>>> results = twitter.cursor(twitter.search, q='python')
>>> for result in results:
>>> print result
"""
if not hasattr(function, 'iter_mode'):
raise TwythonError('Unable to create generator for Twython \
method "%s"' % function.__name__)
while True:
content = function(**params)
if not content:
raise StopIteration
if hasattr(function, 'iter_key'):
results = content.get(function.iter_key)
else:
results = content
if return_pages:
yield results
else:
for result in results:
yield result
if function.iter_mode == 'cursor' and \
content['next_cursor_str'] == '0':
raise StopIteration
try:
if function.iter_mode == 'id':
if 'max_id' not in params:
# Add 1 to the id because since_id and
# max_id are inclusive
if hasattr(function, 'iter_metadata'):
since_id = content[function.iter_metadata]\
.get('since_id_str')
else:
since_id = content[0]['id_str']
params['since_id'] = (int(since_id) - 1)
elif function.iter_mode == 'cursor':
params['cursor'] = content['next_cursor_str']
except (TypeError, ValueError): # pragma: no cover
raise TwythonError('Unable to generate next page of search \
results, `page` is not a number.')
@staticmethod
def unicode2utf8(text):
try:
if is_py2 and isinstance(text, str):
text = text.encode('utf-8')
except:
pass
return text
@staticmethod
def encode(text):
if is_py2 and isinstance(text, (str)):
return Twython.unicode2utf8(text)
return str(text)
@staticmethod
def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False):
"""Return HTML for a tweet (urls, mentions, hashtags replaced with links)
:param tweet: Tweet object from received from Twitter API
:param use_display_url: Use display URL to represent link
(ex. google.com, github.com). Default: True
:param use_expanded_url: Use expanded URL to represent link
(e.g. http://google.com). Default False
If use_expanded_url is True, it overrides use_display_url.
If use_display_url and use_expanded_url is False, short url will
be used (t.co/xxxxx)
"""
if 'retweeted_status' in tweet:
tweet = tweet['retweeted_status']
if 'entities' in tweet:
text = tweet['text']
entities = tweet['entities']
# Mentions
for entity in entities['user_mentions']:
start, end = entity['indices'][0], entity['indices'][1]
mention_html = '<a href="https://twitter.com/%(screen_name)s" class="twython-mention">@%(screen_name)s</a>'
text = text.replace(tweet['text'][start:end],
mention_html % {'screen_name': entity['screen_name']})
# Hashtags
for entity in entities['hashtags']:
start, end = entity['indices'][0], entity['indices'][1]
hashtag_html = '<a href="https://twitter.com/search?q=%%23%(hashtag)s" class="twython-hashtag">#%(hashtag)s</a>'
text = text.replace(tweet['text'][start:end], hashtag_html % {'hashtag': entity['text']})
# Urls
for entity in entities['urls']:
start, end = entity['indices'][0], entity['indices'][1]
if use_display_url and entity.get('display_url') \
and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
else:
shown_url = entity['url']
url_html = '<a href="%s" class="twython-url">%s</a>'
text = text.replace(tweet['text'][start:end],
url_html % (entity['url'], shown_url))
return text

40
src/twython/compat.py Normal file
View File

@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
twython.compat
~~~~~~~~~~~~~~
This module contains imports and declarations for seamless Python 2 and
Python 3 compatibility.
"""
import sys
_ver = sys.version_info
#: Python 2.x?
is_py2 = (_ver[0] == 2)
#: Python 3.x?
is_py3 = (_ver[0] == 3)
try:
import simplejson as json
except ImportError:
import json
if is_py2:
from urllib import urlencode, quote_plus
from urlparse import parse_qsl
str = unicode
basestring = basestring
numeric_types = (int, long, float)
elif is_py3:
from urllib.parse import urlencode, quote_plus, parse_qsl
str = str
basestring = (str, bytes)
numeric_types = (int, float)

893
src/twython/endpoints.py Normal file
View File

@@ -0,0 +1,893 @@
# -*- coding: utf-8 -*-
"""
twython.endpoints
~~~~~~~~~~~~~~~~~
This module provides a mixin for a :class:`Twython <Twython>` instance.
Parameters that need to be embedded in the API url just need to be passed as a keyword argument.
e.g. Twython.retweet(id=12345)
This map is organized the order functions are documented at:
https://dev.twitter.com/docs/api/1.1
"""
class EndpointsMixin(object):
# Timelines
def get_mentions_timeline(self, **params):
"""Returns the 20 most recent mentions (tweets containing a users's
@screen_name) for the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/mentions_timeline
"""
return self.get('statuses/mentions_timeline', params=params)
get_mentions_timeline.iter_mode = 'id'
def get_user_timeline(self, **params):
"""Returns a collection of the most recent Tweets posted by the user
indicated by the screen_name or user_id parameters.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/user_timeline
"""
return self.get('statuses/user_timeline', params=params)
get_user_timeline.iter_mode = 'id'
def get_home_timeline(self, **params):
"""Returns a collection of the most recent Tweets and retweets
posted by the authenticating user and the users they follow.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/home_timeline
"""
return self.get('statuses/home_timeline', params=params)
get_home_timeline.iter_mode = 'id'
def retweeted_of_me(self, **params):
"""Returns the most recent tweets authored by the authenticating user
that have been retweeted by others.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/retweets_of_me
"""
return self.get('statuses/retweets_of_me', params=params)
retweeted_of_me.iter_mode = 'id'
# Tweets
def get_retweets(self, **params):
"""Returns up to 100 of the first retweets of a given tweet.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/retweets/%3Aid
"""
return self.get('statuses/retweets/%s' % params.get('id'), params=params)
def show_status(self, **params):
"""Returns a single Tweet, specified by the id parameter
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/show/%3Aid
"""
return self.get('statuses/show/%s' % params.get('id'), params=params)
def destroy_status(self, **params):
"""Destroys the status specified by the required ID parameter
Docs: https://dev.twitter.com/docs/api/1.1/post/statuses/destroy/%3Aid
"""
return self.post('statuses/destroy/%s' % params.get('id'))
def update_status(self, **params):
"""Updates the authenticating user's current status, also known as tweeting
Docs: https://dev.twitter.com/docs/api/1.1/post/statuses/update
"""
return self.post('statuses/update', params=params)
def retweet(self, **params):
"""Retweets a tweet specified by the id parameter
Docs: https://dev.twitter.com/docs/api/1.1/post/statuses/retweet/%3Aid
"""
return self.post('statuses/retweet/%s' % params.get('id'))
def update_status_with_media(self, **params): # pragma: no cover
"""Updates the authenticating user's current status and attaches media
for upload. In other words, it creates a Tweet with a picture attached.
Docs: https://dev.twitter.com/docs/api/1.1/post/statuses/update_with_media
"""
return self.post('statuses/update_with_media', params=params)
def get_oembed_tweet(self, **params):
"""Returns information allowing the creation of an embedded
representation of a Tweet on third party sites.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/oembed
"""
return self.get('statuses/oembed', params=params)
def get_retweeters_ids(self, **params):
"""Returns a collection of up to 100 user IDs belonging to users who
have retweeted the tweet specified by the id parameter.
Docs: https://dev.twitter.com/docs/api/1.1/get/statuses/retweeters/ids
"""
return self.get('statuses/retweeters/ids', params=params)
get_retweeters_ids.iter_mode = 'cursor'
get_retweeters_ids.iter_key = 'ids'
# Search
def search(self, **params):
"""Returns a collection of relevant Tweets matching a specified query.
Docs: https://dev.twitter.com/docs/api/1.1/get/search/tweets
"""
return self.get('search/tweets', params=params)
search.iter_mode = 'id'
search.iter_key = 'statuses'
search.iter_metadata = 'search_metadata'
# Direct Messages
def get_direct_messages(self, **params):
"""Returns the 20 most recent direct messages sent to the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/get/direct_messages
"""
return self.get('direct_messages', params=params)
get_direct_messages.iter_mode = 'id'
def get_sent_messages(self, **params):
"""Returns the 20 most recent direct messages sent by the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/get/direct_messages/sent
"""
return self.get('direct_messages/sent', params=params)
get_sent_messages.iter_mode = 'id'
def get_direct_message(self, **params):
"""Returns a single direct message, specified by an id parameter.
Docs: https://dev.twitter.com/docs/api/1.1/get/direct_messages/show
"""
return self.get('direct_messages/show', params=params)
def destroy_direct_message(self, **params):
"""Destroys the direct message specified in the required id parameter
Docs: https://dev.twitter.com/docs/api/1.1/post/direct_messages/destroy
"""
return self.post('direct_messages/destroy', params=params)
def send_direct_message(self, **params):
"""Sends a new direct message to the specified user from the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/direct_messages/new
"""
return self.post('direct_messages/new', params=params)
# Friends & Followers
def get_user_ids_of_blocked_retweets(self, **params):
"""Returns a collection of user_ids that the currently authenticated
user does not want to receive retweets from.
Docs: https://dev.twitter.com/docs/api/1.1/get/friendships/no_retweets/ids
"""
return self.get('friendships/no_retweets/ids', params=params)
def get_friends_ids(self, **params):
"""Returns a cursored collection of user IDs for every user the
specified user is following (otherwise known as their "friends").
Docs: https://dev.twitter.com/docs/api/1.1/get/friends/ids
"""
return self.get('friends/ids', params=params)
get_friends_ids.iter_mode = 'cursor'
get_friends_ids.iter_key = 'ids'
def get_followers_ids(self, **params):
"""Returns a cursored collection of user IDs for every user
following the specified user.
Docs: https://dev.twitter.com/docs/api/1.1/get/followers/ids
"""
return self.get('followers/ids', params=params)
get_followers_ids.iter_mode = 'cursor'
get_followers_ids.iter_key = 'ids'
def lookup_friendships(self, **params):
"""Returns the relationships of the authenticating user to the
comma-separated list of up to 100 screen_names or user_ids provided.
Docs: https://dev.twitter.com/docs/api/1.1/get/friendships/lookup
"""
return self.get('friendships/lookup', params=params)
def get_incoming_friendship_ids(self, **params):
"""Returns a collection of numeric IDs for every user who has a
pending request to follow the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/get/friendships/incoming
"""
return self.get('friendships/incoming', params=params)
get_incoming_friendship_ids.iter_mode = 'cursor'
get_incoming_friendship_ids.iter_key = 'ids'
def get_outgoing_friendship_ids(self, **params):
"""Returns a collection of numeric IDs for every protected user for
whom the authenticating user has a pending follow request.
Docs: https://dev.twitter.com/docs/api/1.1/get/friendships/outgoing
"""
return self.get('friendships/outgoing', params=params)
get_outgoing_friendship_ids.iter_mode = 'cursor'
get_outgoing_friendship_ids.iter_key = 'ids'
def create_friendship(self, **params):
"""Allows the authenticating users to follow the user specified
in the ID parameter.
Docs: https://dev.twitter.com/docs/api/1.1/post/friendships/create
"""
return self.post('friendships/create', params=params)
def destroy_friendship(self, **params):
"""Allows the authenticating user to unfollow the user specified
in the ID parameter.
Docs: https://dev.twitter.com/docs/api/1.1/post/friendships/destroy
"""
return self.post('friendships/destroy', params=params)
def update_friendship(self, **params):
"""Allows one to enable or disable retweets and device notifications
from the specified user.
Docs: https://dev.twitter.com/docs/api/1.1/post/friendships/update
"""
return self.post('friendships/update', params=params)
def show_friendship(self, **params):
"""Returns detailed information about the relationship between two
arbitrary users.
Docs: https://dev.twitter.com/docs/api/1.1/get/friendships/show
"""
return self.get('friendships/show', params=params)
def get_friends_list(self, **params):
"""Returns a cursored collection of user objects for every user the
specified user is following (otherwise known as their "friends").
Docs: https://dev.twitter.com/docs/api/1.1/get/friends/list
"""
return self.get('friends/list', params=params)
get_friends_list.iter_mode = 'cursor'
get_friends_list.iter_key = 'users'
def get_followers_list(self, **params):
"""Returns a cursored collection of user objects for users
following the specified user.
Docs: https://dev.twitter.com/docs/api/1.1/get/followers/list
"""
return self.get('followers/list', params=params)
get_followers_list.iter_mode = 'cursor'
get_followers_list.iter_key = 'users'
# Users
def get_account_settings(self, **params):
"""Returns settings (including current trend, geo and sleep time
information) for the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/get/account/settings
"""
return self.get('account/settings', params=params)
def verify_credentials(self, **params):
"""Returns an HTTP 200 OK response code and a representation of the
requesting user if authentication was successful; returns a 401 status
code and an error message if not.
Docs: https://dev.twitter.com/docs/api/1.1/get/account/verify_credentials
"""
return self.get('account/verify_credentials', params=params)
def update_account_settings(self, **params):
"""Updates the authenticating user's settings.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/settings
"""
return self.post('account/settings', params=params)
def update_delivery_service(self, **params):
"""Sets which device Twitter delivers updates to for the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/update_delivery_device
"""
return self.post('account/update_delivery_device', params=params)
def update_profile(self, **params):
"""Sets values that users are able to set under the "Account" tab of their settings page.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/update_profile
"""
return self.post('account/update_profile', params=params)
def update_profile_banner_image(self, **params): # pragma: no cover
"""Updates the authenticating user's profile background image.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/update_profile_background_image
"""
return self.post('account/update_profile_banner', params=params)
def update_profile_colors(self, **params):
"""Sets one or more hex values that control the color scheme of the
authenticating user's profile page on twitter.com.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/update_profile_colors
"""
return self.post('account/update_profile_colors', params=params)
def update_profile_image(self, **params): # pragma: no cover
"""Updates the authenticating user's profile image.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/update_profile_image
"""
return self.post('account/update_profile_image', params=params)
def list_blocks(self, **params):
"""Returns a collection of user objects that the authenticating user is blocking.
Docs: https://dev.twitter.com/docs/api/1.1/get/blocks/list
"""
return self.get('blocks/list', params=params)
list_blocks.iter_mode = 'cursor'
list_blocks.iter_key = 'users'
def list_block_ids(self, **params):
"""Returns an array of numeric user ids the authenticating user is blocking.
Docs: https://dev.twitter.com/docs/api/1.1/get/blocks/ids
"""
return self.get('blocks/ids', params=params)
list_block_ids.iter_mode = 'cursor'
list_block_ids.iter_key = 'ids'
def create_block(self, **params):
"""Blocks the specified user from following the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/blocks/create
"""
return self.post('blocks/create', params=params)
def destroy_block(self, **params):
"""Un-blocks the user specified in the ID parameter for the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/blocks/destroy
"""
return self.post('blocks/destroy', params=params)
def lookup_user(self, **params):
"""Returns fully-hydrated user objects for up to 100 users per request,
as specified by comma-separated values passed to the user_id and/or screen_name parameters.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/lookup
"""
return self.post('users/lookup', params=params)
def show_user(self, **params):
"""Returns a variety of information about the user specified by the
required user_id or screen_name parameter.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/show
"""
return self.get('users/show', params=params)
def search_users(self, **params):
"""Provides a simple, relevance-based search interface to public user accounts on Twitter.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/search
"""
return self.get('users/search', params=params)
def get_contributees(self, **params):
"""Returns a collection of users that the specified user can "contribute" to.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/contributees
"""
return self.get('users/contributees', params=params)
def get_contributors(self, **params):
"""Returns a collection of users who can contribute to the specified account.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/contributors
"""
return self.get('users/contributors', params=params)
def remove_profile_banner(self, **params):
"""Removes the uploaded profile banner for the authenticating user.
Returns HTTP 200 upon success.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/remove_profile_banner
"""
return self.post('account/remove_profile_banner', params=params)
def update_profile_background_image(self, **params):
"""Uploads a profile banner on behalf of the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/account/update_profile_banner
"""
return self.post('account/update_profile_background_image', params=params)
def get_profile_banner_sizes(self, **params):
"""Returns a map of the available size variations of the specified user's profile banner.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/profile_banner
"""
return self.get('users/profile_banner', params=params)
# Suggested Users
def get_user_suggestions_by_slug(self, **params):
"""Access the users in a given category of the Twitter suggested user list.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/suggestions/%3Aslug
"""
return self.get('users/suggestions/%s' % params.get('slug'), params=params)
def get_user_suggestions(self, **params):
"""Access to Twitter's suggested user list.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/suggestions
"""
return self.get('users/suggestions', params=params)
def get_user_suggestions_statuses_by_slug(self, **params):
"""Access the users in a given category of the Twitter suggested user
list and return their most recent status if they are not a protected user.
Docs: https://dev.twitter.com/docs/api/1.1/get/users/suggestions/%3Aslug/members
"""
return self.get('users/suggestions/%s/members' % params.get('slug'), params=params)
# Favorites
def get_favorites(self, **params):
"""Returns the 20 most recent Tweets favorited by the authenticating or specified user.
Docs: https://dev.twitter.com/docs/api/1.1/get/favorites/list
"""
return self.get('favorites/list', params=params)
get_favorites.iter_mode = 'id'
def destroy_favorite(self, **params):
"""Un-favorites the status specified in the ID parameter as the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/favorites/destroy
"""
return self.post('favorites/destroy', params=params)
def create_favorite(self, **params):
"""Favorites the status specified in the ID parameter as the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/favorites/create
"""
return self.post('favorites/create', params=params)
# Mute
def create_mute(self, **params):
return self.post('mutes/users/create', params=params)
def destroy_mute(self, **params):
return self.post('mutes/users/destroy', params=params)
def get_muted_users_ids(self, **params):
return self.get('mutes/users/ids', params=params)
get_muted_users_ids.iter_mode = 'cursor'
get_muted_users_ids.iter_key = 'ids'
def get_muted_users_list(self, **params):
return self.get('mutes/users/list', params=params)
get_muted_users_list.iter_mode = 'cursor'
get_muted_users_list.iter_key = 'users'
# Lists
def show_lists(self, **params):
"""Returns all lists the authenticating or specified user subscribes to, including their own.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/list
"""
return self.get('lists/list', params=params)
def get_list_statuses(self, **params):
"""Returns a timeline of tweets authored by members of the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/statuses
"""
return self.get('lists/statuses', params=params)
get_list_statuses.iter_mode = 'id'
def delete_list_member(self, **params):
"""Removes the specified member from the list.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/members/destroy
"""
return self.post('lists/members/destroy', params=params)
def get_list_memberships(self, **params):
"""Returns the lists the specified user has been added to.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/memberships
"""
return self.get('lists/memberships', params=params)
get_list_memberships.iter_mode = 'cursor'
get_list_memberships.iter_key = 'lists'
def get_list_subscribers(self, **params):
"""Returns the subscribers of the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/subscribers
"""
return self.get('lists/subscribers', params=params)
get_list_subscribers.iter_mode = 'cursor'
get_list_subscribers.iter_key = 'users'
def subscribe_to_list(self, **params):
"""Subscribes the authenticated user to the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/subscribers/create
"""
return self.post('lists/subscribers/create', params=params)
def is_list_subscriber(self, **params):
"""Check if the specified user is a subscriber of the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/subscribers/show
"""
return self.get('lists/subscribers/show', params=params)
def unsubscribe_from_list(self, **params):
"""Unsubscribes the authenticated user from the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/subscribers/destroy
"""
return self.post('lists/subscribers/destroy', params=params)
def create_list_members(self, **params):
"""Adds multiple members to a list, by specifying a comma-separated
list of member ids or screen names.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/members/create_all
"""
return self.post('lists/members/create_all', params=params)
def is_list_member(self, **params):
"""Check if the specified user is a member of the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/members/show
"""
return self.get('lists/members/show', params=params)
def get_list_members(self, **params):
"""Returns the members of the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/members
"""
return self.get('lists/members', params=params)
get_list_members.iter_mode = 'cursor'
get_list_members.iter_key = 'users'
def add_list_member(self, **params):
"""Add a member to a list.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/members/create
"""
return self.post('lists/members/create', params=params)
def delete_list(self, **params):
"""Deletes the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/destroy
"""
return self.post('lists/destroy', params=params)
def update_list(self, **params):
"""Updates the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/update
"""
return self.post('lists/update', params=params)
def create_list(self, **params):
"""Creates a new list for the authenticated user.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/create
"""
return self.post('lists/create', params=params)
def get_specific_list(self, **params):
"""Returns the specified list.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/show
"""
return self.get('lists/show', params=params)
def get_list_subscriptions(self, **params):
"""Obtain a collection of the lists the specified user is subscribed to.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/subscriptions
"""
return self.get('lists/subscriptions', params=params)
get_list_subscriptions.iter_mode = 'cursor'
get_list_subscriptions.iter_key = 'lists'
def delete_list_members(self, **params):
"""Removes multiple members from a list, by specifying a
comma-separated list of member ids or screen names.
Docs: https://dev.twitter.com/docs/api/1.1/post/lists/members/destroy_all
"""
return self.post('lists/members/destroy_all', params=params)
def show_owned_lists(self, **params):
"""Returns the lists owned by the specified Twitter user.
Docs: https://dev.twitter.com/docs/api/1.1/get/lists/ownerships
"""
return self.get('lists/ownerships', params=params)
show_owned_lists.iter_mode = 'cursor'
show_owned_lists.iter_key = 'lists'
# Saved Searches
def get_saved_searches(self, **params):
"""Returns the authenticated user's saved search queries.
Docs: https://dev.twitter.com/docs/api/1.1/get/saved_searches/list
"""
return self.get('saved_searches/list', params=params)
def show_saved_search(self, **params):
"""Retrieve the information for the saved search represented by the given id.
Docs: https://dev.twitter.com/docs/api/1.1/get/saved_searches/show/%3Aid
"""
return self.get('saved_searches/show/%s' % params.get('id'), params=params)
def create_saved_search(self, **params):
"""Create a new saved search for the authenticated user.
Docs: https://dev.twitter.com/docs/api/1.1/post/saved_searches/create
"""
return self.post('saved_searches/create', params=params)
def destroy_saved_search(self, **params):
"""Destroys a saved search for the authenticating user.
Docs: https://dev.twitter.com/docs/api/1.1/post/saved_searches/destroy/%3Aid
"""
return self.post('saved_searches/destroy/%s' % params.get('id'), params=params)
# Places & Geo
def get_geo_info(self, **params):
"""Returns all the information about a known place.
Docs: https://dev.twitter.com/docs/api/1.1/get/geo/id/%3Aplace_id
"""
return self.get('geo/id/%s' % params.get('place_id'), params=params)
def reverse_geocode(self, **params):
"""Given a latitude and a longitude, searches for up to 20 places
that can be used as a place_id when updating a status.
Docs: https://dev.twitter.com/docs/api/1.1/get/geo/reverse_geocode
"""
return self.get('geo/reverse_geocode', params=params)
def search_geo(self, **params):
"""Search for places that can be attached to a statuses/update.
Docs: https://dev.twitter.com/docs/api/1.1/get/geo/search
"""
return self.get('geo/search', params=params)
def get_similar_places(self, **params):
"""Locates places near the given coordinates which are similar in name.
Docs: https://dev.twitter.com/docs/api/1.1/get/geo/similar_places
"""
return self.get('geo/similar_places', params=params)
def create_place(self, **params): # pragma: no cover
"""Creates a new place object at the given latitude and longitude.
Docs: https://dev.twitter.com/docs/api/1.1/post/geo/place
"""
return self.post('geo/place', params=params)
# Trends
def get_place_trends(self, **params):
"""Returns the top 10 trending topics for a specific WOEID, if
trending information is available for it.
Docs: https://dev.twitter.com/docs/api/1.1/get/trends/place
"""
return self.get('trends/place', params=params)
def get_available_trends(self, **params):
"""Returns the locations that Twitter has trending topic information for.
Docs: https://dev.twitter.com/docs/api/1.1/get/trends/available
"""
return self.get('trends/available', params=params)
def get_closest_trends(self, **params):
"""Returns the locations that Twitter has trending topic information
for, closest to a specified location.
Docs: https://dev.twitter.com/docs/api/1.1/get/trends/closest
"""
return self.get('trends/closest', params=params)
# Spam Reporting
def report_spam(self, **params): # pragma: no cover
"""Report the specified user as a spam account to Twitter.
Docs: https://dev.twitter.com/docs/api/1.1/post/users/report_spam
"""
return self.post('users/report_spam', params=params)
# OAuth
def invalidate_token(self, **params): # pragma: no cover
"""Allows a registered application to revoke an issued OAuth 2 Bearer
Token by presenting its client credentials.
Docs: https://dev.twitter.com/docs/api/1.1/post/oauth2/invalidate_token
"""
return self.post('oauth2/invalidate_token', params=params)
# Help
def get_twitter_configuration(self, **params):
"""Returns the current configuration used by Twitter
Docs: https://dev.twitter.com/docs/api/1.1/get/help/configuration
"""
return self.get('help/configuration', params=params)
def get_supported_languages(self, **params):
"""Returns the list of languages supported by Twitter along with
their ISO 639-1 code.
Docs: https://dev.twitter.com/docs/api/1.1/get/help/languages
"""
return self.get('help/languages', params=params)
def get_privacy_policy(self, **params):
"""Returns Twitter's Privacy Policy
Docs: https://dev.twitter.com/docs/api/1.1/get/help/privacy
"""
return self.get('help/privacy', params=params)
def get_tos(self, **params):
"""Return the Twitter Terms of Service
Docs: https://dev.twitter.com/docs/api/1.1/get/help/tos
"""
return self.get('help/tos', params=params)
def get_application_rate_limit_status(self, **params):
"""Returns the current rate limits for methods belonging to the
specified resource families.
Docs: https://dev.twitter.com/docs/api/1.1/get/application/rate_limit_status
"""
return self.get('application/rate_limit_status', params=params)
# from https://dev.twitter.com/docs/error-codes-responses
TWITTER_HTTP_STATUS_CODE = {
200: ('OK', 'Success!'),
304: ('Not Modified', 'There was no new data to return.'),
400: ('Bad Request', 'The request was invalid. An accompanying error message will explain why. This is the status code will be returned during rate limiting.'),
401: ('Unauthorized', 'Authentication credentials were missing or incorrect.'),
403: ('Forbidden', 'The request is understood, but it has been refused. An accompanying error message will explain why. This code is used when requests are being denied due to update limits.'),
404: ('Not Found', 'The URI requested is invalid or the resource requested, such as a user, does not exists.'),
406: ('Not Acceptable', 'Returned by the Search API when an invalid format is specified in the request.'),
410: ('Gone', 'This resource is gone. Used to indicate that an API endpoint has been turned off.'),
422: ('Unprocessable Entity', 'Returned when an image uploaded to POST account/update_profile_banner is unable to be processed.'),
429: ('Too Many Requests', 'Returned in API v1.1 when a request cannot be served due to the application\'s rate limit having been exhausted for the resource.'),
500: ('Internal Server Error', 'Something is broken. Please post to the group so the Twitter team can investigate.'),
502: ('Bad Gateway', 'Twitter is down or being upgraded.'),
503: ('Service Unavailable', 'The Twitter servers are up, but overloaded with requests. Try again later.'),
504: ('Gateway Timeout', 'The Twitter servers are up, but the request couldn\'t be serviced due to some failure within our stack. Try again later.'),
}

61
src/twython/exceptions.py Normal file
View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""
twython.exceptions
~~~~~~~~~~~~~~~~~~
This module contains Twython specific Exception classes.
"""
from .endpoints import TWITTER_HTTP_STATUS_CODE
class TwythonError(Exception):
"""Generic error class, catch-all for most Twython issues.
Special cases are handled by TwythonAuthError & TwythonRateLimitError.
from twython import TwythonError, TwythonRateLimitError, TwythonAuthError
"""
def __init__(self, msg, error_code=None, retry_after=None):
self.error_code = error_code
if error_code is not None and error_code in TWITTER_HTTP_STATUS_CODE:
msg = 'Twitter API returned a %s (%s), %s' % \
(error_code,
TWITTER_HTTP_STATUS_CODE[error_code][0],
msg)
super(TwythonError, self).__init__(msg)
@property
def msg(self): # pragma: no cover
return self.args[0]
class TwythonAuthError(TwythonError):
"""Raised when you try to access a protected resource and it fails due to
some issue with your authentication.
"""
pass
class TwythonRateLimitError(TwythonError): # pragma: no cover
"""Raised when you've hit a rate limit.
The amount of seconds to retry your request in will be appended
to the message.
"""
def __init__(self, msg, error_code, retry_after=None):
if isinstance(retry_after, int):
msg = '%s (Retry after %d seconds)' % (msg, retry_after)
TwythonError.__init__(self, msg, error_code=error_code)
self.retry_after = retry_after
class TwythonStreamError(TwythonError):
"""Raised when an invalid response from the Stream API is received"""
pass

34
src/twython/helpers.py Normal file
View File

@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
"""
twython.helpers
~~~~~~~~~~~~~~~
This module contains functions that are repeatedly used throughout
the Twython library.
"""
from .compat import basestring, numeric_types
def _transparent_params(_params):
params = {}
files = {}
for k, v in _params.items():
if hasattr(v, 'read') and callable(v.read):
files[k] = v # pragma: no cover
elif isinstance(v, bool):
if v:
params[k] = 'true'
else:
params[k] = 'false'
elif isinstance(v, basestring) or isinstance(v, numeric_types):
params[k] = v
elif isinstance(v, list):
try:
params[k] = ','.join(v)
except TypeError:
params[k] = ','.join(map(str, v))
else:
continue # pragma: no cover
return params, files

View File

@@ -0,0 +1 @@
from .api import TwythonStreamer

View File

@@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
"""
twython.streaming.api
~~~~~~~~~~~~~~~~~~~~~
This module contains functionality for interfacing with streaming
Twitter API calls.
"""
from .. import __version__
from ..compat import json, is_py3
from ..helpers import _transparent_params
from .types import TwythonStreamerTypes
import requests
from requests_oauthlib import OAuth1
import time
class TwythonStreamer(object):
def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
timeout=300, retry_count=None, retry_in=10, client_args=None,
handlers=None, chunk_size=1):
"""Streaming class for a friendly streaming user experience
Authentication IS required to use the Twitter Streaming API
:param app_key: (required) Your applications key
:param app_secret: (required) Your applications secret key
:param oauth_token: (required) Used with oauth_token_secret to make
authenticated calls
:param oauth_token_secret: (required) Used with oauth_token to make
authenticated calls
:param timeout: (optional) How long (in secs) the streamer should wait
for a response from Twitter Streaming API
:param retry_count: (optional) Number of times the API call should be
retired
:param retry_in: (optional) Amount of time (in secs) the previous
API call should be tried again
:param client_args: (optional) Accepts some requests Session
parameters and some requests Request parameters.
See
http://docs.python-requests.org/en/latest/api/#sessionapi
and requests section below it for details.
[ex. headers, proxies, verify(SSL verification)]
:param handlers: (optional) Array of message types for which
corresponding handlers will be called
:param chunk_size: (optional) Define the buffer size before data is
actually returned from the Streaming API. Default: 1
"""
self.auth = OAuth1(app_key, app_secret,
oauth_token, oauth_token_secret)
self.client_args = client_args or {}
default_headers = {'User-Agent': 'Twython Streaming v' + __version__}
if 'headers' not in self.client_args:
# If they didn't set any headers, set our defaults for them
self.client_args['headers'] = default_headers
elif 'User-Agent' not in self.client_args['headers']:
# If they set headers, but didn't include User-Agent..
# set it for them
self.client_args['headers'].update(default_headers)
self.client_args['timeout'] = timeout
self.client = requests.Session()
self.client.auth = self.auth
self.client.stream = True
# Make a copy of the client args and iterate over them
# Pop out all the acceptable args at this point because they will
# Never be used again.
client_args_copy = self.client_args.copy()
for k, v in client_args_copy.items():
if k in ('cert', 'headers', 'hooks', 'max_redirects', 'proxies'):
setattr(self.client, k, v)
self.client_args.pop(k) # Pop, pop!
self.api_version = '1.1'
self.retry_in = retry_in
self.retry_count = retry_count
# Set up type methods
StreamTypes = TwythonStreamerTypes(self)
self.statuses = StreamTypes.statuses
self.user = StreamTypes.user
self.site = StreamTypes.site
self.connected = False
self.handlers = handlers if handlers else \
['delete', 'limit', 'disconnect']
self.chunk_size = chunk_size
def _request(self, url, method='GET', params=None):
"""Internal stream request handling"""
self.connected = True
retry_counter = 0
method = method.lower()
func = getattr(self.client, method)
params, _ = _transparent_params(params)
def _send(retry_counter):
requests_args = {}
for k, v in self.client_args.items():
# Maybe this should be set as a class
# variable and only done once?
if k in ('timeout', 'allow_redirects', 'verify'):
requests_args[k] = v
while self.connected:
try:
if method == 'get':
requests_args['params'] = params
else:
requests_args['data'] = params
response = func(url, **requests_args)
except requests.exceptions.Timeout:
self.on_timeout()
else:
if response.status_code != 200:
self.on_error(response.status_code, response.content)
if self.retry_count and \
(self.retry_count - retry_counter) > 0:
time.sleep(self.retry_in)
retry_counter += 1
_send(retry_counter)
return response
while self.connected:
response = _send(retry_counter)
for line in response.iter_lines(self.chunk_size):
if not self.connected:
break
if line:
try:
if is_py3:
line = line.decode('utf-8')
data = json.loads(line)
except ValueError: # pragma: no cover
self.on_error(response.status_code,
'Unable to decode response, \
not valid JSON.')
else:
if self.on_success(data): # pragma: no cover
for message_type in self.handlers:
if message_type in data:
handler = getattr(self,
'on_' + message_type,
None)
if handler \
and callable(handler) \
and not handler(data.get(message_type)):
break
response.close()
def on_success(self, data): # pragma: no cover
"""Called when data has been successfully received from the stream.
Returns True if other handlers for this message should be invoked.
Feel free to override this to handle your streaming data how you
want it handled.
See https://dev.twitter.com/docs/streaming-apis/messages for messages
sent along in stream responses.
:param data: data recieved from the stream
:type data: dict
"""
return True
def on_error(self, status_code, data): # pragma: no cover
"""Called when stream returns non-200 status code
Feel free to override this to handle your streaming data how you
want it handled.
:param status_code: Non-200 status code sent from stream
:type status_code: int
:param data: Error message sent from stream
:type data: dict
"""
return
def on_timeout(self): # pragma: no cover
""" Called when the request has timed out """
return
def disconnect(self):
"""Used to disconnect the streaming client manually"""
self.connected = False

View File

@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
"""
twython.streaming.types
~~~~~~~~~~~~~~~~~~~~~~~
This module contains classes and methods for :class:`TwythonStreamer` to use.
"""
class TwythonStreamerTypes(object):
"""Class for different stream endpoints
Not all streaming endpoints have nested endpoints.
User Streams and Site Streams are single streams with no nested endpoints
Status Streams include filter, sample and firehose endpoints
"""
def __init__(self, streamer):
self.streamer = streamer
self.statuses = TwythonStreamerTypesStatuses(streamer)
def user(self, **params):
"""Stream user
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/user
"""
url = 'https://userstream.twitter.com/%s/user.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def site(self, **params):
"""Stream site
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/site
"""
url = 'https://sitestream.twitter.com/%s/site.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
class TwythonStreamerTypesStatuses(object):
"""Class for different statuses endpoints
Available so TwythonStreamer.statuses.filter() is available.
Just a bit cleaner than TwythonStreamer.statuses_filter(),
statuses_sample(), etc. all being single methods in TwythonStreamer
"""
def __init__(self, streamer):
self.streamer = streamer
def filter(self, **params):
"""Stream statuses/filter
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/post/statuses/filter
"""
url = 'https://stream.twitter.com/%s/statuses/filter.json' \
% self.streamer.api_version
self.streamer._request(url, 'POST', params=params)
def sample(self, **params):
"""Stream statuses/sample
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/sample
"""
url = 'https://stream.twitter.com/%s/statuses/sample.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)
def firehose(self, **params):
"""Stream statuses/firehose
:param \*\*params: Parameters to send with your stream request
Accepted params found at:
https://dev.twitter.com/docs/api/1.1/get/statuses/firehose
"""
url = 'https://stream.twitter.com/%s/statuses/firehose.json' \
% self.streamer.api_version
self.streamer._request(url, params=params)