#!/usr/bin/python3
"""Keep a local checkout of a benchmark Git repository in sync via pygit2.

The module offers console progress-bar helpers and the
``BenchmarkRepository`` class, which clones or updates a repository, keeps
one worktree per auxiliary branch, and commits and pushes status messages.
"""

import math
import os
import pygit2
import shutil
import sys
import time


def printProgressBar(title, value, total, overwrite=False):
    """Print a one-line textual progress bar to standard output.

    Args:
        title: Text shown to the left of the bar.
        value: Amount of work done so far.
        total: Total amount of work; if not positive, the bar is shown as
            complete.
        overwrite: If true, first move the cursor up one line and erase it
            (ANSI escape sequences) so that this bar replaces the line
            printed previously.
    """
    terminalSize = shutil.get_terminal_size((80, 20))

    progress = value / total if total > 0 else 1
    # Clamp so that inconsistent counters can never overflow the bar width.
    progress = min(max(progress, 0), 1)
    progressText = "{: >3d} %".format(math.floor(progress * 100))

    # 4 columns are taken by the separating spaces and the two brackets.
    remainingWidth = terminalSize.columns - len(title) - len(progressText) - 4
    progressBarWidth = max(min(remainingWidth, 94), 0)
    # Padding that right-aligns the bar once it has reached its maximum width.
    blankWidth = max(remainingWidth - progressBarWidth, 0)
    filledWidth = math.floor(progressBarWidth * progress)
    progressBar = "#" * filledWidth + "-" * (progressBarWidth - filledWidth)

    if overwrite:
        print("\x1b[1A\x1b[2K", end="")

    print("{} {}[{}] {}".format(title, " " * blankWidth, progressBar, progressText))


def printCloneProgress(transferProgress):
    """Redraw the clone progress bar from an object-transfer progress report.

    Args:
        transferProgress: Object exposing ``received_objects`` and
            ``total_objects``.
    """
    printProgressBar(
        "Cloning the repository",
        transferProgress.received_objects,
        transferProgress.total_objects,
        overwrite=True,
    )


def printFetchProgress(transferProgress):
    """Redraw the fetch progress bar from an object-transfer progress report.

    Args:
        transferProgress: Object exposing ``received_objects`` and
            ``total_objects``.
    """
    printProgressBar(
        "Updating the repository",
        transferProgress.received_objects,
        transferProgress.total_objects,
        overwrite=True,
    )


def printPushProgress(transferProgress):
    """Redraw the push progress bar from an object-transfer progress report.

    Args:
        transferProgress: Object exposing ``received_objects`` and
            ``total_objects``.
    """
    printProgressBar(
        "Pushing changes to the repository",
        transferProgress.received_objects,
        transferProgress.total_objects,
        overwrite=True,
    )


def checkPushAccepted(refname, message):
    """Raise if the remote reported a message for a pushed reference.

    Args:
        refname: Name of the reference that was pushed.
        message: Message reported for the reference, or ``None`` if there is
            none.

    Raises:
        Exception: If ``message`` is not ``None``.
    """
    if message is not None:
        raise Exception("couldn’t push {} to the remote: {}".format(refname, message))


