#!/usr/bin/python3
"""Clone or update a Git repository and its per-branch worktrees via pygit2."""

import math
import os
import shutil
import sys

import pygit2


def printProgressBar(title, value, total, overwrite = False):
    """Print a single-line textual progress bar to standard output.

    The line has the form ``<title> <padding>[<bar>] <percent> %`` and is
    sized to the current terminal width (80 columns when it cannot be
    determined). The bar itself is at most 94 characters wide; any space
    left over is inserted as padding between the title and the bar.

    Args:
        title: Label printed to the left of the bar.
        value: Amount of work completed so far.
        total: Total amount of work. When it is not positive, the bar is
            rendered as complete.
        overwrite: When true, the cursor is first moved up one line and that
            line is cleared (ANSI escape sequences), so that this bar
            replaces the previously printed line.
    """
    terminalSize = shutil.get_terminal_size((80, 20))

    progress = value / total if total > 0 else 1
    # Clamp to [0, 1] so that an out-of-range value can neither overflow the
    # bar nor report more than 100 %.
    progress = min(max(progress, 0), 1)
    progressText = "{: >3d} %".format(math.floor(progress * 100))

    # The 4 fixed characters are: the space after the title, the two
    # brackets, and the space before the percentage.
    remainingWidth = max(terminalSize.columns - len(title) - len(progressText) - 4, 0)
    progressBarWidth = min(remainingWidth, 94)
    blankWidth = remainingWidth - progressBarWidth
    filledWidth = math.floor(progressBarWidth * progress)
    progressBar = "#" * filledWidth + "-" * (progressBarWidth - filledWidth)

    if overwrite:
        # Cursor up one line, then erase that entire line.
        print("\x1b[1A\x1b[2K", end = "")

    print("{} {}[{}] {}".format(title, " " * blankWidth, progressBar, progressText))


def printCloneProgress(transferProgress):
    """Transfer-progress callback that redraws the "Cloning" progress bar.

    Args:
        transferProgress: Object exposing ``received_objects`` and
            ``total_objects`` (as passed by pygit2 to ``transfer_progress``).
    """
    printProgressBar("Cloning the repository", transferProgress.received_objects, transferProgress.total_objects, overwrite = True)


def printFetchProgress(transferProgress):
    """Transfer-progress callback that redraws the "Updating" progress bar.

    Args:
        transferProgress: Object exposing ``received_objects`` and
            ``total_objects`` (as passed by pygit2 to ``transfer_progress``).
    """
    printProgressBar("Updating the repository", transferProgress.received_objects, transferProgress.total_objects, overwrite = True)


