"""Generic helpers: emptiness checks, templating, (de)serialization, dynamic imports."""

import importlib
import json
import logging
import os
from pprint import pprint  # noqa: F401  (kept so the name stays importable from here)

# Dynamic lib loader
# ================

# Names of optional packages that could not be imported. Helpers relying on
# one of them fail only when they are actually called.
MISSING_PKGS = []

try:
    # tomllib only ships with the standard library from Python 3.11 onwards;
    # nothing below needs it, so older interpreters must still be able to
    # import this module.
    import tomllib
except ModuleNotFoundError:
    tomllib = None

try:
    import yaml
except ModuleNotFoundError:
    MISSING_PKGS.append("yaml")

try:
    # TOFIX: Python 3.11 provides tomllib
    import tomlkit
except ModuleNotFoundError:
    MISSING_PKGS.append("tomlkit")

try:
    import hcl2
except ModuleNotFoundError:
    MISSING_PKGS.append("hcl2")

try:
    from jinja2 import Template
except ModuleNotFoundError:
    MISSING_PKGS.append("jinja2")


# Globals
# ================

logger = logging.getLogger(__name__)


# Helper functions
# ================


def empty(item) -> bool:
    """Check if an item is empty.

    ``None``, blank/whitespace-only strings and empty lists, dicts or tuples
    are considered empty. Anything else (including ``0`` and ``False``) is not.
    """
    if item is None:
        return True
    if isinstance(item, str):
        return item.strip() == ""
    if isinstance(item, (list, dict)):
        return len(item) == 0
    # Remaining types: equality (not truthiness) is used on purpose so that
    # falsy scalars such as 0 or False are NOT reported as empty.
    return any(item == blank for blank in ([], {}, ()))


def prune(items):
    """Prune empty lists, dicts and None values from a list or dict.

    Emptiness is decided by :func:`empty`. A new container of the same kind
    is returned; the input is left untouched.

    Raises:
        TypeError: if ``items`` is neither a list nor a dict.
    """
    if isinstance(items, dict):
        return {key: val for key, val in items.items() if not empty(val)}
    if isinstance(items, list):
        return [val for val in items if not empty(val)]
    raise TypeError(f"Function prune requires a list or a dict, got: {items}")


def jinja_render(payload, vars):
    """Render the string ``payload`` as a Jinja2 template.

    ``vars`` is a mapping of template variables; ``None`` (or any falsy value)
    is treated as an empty mapping.
    """
    vars = vars or {}
    assert isinstance(payload, str)
    return Template(payload).render(**vars)


def format_render(payload, vars):
    """Render the string ``payload`` with ``str.format``.

    ``vars`` is a mapping of format fields; ``None`` (or any falsy value)
    is treated as an empty mapping.
    """
    vars = vars or {}
    assert isinstance(payload, str)
    return payload.format(**vars)


def open_yaml(document):
    """Load every YAML document found in the file ``document``.

    Returns a list with one parsed object per YAML document in the stream.
    """
    with open(document, "r", encoding="utf-8") as file:
        # safe_load_all is lazy: consume it while the file is still open.
        return list(yaml.safe_load_all(file))


def uniq(items) -> list:
    """Filter out duplicate items of a list while preserving order.

    The first occurrence stays, later ones are discarded. Items are used as
    dict keys, so they must be hashable.
    """
    assert isinstance(items, list), f"Expected a list, got {type(items)}: {items}"
    return list(dict.fromkeys(items))


# TODO: add tests
def from_yaml(string):
    """Transform a YAML string to a python object."""
    return yaml.safe_load(string)


# TODO: add tests
def to_yaml(obj, headers=False):
    """Transform obj to a YAML string.

    ``headers`` is accepted for compatibility but currently has no effect.
    """
    return yaml.safe_dump(obj)


# TODO: add tests
def to_json(obj, nice=True) -> str:
    """Transform obj to a JSON string.

    When ``nice`` is true the output is indented by two spaces, otherwise it
    is emitted on a single line.
    """
    if nice:
        return json.dumps(obj, indent=2)
    return json.dumps(obj)


# TODO: add tests
def from_json(string):
    """Transform a JSON string to a python object."""
    return json.loads(string)


# TODO: add tests
def to_dict(obj):
    """Transform a JSON obj/string to plain python data.

    Non-string input is round-tripped through JSON, which is useful to
    transform nested dict-like objects into plain dicts and lists as well.
    """
    if not isinstance(obj, str):
        obj = json.dumps(obj)
    return json.loads(obj)


def to_csv(obj, sep=";") -> str:
    """Render a list or dict of rows as CSV-like text.

    Each top-level entry becomes one line. String keys (dict input) are
    emitted as the first column, integer indexes (list input) are not.
    Values are joined with ``sep``; no quoting or escaping is performed.
    """
    out = []
    for key, val in iterate_any(obj):
        line = [key] if isinstance(key, str) else []
        line.extend(str(row) for _, row in iterate_any(val))
        out.append(sep.join(line))
    return "\n".join(out)


def to_hcl(data):
    """Render to HCL.

    NOTE(review): this calls ``hcl2.loads``, which parses HCL text rather
    than serializing to it -- confirm the intended direction with callers.
    """
    logger.debug("to_hcl input: %s", data)
    return hcl2.loads(data)


def to_toml(data):
    """Render to TOML using tomlkit.

    Raises:
        Exception: if ``data`` is a list, which is rejected here as a
            top-level object.
    """
    # tomllib (stdlib, Python >= 3.11) can only read TOML, hence tomlkit.
    logger.debug("to_toml input: %s", data)
    if isinstance(data, list):
        raise Exception("Bug: Toml can't handle list as top level objects")
    return tomlkit.dumps(data)


def iterate_any(payload):
    """Try to iterate on anything, always yielding (key, value) pairs.

    * non-iterable payloads (``None``, numbers, ...) give an empty iteration;
    * objects exposing ``.items()`` give their items;
    * any other iterable is enumerated, the index being the key.
    """
    try:
        iter(payload)
    except TypeError:
        logger.debug("Payload is not iterable: %r", payload)
        return {}.items()

    try:
        return payload.items()
    except AttributeError:
        pass

    return enumerate(payload)


def get_pkg_dir(name):
    """Return the directory containing the source of the module ``name``."""
    mod = import_module_pkg(name)
    return os.path.dirname(mod.__file__)


def import_module_pkg(package):
    """Simple helper to dynamically load a python module by dotted name."""
    return importlib.import_module(package)


def import_module_from(package, *names):
    """Import attributes from a python package, like ``from package import names``.

    Returns:
        * the module itself when no name is given;
        * the attribute when exactly one name is given;
        * a tuple of attributes, in the requested order, otherwise.

    Raises:
        AttributeError: if a requested name does not exist in the module.
    """
    mod = import_module(package)

    if not names:
        return mod
    if len(names) == 1:
        return getattr(mod, names[0])
    # A tuple (not a set) keeps the requested order, so the result can be
    # unpacked reliably and unhashable attributes are supported.
    return tuple(getattr(mod, name) for name in names)


def import_module(name):
    """Simple helper to load python modules.

    ``name`` is either a dotted module path (``"pkg.mod"``) or a
    ``"pkg.mod:attr"`` reference, in which case the attribute is returned.
    """
    if ":" in name:
        package, comp = name.rsplit(":", 1)
        return import_module_from(package, comp)
    return import_module_pkg(name)