Fix: linting with black

This commit is contained in:
mrjk 2022-02-02 00:25:54 -05:00
parent 327a0e1eb2
commit e7c66d8771
10 changed files with 385 additions and 489 deletions

View File

@ -18,8 +18,6 @@ from kheops.utils import schema_validate
log = logging.getLogger(__name__)
CONF_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
@ -31,7 +29,7 @@ CONF_SCHEMA = {
"rules_items": {},
"rules_config": {},
},
#"patternProperties": {
# "patternProperties": {
# ".*": {
# "type": "object",
# "optional": True,
@ -75,7 +73,6 @@ CONF_SCHEMA = {
},
},
},
# OLD
"tree": {
# "additionalProperties": False,
@ -148,21 +145,16 @@ CONF_SCHEMA = {
},
# },
# },
}
}
class GenericInstance():
class GenericInstance:
name = None
run = {}
class KheopsNamespace(GenericInstance, QueryProcessor):
def __init__(self, app, name, config=None):
self.name = name
@ -170,14 +162,10 @@ class KheopsNamespace(GenericInstance, QueryProcessor):
self.app = app
self.run = dict(app.run)
# Validate configuration
self.config = schema_validate(self.config, CONF_SCHEMA)
self.run["path_ns"] = str(Path(app.run['config_src']).parent.resolve())
self.run["path_ns"] = str(Path(app.run["config_src"]).parent.resolve())
# def load_namespace(self, namespace="default"):
@ -243,19 +231,15 @@ class KheopsNamespace(GenericInstance, QueryProcessor):
# return config
#def query(self, key=None, scope=None):
# processor = QueryProcessor(app=self.app)
# result = processor.exec(key, scope)
# def query(self, key=None, scope=None):
# processor = QueryProcessor(app=self.app)
# result = processor.exec(key, scope)
#
# return result
class Kheops(GenericInstance):
"""Main Kheops Application Instance
"""
"""Main Kheops Application Instance"""
def __init__(self, config="kheops.yml", namespace="default"):
"""
@ -277,20 +261,17 @@ class Kheops(GenericInstance):
self.run["config_src"] = config
if isinstance(config, str):
self.run["config_type"] = 'file'
self.run["config_type"] = "file"
self.run["path_config"] = str(Path(config).parent.resolve())
elif isinstance(config, dict):
self.run["config_type"] = 'dict'
self.run["config_type"] = "dict"
self.run["path_config"] = str(path_cwd)
else:
raise Exception("Need a valid config")
self.ns_name = namespace
self.raw_config = self.parse_conf(config)
def parse_conf(self, config="kheops.yml"):
"""
Parse Kheops configuration
@ -307,31 +288,32 @@ class Kheops(GenericInstance):
# Load config
if isinstance(config, str):
dict_conf = anyconfig.load(config)
source = f'file:{config}'
source = f"file:{config}"
elif isinstance(config, dict):
dict_conf = config
source = 'dict'
source = "dict"
return dict_conf
def lookup2(self, keys=None, policy=None, scope=None,
trace=False, explain=False, validate_schema=False,
namespace='default' ,
def lookup2(
self,
keys=None,
policy=None,
scope=None,
trace=False,
explain=False,
validate_schema=False,
namespace="default",
):
"""Lookup a key in hierarchy"""
ret = {}
# Loop over keys
for key_def in keys:
key_def = key_def or ''
key_def = key_def or ""
# Identify namespace and key
parts = key_def.split(':')
parts = key_def.split(":")
ns_name = self.ns_name
if len(parts) > 1:
ns_name = parts[0]
@ -357,29 +339,15 @@ class Kheops(GenericInstance):
return ret
def lookup(self, keys=None, policy=None, scope=None, trace=False, explain=False, validate_schema=False):
def lookup(
self,
keys=None,
policy=None,
scope=None,
trace=False,
explain=False,
validate_schema=False,
):
"""Lookup a key in hierarchy"""
log.debug("Lookup key %s with scope: %s", keys, scope)
assert isinstance(keys, list), f"Got {keys}"
@ -387,7 +355,14 @@ class Kheops(GenericInstance):
query = Query(app=self)
ret = {}
for key in keys:
ret[key] = query.exec(key=key, scope=scope, policy=policy, trace=trace, explain=explain, validate_schema=validate_schema)
ret[key] = query.exec(
key=key,
scope=scope,
policy=policy,
trace=trace,
explain=explain,
validate_schema=validate_schema,
)
return ret
def dump_schema(self):
@ -405,8 +380,7 @@ class Kheops(GenericInstance):
# print(json.dumps(ret, indent=2))
def gen_docs(self):
""" Generate documentation"""
"""Generate documentation"""
print("WIP")
return None

View File

@ -122,11 +122,13 @@ class CmdApp:
add_p = subparsers.add_parser("schema")
add_p = subparsers.add_parser("gen_doc")
# Manage command: lookup2
add_p = subparsers.add_parser("lookup")
add_p.add_argument(
"-n", "--namespace", help="Namespace name (KHEOPS_NAMESPACE)", default=os.environ.get("KHEOPS_NAMESPACE", "default")
"-n",
"--namespace",
help="Namespace name (KHEOPS_NAMESPACE)",
default=os.environ.get("KHEOPS_NAMESPACE", "default"),
)
add_p.add_argument(
"-f", "--file", help="File with params as dict. Can be stdin - ."
@ -178,7 +180,6 @@ class CmdApp:
keys = self.args.keys or [None]
new_params = {}
if self.args.file:
new_params = anyconfig.load(self.args.file, ac_parser="yaml")
@ -192,25 +193,17 @@ class CmdApp:
self.log.info("CLI: %s with env: %s", keys, new_params)
app = Kheops.Kheops(config=self.args.config, namespace=self.args.namespace)
ret = app.lookup2(
namespace=self.args.namespace,
keys=keys,
scope=new_params,
trace=self.args.trace,
explain=self.args.explain,
validate_schema=True,
)
print(anyconfig.dumps(ret, ac_parser=self.args.format))
def cli_lookup_OLD(self):
"""Display how to use logging"""
@ -232,7 +225,7 @@ class CmdApp:
self.log.info("CLI: %s with env: %s", keys, new_params)
app = Kheops.App(config=self.args.config, namespace=self.args.namespace)
#for key in keys:
# for key in keys:
ret = app.lookup(
keys=keys,
scope=new_params,

View File

@ -1,7 +1,7 @@
import json
import logging
#from pprint import pprint
# from pprint import pprint
from pathlib import Path
from prettytable import PrettyTable
@ -10,7 +10,7 @@ import kheops.plugin as KheopsPlugins
from kheops.utils import render_template, render_template_python, str_ellipsis
log = logging.getLogger(__name__)
tracer = logging.getLogger(f'{__name__}.explain')
tracer = logging.getLogger(f"{__name__}.explain")
class LoadPlugin:
@ -46,8 +46,7 @@ class LoadPlugin:
plugin_loader = LoadPlugin(KheopsPlugins)
class Query():
class Query:
key = None
scope = None
@ -62,13 +61,13 @@ class Query():
# class QueryController():
#def exec(self, key=None, scope=None):
# query = Query(key, scope)
# result = self.processor.exec(query)
# return result
# def exec(self, key=None, scope=None):
# query = Query(key, scope)
# result = self.processor.exec(query)
# return result
class QueryProcessor():
class QueryProcessor:
"""QueryProcessor class provides all the methods to be able to make queries"""
default_match_rule = {
@ -77,26 +76,24 @@ class QueryProcessor():
"strategy": "merge_deep",
}
default_lookup_item = {
"path": None,
"backend": "file",
"continue": True,
}
#def __init__(self, app):
# def __init__(self, app):
# self.app = app
#self.config = app.conf2['config'] or {}
#self.lookups = app.conf2['lookups'] or []
#self.rules = app.conf2['rules'] or []
# self.config = app.conf2['config'] or {}
# self.lookups = app.conf2['lookups'] or []
# self.rules = app.conf2['rules'] or []
def CHILDREN_INIT(self, config):
self.config = config
pass
#def exec(self, key=None, scope=None):
# def exec(self, key=None, scope=None):
def query(self, key=None, scope=None, explain=False):
if explain:
@ -109,12 +106,11 @@ class QueryProcessor():
# Assemble if more than one and merge when continue.
# Got the Matched rule (RULE CACHE)
# We'll need strategy, and it's selector field: matched/first/last/all
#key_rule = self._get_key_rule(key) or {}
#key_strategy = key_rule.get('strategy', None)
# key_rule = self._get_key_rule(key) or {}
# key_strategy = key_rule.get('strategy', None)
key_rule = self._exec_get_rule(query)
log.info("Matched rule for key '%s': %s", query.key, key_rule)
# Build the lookups [] => []
# Fetch static config from app (for include and NS:includes ...)
# Loop over lookups and process each lookup with ScopePlugins
@ -129,48 +125,52 @@ class QueryProcessor():
# Retrieve the module instance
# Get it's match policy
# TODO
plugin_name = key_rule.get('strategy', None)
strategy_plugin = plugin_loader.load('strategy', plugin_name)(self)
plugin_name = key_rule.get("strategy", None)
strategy_plugin = plugin_loader.load("strategy", plugin_name)(self)
# Get the data (strategy.selector)
# For each entry, ask the backend to return the data: file, http, consul ...
# Return zero, one or more results depending the strategy.selector
#result = get_backends_results(strategy, lookups)
candidates = self._exec_backend_plugins(parsed_lookups, selector=strategy_plugin.selector)
# result = get_backends_results(strategy, lookups)
candidates = self._exec_backend_plugins(
parsed_lookups, selector=strategy_plugin.selector
)
# Generate explain report
if explain:
self._explain_candidates(candidates, query)
# Apply the merge strategy, recall strategy
result = strategy_plugin.merge_results(candidates, key_rule, query)
# TODO: Apply output plugins
# result = self._exec_output_plugins(result)
return result
def _explain_lookups(self, parsed_lookups):
"""Explain list of lookups"""
table = PrettyTable()
for item in parsed_lookups:
col1 = json.dumps({ k:v for k, v in item.items() if k not in ['_run'] }, default=lambda o: "<not serializable>", indent=2)
col2 = json.dumps(item['_run'], default=lambda o: "<not serializable>", indent=2)
table.add_row([
"\nConfig:"+ str_ellipsis(col1, 60),
"\nRuntime:"+ str_ellipsis(col2, 60),
])
col1 = json.dumps(
{k: v for k, v in item.items() if k not in ["_run"]},
default=lambda o: "<not serializable>",
indent=2,
)
col2 = json.dumps(
item["_run"], default=lambda o: "<not serializable>", indent=2
)
table.add_row(
[
"\nConfig:" + str_ellipsis(col1, 60),
"\nRuntime:" + str_ellipsis(col2, 60),
]
)
table.field_names = ["Config", "Runtime"]
table.align = "l"
tracer.info("Explain lookups:\n" + str(table))
def _explain_candidates(self, candidates, query):
"""Explain list of candidates"""
@ -179,59 +179,66 @@ class QueryProcessor():
table = PrettyTable()
for item_obj in candidates:
item = item_obj.__dict__
item["rel_path"] = str(Path(item['path']).relative_to(Path.cwd()))
item["rel_path"] = str(Path(item["path"]).relative_to(Path.cwd()))
col1 = json.dumps({ k:v for k, v in item.items() if k not in ['run', 'data'] }, default=lambda o: "<not serializable>", indent=2)
col2 = json.dumps(item['run']['_run'], default=lambda o: "<not serializable>", indent=2)
col3 = item_obj.data.get(query.key, "NOT FOUND") if query.key is not None else item_obj.data
col1 = json.dumps(
{k: v for k, v in item.items() if k not in ["run", "data"]},
default=lambda o: "<not serializable>",
indent=2,
)
col2 = json.dumps(
item["run"]["_run"], default=lambda o: "<not serializable>", indent=2
)
col3 = (
item_obj.data.get(query.key, "NOT FOUND")
if query.key is not None
else item_obj.data
)
col3 = json.dumps(col3, default=lambda o: "<not serializable>", indent=2)
table.add_row([
"\nStatus:"+ str_ellipsis(col1, 80),
"\nRuntime:"+ str_ellipsis(col2, 60),
"\nData:"+ str_ellipsis(col3, 60),
])
table.add_row(
[
"\nStatus:" + str_ellipsis(col1, 80),
"\nRuntime:" + str_ellipsis(col2, 60),
"\nData:" + str_ellipsis(col3, 60),
]
)
table.field_names = ["Status", "Runtime", "Data"]
table.align = "l"
tracer.info("Explain candidates:\n" + str(table))
def _exec_backend_plugins(self, lookups, selector="matched"):
selector = 'matched'
assert (selector in ['last', 'first', 'all', 'matched'])
selector = "matched"
assert selector in ["last", "first", "all", "matched"]
assert isinstance(lookups, list)
#lookups = self.config.get("lookups", {}).copy()
# lookups = self.config.get("lookups", {}).copy()
plugins = {}
ret = []
for index, lookup_def in enumerate(lookups):
# Update object
lookup_def['_run']['backend_index'] = index
lookup_def["_run"]["backend_index"] = index
# Load plugin
plugin_name = lookup_def["backend"]
if plugin_name in plugins:
plugin = plugins[plugin_name]
else:
plugin = plugin_loader.load('backend', plugin_name)(namespace=self)
plugin = plugin_loader.load("backend", plugin_name)(namespace=self)
# Get candidates
candidates = plugin.fetch_data(lookup_def)
# Apply selector
for candidate in candidates:
if candidate.status == 'found' or selector == 'all':
if candidate.status == "found" or selector == "all":
ret.append(candidate)
return ret
def _exec_assemble_lookups(self, lookups, query):
assert isinstance(lookups, list)
assert len(lookups) > 0
@ -243,17 +250,17 @@ class QueryProcessor():
if isinstance(lookup_def, str):
shortform = True
lookup_def = {
'path': lookup_def,
"path": lookup_def,
}
assert isinstance(lookup_def, dict)
new_lookup = dict(self.default_lookup_item)
new_lookup.update(lookup_def)
new_lookup['_run'] = {
'scope': query.scope,
'key': query.key,
'conf': {
'index': index,
new_lookup["_run"] = {
"scope": query.scope,
"key": query.key,
"conf": {
"index": index,
}
# 'shortform': shortform,
}
@ -269,36 +276,34 @@ class QueryProcessor():
plugin_name = plugin_def.get("module", None)
if plugin_name:
plugin = plugin_loader.load('scope', plugin_name)(namespace=self)
plugin = plugin_loader.load("scope", plugin_name)(namespace=self)
ret = plugin.process_items(ret, plugin_def)
new_lookups2.extend(ret)
# Parse the `path` value with scope variables
new_lookups3 = []
for lookup in new_lookups2:
path = lookup['path']
scope = lookup['_run']['scope']
path = lookup["path"]
scope = lookup["_run"]["scope"]
new_path = render_template_python(path, scope, ignore_missing=False)
if new_path:
lookup['_run']['raw_path'] = path
lookup['path'] = new_path
lookup["_run"]["raw_path"] = path
lookup["path"] = new_path
new_lookups3.append(lookup)
else:
log.info("Ignore because of missing scope vars: '%s'", path)
return new_lookups3
def _exec_get_rule(self, query, mode='match'):
def _exec_get_rule(self, query, mode="match"):
key = query.key
rules = self.config['rules'] or {}
rules = self.config["rules"] or {}
if mode == "match":
rule = dict(self.default_match_rule)
rules = [i for i in rules if i.get('key', None) == key ]
rules = [i for i in rules if i.get("key", None) == key]
if len(rules) > 0:
match = rules[0]
rule.update(match)
@ -306,12 +311,6 @@ class QueryProcessor():
log.debug("Applying default rule for key '%s'", key)
rule = self.default_match_rule
else:
raise Exception (f"Mode '{mode}' is not implemented")
raise Exception(f"Mode '{mode}' is not implemented")
return rule

View File

@ -10,6 +10,7 @@ from kheops.utils import render_template, glob_files, render_template_python
from kheops.plugin.common import BackendPlugin, BackendCandidate
from pprint import pprint
log = logging.getLogger(__name__)
@ -27,7 +28,7 @@ log = logging.getLogger(__name__)
# return super()._report_data(data)
#class Plugin(PluginEngineClass, PluginFileGlob):
# class Plugin(PluginEngineClass, PluginFileGlob):
class Plugin(BackendPlugin):
"""Generic Plugin Class"""
@ -65,45 +66,40 @@ class Plugin(BackendPlugin):
},
}
extensions = {
'.yml': 'yaml',
'.yaml': 'yaml'
}
extensions = {".yml": "yaml", ".yaml": "yaml"}
def _init(self):
# Guess top path
top_path = self.ns.run['path_config']
path_prefix = self.ns.config['config'].get('file_path_prefix', None)
top_path = self.ns.run["path_config"]
path_prefix = self.ns.config["config"].get("file_path_prefix", None)
if path_prefix:
top_path = os.path.join(top_path, path_prefix)
self.top_path = top_path
# Fetch module config
path_suffix = self.ns.config['config'].get('file_path_suffix', "auto")
if path_suffix == 'auto':
path_suffix = self.ns.config["config"].get("file_path_suffix", "auto")
if path_suffix == "auto":
path_suffix = f"/{self.ns.name}"
self.path_suffix = path_suffix
def fetch_data(self, config) -> list:
path = config.get('path')
path = config.get("path")
if self.path_suffix:
path = f"{path}{self.path_suffix}"
raw_data = None
status = 'not_found'
status = "not_found"
for ext, parser in self.extensions.items():
new_path = os.path.join(self.top_path, path + ext )
new_path = os.path.join(self.top_path, path + ext)
if os.path.isfile(new_path):
status = 'found'
status = "found"
try:
raw_data = anyconfig.load(new_path, ac_parser=parser)
except Exception:
status = 'broken'
status = "broken"
raw_data = None
break
@ -111,8 +107,7 @@ class Plugin(BackendPlugin):
path=new_path,
status=status,
run=config,
data= raw_data,
data=raw_data,
)
return [ret]

View File

@ -8,16 +8,15 @@ from pprint import pprint
log = logging.getLogger(__name__)
# Vocabulary:
# Key Rules
# ConfPlugin[1]
# StrategyPlugin[1]
# OutPlugin[N]
# Lookup Hierarchy
# ConfPlugin[1]
# ScopePlugin[N]
# BackendPlugin[1]
# Key Rules
# ConfPlugin[1]
# StrategyPlugin[1]
# OutPlugin[N]
# Lookup Hierarchy
# ConfPlugin[1]
# ScopePlugin[N]
# BackendPlugin[1]
# Generic classes
@ -26,7 +25,6 @@ class KheopsPlugin:
plugin_type = None
plugin_kind = None
def __init__(self):
self._init()
@ -36,20 +34,21 @@ class KheopsPlugin:
class KheopsListPlugin(KheopsPlugin):
plugin_type = 'list'
plugin_type = "list"
def process_list(self, item_list) -> list:
pass
class KheopsItemPlugin(KheopsPlugin):
plugin_type = 'item'
plugin_type = "item"
def process_item(self, item) -> list:
pass
# Other classes
class BackendCandidate():
class BackendCandidate:
def __init__(self, path=None, data=None, run=None, status=None):
assert isinstance(run, dict)
self.path = path
@ -60,15 +59,18 @@ class BackendCandidate():
def __repr__(self):
return f"Status: {self.status}, Path: {self.path} => {self.data}"
# Specific classes
class ConfPlugin(KheopsListPlugin):
plugin_kind = "conf"
schema_prop = {
"include": {}, # Direct config, DICT
}
def process_list(self, item_list) -> list:
pass
class ScopePlugin(KheopsListPlugin):
plugin_kind = "scope"
@ -88,9 +90,8 @@ class ScopePlugin(KheopsListPlugin):
super().__init__()
class ScopeExtLoop():
'''This Scope Extension allow to loop over a lookup'''
class ScopeExtLoop:
"""This Scope Extension allow to loop over a lookup"""
schema_props = {
"properties": {
@ -110,11 +111,12 @@ class ScopeExtLoop():
},
}
def loop_over(
self, lookups, conf, var_name="item", callback_context=None, callback=None
):
def loop_over(self, lookups, conf, var_name='item', callback_context=None, callback=None):
var_name = conf.get('var', var_name)
var_data_ref = conf.get('data', None)
var_name = conf.get("var", var_name)
var_data_ref = conf.get("data", None)
if not var_data_ref:
log.debug("No data to loop over for: %s", var_data_ref)
@ -126,9 +128,9 @@ class ScopeExtLoop():
var_data = var_data_ref
if isinstance(var_data_ref, str):
try:
var_data = lookup['_run']['scope'][var_data]
var_data = lookup["_run"]["scope"][var_data]
except KeyError:
log.debug ("Ignoring missing '%s' from scope", var_data)
log.debug("Ignoring missing '%s' from scope", var_data)
pass
# Run callback
@ -143,28 +145,25 @@ class ScopeExtLoop():
# Create new object
for index, var_value in enumerate(var_data):
if not 'hier' in lookup['_run']:
lookup['_run']['hier'] = []
if not "hier" in lookup["_run"]:
lookup["_run"]["hier"] = []
ctx = {
'data_ref': var_data_ref,
'index': index,
'value': var_value,
'variable': var_name,
"data_ref": var_data_ref,
"index": index,
"value": var_value,
"variable": var_name,
}
new_item = copy.deepcopy(lookup)
new_item['_run']['scope'][var_name] = var_value
new_item['_run']['hier'].append(ctx)
new_item["_run"]["scope"][var_name] = var_value
new_item["_run"]["hier"].append(ctx)
ret.append(new_item)
return ret
class BackendPlugin(KheopsItemPlugin):
plugin_kind = "backend"
@ -176,16 +175,15 @@ class BackendPlugin(KheopsItemPlugin):
"consul": {},
"vault": {},
}
def fetch_data(self, lookups) -> list:
raise Exception('Not implemented')
def fetch_data(self, lookups) -> list:
raise Exception("Not implemented")
def __init__(self, namespace):
self.ns = namespace
super().__init__()
class StrategyPlugin(KheopsItemPlugin):
plugin_kind = "strategy"
schema_prop = {
@ -196,6 +194,7 @@ class StrategyPlugin(KheopsItemPlugin):
"smart": {},
"schema": {},
}
def merge_results(self, candidates, rule) -> list:
pass
@ -211,60 +210,11 @@ class OutPlugin(KheopsItemPlugin):
"toml": {},
"validate": {},
}
def process_item(self, item) -> list:
pass
# # Candidate Classes
# # =============================
# # class Candidate:

