# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Aditya Prajapati <aditya10699@gmail.com>
# Animesh Kumar <animuz111@gmail.com>
#
import logging
import json
from grimoirelab_toolkit.uris import urijoin
from grimoirelab_toolkit.datetime import (datetime_utcnow,
datetime_to_utc,
str_to_datetime)
from ...backend import (Backend,
BackendCommand,
BackendCommandArgumentParser)
from ...client import HttpClient, RateLimitHandler
from ...utils import DEFAULT_DATETIME
# Only category of items produced by the Rocket.Chat backend
CATEGORY_MESSAGE = "message"
# Path joined to the server URL to build the base URL of the REST API
API_EXTENSION = "/api/v1/"
# Default value of the `min_rate_to_sleep` arguments
MIN_RATE_LIMIT = 10
# Default value of the `max_items` arguments (messages requested per query)
MAX_ITEMS = 100
# Module-level logger shared by the backend and the client
logger = logging.getLogger(__name__)
class RocketChat(Backend):
    """Rocket.Chat backend.

    This class allows to fetch messages from a channel (room) on a
    Rocket.Chat server. An API token and a User Id are required to
    access the server.

    :param url: server url from where messages are to be fetched
    :param channel: name of the channel from where data will be fetched
    :param user_id: generated User Id using your Rocket.Chat account
    :param api_token: token needed to use the API
    :param max_items: maximum number of message requested on the same query
    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until
        it will be reset
    :param tag: label used to mark the data
    :param archive: archive to store/retrieve items
    :param ssl_verify: enable/disable SSL verification
    """
    version = '0.1.0'

    CATEGORIES = [CATEGORY_MESSAGE]
    # Paths (as lists of nested keys) inside each item to the channel
    # name and channel id attached by `fetch_items`
    EXTRA_SEARCH_FIELDS = {
        'channel_name': ['channel_info', 'name'],
        'channel_id': ['channel_info', '_id']
    }

    def __init__(self, url, channel, user_id, api_token, max_items=MAX_ITEMS,
                 sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT,
                 tag=None, archive=None, ssl_verify=True):
        # The origin identifies the data source: server URL + channel name
        origin = urijoin(url, channel)

        super().__init__(origin, tag=tag, archive=archive, ssl_verify=ssl_verify)
        self.url = url
        self.channel = channel
        self.user_id = user_id
        self.api_token = api_token
        self.max_items = max_items
        self.sleep_for_rate = sleep_for_rate
        self.min_rate_to_sleep = min_rate_to_sleep
        # NOTE(review): presumably replaced by the base class with the
        # object returned by `_init_client` before fetching - confirm
        self.client = None

    def fetch(self, category=CATEGORY_MESSAGE, from_date=DEFAULT_DATETIME, filter_classified=False):
        """Fetch the messages from the channel.

        This method fetches the messages stored on the channel that were
        sent since the given date.

        :param category: the category of items to fetch
        :param from_date: obtain messages sent since this date; a false
            value falls back to `DEFAULT_DATETIME`
        :param filter_classified: remove classified fields from the resulting items;
            accepted for interface compatibility but not forwarded by this method

        :returns: a generator of messages
        """
        if not from_date:
            from_date = DEFAULT_DATETIME

        from_date = datetime_to_utc(from_date)

        kwargs = {'from_date': from_date}
        items = super().fetch(category, **kwargs)

        return items

    def fetch_items(self, category, **kwargs):
        """Fetch the messages.

        The channel information is requested once and attached to every
        message under the `channel_info` key. Messages are requested in
        pages, advancing the offset by the number of messages received.

        :param category: the category of items to fetch
        :param kwargs: backend arguments; `from_date` is required

        :returns: a generator of items
        """
        from_date = kwargs['from_date']

        logger.info("Fetching messages of channel: %s from date: %s",
                    self.channel, from_date)

        raw_channel_info = self.client.channel_info(self.channel)
        channel_info = self.parse_channel_info(raw_channel_info)

        nmsgs = 0
        offset = 0

        while True:
            raw_messages = self.client.messages(self.channel, from_date, offset)
            messages, total = self.parse_messages(raw_messages)

            for message in messages:
                message["channel_info"] = channel_info
                nmsgs += 1
                yield message

            offset += len(messages)

            # Stop on an empty page too: when the server returns nothing,
            # or the reported total drops below the offset, the offset
            # would never match the total and the same request would be
            # repeated forever.
            if not messages or offset >= total:
                break

        logger.info("Fetch process completed: %s message fetched", nmsgs)

    @staticmethod
    def parse_messages(raw_messages):
        """Parse a channel messages JSON stream.

        This method parses a JSON stream, containing the
        history of a channel. It returns a list of messages
        and the total messages count in that channel.

        :param raw_messages: JSON string to parse

        :returns: a tuple with a list of dicts with the parsed messages
            and a total messages count in the channel.

        :raises KeyError: when the stream lacks the `messages` or
            `total` keys
        """
        result = json.loads(raw_messages)
        return result['messages'], result['total']

    @staticmethod
    def parse_channel_info(raw_channel_info):
        """Parse a channel's information JSON stream.

        This method parses a JSON stream, containing the information
        of the channel, and returns a dict with the parsed data.

        :param raw_channel_info: JSON string to parse

        :returns: a dict with the parsed channel's information

        :raises KeyError: when the stream lacks the `channel` key
        """
        result = json.loads(raw_channel_info)
        return result['channel']

    @classmethod
    def has_archiving(cls):
        """Returns whether it supports archiving items on the fetch process.

        :returns: this backend supports items archive
        """
        return True

    @classmethod
    def has_resuming(cls):
        """Returns whether it supports to resume the fetch process.

        :returns: this backend supports items resuming
        """
        return True

    def _init_client(self, from_archive=False):
        """Init client.

        :param from_archive: it tells whether to write/read the archive

        :returns: a `RocketChatClient` configured with this backend's settings
        """
        return RocketChatClient(self.url, self.user_id, self.api_token,
                                self.max_items, self.sleep_for_rate,
                                self.min_rate_to_sleep, from_archive, self.archive, self.ssl_verify)
