Update: Kheops core classes

This commit is contained in:
mrjk 2022-03-10 12:08:12 -05:00
parent 5f80881c3c
commit f12d9fd7be
2 changed files with 204 additions and 251 deletions

View File

@ -11,89 +11,50 @@ from pathlib import Path
import anyconfig
from diskcache import Cache
import kheops.plugin as KheopsPlugins
from kheops.controllers import QueryProcessor
from kheops.utils import schema_validate
log = logging.getLogger(__name__)
CONF_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": False,
"default": {},
"$def": {
"backends_items": {},
"backends_config": {},
"rules_items": {},
"rules_config": {},
},
# "patternProperties": {
# ".*": {
# "type": "object",
# "optional": True,
# "additionalProperties": False,
"required": ["config"],
#"$def": {
# "backends_items": {},
# "backends_config": {},
# "rules_items": {},
# "rules_config": {},
#},
"properties": {
"lookups": {
"type": "array",
"default": [],
"items": {
"type": "object",
#"properties": {"$ref": "#/$defs/backends_items"},
},
},
"rules": {
"type": "array",
"default": [],
# "arrayItem": { "$ref": "#/$defs/rules_items" },
},
"config": {
"type": "object",
"default": {},
"additionalProperties": True,
#"required": ["app"],
"properties": {
"app": {
"type": "object",
"default": {},
"additionalProperties": False,
"properties": {
"root": {
"default": None,
"oneOf": [
{
"type": "null",
"description": "Application current working directory is the `kheops.yml` directory",
},
{
"type": "string",
"description": "Application working directory. If a relative path is used, it will be depending on `kheops.yml` directory",
},
],
},
"cache": {
"default": "kheops_cache",
"oneOf": [
{
"type": "null",
"description": "Disable cache",
},
{
"type": "string",
"description": "Path of the cache directory",
},
],
},
},
},
# OLD
"tree": {
# "additionalProperties": False,
"type": "object",
"default": {},
"deprecated": True,
"properties": {
"prefix": {
"default": None,
"oneOf": [
{
"type": "null",
"description": "Disable prefix, all files are lookup up from the app root dir.",
},
{
"type": "string",
"description": """Add a path prefix before all paths. This is quite useful to store your YAML data in a dedicated tree.""",
},
],
},
},
},
"lookups": {
# "additionalProperties": False,
@ -121,30 +82,7 @@ CONF_SCHEMA = {
},
},
},
"tree": {
"type": "array",
"default": [],
"items": {
"type": "object",
"properties": {"$ref": "#/$defs/backends_items"},
},
},
"lookups": {
"type": "array",
"default": [],
"items": {
"type": "object",
"properties": {"$ref": "#/$defs/backends_items"},
},
},
"rules": {
"type": "array",
"default": [],
# "arrayItem": { "$ref": "#/$defs/rules_items" },
},
},
# },
# },
}
@ -184,18 +122,17 @@ class KheopsNamespace(GenericInstance, QueryProcessor):
:type config: Any
"""
config = schema_validate(config, CONF_SCHEMA)
super().__init__(config)
self.name = name
self.config = config or {}
self.app = app
self.run = dict(app.run)
# Validate configuration
self.config = schema_validate(self.config, CONF_SCHEMA)
self.run["path_ns"] = str(Path(app.run["config_src"]).parent.resolve())
class Kheops(GenericInstance):
"""
Kheops Application Class
@ -278,8 +215,6 @@ class Kheops(GenericInstance):
:type scope: dict
"""
ret = {}
# Loop over keys
for key_def in keys:
@ -304,7 +239,6 @@ class Kheops(GenericInstance):
except KeyError as err:
raise Exception(f"Unknown kheops namespace: {ns_name}")
print ("CREATE", ns_name, ns_config)
ns = KheopsNamespace(self, ns_name, ns_config)
# Get result
@ -328,33 +262,7 @@ class Kheops(GenericInstance):
# def DEPRECATED_lookup(
# self,
# keys=None,
# policy=None,
# scope=None,
# trace=False,
# explain=False,
# validate_schema=False,
# ):
# """Lookup a key in hierarchy"""
# log.debug("Lookup key %s with scope: %s", keys, scope)
# assert isinstance(keys, list), f"Got {keys}"
# query = Query(app=self)
# ret = {}
# for key in keys:
# ret[key] = query.exec(
# key=key,
# scope=scope,
# policy=policy,
# trace=trace,
# explain=explain,
# validate_schema=validate_schema,
# )
# return ret
# To clean/implement
# def DEPRECATED_dump_schema(self):
# """Dump configuration schema"""

View File

@ -1,3 +1,9 @@
"""
Kheops controller
Main Kheops model classes
"""
import json
import logging
@ -7,35 +13,45 @@ from pathlib import Path
from prettytable import PrettyTable
import kheops.plugin as KheopsPlugins
from kheops.utils import render_template, render_template_python, str_ellipsis
from kheops.utils import render_template_python, str_ellipsis
from pprint import pprint
log = logging.getLogger(__name__)
tracer = logging.getLogger(f"{__name__}.explain")
# Helper classes
# ------------------------
class LoadPlugin:
"""Generic class to load plugins"""
"""Kheops plugins loader
This plugin loader is a helper to load a python module (Kheops Plugin) from
a plugin kind and name.
"""
def __init__(self, plugins):
self.plugins = plugins
def load(self, kind, name):
"""
Load a plugin
"""
assert isinstance(name, str), f"Got: {name}"
# Get plugin kind
try:
plugins = getattr(self.plugins, kind)
except Exception as err:
raise Exception(f"Unknown module kind '{kind}': {err}")
except AttributeError as err:
raise Exception(f"Unknown module kind '{kind}': {err}") from err
# Get plugin class
try:
plugin_cls = getattr(plugins, name)
except Exception as err:
raise Exception(f"Unknown module '{kind}.{name}': {err}")
except AttributeError as err:
raise Exception(f"Unknown module '{kind}.{name}': {err}") from err
assert hasattr(
plugin_cls, "Plugin"
@ -44,11 +60,29 @@ class LoadPlugin:
# Return plugin Classe
return plugin_cls.Plugin
class BackendCandidate():
"""Backend Candidate
plugin_loader = LoadPlugin(KheopsPlugins)
This object represents a backend candidate. It holds the value of the
requested key, but also so source path, the status and some other metadata.
"""
def __init__(self, path=None, data=None, run=None, status=None):
assert isinstance(run, dict)
self.path = path
self.status = status or "unparsed"
self.run = run or {}
self.data = data or None
def __repr__(self):
return f"Status: {self.status}, Path: {self.path} => {self.data}"
class Query:
"""Query object
Object that hold key and scope.
"""
key = None
scope = None
@ -60,17 +94,25 @@ class Query:
self.rule = None
# class QueryController():
# def exec(self, key=None, scope=None):
# query = Query(key, scope)
# result = self.processor.exec(query)
# return result
# Query Processor class
# ------------------------
class QueryProcessor:
"""QueryProcessor class provides all the methods to be able to make queries"""
"""QueryProcessor
This class helps to do queries for a given key and scope. It provides a single
public method. It also implement an explain mechanism to help to troubleshoot query
lookup issues.
The query process consists in:
* Create a new query with the key and th scope
* Fetch and expand the lookup list (_exec_assemble_lookups)
* Fetch the rule that match the key (_exec_get_rule)
* Fetch the strategy that match the key
* Query all backends with lookup list (_exec_backend_plugins)
* Return result
"""
default_match_rule = {
"key": None,
@ -84,19 +126,17 @@ class QueryProcessor:
"continue": True,
}
# def __init__(self, app):
# self.app = app
# self.config = app.conf2['config'] or {}
# self.lookups = app.conf2['lookups'] or []
# self.rules = app.conf2['rules'] or []
def CHILDREN_INIT(self, config):
def __init__(self, config):
self.plugin_loader = LoadPlugin(KheopsPlugins)
self.config = config
pass
# def exec(self, key=None, scope=None):
# Query methods
# ------------------------
def query(self, key=None, scope=None, explain=False):
"""Query key with scope
"""
if explain:
tracer.setLevel(logging.DEBUG)
@ -109,27 +149,23 @@ class QueryProcessor:
# Assemble if more than one and merge when continue.
# Got the Matched rule (RULE CACHE)
# We'll need strategy, and it's selector field: matched/first/last/all
# key_rule = self._get_key_rule(key) or {}
# key_strategy = key_rule.get('strategy', None)
key_rule = self._exec_get_rule(query)
log.info("Matched rule for key '%s': %s", query.key, key_rule)
# Build the lookups [] => []
# Fetch static config from app (for include and NS:includes ...)
# Loop over lookups and process each lookup with ScopePlugins
lookups = self.config.get("lookups", {}).copy()
lookups = self.config["lookups"].copy()
parsed_lookups = self._exec_assemble_lookups(lookups, query)
# Generate explain report
if explain:
self._explain_lookups(parsed_lookups)
# FEtch the module
# Fetch the module
# Retrieve the module instance
# Get it's match policy
# TODO
plugin_name = key_rule.get("strategy", None)
strategy_plugin = plugin_loader.load("strategy", plugin_name)(self)
strategy_plugin = self.plugin_loader.load("strategy", plugin_name)(self)
# Get the data (strategy.selector)
# For each entry, ask the backend to return the data: file, http, consul ...
@ -151,94 +187,29 @@ class QueryProcessor:
return result
def _explain_lookups(self, parsed_lookups):
"""Explain list of lookups"""
table = PrettyTable()
for item in parsed_lookups:
col1 = json.dumps(
{k: v for k, v in item.items() if k not in ["_run"]},
default=lambda o: "<not serializable>",
indent=2,
)
col2 = json.dumps(
item["_run"], default=lambda o: "<not serializable>", indent=2
)
table.add_row(
[
"\nConfig:" + str_ellipsis(col1, 60),
"\nRuntime:" + str_ellipsis(col2, 60),
]
)
table.field_names = ["Config", "Runtime"]
table.align = "l"
tracer.info("Explain lookups:\n" + str(table))
# Query parts methods
# ------------------------
def _explain_candidates(self, candidates, query):
"""Explain list of candidates"""
def _exec_get_rule(self, query, mode="match"):
# TOFIX: query is not needed here !
key = query.key
rules = self.config["rules"] or {}
table = PrettyTable()
for item_obj in candidates:
item = item_obj.__dict__
item["rel_path"] = str(Path(item["path"]).relative_to(Path.cwd()))
col1 = json.dumps(
{k: v for k, v in item.items() if k not in ["run", "data"]},
default=lambda o: "<not serializable>",
indent=2,
)
col2 = json.dumps(
item["run"]["_run"], default=lambda o: "<not serializable>", indent=2
)
col3 = (
item_obj.data.get(query.key, "NOT FOUND")
if query.key is not None
else item_obj.data
)
col3 = json.dumps(col3, default=lambda o: "<not serializable>", indent=2)
table.add_row(
[
"\nStatus:" + str_ellipsis(col1, 80),
"\nRuntime:" + str_ellipsis(col2, 60),
"\nKey:" + str_ellipsis(col3, 60),
]
)
table.field_names = ["Status", "Runtime", "Key Value"]
table.align = "l"
tracer.info("Explain candidates:\n" + str(table))
def _exec_backend_plugins(self, lookups, selector="matched"):
selector = "matched"
assert selector in ["last", "first", "all", "matched"]
assert isinstance(lookups, list)
# lookups = self.config.get("lookups", {}).copy()
plugins = {}
ret = []
for index, lookup_def in enumerate(lookups):
# Update object
lookup_def["_run"]["backend_index"] = index
# Load plugin
plugin_name = lookup_def["backend"]
if plugin_name in plugins:
plugin = plugins[plugin_name]
if mode == "match":
rule = dict(self.default_match_rule)
rules = [i for i in rules if i.get("key", None) == key]
if len(rules) > 0:
match = rules[0]
rule.update(match)
else:
plugin = plugin_loader.load("backend", plugin_name)(namespace=self)
log.debug("Applying default rule for key '%s'", key)
rule = self.default_match_rule
else:
raise Exception(f"Mode '{mode}' is not implemented")
# Get candidates
candidates = plugin.fetch_data(lookup_def)
return rule
# Apply selector
for candidate in candidates:
if candidate.status == "found" or selector == "all":
ret.append(candidate)
return ret
def _exec_assemble_lookups(self, lookups, query):
@ -248,10 +219,10 @@ class QueryProcessor:
# Init the scope list
new_lookups1 = []
for index, lookup_def in enumerate(lookups):
shortform = False
#shortform = False
if isinstance(lookup_def, str):
shortform = True
#shortform = True
lookup_def = {
"path": lookup_def,
}
@ -279,7 +250,7 @@ class QueryProcessor:
plugin_name = plugin_def.get("module", None)
if plugin_name:
plugin = plugin_loader.load("scope", plugin_name)(namespace=self)
plugin = self.plugin_loader.load("scope", plugin_name)(namespace=self)
ret = plugin.process_items(ret, plugin_def)
new_lookups2.extend(ret)
@ -299,21 +270,95 @@ class QueryProcessor:
return new_lookups3
def _exec_get_rule(self, query, mode="match"):
key = query.key
rules = self.config["rules"] or {}
def _exec_backend_plugins(self, lookups, selector="matched"):
selector = "matched"
assert selector in ["last", "first", "all", "matched"]
assert isinstance(lookups, list)
# lookups = self.config.get("lookups", {}).copy()
if mode == "match":
rule = dict(self.default_match_rule)
rules = [i for i in rules if i.get("key", None) == key]
if len(rules) > 0:
match = rules[0]
rule.update(match)
plugins = {}
ret = []
for index, lookup_def in enumerate(lookups):
# Update object
lookup_def["_run"]["backend_index"] = index
# Load plugin
plugin_name = lookup_def["backend"]
if plugin_name in plugins:
plugin = plugins[plugin_name]
else:
log.debug("Applying default rule for key '%s'", key)
rule = self.default_match_rule
else:
raise Exception(f"Mode '{mode}' is not implemented")
plugin = self.plugin_loader.load("backend", plugin_name)(namespace=self)
return rule
# Get candidates
candidates = plugin.fetch_data(lookup_def)
# Apply selector
for candidate in candidates:
if candidate.status == "found" or selector == "all":
ret.append(candidate)
return ret
# Explain methods
# ------------------------
def _explain_lookups(self, parsed_lookups):
"""Explain list of lookups"""
table = PrettyTable()
for item in parsed_lookups:
col1 = json.dumps(
{k: v for k, v in item.items() if k not in ["_run"]},
default=lambda o: "<not serializable>",
indent=2,
)
col2 = json.dumps(
item["_run"], default=lambda o: "<not serializable>", indent=2
)
table.add_row(
[
"\nConfig:" + str_ellipsis(col1, 60),
"\nRuntime:" + str_ellipsis(col2, 60),
]
)
table.field_names = ["Config", "Runtime"]
table.align = "l"
tracer.info("Explain lookups:\n%s", str(table))
def _explain_candidates(self, candidates, query):
"""Explain list of candidates"""
# TOFIX: query is not needed here !
table = PrettyTable()
for item_obj in candidates:
item = item_obj.__dict__
item["rel_path"] = str(Path(item["path"]).relative_to(Path.cwd()))
col1 = json.dumps(
{k: v for k, v in item.items() if k not in ["run", "data"]},
default=lambda o: "<not serializable>",
indent=2,
)
col2 = json.dumps(
item["run"]["_run"], default=lambda o: "<not serializable>", indent=2
)
col3 = (
item_obj.data.get(query.key, "NOT FOUND")
if query.key is not None and isinstance(item_obj.data, dict)
else item_obj.data
)
col3 = json.dumps(col3, default=lambda o: "<not serializable>", indent=2)
table.add_row(
[
"\nStatus:" + str_ellipsis(col1, 80),
"\nRuntime:" + str_ellipsis(col2, 60),
"\nKey:" + str_ellipsis(col3, 60),
]
)
table.field_names = ["Status", "Runtime", "Key Value"]
table.align = "l"
tracer.info("Explain candidates:\n%s", str(table))