Я использую gitpython==2.1.11
с Python 3.7.Ниже приведена моя функция push, в которой я сначала пробую высокоуровневый push, а затем низкоуровневый push при необходимости.Обратите внимание, как я проверяю возвращаемое значение любой команды.Я также регистрирую действия push, и это объясняет, что происходит на каждом этапе.
class GitCommandError(Exception):
pass
class Git:
def _commit_and_push_repo(self) -> None:
repo = self._repo
remote = repo.remote()
remote_name = remote.name
branch_name = repo.active_branch.name
# Note: repo.index.entries was observed to also include unpushed files in addition to uncommitted files.
log.debug('Committing repository index in active branch "%s".', branch_name)
self._repo.index.commit('')
log.info('Committed repository index in active branch "%s".', branch_name)
def _is_pushed(push_info: git.remote.PushInfo) -> bool:
valid_flags = {push_info.FAST_FORWARD, push_info.NEW_HEAD} # UP_TO_DATE flag is intentionally skipped.
return push_info.flags in valid_flags # This check can require the use of & instead.
push_desc = f'active branch "{branch_name}" to repository remote "{remote_name}"'
log.debug('Pushing %s.', push_desc)
try:
push_info = remote.push()[0]
except git.exc.GitCommandError: # Could be due to no upstream branch.
log.warning('Failed to push %s. This could be due to no matching upstream branch.', push_desc)
log.info('Reattempting to push %s using a lower-level command which also sets upstream branch.', push_desc)
push_output = repo.git.push('--set-upstream', remote_name, branch_name)
log.info('Push output was: %s', push_output)
expected_msg = f"Branch '{branch_name}' set up to track remote branch '{branch_name}' from '{remote_name}'."
if push_output != expected_msg:
raise RepoPushError(f'Failed to push {push_desc}.')
else:
is_pushed = _is_pushed(push_info)
logger = log.debug if is_pushed else log.warning
logger('Push flags were %s and message was "%s".', push_info.flags, push_info.summary.strip())
if not is_pushed:
log.warning('Failed first attempt at pushing %s. A pull will be performed.', push_desc)
self._pull_repo()
log.info('Reattempting to push %s.', push_desc)
push_info = remote.push()[0]
is_pushed = _is_pushed(push_info)
logger = log.debug if is_pushed else log.error
logger('Push flags were %s and message was "%s".', push_info.flags, push_info.summary.strip())
if not is_pushed:
raise RepoPushError(f'Failed to push {push_desc} despite a pull.')
log.info('Pushed %s.', push_desc)