Source code for libvhc.loaders

# Virus Health Check: a validation tool for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Deal with the Python path and with the dynamic loading of modules and
functions
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import inspect
import logging
import textwrap as tw
import warnings

import pkg_resources

import libvhc
import libvhc.reference_file_parser as vhcrfp
import libvhc.utils as vhcutil


def load_recipes(log, entry_point_group='vhc.recipes'):
    """Load the recipes advertised under the entry point group and fill the
    ``recipe_match`` and ``default_drivers`` dictionaries

    Every loaded entry point is called with its own name; the two values it
    returns are stored, under that name, in ``vhcutil.recipe_match`` and
    ``vhcutil.default_drivers``. If the call raises, the error is logged
    with the full traceback and the recipe is skipped.

    Parameters
    ----------
    log : :class:`logging.LoggerAdapter` or :class:`logging.Logger` instance
        logger
    entry_point_group : string, optional
        name of the entry point group

    Returns
    -------
    recipe_names : list of strings
        name of the loaded recipes
    """
    loaded_names = []

    for recipe_name, _, describe in load_entrypoints(entry_point_group,
                                                     log=log):
        try:
            match, drivers = describe(recipe_name)
        except Exception:
            log.error("Failed to execute the function describing the recipe"
                      " '%s'.", recipe_name, exc_info=True)
        else:
            # register the recipe only once its description is available
            vhcutil.recipe_match[recipe_name] = match
            vhcutil.default_drivers[recipe_name] = drivers
            loaded_names.append(recipe_name)

    return loaded_names
def load_drivers(log, entry_point_group='vhc.drivers'):
    """Load the drivers advertised under the entry point group

    A driver is kept only if it is callable and if it can be called with
    three positional arguments; the ones failing either check are logged as
    errors and dropped.

    Parameters
    ----------
    log : :class:`logging.LoggerAdapter` or :class:`logging.Logger` instance
        logger
    entry_point_group : string, optional
        name of the entry point group

    Returns
    -------
    ddrivers : dict
        dictionary mapping the driver names with the functions implementing
        them
    """
    accepted = {}

    for driver_name, _, driver in load_entrypoints(entry_point_group,
                                                   log=log):
        if not callable(driver):
            log.error("The driver '%s' is not associated to a callable."
                      " Ignoring it", driver_name)
            continue

        # plain functions are inspected directly, any other callable through
        # its ``__call__``
        target = driver if inspect.isfunction(driver) else driver.__call__

        try:
            # bind mock arguments to the signature, without calling the driver
            inspect.getcallargs(target, libvhc.VCheck(), '/a/path', [])
        except TypeError as err:
            log.error("The signature of the function or method implementing"
                      " the driver '%s' is wrong because of '%s'. Ignoring it",
                      driver_name, err)
        else:
            accepted[driver_name] = driver

    return accepted
def load_reference_file_parsers(log, entry_point_group='vhc.file_parsers'):
    """Load the classes implementing file parsers

    Every entry point of the group that is a class and whose ``__init__``
    can be called with two arguments besides ``self`` is added to the
    registry ``vhcrfp._registered`` under the entry point name. The entry
    points failing either check are logged as errors and ignored.

    Parameters
    ----------
    log : :class:`logging.LoggerAdapter` or :class:`logging.Logger` instance
        logger
    entry_point_group : string, optional
        name of the entry point group

    Returns
    -------
    names : list of strings
        names of all the parsers found in the registry once the loading is
        done
    """
    for parser_name, _, parser in load_entrypoints(entry_point_group,
                                                   log=log):
        if not inspect.isclass(parser):
            log.error("The file parser '%s' is not a class."
                      " Ignoring it", parser_name)
            continue

        try:
            # bind mock arguments to the constructor signature, without
            # instantiating the class
            inspect.getcallargs(parser.__init__, 'self', 'conf', 'log')
        except TypeError as e:
            log.error("The signature of the class implementing the file parser"
                      " '%s' is wrong because of '%s'. Ignoring it",
                      parser_name, e)
            continue

        vhcrfp._registered[parser_name] = parser

    return list(vhcrfp._registered.keys())
def load_entrypoints(group, name=None, log=None):
    """Get all the entry points for the ``group`` and load them.

    This is a generator: the entry points are loaded one at a time, while
    iterating.

    Parameters
    ----------
    group : string
        name of the group to load
    name : string, optional
        name of the entry points to load
    log : :class:`logging.LoggerAdapter` or :class:`logging.Logger`, optional
        if given, exception when loading entry points are logged as errors,
        with the full traceback, and the entry point is skipped; if ``None``
        the exception is re-raised

    Yields
    ------
    entry_point : list
        the entry point as a list of [name,
        :class:`pkg_resources.EntryPoint`, loaded]

    Raises
    ------
    Exception
        whatever the loading of an entry point raises, if no ``log`` is given
    """
    for ep in pkg_resources.iter_entry_points(group, name=name):
        apname = ep.name
        # guard only the loading: the ``yield`` is kept outside of the
        # ``try`` so that the handler cannot intercept anything else
        try:
            loaded = ep.load()
        except Exception as e:
            if not log:
                raise
            msg = ("Failed to load entry point: '%s' of group '%s',"
                   " because of '%s'")
            log.error(msg, apname, group, e, exc_info=True)
            continue

        yield [apname, ep, loaded]
