# Virus Health Check: a validation tool for HETDEX/VIRUS data
# Copyright (C) 2016, 2017 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Implement a queue based communication system for the html render
The QueueListener creates and store internally an instance of
:class:`~libvhc.html.fplane.FPlane`. Each element put into the queue is
interpreted as a list of three elements:
1. the name of the method or of a property of the ``FPlane`` instance: this
**must** be present;
2. the list of args of the method (optional): if absent or None, interpreted as
an empty list; if the first element is a property, the first element of this
list is assigned to the property;
3. the dictionary of kwargs the method (optional): if absent or None,
interpreted as an empty dictionary; if the first element is a property, this
is ignored.
The only exception is the ``sentinel``, defaulting to ``None``, that is passed
along without any manipulation.
Example:
"""
import inspect
import logging
try:
import threading
except ImportError: # pragma: no cover
threading = None
import pyhetdex.tools.queue as phqueue
import six
from libvhc.exceptions import VHCQueueItemError
[docs]class HTMLQueue(phqueue.QueueContext):
"""Queue used in the HTML interface to communicate from the processes to
:class:`~libvhc.html.fplane.FPlane` object
Attributes
----------
sentinel
if the object put into the queue is ``sentinel`` no check is done
"""
def __init__(self, *args, **kwargs):
super(HTMLQueue, self).__init__(*args, **kwargs)
self.sentinel = None
[docs] def put(self, obj, block=True, timeout=None):
"""Check that *obj* comply with the specifications in
:mod:`libvhc.html.queue` and, if it does, put *obj* into the queue.
Parameters
----------
obj :
object to insert into the queue
block : bool, optional
if ``True``, block until a free slot in the queue is available or,
if set, *timeout* seconds pass; if ``False`` raise a
:class:`queue.Full` error if there are no free slots
timeout : number, optional
number of seconds to wait if *block* if ``True`` before raising a
:class:`queue.Full` error; ignored if *block* is ``False``
Raises
------
VHCQueueItemError
if the *obj* is not of the correct type or shape
"""
if obj is not self.sentinel:
if isinstance(obj, six.string_types):
obj = [obj, ]
try:
len_obj = len(obj)
except TypeError:
raise VHCQueueItemError('The object passed to the queue is not'
' a list nor a string')
if len_obj == 0 or len_obj > 3:
raise VHCQueueItemError('The object can be a list or a string'
' made of one to three elements')
# convert to spec if None found
obj = (list(obj) + [None, None])[:3]
if obj[1] is None:
obj[1] = []
if obj[2] is None:
obj[2] = {}
# check that all elements are acceptable
if not isinstance(obj[0], six.string_types):
raise VHCQueueItemError("The first element must be the string"
" with the name of the method to call")
# check that the second and third elements are correct
try:
self._test_args_kwargs(*obj[1], **obj[2])
except TypeError as e:
if '**' in str(e):
msg = ("The third element, if present, must be a None or a"
" mapping (dictionary)")
else:
msg = ("The second element, if present, must be a None or"
" a sequence (list, tuple)")
six.raise_from(VHCQueueItemError(msg), e)
super(HTMLQueue, self).put(obj, block=block, timeout=timeout)
def _test_args_kwargs(self, *args, **kwargs):
"""Used to test if the arguments passed to the queue can be unpacked
properly"""
pass
if threading:
[docs] class QueueListener(phqueue.QueueListener):
"""
This class implements an internal threaded listener which watches for
LogRecords being added to a queue, removes them and passes them to a
list of handlers for processing.
Parameters
----------
queue_ : queue-like instance
queue to use to transmit the information
fplane_class : :class:`~libvhc.html.fplane.FPlane` class
class declaration that will be instantiated by the lister
recipe : string
name of the recipe
fplane_file : string
name of the file describing the focal plane.
"""
def __init__(self, queue_, fplane_class, recipe, fplane_file):
super(QueueListener, self).__init__(queue_)
self.fplane = fplane_class(recipe, fplane_file)
try:
self._sentinel = queue_.sentinel
except AttributeError:
self._sentinel = None
[docs] def handle(self, record):
"""
Handle a record.
This just loops through the handlers offering them the record
to handle.
If the handling of any of the records fails, a message with be
logged on screen
"""
record = self.prepare(record)
try:
method = getattr(self.fplane, record[0])
except AttributeError:
msg = "Failed to get the attribute %s from the fplane object"
log = logging.getLogger("html_queue_listener")
log.critical(msg, record[0], exc_info=True)
return
try:
if inspect.ismethod(method):
method(*record[1], **record[2])
else: # assume it's a property
setattr(self.fplane, record[0], record[1][0])
except:
msg = ("Failed to call or set attribute %s with arguments %s"
" and kwargs %s")
log = logging.getLogger("html_queue_listener")
log.critical(msg, record[0], record[1], record[2],
exc_info=True)
# Setup and stop a QueueListener in as separate process
[docs]class SetupQueueListener(phqueue.SetupQueueListener):
"""Start the :class:`QueueListener`, in a separate process if required.
Adapted from `logging cookbook
<https://docs.python.org/3/howto/logging-cookbook.html#a-more-elaborate-multiprocessing-example>`_.
The :class:`SetupQueueListener` instance can be used as a context manager
for a :keyword:`with` statement. Upon exiting the statement, the process
and :class:`QueueListener` are stopped. If an exception happens, it will be
logged as critical before stopping: in order to avoid possible errors with
missing formatter keywords, the handler formatters are temporarily
substituted with "%(level)s %(message)s".
Parameters
----------
queue_ : queue-like instance
queue to use to transmit the information
fplane_class : :class:`~libvhc.html.fplane.FPlane` class
class declaration that will be instantiated by the listener
recipe : string
name of the recipe
fplane_file : string
name of the file describing the focal plane.
use_process : bool, optional
if ``True`` start the listener in a separate process
Attributes
----------
queue : as above
stop_event : :class:`multiprocessing.Event` instance
event used to signal to stop the listener
lp : :class `multiprocessing.Process` instance
process running the listener, if ``use_process`` is ``True``
listener : :class:`QueueListener` instance
"""
def __init__(self, queue_, fplane_class, recipe, fplane_file,
use_process=True):
super(SetupQueueListener, self).__init__(QueueListener, queue_,
use_process=use_process,
qlc_args=[fplane_class,
recipe,
fplane_file])