class BenchmarkRepository:
    """Local checkout of the benchmark repository with per-branch worktrees.

    The "master" branch is checked out in ``<basePath>/master``; the
    "results", "config" and "status" branches each live in a worktree at
    ``<basePath>/<branch name>``.
    """

    # Logical branches that are checked out as separate worktrees.
    _WORKTREE_BRANCHES = ("results", "config", "status")

    def __init__(self, config):
        """Clone the repository, or update an already existing checkout.

        Args:
            config: Mapping with the required keys ``basePath``, ``remote``,
                ``remoteUser``, ``userName``, ``userEMail`` and
                ``statusLogSize`` and the optional keys ``userSigningKey``
                and ``branches`` (logical branch name -> remote branch name).

        Raises:
            Exception: If ``<basePath>/master`` exists but is not inside a
                Git repository, or if the worktrees of an existing checkout
                cannot be opened.
        """
        self.basePath = config["basePath"]
        self.remote = config["remote"]
        self.remoteUser = config["remoteUser"]
        self.userName = config["userName"]
        self.userEMail = config["userEMail"]
        self.userSigningKey = config.get("userSigningKey")
        self.branches = config.get(
            "branches",
            {"master": "master", "results": "results", "config": "config", "status": "status"},
        )
        self.statusLogSize = config["statusLogSize"]
        self.worktrees = {}

        callbacks = self._newRemoteCallbacks()

        # check for an existing repository
        if os.path.isdir(self.branchDir("master")):
            self._openExistingRepository(callbacks)
        else:
            self._cloneRepository(callbacks)

    def _newRemoteCallbacks(self):
        """Return remote callbacks that authenticate through the SSH agent."""
        credentials = pygit2.credentials.KeypairFromAgent(self.remoteUser)
        return pygit2.RemoteCallbacks(credentials=credentials)

    def _openExistingRepository(self, callbacks):
        """Open the existing checkout, fetch all remotes, and reset all branches."""
        repositoryPath = pygit2.discover_repository(self.branchDir("master"))

        if repositoryPath is None:
            raise Exception("{} exists but is not a Git directory".format(self.branchDir("master")))

        callbacks.transfer_progress = printFetchProgress
        self.repository = pygit2.Repository(repositoryPath)

        for remote in self.repository.remotes:
            printProgressBar("Updating the repository", 0, 1)
            remote.fetch(callbacks=callbacks)
            printProgressBar("Updating the repository", 1, 1, overwrite=True)

        self.forceUpdateBranch("master", self.repository)

        try:
            for branchName in self._WORKTREE_BRANCHES:
                print("Updating branch {}".format(branchName))
                worktree = pygit2.Repository(self.repository.lookup_worktree(branchName).path)
                self.worktrees[branchName] = worktree
                self.forceUpdateBranch(branchName, worktree)
        except pygit2.GitError as error:
            # Keep the underlying Git error attached for easier diagnosis.
            raise Exception("Worktrees are not properly set up") from error

    def _cloneRepository(self, callbacks):
        """Clone the remote and create one worktree per auxiliary branch."""
        callbacks.transfer_progress = printCloneProgress

        printProgressBar("Cloning the repository", 0, 1)
        self.repository = pygit2.clone_repository(
            self.remote,
            self.branchDir("master"),
            callbacks=callbacks,
            checkout_branch=self.branches["master"],
        )
        # Reopen the freshly cloned repository from its path.
        self.repository = pygit2.Repository(self.repository.path)
        printProgressBar("Cloning the repository", 1, 1, overwrite=True)

        for branchName in self._WORKTREE_BRANCHES:
            print("Preparing branch {}".format(branchName))
            worktree = pygit2.Repository(
                self.repository.add_worktree(branchName, self.branchDir(branchName)).path
            )
            self.worktrees[branchName] = worktree

            remoteReference = worktree.lookup_reference(self.remoteBranchReference(branchName))
            # NOTE(review): add_worktree is documented to create a branch
            # unless a ref is given, so a branch with this name may already
            # exist here; force=True overwrites it instead of failing.
            worktree.create_reference(
                self.localBranchReference(branchName),
                remoteReference.resolve().target,
                force=True,
            )

            self.forceUpdateBranch(branchName, worktree)

    def path(self, branchName, path):
        """Return ``path`` joined onto the directory of branch ``branchName``."""
        return os.path.join(self.branchDir(branchName), path)

    def branchDir(self, branchName):
        """Return the directory of branch ``branchName`` below ``basePath``."""
        return os.path.join(self.basePath, branchName)

    def localBranchReference(self, branchName):
        """Return the local reference name ``refs/heads/<branchName>``."""
        return "refs/heads/{}".format(branchName)

    def remoteBranchReference(self, branchName):
        """Return the ``origin`` remote-tracking reference mapped to ``branchName``."""
        return "refs/remotes/origin/{}".format(self.branches[branchName])

    def forceUpdateBranch(self, branchName, worktree):
        """Point the local branch at its remote counterpart and force a checkout.

        Args:
            branchName: Logical branch name (a key of ``self.branches``).
            worktree: Repository object in which the branch is checked out.
        """
        remoteReference = worktree.lookup_reference(self.remoteBranchReference(branchName))
        localReference = worktree.lookup_reference(self.localBranchReference(branchName))
        localReference.set_target(remoteReference.resolve().target)

        worktree.checkout(
            self.localBranchReference(branchName),
            strategy=pygit2.GIT_CHECKOUT_FORCE | pygit2.GIT_CHECKOUT_RECREATE_MISSING,
        )

    def commit(self, branchName, message, files):
        """Commit the given files on the worktree of ``branchName``.

        Args:
            branchName: Name of a branch that has an entry in
                ``self.worktrees``.
            message: Commit message.
            files: Iterable of paths to add to the index before committing.
        """
        worktree = self.worktrees[branchName]

        index = worktree.index
        for filePath in files:
            index.add(filePath)
        # Persist the index so that the on-disk index matches the new commit.
        index.write()
        tree = index.write_tree()

        author = pygit2.Signature(self.userName, self.userEMail)
        # Pass the parent as an object ID rather than through `.hex`.
        parents = [worktree.head.peel().id]
        worktree.create_commit(
            self.localBranchReference(branchName), author, author, message, tree, parents
        )

    def push(self, branchName):
        """Force-push the local branch ``branchName`` to the ``origin`` remote.

        Raises:
            Exception: Raised by ``checkPushAccepted`` when a message is
                reported for the pushed reference.
        """
        printProgressBar("Pushing changes to the repository", 0, 1)

        callbacks = self._newRemoteCallbacks()
        callbacks.transfer_progress = printPushProgress
        callbacks.push_update_reference = checkPushAccepted

        # The leading "+" makes this a forced update of the remote branch.
        refspec = "+{}:{}".format(
            self.localBranchReference(branchName),
            "refs/heads/{}".format(self.branches[branchName]),
        )
        self.repository.remotes["origin"].push([refspec], callbacks=callbacks)

        printProgressBar("Pushing changes to the repository", 1, 1, overwrite=True)

    def writeStatus(self, message):
        """Prepend a timestamped message to the status log, commit, and push it.

        Args:
            message: Status text to record; it is written after a timestamp
                and a tab character.
        """
        statusFilePath = self.path("status", "status.log")

        # Never let the bound go negative: a negative slice end would keep
        # all but the last line instead of keeping nothing.
        keptLineCount = max(self.statusLogSize - 1, 0)

        if os.path.exists(statusFilePath):
            with open(statusFilePath, "r") as statusFile:
                # add the previous status messages, but truncate them
                previousLines = statusFile.readlines()[:keptLineCount]
        else:
            previousLines = []

        with open(statusFilePath, "w") as statusFile:
            # The newest message goes first, followed by the retained lines.
            print(
                "{}\t{}\n{}".format(
                    time.strftime("%Y-%m-%d %H:%M:%S %z"), message, "".join(previousLines)
                ),
                file=statusFile,
                end="",
            )

        self.commit("status", "test status", ["status.log"])
        self.push("status")