Source code for project_config.plugins.jmespath

"""JMESPath expressions plugin."""

from __future__ import annotations

import copy
import json
import os
import pprint
import stat
from typing import TYPE_CHECKING, Any

from project_config import (
    ActionsContext,
    Error,
    InterruptingError,
    ResultValue,
    tree,
)


if TYPE_CHECKING:
    from project_config import Results, Rule

from project_config.fetchers import FetchError
from project_config.serializers import SerializerError
from project_config.utils.jmespath import (
    JMESPathError,
    compile_JMESPath_expression_or_error,
    compile_JMESPath_or_expected_value_error,
    compile_JMESPath_or_expected_value_from_other_file_error,
    evaluate_JMESPath,
    evaluate_JMESPath_or_expected_value_error,
    fix_tree_serialized_file_by_jmespath,
    is_literal_jmespath_expression,
    smart_fixer_by_expected_value,
)


[docs]class JMESPathPlugin:
[docs] @staticmethod def JMESPathsMatch( value: list[list[Any]], _rule: Rule, context: ActionsContext, ) -> Results: if not isinstance(value, list): yield InterruptingError, { "message": "The JMES path match tuples must be of type array", "definition": ".JMESPathsMatch", } if not value: yield InterruptingError, { "message": "The JMES path match tuples must not be empty", "definition": ".JMESPathsMatch", } for i, jmespath_match_tuple in enumerate(value): if not isinstance(jmespath_match_tuple, list): yield InterruptingError, { "message": ( "The JMES path match tuple must be of type array" ), "definition": f".JMESPathsMatch[{i}]", } if len(jmespath_match_tuple) not in (2, 3): yield InterruptingError, { "message": ( "The JMES path match tuple must be of length 2 or 3" ), "definition": f".JMESPathsMatch[{i}]", } if not isinstance(jmespath_match_tuple[0], str): yield InterruptingError, { "message": ( "The JMES path expression must be of type string" ), "definition": f".JMESPathsMatch[{i}][0]", } if len(jmespath_match_tuple) == 2: # noqa: PLR2004 jmespath_match_tuple.append(None) elif not isinstance(jmespath_match_tuple[2], str): yield InterruptingError, { "message": ( "The JMES path fixer query must be of type string" ), "definition": f".JMESPathsMatch[{i}][2]", } files = copy.copy(context.files) for f, fpath in enumerate(files): try: fstat = os.stat(fpath) except FileNotFoundError: continue if stat.S_ISDIR(fstat.st_mode): yield InterruptingError, { "message": ( "A JMES path can not be applied to a directory" ), "definition": f".files[{f}]", "file": f'{fpath.rstrip("/")}/', } instance = tree.cached_local_file(fpath) for e, (expression, expected_value, fixer_query) in enumerate( value, ): try: compiled_expression = ( compile_JMESPath_or_expected_value_error( expression, expected_value, ) ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": f".JMESPathsMatch[{e}][0]", "file": fpath, } try: expression_result = ( evaluate_JMESPath_or_expected_value_error( compiled_expression, expected_value, instance, ) ) except JMESPathError as exc: yield Error, { "message": exc.message, "definition": f".JMESPathsMatch[{e}]", "file": fpath, } continue if expression_result != expected_value: if not fixer_query: fixer_query = ( # noqa: PLW2901 smart_fixer_by_expected_value( compiled_expression, expected_value, ) ) if context.fix and fixer_query: try: compiled_fixer_query = ( compile_JMESPath_expression_or_error( fixer_query, ) ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": f".JMESPathsMatch[{e}][2]", } try: changed = fix_tree_serialized_file_by_jmespath( compiled_fixer_query, instance, fpath, ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": f".JMESPathsMatch[{e}][2]", } else: fixed = True if not changed: # pragma: no cover continue else: fixed = False yield Error, { "message": ( f"JMESPath '{expression}' does not match." f" Expected {pprint.pformat(expected_value)}," f" returned {pprint.pformat(expression_result)}" ), "definition": f".JMESPathsMatch[{e}]", "file": fpath, "fixed": fixed, "fixable": bool(fixer_query), }
[docs] @staticmethod def ifJMESPathsMatch( value: dict[str, list[list[str]]], _rule: Rule, _context: ActionsContext, ) -> Results: if not isinstance(value, dict): yield InterruptingError, { "message": ( "The files - JMES path match tuples must be of type object" ), "definition": ".ifJMESPathsMatch", } elif not value: yield InterruptingError, { "message": ( "The files - JMES path match tuples must not be empty" ), "definition": ".ifJMESPathsMatch", } for fpath, jmespath_match_tuples in value.items(): if not isinstance(jmespath_match_tuples, list): yield InterruptingError, { "message": ( "The JMES path match tuples must be of type array" ), "definition": f".ifJMESPathsMatch[{fpath}]", } if not jmespath_match_tuples: yield InterruptingError, { "message": ("The JMES path match tuples must not be empty"), "definition": f".ifJMESPathsMatch[{fpath}]", } for i, jmespath_match_tuple in enumerate(jmespath_match_tuples): if not isinstance(jmespath_match_tuple, list): yield InterruptingError, { "message": ( "The JMES path match tuple must be of type array" ), "definition": f".ifJMESPathsMatch[{fpath}][{i}]", } if len(jmespath_match_tuple) != 2: # noqa: PLR2004 yield InterruptingError, { "message": ( "The JMES path match tuple must be of length 2" ), "definition": f".ifJMESPathsMatch[{fpath}][{i}]", } if not isinstance(jmespath_match_tuple[0], str): yield InterruptingError, { "message": "The JMES path must be of type string", "definition": f".ifJMESPathsMatch[{fpath}][{i}][0]", } for fpath, jmespath_match_tuples in value.items(): try: fstat = os.stat(fpath) except FileNotFoundError: yield InterruptingError, { "message": ( "The file to check if matches against JMES paths does" " not exist" ), "definition": f".ifJMESPathsMatch[{fpath}]", "file": fpath, } if stat.S_ISDIR(fstat.st_mode): yield InterruptingError, { "message": "A JMES path can not be applied to a directory", "definition": f".ifJMESPathsMatch[{fpath}]", "file": f'{fpath.rstrip("/")}/', } try: instance = tree.cached_local_file(fpath) except SerializerError as exc: yield InterruptingError, { "message": exc.message, "definition": f".ifJMESPathsMatch[{fpath}]", "file": fpath, } for e, (expression, expected_value) in enumerate( jmespath_match_tuples, ): try: compiled_expression = ( compile_JMESPath_or_expected_value_error( expression, expected_value, ) ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": f".ifJMESPathsMatch[{fpath}][{e}][0]", "file": fpath, } try: expression_result = ( evaluate_JMESPath_or_expected_value_error( compiled_expression, expected_value, instance, ) ) except JMESPathError as exc: yield Error, { "message": exc.message, "definition": f".ifJMESPathsMatch[{fpath}][{e}]", "file": fpath, } continue if expression_result != expected_value: yield ResultValue, False return yield ResultValue, True
[docs] @staticmethod def crossJMESPathsMatch( value: list[list[Any]], _rule: Rule, context: ActionsContext, ) -> Results: if not isinstance(value, list): yield InterruptingError, { "message": "The pipes must be of type array", "definition": ".crossJMESPathsMatch", } if not value: yield InterruptingError, { "message": "The pipes must not be empty", "definition": ".crossJMESPathsMatch", } # each pipe is evaluated for each file for f, fpath in enumerate(context.files): try: fstat = os.stat(fpath) except FileNotFoundError: continue if stat.S_ISDIR(fstat.st_mode): yield InterruptingError, { "message": ( "A JMES path can not be applied to a directory" ), "definition": f".files[{f}]", "file": f'{fpath.rstrip("/")}/', } for i, pipe in enumerate(value): if not isinstance(pipe, list): yield InterruptingError, { "message": "The pipe must be of type array", "definition": f".crossJMESPathsMatch[{i}]", } elif len(pipe) < 3: # noqa: PLR2004 yield InterruptingError, { "message": "The pipe must be, at least, of length 3", "definition": f".crossJMESPathsMatch[{i}]", } files_expression = pipe[0] # the first value in the array is the expression for `files` if not isinstance(files_expression, str): yield InterruptingError, { "message": "The file expression must be of type string", "definition": f".crossJMESPathsMatch[{i}][0]", } elif not files_expression: yield InterruptingError, { "message": "The file expression must not be empty", "definition": f".crossJMESPathsMatch[{i}][0]", } else: files_expression = files_expression.strip() final_expression = pipe[-2] if not isinstance(final_expression, str): yield InterruptingError, { "message": ( "The final expression must be of type string" ), "definition": ( f".crossJMESPathsMatch[{i}][{len(pipe) - 2}]" ), } elif not final_expression: yield InterruptingError, { "message": "The final expression must not be empty", "definition": ( f".crossJMESPathsMatch[{i}][{len(pipe) - 2}]" ), } expected_value = pipe[-1] try: final_compiled_expression = ( compile_JMESPath_or_expected_value_error( # noqa: E501 final_expression, expected_value, ) ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": ( f".crossJMESPathsMatch[{i}][{len(pipe) - 2}]" ), "file": fpath, } if is_literal_jmespath_expression(files_expression): files_result = json.loads(files_expression[1:-1]) else: files_instance = tree.cached_local_file(fpath) try: files_compiled_expression = ( compile_JMESPath_expression_or_error( # noqa: E501 files_expression, ) ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": f".crossJMESPathsMatch[{i}][0]", } try: files_result = evaluate_JMESPath( files_compiled_expression, files_instance, ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": f".crossJMESPathsMatch[{i}][0]", "file": fpath, } other_results = [] for other_index, other_data in enumerate(pipe[1:-2]): pipe_index = other_index + 1 if not isinstance(other_data, list): yield InterruptingError, { "message": ( "The file path and expression tuple must be of" " type array" ), "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}]" ), } elif len(other_data) != 2: # noqa: PLR2004 yield InterruptingError, { "message": ( "The file path and expression tuple must be of" " length 2" ), "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}]" ), } other_fpath, other_expression = other_data if not isinstance(other_fpath, str): yield InterruptingError, { "message": "The file path must be of type string", "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}][0]" ), } elif not other_fpath: yield InterruptingError, { "message": "The file path must not be empty", "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}][0]" ), } if not isinstance(other_expression, str): yield InterruptingError, { "message": "The expression must be of type string", "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}][1]" ), } elif not other_expression: yield InterruptingError, { "message": "The expression must not be empty", "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}][1]" ), } try: other_compiled_expression = compile_JMESPath_or_expected_value_from_other_file_error( # noqa: E501 other_expression, other_fpath, other_expression, ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}]" ), "file": other_fpath, } try: other_instance = tree.fetch_remote_file(other_fpath) except (SerializerError, FetchError) as exc: yield InterruptingError, { "message": exc.message, "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}][0]" ), "file": other_fpath, } except FileNotFoundError: yield InterruptingError, { "message": f"'{other_fpath}' file not found", "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}][0]" ), "file": other_fpath, } try: other_result = evaluate_JMESPath( other_compiled_expression, other_instance, ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": ( f".crossJMESPathsMatch[{i}][{pipe_index}]" ), "file": other_fpath, } else: other_results.append(other_result) try: final_result = evaluate_JMESPath( final_compiled_expression, [files_result, *other_results], ) except JMESPathError as exc: yield InterruptingError, { "message": exc.message, "definition": ( f".crossJMESPathsMatch[{i}][{len(pipe) - 2}]" ), "file": fpath, } if final_result != expected_value: yield Error, { "message": ( f"JMESPath '{final_expression}' does not match." f" Expected {pprint.pformat(expected_value)}," f" returned {pprint.pformat(final_result)}" ), "definition": f".crossJMESPathsMatch[{i}]", "file": fpath, }