# Virus Health Check: a validation tool for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Factory functions
A number of tests have a lot of machinery in common, so here we provide some
factory functions to avoid repetitions and bugs.
Examples
--------
>>> # create a driver with a custom function
>>> def foo(vcheck, fname, lpath):
...     msg = "doing something on {} for the driver {}"
...     print(msg.format(fname[lpath:], vcheck))
>>> a_function_driver = function_factory(foo)
>>> # create a driver with a command line call
>>> cmd = "ls -lh"
>>> ls_driver = command_line_factory(cmd)
>>> # also a list is accepted
>>> cmd = ["ls", "-lh"]
>>> ls_driver = command_line_factory(cmd)
>>> # if you want to use a cure tool, the command must be:
>>> cmd = "checkheader -E -r {recipe}"
>>> cure_driver = command_line_factory(cmd)
>>> # create a driver with a function returning the command line
>>> def cmd_func(vcheck, fname, log, conf):
...     cmd = "ls -lh"
...     return cmd
>>> ls_driver = command_func_factory(cmd_func)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
import shlex
import subprocess as sp
import types
import six
from six.moves import copyreg
import pyhetdex.tools.files.file_tools as ft
import pyhetdex.tools.processes as proc
from pyhetdex.tools import six_ext
import libvhc.config as vhcconf
import libvhc.exceptions as vhcexcept
import libvhc.loggers as vhclog
import libvhc.html as vhchtml
import libvhc.utils as vhcutils
# defaults
# Default for the ``files_IFU`` argument of the factories in this module: the
# number of files expected for each IFU.
FILES_IFU = 4
"number of files per ifu"
# Default for the ``n_exp_option`` argument of the factories in this module:
# the name of the configuration option read to get the number of exposures.
N_EXP_OPTION = 'n_exposures'
class function_factory(object):
    """Driver factory that runs ``func`` on every file of a recipe.

    When the driver is called it tells the html interface how many files are
    expected per IFU, scans the files matching the recipe at hand and hands
    each of them, together with ``func``, to a worker.

    Parameters
    ----------
    func : callable
        function to execute, with signature ``func(vcheck, fname, lpath)``:

        vcheck : instance of :class:`~libvhc.VCheck`
            store the recipe name and the check currently executing
        fname : string
            name of the file to check
        lpath : int
            length of the path in ``fname`` to remove for the html
            interface
    files_IFU : int
        expected files per IFU. Should always be 4.
    n_exp_option : string
        name of the option containing the number of exposures. This option
        must be present in the section of the recipe at hand.

    Returns
    -------
    driver : callable
        implementation of the driver. For the signature see
        :func:`~libvhc.function_signature`; ``args`` and ``kwargs`` are not
        implemented here.
    """

    def __init__(self, func, files_IFU=FILES_IFU,
                 n_exp_option=N_EXP_OPTION):
        self.func = func
        self.files_IFU = files_IFU
        self.n_exp_option = n_exp_option

    def __call__(self, vcheck, path, argv):
        """Execute the driver

        Parameters
        ----------
        vcheck : instance of :class:`~libvhc.VCheck`
            store the recipe name and the check currently executing
        path : string
            path provided to the ``vhc`` executable.
        argv : list of strings
            remaining of the command line
        """
        config = vhcconf.get_config()
        run = proc.get_worker(name=path)
        logger = vhclog.getLogger(name=path)

        # wildcard selecting the files expected for the recipe at hand
        wildcard = vhcutils.recipe_match[vcheck.recipe]
        # measured on the path as given, i.e. before any exposure directory
        # is appended to it below
        prefix_len = len(path)

        if not argv:
            # no extra argument: scan the whole path and expect as many
            # exposures as the configuration says
            scan_dir = path
            exposures = config.getint(vcheck.recipe, self.n_exp_option)
        else:
            # the first argument should be of the kind 'exp01': restrict the
            # scan to that single exposure
            scan_dir = os.path.join(path, argv[0])
            exposures = 1

        logger.info("running")

        # number of tests (files) expected per IFU
        vhchtml.add_ntests(vcheck, self.files_IFU * exposures)

        n_files = 0
        for fname in ft.scan_files(scan_dir, matches=wildcard):
            # the ``+ 1`` presumably skips the separator following ``path``
            run(self.func, vcheck, fname, prefix_len + 1)
            n_files += 1

        run.get_results()
        logger.info("{} files tested".format(n_files))
