#!/usr/bin/env python3 import io import logging import os import sys import traceback from enum import Enum from pprint import pprint from types import SimpleNamespace from iam.app import App from iam.framework import get_app_logger from . import exceptions as error from .lib.cli_views import VIEW_FORMATS_NAMES, ViewItem, ViewList from .lib.click_utils import NestedHelpGroup # from .cli_views import ViewItem, ViewList, VIEW_FORMATS from .lib.utils import prune, to_yaml # import rich # from rich import box # from rich.console import Console # from rich.pretty import pprint # from rich.prompt import Prompt # from rich.table import Table # from rich.pretty import pprint # Rich loader # ====================== APP_RICH = True table_box_params = {} if not APP_RICH: import click # Create console console = SimpleNamespace(print=print) else: # Import rich # Transparent rich proxy for click (help menus) import rich_click as click from rich_click import RichGroup from rich import box, print from rich.console import Console # Overrides defaults from rich.pretty import pprint # Load rich wrappers class NestedRichHelpGroup(NestedHelpGroup, RichGroup): "Add rich class" NestedHelpGroup = NestedRichHelpGroup # Create default rich table settings table_box_params = { "box": box.ASCII2, # box.MINIMAL "min_width": 60, } # Create a console console = Console() logger = logging.getLogger() cli_logger = logging.getLogger("iam.cli") # Global vars # ====================== # Init app # ====================== logger = logging.getLogger(__name__) item_view = ViewItem(output=console.print) list_view = ViewList(output=console.print) # General App settings # =============================== APP_NAME = "iam" APP_VERSION = "__version__TOFIX" APP_EXCEPTION = error.IamException APP_TTY = os.isatty(0) APP_COLORS = APP_TTY or APP_RICH # See: https://docs.python.org/3.8/library/logging.html#logging-levels DEFAULT_LOG_LEVEL = 30 # warning # Get default ident DEBUG = os.environ.get("IAM_DEBUG", "False") DEFAULT_IDENT = os.environ.get("IAM_IDENT", os.environ.get("SHELL_IDENT", "")) DEFAULT_FORMAT = os.environ.get("IAM_FORMAT", "table") DEFAULT_HELP_OPTIONS = ["-h", "--help"] DEFAULT_SHELL = os.environ.get("SHELL", "bash") # list_view.formats_enum # Click helpers # =============================== _cmd1_options = [click.option("--format")] _cmd2_options = [click.option("--cmd2-opt")] CONTEXT_SETTINGS = dict( show_default=True, help_option_names=DEFAULT_HELP_OPTIONS, auto_envvar_prefix=APP_NAME.upper(), ) def global_options(function): # function = click.option( # "debug", "--debug", # is_flag=True, show_default=False, default=False, # help="Debug" # )(function) # function = click.option( # "force", "--force", "-f" # is_flag=True, show_default=False, default=False, # help="Force" # )(function) function = click.option( "interactive", "--interactive/--no-interactive", "-I", is_flag=True, show_default=APP_TTY, default=APP_TTY, help="Interactive mode", )(function) function = click.option( "colors", "--colors/--no-colors", "-C", is_flag=True, show_default=APP_COLORS, default=APP_COLORS, help="Enable colors", )(function) # function = click.option( # "recursive", "--recursive", "-r" # is_flag=True, show_default=False, default=False, # help="Recursive mode" # )(function) # function = click.option( # 'dry_run', '-n', '--dry-run', # is_flag=True, show_default=False, default=False, # help="Dry run, do not take any actions." # )(function) function = click.option( "quiet", "--quiet", "-q", is_flag=True, show_default=False, default=False, help="Suppress output except warnings and errors.", )(function) function = click.option( "log_trace", "--trace", is_flag=True, show_default=False, default=False, help="Show traceback on errors", )(function) function = click.option( "-v", "--verbose", count=True, type=click.IntRange(0, int(DEFAULT_LOG_LEVEL / 10), clamp=True), help="Increase verbosity of output. Can be repeated.", )(function) return function def format_options(function): # Duplicates function = click.option( "-v", "--verbose", count=True, type=click.IntRange(0, int(DEFAULT_LOG_LEVEL / 10), clamp=True), help="Increase verbosity of output. Can be repeated.", )(function) # Special output options function = click.option( "fmt_name", "-O", "--output", type=click.Choice(VIEW_FORMATS_NAMES, case_sensitive=False), help="Output format", default=None, show_default=DEFAULT_FORMAT, )(function) function = click.option( "fmt_fields", "-H", "--headers", help="Show specific headers" )(function) function = click.option("fmt_sort", "-S", "--sort", help="Sort columns")(function) return function # Base Application example # =============================== # DEFAULT_CONFIG = os.environ.get("IAM_CONFIG", "MISSING") DEFAULT_CONFIG = click.get_app_dir(APP_NAME) # SHOW_DEFAULT_HELP=True def get_var(name, default=None): real_name = f"{APP_NAME.upper()}_{name.upper()}" # print ("QUERY", real_name) return os.environ.get(real_name, default) # @click.group(cls=NestedHelpGroup, context_settings=CONTEXT_SETTINGS) @click.group(cls=NestedHelpGroup) @global_options @format_options @click.version_option() @click.option( "config", "--config", "-C", # envvar='CONFIG', # multiple=True, type=click.Path(), default=get_var("CONFIG", DEFAULT_CONFIG), show_default=get_var("CONFIG", DEFAULT_CONFIG), help="Configuration directory or file", ) @click.pass_context def cli_app( ctx, verbose, config: str = DEFAULT_IDENT, ident: str = DEFAULT_IDENT, quiet=False, interactive=False, colors=False, log_trace=False, fmt_fields=None, fmt_sort=None, fmt_name=None, ): """Naval Fate. This is the docopt example adopted to Click but with some actual commands implemented and not just the empty parsing which really is not all that interesting. """ # Show trace on errors, more like the --trace flag if log_trace: global APP_EXCEPTION APP_EXCEPTION = None # Disable COLORS global APP_COLORS APP_COLORS = colors # Calculate log level log_level = DEFAULT_LOG_LEVEL - verbose * 10 log_level = log_level if log_level > 0 else 10 # Define loggers loggers = { "iam": {"level": log_level}, "iam.cli": {"level": "DEBUG", "handlers": ["info"]}, } # Instanciate logger get_app_logger(loggers=loggers, level=log_level, colors=True) cli_config = SimpleNamespace( log_level=log_level, fmt_name=fmt_name or DEFAULT_FORMAT, ident=ident, interactive=interactive, quiet=quiet, ) render_config = { "fmt_name": cli_config.fmt_name, "table_settings": table_box_params, } render_item = lambda *args, conf={}, **kwargs: item_view.render( *args, conf={**render_config, **prune(conf)}, **kwargs ) render_list = lambda *args, conf={}, **kwargs: list_view.render( *args, conf={**render_config, **prune(conf)}, **kwargs ) # Instanciate app logger.info(f"Current ident: {ident}") ctx.obj = SimpleNamespace( app=App(config_path=config, ident=ident, cli_context=cli_config), cli=cli_config, render_item=render_item, render_list=render_list, ) @cli_app.command() @click.pass_context def tests(ctx): print("Hello world") return # pprint(command_tree(cli_app)) print("TREEE @") pprint(command_tree3(cli_app, ctx)) all_help(cli_app, ctx) # subcommand_obj = cli_app.get_command(ctx, subcommand) # if subcommand_obj is None: # click.echo("I don't know that command.") # else: # pprint (subcommand_obj) # pprint (subcommand_obj.__dict__) # click.echo(subcommand_obj.get_help(ctx)) # Cli Utils # =============================== # Dedicated help command # Source: https://stackoverflow.com/a/53137265 @cli_app.command() @click.argument("subcommand", nargs=-1) @click.pass_context def help(ctx, subcommand): "Show a command help" subcommand = " ".join(subcommand) subcommand_obj = cli_app.get_command(ctx, subcommand) if subcommand_obj is None: click.echo("I don't know that command.") else: pprint(subcommand_obj) pprint(subcommand_obj.__dict__) click.echo(subcommand_obj.get_help(ctx)) # Global commands # =============================== @cli_app.command() @click.pass_context def dump(ctx): ctx.obj.app.user_dump() # Resource kind management # =============================== @cli_app.group("kind") def cli__kind(): """Manages Resources Kind.""" @cli__kind.command("list") @click.argument("filter", required=False) @format_options @click.pass_context def kind_list(ctx, filter, fmt_name=None, fmt_sort=None, fmt_fields=None, verbose=None): "List resource kinds" # Fetch data # filter = name resources_kind = ctx.obj.app.catalog.resources_kind data = [] for name, item in sorted(resources_kind.items(), key=lambda key: key): if filter and not name.startswith(filter): continue data.append((name, item.desc)) # Render data columns = ["name", "desc"] conf = { "table_title": "Resources kinds listing", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, } ctx.obj.render_list(data, columns, conf=conf) @cli__kind.command("show") @click.argument("name", required=False) @format_options @click.pass_context def kind_show(ctx, name, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None): "Show resource kind" # Fetch data columns = ["name", "desc", "input", "needs", "remap"] filter = name resources_kind = ctx.obj.app.catalog.resources_kind for name, item in sorted(resources_kind.items(), key=lambda key: key): if filter and not name.startswith(filter): continue data = [ item.name, item.desc, f"{to_yaml(item.input)}", f"{to_yaml(item.needs)}", f"{to_yaml(item.remap)}", ] # Render data conf = { "table_title": f"Resources kind: {data[0]}", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, } ctx.obj.render_item(data, columns, conf=conf) # @cli__kind.command("move") # @click.argument("cli__kind") # @click.argument("x", type=float) # @click.argument("y", type=float) # @click.option("--speed", metavar="KN", default=10, help="Speed in knots.") # def ship_move(cli__kind, x, y, speed): # """Moves SHIP to the new location X,Y.""" # click.echo(f"Moving cli__kind {cli__kind} to {x},{y} with speed {speed}") # @cli__kind.command("shoot") # @click.argument("cli__kind") # @click.argument("x", type=float) # @click.argument("y", type=float) # def ship_shoot(cli__kind, x, y): # """Makes SHIP fire to X,Y.""" # click.echo(f"Ship {cli__kind} fires to {x},{y}") # Resource res management # =============================== @cli_app.group("res") def cli__res(): """Manages Resources.""" @cli__res.command("list") @click.argument("filter", required=False) # @format_options @click.pass_context def res_list(ctx, filter, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None): "List resource ress" # Fetch data resources = ctx.obj.app.catalog.resources data = [] for name, res in sorted(resources.items(), key=lambda key: key): if filter and not name.startswith(filter): continue data.append( ( res.get_kind(), res.get_name(), res.desc, ) ) # Render data columns = ["kind", "name", "desc"] conf = { "table_title": "Resources listing", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, } ctx.obj.render_list(data, columns, conf=conf) @cli__res.command("show") @click.argument("name", required=False) @format_options @click.pass_context def res_show(ctx, name, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None): "Show resource res" # Fetch data columns = ["name", "uses", "input"] columns_all = columns + ["active", "deps", "missing", "loop_max", "loop", "need"] filter = name resources = ctx.obj.app.catalog.resources.select("startswith", name) for name, item in sorted(resources.items(), key=lambda key: key): if filter and not name.startswith(filter): continue if not all and not item.is_active(): continue data = [ name, to_yaml(item.uses).strip(), to_yaml(item.input).strip(), ] if all: data.extend( [ item.is_active(), ",".join([x.name for x in item.resources_deps]), ",".join(item.resources_missing), item.loop_limit, to_yaml(item.loop).strip(), to_yaml(item.needs).strip(), ] ) # Render data ctx.obj.render_item( data, columns_all if all else columns, conf={ "table_title": f"Resources kind: {data[0]}", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, }, ) # Resource svc management # =============================== @cli_app.group("svc") def cli__svc(): """Manages Resources Kind.""" @cli__svc.command("list") @click.argument("filter", required=False) @format_options @click.pass_context def svc_list(ctx, filter, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None): "List services" columns = ["name", "desc"] services = ctx.obj.app.catalog.services data = [] for name, res in sorted(services.items(), key=lambda key: key[1].name): if filter and not name.startswith(filter): continue # if not all and not res.is_active(): # continue data.append( ( # res.get_kind(), # res.get_name(), res.name, res.desc, ) ) # Render data ctx.obj.render_list( data, columns, conf={ "table_title": f"Services listing", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, }, ) @cli__svc.command("show") @click.argument("name", required=False) @format_options @click.pass_context def svc_show(ctx, name, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None): "Show service" columns = [ "name", "desc", "enabled", "input", "commands", "required_svc", "resource_lookup", "rest_matches" "dump", ] columns_all = columns + ["active", "deps", "missing", "loop_max", "loop", "need"] output = {} services = ctx.obj.app.catalog.services.select("startswith", name) for name, svc in services.items(): assert name == svc.name data = [ svc.name, svc.desc, svc.enabled, svc.input, to_yaml(svc.list_cmds()).strip(), svc.required_services, svc.resources_lookup, svc.resources_matches, svc.dump_item(), ] data = [str(item) for item in data] ctx.obj.render_item( data, columns_all if all else columns, conf={ "table_title": f"Service show: {data[0]}", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, }, ) @cli__svc.command("commands") @format_options @click.pass_context def svc_commands(ctx, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None): "Show service" columns = ["service", "name", "type", "desc"] data = [] for svc_name, svc in ctx.obj.app.list_services().items(): cmds = svc.list_cmds() for source in ["shell", "cmds"]: items = cmds[source] # target = ret[source] for cmd_name, conf in items.items(): # target[cmd_name] = conf data.append([svc_name, cmd_name, source, conf]) # Render data ctx.obj.render_list( data, columns, conf={ "table_title": f"Services commands", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, }, ) @cli__svc.command("run_v1") @click.argument("name") @click.argument("command") @format_options @click.pass_context def svc_run( ctx, name, command, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None ): "Show service" ret = ctx.obj.app.catalog.services.get(name) tmp = ret.run_cmd(command) pprint(tmp) # Shell management # ====================== @cli_app.group("shell") def cli__shell(): """Manages Resources Kind.""" @cli__shell.command("idents") @format_options @click.pass_context def shell_idents( ctx, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None, ): "Shell identities" columns = ["name"] data = [[x] for x in ctx.obj.app.get_idents()] # Render data ctx.obj.render_list( data, columns, conf={ "table_title": f"Shell identities", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, }, ) @cli__shell.command("order") @click.option( "--reverse", is_flag=True, show_default=False, default=False, help="Reverse order" ) @format_options @click.pass_context def shell_order( ctx, fmt_name=None, fmt_fields=None, fmt_sort=None, verbose=None, reverse=False ): "Shell loading order" columns = ["order", "name"] data = [] services, _ = ctx.obj.app.catalog.services.get_loading_order(reverse=reverse) for index, name in enumerate(services): # table.add_row(str(index), name) data.append([str(index), name]) # Render data ctx.obj.render_list( data, columns, conf={ "table_title": f"Services commands", "fmt_name": fmt_name, "fmt_sort": fmt_sort, "fmt_fields": fmt_fields, }, ) @cli__shell.command("enable") @click.option( "skip_start", "--no-start", "-b", is_flag=True, show_default=False, default=False, help="Disable background process start", ) @click.pass_context def shell_enable(ctx, skip_start=True, verbose=None): "Enable identity in shell" app = ctx.obj.app print(app.shell_enable(run_start=not skip_start)) @cli__shell.command("disable") @click.option( "run_stop", "--kill", "-k", is_flag=True, show_default=False, default=False, help="Kill background process on leave", ) @click.pass_context def shell_disable(ctx, run_stop=False, verbose=None): "Disable identity in shell" ret = ctx.obj.app.shell_disable(run_stop=run_stop) print(ret) @cli__shell.command("kill") @click.option( "all_idents", "--all", "-a", is_flag=True, show_default=False, default=False, help="Kill on all users", ) @click.pass_context def shell_kill(ctx, all_idents=False, verbose=None): "Disable identity in shell" ret = [] app = ctx.obj.app if not all_idents: ret.append(app.shell_enable(run_start=False)) ret.append(app.shell_disable(run_stop=True)) else: for ident in app.idents.names(): cli_logger.warning(f"Kill ident {ident}") app.init_ident(ident) ret.append(app.shell_enable(run_start=False)) ret.append(app.shell_disable(run_stop=True)) print("\n".join(ret)) @cli__shell.command("switch") @click.argument("ident", required=False) # @format_options @click.option( "skip_start", "--no-start", "-b", is_flag=True, show_default=False, default=False, help="Disable background process start", ) @click.option( "run_stop", "--kill", "-k", is_flag=True, show_default=False, default=False, help="Kill background process on leave", ) @click.pass_context def shell_switch(ctx, ident="", run_stop=False, skip_start=True, verbose=None): "Enable identity in shell" app = ctx.obj.app src_ident = app.catalog.ident.name dst_ident = ident if src_ident == dst_ident: print(">&2 echo 'No need to change'") else: ret = [] # Disable existing # logger.info(f"Disabling ident: {app.ident_name}") ret.append(app.shell_disable(run_stop=run_stop)) # Enable new # logger.info(f"Enabling ident: {ident}") app.init_ident(dst_ident) ret.append(app.shell_enable(run_start=not skip_start)) # Output print("\n".join(ret)) class TmpGroup(click.Group): def format_help(self, ctx, formatter): val = "List of available commands!" formatter.write(val) @cli_app.command( "run", # cls=TmpGroup, context_settings=dict( ignore_unknown_options=True, allow_extra_args=True, help_option_names=[] ), ) @click.argument("cmd_params", nargs=-1, type=click.UNPROCESSED) @click.pass_context def cli_app_run(ctx, cmd_params): # print("Will run cmd:", cmd_params) # Check first help parameter only show_help = False show_list = False for idx, param in enumerate(cmd_params): if idx == 0: if param in DEFAULT_HELP_OPTIONS: show_help = True if param in ["list", "ls"]: show_list = True if show_help: ctx = click.get_current_context() click.echo(ctx.get_help()) return # Get instanciated app app = ctx.obj.app if show_list: ret = app.catalog.services.list_cmds() pprint(ret) data = [[x.cmd.name, x.cmd.desc, x.svc.name] for x in ret] # Render data ctx.obj.render_list( data, ["name", "desc", "service"], conf={ "table_title": f"Services commands", # "fmt_name": fmt_name, # "fmt_sort": fmt_sort, # "fmt_fields": fmt_fields, }, ) return # Run the output ret = app.catalog.run_svc_cmd(cmd=cmd_params) print(ret) # pprint (ret, expand_all=True) # @cli_app.command("run2", # context_settings=dict( # ignore_unknown_options=True, # allow_extra_args=True, # ) # ) # @click.pass_context # def cli_app_run(ctx): # print("Hello world") # pprint (ctx.args) # return @cli__shell.command("install") @click.argument("shell", required=False, default=DEFAULT_SHELL) @click.pass_context def shell_install(ctx, shell=None, verbose=None): "Install iam in your shell" ret = ctx.obj.app.shell_install(shell=shell) # print("-- %< --" * 8) print(ret) # print("-- %< --" * 8) # Exception handler # =============================== def clean_terminate(err): "Terminate nicely the program depending the exception" # Choose dead end way if APP_EXCEPTION is not None and isinstance(err, APP_EXCEPTION): err_name = err.__class__.__name__ # logger.error(err) logger.critical("%s: %s" % (err_name, err)) sys.exit(1) # Core application executor # =============================== def run(): "Return a MyApp App instance" try: cli_app(**CONTEXT_SETTINGS) # pylint: disable=broad-except except Exception as err: clean_terminate(err) # Developper catchall if APP_COLORS: console.print_exception(show_locals=True) else: # raise Exception(err) from err logger.error(traceback.format_exc()) if APP_EXCEPTION: logger.critical("Uncatched error %s; this may be a bug!", err.__class__) logger.critical("Exit 1 with bugs") else: logger.critical("Exited with %s error: %s", err.__class__.__name__, err) sys.exit(1) if __name__ == "__main__": run()