# Source code for perceval.backends.core.twitter
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Valerio Cosentino <valcos@bitergia.com>
# Jesus M. Gonzalez-Barahona <jgb@gsyc.es>
# Harshal Mittal <harshalmittal4@gmail.com>
#
import json
import logging
from grimoirelab_toolkit.datetime import datetime_utcnow, str_to_datetime
from ...backend import (Backend,
BackendCommand,
BackendCommandArgumentParser,
DEFAULT_SEARCH_FIELD)
from ...client import HttpClient, RateLimitHandler
from ...errors import BackendError
# Only item category produced by this backend
CATEGORY_TWEET = "tweet"

# Endpoints: public origin used to tag items and the v1.1 search API
TWITTER_URL = 'https://twitter.com/'
TWITTER_API_URL = 'https://api.twitter.com/1.1/search/tweets.json'

# Longest search query accepted (characters, operators included)
MAX_SEARCH_QUERY = 500

# Number of tweets requested per page
MAX_ITEMS = 100

# Rate limiting: when the remaining rate drops to this value the client
# sleeps until the limit is reset
MIN_RATE_LIMIT = 1

# Seconds to wait between retries to avoid "too many requests" errors
SLEEP_TIME = 30

# Response headers carrying the rate limit state
RATE_LIMIT_HEADER = "x-rate-limit-remaining"
RATE_LIMIT_RESET_HEADER = "x-rate-limit-reset"

# Accepted values for the `result_type` search parameter
TWEET_TYPE_MIXED = "mixed"
TWEET_TYPE_RECENT = "recent"
TWEET_TYPE_POPULAR = "popular"

logger = logging.getLogger(__name__)
class Twitter(Backend):
    """Twitter backend.

    This class allows to fetch samples of tweets containing specific
    keywords. Initialize this class passing the API token needed
    for authentication with the parameter `api_token`.

    :param query: query to fetch tweets (at most `MAX_SEARCH_QUERY`
        characters, operators included)
    :param api_token: token or key needed to use the API
    :param max_items: maximum number of tweets requested on the same query
    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until
         it will be reset
    :param sleep_time: time (in seconds) to sleep in case
        of connection problems
    :param tag: label used to mark the data
    :param archive: archive to store/retrieve items
    :param ssl_verify: enable/disable SSL verification

    :raises BackendError: when `query` is longer than
        `MAX_SEARCH_QUERY` characters
    """
    version = '0.4.0'

    CATEGORIES = [CATEGORY_TWEET]

    def __init__(self, query, api_token, max_items=MAX_ITEMS,
                 sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT,
                 sleep_time=SLEEP_TIME,
                 tag=None, archive=None, ssl_verify=True):
        origin = TWITTER_URL

        # MAX_SEARCH_QUERY is the maximum allowed length, so a query of
        # exactly that many characters is still valid; only longer ones fail.
        if len(query) > MAX_SEARCH_QUERY:
            msg = "Search query length exceeded %s, max is %s" % (len(query), MAX_SEARCH_QUERY)
            raise BackendError(cause=msg)

        super().__init__(origin, tag=tag, archive=archive, ssl_verify=ssl_verify)
        self.query = query
        self.api_token = api_token
        self.max_items = max_items
        self.sleep_for_rate = sleep_for_rate
        self.min_rate_to_sleep = min_rate_to_sleep
        self.sleep_time = sleep_time

        # NOTE(review): presumably assigned by the base Backend from
        # `_init_client()` before `fetch_items` runs — confirm.
        self.client = None

    def search_fields(self, item):
        """Add search fields to an item.

        It adds the values of `metadata_id` plus the hashtags of a tweet,
        when the tweet carries an `entities` node.

        :param item: the item to extract the search fields values

        :returns: a dict of search fields
        """
        search_fields = {
            DEFAULT_SEARCH_FIELD: self.metadata_id(item)
        }

        # The entities node is excluded when tweets are fetched with
        # `include_entities` disabled, so it may be missing (or empty).
        entities = item.get('entities') or {}
        if 'hashtags' in entities:
            search_fields['hashtags'] = [h['text'] for h in entities['hashtags']]

        return search_fields

    def fetch(self, category=CATEGORY_TWEET, since_id=None, max_id=None,
              geocode=None, lang=None,
              include_entities=True, tweets_type=TWEET_TYPE_MIXED):
        """Fetch the tweets from the server.

        This method fetches tweets from the TwitterSearch API published in the last seven days.

        :param category: the category of items to fetch
        :param since_id: if not null, it returns results with an ID greater than the specified ID
        :param max_id: when it is set or if not None, it returns results with an ID less than the specified ID
        :param geocode: if enabled, returns tweets by users located at latitude,longitude,"mi"|"km"
        :param lang: if enabled, restricts tweets to the given language, given by an ISO 639-1 code
        :param include_entities: if disabled, it excludes entities node
        :param tweets_type: type of tweets returned. Default is "mixed", others are "recent" and "popular"

        :returns: a generator of tweets
        """
        kwargs = {"since_id": since_id,
                  "max_id": max_id,
                  "geocode": geocode,
                  "lang": lang,
                  "include_entities": include_entities,
                  "result_type": tweets_type}
        items = super().fetch(category, **kwargs)

        return items

    def fetch_items(self, category, **kwargs):
        """Fetch the tweets

        :param category: the category of items to fetch
        :param kwargs: backend arguments (`since_id`, `max_id`, `geocode`,
            `lang`, `include_entities` and `result_type`)

        :returns: a generator of items
        """
        since_id = kwargs['since_id']
        max_id = kwargs['max_id']
        geocode = kwargs['geocode']
        lang = kwargs['lang']
        entities = kwargs['include_entities']
        tweets_type = kwargs['result_type']

        logger.info("Fetching tweets %s from %s to %s",
                    self.query, str(since_id),
                    str(max_id) if max_id else '--')

        # Only the total and the number of distinct ids are reported, so a
        # counter plus a set is enough (no need to keep every id in a list).
        n_tweets = 0
        unique_ids = set()
        min_date = None
        max_date = None

        group_tweets = self.client.tweets(self.query, since_id=since_id, max_id=max_id, geocode=geocode,
                                          lang=lang, include_entities=entities, result_type=tweets_type)

        for tweets in group_tweets:
            last = len(tweets) - 1
            for idx, tweet in enumerate(tweets):
                n_tweets += 1
                unique_ids.add(tweet['id'])

                # The client pages backwards (each request uses the last
                # tweet's id - 1 as `max_id`), so the newest date is the first
                # tweet of the first page and the oldest one is the last tweet
                # seen. Positions are used instead of comparing dicts.
                if idx == last:
                    min_date = str_to_datetime(tweet['created_at'])
                if idx == 0 and max_date is None:
                    max_date = str_to_datetime(tweet['created_at'])

                yield tweet

        logger.info("Fetch process completed: %s (unique %s) tweets fetched, from %s to %s",
                    n_tweets, len(unique_ids), min_date, max_date)

    @classmethod
    def has_archiving(cls):
        """Returns whether it supports archiving items on the fetch process.

        :returns: this backend supports items archive
        """
        return True

    @classmethod
    def has_resuming(cls):
        """Returns whether it supports to resume the fetch process.

        :returns: this backend does not support items resuming
        """
        return False

    @staticmethod
    def metadata_id(item):
        """Extracts the identifier from a Twitter item."""

        return str(item['id_str'])

    @staticmethod
    def metadata_updated_on(item):
        """Extracts and converts the update time from a Twitter item.

        The timestamp is extracted from 'created_at' field and converted
        to a UNIX timestamp.

        :param item: item generated by the backend

        :returns: a UNIX timestamp
        """
        ts = item['created_at']
        ts = str_to_datetime(ts)

        return ts.timestamp()

    @staticmethod
    def metadata_category(item):
        """Extracts the category from a Twitter item.

        This backend only generates one type of item which is
        'tweet'.
        """
        return CATEGORY_TWEET

    def _init_client(self, from_archive=False):
        """Init client"""

        return TwitterClient(self.api_token, self.max_items,
                             self.sleep_for_rate, self.min_rate_to_sleep, self.sleep_time,
                             self.archive, from_archive, self.ssl_verify)