# implementation of vhc_plugins
def parse(argv=None):
    """Command line parser

    Parameters
    ----------
    argv : list of strings, optional
        command line to parse; passed as ``args`` to
        :meth:`argparse.ArgumentParser.parse_args`

    Returns
    -------
    Namespace
        parsed command line
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='''List and basic introspection of installed VHC plugins''',
        parents=[vhcutil.common_parser_arguments(), ],
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        '-d', '--description', type=int, default=0,
        help='''Add '%(dest)s' lines from the plugins doc-string''')

    parser.add_argument(
        '-w', "--what", default='all',
        choices=['all', 'recipes', 'drivers', 'parsers'],
        help="""What to show""")

    parser.add_argument(
        '-f', '--full-exception', action='store_true',
        help="When the loading or the validation fails print the full"
             " exception instead of just the error message")

    parser.add_argument(
        '-v', '--validate', action='store_true',
        help="Load the plugins through the default mechanism, which performs"
             " validation. The validation is done after printing the names"
             " and description")

    parser.add_argument(
        '-r', '--return-value', action='store_true',
        help="If the validation is enabled, print the return value of the"
             " plugins. Available for the type 'recipes'.")

    return parser.parse_args(args=argv)
def _print(name, doc, n_lines=3):
    """Print the name and n_lines of the documentation

    The documentation is cleaned with :func:`inspect.cleandoc` before
    picking the lines, so that leading and trailing blank lines neither use
    up the requested lines nor are reported as truncated text.

    Parameters
    ----------
    name : string
        name to print, as a bullet point
    doc : string or None
        documentation string
    n_lines : int, optional
        number of lines of ``doc`` to print; if not positive, only the name
        is printed
    """
    print(" "*3 + "*", name)
    if n_lines <= 0:
        return

    indent = " "*6
    cleaned = inspect.cleandoc(doc) if doc else ''
    if not cleaned:
        # same indentation as the wrapped documentation below
        print(indent + "No documentation available")
        return

    lines = cleaned.split('\n')
    shown = lines[:n_lines]
    if len(lines) > n_lines:
        # some text has been left out
        shown[-1] += " [...]"

    # join the lines and remove multiple whitespaces
    text = " ".join(" ".join(shown).split())
    wrapped = tw.wrap(text, initial_indent=indent, subsequent_indent=indent)
    print('\n'.join(wrapped))
class ListStream(list):
    """List providing the :meth:`write` and :meth:`flush` methods, so that
    it can be handed to a stream handler in place of a stream"""

    def write(self, val):
        """Store ``val`` as a new element at the end of the list"""
        self.append(val)

    def flush(self):
        """Nothing is buffered, so there is nothing to flush"""
        return None
def _logger():
    """Set up the ``vhc_plugin`` logger to write into an in-memory stream

    The handlers already attached to the logger are dropped and replaced by
    a single :class:`logging.StreamHandler` writing into a new
    :class:`ListStream`; the messages are formatted as
    ``[levelname] message``.

    Returns
    -------
    log : :class:`logging.Logger`
        the logger
    stream : :class:`ListStream`
        stream the handler writes to
    """
    # make the handler
    records = ListStream()
    handler = logging.StreamHandler(stream=records)
    handler.setFormatter(logging.Formatter(fmt='[%(levelname)s] %(message)s'))

    # attach it to a logger stripped of any previous handler
    plugin_log = logging.getLogger('vhc_plugin')
    plugin_log.handlers = []
    plugin_log.addHandler(handler)

    return plugin_log, records
def _report(args, what, name, load_func):
    """Load the entry points and print the report

    The name and the documentation of every entry point that can be loaded
    are printed first, followed by the errors logged while loading. If the
    validation is requested, ``load_func`` is then run and its outcome is
    reported, together with the errors not already shown.

    Parameters
    ----------
    args : Namespace
        parsed command line
    what : string
        name of the group to report
    name : string
        name of the entry point group
    load_func : callable
        function used to do the loading and the checking
    """
    what = what.lower()
    log, stream = _logger()

    print("{}:".format(what.capitalize()))

    # count explicitly: the group may have no loadable entry point, in
    # which case a loop variable would never be bound
    n_found = 0
    for ep_name, _, loaded_ep in load_entrypoints(name, log=log):
        n_found += 1
        _print(ep_name, loaded_ep.__doc__, n_lines=args.description)

    _print_exception(stream, full_exception=args.full_exception)

    if not args.validate:
        return

    # use a new stream, to tell the validation errors from the loading ones
    log, stream_validate = _logger()
    loaded = load_func(log)
    if len(loaded) == n_found:
        print("All {} have been correctly loaded".format(what))
    else:
        print("Only the following {} have been loaded:"
              " '{}'".format(what, " ".join(loaded)))

    _print_exception(stream_validate, full_exception=args.full_exception,
                     old_log=stream)
def _print_exception(stream, full_exception=False, old_log=None):
    """Get the value from the stream and print it

    If ``full_exception`` is False, print only the error message.

    Parameters
    ----------
    stream : :class:`ListStream`
        stream containing the information to print; it is not modified
    full_exception : bool, optional
        whether to print the entries of the stream in full, or only the
        first line of each of them
    old_log : list of strings, optional
        if given, don't report the entries already contained in old_log
    """
    # work on a copy: the entries to skip must not disappear from the
    # stream of the caller
    logs = list(stream) if stream else []

    if old_log:
        # remove the entries already present in old logs
        for old_entry in old_log:
            try:
                logs.remove(old_entry)
            except ValueError:
                pass  # no problem if ``old_entry`` does not exist

    if not logs:
        return

    print('\nThe following exception(s) happened while loading:')
    if full_exception:
        print('\n'.join(logs))
    else:
        for entry in logs:
            print(entry.split('\n')[0])
def _recipe_values():
    '''Print the content of the dictionaries with the file matches and
    default drivers for the recipes

    The content of ``vhcutil.recipe_match`` and ``vhcutil.default_drivers``
    is shown as a table with the columns ``recipe``, ``match`` and
    ``drivers``; a recipe with many drivers spans as many rows. Each column
    is wide enough for its header and for the text shown in it.
    '''
    if not vhcutil.recipe_match:
        print('No recipe loaded')
        return

    header = ('recipe', 'match', 'drivers')
    no_drivers = 'No drivers'

    # the headers, and the placeholder when it is going to be shown, take
    # part in sizing the columns, otherwise short values misalign the table
    key_lens = [len(header[0])]
    key_lens.extend(len(k) for k in vhcutil.recipe_match.keys())
    match_lens = [len(header[1])]
    match_lens.extend(len(m) for m in vhcutil.recipe_match.values())
    driver_lens = [len(header[2])]
    driver_lens.extend(len(d) for drivers in vhcutil.default_drivers.values()
                       for d in drivers)
    if any(not vhcutil.default_drivers[r] for r in vhcutil.recipe_match):
        driver_lens.append(len(no_drivers))

    max_len_key = max(key_lens) + 2
    max_len_match = max(match_lens) + 2
    max_len_driver = max(driver_lens) + 2

    # two step formatting: first set the column widths, then the content
    seps = '|{{0:-^{lk}}}|{{0:-^{lm}}}|{{0:-^{ld}}}|'
    seps = seps.format(lk=max_len_key, lm=max_len_match, ld=max_len_driver)
    seps = seps.format('')

    table_row = '|{{0:^{lk}}}|{{1:^{lm}}}|{{2:^{ld}}}|'
    table_row = table_row.format(lk=max_len_key, lm=max_len_match,
                                 ld=max_len_driver)

    print(seps)
    print(table_row.format(*header))
    print(seps)

    for r, match in vhcutil.recipe_match.items():
        drivers = vhcutil.default_drivers[r]
        d0 = drivers[0] if drivers else no_drivers

        print(table_row.format(r, match, d0))
        # the other drivers go in rows of their own
        for d in drivers[1:]:
            print(table_row.format('', '', d))
        # NOTE(review): one separator is printed after every recipe; confirm
        # that it was not meant to be printed only once, after the last row
        print(seps)
def main(argv=None):
    """Print to screen available plugins

    Parameters
    ----------
    argv : list of strings, optional
        command line, handed over to :func:`parse`
    """
    args = parse(argv=argv)
    show_all = args.what == 'all'
    spacer = "\n" + "-"*10 + "\n"

    print("List of available VHC plugins")
    print()

    if show_all or args.what == 'recipes':
        _report(args, "recipes", 'vhc.recipes', load_recipes)
        # the values returned by the recipes are known only after validating
        if args.validate and args.return_value:
            _recipe_values()

    if show_all or args.what == 'drivers':
        if show_all:
            print(spacer)
        _report(args, "drivers", 'vhc.drivers', load_drivers)

    if show_all or args.what == 'parsers':
        if show_all:
            print(spacer)
        _report(args, "reference file parsers", 'vhc.file_parsers',
                load_reference_file_parsers)

    print()