# -*- coding: utf-8 -*-
"""Shared helpers for the Ansible Kheops plugins.

This module provides:

- ``DOCUMENTATION_OPTION_FRAGMENT``: YAML description of the plugin options,
  also used to derive ``AnsibleKheops.default_config``.
- ``Key`` / ``Keys``: parsers for the key definitions to look up.
- ``AnsibleKheops``: configuration merging, scope building and lookups
  against a Kheops instance.
"""

import os
import logging
from dataclasses import dataclass
from typing import Any, Union
from copy import deepcopy

import yaml
from diskcache import Cache

from ansible.errors import AnsibleError, AnsibleUndefinedVariable
from ansible.module_utils.common.text.converters import to_native
from ansible.utils.display import Display
from ansible.template import (
    generate_ansible_template_vars,
    AnsibleEnvironment,
    USE_JINJA2_NATIVE,
)

from kheops.app import Kheops

# NOTE(review): every top-level entry of this fragment must be a mapping,
# because AnsibleKheops.default_config calls ``.get("default")`` on each
# value. The ``notes`` entry is therefore kept nested under ``jinja2_native``.
# The leading indentation is assumed to match the plugins that append this
# fragment under their own ``options:`` key -- confirm against those files.
DOCUMENTATION_OPTION_FRAGMENT = """

      # Plugin configuration
      # ==========================
      config:
        description:
          - Path to Kheops configuration yaml file
          - Can be useful to reuse and merge existing configurations
          - All settings of the target files can be overriden from this file
          - Ignored if blank or Null
        default: Null
        ini:
          - section: inventory
            key: kheops_config
        env:
          - name: ANSIBLE_KHEOPS_CONFIG


      # Query configuration
      # ==========================
      namespace:
        description:
          - The Kheops namespace to use
        default: 'default'
        env:
          - name: ANSIBLE_KHEOPS_DEFAULT_NAMESPACE
      scope:
        description:
          - A hash containing the scope to use for the request, the values will be resolved as Ansible facts.
          - Use a dot notation to dig deeper into nested hash facts.
        default:
          node: inventory_hostname
          groups: group_names
      keys:
        description:
          - A list of keys to lookup
        default: Null


      # Behavior configuration
      # ==========================
      process_scope:
        description:
          - This setting defines how is parsed the `scope` configuration
          - Set `vars` to enable simple variable interpolation
          - Set `jinja` to enable jinja string interpolation
        default: 'jinja'
        choices: ['vars', 'jinja']
      process_results:
        description:
          - This setting defines how is parsed the returned results.
          - Set `none` to disable jinja interpolation from result.
          - Set `jinja` to enable jinja result interpolation.
          - Using jinja may pose some security issues, as you need to be sure that your source of data is properly secured.
        default: 'none'
        choices: ['none', 'jinja']
      jinja2_native:
        description:
          - Controls whether to use Jinja2 native types.
          - It is off by default even if global jinja2_native is True.
          - Has no effect if global jinja2_native is False.
          - This offers more flexibility than the template module which does not use Jinja2 native types at all.
          - Mutually exclusive with the convert_data option.
        default: False
        type: bool
        env:
          - name: ANSIBLE_JINJA2_NATIVE
        notes:
          - Kheops documentation is available on http://kheops.io/
          - You can add more parameters as documented in http://kheops.io/server/api


      # Backend configuration (Direct/Client)
      # ==========================
      mode:
        description:
          - Choose `client` to use a remote Khéops instance (Not implemented yet)
          - Choose `instance` to use a Khéops directly
        default: 'instance'
        choices: ['instance']
        env:
          - name: ANSIBLE_KHEOPS_MODE


      # Instance configuration (Direct)
      # ==========================
      # Instance configuration
      instance_config:
        description:
          - The Kheops configuration file to use.
          - Require mode=instance
        default: 'site/kheops.yml'
        env:
          - name: ANSIBLE_KHEOPS_INSTANCE_CONFIG
      instance_namespace:
        description:
          - The Kheops configuration file to use.
          - Require mode=instance
        default: 'default'
        env:
          - name: ANSIBLE_KHEOPS_INSTANCE_NAMESPACE
      instance_log_level:
        description:
          - Khéops logging level
          - Require mode=instance
        choices: ['DEBUG', 'INFO', 'WARNING', 'ERROR']
        default: 'WARNING'
        env:
          - name: ANSIBLE_KHEOPS_INSTANCE_LOG_LEVEL
      instance_explain:
        description:
          - Khéops logging explain
          - Require mode=instance
        type: boolean
        default: False
        env:
          - name: ANSIBLE_KHEOPS_INSTANCE_EXPLAIN


      # Instance configuration (Client)
      # ==========================
      # turfu  # Client configuration
      # turfu  client_token:
      # turfu    description:
      # turfu      - The Kheops token to use to authenticate against Kheops server.
      # turfu    default: ''
      # turfu    env:
      # turfu      - name: ANSIBLE_KHEOPS_TOKEN
      # turfu  client_host:
      # turfu    description:
      # turfu      - Hostname of the Kheops Server.
      # turfu    default: '127.0.0.1'
      # turfu    env:
      # turfu      - name: ANSIBLE_KHEOPS_HOST
      # turfu  client_port:
      # turfu    description:
      # turfu      - Kheops port to connect to.
      # turfu    default: '9843'
      # turfu    env:
      # turfu      - name: ANSIBLE_KHEOPS_PORT
      # turfu  client_protocol:
      # turfu    description:
      # turfu      - The URL protocol to use.
      # turfu    default: 'http'
      # turfu    choices: ['http', 'https']
      # turfu    env:
      # turfu      - name: ANSIBLE_KHEOPS_PROTOCOL
      # turfu  client_validate_certs:
      # turfu    description:
      # turfu      - Whether or not to verify the TLS certificates of the Kheops server.
      # turfu    type: boolean
      # turfu    default: False
      # turfu    env:
      # turfu      - name: ANSIBLE_KHEOPS_VALIDATE_CERTS
"""

