python-shctl-iam/iam/cli_click.py

1019 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
import io
import logging
import os
import sys
import traceback
from enum import Enum
from pprint import pprint
from types import SimpleNamespace
from iam.app import App
from iam.framework import get_app_logger
from . import exceptions as error
from .lib.cli_views import VIEW_FORMATS_NAMES, ViewItem, ViewList
from .lib.click_utils import NestedHelpGroup
# from .cli_views import ViewItem, ViewList, VIEW_FORMATS
from .lib.utils import prune, to_yaml
# import rich
# from rich import box
# from rich.console import Console
# from rich.pretty import pprint
# from rich.prompt import Prompt
# from rich.table import Table
# from rich.pretty import pprint
# Rich loader
# ======================
APP_RICH = True
table_box_params = {}
if not APP_RICH:
import click
# Create console
console = SimpleNamespace(print=print)
else:
# Import rich
# Transparent rich proxy for click (help menus)
import rich_click as click
from rich_click import RichGroup
from rich import box, print
from rich.console import Console
# Overrides defaults
from rich.pretty import pprint
# Load rich wrappers
class NestedRichHelpGroup(NestedHelpGroup, RichGroup):
"Add rich class"
NestedHelpGroup = NestedRichHelpGroup
# Create default rich table settings
table_box_params = {
"box": box.ASCII2, # box.MINIMAL
"min_width": 60,
}
# Create a console
console = Console()
logger = logging.getLogger()
cli_logger = logging.getLogger("iam.cli")
# Global vars
# ======================
# Init app
# ======================
logger = logging.getLogger(__name__)
item_view = ViewItem(output=console.print)
list_view = ViewList(output=console.print)
# General App settings
# ===============================
APP_NAME = "iam"
APP_VERSION = "__version__TOFIX"
APP_EXCEPTION = error.IamException
APP_TTY = os.isatty(0)
APP_COLORS = APP_TTY or APP_RICH
# See: https://docs.python.org/3.8/library/logging.html#logging-levels
DEFAULT_LOG_LEVEL = 30 # warning
# Get default ident
DEBUG = os.environ.get("IAM_DEBUG", "False")
DEFAULT_IDENT = os.environ.get("IAM_IDENT", os.environ.get("SHELL_IDENT", ""))
DEFAULT_FORMAT = os.environ.get("IAM_FORMAT", "table")
DEFAULT_HELP_OPTIONS = ["-h", "--help"]
DEFAULT_SHELL = os.environ.get("SHELL", "bash")
# list_view.formats_enum
# Click helpers
# ===============================
_cmd1_options = [click.option("--format")]
_cmd2_options = [click.option("--cmd2-opt")]
CONTEXT_SETTINGS = dict(
show_default=True,
help_option_names=DEFAULT_HELP_OPTIONS,
auto_envvar_prefix=APP_NAME.upper(),
)
def global_options(function):
# function = click.option(
# "debug", "--debug",
# is_flag=True, show_default=False, default=False,
# help="Debug"
# )(function)
# function = click.option(
# "force", "--force", "-f"
# is_flag=True, show_default=False, default=False,
# help="Force"
# )(function)
function = click.option(
"interactive",
"--interactive/--no-interactive",
"-I",
is_flag=True,
show_default=APP_TTY,
default=APP_TTY,
help="Interactive mode",
)(function)
function = click.option(
"colors",
"--colors/--no-colors",
"-C",
is_flag=True,
show_default=APP_COLORS,
default=APP_COLORS,
help="Enable colors",
)(function)
# function = click.option(
# "recursive", "--recursive", "-r"
# is_flag=True, show_default=False, default=False,
# help="Recursive mode"
# )(function)
# function = click.option(
# 'dry_run', '-n', '--dry-run',
# is_flag=True, show_default=False, default=False,
# help="Dry run, do not take any actions."
# )(function)
function = click.option(
"quiet",
"--quiet",
"-q",
is_flag=True,
show_default=False,
default=False,
help="Suppress output except warnings and errors.",
)(function)
function = click.option(
"log_trace",
"--trace",
is_flag=True,
show_default=False,
default=False,
help="Show traceback on errors",
)(function)
function = click.option(
"-v",
"--verbose",
count=True,
type=click.IntRange(0, int(DEFAULT_LOG_LEVEL / 10), clamp=True),
help="Increase verbosity of output. Can be repeated.",
)(function)
return function
def format_options(function):
# Duplicates
function = click.option(
"-v",
"--verbose",
count=True,
type=click.IntRange(0, int(DEFAULT_LOG_LEVEL / 10), clamp=True),
help="Increase verbosity of output. Can be repeated.",
)(function)
# Special output options
function = click.option(
"fmt_name",
"-O",
"--output",
type=click.Choice(VIEW_FORMATS_NAMES, case_sensitive=False),
help="Output format",
default=None,
show_default=DEFAULT_FORMAT,
)(function)
function = click.option(
"fmt_fields", "-H", "--headers", help="Show specific headers"
)(function)
function = click.option("fmt_sort", "-S", "--sort", help="Sort columns")(function)
return function
# Base Application example
# ===============================
# DEFAULT_CONFIG = os.environ.get("IAM_CONFIG", "MISSING")
DEFAULT_CONFIG = click.get_app_dir(APP_NAME)
# SHOW_DEFAULT_HELP=True
def get_var(name, default=None):
real_name = f"{APP_NAME.upper()}_{name.upper()}"
# print ("QUERY", real_name)
return os.environ.get(real_name, default)
# @click.group(cls=NestedHelpGroup, context_settings=CONTEXT_SETTINGS)
@click.group(cls=NestedHelpGroup)
@global_options
@format_options
@click.version_option()
@click.option(
"config",
"--config",
"-C",
# envvar='CONFIG', # multiple=True,
type=click.Path(),
default=get_var("CONFIG", DEFAULT_CONFIG),
show_default=get_var("CONFIG", DEFAULT_CONFIG),
help="Configuration directory or file",
)
@click.pass_context
def cli_app(
ctx,
verbose,
config: str = DEFAULT_IDENT,
ident: str = DEFAULT_IDENT,
quiet=False,
interactive=False,
colors=False,
log_trace=False,
fmt_fields=None,
fmt_sort=None,
fmt_name=None,
):
"""Naval Fate.
This is the docopt example adopted to Click but with some actual
commands implemented and not just the empty parsing which really
is not all that interesting.
"""
# Show trace on errors, more like the --trace flag
if log_trace:
global APP_EXCEPTION
APP_EXCEPTION = None
# Disable COLORS
global APP_COLORS
APP_COLORS = colors
# Calculate log level
log_level = DEFAULT_LOG_LEVEL - verbose * 10
log_level = log_level if log_level > 0 else 10
# Define loggers
loggers = {
"iam": {"level": log_level},
"iam.cli": {"level": "DEBUG", "handlers": ["info"]},
}
# Instanciate logger
get_app_logger(loggers=loggers, level=log_level, colors=True)
cli_config = SimpleNamespace(
log_level=log_level,
fmt_name=fmt_name or DEFAULT_FORMAT,
ident=ident,
interactive=interactive,
quiet=quiet,
)
render_config = {
"fmt_name": cli_config.fmt_name,
"table_settings": table_box_params,
}
render_item = lambda *args, conf={}, **kwargs: item_view.render(
*args, conf={**render_config, **prune(conf)}, **kwargs
)
render_list = lambda *args, conf={}, **kwargs: list_view.render(
*args, conf={**render_config, **prune(conf)}, **kwargs
)
# Instanciate app
logger.info(f"Current ident: {ident}")
ctx.obj = SimpleNamespace(
app=App(config_path=config, ident=ident, cli_context=cli_config),
cli=cli_config,
render_item=render_item,
render_list=render_list,
)
@cli_app.command()
@click.pass_context
def tests(ctx):
print("Hello world")
return
# pprint(command_tree(cli_app))
print("TREEE @")
pprint(command_tree3(cli_app, ctx))
all_help(cli_app, ctx)
# subcommand_obj = cli_app.get_command(ctx, subcommand)
# if subcommand_obj is None:
# click.echo("I don't know that command.")
# else:
# pprint (subcommand_obj)
# pprint (subcommand_obj.__dict__)
# click.echo(subcommand_obj.get_help(ctx))
# Cli Utils
# ===============================
# Dedicated help command
# Source: https://stackoverflow.com/a/53137265
@cli_app.command()
@click.argument("subcommand", nargs=-1)
@click.pass_context
def help(ctx, subcommand):
"Show a command help"
subcommand = " ".join(subcommand)
subcommand_obj = cli_app.get_command(ctx, subcommand)
if subcommand_obj is None:
click.echo("I don't know that command.")
else:
pprint(subcommand_obj)
pprint(subcommand_obj.__dict__)
click.echo(subcommand_obj.get_help(ctx))
# Global commands
# ===============================
@cli_app.command()
@click.pass_context
def dump(ctx):
ctx.obj.app.user_dump()
# Resource kind management
# ===============================
@cli_app.group("kind")
def cli__kind():
"""Manages Resources Kind."""
@cli__kind.command("list")
@click.argument("filter", required=False)
@format_options
@click.pass_context
def kind_list(ctx, filter, fmt_name=None, fmt_sort=None, fmt_fields=None, verbose=None):
"List resource kinds"
# Fetch data
# filter = name
resources_kind = ctx.obj.app.catalog.resources_kind
data = []
for name, item in sorted(resources_kind.items(), key=lambda key: key):
if filter and not name.startswith(filter):
continue
data.append((name, item.desc))
# Render data
columns = ["name", "desc"]
conf = {
"table_title": "Resources kinds listing",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
}
ctx.obj.render_list(data, columns, conf=conf)
@cli__kind.command("show")
@click.argument("name", required=False)
@format_options
@click.pass_context
def kind_show(ctx, name, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None):
"Show resource kind"
# Fetch data
columns = ["name", "desc", "input", "needs", "remap"]
filter = name
resources_kind = ctx.obj.app.catalog.resources_kind
for name, item in sorted(resources_kind.items(), key=lambda key: key):
if filter and not name.startswith(filter):
continue
data = [
item.name,
item.desc,
f"{to_yaml(item.input)}",
f"{to_yaml(item.needs)}",
f"{to_yaml(item.remap)}",
]
# Render data
conf = {
"table_title": f"Resources kind: {data[0]}",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
}
ctx.obj.render_item(data, columns, conf=conf)
# @cli__kind.command("move")
# @click.argument("cli__kind")
# @click.argument("x", type=float)
# @click.argument("y", type=float)
# @click.option("--speed", metavar="KN", default=10, help="Speed in knots.")
# def ship_move(cli__kind, x, y, speed):
# """Moves SHIP to the new location X,Y."""
# click.echo(f"Moving cli__kind {cli__kind} to {x},{y} with speed {speed}")
# @cli__kind.command("shoot")
# @click.argument("cli__kind")
# @click.argument("x", type=float)
# @click.argument("y", type=float)
# def ship_shoot(cli__kind, x, y):
# """Makes SHIP fire to X,Y."""
# click.echo(f"Ship {cli__kind} fires to {x},{y}")
# Resource res management
# ===============================
@cli_app.group("res")
def cli__res():
"""Manages Resources."""
@cli__res.command("list")
@click.argument("filter", required=False)
# @format_options
@click.pass_context
def res_list(ctx, filter, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None):
"List resource ress"
# Fetch data
resources = ctx.obj.app.catalog.resources
data = []
for name, res in sorted(resources.items(), key=lambda key: key):
if filter and not name.startswith(filter):
continue
data.append(
(
res.get_kind(),
res.get_name(),
res.desc,
)
)
# Render data
columns = ["kind", "name", "desc"]
conf = {
"table_title": "Resources listing",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
}
ctx.obj.render_list(data, columns, conf=conf)
@cli__res.command("show")
@click.argument("name", required=False)
@format_options
@click.pass_context
def res_show(ctx, name, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None):
"Show resource res"
# Fetch data
columns = ["name", "uses", "input"]
columns_all = columns + ["active", "deps", "missing", "loop_max", "loop", "need"]
filter = name
resources = ctx.obj.app.catalog.resources.select("startswith", name)
for name, item in sorted(resources.items(), key=lambda key: key):
if filter and not name.startswith(filter):
continue
if not all and not item.is_active():
continue
data = [
name,
to_yaml(item.uses).strip(),
to_yaml(item.input).strip(),
]
if all:
data.extend(
[
item.is_active(),
",".join([x.name for x in item.resources_deps]),
",".join(item.resources_missing),
item.loop_limit,
to_yaml(item.loop).strip(),
to_yaml(item.needs).strip(),
]
)
# Render data
ctx.obj.render_item(
data,
columns_all if all else columns,
conf={
"table_title": f"Resources kind: {data[0]}",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
},
)
# Resource svc management
# ===============================
@cli_app.group("svc")
def cli__svc():
"""Manages Resources Kind."""
@cli__svc.command("list")
@click.argument("filter", required=False)
@format_options
@click.pass_context
def svc_list(ctx, filter, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None):
"List services"
columns = ["name", "desc"]
services = ctx.obj.app.catalog.services
data = []
for name, res in sorted(services.items(), key=lambda key: key[1].name):
if filter and not name.startswith(filter):
continue
# if not all and not res.is_active():
# continue
data.append(
(
# res.get_kind(),
# res.get_name(),
res.name,
res.desc,
)
)
# Render data
ctx.obj.render_list(
data,
columns,
conf={
"table_title": f"Services listing",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
},
)
@cli__svc.command("show")
@click.argument("name", required=False)
@format_options
@click.pass_context
def svc_show(ctx, name, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None):
"Show service"
columns = [
"name",
"desc",
"enabled",
"input",
"commands",
"required_svc",
"resource_lookup",
"rest_matches" "dump",
]
columns_all = columns + ["active", "deps", "missing", "loop_max", "loop", "need"]
output = {}
services = ctx.obj.app.catalog.services.select("startswith", name)
for name, svc in services.items():
assert name == svc.name
data = [
svc.name,
svc.desc,
svc.enabled,
svc.input,
to_yaml(svc.list_cmds()).strip(),
svc.required_services,
svc.resources_lookup,
svc.resources_matches,
svc.dump_item(),
]
data = [str(item) for item in data]
ctx.obj.render_item(
data,
columns_all if all else columns,
conf={
"table_title": f"Service show: {data[0]}",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
},
)
@cli__svc.command("commands")
@format_options
@click.pass_context
def svc_commands(ctx, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None):
"Show service"
columns = ["service", "name", "type", "desc"]
data = []
for svc_name, svc in ctx.obj.app.list_services().items():
cmds = svc.list_cmds()
for source in ["shell", "cmds"]:
items = cmds[source]
# target = ret[source]
for cmd_name, conf in items.items():
# target[cmd_name] = conf
data.append([svc_name, cmd_name, source, conf])
# Render data
ctx.obj.render_list(
data,
columns,
conf={
"table_title": f"Services commands",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
},
)
@cli__svc.command("run_v1")
@click.argument("name")
@click.argument("command")
@format_options
@click.pass_context
def svc_run(
ctx, name, command, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None
):
"Show service"
ret = ctx.obj.app.catalog.services.get(name)
tmp = ret.run_cmd(command)
pprint(tmp)
# Shell management
# ======================
@cli_app.group("shell")
def cli__shell():
"""Manages Resources Kind."""
@cli__shell.command("idents")
@format_options
@click.pass_context
def shell_idents(
ctx,
fmt_name=None,
fmt_fields=None,
fmt_sort=None,
verbose=None,
):
"Shell identities"
columns = ["name"]
data = [[x] for x in ctx.obj.app.get_idents()]
# Render data
ctx.obj.render_list(
data,
columns,
conf={
"table_title": f"Shell identities",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
},
)
@cli__shell.command("order")
@click.option(
"--reverse", is_flag=True, show_default=False, default=False, help="Reverse order"
)
@format_options
@click.pass_context
def shell_order(
ctx, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None, reverse=False
):
"Shell loading order"
columns = ["order", "name"]
data = []
services, _ = ctx.obj.app.catalog.services.get_loading_order(reverse=reverse)
for index, name in enumerate(services):
# table.add_row(str(index), name)
data.append([str(index), name])
# Render data
ctx.obj.render_list(
data,
columns,
conf={
"table_title": f"Services commands",
"fmt_name": fmt_name,
"fmt_sort": fmt_sort,
"fmt_fields": fmt_fields,
},
)
@cli__shell.command("enable")
@click.option(
"skip_start",
"--no-start",
"-b",
is_flag=True,
show_default=False,
default=False,
help="Disable background process start",
)
@click.pass_context
def shell_enable(ctx, skip_start=True, verbose=None):
"Enable identity in shell"
app = ctx.obj.app
print(app.shell_enable(run_start=not skip_start))
@cli__shell.command("disable")
@click.option(
"run_stop",
"--kill",
"-k",
is_flag=True,
show_default=False,
default=False,
help="Kill background process on leave",
)
@click.pass_context
def shell_disable(ctx, run_stop=False, verbose=None):
"Disable identity in shell"
ret = ctx.obj.app.shell_disable(run_stop=run_stop)
print(ret)
@cli__shell.command("kill")
@click.option(
"all_idents",
"--all",
"-a",
is_flag=True,
show_default=False,
default=False,
help="Kill on all users",
)
@click.pass_context
def shell_kill(ctx, all_idents=False, verbose=None):
"Disable identity in shell"
ret = []
app = ctx.obj.app
if not all_idents:
ret.append(app.shell_enable(run_start=False))
ret.append(app.shell_disable(run_stop=True))
else:
for ident in app.idents.names():
cli_logger.warning(f"Kill ident {ident}")
app.init_ident(ident)
ret.append(app.shell_enable(run_start=False))
ret.append(app.shell_disable(run_stop=True))
print("\n".join(ret))
@cli__shell.command("switch")
@click.argument("ident", required=False)
# @format_options
@click.option(
"skip_start",
"--no-start",
"-b",
is_flag=True,
show_default=False,
default=False,
help="Disable background process start",
)
@click.option(
"run_stop",
"--kill",
"-k",
is_flag=True,
show_default=False,
default=False,
help="Kill background process on leave",
)
@click.pass_context
def shell_switch(ctx, ident="", run_stop=False, skip_start=True, verbose=None):
"Enable identity in shell"
app = ctx.obj.app
src_ident = app.catalog.ident.name
dst_ident = ident
if src_ident == dst_ident:
print(">&2 echo 'No need to change'")
else:
ret = []
# Disable existing
# logger.info(f"Disabling ident: {app.ident_name}")
ret.append(app.shell_disable(run_stop=run_stop))
# Enable new
# logger.info(f"Enabling ident: {ident}")
app.init_ident(dst_ident)
ret.append(app.shell_enable(run_start=not skip_start))
# Output
print("\n".join(ret))
class TmpGroup(click.Group):
def format_help(self, ctx, formatter):
val = "List of available commands!"
formatter.write(val)
@cli_app.command(
"run",
# cls=TmpGroup,
context_settings=dict(
ignore_unknown_options=True, allow_extra_args=True, help_option_names=[]
),
)
@click.argument("cmd_params", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def cli_app_run(ctx, cmd_params):
# print("Will run cmd:", cmd_params)
# Check first help parameter only
show_help = False
show_list = False
for idx, param in enumerate(cmd_params):
if idx == 0:
if param in DEFAULT_HELP_OPTIONS:
show_help = True
if param in ["list", "ls"]:
show_list = True
if show_help:
ctx = click.get_current_context()
click.echo(ctx.get_help())
return
# Get instanciated app
app = ctx.obj.app
if show_list:
ret = app.catalog.services.list_cmds()
pprint(ret)
data = [[x.cmd.name, x.cmd.desc, x.svc.name] for x in ret]
# Render data
ctx.obj.render_list(
data,
["name", "desc", "service"],
conf={
"table_title": f"Services commands",
# "fmt_name": fmt_name,
# "fmt_sort": fmt_sort,
# "fmt_fields": fmt_fields,
},
)
return
# Run the output
ret = app.catalog.run_svc_cmd(cmd=cmd_params)
print(ret)
# pprint (ret, expand_all=True)
# @cli_app.command("run2",
# context_settings=dict(
# ignore_unknown_options=True,
# allow_extra_args=True,
# )
# )
# @click.pass_context
# def cli_app_run(ctx):
# print("Hello world")
# pprint (ctx.args)
# return
@cli__shell.command("install")
@click.argument("shell", required=False, default=DEFAULT_SHELL)
@click.pass_context
def shell_install(ctx, shell=None, verbose=None):
"Install iam in your shell"
ret = ctx.obj.app.shell_install(shell=shell)
# print("-- %< --" * 8)
print(ret)
# print("-- %< --" * 8)
# Exception handler
# ===============================
def clean_terminate(err):
"Terminate nicely the program depending the exception"
# Choose dead end way
if APP_EXCEPTION is not None and isinstance(err, APP_EXCEPTION):
err_name = err.__class__.__name__
# logger.error(err)
logger.critical("%s: %s" % (err_name, err))
sys.exit(1)
# Core application executor
# ===============================
def run():
"Return a MyApp App instance"
try:
cli_app(**CONTEXT_SETTINGS)
# pylint: disable=broad-except
except Exception as err:
clean_terminate(err)
# Developper catchall
if APP_COLORS:
console.print_exception(show_locals=True)
else:
# raise Exception(err) from err
logger.error(traceback.format_exc())
if APP_EXCEPTION:
logger.critical("Uncatched error %s; this may be a bug!", err.__class__)
logger.critical("Exit 1 with bugs")
else:
logger.critical("Exited with %s error: %s", err.__class__.__name__, err)
sys.exit(1)
if __name__ == "__main__":
run()