View File

@ -10,6 +10,7 @@ log = logging.getLogger(__name__)
from pprint import pprint
class Plugin(ScopePlugin, ScopeExtLoop):
"""Hierarchy plugin"""
@ -25,7 +26,6 @@ class Plugin(ScopePlugin, ScopeExtLoop):
{
"type": "string",
},
{
"type": "object",
"additionalProperties": True,
@ -59,31 +59,29 @@ class Plugin(ScopePlugin, ScopeExtLoop):
}
}
def _process_item(self, data, ctx):
return path_assemble_hier(data,
sep=ctx['var_split'],
reverse=ctx['var_reversed'],
start_index=ctx['var_start'],
return path_assemble_hier(
data,
sep=ctx["var_split"],
reverse=ctx["var_reversed"],
start_index=ctx["var_start"],
)
def process_items(self, lookups, conf):
ctx = {
"var_split": conf.get('split', '/'),
"var_reversed": conf.get('reversed', False),
"var_start": conf.get('start', 0),
"var_split": conf.get("split", "/"),
"var_reversed": conf.get("reversed", False),
"var_start": conf.get("start", 0),
}
lookups = self.loop_over(
lookups,
conf=conf,
var_name='item_hier',
var_name="item_hier",
callback=self._process_item,
callback_context=ctx,
)
return lookups

View File

@ -9,7 +9,7 @@ from kheops.utils import path_assemble_hier
log = logging.getLogger(__name__)
class Plugin(ScopePlugin,ScopeExtLoop):
class Plugin(ScopePlugin, ScopeExtLoop):
"""Hierarchy plugin"""
_plugin_name = "hier"
@ -56,16 +56,15 @@ class Plugin(ScopePlugin,ScopeExtLoop):
}
}
def process_items(self, lookups, conf):
item_name = conf.get('var', "item_loop")
item_data = conf.get('data', None)
item_name = conf.get("var", "item_loop")
item_data = conf.get("data", None)
lookups = self.loop_over(
lookups,
conf=conf,
var_name='item_loop',
var_name="item_loop",
)
return lookups