# Separator used in string key definitions: <namespace>/<key>/<remap>
KEY_NS_SEP = "/"

if USE_JINJA2_NATIVE:
    from ansible.utils.native_jinja import NativeJinjaText

# Module-level cache handed to every Kheops instance; it is emptied each time
# this module is imported.
KHEOPS_CACHE = Cache("/tmp/ansible_kheops_cache/")
KHEOPS_CACHE.clear()


class Key():
    """A single Kheops lookup key, built from a string or dict definition.

    Every entry of the parsed configuration (see ``default_config``) is
    exposed as an instance attribute: ``key``, ``remap``, ``scope``,
    ``namespace``, ``namespace_prefix``, ``explain`` and ``trace``.
    """

    default_config = {
        "key": None,
        "remap": None,
        "scope": {},
        "namespace": None,
        "namespace_prefix": False,
        "explain": False,
        "trace": False,
    }

    def __init__(self, config, default_namespace=None):
        """Parse ``config`` and expose its entries as attributes.

        Args:
            config: Key definition, either a string or a dict
                (see ``parse_config``).
            default_namespace: Namespace applied to string definitions that
                do not carry one. ``None`` leaves the key without namespace.
        """
        # Must be set before parse_config(), which reads it for string keys.
        self.default_namespace = default_namespace
        self._config = self.parse_config(config)

        # Create all attributes
        for key, value in self._config.items():
            setattr(self, key, value)

    def parse_config(self, key_def):
        """Parse a key definition and return its full configuration dict.

        A key definition can either be:
        - a string: for quick queries
        - a dict: for full control, that can be extended later for more options

        String configuration (parts separated by ``KEY_NS_SEP``):
        - <key>
        - <namespace>/<key>
        - <namespace>/<key>/<remap>

        Dict configuration:
        - key: key to query
        - namespace: namespace to query
        - remap: rename key to ansible variable

        Returns:
            A copy of ``default_config`` overridden by the parsed values.

        Raises:
            AnsibleError: If ``key_def`` is neither a string nor a dict.
        """
        # Deep copy so the mutable "scope" default is never shared between keys.
        ret = deepcopy(self.default_config)

        if isinstance(key_def, dict):
            ret.update(key_def)
        elif isinstance(key_def, str):
            key = None
            remap = None
            namespace = getattr(self, "default_namespace", None)

            # Extract config from string. A fourth part, if any, is ignored
            # and disables the remap (only exactly three parts set it).
            parts = key_def.split(KEY_NS_SEP, 3)
            if len(parts) == 1:
                key = parts[0]
            else:
                namespace = parts[0]
                key = parts[1]
                if len(parts) == 3:
                    remap = parts[2]

            # Generate new config
            ret.update(
                {
                    "key": key,
                    "namespace": namespace,
                    "remap": remap,
                }
            )
        else:
            raise AnsibleError(
                f"Key configuration is invalid, expected a string or dict, got: {key_def}"
            )

        return ret

    def show(self):
        """Return the key in the string form sent to Kheops.

        The result is ``<namespace>/<key>`` when a namespace is set, and the
        bare key otherwise.
        """
        ret = self.key
        if self.namespace is not None:
            ret = f"{self.namespace}{KEY_NS_SEP}{ret}"
        return ret


