1
0
Fork 0
tplp-planning-benchmark/benchmark-new/benchmark_repository.py

148 lines
6.2 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
import math
import os
import pygit2
import shutil
import sys
import time
def printProgressBar(title, value, total, overwrite = False):
terminalSize = shutil.get_terminal_size((80, 20))
progress = value / total if total > 0 else 1
progressText = "{: >3d} %".format(math.floor(progress * 100))
remainingWidth = terminalSize.columns - len(title) - len(progressText) - 4
progressBarWidth = min(remainingWidth, 94)
blankWidth = remainingWidth - progressBarWidth
filledWidth = math.floor(progressBarWidth * progress)
progressBar = "#" * filledWidth + "-" * (progressBarWidth - filledWidth)
if overwrite:
print("\x1b[1A\x1b[2K", end = "")
print("{} {}[{}] {}".format(title, " " * blankWidth, progressBar, progressText))
def printCloneProgress(transferProgress):
printProgressBar("Cloning the repository", transferProgress.received_objects, transferProgress.total_objects, overwrite = True)
def printFetchProgress(transferProgress):
printProgressBar("Updating the repository", transferProgress.received_objects, transferProgress.total_objects, overwrite = True)
def printPushProgress(transferProgress):
printProgressBar("Pushing changes to the repository", transferProgress.received_objects, transferProgress.total_objects, overwrite = True)
def checkPushAccepted(refname, message):
if message is not None:
raise Exception("couldnt push {} to the remote: {}".format(refname, message))
class BenchmarkRepository:
def __init__(self, config):
self.basePath = config["basePath"]
self.remote = config["remote"]
self.remoteUser = config["remoteUser"]
self.userName = config["userName"]
self.userEMail = config["userEMail"]
self.userSigningKey = config["userSigningKey"] if "userSigningKey" in config else None
self.branches = config["branches"] if "branches" in config else {"master": "master", "results": "results", "config": "config", "status": "status"}
self.statusLogSize = config["statusLogSize"]
self.worktrees = {}
remoteUser = pygit2.credentials.KeypairFromAgent(self.remoteUser)
callbacks = pygit2.RemoteCallbacks(credentials = remoteUser)
# check for an existing repository
if os.path.isdir(self.branchDir("master")):
repositoryPath = pygit2.discover_repository(self.branchDir("master"))
if repositoryPath is None:
raise Exception("{} exists but is not a Git directory".format(self.branchDir("master")))
callbacks.transfer_progress = printFetchProgress
self.repository = pygit2.Repository(repositoryPath)
for remote in self.repository.remotes:
printProgressBar("Updating the repository", 0, 1)
remote.fetch(callbacks = callbacks)
printProgressBar("Updating the repository", 1, 1, overwrite = True)
self.forceUpdateBranch("master", self.repository)
try:
for branchName in ["results", "config", "status"]:
print("Updating branch {}".format(branchName))
worktree = pygit2.Repository(self.repository.lookup_worktree(branchName).path)
self.worktrees[branchName] = worktree
self.forceUpdateBranch(branchName, worktree)
except pygit2.GitError:
raise Exception("Worktrees are not properly set up")
else:
callbacks.transfer_progress = printCloneProgress
printProgressBar("Cloning the repository", 0, 1)
self.repository = pygit2.clone_repository(self.remote, self.branchDir("master"), callbacks = callbacks, checkout_branch = self.branches["master"])
self.repository = pygit2.Repository(self.repository.path)
printProgressBar("Cloning the repository", 1, 1, overwrite = True)
for branchName in ["results", "config", "status"]:
print("Preparing branch {}".format(branchName))
worktree = pygit2.Repository(self.repository.add_worktree(branchName, self.branchDir(branchName)).path)
self.worktrees[branchName] = worktree
remoteReference = worktree.lookup_reference(self.remoteBranchReference(branchName))
localReference = worktree.create_reference(self.localBranchReference(branchName), remoteReference.resolve().target)
self.forceUpdateBranch(branchName, worktree)
def path(self, branchName, path):
return os.path.join(self.branchDir(branchName), path)
def branchDir(self, branchName):
return os.path.join(self.basePath, branchName)
def localBranchReference(self, branchName):
return "refs/heads/{}".format(branchName)
def remoteBranchReference(self, branchName):
return "refs/remotes/origin/{}".format(self.branches[branchName])
def forceUpdateBranch(self, branchName, worktree):
remoteReference = worktree.lookup_reference(self.remoteBranchReference(branchName))
localReference = worktree.lookup_reference(self.localBranchReference(branchName))
localReference.set_target(remoteReference.resolve().target)
worktree.checkout(self.localBranchReference(branchName), strategy = pygit2.GIT_CHECKOUT_FORCE | pygit2.GIT_CHECKOUT_RECREATE_MISSING)
def commit(self, branchName, message, files):
worktree = self.worktrees[branchName]
index = worktree.index
for file in files:
index.add(file)
tree = index.write_tree()
author = pygit2.Signature(self.userName, self.userEMail)
worktree.create_commit(self.localBranchReference(branchName), author, author, message, tree, [worktree.head.peel().hex])
def push(self, branchName):
printProgressBar("Pushing changes to the repository", 0, 1)
remoteUser = pygit2.credentials.KeypairFromAgent(self.remoteUser)
callbacks = pygit2.RemoteCallbacks(credentials = remoteUser)
callbacks.transfer_progress = printPushProgress
callbacks.push_update_reference = checkPushAccepted
self.repository.remotes["origin"].push(["+{}:{}".format(self.localBranchReference(branchName), "refs/heads/{}".format(self.branches[branchName]))], callbacks = callbacks)
printProgressBar("Pushing changes to the repository", 1, 1, overwrite = True)
def writeStatus(self, message):
statusFilePath = self.path("status", "status.log")
if os.path.exists(statusFilePath):
with open(statusFilePath, "r") as statusFile:
# add the previous status messages, but trancate them
content = statusFile.readlines()[0:(self.statusLogSize - 1)]
else:
content = ""
with open(statusFilePath, "w") as statusFile:
print("{}\t{}\n{}".format(time.strftime("%Y-%m-%d %H:%M:%S %z"), message, "".join(content)), file = statusFile, end = "")
self.commit("status", "test status", ["status.log"])
self.push("status")