# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Valerio Cosentino <valcos@bitergia.com>
# Santiago Dueñas <sduenas@bitergia.com>
#
import logging
import time
import requests
import urllib3.util
from .errors import RateLimitError
from ._version import __version__
# Module-level logger; RateLimitHandler uses it to report rate limit state.
logger = logging.getLogger(__name__)
class HttpClient:
    """Abstract class for HTTP clients.

    Base class to query data sources taking care of retrying
    requests in case connection issues. If the data source
    does not send back a response after retrying a request,
    a RetryError exception is thrown.

    Sub-classes can use the methods fetch to obtain data
    from the data source.

    To track which version of the client was used during
    the fetching process, this class provides a `version`
    attribute that each client may override.

    :param base_url: base URL of the data source
    :param max_retries: number of max retries to a data source
        before raising a RetryError exception
    :param sleep_time: time (in seconds) to sleep in case
        of connection problems
    :param extra_headers: extra headers to be included in
        the requests
    :param extra_status_forcelist: a set of HTTP status codes that will
        force a retry on
    :param extra_retry_after_status: a set of HTTP status codes that will
        perform a retry respecting the Retry-After header
    :param archive: archive to store/retrieve items
    :param from_archive: if `True` the data is fetched
        from an archive
    :param ssl_verify: enable/disable SSL verification
    """
    version = '0.3.0'

    DEFAULT_SLEEP_TIME = 1

    MAX_RETRIES = 5
    MAX_RETRIES_ON_CONNECT = 5
    MAX_RETRIES_ON_READ = 5
    MAX_RETRIES_ON_REDIRECT = 5
    MAX_RETRIES_ON_STATUS = 5

    # A false value tells urllib3's Retry to retry on any HTTP verb.
    DEFAULT_METHOD_WHITELIST = False
    DEFAULT_RAISE_ON_REDIRECT = True
    DEFAULT_RAISE_ON_STATUS = True
    DEFAULT_RESPECT_RETRY_AFTER_HEADER = True

    DEFAULT_RETRY_AFTER_STATUS_CODES = [413, 429, 503]
    DEFAULT_STATUS_FORCE_LIST = [408, 423, 504]

    DEFAULT_HEADERS = {'User-Agent': 'Perceval/' + __version__}

    GET = "GET"
    POST = "POST"

    def __init__(self, base_url, max_retries=MAX_RETRIES, sleep_time=DEFAULT_SLEEP_TIME,
                 extra_headers=None, extra_status_forcelist=None, extra_retry_after_status=None,
                 archive=None, from_archive=False, ssl_verify=True):
        self.base_url = base_url
        self.ssl_verify = ssl_verify

        # Work on copies so the class-level defaults are never mutated.
        self.headers = dict(self.DEFAULT_HEADERS)
        if extra_headers:
            self.headers.update(extra_headers)

        self.status_forcelist = list(self.DEFAULT_STATUS_FORCE_LIST)
        if extra_status_forcelist:
            self.status_forcelist.extend(extra_status_forcelist)

        self.retry_after_status = list(self.DEFAULT_RETRY_AFTER_STATUS_CODES)
        if extra_retry_after_status:
            self.retry_after_status.extend(extra_retry_after_status)

        self.max_retries = max_retries
        self.max_retries_on_connect = self.MAX_RETRIES_ON_CONNECT
        self.max_retries_on_read = self.MAX_RETRIES_ON_READ
        self.max_retries_on_redirect = self.MAX_RETRIES_ON_REDIRECT
        self.max_retries_on_status = self.MAX_RETRIES_ON_STATUS
        self.method_whitelist = self.DEFAULT_METHOD_WHITELIST
        self.raise_on_redirect = self.DEFAULT_RAISE_ON_REDIRECT
        self.raise_on_status = self.DEFAULT_RAISE_ON_STATUS
        self.respect_retry_after_header = self.DEFAULT_RESPECT_RETRY_AFTER_HEADER
        self.sleep_time = sleep_time

        self.archive = archive
        self.from_archive = from_archive

        self._create_http_session()

    def __del__(self):
        self._close_http_session()

    def fetch(self, url, payload=None, headers=None, method=GET, stream=False, auth=None):
        """Fetch the data from a given URL.

        The data is read from the archive when `from_archive` is set;
        otherwise the request is sent to the remote data source.

        :param url: link to the resource
        :param payload: payload of the request
        :param headers: headers of the request
        :param method: type of request call (GET or POST)
        :param stream: defer downloading the response body until the response content is available
        :param auth: auth of the request

        :returns: a response object
        """
        if self.from_archive:
            response = self._fetch_from_archive(url, payload, headers)
        else:
            response = self._fetch_from_remote(url, payload, headers, method, stream, auth)

        return response

    @staticmethod
    def sanitize_for_archive(url, headers, payload):
        """Sanitize the URL, headers and payload of a HTTP request before storing/retrieving items.

        By default, this method does not modify url, headers and payload. The modifications take
        place within the specific backends that redefine the sanitize_for_archive.

        :param url: HTTP url request
        :param headers: HTTP headers request
        :param payload: HTTP payload request

        :returns: url, headers and payload sanitized
        """
        return url, headers, payload

    def _fetch_from_archive(self, url, payload, headers):
        """Retrieve a stored response from the archive.

        Anything retrieved that is not a `requests.Response` is raised,
        mirroring `_fetch_from_remote`, which stores the exception of a
        failed request in the archive.
        """
        url, headers, payload = self.sanitize_for_archive(url, headers, payload)
        response = self.archive.retrieve(url, payload, headers)

        if not isinstance(response, requests.Response):
            raise response

        return response

    def _fetch_from_remote(self, url, payload, headers, method, stream, auth):
        """Send the request to the data source, archiving the outcome if an archive is set.

        Any method other than GET is sent as a POST request.
        """
        if method == self.GET:
            response = self.session.get(url, params=payload, headers=headers, stream=stream,
                                        verify=self.ssl_verify, auth=auth)
        else:
            response = self.session.post(url, data=payload, headers=headers, stream=stream,
                                         verify=self.ssl_verify, auth=auth)

        try:
            response.raise_for_status()
        except Exception as e:
            # Archive the failure too, so a later archive fetch raises the same error.
            if self.archive:
                url, headers, payload = self.sanitize_for_archive(url, headers, payload)
                self.archive.store(url, payload, headers, e)
            raise

        if self.archive:
            url, headers, payload = self.sanitize_for_archive(url, headers, payload)
            self.archive.store(url, payload, headers, response)

        return response

    def _create_http_session(self):
        """Create a http session and initialize the retry object."""

        self.session = requests.Session()

        if self.headers:
            self.session.headers.update(self.headers)

        retry_options = {
            'total': self.max_retries,
            'connect': self.max_retries_on_connect,
            'read': self.max_retries_on_read,
            'redirect': self.max_retries_on_redirect,
            'status': self.max_retries_on_status,
            'status_forcelist': self.status_forcelist,
            'backoff_factor': self.sleep_time,
            'raise_on_redirect': self.raise_on_redirect,
            'raise_on_status': self.raise_on_status,
            'respect_retry_after_header': self.respect_retry_after_header,
        }

        # urllib3 1.26 renamed `method_whitelist` to `allowed_methods` and
        # urllib3 2.0 removed the old keyword. Try the current name first and
        # fall back to the legacy one for older urllib3 releases.
        try:
            retries = urllib3.util.Retry(allowed_methods=self.method_whitelist,
                                         **retry_options)
        except TypeError:
            retries = urllib3.util.Retry(method_whitelist=self.method_whitelist,
                                         **retry_options)

        self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=retries))
        self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=retries))

    def _close_http_session(self):
        """Close the http session."""

        # This runs from __del__, so the session may not exist if __init__
        # failed before it was created.
        session = getattr(self, 'session', None)

        if session is not None:
            session.keep_alive = False
            # Release the pooled connections held by the mounted adapters.
            session.close()
