292 lines
6.2 KiB
Python

import sys
import importlib
import json
import logging
import os
import tomllib
from pprint import pprint
import yaml
# Dynamic lib loader
# ================
# Names of optional third-party packages that could not be imported.
# Each guarded import below records the package name here instead of letting
# the ModuleNotFoundError abort the import of this module.
MISSING_PKGS = []
try:
    # NOTE(review): yaml is also imported unconditionally in the import block
    # above, so a missing PyYAML would already have raised before this guard
    # is reached -- confirm which of the two imports is intended.
    import yaml
except ModuleNotFoundError:
    MISSING_PKGS.append("yaml")
try:
    # TODO: Python 3.11 provides tomllib
    import tomlkit
except ModuleNotFoundError:
    MISSING_PKGS.append("tomlkit")
try:
    import hcl2
except ModuleNotFoundError:
    MISSING_PKGS.append("hcl2")
try:
    # Only the Template class is used from jinja2 in this module.
    from jinja2 import Template
except ModuleNotFoundError:
    MISSING_PKGS.append("jinja2")
# Globals
# ================
logger = logging.getLogger(__name__)
# Helper functions
# ================
def empty(item):
    """Tell whether ``item`` counts as empty.

    Returns True for None, for strings that are blank once stripped, for
    lists and dicts of length zero, and for any other value that compares
    equal to None, ``[]``, ``{}`` or ``()``. Everything else is not empty.
    """
    if item is None:
        return True
    if isinstance(item, str):
        # Whitespace-only strings are treated like the empty string.
        return item.strip() == ""
    if isinstance(item, (list, dict)):
        return len(item) == 0
    # Fallback for every other type: rely on equality with the empty values
    # (this is what makes an empty tuple count as empty).
    return any(item == blank for blank in (None, [], {}, ()))
def prune(items):
    """Return a copy of ``items`` without its empty values.

    For a dict, entries whose value is empty (as decided by ``empty``) are
    dropped; for a list, empty elements are dropped. Only the top level is
    filtered. Any other input type raises an Exception.
    """
    if isinstance(items, dict):
        return {name: value for name, value in items.items() if not empty(value)}
    if isinstance(items, list):
        return [element for element in items if not empty(element)]
    raise Exception(f"Function prune requires a list or a dict, got: {items}")
def jinja_render(payload, vars):
    """Render the string ``payload`` as a Jinja template.

    ``vars`` supplies the template variables as keyword arguments; a falsy
    value (None, empty dict) means no variables. ``payload`` must be a str,
    which is enforced with an assertion.
    """
    context = vars or {}
    assert isinstance(payload, str)
    template = Template(payload)
    return template.render(**context)
def format_render(payload, vars):
    """Render the string ``payload`` with ``str.format``.

    ``vars`` supplies the replacement fields as keyword arguments; a falsy
    value (None, empty dict) means no fields. ``payload`` must be a str,
    which is enforced with an assertion.
    """
    fields = vars or {}
    assert isinstance(payload, str)
    rendered = payload.format(**fields)
    return rendered
def open_yaml(document):
    """Load every YAML document stored in a file.

    Args:
        document: Path of the YAML file to read. The file may hold several
            documents separated by ``---``.

    Returns:
        A list with one parsed object per YAML document, in file order.

    The file is decoded as UTF-8 explicitly rather than with the platform's
    locale encoding, so non-ASCII content loads the same way everywhere.
    """
    with open(document, "r", encoding="utf-8") as file:
        # safe_load_all is lazy: consume it while the file is still open.
        return list(yaml.safe_load_all(file))
def uniq(items):
    """Remove duplicates from a list while keeping first-seen order.

    The first occurrence of each element is kept and later ones are dropped.
    ``items`` must be a list (enforced with an assertion) and its elements
    must be hashable.
    """
    assert isinstance(items, list), f"Expected a list, got {type(items)}: {items}"
    # A dict keeps insertion order, so its keys give the ordered unique set.
    first_seen = {}
    for element in items:
        first_seen.setdefault(element, None)
    return list(first_seen)
# TODO: add tests
def from_yaml(string):
    """Parse a YAML string with the safe loader and return the Python data."""
    parsed = yaml.safe_load(string)
    return parsed
# TODO: add tests
def to_yaml(obj, headers=False):
    """Serialise ``obj`` to a YAML string with the safe dumper.

    ``headers`` is accepted for interface compatibility but is not used by
    the current implementation.
    """
    dumped = yaml.safe_dump(obj)
    return dumped
# TODO: add tests
def to_json(obj, nice=True):
    """Serialise ``obj`` to a JSON string.

    With ``nice`` truthy (the default) the output is indented by two spaces;
    otherwise the compact single-line default of ``json.dumps`` is used.
    """
    indent = 2 if nice else None
    return json.dumps(obj, indent=indent)
