Source code for project_config.utils.jmespath

"""Utilities related to JMESPaths."""

from __future__ import annotations

import builtins
import glob
import json
import operator
import os
import pickle
import pprint
import re
import shlex
import shutil
import sys
import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

import deepmerge
from jmespath import (
    Options as JMESPathOptions,
    compile as jmespath_compile,
)
from jmespath.exceptions import (
    JMESPathError as OriginalJMESPathError,
    ParseError as JMESPathParserError,
)
from jmespath.functions import (
    Functions as JMESPathFunctions,
    signature as jmespath_func_signature,
)
from jmespath.parser import (
    ParsedResult as JMESPathParsedResult,
    Parser,
)

from project_config import tree
from project_config.cache import Cache
from project_config.compat import removeprefix, removesuffix
from project_config.exceptions import ProjectConfigException


if TYPE_CHECKING:
    pass


[docs]class JMESPathError(ProjectConfigException): """Class to wrap all JMESPath errors of the plugin."""
BUILTIN_TYPES = ["str", "bool", "int", "float", "list", "dict", "set"] BUILTIN_DEEPMERGE_STRATEGIES = {} for maybe_merge_strategy_name in dir(deepmerge): if not maybe_merge_strategy_name.startswith("_"): maybe_merge_strategy_instance = getattr( deepmerge, maybe_merge_strategy_name, ) if isinstance(maybe_merge_strategy_instance, deepmerge.Merger): BUILTIN_DEEPMERGE_STRATEGIES[maybe_merge_strategy_name] = ( maybe_merge_strategy_instance ) OPERATORS_FUNCTIONS = { "<": operator.lt, "<=": operator.le, "==": operator.eq, "!=": operator.ne, ">=": operator.ge, ">": operator.gt, "is": operator.is_, "is_not": operator.is_not, "is-not": operator.is_not, "is not": operator.is_not, "isNot": operator.is_not, "+": operator.add, "&": operator.and_, "and": operator.and_, "//": operator.floordiv, "<<": operator.lshift, "%": operator.mod, "*": operator.mul, "@": operator.matmul, "|": operator.or_, "or": operator.or_, "**": operator.pow, ">>": operator.rshift, "-": operator.sub, "/": operator.truediv, "^": operator.xor, "count_of": operator.countOf, "count of": operator.countOf, "count-of": operator.countOf, "countOf": operator.countOf, "index_of": operator.indexOf, "index of": operator.indexOf, "index-of": operator.indexOf, "indexOf": operator.indexOf, } SET_OPERATORS = {"<", ">", "<=", ">=", "and", "&", "or", "|", "-", "^"} OPERATORS_THAT_RETURN_SET = {"and", "&", "or", "|", "-", "^"} # map from jmespath exceptions class names to readable error types JMESPATH_READABLE_ERRORS = { "ParserError": "parsing error", "IncompleteExpressionError": "incomplete expression error", "LexerError": "lexing error", "ArityError": "arity error", "VariadictArityError": "arity error", "JMESPathTypeError": "type error", "EmptyExpressionError": "empty expression error", "UnknownFunctionError": "unknown function error", } # JMESPath variables that can not be cached in evaluations # as are not deterministic UNCACHEABLE_JMESPATH_VARIABLES = { "rootdir", # rootdir_name() "listdir", # ... "isfile", "isdir", "exists", "mkdir", "rmdir", "glob", "getenv", # other useful "dirname", "basename", "extname", }
[docs]def _create_simple_transform_function_for_string( func_name: str, ) -> Callable[[type, str], str]: func = getattr(str, func_name) return jmespath_func_signature({"types": ["string"]})( lambda _self, value: func(value), )
[docs]def _create_is_function_for_string( func_suffix: str, ) -> Callable[[type, str], bool]: func = getattr(str, f"is{func_suffix}") return jmespath_func_signature({"types": ["string"]})( lambda _self, value: func(value), )
[docs]def _create_find_function_for_string_or_array( func_prefix: str, ) -> Callable[[type, list[Any] | str, Any, Any], int]: getattr(str, f"{func_prefix}find") def _wrapper( _self: type, value: list[Any] | str, sub: Any, *args: Any, ) -> int: if isinstance(value, list): try: return value.index(sub, *args) except ValueError: return -1 return value.find(sub, *args) return jmespath_func_signature( {"types": ["string", "array"], "variadic": True}, )(_wrapper)
[docs]def _create_just_function_for_string( func_prefix: str, ) -> Callable[[type, str, int, Any], str]: func = getattr(str, f"{func_prefix}just") return jmespath_func_signature( {"types": ["string"]}, {"types": ["number"], "variadic": True}, )(lambda _self, value, width, *args: func(value, width, *args))
[docs]def _create_partition_function_for_string( func_prefix: str, ) -> Callable[[type, str, str], list[str]]: func = getattr(str, f"{func_prefix}partition") return jmespath_func_signature( {"types": ["string"]}, {"types": ["string"]}, )(lambda _self, value, sep: list(func(value, sep)))
[docs]def _create_split_function_for_string( func_prefix: str, ) -> Callable[[type, str, Any], list[str]]: func = getattr(str, f"{func_prefix}split") return jmespath_func_signature( {"types": ["string"], "variadic": True}, )(lambda _self, value, *args: func(value, *args))
[docs]def _create_strip_function_for_string( func_prefix: str, ) -> Callable[[type, str], str]: func = getattr(str, f"{func_prefix}strip") return jmespath_func_signature( {"types": ["string"], "variadic": True}, )(lambda _self, value, *args: func(value, *args))
[docs]def _create_removeaffix_function_for_string( func_suffix: str, ) -> Callable[[type, str, str], str]: func = removesuffix if func_suffix.startswith("s") else removeprefix return jmespath_func_signature( {"types": ["string"]}, {"types": ["string"]}, )(lambda _self, value, affix: func(value, affix))
[docs]def _to_items(value: Any) -> list[Any]: return [[key, value] for key, value in value.items()]
[docs]class JMESPathProjectConfigFunctions(JMESPathFunctions): """JMESPath class to include custom functions.""" # Functions that expands the functionality of the standard JMESPath # functions:
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string", "array-string"], "variadic": True}, ) def _func_starts_with( self, search: str, suffix: str | tuple[str], *args: Any, ) -> bool: if isinstance(suffix, list): suffix = tuple(suffix) return search.startswith(suffix, *args)
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string", "array-string"], "variadic": True}, ) def _func_ends_with( self, search: str, suffix: str | tuple[str], *args: Any, ) -> bool: if isinstance(suffix, list): suffix = tuple(suffix) return search.endswith(suffix, *args)
# Functions that expands the standard JMESPath functions:
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string"], "variadic": True}, ) def _func_regex_match(self, regex: str, value: str, *args: Any) -> bool: return bool(re.match(regex, value, *args))
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["array-string", "object"]}, ) def _func_regex_matchall(self, regex: str, container: str) -> bool: warnings.warn( "The JMESPath function 'regex_matchall' is deprecated and" " will be removed in 1.0.0. Use 'regex_match' as child" " elements of subexpression filtering the output. See" " https://github.com/mondeja/project-config/issues/69 for" " a more detailed explanation.", DeprecationWarning, stacklevel=2, ) return all(bool(re.match(regex, value)) for value in container)
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string"]}, {"types": ["string"], "variadic": True}, ) def _func_regex_sub( _self, regex: str, repl: str, value: str, *args: Any, ) -> str: return re.sub(regex, repl, value, *args) # noqa: B034
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_regex_escape(self, regex: str) -> str: return re.escape(regex)
[docs] @jmespath_func_signature( {"types": []}, {"types": ["string"]}, {"types": [], "variadic": True}, ) def _func_op(self, a: float, operator: str, b: float, *args: Any) -> Any: operators = [] current_op = None for i, op_or_value in enumerate([operator, b] + (list(args) or [])): if i % 2 == 0: try: func = OPERATORS_FUNCTIONS[op_or_value] # type: ignore except KeyError: raise OriginalJMESPathError( f"Invalid operator '{op_or_value}' passed to op()" f" function at index {i}, expected one of:" f" {', '.join(list(OPERATORS_FUNCTIONS))}", ) from None else: current_op = (func, op_or_value) else: operators.append((current_op, op_or_value)) partial_result = a for (func, operator_), b_ in operators: # type: ignore if ( isinstance(b_, list) and isinstance(partial_result, list) and operator_ in SET_OPERATORS ): # both values are lists and the operator is only valid for sets, # so convert both values to set applying the operator if operator_ in OPERATORS_THAT_RETURN_SET: partial_result = list(func(set(partial_result), set(b_))) else: partial_result = func(set(partial_result), set(b_)) else: partial_result = func(partial_result, b_) return partial_result
[docs] @jmespath_func_signature({"types": ["array-string"]}) def _func_shlex_join(self, cmd_list: list[str]) -> str: return shlex.join(cmd_list)
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_shlex_split(self, cmd_str: str) -> list[str]: return shlex.split(cmd_str)
[docs] @jmespath_func_signature( { "types": ["number"], "variadic": True, }, ) def _func_round(self, *args: Any) -> Any: return round(*args)
[docs] @jmespath_func_signature( { "types": ["number"], "variadic": True, }, ) def _func_range(self, *args: Any) -> list[float] | list[int]: return list(range(*args))
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["number"], "variadic": True}, ) def _func_center(self, value: str, width: int, *args: Any) -> str: return value.center(width, *args)
[docs] @jmespath_func_signature( {"types": ["string", "array"]}, {"types": [], "variadic": True}, ) def _func_count( self, value: list[Any] | str, sub: Any, *args: Any, ) -> int: return value.count(sub, *args)
[docs] @jmespath_func_signature( {"types": [], "variadic": True}, ) def _func_format(self, schema: str, *args: Any) -> str: return schema.format(*args)
[docs] @jmespath_func_signature({"types": ["string"], "variadic": True}) def _func_splitlines(self, value: str, *args: Any) -> list[str]: return value.splitlines(*args)
[docs] @jmespath_func_signature({"types": ["string"]}, {"types": ["number"]}) def _func_zfill(self, value: str, width: int) -> str: return value.zfill(width)
[docs] @jmespath_func_signature({"types": ["string", "array", "object"]}) def _func_enumerate( self, value: list[Any] | str | dict[str, Any], ) -> list[list[Any]]: if isinstance(value, dict): return [list(item) for item in enumerate(_to_items(value))] return [list(item) for item in enumerate(value)]
[docs] @jmespath_func_signature({"types": ["object"]}) def _func_to_items( self, value: dict[str, Any], ) -> list[list[Any]]: return _to_items(value)
[docs] @jmespath_func_signature({"types": ["array"]}) def _func_from_items(self, value: list[Any]) -> dict[str, Any]: return {str(key): subv for key, subv in value}
[docs] @jmespath_func_signature() def _func_rootdir_name(self) -> str: return os.path.basename(os.environ["PROJECT_CONFIG_ROOTDIR"])
[docs] @jmespath_func_signature( {"types": [], "variadic": True}, ) def _func_deepmerge( self, base: Any, nxt: Any, *args: Any, ) -> Any: # TODO: if base and nxt are strings use merge with other # strategies such as prepend or append text. if len(args) > 0: strategies: str | list[dict[str, list[str]] | list[str]] = args[0] else: strategies = "conservative_merger" if isinstance(strategies, str): try: merger = BUILTIN_DEEPMERGE_STRATEGIES[strategies] except KeyError: raise OriginalJMESPathError( f"Invalid strategy '{strategies}' passed to deepmerge()" " function, expected one of:" f" {', '.join(list(BUILTIN_DEEPMERGE_STRATEGIES))}", ) from None else: type_strategies = [] for key, value in strategies[0]: # type: ignore key_type = {"array": "list", "object": "dict"}.get( key, # type: ignore key, # type: ignore ) if key_type not in BUILTIN_TYPES: raise OriginalJMESPathError( f"Invalid type passed to deepmerge() function in" " strategies array, expected one of:" f" {', '.join(BUILTIN_TYPES)}", ) from None type_strategies.append( (getattr(builtins, key_type), value), # type: ignore ) # TODO: cache merge objects by strategies used merger = deepmerge.Merger( type_strategies, *strategies[1:], ) merger.merge(base, nxt) return base
[docs] @jmespath_func_signature({"types": ["object"]}, {"types": ["object"]}) def _func_update( self, base: dict[str, Any], nxt: dict[str, Any], ) -> dict[str, Any]: base.update(nxt) return base
[docs] @jmespath_func_signature( {"types": ["array"]}, {"types": ["number"]}, {"types": []}, ) def _func_insert( self, base: list[Any], index: int, item: Any, ) -> list[Any]: base.insert(index, item) return base
[docs] @jmespath_func_signature( {"types": ["object"]}, {"types": ["string"]}, {"types": []}, ) def _func_set( self, base: dict[str, Any], key: str, value: Any, ) -> dict[str, Any]: base[key] = value return base
[docs] @jmespath_func_signature( {"types": ["object"]}, {"types": ["string"]}, ) def _func_unset( self, base: dict[str, Any], key: str, ) -> dict[str, Any]: if key in base: del base[key] return base
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string"]}, {"types": ["string"], "variadic": True}, ) def _func_replace( self, base: str, old: str, new: str, *args: Any, # count ) -> str: return base.replace(old, new, *args)
[docs] @jmespath_func_signature() def _func_os(self) -> str: return sys.platform
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_getenv(self, envvar: str) -> str | None: return os.environ.get(envvar)
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string", "null"]}, ) def _func_setenv( self, envvar: str, value: str | None, ) -> dict[str, str]: if value is None: del os.environ[envvar] else: os.environ[envvar] = value return dict(os.environ)
# File system functions
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_isfile(self, path: str) -> bool: return os.path.isfile(path)
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_isdir(self, path: str) -> bool: return os.path.isdir(path)
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_exists(self, path: str) -> bool: try: os.stat(path) except FileNotFoundError: return False return True
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_mkdir(self, path: str) -> bool: try: os.stat(path) except FileNotFoundError: os.mkdir(path) return True return False
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_rmdir(self, path: str) -> bool: try: os.stat(path) except FileNotFoundError: return False shutil.rmtree(path) return True
[docs] @jmespath_func_signature({"types": ["string"]}) def _func_listdir(self, path: str) -> list[str] | None: try: return os.listdir(path) except FileNotFoundError: return None
[docs] @jmespath_func_signature( {"types": ["string"], "variadic": True}, ) def _func_glob( self, pattern: str, *args: Any, # recursive ) -> list[str]: return glob.glob(pattern, recursive=args[0] if args else False)
# Github functions
[docs] @jmespath_func_signature( {"types": ["string"]}, {"types": ["string"], "variadic": True}, ) def _func_gh_tags( self, repo_owner: str, repo_name: str, *args: Any, ) -> list[str]: from project_config.fetchers.github import get_latest_release_tags kwargs = {} if len(args): kwargs["only_semver"] = args[0] return get_latest_release_tags(repo_owner, repo_name, **kwargs)
# built-in Python's functions locals().update( dict( { f"_func_{func_name}": ( _create_simple_transform_function_for_string(func_name) ) for func_name in ( "capitalize", "casefold", "lower", "swapcase", "title", "upper", ) }, **{ f"_func_{func_prefix}find": ( _create_find_function_for_string_or_array( func_prefix, ) ) for func_prefix in ("", "r") }, **{ f"_func_is{func_suffix}": _create_is_function_for_string( func_suffix, ) for func_suffix in ( "alnum", "alpha", "ascii", "decimal", "digit", "identifier", "lower", "numeric", "printable", "space", "title", "upper", ) }, **{ f"_func_{func_prefix}just": _create_just_function_for_string( func_prefix, ) for func_prefix in ("l", "r") }, **{ f"_func_{func_prefix}split": _create_split_function_for_string( func_prefix, ) for func_prefix in ("", "r") }, **{ f"_func_{func_prefix}strip": _create_strip_function_for_string( func_prefix, ) for func_prefix in ("", "l", "r") }, **{ f"_func_{func_prefix}partition": ( _create_partition_function_for_string(func_prefix) ) for func_prefix in ("", "r") }, **{ f"_func_remove{func_suffix}": ( _create_removeaffix_function_for_string(func_suffix) ) for func_suffix in ("suffix", "prefix") }, ), )
jmespath_project_config_options = JMESPathProjectConfigFunctions() jmespath_options = JMESPathOptions( custom_functions=jmespath_project_config_options, )
[docs]def compile_JMESPath_expression(expression: str) -> JMESPathParsedResult: """Compile a JMESPath expression. Args: expression (str): JMESPath expression to compile. Returns: :py:class:`jmespath.parser.ParsedResult`: JMESPath expression compiled. """ compiled_expression: JMESPathParsedResult = Cache.get(f"jm://{expression}") if compiled_expression is None: compiled_expression = jmespath_compile(expression) Cache.set(f"jm://{expression}", compiled_expression) return compiled_expression
[docs]def compile_JMESPath_expression_or_error( expression: str, ) -> JMESPathParsedResult: """Compile a JMESPath expression or raise a ``JMESPathError``. Args: expression (str): JMESPath expression to compile. Returns: :py:class:`jmespath.parser.ParsedResult`: JMESPath expression compiled. Raises: ``JMESPathError``: If the expression cannot be compiled. """ try: return compile_JMESPath_expression(expression) except OriginalJMESPathError as exc: error_type = JMESPATH_READABLE_ERRORS.get( exc.__class__.__name__, "error", ) raise JMESPathError( f"Invalid JMESPath expression {pprint.pformat(expression)}." f" Raised JMESPath {error_type}: {str(exc)}", ) from None
[docs]def compile_JMESPath_or_expected_value_error( expression: str, expected_value: Any, ) -> JMESPathParsedResult: """Compile a JMESPath expression or raise a ``JMESPathError``. You can pass a expected value that was being expected in the error message. Args: expression (str): JMESPath expression to compile. expected_value (Any): Value that was expected to match against expression. Returns: :py:class:`jmespath.parser.ParsedResult`: JMESPath expression compiled. Raises: ``JMESPathError``: If the expression cannot be compiled. """ # noqa: E501 try: return compile_JMESPath_expression(expression) except OriginalJMESPathError as exc: error_type = JMESPATH_READABLE_ERRORS.get( exc.__class__.__name__, "error", ) raise JMESPathError( f"Invalid JMESPath expression {pprint.pformat(expression)}." f" Expected to return {pprint.pformat(expected_value)}, raised" f" JMESPath {error_type}: {str(exc)}", ) from None
[docs]def compile_JMESPath_or_expected_value_from_other_file_error( expression: str, expected_value_file: str, expected_value_expression: str, ) -> JMESPathParsedResult: """Compile a JMESPath expression or raise a ``JMESPathError``. Show that the expression was being expected to match the value applying the expression to another file than the actual. Args: expression (str): JMESPath expression to compile. expected_value_file (str): File to the query is applied to. expected_value_expression (str): Expected result value not satisfied by the expression. Returns: :py:class:`jmespath.parser.ParsedResult`: JMESPath expression compiled. Raises: ``JMESPathError``: If the expression cannot be compiled. """ try: return compile_JMESPath_expression(expression) except OriginalJMESPathError as exc: error_type = JMESPATH_READABLE_ERRORS.get( exc.__class__.__name__, "error", ) raise JMESPathError( f"Invalid JMESPath expression {pprint.pformat(expression)}." f" Expected to return from applying the expresion" f" {pprint.pformat(expected_value_expression)} to the file" f" {pprint.pformat(expected_value_file)}, raised" f" JMESPath {error_type}: {str(exc)}", ) from None
[docs]def evaluate_JMESPath( compiled_expression: JMESPathParsedResult, instance: Any, ) -> Any: """Evaluate a JMESPath expression against a instance. Args: compiled_expression (:py:class:`jmespath.parser.ParsedResult`): JMESPath expression to evaluate. instance (any): Instance to evaluate the expression against. Returns: any: Result of the evaluation. Raises: ``JMESPathError``: If the expression cannot be evaluated. """ # This caching is a bit tricky, because some things as if a # directory exists can not be cached, can change. # # Also, Python modules can't be pickled, so we can't cache them. # # TODO: This needs to be properly tested, currently a cache # inconsistency is affecting the example 008. is_cacheable_expression = True for uncacheable_variable in UNCACHEABLE_JMESPATH_VARIABLES: if uncacheable_variable in compiled_expression.expression: is_cacheable_expression = False break if is_cacheable_expression is False: return compiled_expression.search( instance, options=jmespath_options, ) try: pickled_instance = pickle.dumps(instance) except TypeError: return compiled_expression.search( instance, options=jmespath_options, ) result = Cache.get( f"jm://E?{compiled_expression.expression}:{hash(pickled_instance)}", ) if result is None: try: result = compiled_expression.search( instance, options=jmespath_options, ) except OriginalJMESPathError as exc: formatted_expression = pprint.pformat( compiled_expression.expression, ) error_type = JMESPATH_READABLE_ERRORS.get( exc.__class__.__name__, "error", ) raise JMESPathError( f"Invalid JMESPath {formatted_expression}." f" Raised JMESPath {error_type}: {str(exc)}", ) from None Cache.set( f"jm://E?{compiled_expression.expression}:{str(instance)}", result, ) return result
[docs]def evaluate_JMESPath_or_expected_value_error( compiled_expression: JMESPathParsedResult, expected_value: Any, instance: Any, ) -> Any: """Evaluate a JMESPath expression against a instance or raise a ``JMESPathError``. You can pass a expected value that was being expected in the error message. Args: compiled_expression (:py:class:`jmespath.parser.ParsedResult`): JMESPath expression to evaluate. expected_value (any): Value that was expected to match against expression. instance (any): Instance to evaluate the expression against. Returns: any: Result of the evaluation. Raises: ``JMESPathError``: If the expression cannot be evaluated. """ # noqa: E501 try: return evaluate_JMESPath(compiled_expression, instance) except OriginalJMESPathError as exc: formatted_expression = pprint.pformat(compiled_expression.expression) error_type = JMESPATH_READABLE_ERRORS.get( exc.__class__.__name__, "error", ) raise JMESPathError( f"Invalid JMESPath {formatted_expression}." f" Expected to return {pprint.pformat(expected_value)}, raised" f" JMESPath {error_type}: {str(exc)}", ) from None
[docs]def fix_tree_serialized_file_by_jmespath( compiled_expression: JMESPathParsedResult, instance: Any, fpath: str, ) -> bool: """Fix a file by aplying a JMESPath expression to an instance. This function is used to fix a file by applying a JMESPath expression. The result of the expression will be the serialized version of the updated instance. Args: compiled_expression (:py:class:`jmespath.parser.ParsedResult`): JMESPath expression to evaluate. instance (any): Instance to evaluate the expression against. fpath (str): Path to the file to fix. Returns: bool: True if the file was fixed, False otherwise. """ new_content = evaluate_JMESPath( compiled_expression, instance, ) return tree.edit_local_file(fpath, new_content)
REVERSE_JMESPATH_TYPE_PYOBJECT: dict[ str | None, dict[Any, Any] | list[Any] | str | int | None, ] = { "string": "", "number": 0, "object": {}, "array": [], "null": None, None: None, }
[docs]def _build_reverse_jmes_type_object(jmespath_type: str) -> Any: return REVERSE_JMESPATH_TYPE_PYOBJECT[jmespath_type]
[docs]def smart_fixer_by_expected_value( # noqa: PLR0911, PLR0912, PLR0915 compiled_expression: JMESPathParsedResult, expected_value: Any, ) -> str: """Smart JMESPath fixer queries creator. Build a smart JMESPath query fixer by altering a expression to match a expected value given the syntax of an expression. Args: compiled_expression (:py:class:`jmespath.parser.ParsedResult`): JMESPath expression to evaluate. expected_value (any): Value that was expected to match against expression. Returns: str: JMESPath query fixer. """ fixer_expression = "" parser = Parser() ast = parser.parse(compiled_expression.expression).parsed merge_strategy = "conservative_merger" if ( ast["type"] == "index_expression" and ast["children"][0]["type"] == "identity" and ast["children"][1]["type"] == "index" ): return ( f'insert(@, `{ast["children"][1]["value"]}`,' f" `{json.dumps(expected_value)}`)" ) if ast["type"] == "field": key = ast["value"] return f"set(@, '{key}' `{json.dumps(expected_value)}`)" if ast["type"] == "subexpression": temporal_object = {} _obj = {} for i, child in enumerate(reversed(ast["children"])): # TODO: manage indexes in subexpressions if child["type"] == "index_expression": return "" if i == 0: _obj = {child["value"]: expected_value} else: _obj = {child["value"]: _obj} temporal_object = _obj elif ast["type"] == "function_expression" and ast["value"] == "type": if expected_value not in REVERSE_JMESPATH_TYPE_PYOBJECT: return "" temporal_object = {} if len(ast["children"]) == 1 and ast["children"][0]["type"] == "field": temporal_object = { ast["children"][0]["value"]: _build_reverse_jmes_type_object( expected_value, ), } elif ( len(ast["children"]) == 1 and ast["children"][0]["type"] == "current" ): temporal_object = _build_reverse_jmes_type_object(expected_value) return f"`{json.dumps(temporal_object, indent=None)}`" else: deep: list[Any] = [] def _iterate_expressions( expressions: list[Any], temporal_object: Any, merge_strategy: Any, deep: list[Any], ) -> tuple[list[Any], Any, Any]: for iexp, fexp in enumerate(reversed(expressions)): _last_field_type_iexp = ( len([e["type"] == "field" for e in expressions[iexp:]]) > 0 ) if fexp["type"] == "field": fexp_value = fexp["value"] elif fexp["type"] == "index_expression": ( tmp_deep, temporal_object, merge_strategy, ) = _iterate_expressions( fexp["children"], temporal_object, merge_strategy, deep, ) deep.extend(tmp_deep) continue elif fexp["type"] == "index": fexp_value = fexp["value"] deep.append(fexp_value) _obj: Any = {} for di, d in enumerate(deep): if di == 0 and _last_field_type_iexp: _obj = _build_reverse_jmes_type_object( expected_value, ) if isinstance(d, str): _obj = {d: _obj} else: # index merge_strategy = [ [ ( "list", "prepend" if d == 0 else "append", ), ("dict", "merge"), ("set", "union"), ], ["override"], ["override"], ] _obj = [_obj] temporal_object = _obj return (deep, temporal_object, merge_strategy) for child in ast["children"]: if child["type"] == "subexpression": expressions = list(child.get("children", [])) (_, temporal_object, merge_strategy) = _iterate_expressions( expressions, temporal_object, merge_strategy, deep, ) elif ( ast["type"] == "function_expression" and ast["value"] == "contains" and len(ast["children"]) == 2 # noqa: PLR2004 and ast["children"][0]["type"] == "function_expression" and ast["children"][0]["value"] == "keys" and ast["children"][0].get("children") and ast["children"][0]["children"][0]["type"] == "current" and ast["children"][1]["type"] == "literal" ): if expected_value is False: # contains(keys(@), 'key') -> false key = ast["children"][1]["value"] fixer_expression = f"unset(@, '{key}')" return fixer_expression else: # pragma: no cover return fixer_expression # default deepmerge fixing if isinstance(merge_strategy, str): merge_strategy_formatted = f"'{merge_strategy}'" else: merge_strategy_formatted = ( f"`{json.dumps(merge_strategy, indent=None)}`" ) fixer_expression += ( f"deepmerge(@," f" `{json.dumps(temporal_object, indent=None)}`," f" {merge_strategy_formatted})" ) return fixer_expression
[docs]def is_literal_jmespath_expression(expression: str) -> bool: """Check if a JMESPath expression is a literal expression.""" parser = Parser() try: ast = parser.parse(expression).parsed except JMESPathParserError: return False return ast["type"] == "literal"