class Keys():
    """Ordered collection of ``Key`` objects parsed from a keys definition."""

    def __init__(self, config, default_namespace="default"):
        """Parse ``config`` into ``self.key_list``.

        Args:
            config: Keys definition (string, list or dict).
            default_namespace: Namespace given to string keys without one.
        """
        self.raw_config = config
        self.default_namespace = default_namespace
        self.key_list = self.parse_config(
            config, default_namespace=default_namespace
        )

    def parse_config(self, keys_def, default_namespace="default"):
        """Parse a keys definition into a list of ``Key`` objects.

        A keys definition can either be:
        - a string: single lookup
        - a list: ordered lookup, each item being a key definition
        - a dict: lookup in dict order; each value is a string definition
          (empty or None falls back to the dict key itself) or a dict
          definition (whose ``key`` defaults to the dict key)

        Args:
            keys_def: The keys definition.
            default_namespace: Namespace given to string keys without one.

        Raises:
            AnsibleError: If the definition has an unsupported type or
                yields no key at all.
        """
        ret = []
        if isinstance(keys_def, list):
            ret = [
                Key(key_def, default_namespace=default_namespace)
                for key_def in keys_def
            ]
        elif isinstance(keys_def, str):
            ret.append(Key(keys_def, default_namespace=default_namespace))
        elif isinstance(keys_def, dict):
            for key, value in keys_def.items():
                if isinstance(value, dict):
                    # Work on a copy so the caller's configuration is untouched
                    key_def = dict(value)
                    key_def.setdefault("key", key)
                elif value is None or isinstance(value, str):
                    key_def = value or key
                else:
                    raise AnsibleError(
                        f"Unable to process Kheops key '{key}', "
                        f"expected a string or dict, got: {value}"
                    )
                ret.append(Key(key_def, default_namespace=default_namespace))
        else:
            raise AnsibleError(f"Unable to process Kheops keys: {keys_def}")

        if not ret:
            raise AnsibleError("Kheops query needs at least one key to lookup")

        return ret


