import os from .. import __Orchestrator__ class Git(): mAgentHostNameStr = None mAgentUserNameStr = None mAbsPathStr = None def __init__(self, inAgentHostNameStr=None, inAgentUserNameStr=None, inGitPathStr=""): """ Init the Git repo instance. It helps to detect new changes in repo and auto restart services :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process. If None - works with Orc session :param inAgentUserNameStr: Agent user name in any case. Required to identify Process. If None - works with Orc session :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo :return: """ lAbsPathUpperStr = os.path.abspath(inGitPathStr).upper() lGS = __Orchestrator__.GSettingsGet() # Check if Process is not exists in GSettings if (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), lAbsPathUpperStr) not in lGS["ManagersGitDict"]: self.mAbsPathUpperStr = lAbsPathUpperStr self.mAgentHostNameStr = inAgentHostNameStr self.mAgentUserNameStr = inAgentUserNameStr lGS["ManagersGitDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), lAbsPathUpperStr)]=self else: raise Exception(f"Managers.Git ({inAgentHostNameStr}, {inAgentUserNameStr}, {lAbsPathUpperStr}): Can't init the Git instance because it already inited in early") def __OSCMDShell__(self, inCMDStr): """ Detect the way of use and send the cmd :return: None is not exists """ if self.mAgentUserNameStr is not None and self.mAgentHostNameStr is not None: # Check if Agent specified lActivityItemGUIDStr = __Orchestrator__.AgentOSCMD(inHostNameStr=self.mAgentHostNameStr,inUserStr=self.mAgentUserNameStr,inCMDStr=inCMDStr,inRunAsyncBool=False,inSendOutputToOrchestratorLogsBool=False) lCMDResultStr = __Orchestrator__.AgentActivityItemReturnGet(inGUIDStr=lActivityItemGUIDStr) else: lCMDResultStr = __Orchestrator__.OSCMD(inCMDStr=inCMDStr, inRunAsyncBool=False) return lCMDResultStr def BranchRevGet(self, inBranchNameStr="HEAD"): """ Get the specified branch revision. Default return the current branch revision .. code-block:: python lGit.BranchRevGet(inBranchNameStr="dev") # Get revision of the local dev branch lGit.BranchRevGet(inBranchNameStr="remotes/origin/dev") # Get revision of the remotes dev branch lGit.BranchRevGet(inBranchNameStr="HEAD") # Get revision of the current HEAD branch lGit.BranchRevGet() # Equal to the call inBranchNameStr="HEAD" :param inBranchNameStr: The branch name where to get revision guid :return: revision GUID """ lCMDStr = f"cd \"{self.mAbsPathUpperStr}\" && git rev-parse {inBranchNameStr}" lCMDResultStr = self.__OSCMDShell__(inCMDStr=lCMDStr) return lCMDResultStr def Fetch(self): """ Get updates from the git server. .. code-block:: python lGit.Fetch() # get updates from the server :return: None """ lCMDStr = f"cd \"{self.mAbsPathUpperStr}\" && git fetch" lCMDResultStr = self.__OSCMDShell__(inCMDStr=lCMDStr) def GitExists(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> bool: """ Check if the Git instance is exists in GSettings by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process :param inAgentUserNameStr: Agent user name in any case. Required to identify Process :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo :return: True - process exists in gsettings; False - else """ return (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inGitPathStr.upper()) in __Orchestrator__.GSettingsGet()["ManagersGitDict"] def GitGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> Git: """ Return the Git instance by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process :param inAgentUserNameStr: Agent user name in any case. Required to identify Process :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo :return: Git instance (if exists) Else None """ lAbsPathUpperStr = os.path.abspath(inGitPathStr).upper() return __Orchestrator__.GSettingsGet()["ManagersGitDict"].get((inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), lAbsPathUpperStr),None) def GitBranchRevGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str, inBranchNameStr: str="HEAD") -> str: lGit = GitGet(inAgentHostNameStr = inAgentHostNameStr, inAgentUserNameStr = inAgentUserNameStr, inGitPathStr=inGitPathStr) if lGit is not None: return lGit.BranchRevGet(inBranchNameStr=inBranchNameStr) def GitFetch(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> None: lGit = GitGet(inAgentHostNameStr=inAgentHostNameStr, inAgentUserNameStr=inAgentUserNameStr, inGitPathStr=inGitPathStr) if lGit is not None: lGit.Fetch()