Source code for libvhc.html.queue

# Virus Health Check: a validation tool for HETDEX/VIRUS data
# Copyright (C) 2016, 2017  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Implement a queue based communication system for the html render

The QueueListener creates and stores internally an instance of
:class:`~libvhc.html.fplane.FPlane`. Each element put into the queue is
interpreted as a list of three elements:

1. the name of the method or of a property of the ``FPlane`` instance: this
   **must** be present;
2. the list of args of the method (optional): if absent or None, interpreted as
   an empty list; if the first element is a property, the first element of this
   list is assigned to the property;
3. the dictionary of kwargs of the method (optional): if absent or None,
   interpreted as an empty dictionary; if the first element is a property, this
   is ignored.

The only exception is the ``sentinel``, defaulting to ``None``, that is passed
along without any manipulation.

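Example (``method_name`` and ``property_name`` stand for attributes of the
``FPlane`` instance)::

    queue.put("method_name")
    queue.put(["method_name", [arg1, arg2], {"key": value}])
    queue.put(["property_name", [new_value]])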
"""

import inspect
import logging
try:
    import threading
except ImportError:  # pragma: no cover
    threading = None

import pyhetdex.tools.queue as phqueue
import six

from libvhc.exceptions import VHCQueueItemError


class HTMLQueue(phqueue.QueueContext):
    """Queue used in the HTML interface to communicate from the processes to
    the :class:`~libvhc.html.fplane.FPlane` object.

    Every object put into the queue, except the ``sentinel``, is validated
    and normalised to a three element list ``[name, args, kwargs]`` before
    being handed over to the parent class.

    Attributes
    ----------
    sentinel
        if the object put into the queue is ``sentinel`` no check is done
    """

    def __init__(self, *args, **kwargs):
        super(HTMLQueue, self).__init__(*args, **kwargs)
        self.sentinel = None

    def put(self, obj, block=True, timeout=None):
        """Check that *obj* complies with the specifications in
        :mod:`libvhc.html.queue` and, if it does, put *obj* into the queue.

        A string is interpreted as a one element list. Missing or ``None``
        second and third elements are replaced by an empty list and an empty
        dictionary respectively, so that the object stored in the queue
        always is a list of three elements. The ``sentinel`` is passed along
        untouched.

        Parameters
        ----------
        obj :
            object to insert into the queue
        block : bool, optional
            if ``True``, block until a free slot in the queue is available or,
            if set, *timeout* seconds pass; if ``False`` raise a
            :class:`queue.Full` error if there are no free slots
        timeout : number, optional
            number of seconds to wait if *block* is ``True`` before raising a
            :class:`queue.Full` error; ignored if *block* is ``False``

        Raises
        ------
        VHCQueueItemError
            if the *obj* is not of the correct type or shape
        """
        if obj is not self.sentinel:
            if isinstance(obj, six.string_types):
                obj = [obj, ]

            try:
                len_obj = len(obj)
            except TypeError:
                raise VHCQueueItemError('The object passed to the queue is not'
                                        ' a list nor a string')
            if len_obj == 0 or len_obj > 3:
                raise VHCQueueItemError('The object can be a list or a string'
                                        ' made of one to three elements')

            # convert to spec if None found
            obj = (list(obj) + [None, None])[:3]
            if obj[1] is None:
                obj[1] = []
            if obj[2] is None:
                obj[2] = {}

            # check that all elements are acceptable
            if not isinstance(obj[0], six.string_types):
                raise VHCQueueItemError("The first element must be the string"
                                        " with the name of the method to call")

            # check that the second and third elements can be unpacked
            try:
                self._test_args_kwargs(*obj[1], **obj[2])
            except TypeError as e:
                # Find the faulty element by probing the positional arguments
                # rather than by parsing the interpreter's error text: the
                # wording is not stable and, e.g., a mapping with non-string
                # keys fails with a message that does not mention ``**``.
                try:
                    iter(obj[1])
                except TypeError:
                    msg = ("The second element, if present, must be a None or"
                           " a sequence (list, tuple)")
                else:
                    msg = ("The third element, if present, must be a None or a"
                           " mapping (dictionary)")
                six.raise_from(VHCQueueItemError(msg), e)

        super(HTMLQueue, self).put(obj, block=block, timeout=timeout)

    def _test_args_kwargs(self, *args, **kwargs):
        """Used to test if the arguments passed to the queue can be unpacked
        properly"""
        pass
if threading:
    class QueueListener(phqueue.QueueListener):
        """Listener that applies the items received from the queue to an
        :class:`~libvhc.html.fplane.FPlane` instance.

        The class is defined only if the :mod:`threading` module can be
        imported. Each record is expected to be a ``[name, args, kwargs]``
        list, as produced by :meth:`HTMLQueue.put`: ``name`` is looked up on
        the ``fplane`` instance and either called as a method or set as a
        property.

        Parameters
        ----------
        queue_ : queue-like instance
            queue to use to transmit the information
        fplane_class : :class:`~libvhc.html.fplane.FPlane` class
            class declaration that will be instantiated by the listener
        recipe : string
            name of the recipe
        fplane_file : string
            name of the file describing the focal plane.

        Attributes
        ----------
        fplane : instance of ``fplane_class``
            object on which the queue items are applied
        """

        def __init__(self, queue_, fplane_class, recipe, fplane_file):
            super(QueueListener, self).__init__(queue_)
            self.fplane = fplane_class(recipe, fplane_file)
            # use the sentinel advertised by the queue, if any
            self._sentinel = getattr(queue_, 'sentinel', None)

        def handle(self, record):
            """Handle a record.

            Get the attribute called ``record[0]`` from the ``fplane``
            instance. If it is a method, call it with ``record[1]`` as
            positional and ``record[2]`` as keyword arguments; otherwise
            assume it is a property and assign to it the first element of
            ``record[1]``.

            If the attribute does not exist, or calling/setting it fails, a
            critical message with the traceback is logged on the
            ``html_queue_listener`` logger and the record is dropped.

            Parameters
            ----------
            record : list
                ``[name, args, kwargs]`` item retrieved from the queue
            """
            record = self.prepare(record)
            try:
                method = getattr(self.fplane, record[0])
            except AttributeError:
                msg = "Failed to get the attribute %s from the fplane object"
                log = logging.getLogger("html_queue_listener")
                log.critical(msg, record[0], exc_info=True)
                return

            try:
                if inspect.ismethod(method):
                    method(*record[1], **record[2])
                else:  # assume it's a property
                    setattr(self.fplane, record[0], record[1][0])
            except Exception:
                # Catch only ordinary errors: SystemExit and
                # KeyboardInterrupt must be allowed to propagate.
                msg = ("Failed to call or set attribute %s with arguments %s"
                       " and kwargs %s")
                log = logging.getLogger("html_queue_listener")
                log.critical(msg, record[0], record[1], record[2],
                             exc_info=True)
# Setup and stop a QueueListener in a separate process
class SetupQueueListener(phqueue.SetupQueueListener):
    """Start the :class:`QueueListener`, in a separate process if required.

    Adapted from `logging cookbook
    <https://docs.python.org/3/howto/logging-cookbook.html#a-more-elaborate-multiprocessing-example>`_.

    The :class:`SetupQueueListener` instance can be used as a context manager
    for a :keyword:`with` statement. Upon exiting the statement, the process
    and :class:`QueueListener` are stopped. If an exception happens, it will
    be logged as critical before stopping: in order to avoid possible errors
    with missing formatter keywords, the handler formatters are temporarily
    substituted with "%(level)s %(message)s".

    Parameters
    ----------
    queue_ : queue-like instance
        queue to use to transmit the information
    fplane_class : :class:`~libvhc.html.fplane.FPlane` class
        class declaration that will be instantiated by the listener
    recipe : string
        name of the recipe
    fplane_file : string
        name of the file describing the focal plane.
    use_process : bool, optional
        if ``True`` start the listener in a separate process

    Attributes
    ----------
    queue : as above
    stop_event : :class:`multiprocessing.Event` instance
        event used to signal to stop the listener
    lp : :class:`multiprocessing.Process` instance
        process running the listener, if ``use_process`` is ``True``
    listener : :class:`QueueListener` instance
    """

    def __init__(self, queue_, fplane_class, recipe, fplane_file,
                 use_process=True):
        # arguments handed to the parent class as ``qlc_args``; presumably
        # forwarded to the QueueListener constructor together with the queue
        listener_args = [fplane_class, recipe, fplane_file]
        super(SetupQueueListener, self).__init__(
            QueueListener,
            queue_,
            use_process=use_process,
            qlc_args=listener_args,
        )