mirror of
https://github.com/MCV-Software/TWBlue.git
synced 2024-11-29 22:23:12 -06:00
203 lines
7.7 KiB
Python
203 lines
7.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
twython.streaming.api
|
|
~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
This module contains functionality for interfacing with streaming
|
|
Twitter API calls.
|
|
"""
|
|
|
|
from .. import __version__
|
|
from ..compat import json, is_py3
|
|
from ..helpers import _transparent_params
|
|
from .types import TwythonStreamerTypes
|
|
|
|
import requests
|
|
from requests_oauthlib import OAuth1
|
|
|
|
import time
|
|
|
|
|
|
class TwythonStreamer(object):
|
|
def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
|
|
timeout=300, retry_count=None, retry_in=10, client_args=None,
|
|
handlers=None, chunk_size=1):
|
|
"""Streaming class for a friendly streaming user experience
|
|
Authentication IS required to use the Twitter Streaming API
|
|
|
|
:param app_key: (required) Your applications key
|
|
:param app_secret: (required) Your applications secret key
|
|
:param oauth_token: (required) Used with oauth_token_secret to make
|
|
authenticated calls
|
|
:param oauth_token_secret: (required) Used with oauth_token to make
|
|
authenticated calls
|
|
:param timeout: (optional) How long (in secs) the streamer should wait
|
|
for a response from Twitter Streaming API
|
|
:param retry_count: (optional) Number of times the API call should be
|
|
retired
|
|
:param retry_in: (optional) Amount of time (in secs) the previous
|
|
API call should be tried again
|
|
:param client_args: (optional) Accepts some requests Session
|
|
parameters and some requests Request parameters.
|
|
See
|
|
http://docs.python-requests.org/en/latest/api/#sessionapi
|
|
and requests section below it for details.
|
|
[ex. headers, proxies, verify(SSL verification)]
|
|
:param handlers: (optional) Array of message types for which
|
|
corresponding handlers will be called
|
|
|
|
:param chunk_size: (optional) Define the buffer size before data is
|
|
actually returned from the Streaming API. Default: 1
|
|
"""
|
|
|
|
self.auth = OAuth1(app_key, app_secret,
|
|
oauth_token, oauth_token_secret)
|
|
|
|
self.client_args = client_args or {}
|
|
default_headers = {'User-Agent': 'Twython Streaming v' + __version__}
|
|
if 'headers' not in self.client_args:
|
|
# If they didn't set any headers, set our defaults for them
|
|
self.client_args['headers'] = default_headers
|
|
elif 'User-Agent' not in self.client_args['headers']:
|
|
# If they set headers, but didn't include User-Agent..
|
|
# set it for them
|
|
self.client_args['headers'].update(default_headers)
|
|
self.client_args['timeout'] = timeout
|
|
|
|
self.client = requests.Session()
|
|
self.client.auth = self.auth
|
|
self.client.stream = True
|
|
|
|
# Make a copy of the client args and iterate over them
|
|
# Pop out all the acceptable args at this point because they will
|
|
# Never be used again.
|
|
client_args_copy = self.client_args.copy()
|
|
for k, v in client_args_copy.items():
|
|
if k in ('cert', 'headers', 'hooks', 'max_redirects', 'proxies'):
|
|
setattr(self.client, k, v)
|
|
self.client_args.pop(k) # Pop, pop!
|
|
|
|
self.api_version = '1.1'
|
|
|
|
self.retry_in = retry_in
|
|
self.retry_count = retry_count
|
|
|
|
# Set up type methods
|
|
StreamTypes = TwythonStreamerTypes(self)
|
|
self.statuses = StreamTypes.statuses
|
|
self.user = StreamTypes.user
|
|
self.site = StreamTypes.site
|
|
|
|
self.connected = False
|
|
|
|
self.handlers = handlers if handlers else \
|
|
['delete', 'limit', 'disconnect']
|
|
|
|
self.chunk_size = chunk_size
|
|
|
|
def _request(self, url, method='GET', params=None):
|
|
"""Internal stream request handling"""
|
|
self.connected = True
|
|
retry_counter = 0
|
|
|
|
method = method.lower()
|
|
func = getattr(self.client, method)
|
|
params, _ = _transparent_params(params)
|
|
|
|
def _send(retry_counter):
|
|
requests_args = {}
|
|
for k, v in self.client_args.items():
|
|
# Maybe this should be set as a class
|
|
# variable and only done once?
|
|
if k in ('timeout', 'allow_redirects', 'verify'):
|
|
requests_args[k] = v
|
|
|
|
while self.connected:
|
|
try:
|
|
if method == 'get':
|
|
requests_args['params'] = params
|
|
else:
|
|
requests_args['data'] = params
|
|
|
|
response = func(url, **requests_args)
|
|
except requests.exceptions.Timeout:
|
|
self.on_timeout()
|
|
else:
|
|
# self.connected = False
|
|
if response.status_code != 200:
|
|
self.on_error(response.status_code, response.content)
|
|
|
|
if self.retry_count and \
|
|
(self.retry_count - retry_counter) > 0:
|
|
time.sleep(self.retry_in)
|
|
retry_counter += 1
|
|
_send(retry_counter)
|
|
|
|
return response
|
|
|
|
while self.connected:
|
|
response = _send(retry_counter)
|
|
|
|
for line in response.iter_lines(self.chunk_size):
|
|
if not self.connected:
|
|
break
|
|
if line:
|
|
try:
|
|
if is_py3:
|
|
line = line.decode('utf-8')
|
|
data = json.loads(line)
|
|
except ValueError: # pragma: no cover
|
|
self.on_error(response.status_code,
|
|
'Unable to decode response, \
|
|
not valid JSON.')
|
|
else:
|
|
if self.on_success(data): # pragma: no cover
|
|
for message_type in self.handlers:
|
|
if message_type in data:
|
|
handler = getattr(self,
|
|
'on_' + message_type,
|
|
None)
|
|
if handler \
|
|
and callable(handler) \
|
|
and not handler(data.get(message_type)):
|
|
break
|
|
|
|
response.close()
|
|
|
|
def on_success(self, data): # pragma: no cover
|
|
"""Called when data has been successfully received from the stream.
|
|
Returns True if other handlers for this message should be invoked.
|
|
|
|
Feel free to override this to handle your streaming data how you
|
|
want it handled.
|
|
See https://dev.twitter.com/docs/streaming-apis/messages for messages
|
|
sent along in stream responses.
|
|
|
|
:param data: data recieved from the stream
|
|
:type data: dict
|
|
"""
|
|
return True
|
|
|
|
def on_error(self, status_code, data): # pragma: no cover
|
|
"""Called when stream returns non-200 status code
|
|
|
|
Feel free to override this to handle your streaming data how you
|
|
want it handled.
|
|
|
|
:param status_code: Non-200 status code sent from stream
|
|
:type status_code: int
|
|
|
|
:param data: Error message sent from stream
|
|
:type data: dict
|
|
"""
|
|
return
|
|
|
|
def on_timeout(self): # pragma: no cover
|
|
""" Called when the request has timed out """
|
|
return
|
|
|
|
def disconnect(self):
|
|
"""Used to disconnect the streaming client manually"""
|
|
self.connected = False
|