Source code for project_config.commands.check

"""project-config check command."""

from __future__ import annotations

import argparse
import os
import shutil
from typing import TYPE_CHECKING, Any

from contextlib_chdir import chdir as chdir_ctx

from project_config import tree
from project_config.config import Config, reporter_from_config
from project_config.constants import Error, InterruptingError, ResultValue
from project_config.plugins import InvalidPluginFunction
from project_config.serializers import (
    EMPTY_CONTENT_BY_SERIALIZER,
    guess_preferred_serializer,
)
from project_config.types_ import ActionsContext


if TYPE_CHECKING:
    from project_config.types_ import Rule


[docs]class InterruptCheck(Exception): """An action has reported an invalid context for a rule. This exceptions prevents to continue executing subsecuents rules. """
[docs]class ConditionalsFalseResult(InterruptCheck): """A conditional must skip a rule."""
[docs]class ProjectConfigChecker: """Project configuration checker.""" def __init__( self, config: Config, fix_mode: bool = False, # noqa: FBT001, FBT002 ): """Initialize the checker. Args: config (:py:class:`project_config.config.Config`): Configuration to use. fix_mode (bool): Whether to fix the errors or not. """ self.config = config self.reporter = reporter_from_config(config) self.config.load_style() self.actions_context = ActionsContext(fix=fix_mode, files=[])
[docs] def _check_files_existence( self, files: list[str], rule_index: int, ) -> None: for findex, fpath in enumerate(files): ftype = "directory" if fpath.endswith(("/", os.sep)) else "file" if ftype == "directory": exists = os.path.isdir(fpath) else: exists = os.path.isfile(fpath) if not exists: # file or directory does not exist if self.actions_context.fix: if ftype == "directory": os.makedirs(fpath, exist_ok=True) else: _, serializer_name = guess_preferred_serializer(fpath) new_content = ( "" if not serializer_name else EMPTY_CONTENT_BY_SERIALIZER.get( serializer_name, "", ) ) with open(fpath, "w", encoding="utf-8") as fd: fd.write(new_content) # Cache the file. # # Serialization errors can't be raised here # because the file is empty and created by the # checker itself (see above) tree.cache_file( fpath, forbid_serializers=("py",), ) self.reporter.report_error( { "message": f"Expected existing {ftype} does not exists", "file": fpath, "definition": f"rules[{rule_index}].files[{findex}]", "fixed": self.actions_context.fix, "fixable": True, }, )
[docs] def _check_files_absence( self, files: list[str] | dict[str, str | int], rule_index: int, ) -> None: if isinstance(files, list): # i used for file index in the rule files = {fn: i for i, fn in enumerate(files)} for fpath, reason_or_index in files.items(): normalized_fpath = os.path.join( self.config.dict_["cli"]["rootdir"], fpath, ) isdir = False if fpath.endswith("/"): isdir = True exists = os.path.isdir(normalized_fpath) else: exists = os.path.isfile(normalized_fpath) if exists: if self.actions_context.fix: if isdir: shutil.rmtree(normalized_fpath) else: os.remove(normalized_fpath) # Take into account that this removal don't need # to be cached as the digest of the file has been # discarded message = ( f"Expected absent {'directory' if isdir else 'file'} exists" ) if isinstance(reason_or_index, str): message += f". {reason_or_index}" file_index = ( fpath if isinstance(reason_or_index, str) else reason_or_index ) self.reporter.report_error( { "message": message, "file": fpath, "definition": ( f"rules[{rule_index}].files.not[{file_index}]" ), "fixed": self.actions_context.fix, "fixable": True, }, )
[docs] def _process_conditionals_for_rule( self, conditionals: list[tuple[str, Any]], rule: Rule, rule_index: int, ) -> None: conditional_failed = False for conditional, action_function in conditionals: for breakage_type, breakage_value in action_function( # typed dict with dinamic key, this type must be ignored # until some literal quirk comes, see: # https://stackoverflow.com/a/59583427/9167585 rule[conditional], # type: ignore rule, self.actions_context, ): if breakage_type in (InterruptingError, Error): breakage_value["definition"] = ( f"rules[{rule_index}]" + breakage_value["definition"] ) self.reporter.report_error(breakage_value) conditional_failed = True elif breakage_type == ResultValue: if breakage_value is False: raise ConditionalsFalseResult() break else: raise NotImplementedError( f"Breakage type '{breakage_type}' is not implemented" " for conditionals checking", ) if conditional_failed: raise InterruptCheck()
[docs] def _run_check(self) -> None: # noqa: PLR0912 for r, rule in enumerate(self.config.dict_["style"]["rules"]): hint = rule.pop("hint", None) files = rule.pop("files", []) verbs, conditionals_functions = [], [] for action in rule: if action.startswith("if"): try: action_function = ( self.config.style.plugins.get_function_for_action( action, ) ) except InvalidPluginFunction as exc: self.reporter.report_error( { "message": exc.message, "definition": f"rules[{r}].{action}", }, ) raise InterruptCheck() from exc conditionals_functions.append((action, action_function)) else: verbs.append(action) try: self._process_conditionals_for_rule( conditionals_functions, rule, r, ) except ConditionalsFalseResult: # conditionals skipping the rule, next... continue if isinstance(files, list): for file in files: tree.cache_file( file, forbid_serializers=("py",), ignore_serialization_errors=True, ) # check if files exists self._check_files_existence(files, r) else: # requiring absent of files self._check_files_absence(files["not"], r) continue # no other verb can be used in the rule self.actions_context.files = files # handle verbs for verb in verbs: try: action_function = ( self.config.style.plugins.get_function_for_action( verb, ) ) except InvalidPluginFunction as exc: self.reporter.report_error( { "message": exc.message, "definition": f"rules[{r}].{verb}", }, ) raise InterruptCheck() from exc # TODO: show 'INTERRUPTED' in report? for breakage_type, breakage_value in action_function( rule[verb], # type: ignore rule, self.actions_context, ): if breakage_type == Error: # prepend rule index to definition, so plugins do not # need to specify them # # TODO: Currently the cast to ErrorDict is not available # at runtime without installing typing_extensions, # so we need to ignore the type here. breakage_value["definition"] = f"rules[{r}]" + ( # type: ignore breakage_value["definition"] # type: ignore ) if not self.actions_context.fix: breakage_value["fixed"] = False # type: ignore # show hint if defined in the rule if hint: breakage_value["hint"] = hint # type: ignore self.reporter.report_error(breakage_value) elif breakage_type == InterruptingError: breakage_value["definition"] = f"rules[{r}]" + ( # type: ignore breakage_value["definition"] # type: ignore ) self.reporter.report_error(breakage_value) raise InterruptCheck() # TODO: show 'INTERRUPTED' in report? else: raise NotImplementedError( f"Breakage type '{breakage_type}' is not" " implemented for verbal checking", )
[docs] def run(self) -> None: """Run the checker.""" try: self._run_check() except InterruptCheck: pass finally: self.reporter.raise_errors()
[docs]def check(args: argparse.Namespace) -> None: """Checks that the styles configured for a project match. Raises errors if reported. """ with chdir_ctx(args.rootdir): ProjectConfigChecker( Config(args), fix_mode=args.command == "fix", ).run()