class TwitterClient(HttpClient, RateLimitHandler):
    """Twitter API client.

    Client for fetching information from the Twitter server
    using its REST API v1.1.

    :param api_key: key needed to use the API
    :param max_items: maximum number of items per request
    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until
         it will be reset
    :param sleep_time: time (in seconds) to sleep in case
        of connection problems
    :param archive: an archive to store/read fetched data
    :param from_archive: it tells whether to write/read the archive
    :param ssl_verify: enable/disable SSL verification
    """
    # API headers
    HAUTHORIZATION = 'Authorization'

    # Resource parameters
    PQUERY = 'q'
    PCOUNT = 'count'
    PSINCE_ID = 'since_id'
    PMAX_ID = 'max_id'
    PGEOCODE = 'geocode'
    PLANG = 'lang'
    PINCLUDE_ENTITIES = 'include_entities'
    PRESULT_TYPE = 'result_type'

    def __init__(self, api_key, max_items=MAX_ITEMS,
                 sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT, sleep_time=SLEEP_TIME,
                 archive=None, from_archive=False, ssl_verify=True):
        self.api_key = api_key
        self.max_items = max_items

        # 429 (Too Many Requests) is added to the statuses that are retried
        super().__init__(TWITTER_API_URL, sleep_time=sleep_time, extra_status_forcelist=[429],
                         archive=archive, from_archive=from_archive, ssl_verify=ssl_verify)
        super().setup_rate_limit_handler(sleep_for_rate=sleep_for_rate, min_rate_to_sleep=min_rate_to_sleep,
                                         rate_limit_header=RATE_LIMIT_HEADER,
                                         rate_limit_reset_header=RATE_LIMIT_RESET_HEADER)

    def calculate_time_to_reset(self):
        """Number of seconds to wait. They are contained in the rate limit reset header.

        The current time is truncated to whole seconds and one extra second
        is subtracted from the wait; negative results are clamped to 0.
        """
        now = datetime_utcnow().replace(microsecond=0).timestamp()

        return max(0, self.rate_limit_reset_ts - (now + 1))

    @staticmethod
    def sanitize_for_archive(url, headers, payload):
        """Sanitize payload of a HTTP request by removing the token information
        before storing/retrieving archived items

        :param: url: HTTP url request
        :param: headers: HTTP headers request (the mapping is modified in place)
        :param: payload: HTTP payload request

        :returns url, headers and the sanitized payload
        """
        # Drop the bearer token so it never reaches the archive; guard
        # against a missing (None) headers mapping.
        if headers and TwitterClient.HAUTHORIZATION in headers:
            headers.pop(TwitterClient.HAUTHORIZATION)

        return url, headers, payload

    def tweets(self, query, since_id=None, max_id=None, geocode=None, lang=None,
               include_entities=True, result_type=TWEET_TYPE_MIXED):
        """Fetch tweets for a given query between since_id and max_id.

        :param query: query to fetch tweets
        :param since_id: if not null, it returns results with an ID greater than the specified ID
        :param max_id: if not null, it returns results with an ID less than the specified ID
        :param geocode: if enabled, returns tweets by users located at latitude,longitude,"mi"|"km"
        :param lang: if enabled, restricts tweets to the given language, given by an ISO 639-1 code
        :param include_entities: if disabled, it excludes entities node
        :param result_type: type of tweets returned. Default is "mixed", others are "recent" and "popular"

        :returns: a generator of pages, each one a list of tweets
        """
        resource = self.base_url

        params = {self.PQUERY: query,
                  self.PCOUNT: self.max_items}

        if since_id:
            params[self.PSINCE_ID] = since_id

        if max_id:
            params[self.PMAX_ID] = max_id

        if geocode:
            params[self.PGEOCODE] = geocode

        if lang:
            params[self.PLANG] = lang

        params[self.PINCLUDE_ENTITIES] = include_entities
        params[self.PRESULT_TYPE] = result_type

        while True:
            raw_tweets = self._fetch(resource, params=params)
            tweets = json.loads(raw_tweets)

            # An empty page means there are no more results
            if not tweets['statuses']:
                break

            # Page backwards: the next request only asks for tweets older
            # than the last one received
            params[self.PMAX_ID] = tweets['statuses'][-1]['id'] - 1
            yield tweets['statuses']

    def _fetch(self, url, params):
        """Fetch a resource.

        Method to fetch a page of a type of resource, honouring the rate
        limit when the data comes from the remote server.

        :param url: the endpoint of the API
        :param params: parameters to filter

        :returns: the text of the response
        """
        if not self.from_archive:
            self.sleep_for_rate_limit()

        headers = {self.HAUTHORIZATION: 'Bearer ' + self.api_key}
        r = self.fetch(url, payload=params, headers=headers)

        if not self.from_archive:
            self.update_rate_limit(r)

        return r.text