# TODO: add tests
def from_json(string):
    """Parse a JSON string and return the corresponding Python data."""
    decoded = json.loads(string)
    return decoded
# TODO: add tests
def to_dict(obj):
    """Normalise ``obj`` to plain Python data through a JSON round trip.

    A str is parsed as JSON directly; anything else is first dumped to JSON
    and then parsed back. Useful to transform nested structures as well.
    """
    serialized = obj if isinstance(obj, str) else json.dumps(obj)
    return json.loads(serialized)
def to_csv(obj, sep=";"):
    """Render a list or dict of rows as ``sep``-separated text.

    Each (key, value) pair yielded by ``iterate_any(obj)`` becomes one line.
    String keys are emitted as the first cell; other keys (such as list
    indexes) are omitted. The cells are the str() of every item yielded by
    ``iterate_any`` on the value. Lines are joined with newlines.
    """
    rows = []
    for key, val in iterate_any(obj):
        cells = [key] if isinstance(key, str) else []
        cells.extend(str(cell) for _, cell in iterate_any(val))
        rows.append(sep.join(cells))
    return "\n".join(rows)
def to_hcl(data):
    """Pretty-print ``data`` to stdout, then return ``hcl2.loads(data)``.

    NOTE(review): the name suggests rendering *to* HCL, but the call used
    here is ``hcl2.loads``, which presumably parses HCL text -- confirm the
    intended direction.
    """
    pprint(data)
    result = hcl2.loads(data)
    return result
# # Only for Python >= 3.11. NOTE: tomllib can only parse TOML (it has no dumps()), so this draft would not work as written.
# def to_toml(data):
# "Render to toml"
# pprint(data)
# return tomllib.dumps(data)
def to_toml(data):
    """Serialise ``data`` to a TOML string with tomlkit.

    ``data`` is pretty-printed to stdout first. A list is rejected with an
    Exception, since a list is not accepted here as the top-level object.
    """
    pprint(data)
    top_level_is_list = isinstance(data, list)
    if top_level_is_list:
        raise Exception("Bug: Toml can't handle list as top level objects")
    return tomlkit.dumps(data)
def iterate_any(payload):
    """Return an iterable of (key, value) pairs for any payload.

    Args:
        payload: Any object.

    Returns:
        * ``payload.items()`` when the payload is iterable and provides an
          ``items()`` method (mappings);
        * ``enumerate(payload)`` for any other iterable, so the keys are
          positional indexes (note that a str yields its characters);
        * an empty items view when the payload is not iterable at all
          (None, numbers, ...). In that case a "NOT ITERABLE" line is also
          printed to stdout.
    """
    try:
        # Only probing for iterability; the iterator itself is not needed.
        iter(payload)
    except TypeError:
        print("NOT ITERABLE", payload)
        return {}.items()

    try:
        return payload.items()
    except AttributeError:
        # Not a mapping: fall through to positional enumeration.
        pass
    return enumerate(payload)
def get_root_pkg_dir(name):
    """Locate the directory of a root package without importing it.

    Each entry of ``sys.path`` is joined with ``name``; the first result
    that is an existing directory is returned, or None when there is none.
    ``name`` must not contain a dot (enforced with an assertion).
    """
    assert "." not in name, "Only resrved for root pacakges"
    # Probe sys.path entries in order and stop at the first match.
    candidates = (os.path.join(base, name) for base in sys.path)
    return next((path for path in candidates if os.path.isdir(path)), None)
def get_pkg_dir(name):
    """Return the directory that contains the module's ``__file__``.

    Unlike a pure path lookup, this imports the module (through
    ``import_module_pkg``) as a side effect.
    """
    module_file = import_module_pkg(name).__file__
    return os.path.dirname(module_file)
def import_module_pkg(package):
    """Import the module named by the dotted path ``package`` and return it."""
    loaded = importlib.import_module(package)
    return loaded
def import_module_from(package, *names):
    """Import ``package`` and fetch attributes from it by name.

    With no ``names`` the module itself is returned; with exactly one name,
    that attribute is returned; with several names, a set of the attributes
    is returned.

    NOTE(review): returning a set loses the order of ``names``, collapses
    duplicates and requires hashable attributes -- confirm callers do not
    expect positional unpacking.
    """
    mod = import_module(package)
    if not names:
        return mod
    if len(names) == 1:
        return getattr(mod, names[0])
    # Resolve every name first, then build the set, so a missing attribute
    # is reported before any hashing takes place.
    attributes = [getattr(mod, attr_name) for attr_name in names]
    return set(attributes)
def import_module(name):
    """Load a module, or an attribute of a module, from its name.

    A plain dotted name is imported as a module. A name containing ``:`` is
    split on its last colon: the left part is loaded first and the right
    part is then fetched from it as an attribute.
    """
    if ":" not in name:
        return import_module_pkg(name)
    package, comp = name.rsplit(":", 1)
    return import_module_from(package, comp)