# Virus Health Check: a validation tool for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Deal with python path and dynamic module and function load
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import inspect
import logging
import textwrap as tw
import warnings
import pkg_resources
import libvhc
import libvhc.reference_file_parser as vhcrfp
import libvhc.utils as vhcutil
[docs]def load_recipes(log, entry_point_group='vhc.recipes'):
"""Load the recipes advertised under the entry point group and fill the
``recipe_match`` and ``default_drivers`` dictionaries
Parameters
----------
log : :class:`logging.LoggerAdapter` or :class:`logging.Logger` instance
logger
entry_point_group : string, optional
name of the entry point group
Returns
-------
recipe_names : list of strings
name of the loaded recipes
"""
# get all the entry points
recipes = load_entrypoints(entry_point_group, log=log)
# try to execute the functions implementing the recipe and to fill the
# recipe_match and default_drivers dictionaries
recipe_names = []
for name, _, func in recipes:
try:
r_match, def_drivers = func(name)
except Exception:
log.error("Failed to execute the function describing the recipe"
" '%s'.", name, exc_info=True)
continue
vhcutil.recipe_match[name] = r_match
vhcutil.default_drivers[name] = def_drivers
recipe_names.append(name)
return recipe_names
[docs]def load_drivers(log, entry_point_group='vhc.drivers'):
"""Load the drivers advertised under the entry point group
Parameters
----------
log : :class:`logging.LoggerAdapter` or :class:`logging.Logger` instance
logger
entry_point_group : string, optional
name of the entry point group
Returns
-------
ddrivers : dict
dictionary mapping the driver names with the functions implementing
them
"""
# get all the entry points
drivers = load_entrypoints(entry_point_group, log=log)
# check that the driver signatures are consistent with the template and
# drop the ones that are not
ddrivers = {}
for k, _, v in drivers:
if not callable(v):
log.error("The driver '%s' is not associated to a callable."
" Ignoring it", k)
continue
if inspect.isfunction(v):
func = v
else:
func = v.__call__
try:
inspect.getcallargs(func, libvhc.VCheck(), '/a/path', [])
except TypeError as e:
log.error("The signature of the function or method implementing"
" the driver '%s' is wrong because of '%s'. Ignoring it",
k, e)
continue
ddrivers[k] = v
return ddrivers
[docs]def load_reference_file_parsers(log, entry_point_group='vhc.file_parsers'):
"""Load the classes implementing file parsers
Parameters
----------
group : string
name of the group to load
name : string, optional
name of the entry points to load
Returns
-------
names : list of strings
name of the loaded parsers
"""
# load the reference file parsers
parsers = load_entrypoints(entry_point_group, log=log)
# check that they have the correct signature and if they do, add them to
# the list of registered parsers
for k, _, v in parsers:
if not inspect.isclass(v):
log.error("The file parser '%s' is not a class."
" Ignoring it", k)
continue
try:
inspect.getcallargs(v.__init__, 'self', 'conf', 'log')
except TypeError as e:
log.error("The signature of the class implementing the file parser"
" '%s' is wrong because of '%s'. Ignoring it", k, e)
continue
vhcrfp._registered[k] = v
return list(vhcrfp._registered.keys())
[docs]def load_entrypoints(group, name=None, log=None):
"""Get all the entry points for the ``group`` and load them.
Parameters
----------
group : string
name of the group to load
name : string, optional
name of the entry points to load
log : :class:`logging.LoggerAdapter` or :class:`logging.Logger`, optional
if given, exception when loading entry points are logged as errors,
with the full traceback; if ``None`` the exception is re-raised
Returns
-------
entry_points : list
yields the entry points as a list of
[name, :class:`pkg_resources.EntryPoint`, loaded]
"""
for ep in pkg_resources.iter_entry_points(group, name=name):
apname = ep.name
try:
loaded = ep.load()
yield [apname, ep, loaded]
except Exception as e:
if log:
msg = ("Failed to load entry point: '%s' of group '%s',"
" because of '%s'")
log.error(msg, apname, group, e, exc_info=True)
continue
else:
raise
# implementation of vhc_plugins
[docs]def parse(argv=None):
"""Command line parser"""
import argparse as ap
p = ap.ArgumentParser(description='''List and basic introspection of
installed VHC plugins''',
parents=[vhcutil.common_parser_arguments(), ],
formatter_class=ap.ArgumentDefaultsHelpFormatter, )
p.add_argument('-d', '--description', help='''Add '%(dest)s' lines from
the plugins doc-string''', type=int, default=0)
p.add_argument('-w', "--what", help="""What to show""",
choices=['all', 'recipes', 'drivers', 'parsers'],
default='all')
p.add_argument('-f', '--full-exception', help="""When the loading or the
validation fails print the full exception instead of just
the error message""", action='store_true')
p.add_argument('-v', '--validate', help="""Load the plugins through the
default mechanism, which performs validation. The validation
is done after printing the names and description""",
action='store_true')
p.add_argument('-r', '--return-value', action='store_true',
help='''If the validation is enabled, print the return value
of the plugins. Available for the type 'recipes'.''')
return p.parse_args(args=argv)
[docs]def _print(name, doc, n_lines=3):
"""Print the name and n_lines of the documentation"""
print(" "*3 + "*", name)
indent = " "*6
if not doc and n_lines > 0:
print(indent, "No documentation available")
elif n_lines > 0:
doc_ = doc.split('\n')[:n_lines]
if "\n".join(doc_) != doc:
doc_[-1] += " [...]"
doc_ = " ".join(doc_)
# remove multiple whitespaces
doc_ = " ".join(doc_.split())
doc_ = tw.wrap(doc_, initial_indent=indent, subsequent_indent=indent)
print('\n'.join(doc_))
[docs]class ListStream(list):
"""Wrap a list into a class with a :meth:`write` and :meth:`flush`, so that
it's possible to use it as stream handler"""
[docs] def write(self, val):
"""Append the value"""
self.append(val)
[docs] def flush(self):
"""don't need to flush a list"""
pass
[docs]def _logger():
"""Create a logger with a string IO handler and return it and the """
log = logging.getLogger('vhc_plugin')
log.handlers = []
# make the handler
stream = ListStream()
h = logging.StreamHandler(stream=stream)
h.setFormatter(logging.Formatter(fmt='[%(levelname)s] %(message)s'))
log.addHandler(h)
return log, stream
[docs]def _report(args, what, name, load_func):
"""Load the entry points and print the report
Parameters
----------
args : Namespace
parsed command line
what : string
name of the group to report
name : string
name of the entry point group
load_func : callable
function used to do the loading and the checking
"""
what = what.lower()
log, stream = _logger()
print("{}:".format(what.capitalize()))
for i, ep in enumerate(load_entrypoints(name, log=log)):
_print(ep[0], ep[2].__doc__, n_lines=args.description)
_print_exception(stream, full_exception=args.full_exception)
if args.validate:
log, stream_validate = _logger()
loaded = load_func(log)
if len(loaded) == i + 1:
print("All recipes have been correctly loaded")
else:
print("Only the following recipes has been loaded:"
" '{}'".format(" ".join(loaded)))
_print_exception(stream_validate, full_exception=args.full_exception,
old_log=stream)
[docs]def _print_exception(stream, full_exception=False, old_log=None):
"""Get the value from the stream and print it
If ``full_exception`` is False, print only the error message.
Parameters
----------
stream : :class:`ListStream`
stream containing the information to print
full_exception : bool, optional
whether to print the whole stream, or only the lines containing
``[ERROR]``
old_log : string, optional
if given, don't report thing already contained in old_log
"""
logs = stream
if logs and old_log:
# remove the entries already present in old logs
for ol in old_log:
try:
logs.remove(ol)
except ValueError:
pass # no problem if ``ol`` does not exist
if logs:
print('\nThe following exception(s) happened while loading:')
if full_exception:
print('\n'.join(logs))
else:
for l in logs:
print(l.split('\n')[0])
[docs]def _recipe_values():
'''Print the content of the dictionaries with the file matches and
default drivers for the recipes'''
try:
max_len_key = max(len(k) for k in vhcutil.recipe_match.keys()) + 2
max_len_match = max(len(k) for k in vhcutil.recipe_match.values()) + 2
except ValueError:
print('No recipe loaded')
return
try:
max_len_driver = max(len(d)
for drivers in vhcutil.default_drivers.values()
for d in drivers) + 2
except ValueError:
max_len_driver = 10
seps = '|{{0:-^{lk}}}|{{0:-^{lm}}}|{{0:-^{ld}}}|'
seps = seps.format(lk=max_len_key, lm=max_len_match, ld=max_len_driver)
seps = seps.format('')
table_row = '|{{0:^{lk}}}|{{1:^{lm}}}|{{2:^{ld}}}|'
table_row = table_row.format(lk=max_len_key, lm=max_len_match,
ld=max_len_driver)
print(seps)
print(table_row.format('recipe', 'match', 'drivers'))
print(seps)
for r, match in vhcutil.recipe_match.items():
drivers = vhcutil.default_drivers[r]
if drivers:
d0 = drivers[0]
else:
d0 = 'No drivers'
print(table_row.format(r, match, d0))
for d in drivers[1:]:
print(table_row.format('', '', d))
print(seps)
[docs]def main(argv=None):
"""Print to screen available plugins"""
args = parse(argv=argv)
print("List of available VHC plugins")
print()
spacer = "\n" + "-"*10 + "\n"
if args.what in ['all', 'recipes']:
_report(args, "recipes", 'vhc.recipes', load_recipes)
if args.validate and args.return_value:
_recipe_values()
if args.what in ['all', 'drivers']:
if args.what == 'all':
print(spacer)
_report(args, "drivers", 'vhc.drivers', load_drivers)
if args.what in ['all', 'parsers']:
if args.what == 'all':
print(spacer)
_report(args, "reference file parsers", 'vhc.file_parsers',
load_reference_file_parsers)
print()