Source code for project_config.project

"""High level logic for checking a project."""

from __future__ import annotations

import argparse
import os
import shutil
import sys
import typing as t
from dataclasses import dataclass

from project_config.config import Config
from project_config.constants import Error, InterruptingError, ResultValue
from project_config.plugins import InvalidPluginFunction, Plugins
from project_config.reporters import get_reporter
from project_config.serializers import (
    build_empty_file_for_serializer,
    guess_preferred_serializer,
)
from project_config.tree import Tree
from project_config.types import ActionsContext, Rule


[docs]class InterruptCheck(Exception): """An action has reported an invalid context for a rule. This exceptions prevents to continue executing subsecuents rules. """
[docs]class ConditionalsFalseResult(InterruptCheck): """A conditional must skip a rule."""
[docs]@dataclass class Project: """Wrapper for a single project. This class encapsulates all the high level logic to execute all CLI commands against a project. Its public method expose the commands that can be executed through the CLI. Args: config_path (str): Custom configuration file path. rootdir (str): Root directory of the project. reporter_ (dict): Reporter to use. color (bool): Colorized output in reporters. fix (bool): Fix errors. """ config_path: str rootdir: str reporter_: t.Dict[str, t.Any] color: bool _actions_context: t.Optional[ActionsContext] = None fix: bool = False only_hints: bool = False
[docs] def _load( self, fetch_styles: bool = True, init_tree: bool = True, ) -> None: self.config = Config( self.rootdir, self.config_path, ) ( self.color, self.reporter_, self.rootdir, config_only_hints, ) = self.config.guess_from_cli_arguments( self.color, self.reporter_, self.rootdir, ) # TODO: Configure fix from config file if fetch_styles: self.config.load_style() if init_tree: self.tree = Tree(self.rootdir) self.reporter = get_reporter( self.reporter_["name"], self.reporter_["kwargs"], self.color, self.rootdir, only_hints=self.only_hints or config_only_hints, ) # set rootdir as an internal environment variable to be used by plugins os.environ["PROJECT_CONFIG_ROOTDIR"] = self.rootdir self._actions_context = ActionsContext(fix=self.fix)
[docs] def _check_files_existence( self, files: t.List[t.Tuple[str, t.Any]], rule_index: int, ) -> None: for f, (fpath, fcontent) in enumerate(files): ftype = "directory" if fpath.endswith(("/", os.sep)) else "file" if fcontent is None: # file or directory does not exist if self.fix: if ftype == "directory": os.makedirs(fpath, exist_ok=True) self.tree.cache_files([fpath]) else: _, serializer_name = guess_preferred_serializer(fpath) new_content = ( "" if not serializer_name else build_empty_file_for_serializer( serializer_name, ) ) with open(fpath, "w", encoding="utf-8") as fd: fd.write(new_content) self.tree.cache_files([fpath]) self.reporter.report_error( { "message": f"Expected existing {ftype} does not exists", "file": fpath, "definition": f"rules[{rule_index}].files[{f}]", "fixed": self.fix, "fixable": True, }, )
[docs] def _check_files_absence( self, files: t.Union[t.List[str], t.Dict[str, str]], rule_index: int, ) -> None: fixed_files = [] if isinstance(files, dict): for fpath, reason in files.items(): normalized_fpath = os.path.join(self.rootdir, fpath) ftype = "directory" if fpath.endswith(("/", os.sep)) else "file" exists = ( os.path.isdir(normalized_fpath) if ftype == "directory" else os.path.isfile(normalized_fpath) ) if exists: if self.fix: if ftype == "directory": shutil.rmtree(normalized_fpath) else: os.remove(normalized_fpath) fixed_files.append(fpath) message = f"Expected absent {ftype} exists" if reason: message += f". {reason}" self.reporter.report_error( { "message": message, "file": f"{fpath}/" if ftype == "directory" else fpath, "definition": ( f"rules[{rule_index}].files.not[{fpath}]" ), "fixed": self.fix, "fixable": True, }, ) else: for f, fpath in enumerate(files): normalized_fpath = os.path.join(self.rootdir, fpath) ftype = "directory" if fpath.endswith(("/", os.sep)) else "file" exists = ( os.path.isdir(normalized_fpath) if ftype == "directory" else os.path.isfile(normalized_fpath) ) if exists: if self.fix: if ftype == "directory": shutil.rmtree(normalized_fpath) else: os.remove(normalized_fpath) fixed_files.append(fpath) self.reporter.report_error( { "message": f"Expected absent {ftype} exists", "file": fpath, "definition": f"rules[{rule_index}].files.not[{f}]", "fixed": self.fix, "fixable": True, }, ) if fixed_files: self.tree.cache_files(fixed_files)
[docs] def _process_conditionals_for_rule( self, conditionals: t.List[str], tree: Tree, rule: Rule, rule_index: int, ) -> None: conditional_failed = False for conditional in conditionals: try: action_function = ( self.config.style.plugins.get_function_for_action( conditional, ) ) except InvalidPluginFunction as exc: self.reporter.report_error( { "message": exc.message, "definition": f"rules[{rule_index}].{conditional}", }, ) raise InterruptCheck() for breakage_type, breakage_value in action_function( # typed dict with dinamic key, this type must be ignored # until some literal quirk comes, see: # https://stackoverflow.com/a/59583427/9167585 rule[conditional], # type: ignore tree, rule, self._actions_context, ): if breakage_type in (InterruptingError, Error): breakage_value["definition"] = ( f"rules[{rule_index}]" + breakage_value["definition"] ) self.reporter.report_error(breakage_value) conditional_failed = True elif breakage_type == ResultValue: if breakage_value is False: raise ConditionalsFalseResult() else: # pragma: no cover break else: raise NotImplementedError( f"Breakage type '{breakage_type}' is not implemented" " for conditionals checking", ) if conditional_failed: raise InterruptCheck()
[docs] def _run_check(self) -> None: for r, rule in enumerate(self.config["style"]["rules"]): files = rule.pop("files") if isinstance(files, list): self.tree.cache_files(files) # check if files exists self._check_files_existence(self.tree.files, r) else: # requiring absent of files self._check_files_absence(files["not"], r) continue # any other verb can be used in the rule hint = rule.pop("hint", None) verbs, conditionals = ([], []) for action in rule: if action.startswith("if"): conditionals.append(action) else: verbs.append(action) # handle conditionals try: self._process_conditionals_for_rule( conditionals, self.tree, rule, r, ) except ConditionalsFalseResult: # conditionals skipping the rule, next... continue # handle verbs for verb in verbs: try: action_function = ( self.config.style.plugins.get_function_for_action( verb, ) ) except InvalidPluginFunction as exc: self.reporter.report_error( { "message": exc.message, "definition": f"rules[{r}].{verb}", }, ) raise InterruptCheck() # TODO: show 'INTERRUPTED' in report for breakage_type, breakage_value in action_function( rule[verb], self.tree, rule, self._actions_context, ): if breakage_type == Error: # prepend rule index to definition, so plugins do not # need to specify them breakage_value["definition"] = ( f"rules[{r}]" + breakage_value["definition"] ) if not self.fix: breakage_value["fixed"] = False # show hint if defined in the rule if hint: breakage_value["hint"] = hint self.reporter.report_error(breakage_value) elif breakage_type == InterruptingError: breakage_value["definition"] = ( f"rules[{r}]" + breakage_value["definition"] ) self.reporter.report_error(breakage_value) raise InterruptCheck() # TODO: show 'INTERRUPTED' in report else: raise NotImplementedError( f"Breakage type '{breakage_type}' is not" " implemented for verbal checking", )
[docs] def check(self, args: argparse.Namespace) -> None: """Checks that the styles configured for a project match. Raises an error if report errors. """ self._load() try: self._run_check() except InterruptCheck: pass finally: self.reporter.raise_errors()
[docs] def show(self, args: argparse.Namespace) -> None: """Show configuration or fetched style for a project. It will depend in the ``subargs.data`` property. """ if args.data == "cache": from project_config.cache import Cache report = Cache.get_directory() else: if args.data == "config": self._load(fetch_styles=False, init_tree=False) data = self.config.dict_ data.pop("cache") data["cache"] = data.pop("_cache") elif args.data == "plugins": self._load(fetch_styles=False, init_tree=False) data = Plugins( # type: ignore prepare_all=True, ).plugin_action_names else: # style self._load(init_tree=False) data = self.config.dict_.pop("style") # type: ignore report = self.reporter.generate_data_report(args.data, data) sys.stdout.write(f"{report}\n")
[docs] def clean(self, args: argparse.Namespace) -> None: """Cleaning command.""" from project_config.cache import Cache if Cache.clean(): sys.stdout.write("Cache removed successfully!\n")
[docs] def init(self, args: argparse.Namespace) -> None: """Initialize the configuration for a project.""" from project_config.config import initialize_config cwd = os.getcwd() rootdir = ( cwd if getattr(args, "rootdir", None) is None else args.rootdir ) config_path = initialize_config( os.path.join( rootdir, getattr(args, "config", None) or ".project-config.toml", ), ) sys.stdout.write( "Configuration initialized at" f" {os.path.relpath(config_path, cwd)}\n", )