class RateLimitHandler:
    """Class to handle rate limit for HTTP clients.

    The handler is configured through `setup_rate_limit_handler`, which
    takes the parameters listed below. Sub-classes must implement
    `calculate_time_to_reset`.

    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until it will be reset
    :param rate_limit_header: header to know the current rate limit
    :param rate_limit_reset_header: header to know the next rate limit reset
    """
    version = '0.2'

    MIN_RATE_LIMIT = 10
    MAX_RATE_LIMIT = 500

    RATE_LIMIT_HEADER = "X-RateLimit-Remaining"
    RATE_LIMIT_RESET_HEADER = "X-RateLimit-Reset"

    def setup_rate_limit_handler(self, sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT,
                                 rate_limit_header=RATE_LIMIT_HEADER,
                                 rate_limit_reset_header=RATE_LIMIT_RESET_HEADER):
        """Setup the rate limit handler.

        A `min_rate_to_sleep` above `MAX_RATE_LIMIT` is capped to
        `MAX_RATE_LIMIT` and a warning is logged.

        :param sleep_for_rate: sleep until rate limit is reset
        :param min_rate_to_sleep: minimum rate needed to make the fetching process sleep
        :param rate_limit_header: header from where extract the rate limit data
        :param rate_limit_reset_header: header from where extract the rate limit reset data
        """
        self.rate_limit = None
        self.rate_limit_reset_ts = None
        self.sleep_for_rate = sleep_for_rate
        self.rate_limit_header = rate_limit_header
        self.rate_limit_reset_header = rate_limit_reset_header

        if min_rate_to_sleep > self.MAX_RATE_LIMIT:
            # The sentences are separated by spaces so the message reads correctly.
            msg = ("Minimum rate to sleep value exceeded (%d). "
                   "High values might cause the client to sleep forever. "
                   "Reset to %d.")
            self.min_rate_to_sleep = self.MAX_RATE_LIMIT
            logger.warning(msg, min_rate_to_sleep, self.MAX_RATE_LIMIT)
        else:
            self.min_rate_to_sleep = min_rate_to_sleep

    def sleep_for_rate_limit(self):
        """The fetching process sleeps until the rate limit is restored or
        raises a RateLimitError exception if sleep_for_rate flag is disabled.

        Nothing happens while the rate limit is unknown or above
        `min_rate_to_sleep`.

        :raises RateLimitError: when the rate limit is exhausted and
            `sleep_for_rate` is disabled
        """
        if self.rate_limit is not None and self.rate_limit <= self.min_rate_to_sleep:
            seconds_to_reset = self.calculate_time_to_reset()

            if seconds_to_reset < 0:
                logger.warning("Value of sleep for rate limit is negative, reset it to 0")
                seconds_to_reset = 0

            cause = "Rate limit exhausted."
            if self.sleep_for_rate:
                logger.info("%s Waiting %i secs for rate limit reset.", cause, seconds_to_reset)
                time.sleep(seconds_to_reset)
            else:
                raise RateLimitError(cause=cause, seconds_to_reset=seconds_to_reset)

    def calculate_time_to_reset(self):
        """Calculate the seconds to reset the token requests.

        :raises NotImplementedError: always; sub-classes must override it
        """
        raise NotImplementedError

    def update_rate_limit(self, response):
        """Update the rate limit and the time to reset
        from the response headers.

        Each value is set to `None` when its header is missing
        from the response.

        :param response: the response object
        """
        if self.rate_limit_header in response.headers:
            self.rate_limit = int(response.headers[self.rate_limit_header])
            logger.debug("Rate limit: %s", self.rate_limit)
        else:
            self.rate_limit = None

        if self.rate_limit_reset_header in response.headers:
            self.rate_limit_reset_ts = int(response.headers[self.rate_limit_reset_header])
            logger.debug("Rate limit reset: %s", self.calculate_time_to_reset())
        else:
            self.rate_limit_reset_ts = None