pw_presubmit: Split tools.py into three modules
- tools.py has general purpose functions.
- git_repo.py has functions for working with files in a Git repo.
- presubmit.py has abstractions for running a presubmit program
- Rename a few functions to work better with the new module names.
Change-Id: Ie632514e1bbcaa8681bcd733bfe9783cc25c855f
diff --git a/pw_presubmit/BUILD.gn b/pw_presubmit/BUILD.gn
index 6c8d14f..1875615 100644
--- a/pw_presubmit/BUILD.gn
+++ b/pw_presubmit/BUILD.gn
@@ -18,6 +18,6 @@
sources = [ "docs.rst" ]
inputs = [
"py/pw_presubmit/cli.py",
- "py/pw_presubmit/tools.py",
+ "py/pw_presubmit/presubmit.py",
]
}
diff --git a/pw_presubmit/docs.rst b/pw_presubmit/docs.rst
index 80dee8d..3bbfd6f 100644
--- a/pw_presubmit/docs.rst
+++ b/pw_presubmit/docs.rst
@@ -66,8 +66,6 @@
.. autofunction:: pw_presubmit.cli.run
-.. autofunction:: pw_presubmit.run_presubmit
-
.. autodecorator:: pw_presubmit.filter_paths
.. autofunction:: pw_presubmit.call
diff --git a/pw_presubmit/py/presubmit_test.py b/pw_presubmit/py/presubmit_test.py
new file mode 100755
index 0000000..ff2a514
--- /dev/null
+++ b/pw_presubmit/py/presubmit_test.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for presubmit tools."""
+
+import unittest
+
+from pw_presubmit import presubmit
+
+
+def _fake_function_1(_):
+ """Fake presubmit function."""
+
+
+def _fake_function_2(_):
+ """Fake presubmit function."""
+
+
+class ProgramsTest(unittest.TestCase):
+ """Tests the presubmit Programs abstraction."""
+ def setUp(self):
+ self._programs = presubmit.Programs(
+ first=[_fake_function_1, (), [(_fake_function_1, )]],
+ second=[_fake_function_2],
+ )
+
+ def test_empty(self):
+ self.assertEqual({}, presubmit.Programs())
+
+ def test_access_present_members(self):
+ self.assertEqual('first', self._programs['first'].name)
+ self.assertEqual((_fake_function_1, _fake_function_1),
+ tuple(self._programs['first']))
+
+ self.assertEqual('second', self._programs['second'].name)
+ self.assertEqual((_fake_function_2, ), tuple(self._programs['second']))
+
+ def test_access_missing_member(self):
+ with self.assertRaises(KeyError):
+ _ = self._programs['not_there']
+
+ def test_all_steps(self):
+ self.assertEqual(
+ {
+ '_fake_function_1': _fake_function_1,
+ '_fake_function_2': _fake_function_2,
+ }, self._programs.all_steps())
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_presubmit/py/pw_presubmit/__init__.py b/pw_presubmit/py/pw_presubmit/__init__.py
index e2b6a1c..8ea312d 100644
--- a/pw_presubmit/py/pw_presubmit/__init__.py
+++ b/pw_presubmit/py/pw_presubmit/__init__.py
@@ -13,5 +13,5 @@
# the License.
"""The pw_presubmit package provides tools for running presubmit checks."""
-from pw_presubmit import cli
-from pw_presubmit.tools import *
+from pw_presubmit.tools import log_run, plural
+from pw_presubmit.presubmit import *
diff --git a/pw_presubmit/py/pw_presubmit/cli.py b/pw_presubmit/py/pw_presubmit/cli.py
index 3a25d32..2c46b10 100644
--- a/pw_presubmit/py/pw_presubmit/cli.py
+++ b/pw_presubmit/py/pw_presubmit/cli.py
@@ -20,7 +20,7 @@
import shutil
from typing import Callable, Optional, Sequence
-from pw_presubmit import tools
+from pw_presubmit import git_repo, presubmit
_LOG = logging.getLogger(__name__)
@@ -53,8 +53,8 @@
def _add_programs_arguments(exclusive: argparse.ArgumentParser,
- programs: tools.Programs, default: str):
- def presubmit_program(arg: str) -> tools.Program:
+ programs: presubmit.Programs, default: str):
+ def presubmit_program(arg: str) -> presubmit.Program:
if arg not in programs:
raise argparse.ArgumentTypeError(
f'{arg} is not the name of a presubmit program')
@@ -94,7 +94,7 @@
def add_arguments(parser: argparse.ArgumentParser,
- programs: Optional[tools.Programs] = None,
+ programs: Optional[presubmit.Programs] = None,
default: str = '') -> None:
"""Adds common presubmit check options to an argument parser."""
@@ -141,7 +141,7 @@
"""Processes all arguments from add_arguments and runs the presubmit."""
if not output_directory:
- output_directory = tools.git_repo_path('.presubmit', repo=repository)
+ output_directory = git_repo.path('.presubmit', repo=repository)
_LOG.debug('Using environment at %s', output_directory)
@@ -154,10 +154,10 @@
return 0
- if tools.run_presubmit(program,
- repo_path=repository,
- output_directory=output_directory,
- **other_args):
+ if presubmit.run(program,
+ repo_path=repository,
+ output_directory=output_directory,
+ **other_args):
return 0
return 1
diff --git a/pw_presubmit/py/pw_presubmit/format_code.py b/pw_presubmit/py/pw_presubmit/format_code.py
index 7222ad4..b96ca42 100755
--- a/pw_presubmit/py/pw_presubmit/format_code.py
+++ b/pw_presubmit/py/pw_presubmit/format_code.py
@@ -40,8 +40,8 @@
os.path.abspath(__file__))))
import pw_presubmit
-import pw_presubmit.cli
-from pw_presubmit import file_summary, list_git_files, log_run, plural
+from pw_presubmit import cli, git_repo
+from pw_presubmit.tools import file_summary, log_run, plural
_LOG: logging.Logger = logging.getLogger(__name__)
@@ -358,17 +358,17 @@
base: str) -> int:
"""Checks or fixes formatting for files in a Git repo."""
files = [path.resolve() for path in paths if path.is_file()]
- repo = pw_presubmit.git_repo_path() if pw_presubmit.is_git_repo() else None
+ repo = git_repo.root() if git_repo.is_repo() else None
# If this is a Git repo, list the original paths with git ls-files or diff.
if repo:
_LOG.info(
'Formatting %s',
- pw_presubmit.describe_files_in_repo(repo, Path.cwd(), base, paths,
- exclude))
+ git_repo.describe_files(repo, Path.cwd(), base, paths, exclude))
# Add files from Git and remove duplicates.
- files = sorted(set(list_git_files(base, paths, exclude)) | set(files))
+ files = sorted(
+ set(git_repo.list_files(base, paths, exclude)) | set(files))
elif base:
_LOG.critical(
'A base commit may only be provided if running from a Git repo')
@@ -412,7 +412,7 @@
parser = argparse.ArgumentParser(description=__doc__)
if git_paths:
- pw_presubmit.cli.add_path_arguments(parser)
+ cli.add_path_arguments(parser)
else:
def existing_path(arg: str) -> Path:
diff --git a/pw_presubmit/py/pw_presubmit/git_repo.py b/pw_presubmit/py/pw_presubmit/git_repo.py
new file mode 100644
index 0000000..c7caec0
--- /dev/null
+++ b/pw_presubmit/py/pw_presubmit/git_repo.py
@@ -0,0 +1,166 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Helpful commands for working with a Git repository."""
+
+import collections
+from pathlib import Path
+import subprocess
+from typing import Collection, Dict, Iterable, Iterator, List, Optional
+from typing import Pattern, Union
+
+from pw_presubmit.tools import log_run, plural
+
+PathOrStr = Union[Path, str]
+
+
+def git_stdout(*args: PathOrStr, repo: PathOrStr = '.') -> str:
+ return log_run(['git', '-C', repo, *args],
+ stdout=subprocess.PIPE,
+ check=True).stdout.decode().strip()
+
+
+def _ls_files(args: Collection[PathOrStr], repo: Path) -> List[Path]:
+ return [
+ repo.joinpath(path).resolve() for path in git_stdout(
+ 'ls-files', '--', *args, repo=repo).splitlines()
+ ]
+
+
+def _diff_names(commit: str, paths: Collection[PathOrStr],
+ repo: Path) -> List[Path]:
+ """Returns absolute paths of files changed since the specified commit."""
+ git_root = root(repo)
+ return [
+ git_root.joinpath(path).resolve()
+ for path in git_stdout('diff',
+ '--name-only',
+ '--diff-filter=d',
+ commit,
+ '--',
+ *paths,
+ repo=repo).splitlines()
+ ]
+
+
+def list_files(commit: Optional[str] = None,
+ paths: Collection[PathOrStr] = (),
+ exclude: Collection[Pattern[str]] = (),
+ repo: Optional[Path] = None) -> List[Path]:
+ """Lists files with git ls-files or git diff --name-only.
+
+ This function may only be called if repo points to a Git repository.
+ """
+ if repo is None:
+ repo = Path.cwd()
+
+ if commit:
+ files = _diff_names(commit, paths, repo)
+ else:
+ files = _ls_files(paths, repo)
+
+ git_root = root(repo=repo).resolve()
+ return sorted(
+ file for file in files
+ if not any(e.search(str(file.relative_to(git_root))) for e in exclude))
+
+
+def has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
+ """Returns True if the Git repo has uncommitted changes in it.
+
+ This does not check for untracked files.
+ """
+ if repo is None:
+ repo = Path.cwd()
+
+ # Refresh the Git index so that the diff-index command will be accurate.
+ log_run(['git', '-C', repo, 'update-index', '-q', '--refresh'], check=True)
+
+ # diff-index exits with 1 if there are uncommitted changes.
+ return log_run(['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
+ '--']).returncode == 1
+
+
+def _describe_constraints(git_root: Path, repo_path: Path,
+ commit: Optional[str],
+ pathspecs: Collection[PathOrStr],
+ exclude: Collection[Pattern]) -> Iterator[str]:
+ if not git_root.samefile(repo_path):
+ yield (
+ f'under the {repo_path.resolve().relative_to(git_root.resolve())} '
+ 'subdirectory')
+
+ if commit:
+ yield f'that have changed since {commit}'
+
+ if pathspecs:
+ paths_str = ', '.join(str(p) for p in pathspecs)
+ yield f'that match {plural(pathspecs, "pathspec")} ({paths_str})'
+
+ if exclude:
+ yield (f'that do not match {plural(exclude, "pattern")} (' +
+ ', '.join(p.pattern for p in exclude) + ')')
+
+
+def describe_files(git_root: Path, repo_path: Path, commit: Optional[str],
+ pathspecs: Collection[PathOrStr],
+ exclude: Collection[Pattern]) -> str:
+ """Completes 'Doing something to ...' for a set of files in a Git repo."""
+ constraints = list(
+ _describe_constraints(git_root, repo_path, commit, pathspecs, exclude))
+ if not constraints:
+ return f'all files in the {git_root.name} repo'
+
+ msg = f'files in the {git_root.name} repo'
+ if len(constraints) == 1:
+ return f'{msg} {constraints[0]}'
+
+ return msg + ''.join(f'\n - {line}' for line in constraints)
+
+
+def is_repo(repo_path: PathOrStr = '.') -> bool:
+ return not subprocess.run(['git', '-C', repo_path, 'rev-parse'],
+ stderr=subprocess.DEVNULL).returncode
+
+
+def root(repo: PathOrStr = '.') -> Path:
+ """Returns the repository root as an absolute path."""
+ return Path(git_stdout('rev-parse', '--show-toplevel', repo=repo))
+
+
+def path(repo_path: PathOrStr,
+ *additional_repo_paths: PathOrStr,
+ repo: PathOrStr = '.') -> Path:
+ """Returns a path relative to a Git repository's root."""
+ return root(repo).joinpath(repo_path, *additional_repo_paths)
+
+
+def find_python_packages(python_paths: Iterable[PathOrStr],
+ repo: PathOrStr = '.') -> Dict[Path, List[Path]]:
+ """Returns Python package directories for the files in python_paths."""
+ setup_pys = [
+ file.parent.as_posix()
+ for file in _ls_files(['setup.py', '*/setup.py'], Path(repo))
+ ]
+
+ package_dirs: Dict[Path, List[Path]] = collections.defaultdict(list)
+
+ for python_path in (Path(p).resolve().as_posix() for p in python_paths):
+ try:
+ setup_dir = max(setup for setup in setup_pys
+ if python_path.startswith(setup))
+ package_dirs[Path(setup_dir).resolve()].append(Path(python_path))
+ except ValueError:
+ continue
+
+ return package_dirs
diff --git a/pw_presubmit/py/pw_presubmit/presubmit.py b/pw_presubmit/py/pw_presubmit/presubmit.py
new file mode 100644
index 0000000..1a5e113
--- /dev/null
+++ b/pw_presubmit/py/pw_presubmit/presubmit.py
@@ -0,0 +1,535 @@
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tools for running presubmit checks in a Git repository.
+
+
+Presubmit checks are defined as a function or other callable. The function may
+take either no arguments or a list of the paths on which to run. Presubmit
+checks communicate failure by raising any exception.
+
+For example, either of these functions may be used as presubmit checks:
+
+ @pw_presubmit.filter_paths(endswith='.py')
+ def file_contains_ni(ctx: PresubmitContext):
+ for path in ctx.paths:
+ with open(path) as file:
+ contents = file.read()
+ if 'ni' not in contents and 'nee' not in contents:
+ raise PresumitFailure('Files must say "ni"!', path=path)
+
+ def run_the_build():
+ subprocess.run(['make', 'release'], check=True)
+
+Presubmit checks that accept a list of paths may use the filter_paths decorator
+to automatically filter the paths list for file types they care about. See the
+pragma_once function for an example.
+
+See pigweed_presbumit.py for an example of how to define presubmit checks.
+"""
+
+import collections
+import contextlib
+import dataclasses
+import enum
+from inspect import signature
+import itertools
+import logging
+import os
+from pathlib import Path
+import re
+import shlex
+import subprocess
+import time
+from typing import Callable, Dict, Iterable, Iterator, List, NamedTuple
+from typing import Optional, Pattern, Sequence, Tuple
+
+from pw_presubmit import git_repo, tools
+from pw_presubmit.tools import plural
+
+_LOG: logging.Logger = logging.getLogger(__name__)
+
+color_red = tools.make_color(31)
+color_bold_red = tools.make_color(31, 1)
+color_black_on_red = tools.make_color(30, 41)
+color_yellow = tools.make_color(33, 1)
+color_green = tools.make_color(32)
+color_black_on_green = tools.make_color(30, 42)
+color_aqua = tools.make_color(36)
+color_bold_white = tools.make_color(37, 1)
+
+_SUMMARY_BOX = '══╦╗ ║║══╩╝'
+_CHECK_UPPER = '━━━┓ '
+_CHECK_LOWER = ' ━━━┛'
+
+WIDTH = 80
+
+_LEFT = 7
+_RIGHT = 11
+
+
+def _title(msg, style=_SUMMARY_BOX) -> str:
+ msg = f' {msg} '.center(WIDTH - 2)
+ return tools.make_box('^').format(*style, section1=msg, width1=len(msg))
+
+
+def _format_time(time_s: float) -> str:
+ minutes, seconds = divmod(time_s, 60)
+ return f' {int(minutes)}:{seconds:04.1f}'
+
+
+def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
+ return box.format(*style,
+ section1=left + ('' if left.endswith(' ') else ' '),
+ width1=_LEFT,
+ section2=' ' + middle,
+ width2=WIDTH - _LEFT - _RIGHT - 4,
+ section3=right + ' ',
+ width3=_RIGHT)
+
+
+class PresubmitFailure(Exception):
+ """Optional exception to use for presubmit failures."""
+ def __init__(self, description: str = '', path=None):
+ super().__init__(f'{path}: {description}' if path else description)
+
+
+class _Result(enum.Enum):
+
+ PASS = 'PASSED' # Check completed successfully.
+ FAIL = 'FAILED' # Check failed.
+ CANCEL = 'CANCEL' # Check didn't complete.
+
+ def colorized(self, width: int, invert: bool = False) -> str:
+ if self is _Result.PASS:
+ color = color_black_on_green if invert else color_green
+ elif self is _Result.FAIL:
+ color = color_black_on_red if invert else color_red
+ elif self is _Result.CANCEL:
+ color = color_yellow
+ else:
+ color = lambda value: value
+
+ padding = (width - len(self.value)) // 2 * ' '
+ return padding + color(self.value) + padding
+
+
+class Program(collections.abc.Sequence):
+ """A sequence of presubmit checks; basically a tuple with a name."""
+ def __init__(self, name: str, steps: Iterable[Callable]):
+ self.name = name
+ self._steps = tuple(tools.flatten(steps))
+
+ def __getitem__(self, i):
+ return self._steps[i]
+
+ def __len__(self):
+ return len(self._steps)
+
+ def __str__(self):
+ return self.name
+
+
+class Programs(collections.abc.Mapping):
+ """A mapping of presubmit check programs.
+
+ Use is optional. Helpful when managing multiple presubmit check programs.
+ """
+ def __init__(self, **programs: Sequence):
+ """Initializes a name: program mapping from the provided keyword args.
+
+ A program is a sequence of presubmit check functions. The sequence may
+ contain nested sequences, which are flattened.
+ """
+ self._programs: Dict[str, Program] = {
+ name: Program(name, checks)
+ for name, checks in programs.items()
+ }
+
+ def all_steps(self) -> Dict[str, Callable]:
+ return {c.__name__: c for c in itertools.chain(*self.values())}
+
+ def __getitem__(self, item: str) -> Program:
+ return self._programs[item]
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._programs)
+
+ def __len__(self) -> int:
+ return len(self._programs)
+
+
+@dataclasses.dataclass(frozen=True)
+class PresubmitContext:
+ """Context passed into presubmit checks."""
+ repo_root: Path
+ output_dir: Path
+ paths: Sequence[Path]
+
+
+class Presubmit:
+ """Runs a series of presubmit checks on a list of files."""
+ def __init__(self, repository_root: Path, output_directory: Path,
+ paths: Sequence[Path]):
+ self._repository_root = repository_root
+ self._output_directory = output_directory
+ self._paths = paths
+
+ def run(self, program: Program, keep_going: bool = False) -> bool:
+ """Executes a series of presubmit checks on the paths."""
+
+ title = f'{program.name if program.name else ""} presubmit checks'
+ checks = _apply_filters(program, self._paths)
+
+ _LOG.debug('Running %s for %s', title, self._repository_root.name)
+ print(_title(f'{self._repository_root.name}: {title}'))
+
+ _LOG.info('%d of %d checks apply to %s in %s', len(checks),
+ len(program), plural(self._paths,
+ 'file'), self._repository_root)
+
+ print()
+ for line in tools.file_summary(self._paths):
+ print(line)
+ print()
+
+ _LOG.debug('Paths:\n%s', '\n'.join(str(path) for path in self._paths))
+ if not self._paths:
+ print(color_yellow('No files are being checked!'))
+
+ _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))
+
+ start_time: float = time.time()
+ passed, failed, skipped = self._execute_checks(checks, keep_going)
+ self._log_summary(time.time() - start_time, passed, failed, skipped)
+
+ return not failed and not skipped
+
+ def _log_summary(self, time_s: float, passed: int, failed: int,
+ skipped: int) -> None:
+ summary_items = []
+ if passed:
+ summary_items.append(f'{passed} passed')
+ if failed:
+ summary_items.append(f'{failed} failed')
+ if skipped:
+ summary_items.append(f'{skipped} not run')
+ summary = ', '.join(summary_items) or 'nothing was done'
+
+ result = _Result.FAIL if failed or skipped else _Result.PASS
+ total = passed + failed + skipped
+
+ _LOG.debug('Finished running %d checks on %s in %.1f s', total,
+ plural(self._paths, 'file'), time_s)
+ _LOG.debug('Presubmit checks %s: %s', result.value, summary)
+
+ print(
+ _box(
+ _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
+ f'{total} checks on {plural(self._paths, "file")}: {summary}',
+ _format_time(time_s)))
+
+ @contextlib.contextmanager
+ def _context(self, name: str, paths: Sequence[Path]):
+ # There are many characters banned from filenames on Windows. To
+ # simplify things, just strip everything that's not a letter, digit,
+ # or underscore.
+ sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
+ output_directory = self._output_directory.joinpath(sanitized_name)
+ os.makedirs(output_directory, exist_ok=True)
+
+ handler = logging.FileHandler(output_directory.joinpath('step.log'),
+ mode='w')
+ handler.setLevel(logging.DEBUG)
+
+ try:
+ _LOG.addHandler(handler)
+
+ yield PresubmitContext(
+ repo_root=self._repository_root.absolute(),
+ output_dir=output_directory.absolute(),
+ paths=paths,
+ )
+
+ finally:
+ _LOG.removeHandler(handler)
+
+ def _execute_checks(self, program,
+ keep_going: bool) -> Tuple[int, int, int]:
+ """Runs presubmit checks; returns (passed, failed, skipped) lists."""
+ passed = failed = 0
+
+ for i, (check, paths) in enumerate(program, 1):
+ paths = [self._repository_root.joinpath(p) for p in paths]
+ with self._context(check.name, paths) as ctx:
+ result = check.run(ctx, i, len(program))
+
+ if result is _Result.PASS:
+ passed += 1
+ elif result is _Result.CANCEL:
+ break
+ else:
+ failed += 1
+ if not keep_going:
+ break
+
+ return passed, failed, len(program) - passed - failed
+
+
+class _Filter(NamedTuple):
+ endswith: Tuple[str, ...] = ('', )
+ exclude: Tuple[str, ...] = ()
+
+
+def _apply_filters(
+ program: Sequence[Callable],
+ paths: Sequence[Path]) -> List[Tuple['_Check', Sequence[Path]]]:
+ """Returns a list of (check, paths_to_check) for checks that should run."""
+ checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
+ filter_to_checks: Dict[_Filter,
+ List[_Check]] = collections.defaultdict(list)
+
+ for check in checks:
+ filter_to_checks[check.filter].append(check)
+
+ check_to_paths = _map_checks_to_paths(filter_to_checks, paths)
+ return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]
+
+
+def _map_checks_to_paths(
+ filter_to_checks: Dict[_Filter, List['_Check']],
+ paths: Sequence[Path]) -> Dict['_Check', Sequence[Path]]:
+ checks_to_paths: Dict[_Check, Sequence[Path]] = {}
+
+ str_paths = [path.as_posix() for path in paths]
+
+ for filt, checks in filter_to_checks.items():
+ exclude = [re.compile(exp) for exp in filt.exclude]
+
+ filtered_paths = tuple(
+ Path(path) for path in str_paths
+ if any(path.endswith(end) for end in filt.endswith) and not any(
+ exp.search(path) for exp in exclude))
+
+ for check in checks:
+ if filtered_paths or check.always_run:
+ checks_to_paths[check] = filtered_paths
+ else:
+ _LOG.debug('Skipping "%s": no relevant files', check.name)
+
+ return checks_to_paths
+
+
+def run(program: Sequence[Callable],
+ base: Optional[str] = None,
+ paths: Sequence[Path] = (),
+ exclude: Sequence[Pattern] = (),
+ repo_path: Path = Optional[None],
+ output_directory: Optional[Path] = None,
+ keep_going: bool = False) -> bool:
+ """Lists files in the current Git repo and runs a Presubmit with them.
+
+ This changes the directory to the root of the Git repository after listing
+ paths, so all presubmit checks can assume they run from there.
+
+ Args:
+ program: list of presubmit check functions to run
+ name: name to use to refer to this presubmit check run
+ base: optional base Git commit to list files against
+ paths: optional list of paths to run the presubmit checks against
+ exclude: regular expressions of paths to exclude from checks
+ repo_path: path in the git repository to check
+ output_directory: where to place output files
+ keep_going: whether to continue running checks if an error occurs
+
+ Returns:
+ True if all presubmit checks succeeded
+ """
+ if repo_path is None:
+ repo_path = Path.cwd()
+
+ if not git_repo.is_repo(repo_path):
+ _LOG.critical('Presubmit checks must be run from a Git repo')
+ return False
+
+ files = git_repo.list_files(base, paths, exclude, repo_path)
+ root = git_repo.root(repo=repo_path)
+
+ _LOG.info('Checking %s',
+ git_repo.describe_files(root, repo_path, base, paths, exclude))
+
+ files = [path.relative_to(root) for path in files]
+
+ if output_directory is None:
+ output_directory = root.joinpath('.presubmit')
+
+ presubmit = Presubmit(
+ repository_root=root,
+ output_directory=Path(output_directory),
+ paths=files,
+ )
+
+ if not isinstance(program, Program):
+ program = Program('', program)
+
+ return presubmit.run(program, keep_going)
+
+
+class _Check:
+ """Wraps a presubmit check function.
+
+ This class consolidates the logic for running and logging a presubmit check.
+ It also supports filtering the paths passed to the presubmit check.
+ """
+ def __init__(self,
+ check_function: Callable,
+ path_filter: _Filter = _Filter(),
+ always_run: bool = True):
+ _ensure_is_valid_presubmit_check_function(check_function)
+
+ self._check: Callable = check_function
+ self.filter: _Filter = path_filter
+ self.always_run: bool = always_run
+
+ # Since _Check wraps a presubmit function, adopt that function's name.
+ self.__name__ = self._check.__name__
+
+ @property
+ def name(self):
+ return self.__name__
+
+ def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
+ """Runs the presubmit check on the provided paths."""
+
+ print(
+ _box(_CHECK_UPPER, f'{count}/{total}', self.name,
+ plural(ctx.paths, "file")))
+
+ _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
+ plural(ctx.paths, "file"))
+
+ start_time_s = time.time()
+ result = self._call_function(ctx)
+ time_str = _format_time(time.time() - start_time_s)
+ _LOG.debug('%s %s', self.name, result.value)
+
+ print(_box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
+ _LOG.debug('%s duration:%s', self.name, time_str)
+
+ return result
+
+ def _call_function(self, ctx: PresubmitContext) -> _Result:
+ try:
+ self._check(ctx)
+ except PresubmitFailure as failure:
+ if str(failure):
+ _LOG.warning('%s', failure)
+ return _Result.FAIL
+ except Exception as failure: # pylint: disable=broad-except
+ _LOG.exception('Presubmit check %s failed!', self.name)
+ return _Result.FAIL
+ except KeyboardInterrupt:
+ print()
+ return _Result.CANCEL
+
+ return _Result.PASS
+
+ def __call__(self, ctx: PresubmitContext, *args, **kwargs):
+ """Calling a _Check calls its underlying function directly.
+
+ This makes it possible to call functions wrapped by @filter_paths. The
+ prior filters are ignored, so new filters may be applied.
+ """
+ return self._check(ctx, *args, **kwargs)
+
+
+def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
+ """Checks if a Callable can be used as a presubmit check."""
+ try:
+ params = signature(check).parameters
+ except (TypeError, ValueError):
+ raise TypeError('Presubmit checks must be callable, but '
+ f'{check!r} is a {type(check).__name__}')
+
+ required_args = [p for p in params.values() if p.default == p.empty]
+ if len(required_args) != 1:
+ raise TypeError(
+ f'Presubmit check functions must have exactly one required '
+ f'positional argument (the PresubmitContext), but '
+ f'{check.__name__} has {len(required_args)} required arguments' +
+ (f' ({", ".join(a.name for a in required_args)})'
+ if required_args else ''))
+
+
+def filter_paths(endswith: Iterable[str] = (''),
+ exclude: Iterable[str] = (),
+ always_run: bool = False):
+ """Decorator for filtering the paths list for a presubmit check function.
+
+ Path filters only apply when the function is used as a presubmit check.
+ Filters are ignored when the functions are called directly. This makes it
+ possible to reuse functions wrapped in @filter_paths in other presubmit
+ checks, potentially with different path filtering rules.
+
+ Args:
+ endswith: str or iterable of path endings to include
+ exclude: regular expressions of paths to exclude
+
+ Returns:
+ a wrapped version of the presubmit function
+ """
+ def filter_paths_for_function(function: Callable):
+ return _Check(function,
+ _Filter(tools.make_tuple(endswith),
+ tools.make_tuple(exclude)),
+ always_run=always_run)
+
+ return filter_paths_for_function
+
+
+@filter_paths(endswith='.h')
+def pragma_once(ctx: PresubmitContext) -> None:
+ """Presubmit check that ensures all header files contain '#pragma once'."""
+
+ for path in ctx.paths:
+ _LOG.debug('Checking %s', path)
+ with open(path) as file:
+ for line in file:
+ if line.startswith('#pragma once'):
+ break
+ else:
+ raise PresubmitFailure('#pragma once is missing!', path=path)
+
+
+def call(*args, **kwargs) -> None:
+ """Optional subprocess wrapper that causes a PresubmitFailure on errors."""
+ attributes = ', '.join(f'{k}={v}' for k, v in sorted(kwargs.items()))
+ command = ' '.join(shlex.quote(str(arg)) for arg in args)
+ _LOG.debug('[RUN] %s\n%s', attributes, command)
+
+ process = subprocess.run(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ **kwargs)
+ logfunc = _LOG.warning if process.returncode else _LOG.debug
+
+ logfunc('[FINISHED]\n%s', command)
+ logfunc('[RESULT] %s with return code %d',
+ 'Failed' if process.returncode else 'Passed', process.returncode)
+
+ output = process.stdout.decode(errors='backslashreplace')
+ if output:
+ logfunc('[OUTPUT]\n%s', output)
+
+ if process.returncode:
+ raise PresubmitFailure
diff --git a/pw_presubmit/py/pw_presubmit/python_checks.py b/pw_presubmit/py/pw_presubmit/python_checks.py
index 45c87eb..19abef4 100644
--- a/pw_presubmit/py/pw_presubmit/python_checks.py
+++ b/pw_presubmit/py/pw_presubmit/python_checks.py
@@ -29,7 +29,7 @@
os.path.abspath(__file__))))
import pw_presubmit
-from pw_presubmit import call, filter_paths, PresubmitContext
+from pw_presubmit import call, filter_paths, git_repo
_LOG = logging.getLogger(__name__)
@@ -39,8 +39,8 @@
@filter_paths(endswith='.py')
-def test_python_packages(ctx: PresubmitContext):
- packages = pw_presubmit.find_python_packages(ctx.paths, repo=ctx.repo_root)
+def test_python_packages(ctx: pw_presubmit.PresubmitContext):
+ packages = git_repo.find_python_packages(ctx.paths, repo=ctx.repo_root)
if not packages:
_LOG.info('No Python packages were found.')
@@ -51,7 +51,7 @@
@filter_paths(endswith='.py')
-def pylint(ctx: PresubmitContext):
+def pylint(ctx: pw_presubmit.PresubmitContext):
disable_checkers = [
# BUG(pwbug/22): Hanging indent check conflicts with YAPF 0.29. For
# now, use YAPF's version even if Pylint is doing the correct thing
@@ -71,7 +71,7 @@
@filter_paths(endswith='.py', exclude=r'(?:.+/)?setup\.py')
-def mypy(ctx: PresubmitContext):
+def mypy(ctx: pw_presubmit.PresubmitContext):
env = os.environ.copy()
# Use this environment variable to force mypy to colorize output.
# See https://github.com/python/mypy/issues/7771
diff --git a/pw_presubmit/py/pw_presubmit/tools.py b/pw_presubmit/py/pw_presubmit/tools.py
index dc0b925..42455f1 100644
--- a/pw_presubmit/py/pw_presubmit/tools.py
+++ b/pw_presubmit/py/pw_presubmit/tools.py
@@ -11,53 +11,19 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-"""Tools for running presubmit checks in a Git repository.
-
-Presubmit checks are defined as a function or other callable. The function may
-take either no arguments or a list of the paths on which to run. Presubmit
-checks communicate failure by raising any exception.
-
-For example, either of these functions may be used as presubmit checks:
-
- @pw_presubmit.filter_paths(endswith='.py')
- def file_contains_ni(ctx: PresubmitContext):
- for path in ctx.paths:
- with open(path) as file:
- contents = file.read()
- if 'ni' not in contents and 'nee' not in contents:
- raise PresumitFailure('Files must say "ni"!', path=path)
-
- def run_the_build():
- subprocess.run(['make', 'release'], check=True)
-
-Presubmit checks that accept a list of paths may use the filter_paths decorator
-to automatically filter the paths list for file types they care about. See the
-pragma_once function for an example.
-
-See pigweed_presbumit.py for an example of how to define presubmit checks.
-"""
+"""General purpose tools for running presubmit checks."""
import collections
from collections import Counter, defaultdict
-import contextlib
-import dataclasses
-import enum
-import itertools
import logging
-import re
import os
from pathlib import Path
import shlex
import subprocess
-import time
-from typing import Any, Callable, Collection, Dict, Iterable, Iterator, List
-from typing import NamedTuple, Optional, Pattern, Sequence, Tuple, Union
-from inspect import signature
+from typing import Any, Dict, Iterable, Iterator, List, Sequence, Tuple
_LOG: logging.Logger = logging.getLogger(__name__)
-PathOrStr = Union[Path, str]
-
def plural(items_or_count,
singular: str,
@@ -86,135 +52,12 @@
return f'{prefix}{num}{result}{suffix}'
-def git_stdout(*args: PathOrStr, repo: PathOrStr = '.') -> str:
- return log_run(['git', '-C', repo, *args],
- stdout=subprocess.PIPE,
- check=True).stdout.decode().strip()
-
-
-def _git_ls_files(args: Collection[PathOrStr], repo: Path) -> List[Path]:
- return [
- repo.joinpath(path).resolve() for path in git_stdout(
- 'ls-files', '--', *args, repo=repo).splitlines()
- ]
-
-
-def _git_diff_names(commit: str, paths: Collection[PathOrStr],
- repo: Path) -> List[Path]:
- """Returns absolute paths of files changed since the specified commit."""
- root = git_repo_path(repo=repo)
- return [
- root.joinpath(path).resolve()
- for path in git_stdout('diff',
- '--name-only',
- '--diff-filter=d',
- commit,
- '--',
- *paths,
- repo=repo).splitlines()
- ]
-
-
-def list_git_files(commit: Optional[str] = None,
- paths: Collection[PathOrStr] = (),
- exclude: Collection[Pattern[str]] = (),
- repo: Optional[Path] = None) -> List[Path]:
- """Lists files with git ls-files or git diff --name-only.
-
- This function may only be called if repo points to a Git repository.
- """
- if repo is None:
- repo = Path.cwd()
-
- if commit:
- files = _git_diff_names(commit, paths, repo)
- else:
- files = _git_ls_files(paths, repo)
-
- root = git_repo_path(repo=repo).resolve()
- return sorted(
- path for path in files
- if not any(exp.search(str(path.relative_to(root))) for exp in exclude))
-
-
-def git_has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
- """Returns True if the Git repo has uncommitted changes in it.
-
- This does not check for untracked files.
- """
- if repo is None:
- repo = Path.cwd()
-
- # Refresh the Git index so that the diff-index command will be accurate.
- log_run(['git', '-C', repo, 'update-index', '-q', '--refresh'], check=True)
-
- # diff-index exits with 1 if there are uncommitted changes.
- return log_run(['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
- '--']).returncode == 1
-
-
-def _describe_constraints(root: Path, repo_path: Path, commit: Optional[str],
- pathspecs: Collection[PathOrStr],
- exclude: Collection[Pattern]) -> Iterator[str]:
- if not root.samefile(repo_path):
- yield (f'under the {repo_path.resolve().relative_to(root.resolve())} '
- 'subdirectory')
-
- if commit:
- yield f'that have changed since {commit}'
-
- if pathspecs:
- paths_str = ', '.join(str(p) for p in pathspecs)
- yield f'that match {plural(pathspecs, "pathspec")} ({paths_str})'
-
- if exclude:
- yield (f'that do not match {plural(exclude, "pattern")} (' +
- ', '.join(p.pattern for p in exclude) + ')')
-
-
-def describe_files_in_repo(root: Path, repo_path: Path, commit: Optional[str],
- pathspecs: Collection[PathOrStr],
- exclude: Collection[Pattern]) -> str:
- """Completes 'Doing something to ...' for a set of files in a Git repo."""
- constraints = list(
- _describe_constraints(root, repo_path, commit, pathspecs, exclude))
- if not constraints:
- return f'all files in the {root.name} repo'
-
- msg = f'files in the {root.name} repo'
- if len(constraints) == 1:
- return f'{msg} {constraints[0]}'
-
- return msg + ''.join(f'\n - {line}' for line in constraints)
-
-
-def is_git_repo(path='.') -> bool:
- return not subprocess.run(['git', '-C', path, 'rev-parse'],
- stderr=subprocess.DEVNULL).returncode
-
-
-def git_repo_path(*paths, repo: PathOrStr = '.') -> Path:
- """Returns a path relative to a Git repository's root."""
- return Path(git_stdout('rev-parse', '--show-toplevel',
- repo=repo)).joinpath(*paths)
-
-
-def _make_color(*codes: int):
+def make_color(*codes: int):
start = ''.join(f'\033[{code}m' for code in codes)
return f'{start}{{}}\033[0m'.format if os.name == 'posix' else str
-color_red = _make_color(31)
-color_bold_red = _make_color(31, 1)
-color_black_on_red = _make_color(30, 41)
-color_yellow = _make_color(33, 1)
-color_green = _make_color(32)
-color_black_on_green = _make_color(30, 42)
-color_aqua = _make_color(36)
-color_bold_white = _make_color(37, 1)
-
-
-def _make_box(section_alignments: Sequence[str]) -> str:
+def make_box(section_alignments: Sequence[str]) -> str:
indices = [i + 1 for i in range(len(section_alignments))]
top_sections = '{2}'.join('{1:{1}^{width%d}}' % i for i in indices)
mid_sections = '{5}'.join('{section%d:%s{width%d}}' %
@@ -227,115 +70,6 @@
'{7}', *bot_sections, '{10}']) # yapf: disable
-_SUMMARY_BOX = '══╦╗ ║║══╩╝'
-_CHECK_UPPER = '━━━┓ '
-_CHECK_LOWER = ' ━━━┛'
-
-WIDTH = 80
-
-_LEFT = 7
-_RIGHT = 11
-
-
-def _title(msg, style=_SUMMARY_BOX) -> str:
- msg = f' {msg} '.center(WIDTH - 2)
- return _make_box('^').format(*style, section1=msg, width1=len(msg))
-
-
-def _format_time(time_s: float) -> str:
- minutes, seconds = divmod(time_s, 60)
- return f' {int(minutes)}:{seconds:04.1f}'
-
-
-def _box(style, left, middle, right, box=_make_box('><>')) -> str:
- return box.format(*style,
- section1=left + ('' if left.endswith(' ') else ' '),
- width1=_LEFT,
- section2=' ' + middle,
- width2=WIDTH - _LEFT - _RIGHT - 4,
- section3=right + ' ',
- width3=_RIGHT)
-
-
-class PresubmitFailure(Exception):
- """Optional exception to use for presubmit failures."""
- def __init__(self, description: str = '', path=None):
- super().__init__(f'{path}: {description}' if path else description)
-
-
-class _Result(enum.Enum):
-
- PASS = 'PASSED' # Check completed successfully.
- FAIL = 'FAILED' # Check failed.
- CANCEL = 'CANCEL' # Check didn't complete.
-
- def colorized(self, width: int, invert: bool = False) -> str:
- if self is _Result.PASS:
- color = color_black_on_green if invert else color_green
- elif self is _Result.FAIL:
- color = color_black_on_red if invert else color_red
- elif self is _Result.CANCEL:
- color = color_yellow
- else:
- color = lambda value: value
-
- padding = (width - len(self.value)) // 2 * ' '
- return padding + color(self.value) + padding
-
-
-class Program(collections.abc.Sequence):
- """A sequence of presubmit checks; basically a tuple with a name."""
- def __init__(self, name: str, steps: Iterable[Callable]):
- self.name = name
- self._steps = tuple(flatten(steps))
-
- def __getitem__(self, i):
- return self._steps[i]
-
- def __len__(self):
- return len(self._steps)
-
- def __str__(self):
- return self.name
-
-
-class Programs(collections.abc.Mapping):
- """A mapping of presubmit check programs.
-
- Use is optional. Helpful when managing multiple presubmit check programs.
- """
- def __init__(self, **programs: Sequence):
- """Initializes a name: program mapping from the provided keyword args.
-
- A program is a sequence of presubmit check functions. The sequence may
- contain nested sequences, which are flattened.
- """
- self._programs: Dict[str, Program] = {
- name: Program(name, checks)
- for name, checks in programs.items()
- }
-
- def all_steps(self) -> Dict[str, Callable]:
- return {c.__name__: c for c in itertools.chain(*self.values())}
-
- def __getitem__(self, item: str) -> Program:
- return self._programs[item]
-
- def __iter__(self) -> Iterator[str]:
- return iter(self._programs)
-
- def __len__(self) -> int:
- return len(self._programs)
-
-
-@dataclasses.dataclass(frozen=True)
-class PresubmitContext:
- """Context passed into presubmit checks."""
- repo_root: Path
- output_dir: Path
- paths: Sequence[Path]
-
-
def file_summary(paths: Iterable[Path],
levels: int = 2,
max_lines: int = 12,
@@ -387,348 +121,10 @@
return output
-class Presubmit:
- """Runs a series of presubmit checks on a list of files."""
- def __init__(self, repository_root: Path, output_directory: Path,
- paths: Sequence[Path]):
- self._repository_root = repository_root
- self._output_directory = output_directory
- self._paths = paths
-
- def run(self, program: Program, keep_going: bool = False) -> bool:
- """Executes a series of presubmit checks on the paths."""
-
- title = f'{program.name if program.name else ""} presubmit checks'
- checks = _apply_filters(program, self._paths)
-
- _LOG.debug('Running %s for %s', title, self._repository_root.name)
- print(_title(f'{self._repository_root.name}: {title}'))
-
- _LOG.info('%d of %d checks apply to %s in %s', len(checks),
- len(program), plural(self._paths,
- 'file'), self._repository_root)
-
- print()
- for line in file_summary(self._paths):
- print(line)
- print()
-
- _LOG.debug('Paths:\n%s', '\n'.join(str(path) for path in self._paths))
- if not self._paths:
- print(color_yellow('No files are being checked!'))
-
- _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))
-
- start_time: float = time.time()
- passed, failed, skipped = self._execute_checks(checks, keep_going)
- self._log_summary(time.time() - start_time, passed, failed, skipped)
-
- return not failed and not skipped
-
- def _log_summary(self, time_s: float, passed: int, failed: int,
- skipped: int) -> None:
- summary_items = []
- if passed:
- summary_items.append(f'{passed} passed')
- if failed:
- summary_items.append(f'{failed} failed')
- if skipped:
- summary_items.append(f'{skipped} not run')
- summary = ', '.join(summary_items) or 'nothing was done'
-
- result = _Result.FAIL if failed or skipped else _Result.PASS
- total = passed + failed + skipped
-
- _LOG.debug('Finished running %d checks on %s in %.1f s', total,
- plural(self._paths, 'file'), time_s)
- _LOG.debug('Presubmit checks %s: %s', result.value, summary)
-
- print(
- _box(
- _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
- f'{total} checks on {plural(self._paths, "file")}: {summary}',
- _format_time(time_s)))
-
- @contextlib.contextmanager
- def _context(self, name: str, paths: Sequence[Path]):
- # There are many characters banned from filenames on Windows. To
- # simplify things, just strip everything that's not a letter, digit,
- # or underscore.
- sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
- output_directory = self._output_directory.joinpath(sanitized_name)
- os.makedirs(output_directory, exist_ok=True)
-
- handler = logging.FileHandler(output_directory.joinpath('step.log'),
- mode='w')
- handler.setLevel(logging.DEBUG)
-
- try:
- _LOG.addHandler(handler)
-
- yield PresubmitContext(
- repo_root=self._repository_root.absolute(),
- output_dir=output_directory.absolute(),
- paths=paths,
- )
-
- finally:
- _LOG.removeHandler(handler)
-
- def _execute_checks(self, program,
- keep_going: bool) -> Tuple[int, int, int]:
- """Runs presubmit checks; returns (passed, failed, skipped) lists."""
- passed = failed = 0
-
- for i, (check, paths) in enumerate(program, 1):
- paths = [self._repository_root.joinpath(p) for p in paths]
- with self._context(check.name, paths) as ctx:
- result = check.run(ctx, i, len(program))
-
- if result is _Result.PASS:
- passed += 1
- elif result is _Result.CANCEL:
- break
- else:
- failed += 1
- if not keep_going:
- break
-
- return passed, failed, len(program) - passed - failed
-
-
-def _apply_filters(
- program: Sequence[Callable],
- paths: Sequence[Path]) -> List[Tuple['_Check', Sequence[Path]]]:
- """Returns a list of (check, paths_to_check) for checks that should run."""
- checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
- filter_to_checks: Dict[_PathFilter, List[_Check]] = defaultdict(list)
-
- for check in checks:
- filter_to_checks[check.filter].append(check)
-
- check_to_paths = _map_checks_to_paths(filter_to_checks, paths)
- return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]
-
-
-def _map_checks_to_paths(
- filter_to_checks: Dict['_PathFilter', List['_Check']],
- paths: Sequence[Path]) -> Dict['_Check', Sequence[Path]]:
- checks_to_paths: Dict[_Check, Sequence[Path]] = {}
-
- str_paths = [path.as_posix() for path in paths]
-
- for filt, checks in filter_to_checks.items():
- exclude = [re.compile(exp) for exp in filt.exclude]
-
- filtered_paths = tuple(
- Path(path) for path in str_paths
- if any(path.endswith(end) for end in filt.endswith) and not any(
- exp.search(path) for exp in exclude))
-
- for check in checks:
- if filtered_paths or check.always_run:
- checks_to_paths[check] = filtered_paths
- else:
- _LOG.debug('Skipping "%s": no relevant files', check.name)
-
- return checks_to_paths
-
-
-def run_presubmit(program: Sequence[Callable],
- base: Optional[str] = None,
- paths: Sequence[Path] = (),
- exclude: Sequence[Pattern] = (),
- repo_path: Path = Optional[None],
- output_directory: Optional[Path] = None,
- keep_going: bool = False) -> bool:
- """Lists files in the current Git repo and runs a Presubmit with them.
-
- This changes the directory to the root of the Git repository after listing
- paths, so all presubmit checks can assume they run from there.
-
- Args:
- program: list of presubmit check functions to run
- name: name to use to refer to this presubmit check run
- base: optional base Git commit to list files against
- paths: optional list of paths to run the presubmit checks against
- exclude: regular expressions of paths to exclude from checks
- repo_path: path in the git repository to check
- output_directory: where to place output files
- keep_going: whether to continue running checks if an error occurs
-
- Returns:
- True if all presubmit checks succeeded
- """
- if repo_path is None:
- repo_path = Path.cwd()
-
- if not is_git_repo(repo_path):
- _LOG.critical('Presubmit checks must be run from a Git repo')
- return False
-
- files = list_git_files(base, paths, exclude, repo_path)
- root = git_repo_path(repo=repo_path)
-
- _LOG.info('Checking %s',
- describe_files_in_repo(root, repo_path, base, paths, exclude))
-
- files = [path.relative_to(root) for path in files]
-
- if output_directory is None:
- output_directory = root.joinpath('.presubmit')
-
- presubmit = Presubmit(
- repository_root=root,
- output_directory=Path(output_directory),
- paths=files,
- )
-
- if not isinstance(program, Program):
- program = Program('', program)
-
- return presubmit.run(program, keep_going)
-
-
-def find_python_packages(python_paths, repo='.') -> Dict[str, List[str]]:
- """Returns Python package directories for the files in python_paths."""
- setup_pys = [
- os.path.dirname(file)
- for file in _git_ls_files(['setup.py', '*/setup.py'], repo)
- ]
-
- package_dirs: Dict[str, List[str]] = defaultdict(list)
-
- for path in (os.path.abspath(p) for p in python_paths):
- try:
- setup_dir = max(setup for setup in setup_pys
- if path.startswith(setup))
- package_dirs[os.path.abspath(setup_dir)].append(path)
- except ValueError:
- continue
-
- return package_dirs
-
-
-class _PathFilter(NamedTuple):
- endswith: Tuple[str, ...] = ('', )
- exclude: Tuple[str, ...] = ()
-
-
-class _Check:
- """Wraps a presubmit check function.
-
- This class consolidates the logic for running and logging a presubmit check.
- It also supports filtering the paths passed to the presubmit check.
- """
- def __init__(self,
- check_function: Callable,
- path_filter: _PathFilter = _PathFilter(),
- always_run: bool = True):
- _ensure_is_valid_presubmit_check_function(check_function)
-
- self._check: Callable = check_function
- self.filter: _PathFilter = path_filter
- self.always_run: bool = always_run
-
- # Since _Check wraps a presubmit function, adopt that function's name.
- self.__name__ = self._check.__name__
-
- @property
- def name(self):
- return self.__name__
-
- def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
- """Runs the presubmit check on the provided paths."""
-
- print(
- _box(_CHECK_UPPER, f'{count}/{total}', self.name,
- plural(ctx.paths, "file")))
-
- _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
- plural(ctx.paths, "file"))
-
- start_time_s = time.time()
- result = self._call_function(ctx)
- time_str = _format_time(time.time() - start_time_s)
- _LOG.debug('%s %s', self.name, result.value)
-
- print(_box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
- _LOG.debug('%s duration:%s', self.name, time_str)
-
- return result
-
- def _call_function(self, ctx: PresubmitContext) -> _Result:
- try:
- self._check(ctx)
- except PresubmitFailure as failure:
- if str(failure):
- _LOG.warning('%s', failure)
- return _Result.FAIL
- except Exception as failure: # pylint: disable=broad-except
- _LOG.exception('Presubmit check %s failed!', self.name)
- return _Result.FAIL
- except KeyboardInterrupt:
- print()
- return _Result.CANCEL
-
- return _Result.PASS
-
- def __call__(self, ctx: PresubmitContext, *args, **kwargs):
- """Calling a _Check calls its underlying function directly.
-
- This makes it possible to call functions wrapped by @filter_paths. The
- prior filters are ignored, so new filters may be applied.
- """
- return self._check(ctx, *args, **kwargs)
-
-
-def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
- """Checks if a Callable can be used as a presubmit check."""
- try:
- params = signature(check).parameters
- except (TypeError, ValueError):
- raise TypeError('Presubmit checks must be callable, but '
- f'{check!r} is a {type(check).__name__}')
-
- required_args = [p for p in params.values() if p.default == p.empty]
- if len(required_args) != 1:
- raise TypeError(
- f'Presubmit check functions must have exactly one required '
- f'positional argument (the PresubmitContext), but '
- f'{check.__name__} has {len(required_args)} required arguments' +
- (f' ({", ".join(a.name for a in required_args)})'
- if required_args else ''))
-
-
-def _make_tuple(value: Iterable[str]) -> Tuple[str, ...]:
+def make_tuple(value: Iterable[str]) -> Tuple[str, ...]:
return tuple([value] if isinstance(value, str) else value)
-def filter_paths(endswith: Iterable[str] = (''),
- exclude: Iterable[str] = (),
- always_run: bool = False):
- """Decorator for filtering the paths list for a presubmit check function.
-
- Path filters only apply when the function is used as a presubmit check.
- Filters are ignored when the functions are called directly. This makes it
- possible to reuse functions wrapped in @filter_paths in other presubmit
- checks, potentially with different path filtering rules.
-
- Args:
- endswith: str or iterable of path endings to include
- exclude: regular expressions of paths to exclude
-
- Returns:
- a wrapped version of the presubmit function
- """
- def filter_paths_for_function(function: Callable):
- return _Check(function,
- _PathFilter(_make_tuple(endswith), _make_tuple(exclude)),
- always_run=always_run)
-
- return filter_paths_for_function
-
-
def log_run(args, **kwargs) -> subprocess.CompletedProcess:
"""Logs a command then runs it with subprocess.run.
@@ -742,44 +138,6 @@
return subprocess.run(args, **kwargs)
-def call(*args, **kwargs) -> None:
- """Optional subprocess wrapper that causes a PresubmitFailure on errors."""
- attributes = ', '.join(f'{k}={v}' for k, v in sorted(kwargs.items()))
- command = ' '.join(shlex.quote(str(arg)) for arg in args)
- _LOG.debug('[RUN] %s\n%s', attributes, command)
-
- process = subprocess.run(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- **kwargs)
- logfunc = _LOG.warning if process.returncode else _LOG.debug
-
- logfunc('[FINISHED]\n%s', command)
- logfunc('[RESULT] %s with return code %d',
- 'Failed' if process.returncode else 'Passed', process.returncode)
-
- output = process.stdout.decode(errors='backslashreplace')
- if output:
- logfunc('[OUTPUT]\n%s', output)
-
- if process.returncode:
- raise PresubmitFailure
-
-
-@filter_paths(endswith='.h')
-def pragma_once(ctx: PresubmitContext) -> None:
- """Presubmit check that ensures all header files contain '#pragma once'."""
-
- for path in ctx.paths:
- _LOG.debug('Checking %s', path)
- with open(path) as file:
- for line in file:
- if line.startswith('#pragma once'):
- break
- else:
- raise PresubmitFailure('#pragma once is missing!', path=path)
-
-
def flatten(*items) -> Iterator:
"""Yields items from a series of items and nested iterables.
diff --git a/pw_presubmit/py/tools_test.py b/pw_presubmit/py/tools_test.py
index 8db92e9..5c9d165 100755
--- a/pw_presubmit/py/tools_test.py
+++ b/pw_presubmit/py/tools_test.py
@@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-"""Tests for presubmit tools."""
+"""Tests for general purpose tools."""
import unittest
@@ -45,44 +45,5 @@
[[[[123]]], 45.6])))
-def _fake_function_1(_):
- """Fake presubmit function."""
-
-
-def _fake_function_2(_):
- """Fake presubmit function."""
-
-
-class ProgramsTest(unittest.TestCase):
- """Tests the presubmit Programs abstraction."""
- def setUp(self):
- self._programs = tools.Programs(
- first=[_fake_function_1, (), [(_fake_function_1, )]],
- second=[_fake_function_2],
- )
-
- def test_empty(self):
- self.assertEqual({}, tools.Programs())
-
- def test_access_present_members(self):
- self.assertEqual('first', self._programs['first'].name)
- self.assertEqual((_fake_function_1, _fake_function_1),
- tuple(self._programs['first']))
-
- self.assertEqual('second', self._programs['second'].name)
- self.assertEqual((_fake_function_2, ), tuple(self._programs['second']))
-
- def test_access_missing_member(self):
- with self.assertRaises(KeyError):
- _ = self._programs['not_there']
-
- def test_all_steps(self):
- self.assertEqual(
- {
- '_fake_function_1': _fake_function_1,
- '_fake_function_2': _fake_function_2,
- }, self._programs.all_steps())
-
-
if __name__ == '__main__':
unittest.main()