class TwitterCommand(BackendCommand):
    """Class to run Twitter backend from the command line."""

    BACKEND = Twitter

    @classmethod
    def setup_cmd_parser(cls):
        """Returns the Twitter argument parser."""

        parser = BackendCommandArgumentParser(cls.BACKEND,
                                              token_auth=True,
                                              archive=True,
                                              ssl_verify=True)

        # Backend token is required
        action = parser.parser._option_string_actions['--api-token']
        action.required = True

        # Twitter options
        group = parser.parser.add_argument_group('Twitter arguments')
        group.add_argument('--max-items', dest='max_items',
                           type=int, default=MAX_ITEMS,
                           help="Maximum number of items requested on the same query")
        group.add_argument('--no-entities', dest='include_entities',
                           action='store_false',
                           help=" Exclude entities node")
        group.add_argument('--geo-code', dest='geocode',
                           help="Select tweets by users located at latitude,longitude,radius")
        group.add_argument('--lang', dest='lang',
                           help="Select tweets to the given language in ISO 639-1 code")
        # Restrict to the documented result types so typos fail at parse
        # time instead of being sent to the API
        group.add_argument('--tweets-type', dest='tweets_type', default=TWEET_TYPE_MIXED,
                           choices=[TWEET_TYPE_MIXED, TWEET_TYPE_RECENT, TWEET_TYPE_POPULAR],
                           help="Type of tweets returned. Default is 'mixed', others are 'recent' and 'popular'")
        group.add_argument('--sleep-for-rate', dest='sleep_for_rate',
                           action='store_true',
                           help="sleep for getting more rate")
        group.add_argument('--min-rate-to-sleep', dest='min_rate_to_sleep',
                           default=MIN_RATE_LIMIT, type=int,
                           help="sleep until reset when the rate limit reaches this value")
        group.add_argument('--sleep-time', dest='sleep_time',
                           default=SLEEP_TIME, type=int,
                           help="minimun sleeping time to avoid too many request exception")

        # Required arguments
        parser.parser.add_argument('query',
                                   help="Search query including operators, max 500 chars")

        return parser