class RocketChatClient(HttpClient, RateLimitHandler):
    """Rocket.Chat API client.

    Client for fetching information from the Rocket.Chat server
    using its REST API.

    :param url: server url from where messages are to be fetched
    :param user_id: generated User Id using your Rocket.Chat account
    :param api_token: token needed to use the API
    :param max_items: maximum number of message requested on the same query
    :param sleep_for_rate: sleep until rate limit is reset
    :param min_rate_to_sleep: minimum rate needed to sleep until
        it will be reset
    :param from_archive: it tells whether to write/read the archive
    :param archive: archive to store/retrieve items
    :param ssl_verify: enable/disable SSL verification
    """
    # API resources
    RCHANNEL_MESSAGES = 'channels.messages'
    RCHANNEL_INFO = 'channels.info'

    # Authentication headers
    HAUTH_TOKEN = 'X-Auth-Token'
    HUSER_ID = 'X-User-Id'

    # Query parameters
    PCHANNEL_NAME = 'roomName'
    PCOUNT = "count"
    POLDEST = "oldest"
    POFFSET = "offset"
    PQUERY = "query"
    PSORT = "sort"

    def __init__(self, url, user_id, api_token, max_items=MAX_ITEMS,
                 sleep_for_rate=False, min_rate_to_sleep=MIN_RATE_LIMIT,
                 from_archive=False, archive=None, ssl_verify=True):
        base_url = urijoin(url, API_EXTENSION)
        self.user_id = user_id
        self.api_token = api_token
        self.max_items = max_items

        super().__init__(base_url, archive=archive, from_archive=from_archive,
                         ssl_verify=ssl_verify)
        super().setup_rate_limit_handler(sleep_for_rate=sleep_for_rate, min_rate_to_sleep=min_rate_to_sleep)

    def calculate_time_to_reset(self):
        """Number of seconds to wait. They are contained in the rate limit reset header.

        `rate_limit_reset_ts` is handled as a timestamp in milliseconds:
        the current UTC time (truncated to seconds, plus one second) is
        converted to milliseconds before subtracting, and the difference
        is converted back to seconds.

        :returns: the seconds left until the reset, never below 0
        """
        now_ms = (datetime_utcnow().replace(microsecond=0).timestamp() + 1) * 1000
        time_to_reset = (self.rate_limit_reset_ts - now_ms) / 1000

        if time_to_reset < 0:
            time_to_reset = 0

        return time_to_reset

    def channel_info(self, channel):
        """Fetch information about a channel.

        :param channel: name of the channel

        :returns: the text of the response
        """
        params = {
            self.PCHANNEL_NAME: channel,
        }

        path = urijoin(self.base_url, self.RCHANNEL_INFO)
        response = self.fetch(path, params)

        return response

    def messages(self, channel, from_date, offset):
        """Fetch messages from a channel.

        The messages are fetch in ascending order i.e. from the oldest
        to the latest based on the time they were last updated. A query is
        also passed as a param to fetch the messages from a given date.

        :param channel: name of the channel
        :param from_date: datetime object; only messages updated on or
            after it are requested
        :param offset: number of messages to skip

        :returns: the text of the response
        """
        query = '{"_updatedAt": {"$gte": {"$date": "%s"}}}' % from_date.isoformat()

        # The 'sort' param accepts a field based on which the messages are sorted.
        # The value of the field can be 1 for ascending order or -1 for descending order.
        params = {
            self.PCHANNEL_NAME: channel,
            self.PSORT: '{"_updatedAt": 1}',
            self.PCOUNT: self.max_items,
            self.POFFSET: offset,
            self.PQUERY: query
        }

        path = urijoin(self.base_url, self.RCHANNEL_MESSAGES)
        response = self.fetch(path, params)

        return response

    def fetch(self, url, payload=None, headers=None):
        """Fetch the data from a given URL.

        The authentication headers are always added to the request. Any
        headers given by the caller are kept, but the authentication
        ones take precedence when the names collide.

        :param url: link to the resource
        :param payload: payload of the request
        :param headers: extra headers of the request

        :returns: the text of the response
        """
        # Work on a copy so the caller's dict is never modified
        request_headers = dict(headers) if headers else {}
        request_headers.update({
            self.HAUTH_TOKEN: self.api_token,
            self.HUSER_ID: self.user_id
        })

        logger.debug("Rocket.Chat client message request with params: %s", str(payload))

        # Rate limit data is only handled when talking to the live server
        if not self.from_archive:
            self.sleep_for_rate_limit()

        response = super().fetch(url, payload, headers=request_headers)

        if not self.from_archive:
            self.update_rate_limit(response)

        return response.text

    @staticmethod
    def sanitize_for_archive(url, headers, payload):
        """Sanitize the headers of a HTTP request by removing the token and
        user id information before storing/retrieving archived items.

        The headers dict, when given, is modified in place.

        :param: url: HTTP url request
        :param: headers: HTTP headers request
        :param: payload: HTTP payload request

        :returns: url, the sanitized headers and payload
        """
        if headers:
            headers.pop(RocketChatClient.HAUTH_TOKEN, None)
            headers.pop(RocketChatClient.HUSER_ID, None)

        return url, headers, payload
