python-shctl-iam/iam/lib/cli_views.py

552 lines
14 KiB
Python

import enum
import importlib.util
import logging
from pprint import pformat, pprint
from types import SimpleNamespace
from iam.framework import get_app_logger
from .utils import iterate_any, to_csv, to_hcl, to_json, to_toml, to_yaml
# Optional deps
MISSING_PKGS = []
try:
from rich.table import Table
except ModuleNotFoundError:
MISSING_PKGS.append("Table")
# TODO:
# toml/hcl support
# duckdb support => https://stackoverflow.com/a/70538527
# MISSING_PKGS = []
# try:
# from rich.console import Console
# except ModuleNotFoundError:
# MISSING_PKGS.append("Console")
# if not "Console" in MISSING_PKGS:
# console = Console()
# else:
# console
logger = logging.getLogger(__name__)
# Output configuration
# ======================
VIEW_MODULES = {
# "table": "rich",
"yaml": "pyaml",
"toml": "tomllib",
"hcl": ("hcl2", "python-hcl2"),
}
VIEW_FORMATS = {
"DEFAULT": "table",
# CLI
"RICH_TABLE": "table",
# Item
"ENV": "env",
"VARS": "vars",
"CSV": "csv",
"WORDS": "words",
"LINES": "lines",
# Structured
"JSON": "json",
"YAML": "yaml",
"PYTHON": "python",
# "TOML": "toml",
# "HCL": "hcl",
# Shell syntax
# "SHELL": "sh",
# "SHELL_BASH": "bash",
# "SHELL_ZSH": "zsh",
# "SHELL_FISH": "fish",
}
VIEW_FORMATS_NAMES = list(set(VIEW_FORMATS.values()))
# Structurers
# ===============================
def restructure_to_item_view(data, columns):
"Restructure data to fit to item view"
ret = {}
for index, key in enumerate(columns):
value = data[index]
ret[key] = value
return ret
def restructure_to_list_view(data, columns):
"Restructure data to fit to list view"
col_count = len(columns)
ret = []
for row in data:
line = {}
for index in range(col_count):
col = columns[index]
cell = row[index]
line[col] = cell
ret.append(line)
return ret
def restructure_to_csv(data, columns):
"Restructure data to fit to CSV view"
col_count = len(columns)
ret = [columns]
for row in data:
line = []
for key, val in iterate_any(row):
line.append(val)
ret.append(line)
return ret
def restructure_to_env(data, columns, prefix_index=-1, prefix="", count_var="_count"):
"Restructure data to fit to item env/vars view"
# Prepare
col_count = len(columns)
data_count = len(data)
prefix_col = None
if (prefix_index >= 0) and (prefix_index < col_count):
prefix_col = columns[prefix_index]
# Loop over each records
ret = []
for idx, row in enumerate(data):
# Defaults
curr_prefix = ""
curr_suffix = ""
# Rebuild dict config
dict_payload = {}
for index in range(col_count):
key = columns[index]
value = row[index]
dict_payload[key] = value
# Extract named prefix if more than one result
if len(data) > 1:
if prefix_col:
curr_prefix = f"{dict_payload.pop(prefix_col)}__"
else:
curr_suffix = f"_{idx}"
# Build each value settings
payload = []
for column_name in columns:
if column_name in dict_payload:
key = f"{prefix}{curr_prefix}{column_name}{curr_suffix}"
value = dict_payload[column_name]
payload.append([key, value])
ret.extend(payload)
# Add count var if many records
if count_var and len(data) > 1:
ret.insert(0, [count_var, len(data)])
return ret
def restructure_to_toml(data, columns, prefix="", root="items"):
ret = {}
for line in data:
name = line[0]
value = {}
if len(line) > 1:
for idx, val in enumerate(line[1:]):
key = columns[idx + 1]
value[key] = val
ret[prefix + name] = value
if root:
ret = {root: data}
# pprint (ret)
return ret
# Formats
# ===============================
def to_vars(data, export=False, comments=True, prefix=""):
"Render to vars or environment vars, Input ormat: [[key, value,]"
out = []
for line in data:
assert isinstance(line, (list, set)), f"Got: {line}"
# Fetch key/value
key = str(line[0])
value = str(line[1]) if len(line) > 1 else ""
# # Comments
# if comments and len(line) > 2:
# comments = "# " + ', '.join(line[2:])
# else:
# comment = ""
content = f"{prefix}{key}='{value}'\t\t"
# Exports
if export:
content = f"export {prefix.upper()}{key.upper()}='{value}'"
out.append(content)
return "\n".join(out)
def to_rich_table(data, columns=None, title=None, settings=None):
settings = settings or {}
if title:
settings["title"] = title
columns = columns or []
rows = data or []
table = Table(**settings)
for column in columns:
if isinstance(column, dict):
txt, args = column["text"], column.get("args", {})
table.add_column(str(text), **args)
else:
table.add_column(str(column))
# Loop over each lines
for key, cells in iterate_any(rows):
line = []
if isinstance(key, str):
line = [key]
# Loop over each cell
for _, cell in iterate_any(cells):
line.append(str(cell))
table.add_row(*line)
return table
# Helpers
# ===============================
def is_module_present(name):
"Return true if a module is present"
spam_spec = importlib.util.find_spec("spam")
return spam_spec is not None
def view_table_sort(row, indexes):
"Helper to sort tables on columns"
ret = []
for idx in indexes:
ret.append(row[idx])
# ret = list(set(ret))
return ret
# Main Views Class
# ===============================
class _RootView:
name = "UNSET"
formats_dict = {"DEFAULT": "default"}
formats_modules = {"DEFAULT": None}
def __init__(self, output=None):
self._init_enum()
self._init_default()
self.output = output or print
## DEPRECATED !!!
def _init_enum(self):
"Create enum"
self.formats_enum = enum.Enum("OutputFormat", self.formats_dict)
def _init_default(self):
"Init default format"
self.default_format = self.formats_enum.DEFAULT.value
## Rendering
def render(self, data, columns, conf=None):
"Render data"
# assert isinstance(data, dict)
# assert isinstance(columns, dict)
# Prepare vars
conf = conf or {}
# fmt = (
# conf.get("fmt_format", None)
# or conf.get("fmt", None)
# or self.default_format
# )
# Extract Format config
# ---------------------------
fmt_config = dict(name=None, sort=None, fields=None)
fmt_config.update(
{
key.replace("fmt_", ""): val
for key, val in conf.items()
if key.startswith("fmt_") and val is not None
}
)
fmt_ = SimpleNamespace(**fmt_config)
# print("FORMAT CONFIG")
# pprint(conf)
# pprint(fmt_)
assert fmt_.name, f"Format name can't be None: {fmt_.name}"
# check module presence
mod_spec = self.formats_modules.get(fmt_.name, None)
# if mod_spec:
# # Extract specs
# mod_package = None
# mod_name = mod_spec
# if isinstance(mod_spec, (list, set, tuple)):
# mod_name = mod_spec[0]
# mod_package = mod_spec[1]
# # Check module presence
# # print("CHECKING MODE NAME", mod_name)
# ret = is_module_present(mod_name)
# if not ret:
# _msg = f"Missing python module '{mod_name}' to support '{fmt_.name}' output."
# if mod_package:
# _msg = f"{_msg} Please first install package: {mod_package}"
# logger.warning(_msg)
# # raise Exception(_msg)
# Fetch renderer method
# ---------------------------
fmt_name = fmt_.name
fn_name = f"to__{fmt_name}"
fn = getattr(self, fn_name, None)
if not callable(fn):
raise Exception(
f"Unsupported format {fmt_.name} for {self.__class__.__name__}: No such {fn_name} method"
)
# Sort data
# ---------------------------
if fmt_.sort:
col_names = fmt_.sort.split(",")
# col_name = fmt_.sort
# Get columns indexes
indexes = []
for col_name in col_names:
try:
_idx = columns.index(col_name)
except ValueError:
choices = ",".join(columns)
raise Exception(
f"No such column '{col_name}', please choose one of: {choices}"
)
indexes.append(_idx)
# Sort data
# print ("SORT COLUMNS", indexes)
data.sort(key=lambda row: view_table_sort(row, indexes))
# Filter out fields
# ---------------------------
if fmt_.fields:
fields_keys = fmt_.fields.split(",")
# Get columns indexes
indexes = []
for col_name in fields_keys:
try:
_idx = columns.index(col_name)
except ValueError:
choices = ",".join(columns)
raise Exception(
f"No such column '{col_name}', please choose one of: {choices}"
)
indexes.append(_idx)
# print ("LIMIT FIELDS !!!", fields_keys, indexes)
rm_cols = [idx for idx, item in enumerate(columns) if not idx in indexes]
# Recreate smaller table
new_columns = []
for idx in indexes:
col_name = columns[idx]
new_columns.append(col_name)
new_data = []
for row in data:
new_row = []
for idx in indexes:
new_row.append(row[idx])
new_data.append(new_row)
data = new_data
columns = new_columns
# Call renderer
# ---------------------------
stripped_conf = {
key.replace(f"{fmt_name}_", ""): val
for key, val in conf.items()
if key.startswith(fmt_name)
}
ret = fn(data, columns, **stripped_conf)
# Display or return
# ---------------------------
if callable(self.output):
self.output(ret)
else:
return ret
# Views Implementations
# ===============================
class SpecificView(_RootView):
"Sprecific views (BETA)"
formats_dict = VIEW_FORMATS
formats_modules = VIEW_MODULES
def to__hcl(self, data, columns):
tmp = restructure_to_list_view(data, columns)
return to_hcl(tmp)
def to__toml(self, data, columns):
return to_toml(restructure_to_toml(data, columns, root="data"))
# def to__toml(self, data, columns):
# return to_toml(data)
# def to__toml(self, data, columns):
# return to_toml(restructure_to_list_view(data, columns))
class ViewList(_RootView):
name = "list"
formats_dict = VIEW_FORMATS
formats_modules = VIEW_MODULES
# Standard formats
def to__yaml(self, data, columns):
return to_yaml(restructure_to_list_view(data, columns))
def to__json(self, data, columns):
return to_json(restructure_to_list_view(data, columns), nice=True)
def to__python(self, data, columns):
return pformat(restructure_to_list_view(data, columns), indent=2)
# Rich table output
def to__table(self, data, columns, **kwargs):
"Return a rich table"
return to_rich_table(data, columns=columns, **kwargs)
# Script helpers
def to__csv(self, data, columns):
return to_csv(restructure_to_csv(data, columns))
def to__vars(self, data, columns):
return to_vars(restructure_to_env(data, columns), export=False)
def to__env(self, data, columns):
return to_vars(restructure_to_env(data, columns), export=True)
def to__words(self, data, columns, **kwargs):
"Return words"
ret = []
for line in data:
for cell in line:
ret.append(cell)
return " ".join(ret)
pprint(data)
return None
return to_rich_table(data, columns=columns, **kwargs)
def to__lines(self, data, columns, **kwargs):
"Return lines"
ret = []
for line in data:
ret.append(" ".join(line))
return "\n".join(ret)
class ViewItem(_RootView):
name = "item"
formats_dict = VIEW_FORMATS
formats_modules = VIEW_MODULES
# Standard formats
def to__yaml(self, data, columns):
return to_yaml(restructure_to_item_view(data, columns))
def to__json(self, data, columns):
return to_json(restructure_to_item_view(data, columns), nice=True)
def to__toml(self, data, columns):
return to_toml(restructure_to_toml(data, columns))
def to__python(self, data, columns):
return pformat(restructure_to_item_view(data, columns), indent=2)
# Rich table output
def to__table(self, data, columns, **kwargs):
"Return a rich table"
# pprint (columns)
# pprint (data)
# Rotate list
_list = []
_list.append(columns)
_list.append(data)
data = list(zip(*_list))
# Render table
columns = ["Field", "Value"]
return to_rich_table(data, columns=columns, **kwargs)
def to__csv(self, data, columns):
return to_csv(restructure_to_csv(data, columns))
def to__vars(self, data, columns):
return to_vars(restructure_to_env(data, columns), export=False)
def to__env(self, data, columns):
return to_vars(restructure_to_env(data, columns), export=True)