# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2020 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Valerio Cosentino <valcos@bitergia.com>
# Santiago Dueñas <sduenas@bitergia.com>
# Jesus M. Gonzalez-Barahona <jgb@gsyc.es>
#
import hashlib
import json
import logging
import os
import pickle
import sqlite3
import uuid
from grimoirelab_toolkit.datetime import (datetime_utcnow,
datetime_to_utc,
str_to_datetime)
from .errors import ArchiveError, ArchiveManagerError
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Archive:
    """Basic class for archiving raw items fetched by Perceval.

    This class allows to archive raw items - usually HTML pages or
    JSON documents - for a further recovery. These raw items will
    be fetched, stored and retrieved back by a backend.

    Each stored item will have a hash code used as unique identifier.
    Hash codes are generated using URIs and other parameters needed
    to fetch raw items.

    When an instance of `Archive` is initialized it will expect
    to access an existing archive file. To create a new and empty
    archive use `create` class method instead. Metadata must be
    initialized calling to `init_metadata` method after creating
    a new archive.

    :param archive_path: path where this archive is stored

    :raises ArchiveError: when the archive does not exist or is invalid
    """
    # NOTE(review): the docstring refers to `init_metadata`, but no such
    # method is defined in this class as shown here - confirm where it lives.

    ARCHIVE_TABLE = "archive"
    METADATA_TABLE = "metadata"

    # Table structure
    ARCHIVE_CREATE_STMT = "CREATE TABLE " + ARCHIVE_TABLE + " ( " \
                          "id INTEGER PRIMARY KEY AUTOINCREMENT, " \
                          "hashcode VARCHAR(256) UNIQUE NOT NULL, " \
                          "uri TEXT, " \
                          "payload BLOB, " \
                          "headers BLOB, " \
                          "data BLOB)"
    METADATA_CREATE_STMT = "CREATE TABLE " + METADATA_TABLE + " ( " \
                           "origin TEXT, " \
                           "backend_name TEXT, " \
                           "backend_version TEXT, " \
                           "category TEXT, " \
                           "backend_params BLOB, " \
                           "created_on TEXT)"

    def __init__(self, archive_path):
        if not os.path.exists(archive_path):
            raise ArchiveError(cause="archive %s does not exist" % (archive_path))

        self.archive_path = archive_path

        # Metadata attributes stay as None until a metadata row is loaded
        self.origin = None
        self.backend_name = None
        self.backend_version = None
        self.category = None
        self.backend_params = None
        self.created_on = None

        self._db = sqlite3.connect(self.archive_path)
        self._verify_archive()
        self._load_metadata()

    def __del__(self):
        # `_db` is missing when `__init__` failed before connecting
        conn = getattr(self, '_db', None)
        if conn:
            conn.close()

    def store(self, uri, payload, headers, data):
        """Store a raw item in this archive.

        The method will store `data` content in this archive. The unique
        identifier for that item will be generated using the rest of the
        parameters.

        :param uri: request URI
        :param payload: request payload
        :param headers: request headers
        :param data: data to store in this archive

        :raises ArchiveError: when an error occurs storing the given data
        """
        hashcode = self.make_hashcode(uri, payload, headers)
        payload_dump = pickle.dumps(payload, 0)
        headers_dump = pickle.dumps(headers, 0)
        data_dump = pickle.dumps(data, 0)

        logger.debug("Archiving %s with %s %s %s in %s",
                     hashcode, uri, payload, headers, self.archive_path)

        insert_stmt = "INSERT INTO " + self.ARCHIVE_TABLE + " (" \
                      "id, hashcode, uri, payload, headers, data) " \
                      "VALUES(?,?,?,?,?,?)"

        cursor = None
        try:
            cursor = self._db.cursor()
            # `None` as id lets SQLite assign the autoincrement key
            cursor.execute(insert_stmt, (None, hashcode, uri,
                                         payload_dump, headers_dump, data_dump))
            self._db.commit()
        except sqlite3.IntegrityError as e:
            # The UNIQUE constraint on `hashcode` rejects duplicated items
            msg = "data storage error; cause: duplicated entry %s" % hashcode
            raise ArchiveError(cause=msg) from e
        except sqlite3.DatabaseError as e:
            msg = "data storage error; cause: %s" % str(e)
            raise ArchiveError(cause=msg) from e
        finally:
            # Release the cursor on failures too
            if cursor is not None:
                cursor.close()

        logger.debug("%s data archived in %s", hashcode, self.archive_path)

    def retrieve(self, uri, payload, headers):
        """Retrieve a raw item from the archive.

        The method will return the `data` content corresponding to the
        hashcode derived from the given parameters.

        :param uri: request URI
        :param payload: request payload
        :param headers: request headers

        :returns: the archived data

        :raises ArchiveError: when an error occurs retrieving data
        """
        hashcode = self.make_hashcode(uri, payload, headers)

        logger.debug("Retrieving entry %s with %s %s %s in %s",
                     hashcode, uri, payload, headers, self.archive_path)

        # Rows are accessed by column name below
        self._db.row_factory = sqlite3.Row

        select_stmt = "SELECT data " \
                      "FROM " + self.ARCHIVE_TABLE + " " \
                      "WHERE hashcode = ?"

        cursor = None
        try:
            cursor = self._db.cursor()
            cursor.execute(select_stmt, (hashcode,))
            row = cursor.fetchone()
        except sqlite3.DatabaseError as e:
            msg = "data retrieval error; cause: %s" % str(e)
            raise ArchiveError(cause=msg) from e
        finally:
            # Release the cursor on failures too
            if cursor is not None:
                cursor.close()

        if not row:
            msg = "entry %s not found in archive %s" % (hashcode, self.archive_path)
            raise ArchiveError(cause=msg)

        # SECURITY: unpickling can execute arbitrary code; archive files
        # must come from a trusted source.
        return pickle.loads(row['data'])

    @classmethod
    def create(cls, archive_path):
        """Create a brand new archive.

        Call this method to create a new and empty archive. It will initialize
        the storage file in the path defined by `archive_path`.

        :param archive_path: absolute path where the archive file will be created

        :returns: an `Archive` instance bound to the new file

        :raises ArchiveError: when the archive file already exists
        """
        if os.path.exists(archive_path):
            msg = "archive %s already exists; remove it before creating a new one"
            raise ArchiveError(cause=msg % (archive_path))

        conn = sqlite3.connect(archive_path)
        try:
            cursor = conn.cursor()
            cursor.execute(cls.METADATA_CREATE_STMT)
            cursor.execute(cls.ARCHIVE_CREATE_STMT)
            conn.commit()
            cursor.close()
        finally:
            # Do not leak the connection if creating the tables fails
            conn.close()

        logger.debug("Creating archive %s", archive_path)
        archive = cls(archive_path)
        logger.debug("Achive %s was created", archive_path)

        return archive

    @staticmethod
    def make_hashcode(uri, payload, headers):
        """Generate a SHA1 based on the given arguments.

        Hashcodes created by this method will be used as unique identifiers
        for the raw items or resources stored by this archive.

        :param uri: URI to the resource
        :param payload: payload of the request needed to fetch the resource
        :param headers: headers of the request needed to fetch the resource

        :returns: a SHA1 hash code
        """
        def dict_to_json_str(data):
            # Sorted keys make the dump independent of insertion order
            return json.dumps(data, sort_keys=True)

        content = ':'.join([uri, dict_to_json_str(payload), dict_to_json_str(headers)])
        hashcode = hashlib.sha1(content.encode('utf-8'))

        return hashcode.hexdigest()

    def _verify_archive(self):
        """Check whether the archive is valid or not.

        This method will check if tables were created and if they
        contain valid data.

        :raises ArchiveError: when a table cannot be read, when there is
            more than one metadata row, or when there are archived
            entries but no metadata row
        """
        nentries = self._count_table_rows(self.ARCHIVE_TABLE)
        nmetadata = self._count_table_rows(self.METADATA_TABLE)

        if nmetadata > 1:
            msg = "archive %s metadata corrupted; multiple metadata entries" % (self.archive_path)
            raise ArchiveError(cause=msg)
        if nmetadata == 0 and nentries > 0:
            # Both placeholders need a value; passing only the path made
            # the formatting itself fail with a TypeError.
            msg = "archive %s metadata is empty but %s entries were achived" % (self.archive_path, nentries)
            raise ArchiveError(cause=msg)

        logger.debug("Integrity of archive %s OK; entries: %s rows, metadata: %s rows",
                     self.archive_path, nentries, nmetadata)

    def _load_metadata(self):
        """Load metadata from the archive file"""

        logger.debug("Loading metadata infomation of archive %s", self.archive_path)

        cursor = self._db.cursor()
        select_stmt = "SELECT origin, backend_name, backend_version, " \
                      "category, backend_params, created_on " \
                      "FROM " + self.METADATA_TABLE + " " \
                      "LIMIT 1"
        cursor.execute(select_stmt)
        row = cursor.fetchone()
        cursor.close()

        if row:
            self.origin = row[0]
            self.backend_name = row[1]
            self.backend_version = row[2]
            self.category = row[3]
            # SECURITY: unpickling can execute arbitrary code; archive
            # files must come from a trusted source.
            self.backend_params = pickle.loads(row[4])
            self.created_on = str_to_datetime(row[5])
        else:
            logger.debug("Metadata of archive %s was empty", self.archive_path)

        logger.debug("Metadata of archive %s loaded", self.archive_path)

    def _count_table_rows(self, table_name):
        """Fetch the number of rows in a table"""

        cursor = self._db.cursor()
        # Table names cannot be bound as parameters; callers in this class
        # only pass the class-level table constants.
        select_stmt = "SELECT COUNT(*) FROM " + table_name

        try:
            cursor.execute(select_stmt)
            row = cursor.fetchone()
        except sqlite3.DatabaseError as e:
            msg = "invalid archive file; cause: %s" % str(e)
            raise ArchiveError(cause=msg) from e
        finally:
            cursor.close()

        return row[0]