class command_func_factory(function_factory):
    """Driver factory that runs a shell command on every file of a recipe.

    Gives the number of expected files per IFU to the html interface, loops
    through the files for the recipe at hand, and for each file executes the
    shell command returned by ``cmd_func``, logs the stdout and stderr
    appropriately and informs the html renderer about the test.

    The factory can expand the following variables in the output of
    ``cmd_func``:

    ``recipe``
        recipe at hand
    ``driver``
        driver at hand
    ``ifuslot``
        id of the slot from ``fname``
    ``ifuid``
        id of the ifu corresponding to the above
    ``fname``
        name of the file to test
    ``headkey``
        instruct ``cure`` to write in the fits headers the version of VHC and
        whether the check is successful or not
    ``json``
        instruct ``cure`` to write its JSON output to STDOUT; this is then
        parsed by the loggers

    The substitution syntax follows the
    `new string format
    <https://docs.python.org/2/library/string.html#formatstrings>`_; e.g.
    ``checkheader {fname}`` expands the ``fname`` to the value passed to the
    function.

    Parameters
    ----------
    cmd_func : callable
        function returning the string of the shell call, with signature
        ``cmd_func(vcheck, fname, log, conf) -> cmd``:

        vcheck : instance of :class:`~libvhc.VCheck`
            store the recipe name and the check currently executing
        fname : string
            name of the file to check
        log : instance of :class:`logging.LoggerAdapter`
            logger
        conf : instance of :class:`configparser.ConfigParser`
            configuration object
        cmd : string (return value)
            string of the command to execute
    files_IFU : int
        expected files per IFU. Should always be 4.
    n_exp_option : string
        name of the option containing the number of exposures. This option
        must be present in the section of the recipe at hand.

    Returns
    -------
    driver : callable
        implementation of the driver. For the signature see
        :func:`~libvhc.function_signature`; ``args`` and ``kwargs`` are not
        implemented here.
    """

    def __init__(self, cmd_func, files_IFU=FILES_IFU,
                 n_exp_option=N_EXP_OPTION):
        self.cmd_func = cmd_func
        # Always defined, so that ``file_driver`` can be used even if
        # ``__call__`` has not populated the ``headkey``/``json`` entries yet
        self.format_dic = {}
        super(command_func_factory, self).__init__(self.file_driver,
                                                   files_IFU=files_IFU,
                                                   n_exp_option=n_exp_option)

    def __call__(self, vcheck, path, argv):
        """Execute the driver

        Prepares the ``headkey`` and ``json`` substitutions from the
        configuration, then delegates to the parent implementation.

        Parameters
        ----------
        vcheck : instance of :class:`~libvhc.VCheck`
            store the recipe name and the check currently executing
        path : string
            path provided to the ``vhc`` executable.
        argv : list of strings
            remaining of the command line
        """
        conf = vhcconf.get_config()

        # start from a clean dictionary on every run
        self.format_dic = {}

        # Add option to tell cure to write the check keyword in the fits files
        if conf.getboolean("headerkeys", "write_vhc_header_key"):
            args = (' --writekey --extra_key {k} --extra_value "{v}"'
                    ' --extra_comment "{c}" ')
            extra_value = vhcutils.extra_value_with_conf_revision(conf)
            self.format_dic['headkey'] = args.format(k=vhcutils.EXTRA_KEY,
                                                     v=extra_value,
                                                     c=vhcutils.EXTRA_COMMENT)
        else:
            self.format_dic['headkey'] = ""

        # Add option to tell cure to write JSON output to STDOUT
        if conf.getboolean("general", "write_json"):
            self.format_dic['json'] = ' --json '
        else:
            self.format_dic['json'] = ""

        super(command_func_factory, self).__call__(vcheck, path, argv)

    def file_driver(self, vcheck, fname, lpath):
        """Run the given check on one file

        Builds the command with ``cmd_func``, expands the substitution
        variables, executes it and reports the outcome to the logger and to
        the html interface. Exceptions raised while building or running the
        command are logged and reported as a failed test; they are not
        propagated.

        Parameters
        ----------
        vcheck : instance of :class:`~libvhc.VCheck`
            store the recipe name and the check currently executing
        fname : string
            name of the file
        lpath : int
            length of the path in ``fname`` to remove for the html interface
        """
        # ``__call__`` passes ``len(path) + 1`` as ``lpath``, so this slice
        # gives back the ``path`` used there to name the logger
        log = vhclog.getLogger(name=fname[:lpath-1])
        conf = vhcconf.get_config()

        self.format_dic['recipe'] = vcheck.recipe
        self.format_dic['driver'] = vcheck.check
        self.format_dic['fname'] = fname

        # get the IFU slot and id; '999' marks a value that cannot be found
        try:
            ifuslot = vhcutils.ifuslot(fname, section=vcheck.recipe)
        except IndexError:
            ifuslot = '999'
        try:
            ifuid = vhcutils.ifuid(ifuslot)
        except KeyError:
            ifuid = '999'
        self.format_dic['ifuslot'] = ifuslot
        self.format_dic['ifuid'] = ifuid

        # default message: short version of the name
        msg = fname[lpath:]
        # default status
        html_success = False

        try:
            _cmd = self.cmd_func(vcheck, fname, log, conf)
            try:
                temp_cmd = _cmd.format(**self.format_dic)
            except KeyError as e:
                # an unknown ``{placeholder}`` gets its own error message
                raise vhcexcept.VHCFactoryKeyError(e)

            log.info("running: %s", temp_cmd)
            p1 = sp.Popen(shlex.split(temp_cmd), stdout=sp.PIPE,
                          stderr=sp.PIPE, universal_newlines=True)
            stdout, stderr = p1.communicate()

            log.info(stdout.strip('\n'))
            if stderr:
                log.error("'%s' failed with '%s'", temp_cmd,
                          stderr.strip('\n'))
            # A negative return code means that the child was terminated by a
            # signal: that is a crash too, so test for any non-zero value
            if p1.returncode != 0:
                log.critical("'%s' crashed", temp_cmd)
            # success requires both a clean stderr and a zero exit status
            if not stderr and p1.returncode == 0:
                html_success = True
            if ifuid == '999':  # could not get the IFU id
                msg = "The file '{}' is probably corrupted.".format(msg)
        # catch and log the errors
        except vhcexcept.VHCFactoryKeyError as e:
            msg = "I could not format the command properly because the"
            msg += " keyword '{}' does not exist.".format(str(e))
            msg += " Command '{}'".format(_cmd)
            log.critical(msg, exc_info=False)
        except six_ext.SubprocessExeError:
            msg = "The executable has not been found."
            msg += " Command '{}'".format(temp_cmd)
            log.critical(msg, exc_info=True)
        except Exception as e:
            msg = "Error " + str(e) + " while processing file " + msg
            log.critical(msg, exc_info=True)

        vhchtml.add_ifu_test(vcheck, ifuslot, msg, html_success)
