[go: up one dir, main page]

blob: 71708a7cb7a347b2fe1f142d900413dbe96dea3a [file] [log] [blame] [edit]
# Copyright 2024 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Gerrit python library.
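This library contains all of the functionality for Gerrit used by copybot,
including the git repository wrapper and the commit message pseudoheader
helpers defined in this file.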
"""
from __future__ import annotations
import contextlib
import dataclasses
import enum
import json
import logging
import os
import pathlib
import re
import shlex
import subprocess
import tempfile
import time
from typing import (
Any,
Dict,
Iterable,
List,
Optional,
Protocol,
Sequence,
Tuple,
Union,
)
import requests # pylint: disable=import-error
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches a full 40-character commit hash: exactly 40 lowercase hexadecimal
# digits delimited by word boundaries on both sides.
_COMMIT_HASH_PATTERN = re.compile(r"\b[0-9a-f]{40}\b")

# Matches a full 40-character parent commit hash: the literal text "parent "
# followed by 40 lowercase hexadecimal digits.
_PARENT_COMMIT_HASH_PATTERN = re.compile(r"parent \b[0-9a-f]{40}\b")
@enum.unique
class MergeConflictBehavior(enum.Enum):
    """Policy to follow when a commit hits a merge conflict.

    Members:
        FAIL: Stop immediately.  Don't upload anything.
        SKIP: Skip the commit that failed to merge.  The failed commits
            are summarized at the end of the execution, which exits with
            a failure status.
        STOP: Stop immediately.  Changes staged prior to the conflict are
            uploaded.
        ALLOW_CONFLICT: Commit the conflicted CL with its conflicts.  The
            conflicted CLs are summarized at the end of execution, which
            exits with a failure status.  Conflicted CLs WILL be uploaded
            to the downstream.  "Commit: false" is added to the commit
            message to prevent GoB from committing conflicted changes
            before they are edited.
    """

    FAIL = enum.auto()
    SKIP = enum.auto()
    STOP = enum.auto()
    ALLOW_CONFLICT = enum.auto()
@enum.unique
class ExclusionBehavior(enum.Enum):
    """Policy to follow for excluded paths.

    Members:
        DROP: Drop any CL containing content in the excluded directories.
            This method uses git path exclusions and generally runs
            faster.
        FILTER: Filter out the files/folders matching the exclusion rules
            from the target CLs.
    """

    DROP = enum.auto()
    FILTER = enum.auto()
class MergeConflictError(Exception):
    """Raised when a commit cannot be cherry-picked due to a conflict."""
class EmptyCommitError(Exception):
    """Raised when cherry-picking a commit would produce an empty commit."""
class CommitDoesNotApplyError(Exception):
    """Raised when a commit cannot be cherry-picked as it does not apply."""
class CopybotFatalError(Exception):
    """Copybot fatal error.

    Attributes:
        enum_name: Failure identifier string of the error class.
        commits: The ``commits`` value given to the constructor, or None.
    """

    enum_name = "FAILURE_UNKNOWN"

    def __init__(
        self,
        *args: str,
        commits: Optional[Sequence[str]] = None,
        **kwargs: str,
    ):
        """Initialize the error.

        Args:
            *args: Positional arguments forwarded to ``Exception``.
            commits: Commits associated with the failure, if any.
            **kwargs: Keyword arguments forwarded to ``Exception``.
                NOTE(review): ``Exception.__init__`` appears to reject
                keyword arguments, so passing any would raise TypeError
                -- confirm whether this passthrough is needed.
        """
        super().__init__(*args, **kwargs)
        self.commits = commits
class FetchError(CopybotFatalError):
    """Fatal error: Copybot failed to fetch."""

    enum_name = "FAILURE_FETCH_ERROR"
class UpstreamFetchError(CopybotFatalError):
    """Fatal error: the upstream failed to fetch."""

    enum_name = "FAILURE_UPSTREAM_FETCH_ERROR"
class DownstreamFetchError(CopybotFatalError):
    """Fatal error: the downstream failed to fetch."""

    enum_name = "FAILURE_DOWNSTREAM_FETCH_ERROR"
class PushError(CopybotFatalError):
    """Fatal error: the push to the downstream GoB host failed."""

    enum_name = "FAILURE_DOWNSTREAM_PUSH_ERROR"
class MergeConflictsError(CopybotFatalError):
    """Fatal error: Copybot ran, but encountered merge conflicts."""

    enum_name = "FAILURE_MERGE_CONFLICTS"
@dataclasses.dataclass
class GerritClInfo:
    """Stores information for a Gerrit CL.

    Attributes:
        change_id: Change ID on Gerrit for this change.
        hashtags: Hashtags associated with this change.
        current_ref: Current ref for this change, usable to form a refspec.
    """

    # The attributes are declared as dataclass fields so that the generated
    # __eq__ and __repr__ take them into account.  Without any declared
    # field, every pair of instances compared equal and the repr was empty.
    change_id: str
    hashtags: List[str]
    current_ref: str

    def __init__(self, change_id: str, hashtags: List[str], ref: str) -> None:
        """Initialize the Gerrit CL Info.

        The explicit initializer is kept (the dataclass decorator does not
        replace an ``__init__`` defined in the class body) so that the
        third argument can still be passed as ``ref``.

        Args:
            change_id: Change ID on Gerrit for this change.
            hashtags: Hashtags associated with this change.
            ref: Current ref for this change to be able to form a refspec;
                stored as ``current_ref``.
        """
        self.change_id = change_id
        self.hashtags = hashtags
        self.current_ref = ref
class GitRepoInterface(Protocol):
    """Interface for common Git repository actions.

    This is a structural (typing.Protocol) description only: every method
    body is a stub.  The GitRepo class in this file defines methods with
    these names and parameters.
    """

    # Directory of the repository the actions operate on.
    git_dir: pathlib.Path

    def __init__(self, git_dir: Union[str, "os.PathLike[str]"]) -> None: ...

    # Resolve a revision; returns a string.
    def rev_parse(self, rev: str = "HEAD") -> str: ...

    # Fetch from a remote, optionally a single ref; returns a string.
    def fetch(self, remote: str, ref: str = "") -> str: ...

    # Check out a ref; each extra positional argument is a string.
    def checkout(self, ref: str, *args: str) -> None: ...

    # Get the log for a revision range as text, with optional format,
    # entry limit, subtree restriction and excluded path patterns.
    def log(
        self,
        revision_range: str = "HEAD",
        fmt: str = "",
        num: int = 0,
        subtree: Union[str, "os.PathLike[str]"] = "",
        exclude_file_patterns: Iterable[Union[str, "os.PathLike[str]"]] = (),
    ) -> str: ...

    # Get the log using caller-supplied arguments only.
    def log_raw(self, *args) -> str: ...

    # Same selection parameters as log(), returning a list of strings.
    def log_hashes(
        self,
        revision_range: Optional[str] = "HEAD",
        num: int = 0,
        subtree: Union[str, "os.PathLike[str]"] = "",
        exclude_file_patterns: Iterable[Union[str, "os.PathLike[str]"]] = (),
    ) -> List[str]: ...

    # Accessors for pieces of a single commit.
    def get_commit_message(self, rev: str = "HEAD") -> str: ...

    def get_author_email(self, rev: str = "HEAD") -> str: ...

    def get_author_name(self, rev: str = "HEAD") -> str: ...

    def get_subject(self, rev: str = "HEAD") -> str: ...

    def commit_file_list(self, rev: str = "HEAD") -> List[str]: ...

    # Replace the message of a commit; returns a string.
    def reword(
        self, new_message: str, sign_off: bool = False, update_author: str = ""
    ) -> str: ...

    # Reduce a commit to the given files.
    def filter_commit(
        self,
        rev: str = "HEAD",
        patch_dir: Union[str, "os.PathLike[str]"] = "",
        files: Iterable[str] = (),
    ): ...

    # Cherry-pick a revision, optionally between subtrees and restricted
    # by include/exclude paths.
    def cherry_pick(
        self,
        rev: str,
        patch_dir: Union[str, "os.PathLike[str]"] = "",
        upstream_subtree: Union[str, "os.PathLike[str]"] = "",
        downstream_subtree: Union[str, "os.PathLike[str]"] = "",
        include_paths: Optional[List[Union[str, "os.PathLike[str]"]]] = None,
        exclude_paths: Optional[List[Union[str, "os.PathLike[str]"]]] = None,
        allow_conflict: bool = False,
    ) -> None: ...

    # Push a refspec to a URL with optional push options.
    def push(
        self, url: str, refspec: str, options: Iterable[str] = ()
    ) -> None: ...

    # Count the CLs between two revisions, optionally within a subtree.
    def get_cl_count(
        self,
        original_rev: str,
        current_rev: Optional[str],
        subtree: Union[str, "os.PathLike[str]"] = "",
    ) -> int: ...

    # Register a remote under the given name.
    def add_remote(
        self,
        url: str,
        name: str,
    ) -> None: ...
class GitRepo:
    """Class wrapping common Git repository actions.

    Every action runs the ``git`` executable through ``_run_git``, which
    invokes it with ``-C <git_dir>``.

    Attributes:
        git_dir: Directory the git commands are run in.
    """

    def __init__(self, git_dir: Union[str, "os.PathLike[str]"]) -> None:
        """Do a `git init` to create a new repository.

        The `git init` is skipped when ``git_dir`` already contains a
        ``.git`` entry.

        Args:
            git_dir: Directory of the repository.
        """
        self.git_dir = pathlib.Path(git_dir)
        if not (self.git_dir / ".git").exists():
            self._run_git("init")

    def _run_git(
        self, *args: Any, **kwargs: Any
    ) -> "subprocess.CompletedProcess[str]":
        """Wrapper to run git with the provided arguments.

        Args:
            *args: Arguments placed after ``git -C <git_dir> --no-pager``.
            **kwargs: Extra keyword arguments for ``subprocess.run``.
                ``encoding`` defaults to ``"utf-8"`` and ``errors`` to
                ``"replace"`` unless the caller provides them.

        Returns:
            The completed process, with stdout and stderr captured.

        Raises:
            subprocess.CalledProcessError: git exited with a non-zero
                status.  The captured output is logged before the error
                is re-raised.
        """
        argv = ["git", "-C", self.git_dir, "--no-pager", *args]
        logger.info("Run `%s`", " ".join(shlex.quote(str(arg)) for arg in argv))
        kwargs.setdefault("encoding", "utf-8")
        kwargs.setdefault("errors", "replace")
        try:
            return subprocess.run(
                argv,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                **kwargs,
            )
        except subprocess.CalledProcessError as e:
            logger.error("Git command failed!")
            logger.error(" STDOUT:")
            for line in e.stdout.splitlines():
                logger.error(" %s", line)
            logger.error(" STDERR:")
            for line in e.stderr.splitlines():
                logger.error(" %s", line)
            raise

    def rev_parse(self, rev: str = "HEAD") -> str:
        """Do a `git rev-parse`.

        Returns:
            The command output with trailing whitespace removed.
        """
        result = self._run_git("rev-parse", rev)
        return result.stdout.rstrip()

    def fetch(self, remote: str, ref: str = "") -> str:
        """Do a `git fetch`.

        Args:
            remote: Passed to `git fetch` as the place to fetch from.
            ref: Optional ref to fetch; left out of the command if empty.

        Returns:
            The full commit hash corresponding to FETCH_HEAD.
        """
        extra_args = []
        if ref:
            extra_args.append(ref)
        self._run_git("fetch", remote, *extra_args)
        return self.rev_parse("FETCH_HEAD")

    def checkout(self, ref: str, *args: str) -> None:
        """Do a `git checkout`.

        Args:
            ref: What to check out; passed as the last argument.
            *args: Extra command-line arguments placed before ``ref``.
        """
        self._run_git("checkout", *args, ref)

    def log(
        self,
        revision_range: str = "HEAD",
        fmt: str = "",
        num: int = 0,
        subtree: Union[str, "os.PathLike[str]"] = "",
        exclude_file_patterns: Iterable[Union[str, "os.PathLike[str]"]] = (),
    ) -> str:
        """Do a `git log`.

        The log always follows first parents only (``--first-parent``).

        Args:
            revision_range: Revision or range to log.
            fmt: Value for ``--format=``; not passed if empty.
            num: Maximum number of entries (``-n``); not passed if 0.
            subtree: Path to restrict the log to; not passed if empty.
            exclude_file_patterns: Paths passed as ``:!<path>`` exclusion
                pathspecs.

        Returns:
            The command output with surrounding whitespace removed.
        """
        extra_args = ["--first-parent"]
        if fmt:
            extra_args.append(f"--format={fmt}")
        if num:
            extra_args.append(f"-n{num}")
        # Everything after "--" is a pathspec.
        extra_args.append("--")
        extra_args.extend(
            [f":!{path}" for path in (exclude_file_patterns or [])]
        )
        if subtree:
            extra_args.append(str(subtree))
        result = self._run_git("log", revision_range, *extra_args)
        return result.stdout.strip()

    def log_raw(self, *args) -> str:
        """Raw version of git log simply passing all provided args.

        Returns:
            The command output with surrounding whitespace removed.
        """
        result = self._run_git("log", *args)
        return result.stdout.strip()

    def log_hashes(
        self,
        revision_range: Optional[str] = "HEAD",
        num: int = 0,
        subtree: Union[str, "os.PathLike[str]"] = "",
        exclude_file_patterns: Iterable[Union[str, "os.PathLike[str]"]] = (),
    ) -> List[str]:
        """Get the commit log as a list of commit hashes.

        Takes the same selection arguments as ``log``; ``revision_range``
        must not be None.

        Returns:
            One full commit hash (``%H``) per logged commit.
        """
        assert revision_range is not None
        result = self.log(
            revision_range=revision_range,
            fmt="%H",
            num=num,
            subtree=subtree,
            exclude_file_patterns=exclude_file_patterns,
        )
        return result.splitlines()

    def get_commit_message(self, rev: str = "HEAD") -> str:
        """Get a commit message of a commit."""
        return self.log(revision_range=rev, num=1, fmt="%B")

    def get_author_email(self, rev: str = "HEAD") -> str:
        """Get the authors email of a commit."""
        return self.log(revision_range=rev, num=1, fmt="%aE")

    def get_author_name(self, rev: str = "HEAD") -> str:
        """Get the authors name of a commit."""
        return self.log(revision_range=rev, num=1, fmt="%aN")

    def get_subject(self, rev: str = "HEAD") -> str:
        """Get the subject of a commit."""
        return self.log(revision_range=rev, num=1, fmt="%s")

    def commit_file_list(self, rev: str = "HEAD") -> List[str]:
        """Get the files modified by a commit.

        Returns:
            The lines printed by `git show --pretty= --name-only`.
        """
        result = self._run_git("show", "--pretty=", "--name-only", rev)
        return result.stdout.splitlines()

    def show(
        self,
        rev: str = "HEAD",
        files: Iterable[str] = (),
        patch_dir: Union[str, "os.PathLike[str]"] = "",
    ) -> str:
        """Do a `git show`, writing the output to a patch file.

        Args:
            rev: Revision to show.
            files: Paths to limit the output to; no limit if empty.
            patch_dir: Directory in which ``filtered.patch`` is written.
                NOTE(review): with the default empty value the output
                path is ``/filtered.patch`` -- callers presumably always
                pass a directory; confirm.

        Returns:
            The path of the written patch file.
        """
        extra_args = ["--output", f"{patch_dir}/filtered.patch"]
        if files:
            extra_args.append("--")
            extra_args.extend(files)
        self._run_git("show", rev, *extra_args)
        return f"{patch_dir}/filtered.patch"

    def apply(
        self,
        patch: Union[str, "os.PathLike[str]"],
        path: Union[str, "os.PathLike[str]"] = "",
        include_paths: Optional[List[Union[str, "os.PathLike[str]"]]] = None,
        exclude_paths: Optional[List[Union[str, "os.PathLike[str]"]]] = None,
    ) -> "subprocess.CompletedProcess[str]":
        """Apply a patch with `git apply`.

        Args:
            patch: Patch file to apply.
            path: Passed as ``--directory=``, unless empty or ``"."``.
            include_paths: Each entry is passed as ``--include=``.
            exclude_paths: Each entry is passed as ``--exclude=``.

        Returns:
            The completed `git apply` process.
        """
        extra_args = []
        if path and str(path) != ".":
            extra_args.append(f"--directory={path}")
        if include_paths:
            extra_args.extend(f"--include={item}" for item in include_paths)
        if exclude_paths:
            extra_args.extend(f"--exclude={item}" for item in exclude_paths)
        extra_args.append(str(patch))
        return self._run_git("apply", *extra_args)

    def commit(
        self,
        message: str,
        amend: bool = False,
        sign_off: bool = False,
        stage: bool = False,
        update_author: str = "",
        allow_empty: bool = False,
    ) -> str:
        """Create a commit.

        Args:
            message: Commit message (``-m``).
            amend: Pass ``--amend``.
            sign_off: Pass ``--signoff``.
            stage: Pass ``--all``.
            update_author: Value for ``--author``; not passed if empty.
            allow_empty: Pass ``--allow-empty``.

        Returns:
            The commit hash.
        """
        extra_args = []
        if update_author:
            extra_args.extend(["--author", update_author])
        if stage:
            extra_args.append("--all")
        if amend:
            extra_args.append("--amend")
        if sign_off:
            extra_args.append("--signoff")
        if allow_empty:
            extra_args.append("--allow-empty")
        self._run_git("commit", *extra_args, "-m", message)
        return self.rev_parse()

    def reword(
        self, new_message: str, sign_off: bool = False, update_author: str = ""
    ) -> str:
        """Reword the commit at HEAD.

        Returns:
            The new commit hash.
        """
        return self.commit(
            new_message,
            amend=True,
            sign_off=sign_off,
            update_author=update_author,
        )

    def format_patch(
        self,
        rev: str,
        output_path: Union[str, "os.PathLike[str]"],
        num: int = 1,
        relative_path: Union[str, "os.PathLike[str]"] = "",
    ) -> pathlib.Path:
        """Generate patch from revision with an optional relative path.

        Args:
            rev: Revision to generate the patch from.
            output_path: Value for ``--output-directory=``.
            num: Number of commits to format (passed as ``-<num>``).
            relative_path: Value for ``--relative=``, unless empty or
                ``"."``.

        Returns:
            Path of the output patch.
        """
        extra_args = []
        if relative_path and str(relative_path) != ".":
            extra_args.append(f"--relative={relative_path}")
        extra_args.extend([f"-{num}", rev, f"--output-directory={output_path}"])
        result = self._run_git("format-patch", *extra_args)
        return pathlib.Path(result.stdout.rstrip())

    def add(
        self,
        path: Union[str, "os.PathLike[str]"],
        stage: bool = False,
        force: bool = False,
    ) -> "subprocess.CompletedProcess[str]":
        """Add unstaged files.

        Args:
            path: Path to add; not passed if empty.
            stage: Pass ``--all``.
            force: Pass ``--force``.

        Returns:
            The completed `git add` process.
        """
        extra_args = []
        if stage:
            extra_args.append("--all")
        if force:
            extra_args.append("--force")
        if path:
            extra_args.append(str(path))
        return self._run_git("add", *extra_args)

    def get_subtree_lowest_working_dir(
        self, path: Union[str, "os.PathLike[str]"]
    ) -> Union[str, "os.PathLike[str]"]:
        """Get the lowest working directory path for subtree.

        Returns:
            ``path`` itself when it names a directory inside ``git_dir``
            (or is empty), otherwise the parent of ``path``.
        """
        patch_dir = pathlib.Path(self.git_dir)
        if path:
            patch_dir = patch_dir / path
        subtree = path
        if not patch_dir.is_dir():
            subtree = pathlib.Path(path).parents[0]
        return subtree

    @contextlib.contextmanager
    def temp_worktree(self, rev="HEAD"):
        """Context manager to create and destroy a temporary worktree.

        Yields:
            A new instance of this class wrapping a detached worktree of
            ``rev`` located in a temporary directory.
        """
        # pylint: disable=consider-using-with
        tmpdir = tempfile.TemporaryDirectory()
        try:
            worktree_dir = pathlib.Path(tmpdir.name)
            self._run_git("worktree", "add", "-d", worktree_dir, rev)
            try:
                yield self.__class__(worktree_dir)
            finally:
                self._run_git("worktree", "remove", "--force", worktree_dir)
        finally:
            # We use a try/finally to cleanup the temporary directory
            # instead of a context manager as, in the successful
            # condition, the worktree directory will no longer exist
            # (git will have removed it). In Python 3.10+, one can
            # use ignore_cleanup_errors=True, but the chroot is on
            # Python 3.6 right now.
            try:
                tmpdir.cleanup()
            except FileNotFoundError:
                pass

    def filter_commit(
        self,
        rev: str = "HEAD",
        patch_dir: Union[str, "os.PathLike[str]"] = "",
        files: Iterable[str] = (),
    ) -> str:
        """Filter a commit to just certain files.

        The commit is exported with ``show`` (limited to ``files``), then
        applied and committed, with the original message, in a temporary
        worktree checked out at the parent of ``rev``.

        Returns:
            The new commit hash.
        """
        old_message = self.get_commit_message(rev)
        patch = self.show(rev, files, patch_dir)
        with self.temp_worktree(f"{rev}~1") as worktree:
            worktree.apply(patch)
            worktree.add("", stage=True, force=True)
            return worktree.commit(old_message)

    def is_merge_commit(
        self,
        rev: str = "HEAD",
    ) -> bool:
        """Tell whether a commit has more than one parent.

        Only the header of the `git cat-file -p` output (the lines before
        the first blank line) is inspected, so hashes quoted in the
        commit message are not mistaken for parents.

        Returns:
            True if the header has more than one ``parent`` line.
        """
        extra_args = ["-p"]
        result = self._run_git("cat-file", rev, *extra_args)
        parent_count = 0
        for line in result.stdout.splitlines():
            if not line:
                # The first blank line ends the header; the rest is the
                # free-form commit message.
                break
            if line.startswith("parent "):
                parent_count += 1
        return parent_count > 1

    def cherry_pick(
        self,
        rev: str,
        patch_dir: Union[str, "os.PathLike[str]"] = "",
        upstream_subtree: Union[str, "os.PathLike[str]"] = "",
        downstream_subtree: Union[str, "os.PathLike[str]"] = "",
        include_paths: Optional[List[Union[str, "os.PathLike[str]"]]] = None,
        exclude_paths: Optional[List[Union[str, "os.PathLike[str]"]]] = None,
        allow_conflict: bool = False,
    ) -> None:
        """Do a `git cherry-pick`.

        Unless a subtree or ``include_paths`` is given, several
        `git cherry-pick -x` attempts are made in order, stopping at the
        first that succeeds: for a merge commit ``-m 1`` then ``-m 2``,
        each with and without ``-Xpatience``; otherwise no option,
        ``-Xpatience``, then ``--strategy=recursive -X theirs``.

        If no attempt succeeds (or none was made), the commit is exported
        with `git format-patch`, applied with `git apply`, staged and
        committed with the original commit message.

        Args:
            rev: Revision to cherry-pick.
            patch_dir: Directory the generated patch is written to.
            upstream_subtree: Subtree the patch is made relative to.
            downstream_subtree: Subtree the patch is applied in and
                staged from.
            include_paths: Paths passed to `git apply` as ``--include``.
            exclude_paths: Paths passed to `git apply` as ``--exclude``.
            allow_conflict: Commit a conflicted cherry-pick instead of
                aborting it.

        Raises:
            EmptyCommitError: The resultant commit was empty and should be
                skipped.
            CommitDoesNotApplyError: The desired upstream CL does not apply
                to the downstream repo.
            MergeConflictError: There was a merge conflict that could not
                be resolved automatically.
        """

        def _try_cherry_pick(extra_flags: List[str]) -> None:
            """Run one cherry-pick attempt with the given extra flags."""
            try:
                self._run_git("cherry-pick", "-x", rev, *extra_flags)
            except subprocess.CalledProcessError as e:
                try:
                    # CHERRY_PICK_HEAD resolves only while a cherry-pick
                    # is in progress.
                    self._run_git("rev-parse", "--verify", "CHERRY_PICK_HEAD")
                    logger.warning("Could not cherry-pick")
                except subprocess.CalledProcessError as err:
                    if "is a merge but no -m option was given" in e.stderr:
                        logger.warning("Merge commit detected")
                    # NOTE(review): assumed to be raised for every failure
                    # that leaves no cherry-pick in progress, not only for
                    # the merge case -- confirm.
                    raise MergeConflictError() from err
                if allow_conflict:
                    if "patch does not apply" in e.stderr:
                        logger.warning("Patch does not apply to downstream")
                        raise CommitDoesNotApplyError() from e
                    self.add(downstream_subtree, stage=True, force=True)
                    self.commit(
                        self.get_commit_message(rev),
                        amend=False,
                        sign_off=False,
                        stage=True,
                    )
                else:
                    self._run_git("cherry-pick", "--abort")
                    if "The previous cherry-pick is now empty" in e.stderr:
                        raise EmptyCommitError() from e
                    raise MergeConflictError() from e

        cherry_pick_flag_list: List[List[str]] = []
        if self.is_merge_commit(rev):
            cherry_pick_flag_list += [
                ["-m", "1"],
                ["-m", "1", "-Xpatience"],
                ["-m", "2"],
                ["-m", "2", "-Xpatience"],
            ]
        else:
            cherry_pick_flag_list += [
                [],
                ["-Xpatience"],
                ["--strategy=recursive", "-X", "theirs"],
            ]
        # With subtrees or include paths, no cherry-pick is attempted and
        # the patch-based path below is used directly.
        if downstream_subtree or upstream_subtree or include_paths:
            cherry_pick_flag_list = []
        for flags in cherry_pick_flag_list:
            try:
                _try_cherry_pick(flags)
            except MergeConflictError:
                continue
            else:
                return
        patch = self.format_patch(
            rev,
            patch_dir,
            1,
            self.get_subtree_lowest_working_dir(upstream_subtree),
        )
        try:
            self.apply(
                patch=patch,
                path=self.get_subtree_lowest_working_dir(downstream_subtree),
                include_paths=include_paths,
                exclude_paths=exclude_paths,
            )
        except subprocess.CalledProcessError as e:
            if not allow_conflict:
                raise MergeConflictError() from e
            if (
                'No valid patches in input (allow with "--allow-empty")'
                in e.stderr
            ):
                self.commit(
                    self.get_commit_message(rev),
                    amend=False,
                    sign_off=False,
                    stage=False,
                    allow_empty=True,
                )
                # NOTE(review): assumed to be raised only for the empty
                # patch case; other apply failures continue below --
                # confirm.
                raise EmptyCommitError() from e
        self.add(downstream_subtree, stage=True, force=True)
        try:
            self.commit(
                self.get_commit_message(rev),
                amend=False,
                sign_off=False,
                stage=True,
            )
        except subprocess.CalledProcessError as e:
            if "nothing to commit, working tree clean" in e.stderr:
                logger.info("Empty commit error")
                self.commit(
                    self.get_commit_message(rev),
                    amend=False,
                    sign_off=False,
                    stage=False,
                    allow_empty=True,
                )
            # NOTE(review): assumed to be raised for every failed commit,
            # so that a failure is never reported as success -- confirm.
            raise EmptyCommitError() from e
        return

    def push(self, url: str, refspec: str, options: Iterable[str] = ()) -> None:
        """Do a `git push`.

        Args:
            url: Destination to push to.
            refspec: Refspec to push.
            options: Push options; each one is passed with ``-o``.
        """
        args = []
        if options:
            for option in options:
                args.extend(["-o", option])
        args.append(url)
        args.append(refspec)
        self._run_git("push", *args)

    def get_cl_count(
        self,
        original_rev: str,
        current_rev: Optional[str],
        subtree: Union[str, "os.PathLike[str]"] = "",
    ) -> int:
        """Get the number of CLs between the specified revisions.

        Args:
            original_rev: Start of the range (excluded).
            current_rev: End of the range.
            subtree: Path to restrict the count to; no limit if empty.

        Returns:
            The `git rev-list --count` of ``original_rev..current_rev``,
            or 0 when either revision is empty or None.
        """
        if not original_rev or not current_rev:
            return 0
        args = ["--count"]
        args.append(f"{original_rev}..{current_rev}")
        if subtree:
            args.append("--")
            args.append(str(subtree))
        result = self._run_git("rev-list", *args)
        return int(result.stdout.rstrip())

    def add_remote(
        self,
        url: str,
        name: str,
    ) -> None:
        """Do a `git remote add` of ``url`` under ``name``."""
        self._run_git("remote", "add", name, url)
def _pseudoheader_pattern(separator: str = ":") -> "re.Pattern[str]":
    """Build the regex matching the start of a pseudoheader line.

    A pseudoheader line looks like a "header" (the conventional footer
    lines in a commit message): one or more alphanumeric words joined by
    hyphens, then the separator, then optional whitespace.

    Args:
        separator: Text separating the header name from its value.  It is
            matched literally.

    Returns:
        The compiled pattern, anchored at the start of the line.
    """
    # The separator is escaped and kept out of the group syntax: pasted
    # right after "(?" only ":" produced the intended non-capturing group,
    # while other separators changed the meaning of the group or failed
    # to compile.
    return re.compile(
        rf"^(?:[A-Za-z0-9]+-)*[A-Za-z0-9]+{re.escape(separator)}\s*"
    )
class Pseudoheaders:
    """Dictionary-like object for the pseudoheaders from a commit message.

    The pseudoheaders are the header-like lines often found in the
    bottom of a commit message. Header names are case-insensitive.
    Pseudoheaders are parsed the same way that the "git footers"
    command parses them.

    Headers are kept as an ordered list of ``(name, value)`` pairs, so a
    name may occur more than once; lookups return the first match.
    """

    def __init__(self, header_list: Iterable[Tuple[str, str]] = ()) -> None:
        """Create the object from ``(name, value)`` pairs."""
        self._header_list: List[Tuple[str, str]] = list(header_list or ())

    @classmethod
    def from_commit_message(
        cls, commit_message: str, offset: int = 1, separator: str = ":"
    ) -> Tuple["Pseudoheaders", str]:
        """Parse pseudoheaders from a commit message.

        Args:
            commit_message: commit message from git log.
            offset: which line to start processing the message from.
                Lines less than the offset will not be altered.
            separator: character used to separate tag from value

        Returns:
            Two values, a Pseudoheaders dictionary, and the commit
            message without any pseudoheaders.
        """
        # Build the pattern once instead of once per message line.
        pattern = _pseudoheader_pattern(separator)
        rewritten_message = []
        header_list = []
        for i, line in enumerate(commit_message.splitlines()):
            if i < offset or not pattern.match(line):
                rewritten_message.append(line)
            else:
                name, _, value = line.partition(separator)
                header_list.append((name, value.strip()))
        return cls(header_list), "".join(
            f"{line}\n" for line in rewritten_message
        )

    def prefix(
        self, prefix: str = "Original-", keep: Iterable[str] = ()
    ) -> "Pseudoheaders":
        """Prefix all header keys with a string.

        Args:
            prefix: The prefix to use.
            keep: Headers which should not be modified.

        Returns:
            A new Pseudoheaders dictionary.
        """
        new_header_list = []
        # Constructing a new pseudoheaders dictionary ensures we
        # consider the keep list to be case insensitive.
        keep_dict: Pseudoheaders = Pseudoheaders()
        if keep:
            keep_dict = self.__class__([(key, "Keeping") for key in keep])
        for key, value in self._header_list:
            if keep_dict.get(key):
                new_header_list.append((key, value))
            else:
                new_header_list.append((f"{prefix}{key}", value))
        return self.__class__(new_header_list)

    def __getitem__(self, item: str) -> str:
        """Get a header value by name.

        The comparison ignores case and the first match wins.

        Raises:
            KeyError: No header has that name.
        """
        for key, value in self._header_list:
            if key.lower() == item.lower():
                return value
        raise KeyError(item)

    def get(self, item: str, default: str = "") -> str:
        """Get a header value by name, or return a default value."""
        try:
            return self[item]
        except KeyError:
            return default

    def as_dict(self) -> Dict[str, str]:
        """Get the dict of stored values.

        Names keep their original case; when a name occurs several times
        the last value is the one kept.
        """
        return dict(self._header_list)

    def __setitem__(self, key: str, value: str) -> None:
        """Add a header.

        The header is appended; existing headers are never replaced.
        """
        self._header_list.append((key, value))

    def add_to_commit_message(self, commit_message: str) -> str:
        """Add our pseudoheaders to a commit message.

        A message that is empty or holds only blank lines is replaced by
        the placeholder "NO COMMIT MESSAGE".

        Returns:
            The new commit message.
        """
        message_lines = commit_message.splitlines()
        # Ensure exactly one blank line separating body and pseudoheaders.
        # The emptiness check in the loop keeps a message made only of
        # blank lines from popping past the start of the list.
        while message_lines and not message_lines[-1].strip():
            message_lines.pop()
        if not message_lines:
            message_lines = ["NO COMMIT MESSAGE"]
        message_lines.append("")
        for key, value in self._header_list:
            message_lines.append(f"{key}: {value}")
        return "".join(f"{line}\n" for line in message_lines)

    def __str__(self) -> str:
        """Render the headers as ``name:value``, one per line."""
        return "\n".join(f"{key}:{value}" for key, value in self._header_list)

    def update(self, other: Union[dict, "Pseudoheaders"]) -> None:
        """Add every header of ``other`` to this object.

        Args:
            other: A dict, or an object of this class, whose ``as_dict``
                contents are added.

        Raises:
            TypeError: ``other`` is neither of the supported types.
        """
        if isinstance(other, type(self)):
            items = other.as_dict().items()
        elif isinstance(other, dict):
            # Iterating the dict itself would yield only its keys.
            items = other.items()
        else:
            raise TypeError(f"Other class has conflicting type({type(other)})")
        for key, value in items:
            self[key] = value
class GerritInterface(Protocol):
    """Interface for actions on a Gerrit host.

    This is a structural (typing.Protocol) description only: every method
    body is a stub.  The Gerrit class in this file defines methods with
    these names and parameters.
    """

    def __init__(self, hostname: str) -> None: ...

    # Look up changes for a project and branch, optionally narrowed by
    # hashtags, a subtree and excluded paths.  Returns two dictionaries
    # whose values are GerritClInfo objects.
    def find_pending_changes(
        self,
        project: str,
        branch: str,
        hashtags: Iterable[str] = (),
        subtree: Union[str, "os.PathLike[str]"] = "",
        exclude_paths: Iterable[Union[str, "os.PathLike[str]"]] = (),
    ) -> Tuple[Dict[str, "GerritClInfo"], Dict[str, "GerritClInfo"]]: ...
class Gerrit:
    """Wrapper for actions on a Gerrit host.

    Attributes:
        hostname: Host name used to build the request URLs.
    """

    def __init__(self, hostname: str) -> None:
        """Remember the Gerrit host to talk to."""
        self.hostname = hostname

    def search(self, query: str) -> List[Dict[str, Any]]:
        """Do a query on Gerrit.

        The request is repeated, one second apart, for as long as the
        host answers with the "too many requests" status.

        Args:
            query: Query string sent as the ``q`` parameter.

        Returns:
            The decoded JSON payload of the response.

        Raises:
            requests.HTTPError: The host answered with another error
                status.
            ValueError: The response does not start with the expected
                ``)]}'`` line.
        """
        url = f"https://{self.hostname}/changes/"
        params = [("q", query)]
        params += [
            ("o", option)
            for option in ("CURRENT_REVISION", "CURRENT_COMMIT", "COMMIT_FOOTERS")
        ]
        while True:
            response = requests.get(url, params=params)
            if response.ok:
                break
            if response.status_code != requests.codes.too_many:
                response.raise_for_status()
                assert False
            time.sleep(1)
        body = response.text
        # The payload is expected to start with a 5-character guard line.
        if body[:5] != ")]}'\n":
            logger.error("Bad response from Gerrit: %r", body)
            raise ValueError("Unexpected JSON payload from gerrit")
        return json.loads(body[5:])

    def find_pending_changes(
        self,
        project: str,
        branch: str,
        hashtags: Iterable[str] = (),
        subtree: Union[str, "os.PathLike[str]"] = "",
        exclude_paths: Iterable[Union[str, "os.PathLike[str]"]] = (),
    ) -> Tuple[Dict[str, "GerritClInfo"], Dict[str, "GerritClInfo"]]:
        """Find pending changes previously opened by CopyBot on Gerrit.

        Open and abandoned changes of the project and branch are queried.
        Each change is keyed by its origin revision id when the commit
        message has one, otherwise by every 40-character commit hash
        found in the commit message.

        Args:
            project: Gerrit project to search.
            branch: Branch to search.
            hashtags: Hashtags the changes must carry.
            subtree: Directory the changes must touch; ignored when it
                has a file extension.
            exclude_paths: Directories the changes must not touch;
                entries with a file extension are ignored.

        Returns:
            Two dictionaries, for the pending and for the abandoned
            changes, each mapping upstream commit hashes to the current
            Change-Id, hashtags and ref of the change on Gerrit.
        """
        terms = [
            "(status:open or status:abandoned)",
            f"project:{project}",
            f"branch:{branch}",
        ]
        subtree_ext = os.path.splitext(subtree)[1]
        if hashtags:
            terms.extend(f"hashtag:{hashtag}" for hashtag in hashtags)
        if subtree and not subtree_ext:
            terms.append(f"directory:{subtree}")
        if exclude_paths:
            terms.extend(
                f"-directory:{path}"
                for path in exclude_paths
                if not os.path.splitext(path)[1]
            )
        pending_change_ids = {}
        abandoned_change_ids = {}
        for cl in self.search(" ".join(terms)):
            change_id = cl["change_id"]
            revision_data = cl["revisions"][cl["current_revision"]]
            commit_message = revision_data["commit"]["message"]
            origin_rev_id = get_origin_rev_id(commit_message)
            if origin_rev_id:
                revs = [origin_rev_id]
            else:
                revs = [
                    found.group(0)
                    for found in _COMMIT_HASH_PATTERN.finditer(commit_message)
                ]
            for rev in revs:
                if cl["status"] == "ABANDONED":
                    target = abandoned_change_ids
                else:
                    target = pending_change_ids
                target[rev] = GerritClInfo(
                    change_id,
                    cl["hashtags"].copy(),
                    revision_data["ref"],
                )
        return pending_change_ids, abandoned_change_ids
def generate_change_id() -> str:
    """Generate a Unique Change-Id.

    Returns:
        The letter "I" followed by 40 hexadecimal digits taken from 20
        random bytes.
    """
    random_hex = os.urandom(20).hex()
    return "I" + random_hex
def get_change_id(commit_message: str) -> str:
    """Get the Change-Id from a commit message.

    Returns:
        The value of the "Change-Id" pseudoheader if one was found, or an
        empty string otherwise.
    """
    headers = Pseudoheaders.from_commit_message(commit_message)[0]
    return headers.get("Change-Id")
def get_origin_rev_id(commit_message: str) -> str:
    """Get the origin revision hash from a commit message.

    The "GitOrigin-RevId" pseudoheader is used when it has a value, and
    the "Original-Commit-Id" pseudoheader otherwise.

    Returns:
        The revision hash if one was found, or an empty string otherwise.
    """
    headers = Pseudoheaders.from_commit_message(commit_message)[0]
    return headers.get("GitOrigin-RevId") or headers.get("Original-Commit-Id")