class ArchiveManager:
    """Manager for handling archives in Perceval.

    This class manages the creation, deletion and access of `Archive`
    objects. Archives are stored under `dirpath` directory, using
    a random hexadecimal identifier (generated with `uuid.uuid4`) for
    each file. The first two characters of the identifier will be the
    name of the subdirectory; the remaining characters, the archive
    name.

    :param dirpath: path where the archives are stored
    """
    STORAGE_EXT = '.sqlite3'

    def __init__(self, dirpath):
        self.dirpath = dirpath

        if not os.path.exists(self.dirpath):
            # `exist_ok` tolerates another process creating the directory
            # between the check and this call
            os.makedirs(self.dirpath, exist_ok=True)

    def create_archive(self):
        """Create a new archive.

        The method creates in the filesystem a brand new archive with
        a random hexadecimal identifier as its name. The first two
        characters of the identifier will be the name of the
        subdirectory; the remaining characters, the archive name.

        :returns: a new `Archive` object

        :raises ArchiveManagerError: when an error occurs creating the
            new archive
        """
        hashcode = uuid.uuid4().hex
        archive_dir = os.path.join(self.dirpath, hashcode[0:2])
        archive_name = hashcode[2:] + self.STORAGE_EXT
        archive_path = os.path.join(archive_dir, archive_name)

        if not os.path.exists(archive_dir):
            # `exist_ok` tolerates a concurrent creation of the same
            # subdirectory
            os.makedirs(archive_dir, exist_ok=True)

        try:
            archive = Archive.create(archive_path)
        except ArchiveError as e:
            raise ArchiveManagerError(cause=str(e)) from e

        return archive

    def remove_archive(self, archive_path):
        """Remove an archive.

        This method deletes from the filesystem the archive stored
        in `archive_path`.

        :param archive_path: path to the archive

        :raises ArchiveManagerError: when `archive_path` cannot be opened
            as a valid archive
        """
        # Opening the file as an `Archive` checks that it exists and is a
        # valid archive before deleting it
        try:
            Archive(archive_path)
        except ArchiveError as e:
            raise ArchiveManagerError(cause=str(e)) from e

        os.remove(archive_path)

    def search(self, origin, backend_name, category, archived_after):
        """Search archives.

        Get the archives which store data based on the given parameters.
        These parameters define which the origin was (`origin`), how data
        was fetched (`backend_name`) and data type (`category`).
        Only those archives created on or after `archived_after` will be
        returned.

        The method returns a list with the file paths to those archives.
        The list is sorted by the date of creation of each archive.

        :param origin: data origin
        :param backend_name: backend used to fetch data
        :param category: type of the items fetched by the backend
        :param archived_after: get archives created on or after this date

        :returns: a list with archive names which match the search criteria
        """
        found = self._search_archives(origin, backend_name,
                                      category, archived_after)

        # `sorted` consumes the generator directly; order by creation date
        return [fp for fp, _ in sorted(found, key=lambda x: x[1])]

    def _search_archives(self, origin, backend_name, category, archived_after):
        """Search archives using filters.

        Yields `(archive_path, created_on)` tuples for every file under
        the base path that is a valid archive and matches all filters.
        """
        for archive_path in self._search_files():
            try:
                archive = Archive(archive_path)
            except ArchiveError:
                # Files which are not valid archives are skipped
                continue

            # `created_on` is None when the archive has no metadata row;
            # such archives cannot be compared against a date, so they
            # never match.
            match = archive.origin == origin and \
                archive.backend_name == backend_name and \
                archive.category == category and \
                archive.created_on is not None and \
                archive.created_on >= archived_after

            if not match:
                continue

            yield archive_path, archive.created_on

    def _search_files(self):
        """Retrieve the file paths stored under the base path."""

        for root, _, files in os.walk(self.dirpath):
            for filename in files:
                location = os.path.join(root, filename)
                yield location