class BenchmarkRepository:
    """A local checkout of a remote repository with one worktree per branch.

    The "master" branch lives in ``<basePath>/master``; the "results",
    "config" and "status" branches are checked out as worktrees in sibling
    directories named after the branch.
    """

    # Default mapping from the logical branch names used in this class to the
    # branch names on the remote.
    _defaultBranches = {"master": "master", "results": "results", "config": "config", "status": "status"}

    # Logical branches that are kept in separate worktrees next to "master".
    _worktreeBranches = ("results", "config", "status")

    def __init__(self, config):
        """Open and update the repository, cloning it first if necessary.

        If ``<basePath>/master`` already exists, it is opened, every remote
        is fetched, and all branches are force-checked-out. Otherwise, the
        remote is cloned there and the worktrees are created.

        Args:
            config: Mapping with the keys ``basePath``, ``remote``,
                ``remoteUser``, ``userName``, ``userEMail`` and
                ``statusLogSize`` (all required), and optionally
                ``userSigningKey`` (defaults to ``None``) and ``branches``
                (defaults to the identity mapping of the four branches).

        Raises:
            KeyError: If a required key is missing from ``config``.
            Exception: If the master directory exists but is not a Git
                repository, or if a worktree cannot be opened during the
                update of an existing repository.
        """
        self.basePath = config["basePath"]
        self.remote = config["remote"]
        self.remoteUser = config["remoteUser"]
        self.userName = config["userName"]
        self.userEMail = config["userEMail"]
        self.userSigningKey = config.get("userSigningKey")
        self.branches = config.get("branches", self._defaultBranches)
        self.statusLogSize = config["statusLogSize"]

        # Authenticate against the remote with a key pair held by the agent.
        remoteUser = pygit2.credentials.KeypairFromAgent(self.remoteUser)
        callbacks = pygit2.RemoteCallbacks(credentials = remoteUser)

        # check for an existing repository
        if os.path.isdir(self.branchDir("master")):
            self._updateExistingRepository(callbacks)
        else:
            self._cloneRepository(callbacks)

    def _updateExistingRepository(self, callbacks):
        """Open the existing master checkout, fetch all remotes and check out every branch."""
        masterDir = self.branchDir("master")
        repositoryPath = pygit2.discover_repository(masterDir)

        if repositoryPath is None:
            raise Exception("{} exists but is not a Git directory".format(masterDir))

        callbacks.transfer_progress = printFetchProgress
        self.repository = pygit2.Repository(repositoryPath)

        for remote in self.repository.remotes:
            # Print an initial bar that the progress callback then overwrites.
            printProgressBar("Updating the repository", 0, 1)
            remote.fetch(callbacks = callbacks)
            printProgressBar("Updating the repository", 1, 1, overwrite = True)

        self.forceUpdateBranch("master", self.repository)

        try:
            for branchName in self._worktreeBranches:
                print("Updating branch {}".format(branchName))
                worktree = self.repository.lookup_worktree(branchName)
                # Reopen the worktree as a repository to be able to check out in it.
                worktree = pygit2.Repository(worktree.path)
                self.forceUpdateBranch(branchName, worktree)
        except pygit2.GitError as error:
            # Keep the underlying Git error attached as the cause.
            raise Exception("Worktrees are not properly set up") from error

        # NOTE(review): this looks like leftover debug output; kept to
        # preserve the original console output. Confirm whether it is needed.
        print(list(self.repository.branches))

    def _cloneRepository(self, callbacks):
        """Clone the remote into the master directory and create the worktrees."""
        callbacks.transfer_progress = printCloneProgress

        # Print an initial bar that the progress callback then overwrites.
        printProgressBar("Cloning the repository", 0, 1)
        self.repository = pygit2.clone_repository(self.remote, self.branchDir("master"), callbacks = callbacks, checkout_branch = self.branches["master"])
        # Reopen the freshly cloned repository from its path.
        self.repository = pygit2.Repository(self.repository.path)
        printProgressBar("Cloning the repository", 1, 1, overwrite = True)

        for branchName in self._worktreeBranches:
            print("Preparing branch {}".format(branchName))
            worktree = self.repository.add_worktree(branchName, self.branchDir(branchName))
            worktree = pygit2.Repository(worktree.path)
            # Create the local branch at the commit of its remote-tracking branch.
            remoteReference = worktree.lookup_reference("refs/remotes/origin/{}".format(self.branches[branchName]))
            worktree.create_reference("refs/heads/{}".format(branchName), remoteReference.resolve().target)
            self.forceUpdateBranch(branchName, worktree)

    def branchDir(self, branchName):
        """Return the directory below ``basePath`` that holds ``branchName``.

        Args:
            branchName: Logical branch name (for example ``"master"``).

        Returns:
            ``basePath`` joined with ``branchName``.
        """
        return os.path.join(self.basePath, branchName)

    def forceUpdateBranch(self, branchName, worktree):
        """Force-check-out the local branch ``branchName`` in ``worktree``.

        Local modifications are discarded and missing files are recreated.

        Args:
            branchName: Logical branch name; the reference
                ``refs/heads/<branchName>`` is checked out.
            worktree: Repository object in which the checkout is performed.
        """
        # NOTE(review): the local reference uses the logical branch name, not
        # self.branches[branchName]. For "master" this presumably only
        # matches the cloned branch when the mapping is the identity; verify.
        worktree.checkout("refs/heads/{}".format(branchName), strategy = pygit2.GIT_CHECKOUT_FORCE | pygit2.GIT_CHECKOUT_RECREATE_MISSING)

        # Disabled: hard reset of the local branch to its remote-tracking branch.
        #remoteReference = worktree.lookup_reference("refs/remotes/origin/{}".format(self.branches[branchName]))
        #worktree.reset(remoteReference.target, pygit2.GIT_RESET_HARD)