class RocketChatCommand(BackendCommand):
    """Command-line runner for the Rocket.Chat backend."""

    BACKEND = RocketChat

    @classmethod
    def setup_cmd_parser(cls):
        """Build the argument parser of the Rocket.Chat command.

        :returns: a `BackendCommandArgumentParser` with the Rocket.Chat
            options and positional arguments registered
        """
        cmd_parser = BackendCommandArgumentParser(
            cls.BACKEND,
            from_date=True,
            token_auth=True,
            archive=True,
            ssl_verify=True
        )
        argparser = cmd_parser.parser

        # The API token option is mandatory for this backend
        argparser._option_string_actions['--api-token'].required = True

        # The user id is mandatory too
        argparser.add_argument(
            '-u', '--user-id',
            dest='user_id',
            required=True,
            help="User Id to fetch messages"
        )

        # Positional arguments: server first, then the channel
        argparser.add_argument('url', help="URL of the Rocket.Chat server")
        argparser.add_argument('channel', help="Rocket.Chat channel(room) name")

        # Options specific to Rocket.Chat
        rocketchat_opts = argparser.add_argument_group('Rocket.Chat arguments')
        rocketchat_opts.add_argument(
            '--max-items',
            dest='max_items',
            type=int,
            default=MAX_ITEMS,
            help="Maximum number of items requested on the same query"
        )
        rocketchat_opts.add_argument(
            '--sleep-for-rate',
            dest='sleep_for_rate',
            action='store_true',
            help="sleep for getting more rate"
        )
        rocketchat_opts.add_argument(
            '--min-rate-to-sleep',
            dest='min_rate_to_sleep',
            default=MIN_RATE_LIMIT,
            type=int,
            help="sleep until reset when the rate limit reaches this value"
        )

        return cmd_parser