Source code for

#!/usr/bin/env python
import logging
import os
import shutil

import git

from .. import exception, util
from . import base

logger = logging.getLogger(__name__)

[docs]def git_url_with_username_password(git_url, username, password): # This part handles git clone via http and not ssh. It is strongly advised to use # user/password combination when cloning in https. Most servers don't accept anonymous calls anymore if username and password: if 'http' in git_url: return git_url.replace('://', '://{}:{}@'.format( username, password )) else: return git_url
[docs]class GitStorage(base.Storage): def __init__(self, git_url, folder=None, tracking_record_id=None, config_user_name=None, config_user_email=None, **kwargs): """Store data in git remote server :param git_url: full url of the repository to store data in :type git_url: str :param folder: path :type folder: pathlib.Path or str :param config_user_name: git config user name :type config_user_name: str :param config_user_email: git config user email :type config_user_email: str """ super(GitStorage, self).__init__(tracking_record_id=tracking_record_id, **kwargs) self.folder = folder if folder is not None and not isinstance(folder, util.Path): self.folder = util.Path(folder) self.config_user_name = config_user_name self.config_user_email = config_user_email self.git_url = git_url_with_username_password(git_url, kwargs.get('username'), kwargs.get('password'))
[docs] def save_data(self, name, content=None, local_path=None, **kwargs): with util.TempDir() as temp_folder: try: file_path, temp_file_path, git_repo = self.__clone(folder=temp_folder.path, name=name) # make sure parent folder and sub-folders exist parent_dir = temp_file_path.parent if not parent_dir.exists(): os.makedirs(str(parent_dir), mode=0o700) if content is not None: # create file locally, as needed to push to remote repository temp_file_path.write_bytes(content) else: shutil.copy(str(local_path), str(temp_file_path)) git_repo.git.add(file_path) # build commit message commit_message = 'File update {name}'.format(name=name) if self.tracking_record_id is not None: commit_message += ' for tracking record {}'.format(self.tracking_record_id) # commit locally # allow-empty will still add commit message with new information even if data itself did not change git_repo.git.commit('--allow-empty', '-m', commit_message) # push changes remotely git_repo.git.push()"File {name} successfully saved".format(name=name)) except git.exc.GitCommandError: logger.error("Unable to save file {name} to remote server.".format(name=name)) raise
[docs] def retrieve_data(self, name, **kwargs): with util.TempDir() as temp_folder: file_path, temp_file_path, repo = self.__clone(folder=temp_folder.path, name=name) if temp_file_path.is_file(): return temp_file_path.read_bytes() else: raise exception.NotFound("File does not exist: {file_path}".format(file_path=file_path))
def __clone(self, folder, name): # clone repository in local folder git_repo = git.Repo.clone_from(self.git_url, folder) # set user identity if self.config_user_name: git_repo.config_writer().set_value("user", "name", self.config_user_name).release() if self.config_user_email: git_repo.config_writer().set_value("user", "email", self.config_user_email).release() file_path = name if self.folder is not None: file_path = self.folder / name temp_file_path = folder / file_path return file_path, temp_file_path, git_repo