# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Santiago Dueñas <sduenas@bitergia.com>
# Jesus M. Gonzalez-Barahona <jgb@gsyc.es>
# Valerio Cosentino <valcos@bitergia.com>
# Harshal Mittal <harshalmittal4@gmail.com>
#
import json
import logging
from grimoirelab_toolkit.datetime import datetime_to_utc, datetime_utcnow
from grimoirelab_toolkit.uris import urijoin
from ...backend import (Backend,
BackendCommand,
BackendCommandArgumentParser)
from ...client import HttpClient, RateLimitHandler
from ...utils import DEFAULT_DATETIME
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Only category of items produced by the Mattermost backend; used as the
# default `category` of `Mattermost.fetch` and listed in `Mattermost.CATEGORIES`
CATEGORY_POST = "post"

# Default value of `max_items`; the client sends it as the `per_page`
# parameter when requesting the posts of a channel
MAX_ITEMS = 60

# Range before sleeping until rate limit reset.
# MIN_RATE_LIMIT is the default of `min_rate_to_sleep`.
# NOTE(review): MAX_RATE_LIMIT is not referenced in this module; presumably
# it is the upper bound of the range and is used by importers - confirm.
MIN_RATE_LIMIT = 10
MAX_RATE_LIMIT = 500

# Default sleep time (default of `sleep_time`, documented in seconds)
# to deal with connection/server problems
DEFAULT_SLEEP_TIME = 1
class Mattermost(Backend):
    """Mattermost backend.

    This class retrieves the posts sent to a Mattermost channel.
    To access the server an API token is required, which must
    have enough permissions to read from the given channel.

    To initialize this class the URL of the server must be provided.
    The origin of data will be set using this `url` plus the channel
    from which data is obtained (i.e: https://mattermost.example.com/abcdefg).
    If using channel and team names instead of a channel id, this will
    take the form `url` plus `team` plus `channel`.

    The `team` parameter is only required if providing a channel
    name instead of a channel ID.

    :param url: URL of the server
    :param channel: identifier/name of the channel where data will be fetched
    :param api_token: token or key needed to use the API
    :param max_items: maximum number of messages requested on the same query
    :param tag: label used to mark the data
    :param archive: archive to store/retrieve items
    :param team: (optional) the name of the team the channel is in
    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until
        it will be reset
    :param sleep_time: time (in seconds) to sleep in case
        of connection problems
    :param ssl_verify: enable/disable SSL verification
    """
    version = '0.5.0'

    CATEGORIES = [CATEGORY_POST]
    EXTRA_SEARCH_FIELDS = {
        'channel_id': ['channel_data', 'id'],
        'channel_name': ['channel_data', 'name'],
    }

    def __init__(self, url, channel, api_token, max_items=MAX_ITEMS,
                 tag=None, archive=None, team=None,
                 sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT,
                 sleep_time=DEFAULT_SLEEP_TIME, ssl_verify=True):
        # The origin includes the team only when the channel is given by name
        if team is not None:
            origin = urijoin(url, team, channel)
        else:
            origin = urijoin(url, channel)

        super().__init__(origin, tag=tag, archive=archive, ssl_verify=ssl_verify)

        self.url = url
        self.team = team
        self.channel = channel
        self.api_token = api_token
        self.max_items = max_items
        self.sleep_for_rate = sleep_for_rate
        self.min_rate_to_sleep = min_rate_to_sleep
        self.sleep_time = sleep_time

        # NOTE(review): `client` is not assigned anywhere in this class;
        # presumably the base `Backend` sets it from `_init_client` before
        # `fetch_items` runs - confirm.
        self.client = None

        # Cache of user data already fetched, indexed by user id
        self._users = {}

    def fetch(self, category=CATEGORY_POST, from_date=DEFAULT_DATETIME):
        """Fetch the posts from the channel.

        This method fetches the posts stored on the channel that were
        sent since the given date.

        :param category: the category of items to fetch
        :param from_date: obtain posts sent since this date; a false
            value falls back to `DEFAULT_DATETIME`

        :returns: a generator of posts
        """
        if not from_date:
            from_date = DEFAULT_DATETIME

        from_date = datetime_to_utc(from_date)

        kwargs = {'from_date': from_date}
        items = super().fetch(category, **kwargs)

        return items

    def fetch_items(self, category, **kwargs):
        """Fetch the messages.

        Posts are requested page by page. Each post yielded is extended
        with the data of its author (`user_data`) and of the channel
        (`channel_data`). The process stops at the first post whose
        `update_at` is older than `from_date`, or when a page does not
        produce any post.

        :param category: the category of items to fetch
        :param kwargs: backend arguments; `from_date` is required

        :returns: a generator of items
        """
        from_date = kwargs['from_date']

        logger.info("Fetching messages of '%s' - '%s' channel from %s",
                    self.url, self.channel, from_date)

        fetching = True
        page = 0
        nposts = 0

        # If `self.team` is not None, that means that both `self.team` and
        # `self.channel` are names instead of IDs. If this is the case, we
        # want to perform a lookup by name instead of by ID, and then we can
        # extract the channel ID.
        if self.team is not None:
            channel_info_raw = self.client.channel_by_name(self.team, self.channel)
            channel_info = self.parse_json(channel_info_raw)
            self.channel = channel_info['id']
            self.team = None  # Mark that we've looked up the channel
        else:
            channel_info_raw = self.client.channel(self.channel)
            channel_info = self.parse_json(channel_info_raw)

        # Convert the date to an integer number of milliseconds so it
        # can be compared with the `update_at` field of the posts
        since = int(from_date.timestamp() * 1000)

        while fetching:
            raw_posts = self.client.posts(self.channel, page=page)

            posts_before = nposts

            for post in self._parse_posts(raw_posts):
                # NOTE(review): stopping at the first older post presumes
                # posts arrive from newest to oldest - confirm with the API.
                if post['update_at'] < since:
                    fetching = False
                    break

                if 'metadata' in post and 'images' in post['metadata']:
                    post['metadata']['images'] = self._parse_images(post['metadata']['images'])

                # Fetch user data
                user_id = post['user_id']
                user = self._get_or_fetch_user(user_id)
                post['user_data'] = user
                post['channel_data'] = channel_info

                yield post
                nposts += 1

            if fetching:
                # If no new posts were fetched; stop the process
                if posts_before == nposts:
                    fetching = False
                else:
                    page += 1

        logger.info("Fetch process completed: %s posts fetched", nposts)

    @classmethod
    def has_archiving(cls):
        """Returns whether it supports archiving items on the fetch process.

        :returns: this backend supports items archive
        """
        return True

    @classmethod
    def has_resuming(cls):
        """Returns whether it supports to resume the fetch process.

        :returns: this backend does not support items resuming
        """
        return False

    @staticmethod
    def parse_json(raw_json):
        """Parse a Mattermost JSON stream.

        The method parses a JSON stream and returns a
        dict with the parsed data.

        :param raw_json: JSON string to parse

        :returns: a dict with the parsed data
        """
        result = json.loads(raw_json)
        return result

    def _init_client(self, from_archive=False):
        """Init client"""

        return MattermostClient(self.url, self.api_token,
                                max_items=self.max_items,
                                sleep_for_rate=self.sleep_for_rate,
                                min_rate_to_sleep=self.min_rate_to_sleep,
                                sleep_time=self.sleep_time,
                                archive=self.archive, from_archive=from_archive,
                                ssl_verify=self.ssl_verify)

    def _parse_posts(self, raw_posts):
        """Parse posts and returns in order.

        :param raw_posts: JSON string with the keys `order` (list of
            post ids) and `posts` (posts indexed by id)

        :returns: a generator of posts, following the `order` list
        """
        parsed_posts = self.parse_json(raw_posts)

        # Posts are not sorted. The order is provided by
        # 'order' key.
        for post_id in parsed_posts['order']:
            yield parsed_posts['posts'][post_id]

    def _parse_images(self, images):
        """Parse images and returns a list of images.

        :param images: dict of image data indexed by the URL of the image

        :returns: a list with a copy of the data of each image, where
            the key of the image is stored under `url`
        """
        return [{**data, 'url': url} for url, data in images.items()]

    def _get_or_fetch_user(self, user_id):
        """Get the data of a user, fetching it only the first time.

        :param user_id: identifier of the user

        :returns: a dict with the parsed data of the user
        """
        if user_id in self._users:
            return self._users[user_id]

        logger.debug("User %s not found on client cache; fetching it", user_id)

        raw_user = self.client.user(user_id)
        user = self.parse_json(raw_user)

        self._users[user_id] = user

        return user