class command_line_factory(command_func_factory):
    """Driver factory that runs a fixed shell command on every file.

    Gives the number of expected files per IFU to the html interface, loops
    through the files for the recipe at hand, and executes the shell command
    ``cmd`` on each file, logs the stdout and stderr appropriately and
    informs the html renderer about the test.

    The factory allows to expand the variables described in
    :class:`command_func_factory`

    Parameters
    ----------
    cmd : string or list of string
        command to execute
    files_IFU : int
        expected files per IFU. Should always be 4.
    n_exp_option : string
        name of the option containing the number of exposures. This option
        must be present in the section of the recipe at hand.

    Returns
    -------
    driver : callable
        implementation of the driver. For the signature see
        :func:`~libvhc.function_signature`; ``args`` and ``kwargs`` are not
        implemented here.
    """

    def __init__(self, cmd, files_IFU=FILES_IFU,
                 n_exp_option=N_EXP_OPTION):
        # Store the command as a single string, ready for the later variable
        # substitution: a string is kept as it is, anything else is joined
        # with spaces. Nothing is appended to it: a command that needs the
        # file name must contain ``{fname}`` itself.
        if isinstance(cmd, six.string_types):
            self.cmd = cmd
        else:
            self.cmd = " ".join(cmd)

        super(command_line_factory, self).__init__(self.cmd_,
                                                   files_IFU=files_IFU,
                                                   n_exp_option=n_exp_option)

    def cmd_(self, *_):
        """Return ``cmd`` whatever the arguments received"""
        return self.cmd
def _pickle_method(m):  # pragma: no cover
    """Reduce a python 2 method to something that can be pickled.

    In python 2 it is not possible to pickle instance methods directly. This
    function describes a method as "look this name up on that object", which
    can be pickled. It is registered only when running under python 2.

    Copied from `Stack Overflow
    <http://stackoverflow.com/questions/25156768/cant-pickle-type-instancemethod-using-pythons-multiprocessing-pool-apply-a>`_

    Parameters
    ----------
    m : method
        python 2 method, exposing ``im_self``, ``im_class`` and ``im_func``

    Returns
    -------
    tuple
        ``(getattr, (owner, name))``, where ``owner`` is the class when
        ``m.im_self`` is ``None`` and the instance otherwise
    """
    # without an instance the name has to be looked up on the class
    owner = m.im_class if m.im_self is None else m.im_self
    return getattr, (owner, m.im_func.func_name)
# Under python 2 only, register ``_pickle_method`` as the reduction function
# for method objects, so that they can be pickled.
# NOTE(review): presumably needed because the factories hand bound methods to
# the worker -- confirm against ``pyhetdex.tools.processes``.
if six.PY2: # pragma: no cover
    copyreg.pickle(types.MethodType, _pickle_method)