class AnsibleKheops:
    """Main Ansible Kheops class: configuration, scope building and lookups."""

    # Extract default config from documentation
    default_config = {
        key: value.get("default", None)
        for key, value in yaml.safe_load(DOCUMENTATION_OPTION_FRAGMENT).items()
    }

    # Kheops instance management
    # ----------------------------
    def __init__(self, configs=None, display=None):
        """Merge the configuration and start the Kheops backend.

        Args:
            configs: List of configuration items (see ``parse_configs``).
            display: Ansible ``Display`` object; a new one is created when
                omitted.

        Raises:
            AnsibleError: If the configured mode is ``client`` (not
                implemented) or unknown.
        """
        self.display = display or Display()

        configs = configs or []
        self.config = self.parse_configs(configs)

        self.display.v(
            "Kheops instance has been created, with config: %s"
            % self.config["instance_config"]
        )

        # Instantiate Kheops
        mode = self.config["mode"]
        if mode == "instance":

            # Configure logging
            logger = logging.getLogger("kheops")
            logger.setLevel(self.config["instance_log_level"])

            # See for logging: https://medium.com/opsops/debugging-requests-1989797736cc
            # NOTE(review): this lowers the ROOT logger to DEBUG for the whole
            # process; kept as-is to preserve behavior -- confirm it is wanted.
            main_logger = logging.getLogger()
            main_logger.setLevel(logging.DEBUG)

            # Start instance
            self.kheops = Kheops(
                config=self.config["instance_config"],
                namespace=self.config["instance_namespace"],
                cache=KHEOPS_CACHE,
            )
        elif mode == "client":
            raise AnsibleError("Kheops client mode is not implemented")
        else:
            # Fail early instead of leaving self.kheops undefined
            raise AnsibleError(f"Kheops mode is not supported: {mode}")

    def parse_configs(self, configs):
        """Merge a list of configuration items into one configuration dict.

        A config item can either be:
        - A string: path of a YAML configuration file
        - A dict: a direct configuration
        - None: ignored

        Precedence, from lowest to highest: documented defaults,
        ``ANSIBLE_KHEOPS_*`` environment variables, then the items of
        ``configs`` in order.

        Raises:
            AnsibleError: If a file is missing, an item has an unsupported
                type, or an item does not resolve to a dict.
        """
        # Get default configuration from environment
        env_items = [
            # We exclude 'config' on purpose
            "mode",
            "default_namespace",
            "instance_config",
            "instance_namespace",
            "instance_log_level",
            # Documented as ANSIBLE_KHEOPS_INSTANCE_EXPLAIN
            "instance_explain",
            # Legacy name, kept for backward compatibility
            "instance_log_explain",
            # Future:
            # "token",
            # "host",
            # "port",
            # "protocol",
            # "validate_certs",
        ]
        env_config = {}
        for item in env_items:
            value = os.environ.get("ANSIBLE_KHEOPS_" + item.upper())
            if value is not None:
                env_config[item] = value

        # Merge/process runtime configurations
        merged_configs = {}
        for config in configs:

            if config is None:
                continue

            if isinstance(config, str):
                self.display.vv("Read Kheops file config %s" % config)
                if not os.path.isfile(config):
                    raise AnsibleError(f"Unable to find configuration file {config}")
                # Context manager guarantees the file handle is released
                with open(config, "r", encoding="utf-8") as stream:
                    conf_data = yaml.safe_load(stream)
            elif isinstance(config, dict):
                self.display.vv("Read Kheops direct config %s" % config)
                conf_data = config
            else:
                raise AnsibleError(
                    f"Bad config for, expected a path or a dict, got type {type(config)}: {config}"
                )

            # An empty or non-mapping YAML file ends up here
            if not isinstance(conf_data, dict):
                raise AnsibleError(
                    f"Kheops configuration must be a dict, got: {conf_data}"
                )
            merged_configs.update(conf_data)

        # Merge results
        combined_config = dict(self.default_config)
        combined_config.update(env_config)
        combined_config.update(merged_configs)

        return combined_config

    # Lookup methods
    # ----------------------------
    def super_lookup_seq(
        self,
        keys,
        scope_config=None,
        scope_vars=None,
        namespace=None,
        kwargs=None,
        _templar=None,
        _process_scope=None,
        _process_results=None,
        jinja2_native=False,
    ):
        """Look up keys one after the other, re-injecting results in the scope.

        Each key is queried separately; the result of a query is merged into
        ``scope_vars`` before the scope of the next query is built.

        Args:
            keys: Keys definition (see ``Keys``); falls back to the ``keys``
                configuration when empty.
            scope_config: Scope definition; falls back to the ``scope``
                configuration when empty.
            scope_vars: Variables used to resolve the scope. When a dict is
                given it is updated IN PLACE with the lookup results.
            namespace: Default namespace for string keys; falls back to the
                ``default_namespace`` configuration.
            kwargs: Unused, kept for interface compatibility.
            _templar: Ansible templar, required for jinja processing.
            _process_scope: ``"vars"`` or ``"jinja"``; any other value falls
                back to the ``process_scope`` configuration.
            _process_results: ``"none"`` or ``"jinja"``; booleans are accepted
                (``True`` means ``"jinja"``); any other value falls back to
                the ``process_results`` configuration.
            jinja2_native: Whether Jinja2 native types are requested.

        Returns:
            A list with one result per key, in key order.

        Raises:
            AnsibleError: If result templating is requested without a templar.
        """
        # Determine runtime options
        if _process_scope not in ("vars", "jinja"):
            _process_scope = self.config["process_scope"]
        if isinstance(_process_results, bool):
            _process_results = "jinja" if _process_results else "none"
        elif _process_results not in ("none", "jinja"):
            _process_results = self.config["process_results"]
        _reinject_results = True

        # Build query context
        namespace = namespace or self.config.get("default_namespace")
        scope_config = scope_config or self.config.get("scope")
        if scope_vars is None:
            scope_vars = {}
        keys = keys or self.config.get("keys", [])
        keys_config = Keys(keys, default_namespace=namespace)
        scope_final = self.get_scope(
            _process_scope,
            scope_vars,
            scope_config,
            _templar=_templar,
            jinja2_native=jinja2_native,
        )

        # Query kheops
        result = []
        ret = {}
        for key in keys_config.key_list:

            if _reinject_results:
                # Make previous results visible to the next scope resolution
                scope_vars.update(dict(ret))
                scope_final = self.get_scope(
                    _process_scope,
                    scope_vars,
                    scope_config,
                    _templar=_templar,
                    jinja2_native=jinja2_native,
                )

            # Query kheops
            ret = self.kheops.lookup(
                keys=[key.show()],
                scope=scope_final,
                trace=key.trace,
                explain=key.explain,
                namespace_prefix=key.namespace_prefix,
            )

            # Remap output, only when the queried key is actually present
            if key.remap is not None and key.remap != key.key and key.key in ret:
                ret[key.remap] = ret.pop(key.key)

            # Process output
            if _process_results == "jinja":
                if _templar is None:
                    raise AnsibleError(
                        "Kheops results can not be processed with jinja without a templar"
                    )
                with _templar.set_temporary_context(available_variables=scope_vars):
                    ret = _templar.template(
                        ret,
                        preserve_trailing_newlines=True,
                        convert_data=False,
                        escape_backslashes=False,
                    )
                # NativeJinjaText is a text type: wrapping a mapping would
                # turn it into its string representation.
                if USE_JINJA2_NATIVE and not jinja2_native and isinstance(ret, str):
                    ret = NativeJinjaText(ret)

            result.append(ret)

        return result

    # Scope management methods
    # ----------------------------
    def get_scope(self, method, scope_vars, scope_config, _templar=None, jinja2_native=False):
        """Build the scope dict with the requested method.

        Args:
            method: ``"jinja"`` (template ``scope_config``), ``"vars"``
                (plain variable lookup) or ``"all"`` (use ``scope_vars``
                as the scope).
            scope_vars: Variables available to resolve the scope.
            scope_config: Scope definition.
            _templar: Ansible templar, required by the ``"jinja"`` method.
            jinja2_native: Whether Jinja2 native types are requested.

        Returns:
            A new dict holding the resolved scope.

        Raises:
            AnsibleError: If ``method`` is not supported.
        """
        ret = {}
        if method == "jinja":
            ret = self._get_scope_template(
                scope_vars, scope_config, _templar, jinja2_native=jinja2_native
            )
        elif method == "vars":
            ret = self._get_scope_vars(scope_vars, scope_config)
        elif method == "all":
            ret = scope_vars
        else:
            raise AnsibleError(
                f"Get_scope only accept 'jinja','vars' or 'all', got: {method}"
            )

        return dict(ret)

    def _get_scope_template(self, data, scope_config, templar, jinja2_native=False):
        """Create a scope context from ``data`` with jinja2 interpolation.

        Args:
            data: Variables made available to the templates.
            scope_config: Scope definition to template; falls back to the
                ``scope`` configuration when empty.
            templar: Ansible templar.
            jinja2_native: Whether Jinja2 native types are requested.

        Returns:
            The templated scope definition.

        Raises:
            AnsibleError: If ``data`` is not a dict or ``templar`` is missing.
            AnsibleUndefinedVariable: If a template uses an undefined variable.
        """
        scope_config = scope_config or self.config.get("scope", {})
        if not isinstance(data, dict):
            raise AnsibleError(
                f"Kheops scope variables must be a dict, got: {type(data)}"
            )
        if not templar:
            raise AnsibleError("Kheops scope templating requires a templar")

        if USE_JINJA2_NATIVE and not jinja2_native:
            _templar = templar.copy_with_new_env(environment_class=AnsibleEnvironment)
        else:
            _templar = templar

        # Process template with jinja
        _vars = dict(data)
        ret = {}
        with _templar.set_temporary_context(available_variables=_vars):
            try:
                ret = _templar.template(
                    scope_config,
                    preserve_trailing_newlines=True,
                    convert_data=True,
                    escape_backslashes=False,
                )
                if USE_JINJA2_NATIVE and not jinja2_native and isinstance(ret, str):
                    # jinja2_native is true globally but off for the lookup, we need this text
                    # not to be processed by literal_eval anywhere in Ansible.
                    # Only text is wrapped: a mapping must stay a mapping.
                    ret = NativeJinjaText(ret)
                self.display.vvv(f"Transformed scope value: {scope_config} => {ret}")
            except AnsibleUndefinedVariable as err:
                self.display.error(
                    f"Got templating error for string '{scope_config}': {err}"
                )
                raise

        return ret

    @staticmethod
    def _dig_var(data, path):
        """Return the value found at dotted ``path`` in nested dicts, or None."""
        current = data
        for part in path.split("."):
            if not isinstance(current, dict) or part not in current:
                return None
            current = current[part]
        return current

    def _get_scope_vars(self, data, scope_config):
        """Create a scope context from ``data`` with simple interpolation.

        Each value of ``scope_config`` is the name of a variable to read from
        ``data``. A name that does not exist as-is and contains dots is
        resolved through nested dicts (``a.b.c``). Unresolved names give
        ``None``.

        Args:
            data: Variables to read from.
            scope_config: Mapping of scope key to variable name; falls back
                to the ``scope`` configuration when empty.

        Raises:
            AnsibleError: If ``data`` is not a dict.
        """
        scope_config = scope_config or self.config.get("scope", {})
        if not isinstance(data, dict):
            raise AnsibleError(
                f"Kheops scope variables must be a dict, got: {type(data)}"
            )

        ret = {}
        for key, val in scope_config.items():
            value = data.get(val, None)
            if value is None and isinstance(val, str) and "." in val:
                value = self._dig_var(data, val)
            ret[key] = value

        return ret