class MattermostClient(HttpClient, RateLimitHandler):
    """Mattermost API client.

    Client for fetching information from a Mattermost server
    using its REST API.

    :param base_url: URL of the Mattermost server
    :param api_token: token needed to use the API
    :param max_items: maximum number of items fetched per request
    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until
        it will be reset
    :param sleep_time: time (in seconds) to sleep in case
        of connection problems
    :param archive: an archive to store/read fetched data
    :param from_archive: it tells whether to write/read the archive
    :param ssl_verify: enable/disable SSL verification
    """
    API_URL = urijoin('%(base_url)s', 'api', 'v4', '%(entrypoint)s')

    # API resources
    RCHANNELS = 'channels'
    RPOSTS = 'posts'
    RUSERS = 'users'
    RCHANNELS_BY_NAME = 'teams/name/%s/channels/name/%s'

    # API headers
    HAUTHORIZATION = 'Authorization'

    # Resource parameters
    PCHANNEL_ID = 'channel_id'
    PPAGE = 'page'
    PPER_PAGE = 'per_page'

    def __init__(self, base_url, api_token, max_items=MAX_ITEMS,
                 sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT,
                 sleep_time=DEFAULT_SLEEP_TIME,
                 archive=None, from_archive=False, ssl_verify=True):
        # The token must be set before calling the parent constructor
        # because `_set_extra_headers` reads it
        self.api_token = api_token
        self.max_items = max_items

        super().__init__(base_url.rstrip('/'),
                         sleep_time=sleep_time,
                         extra_headers=self._set_extra_headers(),
                         archive=archive, from_archive=from_archive, ssl_verify=ssl_verify)
        super().setup_rate_limit_handler(sleep_for_rate=sleep_for_rate,
                                         min_rate_to_sleep=min_rate_to_sleep)

    def channel(self, channel):
        """Fetch the channel information.

        :param channel: identifier of the channel

        :returns: the text of the response
        """
        entrypoint = self.RCHANNELS + '/' + channel

        params = {
            self.PCHANNEL_ID: channel
        }

        response = self._fetch(entrypoint, params)

        return response

    def channel_by_name(self, team: str, channel: str):
        """Fetch the channel information by channel/team name.

        This provides identical information to the :func:`channel` method,
        with the key difference of looking up a channel by channel name and
        team name instead of by the channel ID.

        :param team: name of the team the channel is in
        :param channel: name of the channel

        :returns: the text of the response
        """
        entrypoint = self.RCHANNELS_BY_NAME % (team, channel)

        params = {}

        response = self._fetch(entrypoint, params)

        return response

    def posts(self, channel, page=None):
        """Fetch the history of a channel.

        :param channel: identifier of the channel
        :param page: page to request; when it is `None` the
            parameter is not sent

        :returns: the text of the response
        """
        entrypoint = self.RCHANNELS + '/' + channel + '/' + self.RPOSTS

        params = {
            self.PPER_PAGE: self.max_items
        }

        if page is not None:
            params[self.PPAGE] = page

        response = self._fetch(entrypoint, params)

        return response

    def user(self, user):
        """Fetch user data.

        :param user: identifier of the user

        :returns: the text of the response
        """
        entrypoint = self.RUSERS + '/' + user
        response = self._fetch(entrypoint, None)

        return response

    def fetch(self, url, payload=None, headers=None, method=HttpClient.GET, stream=False, auth=None):
        """Override fetch method to handle API rate limit.

        When the data is not read from the archive, the client checks
        the rate limit before the request and updates it with the
        response received.

        :param url: link to the resource
        :param payload: payload of the request
        :param headers: headers of the request
        :param method: type of request call (GET or POST)
        :param stream: defer downloading the response body until the
            response content is available
        :param auth: auth of the request

        :returns: a response object
        """
        if not self.from_archive:
            self.sleep_for_rate_limit()

        # `auth` is forwarded to the parent class; it was silently
        # dropped before, so callers could not authenticate requests
        response = super().fetch(url, payload, headers, method, stream, auth=auth)

        if not self.from_archive:
            self.update_rate_limit(response)

        return response

    def calculate_time_to_reset(self):
        """Number of seconds to wait.

        The time is obtained by the difference between the current date
        and the next date when the token is fully regenerated.

        :returns: the seconds left until `rate_limit_reset_ts`; never
            a negative number
        """
        # One extra second is added to the current time after
        # discarding the microseconds
        current_epoch = datetime_utcnow().replace(microsecond=0).timestamp() + 1
        time_to_reset = self.rate_limit_reset_ts - current_epoch

        if time_to_reset < 0:
            time_to_reset = 0

        return time_to_reset

    def _fetch(self, entry_point, params):
        """Fetch a resource.

        :param entry_point: entrypoint to access
        :param params: dict with the HTTP parameters needed to access the
            given entry point

        :returns: the text of the response
        """
        url = self.API_URL % {'base_url': self.base_url, 'entrypoint': entry_point}

        logger.debug("Mattermost client requests: %s params: %s",
                     entry_point, str(params))

        r = self.fetch(url, payload=params)

        return r.text

    def _set_extra_headers(self):
        """Set authentication tokens.

        :returns: a dict with the `Authorization` header built
            from the API token
        """
        headers = {
            self.HAUTHORIZATION: 'Bearer ' + self.api_token
        }
        return headers

    @staticmethod
    def sanitize_for_archive(url, headers, payload):
        """Sanitize the headers of a HTTP request by removing the
        token information before storing/retrieving archived items.

        The `Authorization` header, when present, is removed from the
        given `headers` dict in place.

        :param url: HTTP url request
        :param headers: HTTP headers request
        :param payload: HTTP payload request

        :returns: url, the sanitized headers and payload
        """
        if headers:
            headers.pop(MattermostClient.HAUTHORIZATION, None)

        return url, headers, payload
