Source code for project_config.plugins.inclusion
"""Inclusions checker plugin."""
from __future__ import annotations
import os
import pprint
import stat
from typing import TYPE_CHECKING
from project_config import (
ActionsContext,
Error,
InterruptingError,
ResultValue,
tree,
)
from project_config.utils.jmespath import (
JMESPathError,
compile_JMESPath_expression_or_error,
fix_tree_serialized_file_by_jmespath,
)
if TYPE_CHECKING:
from project_config import Results, Rule
from project_config.types_ import ErrorDict
[docs]def _directories_not_accepted_as_inputs_error(
action_type: str,
action_name: str,
dir_path: str,
definition: str,
) -> ErrorDict:
return {
"message": (
f"Directory found but the {action_type} '{action_name}' does not"
" accepts directories as inputs"
),
"file": f"{dir_path.rstrip(os.sep)}/",
"definition": definition,
}
class InclusionPlugin:
    """Actions checking that files include or exclude lines and content.

    Every action is a static generator function yielding
    ``(result_type, payload)`` tuples, where ``result_type`` is one of
    ``InterruptingError``, ``Error`` or ``ResultValue``.

    NOTE(review): the actions keep executing after yielding an
    ``InterruptingError`` (for example, they unpack an array right after
    reporting that its length is wrong), so they presumably rely on the
    consumer to stop iterating at that point -- confirm against the caller.
    """

    @staticmethod
    def includeLines(
        value: list[str],
        _rule: Rule,
        context: ActionsContext,
    ) -> Results:
        """Check that the files of the rule include all the expected lines.

        Args:
            value: Non-empty array whose items are either an expected line
                or an ``[expected-line, fixer_query]`` array of two strings.
                Leading and trailing ``\\r``/``\\n`` characters of each line
                are stripped before comparing.
            _rule: Unused.
            context: Provides ``files`` (paths to check) and ``fix`` (whether
                missing lines must be added to the files).

        Yields:
            ``InterruptingError`` tuples for invalid definitions, directory
            inputs and failing fixer queries, and one ``Error`` tuple for
            each expected line missing from an existing file. Paths that do
            not exist are skipped.
        """
        if not isinstance(value, list):
            yield InterruptingError, {
                "message": "The value must be of type array",
                "definition": ".includeLines",
            }
        elif not value:
            yield InterruptingError, {
                "message": "The value must not be empty",
                "definition": ".includeLines",
            }

        expected_lines: list[str] = []
        # Fixer query of each expected line ("" when it has none), kept
        # index-aligned with ``expected_lines`` so that every line is fixed
        # with its own query rather than with the last one parsed.
        fixer_queries: list[str] = []
        for i, line in enumerate(value):
            fixer_query = ""
            if isinstance(line, list):
                # Fixer query expression
                if len(line) != 2:  # noqa: PLR2004
                    yield InterruptingError, {
                        "message": (
                            "The '[expected-line, fixer_query]' array"
                            f" '{pprint.pformat(line)}'"
                            " must be of length 2"
                        ),
                        "definition": f".includeLines[{i}]",
                    }
                line, fixer_query = line  # noqa: PLW2901
                if not isinstance(line, str) or not isinstance(
                    fixer_query,
                    str,
                ):
                    yield InterruptingError, {
                        "message": (
                            "The '[expected-line, fixer_query]' array items"
                            f" '{pprint.pformat([line, fixer_query])}'"
                            " must be of type string"
                        ),
                        "definition": f".includeLines[{i}]",
                    }
            elif not isinstance(line, str):
                yield InterruptingError, {
                    "message": (
                        f"The expected line '{pprint.pformat(line)}'"
                        " must be of type string or array"
                    ),
                    "definition": f".includeLines[{i}]",
                }

            clean_line = line.strip("\r\n")
            if clean_line in expected_lines:
                yield InterruptingError, {
                    "message": f"Duplicated expected line '{clean_line}'",
                    "definition": f".includeLines[{i}]",
                }
            elif not clean_line:
                yield InterruptingError, {
                    "message": "Expected line must not be empty",
                    "definition": f".includeLines[{i}]",
                }
            expected_lines.append(clean_line)
            fixer_queries.append(fixer_query)

        for f, fpath in enumerate(context.files):
            try:
                fstat = os.stat(fpath)
            except FileNotFoundError:
                # Nonexistent files are not checked by this action.
                continue
            if stat.S_ISDIR(fstat.st_mode):
                yield (
                    InterruptingError,
                    _directories_not_accepted_as_inputs_error(
                        "verb",
                        "includeLines",
                        fpath,
                        f".files[{f}]",
                    ),
                )

            # NOTE(review): loaded with the default serializer, whereas the
            # fix path below and ``ifIncludeLines`` use serializer="text"
            # -- confirm that this difference is intended.
            fcontent_lines = tree.cached_local_file(fpath)
            for line_index, (expected_line, fixer_query) in enumerate(
                zip(expected_lines, fixer_queries),
            ):
                if expected_line in fcontent_lines:
                    continue

                if context.fix:
                    instance = tree.cached_local_file(
                        fpath,
                        serializer="text",
                    )
                    if not fixer_query:
                        # Default fix: append the missing line to the file.
                        instance.append(expected_line)
                        tree.edit_local_file(fpath, instance)
                        fixed = True
                    else:
                        try:
                            compiled_fixer_query = (
                                compile_JMESPath_expression_or_error(
                                    fixer_query,
                                )
                            )
                        except JMESPathError as exc:
                            yield InterruptingError, {
                                "message": exc.message,
                                "definition": (
                                    f".includeLines[{line_index}]"
                                ),
                            }
                        try:
                            changed = fix_tree_serialized_file_by_jmespath(
                                compiled_fixer_query,
                                instance,
                                fpath,
                            )
                        except JMESPathError as exc:
                            yield InterruptingError, {
                                "message": exc.message,
                                "definition": (
                                    f".includeLines[{line_index}]"
                                ),
                            }
                        else:
                            fixed = True
                            # Nothing is reported when the fixer query left
                            # the file unchanged.
                            if not changed:  # pragma: no cover
                                continue
                else:
                    fixed = False

                yield Error, {
                    "message": f"Expected line '{expected_line}' not found",
                    "file": fpath,
                    "definition": f".includeLines[{line_index}]",
                    "fixed": fixed,
                    "fixable": True,
                }

    @staticmethod
    def ifIncludeLines(
        value: dict[str, list[str]],
        _rule: Rule,
        _context: ActionsContext,
    ) -> Results:
        """Conditional that checks whether files include the given lines.

        Args:
            value: Non-empty object mapping file paths to non-empty arrays
                of expected lines (strings). Leading and trailing
                ``\\r``/``\\n`` characters of each line are stripped before
                comparing.
            _rule: Unused.
            _context: Unused.

        Yields:
            ``InterruptingError`` tuples for invalid definitions, missing
            files and directories, and ``(ResultValue, False)`` for each
            expected line that is not found in its file.
        """
        if not isinstance(value, dict):
            yield InterruptingError, {
                "message": "The value must be of type object",
                "definition": ".ifIncludeLines",
            }
        elif not value:
            yield InterruptingError, {
                "message": "The value must not be empty",
                "definition": ".ifIncludeLines",
            }

        for fpath, expected_lines in value.items():
            if not fpath:
                yield InterruptingError, {
                    "message": "File paths must not be empty",
                    "definition": ".ifIncludeLines",
                }

            if not isinstance(expected_lines, list):
                yield InterruptingError, {
                    "message": (
                        f"The expected lines '{pprint.pformat(expected_lines)}'"
                        " must be of type array"
                    ),
                    "definition": f".ifIncludeLines[{fpath}]",
                }
            elif not expected_lines:
                yield InterruptingError, {
                    "message": "Expected lines must not be empty",
                    "definition": f".ifIncludeLines[{fpath}]",
                }

            try:
                fstat = os.stat(fpath)
            except FileNotFoundError:
                # Unlike the verbs of this plugin, a missing file is an
                # error for the conditional.
                yield InterruptingError, {
                    "message": (
                        "File specified in conditional"
                        " 'ifIncludeLines' not found"
                    ),
                    "file": fpath,
                    "definition": f".ifIncludeLines[{fpath}]",
                }
            if stat.S_ISDIR(fstat.st_mode):
                yield (
                    InterruptingError,
                    _directories_not_accepted_as_inputs_error(
                        "conditional",
                        "ifIncludeLines",
                        fpath,
                        f".ifIncludeLines[{fpath}]",
                    ),
                )

            fcontent_lines = tree.cached_local_file(
                fpath,
                serializer="text",
            )
            # Lines already found in the file, used to detect duplicates.
            checked_lines = []
            for i, line in enumerate(expected_lines):
                if not isinstance(line, str):
                    yield InterruptingError, {
                        "message": (
                            f"The expected line '{pprint.pformat(line)}'"
                            " must be of type string"
                        ),
                        "definition": f".ifIncludeLines[{fpath}][{i}]",
                        "file": fpath,
                    }
                clean_line = line.strip("\r\n")
                if not clean_line:
                    yield InterruptingError, {
                        "message": "Expected line must not be empty",
                        "definition": f".ifIncludeLines[{fpath}][{i}]",
                        "file": fpath,
                    }
                elif clean_line in checked_lines:
                    yield InterruptingError, {
                        "message": f"Duplicated expected line '{clean_line}'",
                        "definition": f".ifIncludeLines[{fpath}][{i}]",
                        "file": fpath,
                    }

                if clean_line not in fcontent_lines:
                    yield ResultValue, False
                else:
                    checked_lines.append(clean_line)

    @staticmethod
    def excludeLines(
        value: list[str],
        _rule: Rule,
        context: ActionsContext,
    ) -> Results:
        """Check that the files of the rule do not include the given lines.

        Args:
            value: Non-empty array whose items are either a line to exclude
                or an ``[expected-line, fixer_query]`` array of two strings.
                Leading and trailing ``\\r``/``\\n`` characters of each line
                are stripped before comparing.
            _rule: Unused.
            context: Provides ``files`` (paths to check) and ``fix`` (whether
                found lines must be removed from the files).

        Yields:
            ``InterruptingError`` tuples for invalid definitions, directory
            inputs and failing fixer queries, and one ``Error`` tuple for
            each line to exclude that is found in an existing file. Paths
            that do not exist are skipped.
        """
        if not isinstance(value, list):
            yield InterruptingError, {
                "message": "The value must be of type array",
                "definition": ".excludeLines",
            }
        elif not value:
            yield InterruptingError, {
                "message": "The value must not be empty",
                "definition": ".excludeLines",
            }

        expected_lines: list[str] = []
        # Fixer query of each line ("" when it has none), kept index-aligned
        # with ``expected_lines`` so that every line is fixed with its own
        # query rather than with the last one parsed.
        fixer_queries: list[str] = []
        for i, line in enumerate(value):
            fixer_query = ""
            if isinstance(line, list):
                # Fixer query expression
                if len(line) != 2:  # noqa: PLR2004
                    yield InterruptingError, {
                        "message": (
                            "The '[expected-line, fixer_query]' array"
                            f" '{pprint.pformat(line)}'"
                            " must be of length 2"
                        ),
                        "definition": f".excludeLines[{i}]",
                    }
                line, fixer_query = line  # noqa: PLW2901
                if not isinstance(line, str) or not isinstance(
                    fixer_query,
                    str,
                ):
                    yield InterruptingError, {
                        "message": (
                            "The '[expected-line, fixer_query]' array items"
                            f" '{pprint.pformat([line, fixer_query])}'"
                            " must be of type string"
                        ),
                        "definition": f".excludeLines[{i}]",
                    }
            elif not isinstance(line, str):
                yield InterruptingError, {
                    "message": (
                        f"The expected line '{pprint.pformat(line)}'"
                        " must be of type string or array"
                    ),
                    "definition": f".excludeLines[{i}]",
                }

            clean_line = line.strip("\r\n")
            if clean_line in expected_lines:
                yield InterruptingError, {
                    "message": f"Duplicated expected line '{clean_line}'",
                    "definition": f".excludeLines[{i}]",
                }
            elif not clean_line:
                yield InterruptingError, {
                    "message": "Expected line must not be empty",
                    "definition": f".excludeLines[{i}]",
                }
            expected_lines.append(clean_line)
            fixer_queries.append(fixer_query)

        for f, fpath in enumerate(context.files):
            try:
                fstat = os.stat(fpath)
            except FileNotFoundError:
                # Nonexistent files are not checked by this action.
                continue
            if stat.S_ISDIR(fstat.st_mode):
                yield (
                    InterruptingError,
                    _directories_not_accepted_as_inputs_error(
                        "verb",
                        "excludeLines",
                        fpath,
                        f".files[{f}]",
                    ),
                )

            # NOTE(review): loaded with the default serializer, whereas the
            # fix path below uses serializer="text" -- confirm that this
            # difference is intended.
            fcontent_lines = tree.cached_local_file(fpath)
            for line_index, (expected_line, fixer_query) in enumerate(
                zip(expected_lines, fixer_queries),
            ):
                if expected_line not in fcontent_lines:
                    continue

                if context.fix:
                    instance = tree.cached_local_file(
                        fpath,
                        serializer="text",
                    )
                    if not fixer_query:
                        # Default fix: remove the line from the file.
                        instance.remove(expected_line)
                        tree.edit_local_file(fpath, instance)
                        fixed = True
                    else:
                        try:
                            compiled_fixer_query = (
                                compile_JMESPath_expression_or_error(
                                    fixer_query,
                                )
                            )
                        except JMESPathError as exc:
                            yield InterruptingError, {
                                "message": exc.message,
                                "definition": (
                                    f".excludeLines[{line_index}]"
                                ),
                            }
                        try:
                            changed = fix_tree_serialized_file_by_jmespath(
                                compiled_fixer_query,
                                instance,
                                fpath,
                            )
                        except JMESPathError as exc:
                            yield InterruptingError, {
                                "message": exc.message,
                                "definition": (
                                    f".excludeLines[{line_index}]"
                                ),
                            }
                        else:
                            fixed = True
                            # Nothing is reported when the fixer query left
                            # the file unchanged.
                            if not changed:  # pragma: no cover
                                continue
                else:
                    fixed = False

                yield Error, {
                    "message": (
                        f"Found expected line to exclude '{expected_line}'"
                    ),
                    "file": fpath,
                    "definition": f".excludeLines[{line_index}]",
                    "fixed": fixed,
                    "fixable": True,
                }

    @staticmethod
    def includeContent(
        value: list[str],
        _rule: Rule,
        context: ActionsContext,
    ) -> Results:
        """Check that the files of the rule include the given contents.

        Args:
            value: Non-empty array whose items are either a content string
                or a ``[content-to-include, fixer_query]`` array of two
                strings.
            _rule: Unused.
            context: Provides ``files`` (paths to check) and ``fix`` (whether
                fixer queries must be applied).

        Yields:
            ``InterruptingError`` tuples for invalid definitions, directory
            inputs and failing fixer queries, and one ``Error`` tuple for
            each content missing from an existing file. An error is fixable
            only when the content defines a fixer query. Paths that do not
            exist are skipped.
        """
        if not isinstance(value, list):
            yield InterruptingError, {
                "message": "The contents to include must be of type array",
                "definition": ".includeContent",
            }
        elif not value:
            yield InterruptingError, {
                "message": "The contents to include must not be empty",
                "definition": ".includeContent",
            }

        for f, fpath in enumerate(context.files):
            try:
                fstat = os.stat(fpath)
            except FileNotFoundError:
                # Nonexistent files are not checked by this action.
                continue
            if stat.S_ISDIR(fstat.st_mode):
                yield (
                    InterruptingError,
                    _directories_not_accepted_as_inputs_error(
                        "verb",
                        "includeContent",
                        fpath,
                        f".files[{f}]",
                    ),
                )

            # Contents already found in this file, used to detect duplicates.
            checked_content = []
            for i, content in enumerate(value):
                fixer_query = ""
                if isinstance(content, (list, str)):
                    if isinstance(content, list):
                        # Report a wrongly sized array instead of letting
                        # the unpacking below fail with a bare ValueError.
                        if len(content) != 2:  # noqa: PLR2004
                            yield InterruptingError, {
                                "message": (
                                    "The '[content-to-include, fixer_query]'"
                                    f" array '{pprint.pformat(content)}'"
                                    " must be of length 2"
                                ),
                                "definition": f".includeContent[{i}]",
                            }
                        content, fixer_query = content  # noqa: PLW2901
                    if not isinstance(content, str) or not isinstance(
                        fixer_query,
                        str,
                    ):
                        content_query = pprint.pformat(
                            [content, fixer_query],
                        )
                        yield InterruptingError, {
                            "message": (
                                "The '[content-to-include, fixer_query]'"
                                f" array items '{content_query}'"
                                " must be of type string"
                            ),
                            "definition": f".includeContent[{i}]",
                        }
                else:
                    yield InterruptingError, {
                        "message": (
                            "The content to include"
                            f" '{pprint.pformat(content)}'"
                            " must be of type string or array"
                        ),
                        "definition": f".includeContent[{i}]",
                        "file": fpath,
                    }

                if not content:
                    yield InterruptingError, {
                        "message": "The content to include must not be empty",
                        "definition": f".includeContent[{i}]",
                        "file": fpath,
                    }
                elif content in checked_content:
                    yield InterruptingError, {
                        "message": f"Duplicated content to include '{content}'",
                        "definition": f".includeContent[{i}]",
                        "file": fpath,
                    }

                fcontent = tree.cached_local_file(fpath, serializer="_plain")
                if content not in fcontent:
                    if fixer_query:
                        fixable = True
                        fixed = False
                        if context.fix:
                            try:
                                compiled_fixer_query = (
                                    compile_JMESPath_expression_or_error(
                                        fixer_query,
                                    )
                                )
                            except JMESPathError as exc:
                                yield InterruptingError, {
                                    "message": exc.message,
                                    "definition": f".includeContent[{i}]",
                                }

                            instance = tree.cached_local_file(
                                fpath,
                                serializer="text",
                            )
                            try:
                                changed = fix_tree_serialized_file_by_jmespath(
                                    compiled_fixer_query,
                                    instance,
                                    fpath,
                                )
                            except JMESPathError as exc:
                                yield InterruptingError, {
                                    "message": exc.message,
                                    "definition": f".includeContent[{i}]",
                                }
                            else:
                                fixed = True
                                # Nothing is reported when the fixer query
                                # left the file unchanged.
                                if not changed:  # pragma: no cover
                                    continue
                    else:
                        # Without a fixer query the error can't be fixed.
                        fixed = False
                        fixable = False

                    yield Error, {
                        "message": (
                            f"Content '{content}' expected to be"
                            " included not found"
                        ),
                        "file": fpath,
                        "definition": f".includeContent[{i}]",
                        "fixed": fixed,
                        "fixable": fixable,
                    }
                else:
                    checked_content.append(content)

    @staticmethod
    def excludeContent(
        value: list[str],
        _rule: Rule,
        context: ActionsContext,
    ) -> Results:
        """Check that the files of the rule do not include the given contents.

        Args:
            value: Non-empty array whose items are either a content string
                or a ``[content-to-exclude, fixer_query]`` array of two
                strings.
            _rule: Unused.
            context: Provides ``files`` (paths to check) and ``fix`` (whether
                fixer queries must be applied).

        Yields:
            ``InterruptingError`` tuples for invalid definitions, directory
            inputs and failing fixer queries, and one ``Error`` tuple for
            each content to exclude that is found in an existing file. An
            error is fixable only when the content defines a fixer query.
            Paths that do not exist are skipped.
        """
        if not isinstance(value, list):
            yield InterruptingError, {
                "message": "The contents to exclude must be of type array",
                "definition": ".excludeContent",
            }
        elif not value:
            yield InterruptingError, {
                "message": "The contents to exclude must not be empty",
                "definition": ".excludeContent",
            }

        for f, fpath in enumerate(context.files):
            try:
                fstat = os.stat(fpath)
            except FileNotFoundError:
                # Nonexistent files are not checked by this action.
                continue
            if stat.S_ISDIR(fstat.st_mode):
                yield (
                    InterruptingError,
                    _directories_not_accepted_as_inputs_error(
                        "verb",
                        "excludeContent",
                        fpath,
                        f".files[{f}]",
                    ),
                )

            # Contents checked and not found in this file, used to detect
            # duplicates.
            checked_content = []
            for i, content in enumerate(value):
                fixer_query = ""
                if isinstance(content, (list, str)):
                    if isinstance(content, list):
                        # Report a wrongly sized array instead of letting
                        # the unpacking below fail with a bare ValueError.
                        if len(content) != 2:  # noqa: PLR2004
                            yield InterruptingError, {
                                "message": (
                                    "The '[content-to-exclude, fixer_query]'"
                                    f" array '{pprint.pformat(content)}'"
                                    " must be of length 2"
                                ),
                                "definition": f".excludeContent[{i}]",
                            }
                        content, fixer_query = content  # noqa: PLW2901
                    if not isinstance(content, str) or not isinstance(
                        fixer_query,
                        str,
                    ):
                        content_query = pprint.pformat(
                            [content, fixer_query],
                        )
                        yield InterruptingError, {
                            "message": (
                                "The '[content-to-exclude, fixer_query]'"
                                f" array items '{content_query}'"
                                " must be of type string"
                            ),
                            "definition": f".excludeContent[{i}]",
                        }
                else:
                    yield InterruptingError, {
                        "message": (
                            "The content to exclude"
                            f" '{pprint.pformat(content)}'"
                            " must be of type string or array"
                        ),
                        "definition": f".excludeContent[{i}]",
                        "file": fpath,
                    }

                if not content:
                    yield InterruptingError, {
                        "message": "The content to exclude must not be empty",
                        "definition": f".excludeContent[{i}]",
                        "file": fpath,
                    }
                elif content in checked_content:
                    yield InterruptingError, {
                        "message": f"Duplicated content to exclude '{content}'",
                        "definition": f".excludeContent[{i}]",
                        "file": fpath,
                    }

                fcontent = tree.cached_local_file(fpath, serializer="_plain")
                if content in fcontent:
                    if fixer_query:
                        fixable = True
                        fixed = False
                        if context.fix:
                            try:
                                compiled_fixer_query = (
                                    compile_JMESPath_expression_or_error(
                                        fixer_query,
                                    )
                                )
                            except JMESPathError as exc:
                                yield InterruptingError, {
                                    "message": exc.message,
                                    "definition": f".excludeContent[{i}]",
                                }

                            instance = tree.cached_local_file(
                                fpath,
                                serializer="text",
                            )
                            try:
                                changed = fix_tree_serialized_file_by_jmespath(
                                    compiled_fixer_query,
                                    instance,
                                    fpath,
                                )
                            except JMESPathError as exc:
                                yield InterruptingError, {
                                    "message": exc.message,
                                    "definition": f".excludeContent[{i}]",
                                }
                            else:
                                fixed = True
                                # Nothing is reported when the fixer query
                                # left the file unchanged.
                                if not changed:  # pragma: no cover
                                    continue
                    else:
                        # Without a fixer query the error can't be fixed.
                        fixed = False
                        fixable = False

                    yield Error, {
                        "message": (
                            f"Found expected content to exclude '{content}'"
                        ),
                        "file": fpath,
                        "definition": f".excludeContent[{i}]",
                        "fixed": fixed,
                        "fixable": fixable,
                    }
                else:
                    checked_content.append(content)