1
0
Fork 0
tplp-planning-benchmark/benchmark-new/benchmark_repository.py

82 lines
3.2 KiB
Python
Executable File

#!/usr/bin/python3
import math
import os
import pygit2
import shutil
import sys
def printProgressBar(title, value, total, overwrite=False):
    """Print a one-line text progress bar of the form ``title [###---]  NN %``.

    The line is sized to the current terminal width (80 columns if the width
    cannot be determined); the bar itself is at most 94 characters wide and
    any spare columns are inserted as padding between the title and the bar.

    Args:
        title: Text shown to the left of the bar.
        value: Amount of work done so far.
        total: Amount of work expected; a total of 0 or less is shown as 100 %.
        overwrite: If true, first move the cursor up one line and erase it so
            that this bar replaces the previously printed one.
    """
    terminalSize = shutil.get_terminal_size((80, 20))

    # Clamp the ratio to [0, 1]: an out-of-range value would otherwise draw a
    # bar wider than the terminal, wrapping onto a second line and breaking
    # the single-line overwrite below.
    ratio = value / total if total > 0 else 1
    ratio = min(max(ratio, 0), 1)

    percentText = "{: >3d} %".format(math.floor(ratio * 100))

    # 4 columns are taken by the two brackets and the two separating spaces.
    freeColumns = terminalSize.columns - len(title) - len(percentText) - 4
    barWidth = max(0, min(freeColumns, 94))
    paddingWidth = max(0, freeColumns - barWidth)
    doneWidth = math.floor(barWidth * ratio)
    bar = "#" * doneWidth + "-" * (barWidth - doneWidth)

    if overwrite:
        # ANSI escapes: cursor up one line, then erase that entire line.
        print("\x1b[1A\x1b[2K", end="")

    print("{} {}[{}] {}".format(title, " " * paddingWidth, bar, percentText))
def printCloneProgress(transferProgress):
    """Transfer-progress callback for cloning: redraw the clone bar in place.

    Reads ``received_objects`` and ``total_objects`` from *transferProgress*
    and prints them as a progress bar that overwrites the previous line.
    """
    received = transferProgress.received_objects
    expected = transferProgress.total_objects
    printProgressBar("Cloning the repository", received, expected, overwrite=True)
def printFetchProgress(transferProgress):
    """Transfer-progress callback for fetching: redraw the update bar in place.

    Reads ``received_objects`` and ``total_objects`` from *transferProgress*
    and prints them as a progress bar that overwrites the previous line.
    """
    received = transferProgress.received_objects
    expected = transferProgress.total_objects
    printProgressBar("Updating the repository", received, expected, overwrite=True)
class BenchmarkRepository:
    """Local checkout of the benchmark Git repository.

    Everything lives below ``basePath``: the main clone in the ``master``
    subdirectory and one worktree each in ``results``, ``config`` and
    ``status`` (see ``branchDir``).
    """

    def __init__(self, config):
        """Fetch into the existing checkout, or clone it and add the worktrees.

        Args:
            config: Object indexed by key. ``basePath``, ``remote``,
                ``remoteUser``, ``userName``, ``userEMail`` and
                ``statusLogSize`` are required. ``userSigningKey`` is
                optional (default ``None``), as is ``branches`` (default
                maps each of master/results/config/status to itself).

        Raises:
            Exception: If the master directory exists but pygit2 cannot
                discover a Git repository in it.
        """
        self.basePath = config["basePath"]
        self.remote = config["remote"]
        self.remoteUser = config["remoteUser"]
        self.userName = config["userName"]
        self.userEMail = config["userEMail"]

        if "userSigningKey" in config:
            self.userSigningKey = config["userSigningKey"]
        else:
            self.userSigningKey = None

        if "branches" in config:
            self.branches = config["branches"]
        else:
            self.branches = {"master": "master", "results": "results", "config": "config", "status": "status"}

        self.statusLogSize = config["statusLogSize"]

        # Remote operations use KeypairFromAgent credentials for the remote user.
        agentCredentials = pygit2.credentials.KeypairFromAgent(self.remoteUser)
        remoteCallbacks = pygit2.RemoteCallbacks(credentials=agentCredentials)

        # An existing master directory means the repository was set up before.
        masterDir = self.branchDir("master")
        if os.path.isdir(masterDir):
            self._updateExistingCheckout(masterDir, remoteCallbacks)
        else:
            self._cloneWithWorktrees(masterDir, remoteCallbacks)

    def _updateExistingCheckout(self, masterDir, remoteCallbacks):
        """Open the repository found at *masterDir* and fetch from every remote."""
        gitDir = pygit2.discover_repository(masterDir)

        if gitDir is None:
            raise Exception("{} exists but is not a Git directory".format(masterDir))

        remoteCallbacks.transfer_progress = printFetchProgress
        self.repository = pygit2.Repository(gitDir)

        for configuredRemote in self.repository.remotes:
            printProgressBar("Updating the repository", 0, 1)
            configuredRemote.fetch(callbacks=remoteCallbacks)
            printProgressBar("Updating the repository", 1, 1, overwrite=True)

        # TODO: the results/config/status worktrees are not verified here.
        # The original code had this check disabled (lookup_worktree calls
        # meant to fail with "Worktrees are not properly set up").

    def _cloneWithWorktrees(self, masterDir, remoteCallbacks):
        """Clone the remote into *masterDir* and add the three worktrees."""
        remoteCallbacks.transfer_progress = printCloneProgress

        printProgressBar("Cloning the repository", 0, 1)
        self.repository = pygit2.clone_repository(
            self.remote,
            masterDir,
            callbacks=remoteCallbacks,
            checkout_branch=self.branches["master"],
        )
        printProgressBar("Cloning the repository", 1, 1, overwrite=True)

        # NOTE(review): worktree and branch names are the fixed role names,
        # not the configured self.branches values — confirm this is intended.
        for role in ("results", "config", "status"):
            addedWorktree = self.repository.add_worktree(role, self.branchDir(role))
            worktreeRepository = pygit2.Repository(addedWorktree.path)
            # NOTE(review): pygit2's branch creation usually also expects a
            # target commit — confirm this call works with the pygit2 in use.
            worktreeRepository.branches.local.create(role)

    def branchDir(self, branchName):
        """Return the directory below ``basePath`` used for *branchName*."""
        return os.path.join(self.basePath, branchName)