# kheops/albero/plugin/engine/jerakia.py
#
# 164 lines
# 4.2 KiB
# Python

from pathlib import Path
from albero.utils import render_template, glob_files
from albero.plugin.common import PluginEngineClass, PluginFileGlob #, Candidate
from pprint import pprint
import logging
import anyconfig
log = logging.getLogger(__name__)
#class FileCandidate(Candidate):
# path = None
#
# def _report_data(self):
# data = {
# # "rule": self.config,
# "value": self.engine._plugin_value,
# "data": self.data,
# "path": str(self.path.relative_to(Path.cwd())),
# }
# data = dict(self.config)
# return super()._report_data(data)
class Plugin(PluginEngineClass, PluginFileGlob):
    """Engine plugin that collects data from ``ansible.yaml`` files.

    The plugin reads one or more path templates from its configuration,
    renders them against the current scope, looks for files below each
    rendered path and loads every match as YAML.
    """

    _plugin_name = "jerakia"
    _plugin_engine = "jerakia"

    # NOTE(review): the "glob" default below is "ansible.yml" while
    # _show_paths() hard-codes "ansible.yaml" and never reads this option;
    # confirm which one is intended before wiring the option in.
    # _schema_props_files = {
    _schema_props_new = {
        "path": {
            "anyOf": [
                {
                    "type": "string",
                },
                {
                    "type": "array",
                    "items": {
                        "type": "string",
                    },
                },
            ]
        },
        "glob": {
            "default": "ansible.yml",
            "anyOf": [
                {
                    "type": "string",
                },
                # {
                #     "type": "array",
                #     "items": {
                #         "type": "string",
                #     },
                # },
            ]
        }
    }

    def _init(self):
        """Normalise the configured path(s) into a list.

        The value is read from the ``path`` config key, falling back to
        ``value``. A single string is wrapped into a one-element list. The
        result is stored on both ``self.paths`` and ``self.value``.

        Raises:
            Exception: if the configured value is neither a string nor a list.
        """
        paths = self.config.get("path", self.config.get("value"))
        if isinstance(paths, str):
            paths = [paths]
        elif not isinstance(paths, list):
            raise Exception(
                f"Unsupported path value, expected str or list, got: {paths} in {self.config}"
            )
        self.paths = paths
        self.value = paths

    def _paths_template(self, scope):
        """Render every configured path template against ``scope``.

        Args:
            scope: mapping passed to ``render_template`` for substitution.

        Returns:
            list: the rendered paths, in configuration order.
        """
        rendered = [render_template(path, scope) for path in self.paths]
        log.debug("Render pattern: %s", rendered)
        return rendered

    def _show_paths(self, path_top, scope):
        """Return the files matched below each rendered path.

        Args:
            path_top: base directory the rendered paths are joined to
                (string or ``Path``).
            scope: mapping used to render the path templates.

        Returns:
            list: every entry returned by ``glob_files`` for each rendered
            path, concatenated in order.
        """
        parsed = self._paths_template(scope)
        log.debug("Expanded paths to: %s", parsed)

        # Coerce to Path so the "/" join also works when a plain string
        # is handed in.
        base = Path(path_top)

        # Look for files (NOT BE HERE !!!)
        matched = []
        for rendered in parsed:
            matched.extend(glob_files(base / rendered, 'ansible.yaml'))
        log.debug("Matched globs: %s", matched)
        return matched

    def process(self):
        """Load every matched file and build one result entry per file.

        The lookup root is ``app.run['path_root']`` combined with the
        optional ``config.tree.prefix`` setting. The scope and key come from
        ``self.config["_run"]``.

        Returns:
            list[dict]: one dict per matched file with the keys ``run``
            (``path`` and ``rel_path``), ``parent`` (the plugin config),
            ``data`` (the whole document when key is ``None``, otherwise the
            value stored under key, or ``None`` when absent) and ``found``.
        """
        # Detect path root and path prefix
        path_root = self.app.run['path_root']
        path_prefix = self.app.conf2['config']['tree']['prefix']

        # Always work with a Path. Joining an absolute prefix discards the
        # root, so this single join covers both the absolute and the
        # relative prefix cases.
        path_top = Path(path_root)
        if path_prefix:
            path_top = path_top / Path(path_prefix)
        log.debug("Path Top: %s", path_top)

        scope = self.config["_run"]["scope"]
        key = self.config["_run"]["key"]
        assert isinstance(scope, dict), f"Got: {scope}"
        assert isinstance(key, (str, type(None))), f"Got: {key}"

        cwd = Path.cwd()
        ret = []
        for path in self._show_paths(path_top, scope):
            log.debug("Reading file: %s", path)

            # Fetch data
            raw_data = anyconfig.load(path, ac_parser="yaml")
            data = None
            found = False
            if key is None:
                data = raw_data
                found = True
            else:
                try:
                    data = raw_data[key]
                    found = True
                except (LookupError, TypeError):
                    # Key missing, or the document is not subscriptable by
                    # a string key (empty file, list, scalar): report it as
                    # not found.
                    pass

            # Files outside the working directory cannot be expressed
            # relative to it; fall back to the path as found.
            try:
                rel_path = str(Path(path).relative_to(cwd))
            except ValueError:
                rel_path = str(path)

            # Build result object
            ret.append(
                {
                    "run": {
                        "path": path,
                        "rel_path": rel_path,
                    },
                    "parent": self.config,
                    "data": data,
                    "found": found,
                }
            )
        return ret