Source code for project_config.config.style

"""Style loader, blender and checker."""

from __future__ import annotations

import contextlib
import os
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

from project_config import tree
from project_config.cache import Cache
from project_config.config.exceptions import ProjectConfigInvalidConfigSchema
from project_config.fetchers import resolve_maybe_relative_url, resolve_url
from project_config.plugins import Plugins
from project_config.serializers import serialize_for_url


[docs]class ProjectConfigInvalidStyle(ProjectConfigInvalidConfigSchema): """Invalid style error."""
if TYPE_CHECKING: from project_config.compat import NotRequired, TypeAlias, TypedDict from project_config.config import ConfigType from project_config.types_ import Rule class StyleType(TypedDict): """Style type.""" rules: NotRequired[list[Rule]] plugins: NotRequired[list[str]] extends: NotRequired[list[str]] PluginType: TypeAlias = type StyleLoaderIterator: TypeAlias = Iterator[StyleType | str]
[docs]class Style: """Wrapper for style loader, blender and checker.""" def __init__(self, config: Any) -> None: """Style object initializer. Args: config (dict): Configuration for the project. """ self.plugins = Plugins() self.config = config
[docs] @classmethod def from_config(cls, config: Any) -> Style: """Loads styles to the configuration passed as argument.""" if ( # pragma: no cover isinstance(config.dict_["style"], str) and not os.path.isfile(config.dict_["style"]) ) or ( isinstance(config.dict_["style"], list) and not all(os.path.isfile(url) for url in config.dict_["style"]) ): with contextlib.suppress(Exception): # if an exception is raised, will be raised again # in the synchronous style loader _prefetch_urls(config) style = cls(config) style_gen = style._load_styles_from_config() error_messages: list[str] = [] while True: try: style_or_error = next(style_gen) except StopIteration: break else: if isinstance(style_or_error, dict): # final style collected style.config.dict_["style"] = style_or_error else: error_messages.append(style_or_error) if error_messages: raise ProjectConfigInvalidStyle(style.config.path, error_messages) return style
[docs] def _load_styles_from_config(self) -> StyleLoaderIterator: # noqa: PLR0912 """Load styles yielding error messages if found. Error messages are of type string and style is of type dict. If the first yielded value is a dict, we have a style without errors. """ self.config.dict_["_style"] = self.config.dict_["style"] style_urls = self.config.dict_["style"] if isinstance(style_urls, str): try: style = tree.fetch_remote_file(style_urls) except FileNotFoundError: yield f"style -> '{style_urls}' file not found" else: _partial_style_is_valid = True validator = self._validate_style_preparing_new_plugins( style_urls, style, ) while True: try: yield next(validator) except StopIteration: break else: _partial_style_is_valid = False if _partial_style_is_valid: if "extends" in style: # extend the style yield from self._extend_partial_style( style_urls, style, ) yield style elif isinstance(style_urls, list): style = {"rules": [], "plugins": []} for s, partial_style_url in enumerate(style_urls): try: partial_style = tree.fetch_remote_file(partial_style_url) except FileNotFoundError: yield f"style[{s}] -> '{partial_style_url}' file not found" continue # extend style only if it is valid _partial_style_is_valid = True validator = self._validate_style_preparing_new_plugins( partial_style_url, partial_style, ) while True: try: yield next(validator) except StopIteration: break else: _partial_style_is_valid = False if _partial_style_is_valid: if "extends" in partial_style: yield from self._extend_partial_style( partial_style_url, partial_style, ) self._add_new_rules_plugins_to_style( style, partial_style.get("rules", []), partial_style.get("plugins", []), ) yield style
[docs] def _extend_partial_style( self, parent_style_url: str, style: StyleType, ) -> StyleLoaderIterator: for s, extend_url in enumerate(style.pop("extends", [])): try: partial_style = tree.fetch_remote_file(extend_url) except FileNotFoundError: yield ( f"{parent_style_url}: .extends[{s}]" f" -> '{extend_url}' file not found" ) continue _partial_style_is_valid = True validator = self._validate_style_preparing_new_plugins( extend_url, partial_style, ) while True: try: yield next(validator) except StopIteration: break else: # pragma: no cover # NOTE: this is marked as not covered, but putting a # `print` statement here it can be seen that the # `else` branch is reached, so probably it is a bug # in coverage.py _partial_style_is_valid = False if _partial_style_is_valid: if "extends" in partial_style: # extend the style recursively yield from self._extend_partial_style( extend_url, partial_style, ) self._add_new_rules_plugins_to_style( style, partial_style.get("rules", []), partial_style.get("plugins", []), prepend=True, ) yield style
[docs] def _add_new_rules_plugins_to_style( self, style: StyleType, new_rules: list[Rule], new_plugins: list[str], prepend: bool = False, # noqa: FBT001, FBT002 ) -> None: style["plugins"] = style.pop("plugins", []) style["rules"] = style.pop("rules", []) if prepend: style["rules"] = new_rules + style["rules"] style["plugins"] = list(set(new_plugins + style["plugins"])) else: style["rules"].extend(new_rules) style["plugins"] = list(set(style["plugins"] + new_plugins))
[docs] def _validate_style_preparing_new_plugins( # noqa: PLR0912, PLR0915 self, style_url: str, style: Any, ) -> Iterator[str]: # validate extends urls if "extends" in style: if not isinstance(style["extends"], list): yield f"{style_url}: .extends -> must be of type array" elif not style["extends"]: yield f"{style_url}: .extends -> must not be empty" else: for u, url in enumerate(style["extends"]): if not isinstance(url, str): yield ( f"{style_url}: .extends[{u}] -> must be of" " type string" ) elif not url: yield f"{style_url}: .extends[{u}] -> must not be empty" else: # resolve "extends" url given the style url style["extends"][u] = resolve_maybe_relative_url( url, style_url, self.config.dict_["cli"]["rootdir"], ) # validate plugins data consistency if "plugins" in style: if not isinstance(style["plugins"], list): yield f"{style_url}: .plugins -> must be of type array" elif not style["plugins"]: yield f"{style_url}: .plugins -> must not be empty" else: for p, plugin_name in enumerate(style["plugins"]): if not isinstance(plugin_name, str): yield ( f"{style_url}: .plugins[{p}]" " -> must be of type string" ) elif not plugin_name: yield f"{style_url}: .plugins[{p}] -> must not be empty" else: # cache plugins on demand self.plugins.prepare_3rd_party_plugin(plugin_name) # validate rules if "rules" not in style: if "extends" not in style: yield ( f"{style_url}: .rules or .extends" " -> one of both is required" ) elif not isinstance(style["rules"], list): yield f"{style_url}: .rules -> must be of type array" elif not style["rules"]: yield f"{style_url}: .rules -> at least one rule is required" else: for r, rule in enumerate(style["rules"]): if not isinstance(rule, dict): yield f"{style_url}: .rules[{r}] -> must be of type object" continue elif "files" not in rule: yield f"{style_url}: .rules[{r}].files -> is required" elif not isinstance(rule["files"], (list, dict)): yield ( f"{style_url}: .rules[{r}].files -> must be" " of type array or object" ) elif not rule["files"]: yield ( f"{style_url}: .rules[{r}].files -> at least" " one file is required" ) elif isinstance(rule["files"], dict): # requiring absence of files with # `files: {not: {<file>: reason}}` if len(rule["files"]) != 1 or "not" not in rule["files"]: yield ( f"{style_url}: .rules[{r}].files" " -> when files is an object, must" " have one 'not' key" ) elif not isinstance( rule["files"]["not"], (dict, list), ): yield ( f"{style_url}: .rules[{r}].files.not" " -> must be of type array or object" ) elif not rule["files"]["not"]: yield ( f"{style_url}: .rules[{r}].files.not" " -> must not be empty" ) elif isinstance(rule["files"]["not"], dict): # when 'not' is an object, is a mapping # from files to absence reasons for fpath, reason in rule["files"]["not"].items(): if reason and not isinstance(reason, str): yield ( f"{style_url}: .rules[{r}].files" f".not.{fpath} -> must be of type" " string" ) if not isinstance(fpath, str): yield ( f"{style_url}: .rules[{r}].files" f".not[{fpath}] -> file path must" " be of type string" ) elif not fpath: yield ( f"{style_url}: .rules[{r}].files" f".not[''] -> file path must" " not be empty" ) else: for f, fpath in enumerate( rule["files"]["not"], ): if not isinstance(fpath, str): yield ( f"{style_url}: .rules[{r}].files" f".not[{f}] -> must be of type" " string" ) elif not fpath: yield ( f"{style_url}: .rules[{r}].files" f".not[{f}] -> must not be empty" ) # when requiring absence of files, # no other action can be used if len(rule) != 1 and not ( len(rule) == 2 and "hint" in rule # noqa: PLR2004 ): yield ( f"{style_url}: .rules[{r}] -> when" " requiring absence of files with" " '.files.not', no other actions can" " be used in the same rule" ) else: for f, file in enumerate(rule["files"]): if not isinstance(file, str): yield ( f"{style_url}: .rules[{r}].files[{f}]" " -> must be of type string" ) elif not file: yield ( f"{style_url}: .rules[{r}].files[{f}]" " -> must not be empty" ) # Validate rules properties consistency against plugins for action in rule: if action in ["files", "hint"]: continue # the action must be prepared if not action: yield ( f"{style_url}: .rules[{r}].''" " -> action must not be empty" ) elif not self.plugins.is_valid_action(action): yield ( f"{style_url}: .rules[{r}].{action}" " -> invalid action, not found in" " defined plugins:" f" {', '.join(self.plugins.plugin_names)}" )
[docs]def _prefetch_urls(config_dict: ConfigType) -> None: """Prefetch urls concurrently and store them in cache. This function is used to store urls in cache before they are used, so the network calls are speedup a lot. Args: config_dict: The config_dict object. """ from concurrent.futures import as_completed from requests_futures.sessions import FuturesSession session = FuturesSession() style_urls_ = config_dict["style"] if isinstance(style_urls_, str): style_urls_ = [style_urls_] def prefetch_partial_style( parent_style_url: str, extend_urls: list[str], ) -> None: urls = {} for extend_url in extend_urls: resolved_extend_url = resolve_maybe_relative_url( extend_url, parent_style_url, config_dict["cli"]["rootdir"], ) url, _ = resolve_url(resolved_extend_url) if Cache.get(url) is not None: continue urls[url] = resolved_extend_url if not urls: return futures = [session.get(url) for url in urls] for future in as_completed(futures): resp = future.result() Cache.set(resp.url, resp.text) style_obj = serialize_for_url(resp.url, resp.text) if isinstance(style_obj.get("extends"), list): prefetch_partial_style(urls[resp.url], style_obj["extends"]) def prefetch_style(style_urls: StyleType) -> None: urls = {} for style_url in style_urls: url, scheme = resolve_url(style_url) if scheme == "file" or Cache.get(url) is not None: continue urls[url] = style_url if not urls: return futures = [session.get(url) for url in urls] for future in as_completed(futures): resp = future.result() Cache.set(resp.url, resp.text) style_obj = serialize_for_url(resp.url, resp.text) if isinstance(style_obj.get("extends"), list): prefetch_partial_style(urls[resp.url], style_obj["extends"]) prefetch_style(style_urls_)