Source code for flagman.core

# -*- coding: utf-8 -*-
"""The core of flagman.

Contains the logic to implement signal handlers and dispatch to user-defined functions.
"""
import logging
import signal
from operator import attrgetter
from types import FrameType
from typing import Iterable, List, Mapping, MutableSet, Sequence, Type, Union

import pkg_resources

from flagman.actions import Action
from flagman.exceptions import ActionClosed
from flagman.types import ActionArgument, ActionName, SignalNumber

logger = logging.getLogger(__name__)

_value_getter = attrgetter('value')

#: Signals in this list are handled by flagman.
#: The CLI module auto-generates the appropriate CLI option for each signal.
HANDLED_SIGNALS: List[signal.Signals] = sorted(
    {signal.SIGHUP, signal.SIGUSR1, signal.SIGUSR2}, key=_value_getter
)

#: The global flag set for raised signals.
#: A signal that has been delivered to a handler will have its number in the set.
#: The number is removed after the actions for the signal have been executed.
SIGNAL_FLAGS: MutableSet[SignalNumber] = set()

#: Mapping of action entry point names to Action classes.
#: Populated from the pkg_resources `flagman.action` entry point group.
KNOWN_ACTIONS: Mapping[ActionName, Type[Action]] = {
    action.name: action.load()
    for action in pkg_resources.iter_entry_points('flagman.action')
}

#: Mapping of SignalNumbers to sequences of instantiated Actions ("action bundles")
#: that will be executed for that signal.
#: Populated by `create_action_bundles`.
ACTION_BUNDLES: Mapping[SignalNumber, List[Action]] = {
    signum.value: [] for signum in HANDLED_SIGNALS
}


[docs]def create_action_bundles( args_dict: Mapping[str, Iterable[Sequence[Union[ActionName, ActionArgument]]]] ) -> int: """Parse the enabled actions and insert them into the global ACTION_BUNDLES mapping. The input dictionary should be like:: {'usr1': [['action1', 'arg1a', 'arg2a'], ['action2', 'arg2a']], 'usr2': [['action3'], ['action4', 'arg4a', 'arg4b']]} :param args_dict: a mapping of strings to an Iterable of Action names :returns: The number of configured actions """ logger.debug('Creating action bundles') for signum in HANDLED_SIGNALS: logger.debug('Creating action bundle for %s', signum.name) action_calls = args_dict.get(signum.name[3:].lower(), []) actions = [] for action_call in action_calls: try: actions.append((KNOWN_ACTIONS[action_call[0]], action_call[1:])) except KeyError: logger.warning('Unknown action `%s`; skipping', action_call[0]) action_generators = [ prime_action_generator(action[0], action[1]) for action in actions ] ACTION_BUNDLES[signum.value].extend(action_generators) return sum(len(bundle) for bundle in ACTION_BUNDLES.values())
def prime_action_generator( action: Type[Action], args: Iterable[ActionArgument] ) -> Action: """Instantiate an Action. Given a class of type Action, instantiate the class with the passed-in arguments. :param action: the Action :param args: an Iterable of strings, the aruments to the Action :returns: the primed Action """ logger.debug( 'Priming action class `%s.%s` with arguments `%s`', action.__module__, action.__qualname__, args, ) # instantiate the generator and run set up code action_generator = action(*args) return action_generator
[docs]def set_handlers() -> None: """Register handlers for the signals we're interested in. Uses the global HANDLED_SIGNALS to decide what signals to register for. Danger starts here! """ logger.info('Registering signal handlers for actions') for signum in HANDLED_SIGNALS: if len(ACTION_BUNDLES[signum]) > 0: def handler( num: int, _frame: FrameType ) -> None: # noqa: D403 (capitalization) """flagman handler for {}.""".format(signum.name) SIGNAL_FLAGS.add(num) logger.debug('Registering signal handler for signal `%s`', signum.name) signal.signal(signum, handler) else: logger.debug( 'No actions registered for signal `%s`; skipping handler registration', signum.name, ) logger.info('Done registering signal handlers for actions')
[docs]def run() -> None: """Run the flagman "event loop". Waits for a signal to be raised and dispatches to the user-defined handlers as appropriate. """ logger.info('Starting event loop') while True: logger.debug('Pausing for signal') signal.pause() logger.debug('Woke for signal') while SIGNAL_FLAGS: try: num = SIGNAL_FLAGS.pop() logger.debug('Found raised flag for signal number `%d`', num) except KeyError: continue logger.debug( 'Taking actions `%s` for signal number `%d`', [action.__class__.__name__ for action in ACTION_BUNDLES[num]], num, ) # make a copy since we might want to remove an element while iterating actions_to_take = enumerate(ACTION_BUNDLES[num].copy()) idx_adjust = 0 for idx, action in actions_to_take: try: logger.debug( 'Taking action `%s` for signal number `%d`', action.__class__.__name__, num, ) action._run() logger.debug( 'Done taking action `%s` for signal number `%d`', action.__class__.__name__, num, ) except ActionClosed: logger.warning( 'Received `ActionClosed`; removing action `%s`', action.__class__.__name__, exc_info=True, ) ACTION_BUNDLES[num].pop(idx - idx_adjust) idx_adjust += 1 logger.debug('Lowering flag for signal number `%d`', num) SIGNAL_FLAGS.discard(num) # check if there are any actions left in our bundles if not any(ACTION_BUNDLES.values()): logger.warning('No actions remain active; exiting event loop') return None