import importlib
import json
import logging
import os
from collections import namedtuple
from copy import copy
from logging.config import dictConfig
from pprint import pprint
from types import SimpleNamespace

import yaml
from colorama import Back, Fore, Style
from jinja2 import Template

from . import exceptions as error

KeyValue = namedtuple("KeyValue", "key value")
KeyValueExtra = namedtuple("KeyValueExtra", "key value extra")

logger = logging.getLogger(__name__)


# Logging helpers
# ================


def get_app_logger(loggers=None, level="WARNING", colors=False, format="default"):
    """Configure the application logging through ``dictConfig``.

    Args:
        loggers: Optional mapping of logger name to a ``dictConfig`` logger
            definition, e.g. ``{"myapp": {"level": "INFO"}}``. Each entry
            starts from ``propagate=False`` and the ``default`` handler, then
            is overridden by the given settings.
        level: Threshold applied to the ``default`` stream handler.
        colors: When true, formatters are built with
            ``coloredlogs.ColoredFormatter`` (requires the ``coloredlogs``
            package) instead of ``logging.Formatter``.
        format: Name of the formatter to use, one of ``default``,
            ``extended``, ``audit`` or ``debug``.

    Raises:
        ValueError: If ``format`` is not a known formatter name.
    """
    loggers = loggers or {}

    # Formatter factory, as a dotted path resolved by dictConfig
    fclass = "coloredlogs.ColoredFormatter" if colors else "logging.Formatter"

    # Define formatters
    formatters = {
        "default": {
            "()": fclass,
            "format": "[%(levelname)8s] %(message)s",
        },
        "extended": {
            "()": fclass,
            "format": "[%(levelname)8s] %(name)s: %(message)s",
            "datefmt": "%H:%M:%S",
        },
        "audit": {
            "()": fclass,
            "format": "%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s: %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        },
        "debug": {
            "()": fclass,
            "format": "%(msecs)03d [%(levelname)8s] %(name)s: %(message)s [%(filename)s/%(funcName)s:%(lineno)d]",
            "datefmt": "%H:%M:%S",
        },
    }

    # Validate arguments
    if format not in formatters:
        choice = ",".join(formatters)
        raise ValueError(f"Invalid format: '{format}', please choose one of: {choice}")

    # Logging config
    logging_config = {
        "version": 1,
        "disable_existing_loggers": True,
        # How logs look like
        "formatters": formatters,
        # Where logs go
        "handlers": {
            "default": {
                "level": level,
                "formatter": format,
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stderr",
            },
            "info": {
                "level": "INFO",
                "formatter": format,
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stderr",
            },
        },
        # Where logs come from
        "loggers": {
            # Root logger, used to catch ALL logs
            "": {
                "handlers": ["default"],
                "level": "WARNING",
                "propagate": False,
            },
        },
    }

    # Register the caller-provided loggers on top of the shared defaults
    for name, conf in loggers.items():
        logging_config["loggers"][name] = {
            "propagate": False,
            "handlers": ["default"],
            **conf,
        }

    # Load logger
    dictConfig(logging_config)


# Common classes
# ================


def scoped_ident(*args):
    """Serialize or deserialize a scoped ident.

    Called with one argument, parse an ``"ident/scope"`` string and return a
    ``SimpleNamespace`` with ``ident`` and ``scope`` attributes; ``scope`` is
    an empty string when the argument contains no ``/``.

    Called with two arguments ``(ident, scope)``, return the string
    ``"ident/scope"``.

    Raises:
        Exception: If called with any other number of arguments.
    """
    if len(args) == 1:
        ident = args[0]
        scope = ""
        if "/" in ident:
            # Only the first two segments are used; anything after a second
            # "/" is ignored.
            parts = ident.split("/", 2)
            ident, scope = parts[0], parts[1]
        return SimpleNamespace(ident=ident, scope=scope)

    if len(args) == 2:
        ident, scope = args
        return f"{ident}/{scope}"

    raise Exception(f"Bad arguments for scoped_ident: {args}")


class _DictBase:
    """Base class for objects configured from a payload.

    The constructor stores its arguments then calls, in order, the hooks
    ``prepare``, ``payload_transform``, ``payload_validate``, ``config_load``,
    ``config_validate`` and ``init``. Subclasses override the hooks they need.
    """

    # Default attribute values; used by DictItem.config_load
    default_attrs = {}
    # Type the payload must be an instance of, checked by payload_validate
    payload_type = dict

    def __init__(self, name, payload=None, parent=None, **kwargs):
        """Store the arguments and run the initialization hooks.

        Args:
            name: Name of the object.
            payload: Raw configuration; a falsy value is replaced by ``{}``.
            parent: Optional parent object, stored as ``self.parent``.
            **kwargs: Extra settings, stored as ``self.kwargs`` and passed to
                ``payload_transform``.
        """
        self.name = name
        self.parent = parent
        self._payload_raw = payload
        self._payload = self._payload_raw or {}
        self.kwargs = kwargs

        # Call internal methods
        self.prepare()
        self.payload_transform(name, kwargs=kwargs)
        self.payload_validate()
        self.config_load()
        self.config_validate()
        self.init()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.name}>"

    # Overrides
    # ---------------

    def prepare(self):
        "Prepare object"

    def payload_transform(self, name, kwargs=None):
        "Transform payload, accessible from self._payload"

    def payload_validate(self):
        """Validate that the payload is an instance of ``payload_type``.

        Raises:
            TypeError: If ``self._payload`` is not a ``self.payload_type``.
        """
        payload = self._payload
        payload_type = self.payload_type
        if not isinstance(payload, payload_type):
            raise TypeError(
                f"Invalid configuration for {self}, expected a {payload_type} payload, got {type(payload)} instead: {payload}"
            )

    def config_load(self):
        "Load configuration, raise Exception if not good"

    def config_validate(self):
        "Validate configuration, raise Exception if not good"

    def init(self):
        "Init object"