View File

@ -5,7 +5,8 @@ from kheops.plugin.common import StrategyPlugin
log = logging.getLogger(__name__)
#class Plugin(PluginStrategyClass):
# class Plugin(PluginStrategyClass):
class Plugin(StrategyPlugin):
"""Last strategy plugin"""
@ -13,7 +14,7 @@ class Plugin(StrategyPlugin):
_plugin_name = "last"
_schema_props_new = None
selector = 'last'
selector = "last"
def merge_results(self, candidates: list, rule: dict, query) -> (list, dict):
"""Return results"""
@ -22,7 +23,7 @@ class Plugin(StrategyPlugin):
result = None
for cand in reversed(candidates):
#try:
# try:
data = cand.data
if key is None:
@ -35,8 +36,7 @@ class Plugin(StrategyPlugin):
except KeyError:
pass
#else:
# else:
# raise Exception(f"Data must be a dict, not something else ... {data}")
return result

View File

@ -17,21 +17,21 @@ class Plugin(StrategyPlugin):
_plugin_name = "merge_deep"
_schema_props_new = None
selector = 'matched'
selector = "matched"
def _init(self):
# Fetch module config
# See documentation: https://github.com/clarketm/mergedeep
algo = self.ns.config['config'].get('merge_deep_algo', "replace").upper()
algo = self.ns.config["config"].get("merge_deep_algo", "replace").upper()
strategy = getattr(Strategy, algo, None)
if strategy is None:
strategies = [ i.lower() for i in dir(Strategy) if i.isupper() ]
raise Exception (f"Unknown algorithm: {algo}, please choose one of: {strategies}")
strategies = [i.lower() for i in dir(Strategy) if i.isupper()]
raise Exception(
f"Unknown algorithm: {algo}, please choose one of: {strategies}"
)
self.strategy = strategy
def merge_results(self, candidates: list, rule: dict, query) -> (list, dict):
"""Return results"""
@ -51,14 +51,12 @@ class Plugin(StrategyPlugin):
except KeyError:
pass
#else:
# else:
# raise Exception(f"Data must be a dict, not something else ... {data}")
log.debug("Merging %s results", len(results))
result = None
if len(results) > 0 :
if len(results) > 0:
result = merge(*results, strategy=self.strategy)
return result