class MattermostCommand(BackendCommand):
    """Class to run Mattermost backend from the command line."""

    BACKEND = Mattermost

    # Text shown as the description of the 'Mattermost arguments' group
    DESCRIPTION = 'Can either be called a channel ID, or a channel name. If '\
                  'a channel name is used, the team name is required. '\
                  'Otherwise, the team argument is ignored.'

    @classmethod
    def setup_cmd_parser(cls):
        """Returns the Mattermost argument parser.

        The parser accepts the options to tune the requests and the
        rate limit handling, plus the positional arguments `url`,
        `channel` and the optional `team`.

        :returns: a `BackendCommandArgumentParser` instance
        """
        parser = BackendCommandArgumentParser(cls.BACKEND,
                                              from_date=True,
                                              token_auth=True,
                                              archive=True,
                                              ssl_verify=True)

        # Mattermost options
        group = parser.parser.add_argument_group('Mattermost arguments', description=cls.DESCRIPTION)
        group.add_argument('--max-items', dest='max_items',
                           type=int, default=MAX_ITEMS,
                           help="maximum number of items requested on the same query")
        group.add_argument('--sleep-for-rate', dest='sleep_for_rate',
                           action='store_true',
                           help="sleep for getting more rate")
        group.add_argument('--min-rate-to-sleep', dest='min_rate_to_sleep',
                           default=MIN_RATE_LIMIT, type=int,
                           help="sleep until reset when the rate limit reaches this value")
        group.add_argument('--sleep-time', dest='sleep_time',
                           default=DEFAULT_SLEEP_TIME, type=int,
                           help="minimun sleeping time to avoid too many request exception")

        # Positional arguments
        parser.parser.add_argument('url',
                                   help="URL of Mattermost server")
        parser.parser.add_argument('channel',
                                   help="channel id OR channel name")
        # `team` is optional; it is only meaningful when `channel` is a name
        parser.parser.add_argument('team',
                                   help="team name (only if using channel name)", nargs='?', default=None)

        return parser