class DictItem(_DictBase):
    "Dict item: exposes its payload keys as instance attributes"

    default_attrs = {}

    # Methods
    # ---------------

    def payload_transform(self, name, kwargs=None):
        """Inject ``name`` and the extra ``kwargs`` into the payload.

        The payload dict is updated in place.
        """
        self._payload["name"] = name
        self._payload.update(kwargs or {})

    def config_load(self):
        "Load configuration from payload"

        # Set default attributes; shallow-copied so that instances do not
        # share the class-level default objects
        for key, val in self.default_attrs.items():
            setattr(self, key, copy(val))

        # Override default attributes with payload
        for key, val in self._payload.items():
            setattr(self, key, val)

    def dump_item(self):
        "Return a dict of the instance attributes not starting with an underscore"
        return {
            key: val for key, val in self.__dict__.items() if not key.startswith("_")
        }


class DictCtrl(_DictBase):
    "Dict controller: holds a catalog of items built from the payload"

    # Class used to instantiate each payload entry; no item is created when unset
    items_class = None

    # Overrides
    # ---------------

    def prepare(self):
        "Prepare controller: expose each constructor kwarg as an attribute"
        for key, val in (self.kwargs or {}).items():
            setattr(self, key, val)

    def config_load(self):
        "Build the items catalog from the payload"
        self._items = {}
        if self.items_class:
            # Note: self.items_class is derived from DictItem
            for item_name, item in self._payload.items():
                obj = self.items_class(item_name, payload=item, parent=self)
                self._items[obj.name] = obj

    # Dunders
    # ---------------

    def __contains__(self, name):
        return self.has(name)

    # Methods
    # ---------------

    def items(self):
        "Return a view of the (name, item) pairs"
        return self._items.items()

    def values(self):
        "Return a view of the items"
        return self._items.values()

    def keys(self):
        "Return a view of the item names"
        return self._items.keys()

    def names(self):
        "Return the item names as a list"
        return list(self._items)

    def has(self, name):
        "Return true or false if an item exists"
        return name in self._items

    def select(self, mode, name):
        """Select items whose name matches a criterion.

        Args:
            mode: Either ``"startswith"`` or ``"endswith"``.
            name: Prefix or suffix to match; a falsy value is treated as ``""``.

        Returns:
            A dict of the matching ``{name: item}`` pairs.
        """
        name = name or ""
        modes = ["startswith", "endswith"]

        if name.endswith(":"):
            # The result is discarded: this call only matters when it raises
            # for an unknown item.
            # NOTE(review): confirm this existence check is the intent.
            self.get(name[:-1])

        assert mode in modes, f"Unsupported mode: {mode}"

        if mode == "startswith":
            return {
                key: value
                for key, value in self._items.items()
                if key.startswith(name)
            }
        return {
            key: value for key, value in self._items.items() if key.endswith(name)
        }

    def get(self, *args):
        """Retrieve an item, args: [name [default]].

        Without argument, return the whole items dict. With ``name``, return
        the matching item. With ``name`` and ``default``, return ``default``
        when the item does not exist.

        Raises:
            error.UnknownCatalogItem: If the item does not exist and no
                default was given.
            TypeError: If more than two arguments are given.
        """
        # Fetch all items
        if not args:
            return self._items

        if len(args) > 2:
            raise TypeError(f"Unsupported arguments for .get(), got: {args}")

        name = args[0]
        use_default = len(args) == 2

        if name not in self._items:
            if use_default:
                return args[1]
            choices = "\n  - ".join(self._items.keys())
            msg = f"Missing {self.name} item: {name}, please choose one of:\n  - {choices}"
            raise error.UnknownCatalogItem(msg)

        return self._items[name]