# Listing metadata: 196 lines, 5.3 KiB, Python.
import glob
|
|
import logging
|
|
import os
|
|
from collections import namedtuple
|
|
from copy import copy
|
|
from pprint import pprint
|
|
from types import SimpleNamespace
|
|
|
|
from xdg import BaseDirectory
|
|
|
|
from . import exceptions as error
|
|
from .catalog import Catalog
|
|
from .framework import DictItem, scoped_ident
|
|
from .idents import Idents
|
|
from .lib.utils import import_module, open_yaml, get_root_pkg_dir
|
|
from .meta import NAME, VERSION
|
|
from .providers import Providers
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Short alias so code in this module can raise error.IamException directly.
IamException = error.IamException
|
|
|
|
|
|
class App:
    """Application object wiring configuration, providers, idents and catalog.

    Construction loads the YAML configuration, builds the provider and ident
    registries from it, then selects the active ident and creates its catalog.
    """

    # Application name; also the directory name looked up under the XDG
    # configuration home when no explicit config path is given.
    name = NAME

    def __init__(self, ident="_", config_path=None):
        """Load the configuration and initialise the app for ``ident``.

        Args:
            ident: Identity to activate. ``"_"`` or any falsy value selects
                the default ident.
            config_path: Directory containing the ``*.yml`` configuration
                files. When ``None``, ``<xdg_config_home>/<name>`` is used.

        Raises:
            error.MissingConfigFiles: No YAML configuration was found.
            IamException: A configured plugin module could not be imported.
        """
        db = self.load_config_files(config_path=config_path)

        # Order matters: init_ident() reads self.idents, which load_db() sets.
        self.load_db(db)
        self.init_ident(ident)

    def init_ident(self, ident):
        """Select the active ident and build its catalog.

        Sets ``ident_raw``, ``ident_name``, ``ident_scope``, ``ident`` and
        ``catalog`` on the instance. Requires ``self.idents`` to exist
        already (it is created by ``load_db``).

        Args:
            ident: Raw ident string. ``"_"`` or a falsy value enables the
                default ident instead of looking one up.
        """
        self.ident_raw = ident
        view = scoped_ident(ident)

        # Save important vars
        self.ident_name = view.ident
        self.ident_scope = view.scope

        # Fetch ident config
        if ident == "_" or not ident:
            logger.debug("Enable default ident")
            self.ident = self.idents.items_class("default")
        else:
            self.ident = self.idents.get(self.ident_name)

        # Load user and catalog
        self.catalog = Catalog(self, self.ident)

    def load_config_files(self, config_path=None):
        """Read every ``*.yml`` file of the config directory into one dict.

        Files are processed in sorted order and merged with ``dict.update``,
        so on a top-level key conflict the later file (and, within a file,
        the later payload) wins. The directory used is stored in
        ``self.config_dir``.

        Args:
            config_path: Directory to scan. When ``None``,
                ``<xdg_config_home>/<name>`` is used.

        Returns:
            dict: The merged configuration.

        Raises:
            error.MissingConfigFiles: The directory does not exist or
                yielded no YAML payload.
        """
        if config_path is not None:
            path = config_path
        else:
            path = os.path.join(BaseDirectory.xdg_config_home, self.name)

        self.config_dir = path

        config_candidates = []
        if os.path.isdir(path):
            # glob() already returns paths prefixed with the directory, so
            # they must not be joined with ``path`` a second time (that
            # doubled the prefix for relative directories). The directory is
            # escaped so that glob metacharacters in its name stay literal,
            # and the result is sorted to make the merge order deterministic.
            pattern = os.path.join(glob.escape(path), "*.yml")
            for file_ in sorted(glob.glob(pattern)):
                for payload in open_yaml(file_):
                    config_candidates.append((file_, payload))

        if not config_candidates:
            _msg = f"Can't find any yaml configuration in: {path}"
            raise error.MissingConfigFiles(_msg)

        config = {}
        for _file, conf in config_candidates:
            # NOTE(review): presumably open_yaml can yield None for an empty
            # YAML document; dict.update(None) would raise TypeError.
            if conf is None:
                continue
            config.update(conf)

        return config

    def load_db(self, db):
        """Build the plugin, provider and ident registries from ``db``.

        Sets ``self.db``, ``self.plugin_providers`` (through
        ``_load_plugins``), ``self.providers`` and ``self.idents``.

        Args:
            db: Configuration mapping. The keys ``plugins``, ``providers``
                and ``idents`` are read; each may be missing or empty.

        Raises:
            IamException: A listed plugin module could not be imported.
        """
        # Init Databases
        self.db = db

        # ``or`` also covers keys that are present but empty (None), which
        # ``db.get(key, default)`` would pass through unchanged.
        self._load_plugins(db.get("plugins") or [])

        # Load providers: explicit configuration overrides plugin providers.
        providers_conf = {}
        providers_conf.update(self.plugin_providers)
        providers_conf.update(db.get("providers") or {})
        self.providers = Providers("ConfigProviders", providers_conf)

        # Load idents
        idents_conf = dict(db.get("idents") or {})
        self.idents = Idents("ConfigIdents", idents_conf)

    def _load_plugins(self, plugins_refs):
        """Import the referenced plugins and store them in ``plugin_providers``.

        A reference without ``":"`` is treated as ``"<ref>:all"``. For a
        ``":all"`` reference the imported value is merged as a mapping; for
        any other reference the imported value is stored under the name that
        follows the first ``":"``.

        Args:
            plugins_refs: Iterable of plugin reference strings.

        Raises:
            IamException: ``import_module`` raised ``ModuleNotFoundError``;
                the original error is chained as the cause.
        """
        plugins = {}
        for plugin_name in plugins_refs:
            if ":" not in plugin_name:
                plugin_name = plugin_name + ":all"

            # Name of the single member to load, or False to load everything.
            load_one = False
            if not plugin_name.endswith(":all"):
                load_one = plugin_name.split(":", 2)[1]

            try:
                mod = import_module(plugin_name)
            except ModuleNotFoundError as err:
                msg = f"Impossible to load module: {plugin_name}"
                raise IamException(msg) from err

            if load_one:
                mod = {load_one: mod}

            plugins.update(mod)

        self.plugin_providers = plugins

    def user_dump(self, pattern=None):
        """Dump the catalog of the active ident.

        Args:
            pattern: Accepted for interface compatibility; currently unused.
        """
        self.catalog.dump()

    def get_idents(self, extended=False):
        """Return the list of identities.

        Args:
            extended: When true, two extra hard-coded entries are appended.

        Returns:
            list: Ident names from ``self.idents.get_idents()``.
        """
        ret = list(self.idents.get_idents())

        if extended:
            # NOTE(review): these look like placeholder/sample values rather
            # than data derived from the configuration - confirm intent.
            out = ["mrjk/backups", "mrjk/pictures"]
            ret.extend(out)

        return ret

    def list_services(self):
        """Return the services of the active catalog (``catalog.services``)."""
        return self.catalog.services

    def shell_enable(self, **kwargs):
        """Enable the shell integration; delegates to the catalog."""
        return self.catalog.shell_enable(**kwargs)

    def shell_disable(self, **kwargs):
        """Disable the shell integration; delegates to the catalog."""
        return self.catalog.shell_disable(**kwargs)

    def shell_install(self, shell="bash", completion=True):
        """Return the shell snippet that sources the IAM init scripts.

        Args:
            shell: Shell name or path; only values ending in ``"bash"`` are
                supported.
            completion: Whether to also source the completion script.

        Returns:
            str: Newline-joined snippet delimited by ``### IAM ###`` markers.

        Raises:
            Exception: ``shell`` is not supported.
        """
        # Local import: only this method needs it.
        import shlex

        # Shell selector. Checked first so an unsupported shell fails before
        # any package path is resolved.
        # TOFIX: Implement for other shells ?
        if not shell.endswith("bash"):
            raise Exception(f"Unsupported shell: {shell}")

        assets_dir = os.path.join(get_root_pkg_dir("iam"), "assets")

        # Quote the paths so an install location containing spaces or other
        # shell metacharacters still produces a valid ``source`` line; plain
        # paths are emitted unchanged.
        init_file = shlex.quote(os.path.join(assets_dir, "init_bash.sh"))
        completion_file = shlex.quote(
            os.path.join(assets_dir, "iam_completion.sh")
        )

        out = [
            "### IAM ###",
            "if command -v iam &>/dev/null; then",
            f" source {init_file}",
        ]
        if completion:
            out.append(f" source {completion_file}")
        out.extend(["fi", "### IAM ###"])

        return "\n".join(out)