Clean: common plugin classes

This commit is contained in:
mrjk 2022-03-10 12:07:12 -05:00
parent ff2f170685
commit 25c6bf36a2

View File

@ -19,13 +19,23 @@ log = logging.getLogger(__name__)
# BackendPlugin[1] # BackendPlugin[1]
# Generic classes # Generic Plugin classes
# -------------------------
class KheopsPlugin: class KheopsPlugin:
plugin_name = None plugin_name = None
plugin_type = None plugin_type = None
plugin_kind = None plugin_kind = None
def __init__(self): def __init__(self):
assert isinstance(self.plugin_name, str), f"Missing name attribute in plugin: {self.__class__}"
assert isinstance(self.plugin_kind, str)
config_key = f"{self.plugin_kind}_{self.plugin_name}"
self.config = self.ns.config["config"].get(config_key, {})
self.config_key = config_key
log.debug("Looking for plugin configuration in config with key '%s', got: %s", config_key, self.config)
self._init() self._init()
def _init(self): def _init(self):
@ -47,20 +57,60 @@ class KheopsItemPlugin(KheopsPlugin):
pass pass
# Other classes # Plugin classes
class BackendCandidate: # -------------------------
def __init__(self, path=None, data=None, run=None, status=None):
assert isinstance(run, dict)
self.path = path
self.status = status or "unparsed"
self.run = run or {}
self.data = data or None
def __repr__(self): class BackendPlugin(KheopsItemPlugin):
return f"Status: {self.status}, Path: {self.path} => {self.data}" plugin_kind = "backend"
schema_prop = {
"backend": {}, # GENERIC, String
"file": {},
"glob": {},
"http": {},
"consul": {},
"vault": {},
}
def fetch_data(self, config) -> list:
raise Exception("Not implemented")
def __init__(self, namespace):
self.ns = namespace
super().__init__()
class StrategyPlugin(KheopsItemPlugin):
plugin_kind = "strategy"
schema_prop = {
"_strategy": {}, # GENERIC, String
"merge": {},
"first": {},
"last": {},
"smart": {},
"schema": {},
}
def merge_results(self, candidates, rule) -> list:
pass
def __init__(self, namespace):
self.ns = namespace
super().__init__()
class OutPlugin(KheopsItemPlugin):
plugin_kind = "out"
schema_prop = {
"_out": {}, # GENERIC, List of dict
"toml": {},
"validate": {},
}
def process_item(self, item) -> list:
pass
# Specific classes
class ConfPlugin(KheopsListPlugin): class ConfPlugin(KheopsListPlugin):
plugin_kind = "conf" plugin_kind = "conf"
schema_prop = { schema_prop = {
@ -89,6 +139,22 @@ class ScopePlugin(KheopsListPlugin):
self.ns = namespace self.ns = namespace
super().__init__() super().__init__()
# Helper classes
# -------------------------
class BackendCandidate():
"""Represent a backend candidate"""
def __init__(self, path=None, data=None, run=None, status=None):
assert isinstance(run, dict)
self.path = path
self.status = status or "unparsed"
self.run = run or {}
self.data = data or None
def __repr__(self):
return f"Status: {self.status}, Path: {self.path} => {self.data}"
class ScopeExtLoop: class ScopeExtLoop:
"""This Scope Extension allow to loop over a lookup""" """This Scope Extension allow to loop over a lookup"""
@ -165,55 +231,9 @@ class ScopeExtLoop:
return ret return ret
class BackendPlugin(KheopsItemPlugin):
plugin_kind = "backend"
schema_prop = {
"backend": {}, # GENERIC, String
"file": {},
"glob": {},
"http": {},
"consul": {},
"vault": {},
}
def fetch_data(self, lookups) -> list:
raise Exception("Not implemented")
def __init__(self, namespace):
self.ns = namespace
super().__init__()
class StrategyPlugin(KheopsItemPlugin): # To clean/implement
plugin_kind = "strategy"
schema_prop = {
"_strategy": {}, # GENERIC, String
"merge": {},
"first": {},
"last": {},
"smart": {},
"schema": {},
}
def merge_results(self, candidates, rule) -> list:
pass
def __init__(self, namespace):
self.ns = namespace
super().__init__()
class OutPlugin(KheopsItemPlugin):
plugin_kind = "out"
schema_prop = {
"_out": {}, # GENERIC, List of dict
"toml": {},
"validate": {},
}
def process_item(self, item) -> list:
pass
# # Candidate Classes # # Candidate Classes