Change: Complete code architecture rework
This commit is contained in:
parent
1a926452a3
commit
1c56c3235a
@ -0,0 +1,12 @@
|
||||
"""
|
||||
.. module:: kheops
|
||||
:platform: Unix
|
||||
:synopsis: A useful module indeed.
|
||||
:noindex:
|
||||
|
||||
.. moduleauthor:: Robin Cordier <andrew@invalid.com>
|
||||
|
||||
"""
|
||||
|
||||
from . import app
|
||||
from . import cli
|
||||
342
kheops/app.py
342
kheops/app.py
@ -1,24 +1,26 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Kheops App interface"""
|
||||
|
||||
import cProfile
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import anyconfig
|
||||
from diskcache import Cache
|
||||
|
||||
from kheops.controllers import QueryProcessor
|
||||
from kheops.utils import schema_validate
|
||||
from kheops.query import Query
|
||||
import kheops.plugin as KheopsPlugins
|
||||
from kheops.managers import BackendsManager, RulesManager
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class App:
|
||||
"""Main Kheops Application Instance"""
|
||||
|
||||
schema = {
|
||||
|
||||
|
||||
CONF_SCHEMA = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"additionalProperties": False,
|
||||
@ -29,20 +31,20 @@ class App:
|
||||
"rules_items": {},
|
||||
"rules_config": {},
|
||||
},
|
||||
"patternProperties": {
|
||||
".*": {
|
||||
"type": "object",
|
||||
"optional": True,
|
||||
"additionalProperties": False,
|
||||
#"patternProperties": {
|
||||
# ".*": {
|
||||
# "type": "object",
|
||||
# "optional": True,
|
||||
# "additionalProperties": False,
|
||||
"properties": {
|
||||
"config": {
|
||||
"type": "object",
|
||||
"default": {},
|
||||
"additionalProperties": False,
|
||||
"additionalProperties": True,
|
||||
"properties": {
|
||||
"app": {
|
||||
"type": "object",
|
||||
# "default": {},
|
||||
"default": {},
|
||||
"additionalProperties": False,
|
||||
"properties": {
|
||||
"root": {
|
||||
@ -58,9 +60,45 @@ class App:
|
||||
},
|
||||
],
|
||||
},
|
||||
"cache": {
|
||||
"default": "kheops_cache",
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
"description": "Disable cache",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Path of the cache directory",
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
# OLD
|
||||
"tree": {
|
||||
# "additionalProperties": False,
|
||||
"type": "object",
|
||||
"default": {},
|
||||
"deprecated": True,
|
||||
"properties": {
|
||||
"prefix": {
|
||||
"default": None,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
"description": "Disable prefix, all files are lookup up from the app root dir.",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Add a path prefix before all paths. This is quite useful to store your YAML data in a dedicated tree.",
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
"lookups": {
|
||||
# "additionalProperties": False,
|
||||
"type": "object",
|
||||
"default": {},
|
||||
@ -94,74 +132,262 @@ class App:
|
||||
"properties": {"$ref": "#/$defs/backends_items"},
|
||||
},
|
||||
},
|
||||
"lookups": {
|
||||
"type": "array",
|
||||
"default": [],
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {"$ref": "#/$defs/backends_items"},
|
||||
},
|
||||
},
|
||||
"rules": {
|
||||
"type": "array",
|
||||
"default": [],
|
||||
# "arrayItem": { "$ref": "#/$defs/rules_items" },
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
# },
|
||||
# },
|
||||
}
|
||||
|
||||
def __init__(self, config="kheops.yml", namespace="default"):
|
||||
conf2 = anyconfig.load(config)
|
||||
self.run = {}
|
||||
|
||||
|
||||
|
||||
class GenericInstance():
|
||||
|
||||
name = None
|
||||
run = {}
|
||||
|
||||
|
||||
|
||||
|
||||
class KheopsNamespace(GenericInstance, QueryProcessor):
|
||||
|
||||
def __init__(self, app, name, config=None):
|
||||
|
||||
self.name = name
|
||||
self.config = config or {}
|
||||
self.app = app
|
||||
self.run = dict(app.run)
|
||||
|
||||
|
||||
# Validate configuration
|
||||
schema_validate(conf2, self.schema)
|
||||
try:
|
||||
conf2 = conf2[namespace]
|
||||
except KeyError:
|
||||
log.error("Can't find namespace '%s' in config '%s'", namespace, config)
|
||||
sys.exit(1)
|
||||
self.config = schema_validate(self.config, CONF_SCHEMA)
|
||||
|
||||
# Get application paths
|
||||
# =====================
|
||||
# Fetch app root
|
||||
if conf2["config"]["app"]["root"]:
|
||||
path_root = Path(conf2["config"]["app"]["root"])
|
||||
log.debug("Root path is hard coded.")
|
||||
self.run["path_ns"] = str(Path(app.run['config_src']).parent.resolve())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# def load_namespace(self, namespace="default"):
|
||||
# # Validate configuration
|
||||
#
|
||||
# config = dict(self.raw_config)
|
||||
# try:
|
||||
# config = config[namespace]
|
||||
# except KeyError:
|
||||
# log.error("Can't find namespace '%s' in config '%s'", namespace, config)
|
||||
# sys.exit(1)
|
||||
# config = schema_validate(config, self.schema)
|
||||
#
|
||||
# pprint (config)
|
||||
#
|
||||
# self.run["path_cwd"]
|
||||
#
|
||||
# print ("OKKKKK")
|
||||
#
|
||||
#
|
||||
# conf2 = config
|
||||
# # Get application paths
|
||||
# path_root = conf2["config"].get("app", {}).get("root", None)
|
||||
# if path_root is None:
|
||||
# path_root = Path(config).parent
|
||||
# log.debug("Root path guessed from conf file location.")
|
||||
# else:
|
||||
# path_root = Path(conf2["config"]["app"]["root"])
|
||||
# log.debug("Root path is steup in config")
|
||||
#
|
||||
#
|
||||
#
|
||||
# path_root = str(path_root.resolve())
|
||||
# self.run["path_root"] = path_root
|
||||
#
|
||||
#
|
||||
# # path_prefix = conf2["config"]["app"]["prefix"]
|
||||
# # if not path_prefix:
|
||||
# # path_prefix = ''
|
||||
# # p = Path(path_prefix)
|
||||
# # if not p.is_absolute():
|
||||
# # p = path_root / p
|
||||
# # try:
|
||||
# # p = p.resolve().relative_to(Path.cwd().resolve())
|
||||
# # except ValueError:
|
||||
# # pass
|
||||
#
|
||||
#
|
||||
# # Cache paths
|
||||
# path_cache = Path(conf2["config"]["app"]["cache"])
|
||||
# if not path_cache.is_absolute():
|
||||
# path_cache = Path(path_root) / path_cache
|
||||
# path_cache = str(path_cache)
|
||||
# self.run["path_cache"] = path_cache
|
||||
# self.cache = {
|
||||
# 'files': Cache(path_cache),
|
||||
# 'queries': Cache(path_cache),
|
||||
# }
|
||||
#
|
||||
# # self.run['path_prefix'] = str(p.resolve())
|
||||
# log.debug("Working directory is %s, cwd is: %s", path_root, path_cwd)
|
||||
#
|
||||
# return config
|
||||
|
||||
|
||||
|
||||
#def query(self, key=None, scope=None):
|
||||
# processor = QueryProcessor(app=self.app)
|
||||
# result = processor.exec(key, scope)
|
||||
#
|
||||
# return result
|
||||
|
||||
|
||||
|
||||
class Kheops(GenericInstance):
|
||||
"""Main Kheops Application Instance
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, config="kheops.yml", namespace="default"):
|
||||
"""
|
||||
init function
|
||||
|
||||
:param kind: Optional "kind" of ingredients.
|
||||
:type kind: list[str] or None
|
||||
:raise lumache.InvalidKindError: If the kind is invalid.
|
||||
:return: The ingredients list.
|
||||
:rtype: list[str]
|
||||
"""
|
||||
|
||||
# Init
|
||||
path_cwd = Path.cwd().resolve()
|
||||
|
||||
# Fetch app paths
|
||||
self.run = {}
|
||||
self.run["path_cwd"] = str(path_cwd)
|
||||
|
||||
self.run["config_src"] = config
|
||||
if isinstance(config, str):
|
||||
self.run["config_type"] = 'file'
|
||||
self.run["path_config"] = str(Path(config).parent.resolve())
|
||||
elif isinstance(config, dict):
|
||||
self.run["config_type"] = 'dict'
|
||||
self.run["path_config"] = str(path_cwd)
|
||||
else:
|
||||
path_root = Path(config).parent
|
||||
log.debug("Root path guessed from conf file.")
|
||||
raise Exception("Need a valid config")
|
||||
|
||||
# path_prefix = conf2["config"]["app"]["prefix"]
|
||||
# if not path_prefix:
|
||||
# path_prefix = ''
|
||||
# p = Path(path_prefix)
|
||||
# if not p.is_absolute():
|
||||
# p = path_root / p
|
||||
# try:
|
||||
# p = p.resolve().relative_to(Path.cwd().resolve())
|
||||
# except ValueError:
|
||||
# pass
|
||||
|
||||
# Save paths
|
||||
path_cwd = str(Path.cwd().resolve())
|
||||
path_root = str(path_root.resolve())
|
||||
self.ns_name = namespace
|
||||
self.raw_config = self.parse_conf(config)
|
||||
|
||||
self.run["path_cwd"] = path_cwd
|
||||
self.run["path_root"] = path_root
|
||||
|
||||
# self.run['path_prefix'] = str(p.resolve())
|
||||
log.debug("Working directory is %s, cwd is: %s", path_root, path_cwd)
|
||||
|
||||
# path_root = path_root.resolve().relative_to(Path.cwd())
|
||||
def parse_conf(self, config="kheops.yml"):
|
||||
"""
|
||||
Parse Kheops configuration
|
||||
|
||||
# conf2["config"]["app"]["root"] = str(path_root)
|
||||
:param config: Kheops configuration, can either be a file path or a dict.
|
||||
:type config: dict or str or None
|
||||
:param namespace: Configuration namespace to use.
|
||||
:type namespace: str
|
||||
:return: The parsed configuration.
|
||||
:rtype: dict
|
||||
|
||||
# Finish
|
||||
self.conf2 = dict(conf2)
|
||||
"""
|
||||
|
||||
log.debug("Loading config: %s", config)
|
||||
log.debug("Root directory is: %s", path_root)
|
||||
# Load config
|
||||
if isinstance(config, str):
|
||||
dict_conf = anyconfig.load(config)
|
||||
source = f'file:{config}'
|
||||
elif isinstance(config, dict):
|
||||
dict_conf = config
|
||||
source = 'dict'
|
||||
return dict_conf
|
||||
|
||||
def lookup(self, key=None, policy=None, scope=None, trace=False, explain=False):
|
||||
|
||||
|
||||
|
||||
|
||||
def lookup2(self, keys=None, policy=None, scope=None,
|
||||
trace=False, explain=False, validate_schema=False,
|
||||
namespace='default' ,
|
||||
):
|
||||
"""Lookup a key in hierarchy"""
|
||||
log.debug("Lookup key %s with scope: %s", key, scope)
|
||||
|
||||
|
||||
ret = {}
|
||||
# Loop over keys
|
||||
for key_def in keys:
|
||||
|
||||
key_def = key_def or ''
|
||||
|
||||
# Identify namespace and key
|
||||
parts = key_def.split(':')
|
||||
ns_name = self.ns_name
|
||||
if len(parts) > 1:
|
||||
ns_name = parts[0]
|
||||
key_name = parts[1]
|
||||
else:
|
||||
key_name = parts[0]
|
||||
|
||||
# Load namespace
|
||||
ns_config = self.raw_config[ns_name]
|
||||
ns = KheopsNamespace(self, ns_name, ns_config)
|
||||
|
||||
# Get result
|
||||
result = ns.query(key=key_name, scope=scope, explain=explain)
|
||||
|
||||
# TODO: This may lead to inconsistant output format :/
|
||||
# Return result
|
||||
if len(keys) > 1:
|
||||
log.debug("Append '%s' to results", key_name)
|
||||
ret[key_name] = result
|
||||
else:
|
||||
log.debug("Return '%s' result", key_name)
|
||||
return result
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def lookup(self, keys=None, policy=None, scope=None, trace=False, explain=False, validate_schema=False):
|
||||
"""Lookup a key in hierarchy"""
|
||||
log.debug("Lookup key %s with scope: %s", keys, scope)
|
||||
assert isinstance(keys, list), f"Got {keys}"
|
||||
|
||||
query = Query(app=self)
|
||||
ret = query.exec(key=key, scope=scope, policy=policy, trace=trace, explain=explain)
|
||||
ret = {}
|
||||
for key in keys:
|
||||
ret[key] = query.exec(key=key, scope=scope, policy=policy, trace=trace, explain=explain, validate_schema=validate_schema)
|
||||
return ret
|
||||
|
||||
def dump_schema(self):
|
||||
|
||||
@ -23,7 +23,7 @@ class CmdApp:
|
||||
"""Start new App"""
|
||||
|
||||
self.get_args()
|
||||
self.get_logger(verbose=self.args.verbose, logger_name="kheops")
|
||||
self.get_logger(verbose=self.args.verbose + 1, logger_name="kheops")
|
||||
self.cli()
|
||||
|
||||
def get_logger(self, logger_name=None, create_file=False, verbose=0):
|
||||
@ -106,13 +106,13 @@ class CmdApp:
|
||||
"--verbose",
|
||||
action="count",
|
||||
default=int(os.environ.get("KHEOPS_VERBOSE", "0")),
|
||||
help="Increase verbosity",
|
||||
help="Increase verbosity (KHEOPS_VERBOSE)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--config",
|
||||
default=os.environ.get("KHEOPS_CONFIG", "kheops.yml"),
|
||||
help="Kheops configuration file",
|
||||
help="Kheops configuration file (KHEOPS_CONFIG)",
|
||||
)
|
||||
# parser.add_argument("help", help="Show usage")
|
||||
subparsers = parser.add_subparsers(
|
||||
@ -122,10 +122,11 @@ class CmdApp:
|
||||
add_p = subparsers.add_parser("schema")
|
||||
add_p = subparsers.add_parser("gen_doc")
|
||||
|
||||
# Manage command: demo
|
||||
|
||||
# Manage command: lookup2
|
||||
add_p = subparsers.add_parser("lookup")
|
||||
add_p.add_argument(
|
||||
"-n", "--namespace", help="Namespace name", default="default"
|
||||
"-n", "--namespace", help="Namespace name (KHEOPS_NAMESPACE)", default=os.environ.get("KHEOPS_NAMESPACE", "default")
|
||||
)
|
||||
add_p.add_argument(
|
||||
"-f", "--file", help="File with params as dict. Can be stdin - ."
|
||||
@ -143,7 +144,7 @@ class CmdApp:
|
||||
default="yaml",
|
||||
help="Output format",
|
||||
)
|
||||
add_p.add_argument("key", default=None, nargs="*")
|
||||
add_p.add_argument("keys", default=None, nargs="*")
|
||||
|
||||
# Manage command: demo
|
||||
add_p = subparsers.add_parser("demo")
|
||||
@ -173,6 +174,44 @@ class CmdApp:
|
||||
self.log.debug("Command line vars: %s", vars(self.args))
|
||||
|
||||
def cli_lookup(self):
|
||||
"""Lookup database"""
|
||||
|
||||
keys = self.args.keys or [None]
|
||||
|
||||
|
||||
new_params = {}
|
||||
if self.args.file:
|
||||
new_params = anyconfig.load(self.args.file, ac_parser="yaml")
|
||||
|
||||
# Parse cli params
|
||||
for i in self.args.scope_param:
|
||||
ret = i.split("=")
|
||||
if len(ret) != 2:
|
||||
raise Exception("Malformed params")
|
||||
new_params[ret[0]] = ret[1]
|
||||
|
||||
self.log.info("CLI: %s with env: %s", keys, new_params)
|
||||
|
||||
|
||||
app = Kheops.Kheops(config=self.args.config, namespace=self.args.namespace)
|
||||
ret = app.lookup2(
|
||||
namespace=self.args.namespace,
|
||||
keys=keys,
|
||||
scope=new_params,
|
||||
|
||||
trace=self.args.trace,
|
||||
explain=self.args.explain,
|
||||
validate_schema=True,
|
||||
)
|
||||
print(anyconfig.dumps(ret, ac_parser=self.args.format))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def cli_lookup_OLD(self):
|
||||
"""Display how to use logging"""
|
||||
|
||||
# self.log.debug(f"Command line vars: {vars(self.args)}")
|
||||
@ -193,14 +232,15 @@ class CmdApp:
|
||||
self.log.info("CLI: %s with env: %s", keys, new_params)
|
||||
|
||||
app = Kheops.App(config=self.args.config, namespace=self.args.namespace)
|
||||
for key in keys:
|
||||
ret = app.lookup(
|
||||
key=key,
|
||||
scope=new_params,
|
||||
trace=self.args.trace,
|
||||
explain=self.args.explain,
|
||||
)
|
||||
print(anyconfig.dumps(ret, ac_parser=self.args.format))
|
||||
#for key in keys:
|
||||
ret = app.lookup(
|
||||
keys=keys,
|
||||
scope=new_params,
|
||||
trace=self.args.trace,
|
||||
explain=self.args.explain,
|
||||
validate_schema=True,
|
||||
)
|
||||
print(anyconfig.dumps(ret, ac_parser=self.args.format))
|
||||
|
||||
def cli_schema(self):
|
||||
"""Display configuration schema"""
|
||||
|
||||
317
kheops/controllers.py
Normal file
317
kheops/controllers.py
Normal file
@ -0,0 +1,317 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
#from pprint import pprint
|
||||
|
||||
from pathlib import Path
|
||||
from prettytable import PrettyTable
|
||||
|
||||
import kheops.plugin2 as KheopsPlugins
|
||||
from kheops.utils import render_template, render_template_python, str_ellipsis
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
tracer = logging.getLogger(f'{__name__}.explain')
|
||||
|
||||
|
||||
class LoadPlugin:
|
||||
"""Generic class to load plugins"""
|
||||
|
||||
def __init__(self, plugins):
|
||||
self.plugins = plugins
|
||||
|
||||
def load(self, kind, name):
|
||||
|
||||
assert isinstance(name, str), f"Got: {name}"
|
||||
|
||||
# Get plugin kind
|
||||
try:
|
||||
plugins = getattr(self.plugins, kind)
|
||||
except Exception as err:
|
||||
raise Exception(f"Unknown module kind '{kind}': {err}")
|
||||
|
||||
# Get plugin class
|
||||
try:
|
||||
plugin_cls = getattr(plugins, name)
|
||||
except Exception as err:
|
||||
raise Exception(f"Unknown module '{kind}.{name}': {err}")
|
||||
|
||||
assert hasattr(
|
||||
plugin_cls, "Plugin"
|
||||
), f"Plugin {kind}/{name} is not a valid plugin"
|
||||
|
||||
# Return plugin Classe
|
||||
return plugin_cls.Plugin
|
||||
|
||||
|
||||
plugin_loader = LoadPlugin(KheopsPlugins)
|
||||
|
||||
|
||||
|
||||
class Query():
|
||||
|
||||
key = None
|
||||
scope = None
|
||||
|
||||
def __init__(self, key, scope):
|
||||
self.key = key or None
|
||||
self.scope = scope or {}
|
||||
|
||||
self.rule = None
|
||||
|
||||
|
||||
# class QueryController():
|
||||
|
||||
|
||||
#def exec(self, key=None, scope=None):
|
||||
# query = Query(key, scope)
|
||||
# result = self.processor.exec(query)
|
||||
# return result
|
||||
|
||||
|
||||
class QueryProcessor():
|
||||
"""QueryProcessor class provides all the methods to be able to make queries"""
|
||||
|
||||
default_match_rule = {
|
||||
"key": None,
|
||||
"continue": False,
|
||||
"strategy": "merge_deep",
|
||||
}
|
||||
|
||||
|
||||
default_lookup_item = {
|
||||
"path": None,
|
||||
"backend": "file",
|
||||
"continue": True,
|
||||
}
|
||||
|
||||
#def __init__(self, app):
|
||||
# self.app = app
|
||||
|
||||
#self.config = app.conf2['config'] or {}
|
||||
#self.lookups = app.conf2['lookups'] or []
|
||||
#self.rules = app.conf2['rules'] or []
|
||||
|
||||
def CHILDREN_INIT(self, config):
|
||||
self.config = config
|
||||
pass
|
||||
|
||||
|
||||
#def exec(self, key=None, scope=None):
|
||||
def query(self, key=None, scope=None, explain=False):
|
||||
|
||||
if explain:
|
||||
tracer.setLevel(logging.DEBUG)
|
||||
|
||||
query = Query(key, scope)
|
||||
|
||||
# Match the KeyRule in keys (RULE CACHE)
|
||||
# Get the matching keys
|
||||
# Assemble if more than one and merge when continue.
|
||||
# Got the Matched rule (RULE CACHE)
|
||||
# We'll need strategy, and it's selector field: matched/first/last/all
|
||||
#key_rule = self._get_key_rule(key) or {}
|
||||
#key_strategy = key_rule.get('strategy', None)
|
||||
key_rule = self._exec_get_rule(query)
|
||||
log.info("Matched rule for key '%s': %s", query.key, key_rule)
|
||||
|
||||
|
||||
# Build the lookups [] => []
|
||||
# Fetch static config from app (for include and NS:includes ...)
|
||||
# Loop over lookups and process each lookup with ScopePlugins
|
||||
lookups = self.config.get("lookups", {}).copy()
|
||||
parsed_lookups = self._exec_assemble_lookups(lookups, query)
|
||||
|
||||
# Generate explain report
|
||||
if explain:
|
||||
self._explain_lookups(parsed_lookups)
|
||||
|
||||
# FEtch the module
|
||||
# Retrieve the module instance
|
||||
# Get it's match policy
|
||||
# TODO
|
||||
plugin_name = key_rule.get('strategy', None)
|
||||
strategy_plugin = plugin_loader.load('strategy', plugin_name)(self)
|
||||
|
||||
|
||||
# Get the data (strategy.selector)
|
||||
# For each entry, ask the backend to return the data: file, http, consul ...
|
||||
# Return zero, one or more results depending the strategy.selector
|
||||
#result = get_backends_results(strategy, lookups)
|
||||
candidates = self._exec_backend_plugins(parsed_lookups, selector=strategy_plugin.selector)
|
||||
|
||||
|
||||
# Generate explain report
|
||||
if explain:
|
||||
self._explain_candidates(candidates, query)
|
||||
|
||||
|
||||
# Apply the merge strategy, recall strategy
|
||||
result = strategy_plugin.merge_results(candidates, key_rule, query)
|
||||
|
||||
|
||||
# TODO: Apply output plugins
|
||||
# result = self._exec_output_plugins(result)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _explain_lookups(self, parsed_lookups):
|
||||
"""Explain list of lookups"""
|
||||
|
||||
table = PrettyTable()
|
||||
for item in parsed_lookups:
|
||||
col1 = json.dumps({ k:v for k, v in item.items() if k not in ['_run'] }, default=lambda o: "<not serializable>", indent=2)
|
||||
col2 = json.dumps(item['_run'], default=lambda o: "<not serializable>", indent=2)
|
||||
table.add_row([
|
||||
"\nConfig:"+ str_ellipsis(col1, 60),
|
||||
"\nRuntime:"+ str_ellipsis(col2, 60),
|
||||
])
|
||||
table.field_names = ["Config", "Runtime"]
|
||||
table.align = "l"
|
||||
tracer.info("Explain lookups:\n" + str(table))
|
||||
|
||||
|
||||
def _explain_candidates(self, candidates, query):
|
||||
"""Explain list of candidates"""
|
||||
|
||||
# TOFIX: query is not needed here !
|
||||
|
||||
table = PrettyTable()
|
||||
for item_obj in candidates:
|
||||
item = item_obj.__dict__
|
||||
item["rel_path"] = str(Path(item['path']).relative_to(Path.cwd()))
|
||||
|
||||
col1 = json.dumps({ k:v for k, v in item.items() if k not in ['run', 'data'] }, default=lambda o: "<not serializable>", indent=2)
|
||||
col2 = json.dumps(item['run']['_run'], default=lambda o: "<not serializable>", indent=2)
|
||||
col3 = item_obj.data.get(query.key, "NOT FOUND") if query.key is not None else item_obj.data
|
||||
col3 = json.dumps(col3, default=lambda o: "<not serializable>", indent=2)
|
||||
table.add_row([
|
||||
"\nStatus:"+ str_ellipsis(col1, 80),
|
||||
"\nRuntime:"+ str_ellipsis(col2, 60),
|
||||
"\nData:"+ str_ellipsis(col3, 60),
|
||||
])
|
||||
|
||||
table.field_names = ["Status", "Runtime", "Data"]
|
||||
table.align = "l"
|
||||
tracer.info("Explain candidates:\n" + str(table))
|
||||
|
||||
|
||||
def _exec_backend_plugins(self, lookups, selector="matched"):
|
||||
selector = 'matched'
|
||||
assert (selector in ['last', 'first', 'all', 'matched'])
|
||||
assert isinstance(lookups, list)
|
||||
#lookups = self.config.get("lookups", {}).copy()
|
||||
|
||||
plugins = {}
|
||||
ret = []
|
||||
for index, lookup_def in enumerate(lookups):
|
||||
|
||||
# Update object
|
||||
lookup_def['_run']['backend_index'] = index
|
||||
|
||||
# Load plugin
|
||||
plugin_name = lookup_def["backend"]
|
||||
if plugin_name in plugins:
|
||||
plugin = plugins[plugin_name]
|
||||
else:
|
||||
plugin = plugin_loader.load('backend', plugin_name)(namespace=self)
|
||||
|
||||
# Get candidates
|
||||
candidates = plugin.fetch_data(lookup_def)
|
||||
|
||||
# Apply selector
|
||||
for candidate in candidates:
|
||||
if candidate.status == 'found' or selector == 'all':
|
||||
ret.append(candidate)
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
|
||||
|
||||
def _exec_assemble_lookups(self, lookups, query):
|
||||
|
||||
|
||||
assert isinstance(lookups, list)
|
||||
assert len(lookups) > 0
|
||||
|
||||
# Init the scope list
|
||||
new_lookups1 = []
|
||||
for index, lookup_def in enumerate(lookups):
|
||||
shortform = False
|
||||
|
||||
if isinstance(lookup_def, str):
|
||||
shortform = True
|
||||
lookup_def = {
|
||||
'path': lookup_def,
|
||||
}
|
||||
assert isinstance(lookup_def, dict)
|
||||
|
||||
new_lookup = dict(self.default_lookup_item)
|
||||
new_lookup.update(lookup_def)
|
||||
new_lookup['_run'] = {
|
||||
'scope': query.scope,
|
||||
'key': query.key,
|
||||
'conf': {
|
||||
'index': index,
|
||||
}
|
||||
# 'shortform': shortform,
|
||||
}
|
||||
new_lookups1.append(new_lookup)
|
||||
|
||||
# Apply lookups modules
|
||||
new_lookups2 = []
|
||||
for index, lookup in enumerate(new_lookups1):
|
||||
plugins = lookup.get("scope", [])
|
||||
|
||||
ret = [lookup]
|
||||
for plugin_def in plugins:
|
||||
plugin_name = plugin_def.get("module", None)
|
||||
|
||||
if plugin_name:
|
||||
plugin = plugin_loader.load('scope', plugin_name)(namespace=self)
|
||||
ret = plugin.process_items(ret, plugin_def)
|
||||
|
||||
new_lookups2.extend(ret)
|
||||
|
||||
|
||||
# Parse the `path` value with scope variables
|
||||
new_lookups3 = []
|
||||
for lookup in new_lookups2:
|
||||
path = lookup['path']
|
||||
scope = lookup['_run']['scope']
|
||||
new_path = render_template_python(path, scope, ignore_missing=False)
|
||||
if new_path:
|
||||
lookup['_run']['raw_path'] = path
|
||||
lookup['path'] = new_path
|
||||
new_lookups3.append(lookup)
|
||||
else:
|
||||
log.info("Ignore because of missing scope vars: '%s'", path)
|
||||
|
||||
return new_lookups3
|
||||
|
||||
|
||||
def _exec_get_rule(self, query, mode='match'):
|
||||
|
||||
key = query.key
|
||||
rules = self.config['rules'] or {}
|
||||
|
||||
if mode == "match":
|
||||
rule = dict(self.default_match_rule)
|
||||
rules = [i for i in rules if i.get('key', None) == key ]
|
||||
if len(rules) > 0:
|
||||
match = rules[0]
|
||||
rule.update(match)
|
||||
else:
|
||||
log.debug("Applying default rule for key '%s'", key)
|
||||
rule = self.default_match_rule
|
||||
else:
|
||||
raise Exception (f"Mode '{mode}' is not implemented")
|
||||
|
||||
return rule
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,365 +0,0 @@
|
||||
"""Manager Classes"""
|
||||
|
||||
import logging
|
||||
|
||||
import dpath.util
|
||||
|
||||
from kheops.utils import schema_validate
|
||||
import kheops.plugin as KheopsPlugins
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoadPlugin:
|
||||
"""Generic class to load plugins"""
|
||||
|
||||
def __init__(self, plugins):
|
||||
self.plugins = plugins
|
||||
|
||||
def load(self, kind, name):
|
||||
|
||||
assert isinstance(name, str), f"Got: {name}"
|
||||
|
||||
# Get plugin kind
|
||||
try:
|
||||
plugins = getattr(self.plugins, kind)
|
||||
except Exception as err:
|
||||
raise Exception(f"Unknown module kind '{kind}': {err}")
|
||||
|
||||
# Get plugin class
|
||||
try:
|
||||
plugin_cls = getattr(plugins, name)
|
||||
except Exception as err:
|
||||
raise Exception(f"Unknown module '{kind}.{name}': {err}")
|
||||
|
||||
assert hasattr(
|
||||
plugin_cls, "Plugin"
|
||||
), f"Plugin {kind}/{name} is not a valid plugin"
|
||||
|
||||
# Return plugin Classe
|
||||
return plugin_cls.Plugin
|
||||
|
||||
|
||||
class Manager:
|
||||
"""Generic manager class"""
|
||||
|
||||
_app_kind = "core"
|
||||
plugins_kind = []
|
||||
_schema_props_default = None
|
||||
_schema_props_new = None
|
||||
_props_position = None
|
||||
|
||||
@classmethod
|
||||
def get_schema(cls, plugins_db, mode="full"):
|
||||
"""Retrieve configuration schema"""
|
||||
|
||||
# Properties
|
||||
ret = {
|
||||
"core_schema": {},
|
||||
"plugin": {},
|
||||
}
|
||||
ret3 = {}
|
||||
for kind in cls.plugins_kind:
|
||||
ret["plugin"][kind] = {}
|
||||
plugin_kind = getattr(plugins_db, kind)
|
||||
|
||||
for plugin_name in [i for i in dir(plugin_kind) if not i.startswith("_")]:
|
||||
plugin = getattr(plugin_kind, plugin_name)
|
||||
plugin_cls = getattr(plugin, "Plugin", None)
|
||||
if plugin_cls:
|
||||
schema_props = getattr(
|
||||
plugin_cls, "_schema_props_new", "MISSING ITEM"
|
||||
)
|
||||
if schema_props:
|
||||
ret["plugin"][kind][plugin_name + "_schema"] = schema_props
|
||||
ret3.update(schema_props)
|
||||
ret3.update(cls._schema_props_new)
|
||||
|
||||
# Injection
|
||||
ret1 = cls._schema_props_default
|
||||
position = cls._props_position
|
||||
dpath.util.set(ret1, position, ret3)
|
||||
ret["core_schema"] = cls._schema_props_new
|
||||
|
||||
if mode == "full":
|
||||
return ret1
|
||||
|
||||
ret4 = {
|
||||
"config_schema": {},
|
||||
"items": ret,
|
||||
}
|
||||
|
||||
return ret4
|
||||
|
||||
|
||||
class BackendsManager(Manager):
|
||||
"""Backend Manager"""
|
||||
|
||||
_app_kind = "manager"
|
||||
plugins_kind = ["engine", "backend"]
|
||||
|
||||
_schema_props_new = {
|
||||
"engine": {
|
||||
"type": "string",
|
||||
"default": "jerakia",
|
||||
"optional": False,
|
||||
},
|
||||
"value": {
|
||||
"default": "UNSET",
|
||||
"optional": False,
|
||||
},
|
||||
}
|
||||
|
||||
_props_position = "oneOf/0/properties"
|
||||
_schema_props_default = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"default": "",
|
||||
# This does not work :(
|
||||
# "$def": {
|
||||
# "props": {},
|
||||
# },
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": True,
|
||||
"default": {},
|
||||
"title": "object",
|
||||
"properties": {},
|
||||
"description": "Object to configure a bacjend item",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"default": "BLAAAAHHH",
|
||||
"title": "string",
|
||||
"description": "Enter a simple string configuration value for default engine",
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
def _validate_item(self, item):
|
||||
"""Private method to validate sub class"""
|
||||
if isinstance(item, str):
|
||||
item = {
|
||||
"engine": self.config_main.default_engine,
|
||||
"value": item,
|
||||
}
|
||||
item = schema_validate(item, self._schema_props_default)
|
||||
assert isinstance(item, dict)
|
||||
return item
|
||||
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
|
||||
self.config_app = app.conf2["config"]["app"]
|
||||
self.config_main = app.conf2["config"]["tree"]
|
||||
self.config_items = list(app.conf2["tree"])
|
||||
# THIS MAKE A BUG !!!! self.plugin_loader = LoadPlugin(KheopsPlugins)
|
||||
|
||||
self.plugins = [
|
||||
"init",
|
||||
"loop",
|
||||
"hier",
|
||||
]
|
||||
|
||||
# Auto init
|
||||
self.backends = self.config_items
|
||||
|
||||
def query(self, key=None, scope=None, trace=False):
|
||||
backends = self.get_backends(key=key, scope=scope, trace=trace)
|
||||
ret = self.get_results(backends, trace=trace)
|
||||
return ret
|
||||
|
||||
def get_backends(self, key=None, scope=None, trace=False):
|
||||
log.debug("Look for candidates for key '%s' in backend: %s", key, self.backends)
|
||||
|
||||
# Prepare plugins
|
||||
plugin_loader = LoadPlugin(KheopsPlugins)
|
||||
_run = {
|
||||
"key": key,
|
||||
"scope": scope,
|
||||
}
|
||||
|
||||
# Preprocess backends plugins
|
||||
backends = self.config_items
|
||||
log.debug("Backend preprocessing of %s elements", len(backends))
|
||||
for plugin in self.plugins:
|
||||
# backend_cls = plugin_loader.load('backend', plugin)
|
||||
plugin = plugin_loader.load("backend", plugin)()
|
||||
|
||||
log.debug("Run %s", plugin)
|
||||
new_backend, _run = plugin.process(backends, _run)
|
||||
|
||||
assert isinstance(new_backend, list), f"Got: {new_backend}"
|
||||
assert isinstance(_run, dict), f"Got: {_run}"
|
||||
backends = new_backend
|
||||
|
||||
# pprint (backends)
|
||||
for i in backends:
|
||||
assert i.get("engine"), f"Got: {i}"
|
||||
|
||||
log.debug("Backend preprocessing made %s elements", len(backends))
|
||||
return backends
|
||||
|
||||
def get_results(self, backends, trace=False):
|
||||
"""Return results"""
|
||||
|
||||
# Prepare plugins
|
||||
plugin_loader = LoadPlugin(KheopsPlugins)
|
||||
|
||||
new_results = []
|
||||
for backend in backends:
|
||||
# result_cls = result_loader.load('result', result)
|
||||
# print ("BACKKENNDNNDNDNDND")
|
||||
# pprint(backend)
|
||||
|
||||
engine = plugin_loader.load("engine", backend["engine"])(
|
||||
backend, parent=self, app=self.app
|
||||
)
|
||||
|
||||
log.debug("Run engine: %s", engine)
|
||||
new_result = engine.process()
|
||||
|
||||
assert isinstance(new_result, list), f"Got: {new_result}"
|
||||
new_results.extend(new_result)
|
||||
|
||||
# Filter out? Not here !new_results = [i for i in new_results if i['found'] ]
|
||||
# pprint (new_results)
|
||||
# print ("OKKKKKKKKKKKKKKKKKKKKKKKKK SO FAR")
|
||||
return new_results
|
||||
|
||||
|
||||
class RulesManager(Manager):
|
||||
"""Rule Manager Class"""
|
||||
|
||||
_app_kind = "rules"
|
||||
plugins_kind = ["strategy"]
|
||||
|
||||
_schema_props_new = {
|
||||
"rule": {
|
||||
"default": ".*",
|
||||
"optional": True,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"type": "null",
|
||||
},
|
||||
],
|
||||
},
|
||||
"strategy": {
|
||||
"type": "string",
|
||||
"default": "schema",
|
||||
# "default": "last",
|
||||
"optional": True,
|
||||
# "enum": ["first", "last", "merge"],
|
||||
},
|
||||
"trace": {
|
||||
"type": "boolean",
|
||||
"default": False,
|
||||
},
|
||||
"explain": {
|
||||
"type": "boolean",
|
||||
"default": False,
|
||||
},
|
||||
}
|
||||
|
||||
_props_position = "oneOf/1/properties"
|
||||
_schema_props_default = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"default": "",
|
||||
"$def": {
|
||||
"items": {},
|
||||
},
|
||||
"oneOf": [
|
||||
{"type": "string", "default": "BLAAAAHHH"},
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": True,
|
||||
"default": {},
|
||||
"properties": {"$ref": "#/$defs/name"},
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
OLD_rule_schema = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"additionalProperties": False,
|
||||
"properties": {
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": True,
|
||||
"default": {},
|
||||
"properties": {"$ref": "#/$defs/name"},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
|
||||
self.config_app = app.conf2["config"]["app"]
|
||||
self.config_main = app.conf2["config"]["rules"]
|
||||
self.config_items = list(app.conf2["rules"])
|
||||
|
||||
def get_result(self, candidates, key=None, scope=None, trace=False, explain=False):
|
||||
"""Return query results"""
|
||||
|
||||
# trace=False
|
||||
|
||||
rules = self.config_items
|
||||
key = key or ""
|
||||
|
||||
# Filter out invalid candidates
|
||||
matched_candidates = [i for i in candidates if i["found"] is True]
|
||||
if len(matched_candidates) == 0:
|
||||
log.debug("No matched candidates")
|
||||
return None
|
||||
|
||||
# Look for matching key in rules defiunitions
|
||||
regex_support = False
|
||||
matched_rule = {}
|
||||
if regex_support:
|
||||
raise Exception("Not Implemented")
|
||||
|
||||
rule = [i for i in rules if i.get("rule") == key]
|
||||
if len(rule) == 0:
|
||||
log.debug("No matched rule for %s, applying defaults", key)
|
||||
else:
|
||||
matched_rule = rule[0]
|
||||
log.debug("Matcher rule for %s: %s", key, matched_rule)
|
||||
|
||||
matched_rule["trace"] = trace
|
||||
matched_rule["explain"] = explain
|
||||
schema_validate(matched_rule, self._schema_props_default)
|
||||
|
||||
# Prepare plugins
|
||||
assert isinstance(matched_candidates, list), f"Got: {matched_candidates}"
|
||||
assert isinstance(matched_rule, dict), f"Got: {matched_rule}"
|
||||
strategy = matched_rule.get("strategy", "schema")
|
||||
log.debug("Key '%s' matched rule '%s' with '%s' strategy", key, rule, strategy)
|
||||
|
||||
# Load plugin
|
||||
log.debug("Run strategy: %s", strategy)
|
||||
plugin_loader = LoadPlugin(KheopsPlugins)
|
||||
strategy = plugin_loader.load(
|
||||
"strategy",
|
||||
strategy,
|
||||
)(parent=self, app=self.app)
|
||||
new_result = strategy.process(matched_candidates, matched_rule)
|
||||
|
||||
return new_result
|
||||
@ -1,3 +1,5 @@
|
||||
from . import engine
|
||||
from . import common
|
||||
|
||||
from . import scope
|
||||
from . import backend
|
||||
from . import strategy
|
||||
|
||||
@ -1,5 +1,3 @@
|
||||
"""Backend plugins"""
|
||||
|
||||
from . import init
|
||||
from . import loop
|
||||
from . import hier
|
||||
from . import file
|
||||
|
||||
118
kheops/plugin/backend/file.py
Normal file
118
kheops/plugin/backend/file.py
Normal file
@ -0,0 +1,118 @@
|
||||
"""File Backend Code"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import anyconfig
|
||||
|
||||
from kheops.utils import render_template, glob_files, render_template_python
|
||||
from kheops.plugin2.common import BackendPlugin, BackendCandidate
|
||||
|
||||
from pprint import pprint
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# class FileCandidate(Candidate):
|
||||
# path = None
|
||||
#
|
||||
# def _report_data(self):
|
||||
# data = {
|
||||
# # "rule": self.config,
|
||||
# "value": self.engine._plugin_value,
|
||||
# "data": self.data,
|
||||
# "path": str(self.path.relative_to(Path.cwd())),
|
||||
# }
|
||||
# data = dict(self.config)
|
||||
# return super()._report_data(data)
|
||||
|
||||
|
||||
#class Plugin(PluginEngineClass, PluginFileGlob):
|
||||
class Plugin(BackendPlugin):
|
||||
"""Generic Plugin Class"""
|
||||
|
||||
_plugin_name = "file"
|
||||
|
||||
_plugin_engine = "file"
|
||||
# _schema_props_files = {
|
||||
_schema_props_new = {
|
||||
"path": {
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string",
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
"glob": {
|
||||
"default": "ansible.yml",
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
# {
|
||||
# "type": "array",
|
||||
# "items": {
|
||||
# "type": "string",
|
||||
# },
|
||||
# },
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
extensions = {
|
||||
'.yml': 'yaml',
|
||||
'.yaml': 'yaml'
|
||||
}
|
||||
|
||||
def _init(self):
|
||||
|
||||
|
||||
# Guess top path
|
||||
top_path = self.ns.run['path_config']
|
||||
path_prefix = self.ns.config['config'].get('file_path_prefix', None)
|
||||
if path_prefix:
|
||||
top_path = os.path.join(top_path, path_prefix)
|
||||
self.top_path = top_path
|
||||
|
||||
# Fetch module config
|
||||
path_suffix = self.ns.config['config'].get('file_path_suffix', "auto")
|
||||
if path_suffix == 'auto':
|
||||
path_suffix = f"/{self.ns.name}"
|
||||
self.path_suffix = path_suffix
|
||||
|
||||
def fetch_data(self, config) -> list:
|
||||
|
||||
path = config.get('path')
|
||||
if self.path_suffix:
|
||||
path = f"{path}{self.path_suffix}"
|
||||
|
||||
|
||||
raw_data = None
|
||||
status = 'not_found'
|
||||
for ext, parser in self.extensions.items():
|
||||
new_path = os.path.join(self.top_path, path + ext )
|
||||
|
||||
if os.path.isfile(new_path):
|
||||
status = 'found'
|
||||
try:
|
||||
raw_data = anyconfig.load(new_path, ac_parser=parser)
|
||||
except Exception:
|
||||
status = 'broken'
|
||||
raw_data = None
|
||||
break
|
||||
|
||||
ret = BackendCandidate(
|
||||
path=new_path,
|
||||
status=status,
|
||||
run=config,
|
||||
data= raw_data,
|
||||
)
|
||||
|
||||
return [ret]
|
||||
|
||||
@ -1,109 +0,0 @@
|
||||
"""Hierarchy backend plugin"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
|
||||
from kheops.plugin.common import PluginBackendClass
|
||||
from kheops.utils import path_assemble_hier
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Plugin(PluginBackendClass):
|
||||
"""Hierarchy plugin"""
|
||||
|
||||
_plugin_name = "hier"
|
||||
_schema_props_new = {
|
||||
"hier": {
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"additionalProperties": True,
|
||||
"properties": {
|
||||
"data": {
|
||||
"default": None,
|
||||
"anyOf": [
|
||||
{"type": "null"},
|
||||
{"type": "string"},
|
||||
{"type": "array"},
|
||||
],
|
||||
},
|
||||
"var": {
|
||||
"type": "string",
|
||||
"default": "hier_item",
|
||||
"optional": True,
|
||||
},
|
||||
"separator": {
|
||||
"type": "string",
|
||||
"default": "/",
|
||||
"optional": True,
|
||||
},
|
||||
"reversed": {
|
||||
"type": "boolean",
|
||||
"default": False,
|
||||
"optional": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
def process(self, backends: list, ctx: dict) -> (list, dict):
|
||||
"""Return results"""
|
||||
|
||||
new_backends = []
|
||||
|
||||
for cand in backends:
|
||||
|
||||
# Fetch backend data
|
||||
plugin_config = cand.get("hier", {})
|
||||
hier_data = plugin_config.get("data", None)
|
||||
if not hier_data:
|
||||
new_backends.append(cand)
|
||||
continue
|
||||
|
||||
hier_var = plugin_config.get("var", "item")
|
||||
hier_sep = plugin_config.get("separator", "/")
|
||||
|
||||
# Retrieve data to loop over
|
||||
if isinstance(hier_data, str):
|
||||
# If it's a string, fetch value from scope
|
||||
hier_data = cand["_run"]["scope"].get(hier_data, None)
|
||||
|
||||
# Do the hierarchical replacement
|
||||
if isinstance(hier_data, (str, list)):
|
||||
hier_data = path_assemble_hier(hier_data, hier_sep)
|
||||
|
||||
if not isinstance(hier_data, list):
|
||||
log.debug(
|
||||
"Hier module can't loop over non list data, got: %s for %s",
|
||||
hier_data,
|
||||
cand,
|
||||
)
|
||||
continue
|
||||
|
||||
# Build result list
|
||||
ret1 = hier_data
|
||||
log.debug("Hier plugin will loop over: %s", ret1)
|
||||
ret2 = []
|
||||
for index, item in enumerate(ret1):
|
||||
_cand = copy.deepcopy(cand)
|
||||
run = {
|
||||
"index": index,
|
||||
"hier_value": item,
|
||||
"hier_var": hier_var,
|
||||
}
|
||||
_cand["_run"]["hier"] = run
|
||||
_cand["_run"]["scope"][hier_var] = item
|
||||
ret2.append(_cand)
|
||||
|
||||
new_backends.extend(ret2)
|
||||
return new_backends, ctx
|
||||
@ -1,37 +0,0 @@
|
||||
"""Init backend plugin"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
from kheops.plugin.common import PluginBackendClass
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Plugin(PluginBackendClass):
|
||||
"""Init backend plugin"""
|
||||
|
||||
_plugin_name = "init"
|
||||
_schema_props_new = None
|
||||
|
||||
default_engine = "jerakia"
|
||||
|
||||
def process(self, backends: list, ctx: dict) -> (list, dict):
|
||||
"""Return the new backend list"""
|
||||
|
||||
new_backends = []
|
||||
for index, item in enumerate(backends):
|
||||
default = {
|
||||
"value": item,
|
||||
}
|
||||
|
||||
if not isinstance(item, dict):
|
||||
item = default
|
||||
|
||||
item["engine"] = item.get("engine", self.default_engine)
|
||||
item["_run"] = copy.deepcopy(ctx)
|
||||
item["_run"]["backend"] = {
|
||||
"index": index,
|
||||
}
|
||||
new_backends.append(item)
|
||||
|
||||
return new_backends, ctx
|
||||
@ -1,160 +0,0 @@
|
||||
"""Loop backend plugin"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
|
||||
from kheops.plugin.common import PluginBackendClass
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class Plugin(PluginBackendClass):
|
||||
"""Loop backend plugin"""
|
||||
|
||||
_plugin_name = "loop"
|
||||
_plugin_help = (
|
||||
"""
|
||||
This module helps to loop over a backend
|
||||
""",
|
||||
)
|
||||
_schema_props_new = {
|
||||
"loop": {
|
||||
"description": _plugin_help,
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"examples": [
|
||||
{
|
||||
"value": "site/{{ loop_env }}/config/{{ os }}",
|
||||
"loop": {
|
||||
"var": "loop_env",
|
||||
"data": [
|
||||
"dev",
|
||||
"preprod",
|
||||
"prod",
|
||||
],
|
||||
},
|
||||
"comment": "The module will loop three time over the value, and the variable `loop_env` will consecutely have `dev`, `preprod` and `prod` as value",
|
||||
},
|
||||
{
|
||||
"value": "site/{{ loop_env2 }}/config/{{ os }}",
|
||||
"loop": {
|
||||
"var": "loop_env2",
|
||||
"data": "my_scope_var",
|
||||
},
|
||||
"comment": "Like the previous example, but it will fetch the list from any scope variables",
|
||||
},
|
||||
{
|
||||
"loop": None,
|
||||
"comment": "Disable this module, no loop will operate",
|
||||
},
|
||||
# "loop": {
|
||||
# "var": "my_var",
|
||||
# },
|
||||
# },
|
||||
# "loop": {
|
||||
# "var": "my_var",
|
||||
# },
|
||||
# "example": "",
|
||||
# },
|
||||
# "loop": {
|
||||
# "var": "my_var",
|
||||
# },
|
||||
# "example": "",
|
||||
# },
|
||||
],
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": False,
|
||||
"default": {},
|
||||
"title": "Complete config",
|
||||
"description": "",
|
||||
"properties": {
|
||||
"data": {
|
||||
"default": None,
|
||||
"optional": False,
|
||||
"title": "Module configuration",
|
||||
"description": "Data list used for iterations. It only accept lists as type. It disable the module if set to `null`.",
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "null",
|
||||
"title": "Disable Module",
|
||||
"description": "Disable the module",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"title": "Scope variable",
|
||||
"description": "Will look the value of the loop list from the scope. TOFIX: What if variablle does not exists?",
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
"title": "Hardcoded list",
|
||||
"description": "Simply enter the list of value to be iterated to.",
|
||||
},
|
||||
],
|
||||
},
|
||||
"var": {
|
||||
"type": "string",
|
||||
"default": "loop_item",
|
||||
"optional": True,
|
||||
"title": "Module configuration",
|
||||
"description": "Name of the variable to be used in templating language",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"title": "Short config",
|
||||
"description": "If set to string, it will define the name of the variable to lookup into the scope.",
|
||||
},
|
||||
{
|
||||
"type": "null",
|
||||
"title": "Disable",
|
||||
"description": "If set to null, it disable the module",
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
def process(self, backends: list, ctx: dict) -> (list, dict):
|
||||
"""Return results"""
|
||||
|
||||
new_backends = []
|
||||
for cand in backends:
|
||||
cand = dict(cand)
|
||||
|
||||
# Init
|
||||
loop_config = cand.get("loop", {})
|
||||
loop_data = loop_config.get("data", None)
|
||||
if not loop_data:
|
||||
new_backends.append(cand)
|
||||
continue
|
||||
|
||||
# Retrieve config data
|
||||
loop_var = loop_config.get("var", "item")
|
||||
if isinstance(loop_data, str):
|
||||
loop_data = cand["_run"]["scope"].get(loop_data, None)
|
||||
if not isinstance(loop_data, list):
|
||||
log.debug(
|
||||
"Got an empty list for loop for var %s, skipping this entry: %s",
|
||||
cand,
|
||||
loop_data,
|
||||
)
|
||||
continue
|
||||
|
||||
# Build a new list
|
||||
ret = []
|
||||
for idx, item in enumerate(loop_data):
|
||||
_cand = copy.deepcopy(cand)
|
||||
run = {
|
||||
"loop_index": idx,
|
||||
"loop_value": item,
|
||||
"loop_var": loop_var,
|
||||
}
|
||||
_cand["_run"]["loop"] = run
|
||||
_cand["_run"]["scope"][loop_var] = item
|
||||
# _cand.scope[loop_var] = item
|
||||
ret.append(_cand)
|
||||
|
||||
new_backends.extend(ret)
|
||||
|
||||
return new_backends, ctx
|
||||
@ -1,178 +1,438 @@
|
||||
"""Common libraries for plugins"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
from kheops.utils import schema_validate
|
||||
from pprint import pprint
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Candidate Classes
|
||||
# =============================
|
||||
# class Candidate:
|
||||
# engine = None
|
||||
# found = False
|
||||
# data = None
|
||||
# run = None
|
||||
# scope = None
|
||||
# key = None
|
||||
#
|
||||
# def __init__(self, run):
|
||||
# self.run = copy.deepcopy(run)
|
||||
#
|
||||
# def __repr__(self):
|
||||
# return f"{self.__dict__}"
|
||||
#
|
||||
# def _report_data(self, data=None):
|
||||
# default_data = {
|
||||
# # "rule": self.config,
|
||||
# "value": self.engine._plugin_value,
|
||||
# "data": self.data,
|
||||
# }
|
||||
# data = data or default_data
|
||||
# data = json.dumps(data, indent=2) # , sort_keys=True, )
|
||||
# return data
|
||||
|
||||
# Vocabulary:
|
||||
# Key Rules
|
||||
# ConfPlugin[1]
|
||||
# StrategyPlugin[1]
|
||||
# OutPlugin[N]
|
||||
# Lookup Hierarchy
|
||||
# ConfPlugin[1]
|
||||
# ScopePlugin[N]
|
||||
# BackendPlugin[1]
|
||||
|
||||
|
||||
# Generic Classes
|
||||
# =============================
|
||||
class PluginClass:
|
||||
"""Generic plugin class"""
|
||||
# Generic classes
|
||||
class KheopsPlugin:
|
||||
plugin_name = None
|
||||
plugin_type = None
|
||||
plugin_kind = None
|
||||
|
||||
_plugin_type = "none"
|
||||
_plugin_value = None
|
||||
|
||||
_schema_props_new = "UNSET PLUGIN PROPRIETIES"
|
||||
def __init__(self):
|
||||
self._init()
|
||||
|
||||
_schema_props_plugin = {
|
||||
"engine": {
|
||||
"type": "string",
|
||||
"default": "jerakia",
|
||||
},
|
||||
"value": {},
|
||||
}
|
||||
def _init(self):
|
||||
"""Place holder to init plugins"""
|
||||
pass
|
||||
|
||||
|
||||
class KheopsListPlugin(KheopsPlugin):
|
||||
plugin_type = 'list'
|
||||
|
||||
def process_list(self, item_list) -> list:
|
||||
pass
|
||||
|
||||
class KheopsItemPlugin(KheopsPlugin):
|
||||
plugin_type = 'item'
|
||||
|
||||
def process_item(self, item) -> list:
|
||||
pass
|
||||
|
||||
|
||||
# Other classes
|
||||
class BackendCandidate():
|
||||
def __init__(self, path=None, data=None, run=None, status=None):
|
||||
assert isinstance(run, dict)
|
||||
self.path = path
|
||||
self.status = status or "unparsed"
|
||||
self.run = run or {}
|
||||
self.data = data or None
|
||||
|
||||
def __repr__(self):
|
||||
kind = self._plugin_type
|
||||
name = self._plugin_name
|
||||
value = getattr(self, "value", "NO VALUE")
|
||||
return f"{kind}.{name}:{value}"
|
||||
return f"Status: {self.status}, Path: {self.path} => {self.data}"
|
||||
|
||||
def __init__(self, config=None, parent=None, app=None):
|
||||
# assert (isinstance(config, dict)), f"Got: {config}"
|
||||
self.parent = parent
|
||||
self.app = app
|
||||
self.config = config or {}
|
||||
|
||||
self._init()
|
||||
self._validate()
|
||||
|
||||
def _init(self):
|
||||
# Specific classes
|
||||
class ConfPlugin(KheopsListPlugin):
|
||||
plugin_kind = "conf"
|
||||
schema_prop = {
|
||||
"include": {}, # Direct config, DICT
|
||||
}
|
||||
def process_list(self, item_list) -> list:
|
||||
pass
|
||||
|
||||
def _validate(self):
|
||||
class ScopePlugin(KheopsListPlugin):
|
||||
plugin_kind = "scope"
|
||||
|
||||
schema_prop = {
|
||||
"_scope": [], # List of scope modification to apply
|
||||
"init": {},
|
||||
"loop_N": {},
|
||||
"hier_N": {},
|
||||
}
|
||||
|
||||
def process_item(self, item_list) -> list:
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class PluginBackendClass(PluginClass):
|
||||
"""Backend plugin class"""
|
||||
|
||||
_plugin_type = "backend"
|
||||
|
||||
def _init(self):
|
||||
pass
|
||||
def __init__(self, namespace):
|
||||
self.ns = namespace
|
||||
super().__init__()
|
||||
|
||||
|
||||
class PluginStrategyClass(PluginClass):
|
||||
"""Strategy plugin class"""
|
||||
|
||||
_plugin_type = "strategy"
|
||||
class ScopeExtLoop():
|
||||
'''This Scope Extension allow to loop over a lookup'''
|
||||
|
||||
def _init(self):
|
||||
pass
|
||||
|
||||
|
||||
class PluginEngineClass(PluginClass):
|
||||
"""Engine plugin class"""
|
||||
|
||||
_plugin_type = "engine"
|
||||
|
||||
_schema_props_default = {
|
||||
"engine": {
|
||||
"default": "UNSET",
|
||||
},
|
||||
"value": {
|
||||
"default": "UNSET",
|
||||
schema_props = {
|
||||
"properties": {
|
||||
"data": {
|
||||
"default": None,
|
||||
"anyOf": [
|
||||
{"type": "null"},
|
||||
{"type": "string"},
|
||||
{"type": "array"},
|
||||
],
|
||||
},
|
||||
"var": {
|
||||
"type": "string",
|
||||
"default": "item",
|
||||
"optional": True,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# Default plugin API Methods
|
||||
# =====================
|
||||
def _init(self):
|
||||
assert isinstance(self.config, dict), f"Got: {self.config}"
|
||||
|
||||
def _validate(self):
|
||||
def loop_over(self, lookups, conf, var_name='item', callback_context=None, callback=None):
|
||||
|
||||
# Build schema
|
||||
schema_keys = [a for a in dir(self) if a.startswith("_schema_props_")]
|
||||
props = {}
|
||||
for key in schema_keys:
|
||||
schema = getattr(self, key)
|
||||
props.update(schema)
|
||||
self.schema = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"type": "object",
|
||||
"additionalProperties": True,
|
||||
"properties": props,
|
||||
}
|
||||
var_name = conf.get('var', var_name)
|
||||
var_data_ref = conf.get('data', None)
|
||||
|
||||
# log.debug (f"Validate {self.config} against {self.schema}")
|
||||
self.config = schema_validate(self.config, self.schema)
|
||||
return True
|
||||
if not var_data_ref:
|
||||
log.debug("No data to loop over for: %s", var_data_ref)
|
||||
return lookups
|
||||
|
||||
ret = []
|
||||
for index, lookup in enumerate(lookups):
|
||||
|
||||
var_data = var_data_ref
|
||||
if isinstance(var_data_ref, str):
|
||||
try:
|
||||
var_data = lookup['_run']['scope'][var_data]
|
||||
except KeyError:
|
||||
log.debug ("Ignoring missing '%s' from scope", var_data)
|
||||
pass
|
||||
|
||||
# Run callback
|
||||
if callback:
|
||||
var_data = callback(var_data, callback_context)
|
||||
|
||||
# Validate generated
|
||||
if not isinstance(var_data, list):
|
||||
log.warning("Hier data must be a list, got: %s", var_data)
|
||||
pass
|
||||
|
||||
# Create new object
|
||||
for index, var_value in enumerate(var_data):
|
||||
|
||||
if not 'hier' in lookup['_run']:
|
||||
lookup['_run']['hier'] = []
|
||||
|
||||
ctx = {
|
||||
'data_ref': var_data_ref,
|
||||
'index': index,
|
||||
'value': var_value,
|
||||
'variable': var_name,
|
||||
}
|
||||
|
||||
new_item = copy.deepcopy(lookup)
|
||||
new_item['_run']['scope'][var_name] = var_value
|
||||
new_item['_run']['hier'].append(ctx)
|
||||
|
||||
|
||||
ret.append(new_item)
|
||||
|
||||
# Public Methods
|
||||
# =====================
|
||||
def dump(self) -> dict:
|
||||
"""Dump plugin configuration"""
|
||||
|
||||
ret = {
|
||||
"config": self.config,
|
||||
}
|
||||
return ret
|
||||
|
||||
def lookup_candidates(self, key=None, scope=None) -> list:
|
||||
"""Placeholder to return candidates"""
|
||||
raise Exception("Module does not implement this method :(")
|
||||
# It must always return a list of `Candidate` instances
|
||||
|
||||
# def _example(self):
|
||||
# print(f"Module does not implement this method :(")
|
||||
# return None
|
||||
|
||||
|
||||
# File plugins Extensions
|
||||
# =============================
|
||||
class BackendPlugin(KheopsItemPlugin):
|
||||
plugin_kind = "backend"
|
||||
|
||||
schema_prop = {
|
||||
"backend": {}, # GENERIC, String
|
||||
"file": {},
|
||||
"glob": {},
|
||||
"http": {},
|
||||
"consul": {},
|
||||
"vault": {},
|
||||
}
|
||||
def fetch_data(self, lookups) -> list:
|
||||
raise Exception('Not implemented')
|
||||
|
||||
|
||||
class PluginFileGlob:
|
||||
"""Provide glob functionnality"""
|
||||
def __init__(self, namespace):
|
||||
self.ns = namespace
|
||||
super().__init__()
|
||||
|
||||
_schema_props_glob = {
|
||||
"glob": {
|
||||
"additionalProperties": False,
|
||||
"default": {
|
||||
"file": "ansible.yaml",
|
||||
},
|
||||
"properties": {
|
||||
"file": {
|
||||
"type": "string",
|
||||
"default": "ansible",
|
||||
"optional": True,
|
||||
},
|
||||
"ext": {
|
||||
"type": "array",
|
||||
"default": ["yml", "yaml"],
|
||||
"optional": True,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class StrategyPlugin(KheopsItemPlugin):
|
||||
plugin_kind = "strategy"
|
||||
schema_prop = {
|
||||
"_strategy": {}, # GENERIC, String
|
||||
"merge": {},
|
||||
"first": {},
|
||||
"last": {},
|
||||
"smart": {},
|
||||
"schema": {},
|
||||
}
|
||||
def merge_results(self, candidates, rule) -> list:
|
||||
pass
|
||||
|
||||
def __init__(self, namespace):
|
||||
self.ns = namespace
|
||||
super().__init__()
|
||||
|
||||
|
||||
class OutPlugin(KheopsItemPlugin):
|
||||
plugin_kind = "out"
|
||||
schema_prop = {
|
||||
"_out": {}, # GENERIC, List of dict
|
||||
"toml": {},
|
||||
"validate": {},
|
||||
}
|
||||
def process_item(self, item) -> list:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# # Candidate Classes
|
||||
# # =============================
|
||||
# # class Candidate:
|
||||
# # engine = None
|
||||
# # found = False
|
||||
# # data = None
|
||||
# # run = None
|
||||
# # scope = None
|
||||
# # key = None
|
||||
# #
|
||||
# # def __init__(self, run):
|
||||
# # self.run = copy.deepcopy(run)
|
||||
# #
|
||||
# # def __repr__(self):
|
||||
# # return f"{self.__dict__}"
|
||||
# #
|
||||
# # def _report_data(self, data=None):
|
||||
# # default_data = {
|
||||
# # # "rule": self.config,
|
||||
# # "value": self.engine._plugin_value,
|
||||
# # "data": self.data,
|
||||
# # }
|
||||
# # data = data or default_data
|
||||
# # data = json.dumps(data, indent=2) # , sort_keys=True, )
|
||||
# # return data
|
||||
|
||||
|
||||
# # Generic Classes
|
||||
# # =============================
|
||||
# class PluginClass:
|
||||
# """Generic plugin class"""
|
||||
|
||||
# _plugin_type = "none"
|
||||
# _plugin_value = None
|
||||
|
||||
# _schema_props_new = "UNSET PLUGIN PROPRIETIES"
|
||||
|
||||
# _schema_props_plugin = {
|
||||
# "engine": {
|
||||
# "type": "string",
|
||||
# "default": "jerakia",
|
||||
# },
|
||||
# "value": {},
|
||||
# }
|
||||
|
||||
# def __repr__(self):
|
||||
# kind = self._plugin_type
|
||||
# name = self._plugin_name
|
||||
# value = getattr(self, "value", "NO VALUE")
|
||||
# return f"{kind}.{name}:{value}"
|
||||
|
||||
# def __init__(self, config=None, parent=None, app=None, validate_schema=False):
|
||||
# # assert (isinstance(config, dict)), f"Got: {config}"
|
||||
# self.parent = parent
|
||||
# self.app = app
|
||||
# self.config = config or {}
|
||||
|
||||
# self._init()
|
||||
# if validate_schema:
|
||||
# self._validate()
|
||||
|
||||
# def _init(self):
|
||||
# pass
|
||||
|
||||
# def _validate(self):
|
||||
# pass
|
||||
|
||||
|
||||
# class PluginBackendClass(PluginClass):
|
||||
# """Backend plugin class"""
|
||||
|
||||
# _plugin_type = "backend"
|
||||
|
||||
# def _init(self):
|
||||
# pass
|
||||
|
||||
|
||||
# class PluginStrategyClass(PluginClass):
|
||||
# """Strategy plugin class"""
|
||||
|
||||
# _plugin_type = "strategy"
|
||||
|
||||
# def _init(self):
|
||||
# pass
|
||||
|
||||
|
||||
# class PluginEngineClass(PluginClass):
|
||||
# """Engine plugin class"""
|
||||
|
||||
# _plugin_type = "engine"
|
||||
|
||||
# _schema_props_default = {
|
||||
# "engine": {
|
||||
# "default": "UNSET",
|
||||
# },
|
||||
# "value": {
|
||||
# "default": "UNSET",
|
||||
# },
|
||||
# }
|
||||
|
||||
# # Default plugin API Methods
|
||||
# # =====================
|
||||
# def _init(self):
|
||||
# assert isinstance(self.config, dict), f"Got: {self.config}"
|
||||
|
||||
# def _validate(self):
|
||||
|
||||
# # Build schema
|
||||
# schema_keys = [a for a in dir(self) if a.startswith("_schema_props_")]
|
||||
# props = {}
|
||||
# for key in schema_keys:
|
||||
# schema = getattr(self, key)
|
||||
# props.update(schema)
|
||||
# self.schema = {
|
||||
# "$schema": "http://json-schema.org/draft-07/schema#",
|
||||
# "type": "object",
|
||||
# "additionalProperties": True,
|
||||
# "properties": props,
|
||||
# }
|
||||
|
||||
# # log.debug (f"Validate {self.config} against {self.schema}")
|
||||
# self.config = schema_validate(self.config, self.schema)
|
||||
# return True
|
||||
|
||||
# # Public Methods
|
||||
# # =====================
|
||||
# def dump(self) -> dict:
|
||||
# """Dump plugin configuration"""
|
||||
|
||||
# ret = {
|
||||
# "config": self.config,
|
||||
# }
|
||||
# return ret
|
||||
|
||||
# def lookup_candidates(self, key=None, scope=None) -> list:
|
||||
# """Placeholder to return candidates"""
|
||||
# raise Exception("Module does not implement this method :(")
|
||||
# # It must always return a list of `Candidate` instances
|
||||
|
||||
# # def _example(self):
|
||||
# # print(f"Module does not implement this method :(")
|
||||
# # return None
|
||||
|
||||
|
||||
# # File plugins Extensions
|
||||
# # =============================
|
||||
|
||||
|
||||
# class PluginFileGlob:
|
||||
# """Provide glob functionnality"""
|
||||
|
||||
# _schema_props_glob = {
|
||||
# "glob": {
|
||||
# "additionalProperties": False,
|
||||
# "default": {
|
||||
# "file": "ansible.yaml",
|
||||
# },
|
||||
# "properties": {
|
||||
# "file": {
|
||||
# "type": "string",
|
||||
# "default": "ansible",
|
||||
# "optional": True,
|
||||
# },
|
||||
# "ext": {
|
||||
# "type": "array",
|
||||
# "default": ["yml", "yaml"],
|
||||
# "optional": True,
|
||||
# },
|
||||
# },
|
||||
# }
|
||||
# }
|
||||
|
||||
@ -1,3 +0,0 @@
|
||||
"""Engine plugins"""
|
||||
|
||||
from . import jerakia
|
||||
@ -1,171 +0,0 @@
|
||||
"""Jerakia Engine Code"""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import anyconfig
|
||||
|
||||
from kheops.utils import render_template, glob_files
|
||||
from kheops.plugin.common import PluginEngineClass, PluginFileGlob # , Candidate
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# class FileCandidate(Candidate):
|
||||
# path = None
|
||||
#
|
||||
# def _report_data(self):
|
||||
# data = {
|
||||
# # "rule": self.config,
|
||||
# "value": self.engine._plugin_value,
|
||||
# "data": self.data,
|
||||
# "path": str(self.path.relative_to(Path.cwd())),
|
||||
# }
|
||||
# data = dict(self.config)
|
||||
# return super()._report_data(data)
|
||||
|
||||
|
||||
class Plugin(PluginEngineClass, PluginFileGlob):
|
||||
"""Generic Plugin Class"""
|
||||
|
||||
_plugin_name = "jerakia"
|
||||
|
||||
_plugin_engine = "jerakia"
|
||||
# _schema_props_files = {
|
||||
_schema_props_new = {
|
||||
"path": {
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string",
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
"glob": {
|
||||
"default": "ansible.yml",
|
||||
"anyOf": [
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
# {
|
||||
# "type": "array",
|
||||
# "items": {
|
||||
# "type": "string",
|
||||
# },
|
||||
# },
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
def _init(self):
|
||||
|
||||
paths = self.config.get("path", self.config.get("value"))
|
||||
if isinstance(paths, str):
|
||||
paths = [paths]
|
||||
elif isinstance(paths, list):
|
||||
pass
|
||||
else:
|
||||
raise Exception(
|
||||
f"Unsupported path value, expected str or dict, got: {paths} in {self.config}"
|
||||
)
|
||||
|
||||
self.paths = paths
|
||||
self.value = paths
|
||||
|
||||
def _paths_template(self, scope):
|
||||
|
||||
# Manage loops
|
||||
paths = self.paths
|
||||
|
||||
# Manage var substr
|
||||
ret = []
|
||||
for path in paths:
|
||||
path = render_template(path, scope)
|
||||
ret.append(path)
|
||||
|
||||
log.debug("Render pattern: %s", ret)
|
||||
|
||||
return ret
|
||||
|
||||
def _show_paths(self, path_top, scope):
|
||||
|
||||
parsed = self._paths_template(scope)
|
||||
log.debug("Expanded paths to: %s", parsed)
|
||||
|
||||
# Look for files (NOT BE HERE !!!)
|
||||
ret3 = []
|
||||
for item in parsed:
|
||||
globbed = glob_files(path_top / item, "ansible.yaml")
|
||||
ret3.extend(globbed)
|
||||
log.debug("Matched globs: %s", ret3)
|
||||
|
||||
return ret3
|
||||
|
||||
def process(self):
|
||||
"""return results"""
|
||||
|
||||
# Detect path root and path prefix
|
||||
path_root = self.app.run["path_root"]
|
||||
path_prefix = self.app.conf2["config"]["tree"]["prefix"]
|
||||
|
||||
if path_prefix:
|
||||
path_prefix = Path(path_prefix)
|
||||
if path_prefix.is_absolute():
|
||||
path_top = path_prefix
|
||||
else:
|
||||
path_top = Path(path_root) / path_prefix
|
||||
else:
|
||||
path_top = path_root
|
||||
|
||||
log.debug("Path Top: %s", path_top)
|
||||
|
||||
scope = self.config["_run"]["scope"]
|
||||
key = self.config["_run"]["key"]
|
||||
assert isinstance(scope, dict), f"Got: {scope}"
|
||||
assert isinstance(key, (str, type(None))), f"Got: {key}"
|
||||
|
||||
# t = self._show_paths(path_top, scope)
|
||||
|
||||
ret = []
|
||||
for index, path in enumerate(self._show_paths(path_top, scope)):
|
||||
log.debug("Reading file: %s", path)
|
||||
# Fetch data
|
||||
found = False
|
||||
raw_data = anyconfig.load(path, ac_parser="yaml")
|
||||
data = None
|
||||
if key is None:
|
||||
data = raw_data
|
||||
found = True
|
||||
else:
|
||||
try:
|
||||
data = raw_data[key]
|
||||
found = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Assemble relative path
|
||||
try:
|
||||
rel_path = Path(path).resolve().relative_to(Path.cwd())
|
||||
except ValueError:
|
||||
rel_path = Path(path).resolve()
|
||||
|
||||
# Build result object
|
||||
result = {}
|
||||
result["run"] = {
|
||||
"index": index,
|
||||
"path": path,
|
||||
"rel_path": str(rel_path),
|
||||
}
|
||||
result["parent"] = self.config
|
||||
result["data"] = data
|
||||
result["found"] = found
|
||||
|
||||
ret.append(result)
|
||||
|
||||
return ret
|
||||
4
kheops/plugin/scope/__init__.py
Normal file
4
kheops/plugin/scope/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
"""Scope plugins"""
|
||||
|
||||
from . import loop
|
||||
from . import hier
|
||||
89
kheops/plugin/scope/hier.py
Normal file
89
kheops/plugin/scope/hier.py
Normal file
@ -0,0 +1,89 @@
|
||||
"""Hierarchy backend plugin"""
|
||||
|
||||
|
||||
import logging
|
||||
|
||||
from kheops.plugin2.common import ScopePlugin, ScopeExtLoop
|
||||
from kheops.utils import path_assemble_hier
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
from pprint import pprint
|
||||
|
||||
class Plugin(ScopePlugin, ScopeExtLoop):
|
||||
"""Hierarchy plugin"""
|
||||
|
||||
_plugin_name = "hier"
|
||||
_schema_props_new = {
|
||||
"hier": {
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": True,
|
||||
"properties": {
|
||||
"data": {
|
||||
"default": None,
|
||||
"anyOf": [
|
||||
{"type": "null"},
|
||||
{"type": "string"},
|
||||
{"type": "array"},
|
||||
],
|
||||
},
|
||||
"var": {
|
||||
"type": "string",
|
||||
"default": "hier_item",
|
||||
"optional": True,
|
||||
},
|
||||
"separator": {
|
||||
"type": "string",
|
||||
"default": "/",
|
||||
"optional": True,
|
||||
},
|
||||
"reversed": {
|
||||
"type": "boolean",
|
||||
"default": False,
|
||||
"optional": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def _process_item(self, data, ctx):
|
||||
|
||||
return path_assemble_hier(data,
|
||||
sep=ctx['var_split'],
|
||||
reverse=ctx['var_reversed'],
|
||||
start_index=ctx['var_start'],
|
||||
)
|
||||
|
||||
def process_items(self, lookups, conf):
|
||||
|
||||
ctx = {
|
||||
"var_split": conf.get('split', '/'),
|
||||
"var_reversed": conf.get('reversed', False),
|
||||
"var_start": conf.get('start', 0),
|
||||
}
|
||||
|
||||
lookups = self.loop_over(
|
||||
lookups,
|
||||
conf=conf,
|
||||
var_name='item_hier',
|
||||
callback=self._process_item,
|
||||
callback_context=ctx,
|
||||
)
|
||||
|
||||
return lookups
|
||||
|
||||
|
||||
71
kheops/plugin/scope/loop.py
Normal file
71
kheops/plugin/scope/loop.py
Normal file
@ -0,0 +1,71 @@
|
||||
"""Hierarchy backend plugin"""
|
||||
|
||||
import copy
|
||||
import logging
|
||||
|
||||
from kheops.plugin2.common import ScopePlugin, ScopeExtLoop
|
||||
from kheops.utils import path_assemble_hier
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Plugin(ScopePlugin,ScopeExtLoop):
|
||||
"""Hierarchy plugin"""
|
||||
|
||||
_plugin_name = "hier"
|
||||
_schema_props_new = {
|
||||
"hier": {
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"additionalProperties": True,
|
||||
"properties": {
|
||||
"data": {
|
||||
"default": None,
|
||||
"anyOf": [
|
||||
{"type": "null"},
|
||||
{"type": "string"},
|
||||
{"type": "array"},
|
||||
],
|
||||
},
|
||||
"var": {
|
||||
"type": "string",
|
||||
"default": "hier_item",
|
||||
"optional": True,
|
||||
},
|
||||
"separator": {
|
||||
"type": "string",
|
||||
"default": "/",
|
||||
"optional": True,
|
||||
},
|
||||
"reversed": {
|
||||
"type": "boolean",
|
||||
"default": False,
|
||||
"optional": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def process_items(self, lookups, conf):
|
||||
|
||||
item_name = conf.get('var', "item_loop")
|
||||
item_data = conf.get('data', None)
|
||||
|
||||
lookups = self.loop_over(
|
||||
lookups,
|
||||
conf=conf,
|
||||
var_name='item_loop',
|
||||
)
|
||||
|
||||
return lookups
|
||||
@ -1,4 +1,4 @@
|
||||
"""Strategy plugins"""
|
||||
|
||||
from . import last
|
||||
from . import schema
|
||||
from . import merge_deep
|
||||
|
||||
@ -1,17 +1,42 @@
|
||||
"""Simple last strategy"""
|
||||
"""Last strategy Plugin"""
|
||||
|
||||
import logging
|
||||
from kheops.plugin.common import PluginStrategyClass
|
||||
from kheops.plugin2.common import StrategyPlugin
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class Plugin(PluginStrategyClass):
|
||||
#class Plugin(PluginStrategyClass):
|
||||
|
||||
class Plugin(StrategyPlugin):
|
||||
"""Last strategy plugin"""
|
||||
|
||||
_plugin_name = "last"
|
||||
_schema_props_new = None
|
||||
|
||||
def process(self, candidates: list, rule=None) -> (list, dict):
|
||||
selector = 'last'
|
||||
|
||||
def merge_results(self, candidates: list, rule: dict, query) -> (list, dict):
|
||||
"""Return results"""
|
||||
|
||||
return candidates[-1]
|
||||
key = query.key
|
||||
result = None
|
||||
|
||||
for cand in reversed(candidates):
|
||||
#try:
|
||||
data = cand.data
|
||||
|
||||
if key is None:
|
||||
result = data
|
||||
else:
|
||||
if isinstance(data, dict):
|
||||
try:
|
||||
result = data[key]
|
||||
break
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
#else:
|
||||
# raise Exception(f"Data must be a dict, not something else ... {data}")
|
||||
|
||||
|
||||
return result
|
||||
|
||||
64
kheops/plugin/strategy/merge_deep.py
Normal file
64
kheops/plugin/strategy/merge_deep.py
Normal file
@ -0,0 +1,64 @@
|
||||
"""Last strategy Plugin"""
|
||||
|
||||
import logging
|
||||
from mergedeep import merge, Strategy
|
||||
|
||||
from kheops.plugin2.common import StrategyPlugin
|
||||
|
||||
from pprint import pprint
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Plugin(StrategyPlugin):
|
||||
"""Last strategy plugin"""
|
||||
|
||||
_plugin_name = "merge_deep"
|
||||
_schema_props_new = None
|
||||
|
||||
selector = 'matched'
|
||||
|
||||
|
||||
def _init(self):
|
||||
|
||||
# Fetch module config
|
||||
# See documentation: https://github.com/clarketm/mergedeep
|
||||
algo = self.ns.config['config'].get('merge_deep_algo', "replace").upper()
|
||||
strategy = getattr(Strategy, algo, None)
|
||||
if strategy is None:
|
||||
strategies = [ i.lower() for i in dir(Strategy) if i.isupper() ]
|
||||
raise Exception (f"Unknown algorithm: {algo}, please choose one of: {strategies}")
|
||||
self.strategy = strategy
|
||||
|
||||
|
||||
def merge_results(self, candidates: list, rule: dict, query) -> (list, dict):
|
||||
"""Return results"""
|
||||
|
||||
key = query.key
|
||||
results = []
|
||||
|
||||
for cand in candidates:
|
||||
|
||||
data = cand.data
|
||||
|
||||
if key is None:
|
||||
result = results.append(cand.data)
|
||||
else:
|
||||
if isinstance(data, dict):
|
||||
try:
|
||||
result = results.append(cand.data[key])
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
||||
#else:
|
||||
# raise Exception(f"Data must be a dict, not something else ... {data}")
|
||||
|
||||
|
||||
log.debug("Merging %s results", len(results))
|
||||
result = None
|
||||
if len(results) > 0 :
|
||||
result = merge(*results, strategy=self.strategy)
|
||||
|
||||
return result
|
||||
@ -1,216 +0,0 @@
|
||||
"""Schema strategy"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pprint import pprint
|
||||
|
||||
from jsonmerge import Merger
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from kheops.plugin.common import PluginStrategyClass
|
||||
from kheops.utils import str_ellipsis
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Plugin(PluginStrategyClass):
|
||||
"""Schema strategy plugin"""
|
||||
|
||||
_plugin_name = "schema"
|
||||
_schema_props_new = {
|
||||
"schema": {
|
||||
"default": None,
|
||||
"optional": True,
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "null",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
},
|
||||
{
|
||||
"type": "array",
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"additionalProperties": True,
|
||||
"default": {},
|
||||
"properties": {
|
||||
"data": {
|
||||
"default": None,
|
||||
"optional": False,
|
||||
"anyOf": [
|
||||
{"type": "null"},
|
||||
{"type": "string"},
|
||||
{"type": "array"},
|
||||
],
|
||||
},
|
||||
"var": {
|
||||
"type": "string",
|
||||
"default": "loop_item",
|
||||
"optional": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
default_merge_schema = {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"oneOf": [
|
||||
{
|
||||
"type": "array",
|
||||
"mergeStrategy": "append",
|
||||
# "mergeStrategy": "arrayMergeById",
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"mergeStrategy": "objectMerge",
|
||||
},
|
||||
{
|
||||
"type": "boolean",
|
||||
"mergeStrategy": "overwrite",
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"mergeStrategy": "overwrite",
|
||||
},
|
||||
{
|
||||
"type": "integer",
|
||||
"mergeStrategy": "overwrite",
|
||||
},
|
||||
{
|
||||
"type": "number",
|
||||
"mergeStrategy": "overwrite",
|
||||
},
|
||||
{
|
||||
"type": "null",
|
||||
"mergeStrategy": "overwrite",
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
def process(self, candidates: list, rule=None) -> (list, dict):
|
||||
"""Return results"""
|
||||
|
||||
trace = rule["trace"]
|
||||
explain = rule["explain"]
|
||||
|
||||
schema = rule.get("schema", None) or self.default_merge_schema
|
||||
merger = Merger(schema)
|
||||
t = PrettyTable()
|
||||
t1 = PrettyTable()
|
||||
|
||||
new_candidate = None
|
||||
for index, item in enumerate(candidates):
|
||||
new_value = item["data"]
|
||||
result = merger.merge(new_candidate, new_value)
|
||||
|
||||
backend_info = dict(item["parent"])
|
||||
backend_run = backend_info.pop("_run")
|
||||
if explain:
|
||||
t1.add_row(
|
||||
[
|
||||
index,
|
||||
"\nBackendRun: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
backend_run,
|
||||
default=lambda o: "<not serializable>",
|
||||
indent=2,
|
||||
),
|
||||
70,
|
||||
),
|
||||
"\nRuleRun: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
item["run"],
|
||||
default=lambda o: "<not serializable>",
|
||||
indent=2,
|
||||
),
|
||||
70,
|
||||
),
|
||||
"---\nResult: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
result, default=lambda o: "<not serializable>", indent=2
|
||||
),
|
||||
70,
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
if trace:
|
||||
t.add_row(
|
||||
[
|
||||
index,
|
||||
"---\nBackendConfig: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
backend_info,
|
||||
default=lambda o: "<not serializable>",
|
||||
indent=2,
|
||||
),
|
||||
70,
|
||||
)
|
||||
+ "\nBackendRun: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
backend_run,
|
||||
default=lambda o: "<not serializable>",
|
||||
indent=2,
|
||||
),
|
||||
70,
|
||||
),
|
||||
"---\nRuleConfig: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
rule, default=lambda o: "<not serializable>", indent=2
|
||||
),
|
||||
70,
|
||||
)
|
||||
+ "\nRuleRun: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
item["run"],
|
||||
default=lambda o: "<not serializable>",
|
||||
indent=2,
|
||||
),
|
||||
70,
|
||||
)
|
||||
+
|
||||
#'\nSource: ' + str_ellipsis(json.dumps(
|
||||
# new_candidate,
|
||||
# default=lambda o: '<not serializable>', indent=2), 70) +
|
||||
"\nNew data: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
new_value,
|
||||
default=lambda o: "<not serializable>",
|
||||
indent=2,
|
||||
),
|
||||
70,
|
||||
),
|
||||
"---\nResult: "
|
||||
+ str_ellipsis(
|
||||
json.dumps(
|
||||
result, default=lambda o: "<not serializable>", indent=2
|
||||
),
|
||||
70,
|
||||
),
|
||||
]
|
||||
)
|
||||
new_candidate = result
|
||||
|
||||
if trace:
|
||||
t.field_names = ["Index", "Backend", "Rule", "Data"]
|
||||
t.align = "l"
|
||||
print(t)
|
||||
if explain:
|
||||
t1.field_names = ["Index", "Backend", "Rule", "Data"]
|
||||
t1.align = "l"
|
||||
print("Explain:\n" + repr(t1))
|
||||
|
||||
return new_candidate
|
||||
@ -1,41 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Kheops Query Class"""
|
||||
|
||||
import logging
|
||||
from pprint import pprint
|
||||
from kheops.managers import BackendsManager, RulesManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Query
|
||||
##########################################
|
||||
|
||||
|
||||
class Query:
|
||||
"""Kheops Query Class"""
|
||||
|
||||
def __init__(self, app):
|
||||
|
||||
self.app = app
|
||||
|
||||
def exec(self, key=None, scope=None, policy=None, trace=False, explain=False):
|
||||
"""Execute the query"""
|
||||
|
||||
bmgr = BackendsManager(app=self.app)
|
||||
mmgr = RulesManager(app=self.app)
|
||||
|
||||
log.debug("New query created")
|
||||
candidates = bmgr.query(key, scope, trace=trace)
|
||||
result = mmgr.get_result(candidates, key=key, trace=trace, explain=explain)
|
||||
return result
|
||||
|
||||
def dump(self):
|
||||
"""Dump the query object"""
|
||||
|
||||
ret = {}
|
||||
for i in dir(self):
|
||||
if not i.startswith("_"):
|
||||
ret[i] = getattr(self, i)
|
||||
|
||||
pprint(ret)
|
||||
@ -14,6 +14,9 @@ log = logging.getLogger(__name__)
|
||||
# =====================
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def glob_files(path, pattern):
|
||||
"""Return a list of path that match a glob"""
|
||||
log.debug("Search glob '%s' in '%s'", pattern, path)
|
||||
@ -22,7 +25,7 @@ def glob_files(path, pattern):
|
||||
return [str(i) for i in ret]
|
||||
|
||||
|
||||
def path_assemble_hier(path, sep="/"):
|
||||
def path_assemble_hier(path, sep="/", reverse=False, start_index=0):
|
||||
"""Append the previous"""
|
||||
|
||||
if isinstance(path, str):
|
||||
@ -32,25 +35,64 @@ def path_assemble_hier(path, sep="/"):
|
||||
else:
|
||||
raise Exception(f"This function only accepts string or lists, got: {path}")
|
||||
|
||||
|
||||
|
||||
if reverse:
|
||||
list_data = list_data[::-1]
|
||||
|
||||
|
||||
if start_index > 0:
|
||||
fixed_part = list_data[:start_index]
|
||||
if reverse:
|
||||
fixed_part = fixed_part[::-1]
|
||||
fixed_part = sep.join(fixed_part)
|
||||
|
||||
hier_part = list_data[start_index:]
|
||||
|
||||
new_data = [fixed_part]
|
||||
new_data.extend(hier_part)
|
||||
list_data = new_data
|
||||
|
||||
|
||||
assert isinstance(list_data, list), f"Got: {list_data}"
|
||||
ret = []
|
||||
for index, part in enumerate(list_data):
|
||||
prefix =''
|
||||
try:
|
||||
prefix = ret[index - 1]
|
||||
prefix = f"{prefix}/"
|
||||
except IndexError:
|
||||
prefix = f"{sep}"
|
||||
prefix = ""
|
||||
item = f"{prefix}{part}{sep}"
|
||||
pass
|
||||
item = f"{prefix}{part}"
|
||||
ret.append(item)
|
||||
return ret
|
||||
|
||||
|
||||
def render_template(path, params):
|
||||
def render_template(text, params):
|
||||
"""Render template for a given string"""
|
||||
assert isinstance(params, dict), f"Got: {params}"
|
||||
tpl = Template(path)
|
||||
tpl = Template(text)
|
||||
return tpl.render(**params)
|
||||
|
||||
class Default(dict):
|
||||
def __missing__(self, key):
|
||||
return ''
|
||||
|
||||
|
||||
def render_template_python(text, params, ignore_missing=True):
|
||||
"""Render template for a given string"""
|
||||
assert isinstance(params, dict), f"Got: {params}"
|
||||
|
||||
if ignore_missing:
|
||||
return text.format_map(Default(params))
|
||||
|
||||
try:
|
||||
return text.format_map(params)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
# Schema Methods
|
||||
# =====================
|
||||
@ -100,7 +142,7 @@ def schema_validate(config, schema):
|
||||
return config
|
||||
|
||||
|
||||
def str_ellipsis(txt, length=120):
|
||||
def str_ellipsis(txt, length=60):
|
||||
"""Truncate with ellipsis too wide texts"""
|
||||
txt = str(txt)
|
||||
ret = []
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user