View File

@ -14,9 +14,6 @@ log = logging.getLogger(__name__)
# =====================
def glob_files(path, pattern):
"""Return a list of path that match a glob"""
log.debug("Search glob '%s' in '%s'", pattern, path)
@ -35,12 +32,9 @@ def path_assemble_hier(path, sep="/", reverse=False, start_index=0):
else:
raise Exception(f"This function only accepts string or lists, got: {path}")
if reverse:
list_data = list_data[::-1]
if start_index > 0:
fixed_part = list_data[:start_index]
if reverse:
@ -53,11 +47,10 @@ def path_assemble_hier(path, sep="/", reverse=False, start_index=0):
new_data.extend(hier_part)
list_data = new_data
assert isinstance(list_data, list), f"Got: {list_data}"
ret = []
for index, part in enumerate(list_data):
prefix =''
prefix = ""
try:
prefix = ret[index - 1]
prefix = f"{prefix}/"
@ -74,9 +67,10 @@ def render_template(text, params):
tpl = Template(text)
return tpl.render(**params)
class Default(dict):
def __missing__(self, key):
return ''
return ""
def render_template_python(text, params, ignore_missing=True):
@ -92,8 +86,6 @@ def render_template_python(text, params, ignore_missing=True):
return None
# Schema Methods
# =====================
@ -136,9 +128,7 @@ def schema_validate(config, schema):
path = list(collections.deque(err.schema_path))
path = "/".join([str(i) for i in path])
path = f"schema/{path}"
raise Exception(
f"Failed validating {path} for resource with content: {config}"
)
raise Exception(f"Failed validating {path} for resource with content: {config}")
return config