You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ORPA-pyOpenRPA/Sources/pyOpenRPA/Orchestrator/Managers/Git.py

142 lines
6.8 KiB

from ast import ExceptHandler
from multiprocessing.dummy import Process
import os
from .. import __Orchestrator__
from . import Process
class Git():
    """
    Manager of one Git repository, located either in the Orchestrator session or on an Agent.

    Every instance registers itself in GSettings["ManagersGitDict"] under the key
    (host name upper-cased or None, user name upper-cased or None, absolute repo path upper-cased).
    """
    mAgentHostNameStr = None  # Agent host name; None - commands are sent through the Orchestrator OSCMD
    mAgentUserNameStr = None  # Agent user name; None - commands are sent through the Orchestrator OSCMD
    mAbsPathStr = None  # Absolute repo path in the original case (used to build shell commands)
    mAbsPathUpperStr = None  # Absolute repo path upper-cased (used in the GSettings key)
    mProcessList = None  # Key turples of the connected Process instances (a separate list per instance)

    def __init__(self, inAgentHostNameStr=None, inAgentUserNameStr=None, inGitPathStr=""):
        """
        Init the Git repo instance and register it in GSettings["ManagersGitDict"].

        :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process. If None - works with Orc session
        :param inAgentUserNameStr: Agent user name in any case. Required to identify Process. If None - works with Orc session
        :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo
        :raises Exception: if an instance with the same (host, user, path) key is already registered
        """
        lAbsPathStr = os.path.abspath(inGitPathStr)
        lAbsPathUpperStr = lAbsPathStr.upper()
        # None is an allowed value (Orc session), so it must not be upper-cased
        lHostUpperStr = None if inAgentHostNameStr is None else inAgentHostNameStr.upper()
        lUserUpperStr = None if inAgentUserNameStr is None else inAgentUserNameStr.upper()
        lKeyTurple = (lHostUpperStr, lUserUpperStr, lAbsPathUpperStr)
        lGS = __Orchestrator__.GSettingsGet()
        # Refuse to register the same repo twice
        if lKeyTurple in lGS["ManagersGitDict"]:
            raise Exception(f"Managers.Git ({inAgentHostNameStr}, {inAgentUserNameStr}, {lAbsPathUpperStr}): Can't init the Git instance because it already inited in early")
        self.mAbsPathStr = lAbsPathStr
        self.mAbsPathUpperStr = lAbsPathUpperStr
        self.mAgentHostNameStr = inAgentHostNameStr
        self.mAgentUserNameStr = inAgentUserNameStr
        self.mProcessList = []  # Own list - a class-level list would be shared by all Git instances
        lGS["ManagersGitDict"][lKeyTurple] = self

    def ProcessConnect(self, inProcess: Process):
        """
        Connect process to the Git instance: store the process key turple in the instance process list.

        :param inProcess: Process instance (must provide the KeyTurpleGet method)
        :type inProcess: Process
        :raises Exception: if a process with the same key turple is already connected to this instance
        """
        if self.mProcessList is None:
            # Instance was created without __init__ - create its own list lazily
            self.mProcessList = []
        lProcessTurple = inProcess.KeyTurpleGet()
        if lProcessTurple in self.mProcessList:
            raise Exception("Process with current key is already exists in Git process list.")
        self.mProcessList.append(lProcessTurple)

    def _CMDBuild(self, inGitCMDStr):
        """
        Build the shell command which enters the repo folder and runs the git command.

        :param inGitCMDStr: git command line, for example "git fetch"
        :return: full shell command string
        """
        # Prefer the original-case path: the upper-cased one does not exist on case-sensitive file systems
        lRepoPathStr = self.mAbsPathStr if self.mAbsPathStr is not None else self.mAbsPathUpperStr
        return f"cd \"{lRepoPathStr}\" && {inGitCMDStr}"

    def __OSCMDShell__(self, inCMDStr):
        """
        Detect the way of use (Agent or Orchestrator session) and send the cmd.

        :param inCMDStr: shell command to execute
        :return: the value returned by AgentActivityItemReturnGet (Agent case) or by OSCMD (Orc case)
        """
        if self.mAgentUserNameStr is not None and self.mAgentHostNameStr is not None:  # Check if Agent specified
            lActivityItemGUIDStr = __Orchestrator__.AgentOSCMD(inHostNameStr=self.mAgentHostNameStr, inUserStr=self.mAgentUserNameStr, inCMDStr=inCMDStr, inRunAsyncBool=False, inSendOutputToOrchestratorLogsBool=False)
            return __Orchestrator__.AgentActivityItemReturnGet(inGUIDStr=lActivityItemGUIDStr)
        return __Orchestrator__.OSCMD(inCMDStr=inCMDStr, inRunAsyncBool=False)

    def BranchRevGet(self, inBranchNameStr="HEAD"):
        """
        Get the specified branch revision. Default return the current branch revision

        .. code-block:: python
            lGit.BranchRevGet(inBranchNameStr="dev") # Get revision of the local dev branch
            lGit.BranchRevGet(inBranchNameStr="remotes/origin/dev") # Get revision of the remotes dev branch
            lGit.BranchRevGet(inBranchNameStr="HEAD") # Get revision of the current HEAD branch
            lGit.BranchRevGet() # Equal to the call inBranchNameStr="HEAD"

        :param inBranchNameStr: The branch name where to get revision guid
        :return: result of the "git rev-parse" command as returned by the command transport
        """
        return self.__OSCMDShell__(inCMDStr=self._CMDBuild(f"git rev-parse {inBranchNameStr}"))

    def BranchRevIsLast(self, inBranchRemoteStr):
        """
        Check if the current HEAD revision is equal to the revision of the specified branch.

        :param inBranchRemoteStr: branch name to compare with, for example "remotes/origin/dev"
        :return: True - revisions are equal; False - else
        """
        lLocalBranchRevStr = self.BranchRevGet()
        lRemoteBranchRevStr = self.BranchRevGet(inBranchNameStr=inBranchRemoteStr)
        return lLocalBranchRevStr == lRemoteBranchRevStr

    def BranchRevCheck(self, inBranchRemoteStr):
        """
        Not implemented yet: does nothing and returns None.

        :param inBranchRemoteStr: remote branch name (currently unused)
        """
        pass

    def BranchNameGet(self):
        """
        Not implemented yet: does nothing and returns None.
        """
        # Planned command: git rev-parse --abbrev-ref HEAD
        pass

    def BranchCheckout(self, inBranchNameStr):
        """
        Not implemented yet: does nothing and returns None.

        :param inBranchNameStr: branch name to checkout (currently unused)
        """
        # Planned commands:
        #   git clean -f -d                  - remove the untracked files
        #   git reset --hard                 - revert the tracked files which were modified
        #   git checkout {inBranchNameStr}
        pass

    def Fetch(self):
        """
        Get updates from the git server.

        .. code-block:: python
            lGit.Fetch() # get updates from the server

        :return: None
        """
        self.__OSCMDShell__(inCMDStr=self._CMDBuild("git fetch"))
def GitExists(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> bool:
    """
    Check if the Git instance is exists in GSettings by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str)

    :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process. None is accepted and is used in the key as is
    :param inAgentUserNameStr: Agent user name in any case. Required to identify Process. None is accepted and is used in the key as is
    :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo
    :return: True - Git instance exists in gsettings; False - else
    """
    lHostUpperStr = None if inAgentHostNameStr is None else inAgentHostNameStr.upper()
    lUserUpperStr = None if inAgentUserNameStr is None else inAgentUserNameStr.upper()
    # The key stores the absolute upper-cased path, so the path must be resolved before the lookup
    lAbsPathUpperStr = os.path.abspath(inGitPathStr).upper()
    return (lHostUpperStr, lUserUpperStr, lAbsPathUpperStr) in __Orchestrator__.GSettingsGet()["ManagersGitDict"]
def GitGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> Git:
    """
    Return the Git instance by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str)

    :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process. None is accepted and is used in the key as is
    :param inAgentUserNameStr: Agent user name in any case. Required to identify Process. None is accepted and is used in the key as is
    :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo
    :return: Git instance (if exists) Else None
    """
    # None host / user must not be upper-cased (str.upper is not available on None)
    lHostUpperStr = None if inAgentHostNameStr is None else inAgentHostNameStr.upper()
    lUserUpperStr = None if inAgentUserNameStr is None else inAgentUserNameStr.upper()
    lAbsPathUpperStr = os.path.abspath(inGitPathStr).upper()
    lGitDict = __Orchestrator__.GSettingsGet()["ManagersGitDict"]
    return lGitDict.get((lHostUpperStr, lUserUpperStr, lAbsPathUpperStr), None)
def GitBranchRevGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str, inBranchNameStr: str="HEAD") -> str:
    """
    Find the Git instance by the (host, user, path) key and get the revision of the branch.

    :param inAgentHostNameStr: Agent hostname in any case
    :param inAgentUserNameStr: Agent user name in any case
    :param inGitPathStr: Relative (from the orchestrator working directory) or absolute repo path
    :param inBranchNameStr: The branch name where to get revision. Default is "HEAD"
    :return: result of the instance BranchRevGet call; None if the Git instance is not found
    """
    lGitInstance = GitGet(
        inAgentHostNameStr=inAgentHostNameStr,
        inAgentUserNameStr=inAgentUserNameStr,
        inGitPathStr=inGitPathStr,
    )
    # No registered instance - nothing to ask
    if lGitInstance is None:
        return None
    return lGitInstance.BranchRevGet(inBranchNameStr=inBranchNameStr)
def GitFetch(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> None:
    """
    Find the Git instance by the (host, user, path) key and call its Fetch method.

    Does nothing if the Git instance is not found.

    :param inAgentHostNameStr: Agent hostname in any case
    :param inAgentUserNameStr: Agent user name in any case
    :param inGitPathStr: Relative (from the orchestrator working directory) or absolute repo path
    :return: None
    """
    lGitInstance = GitGet(
        inAgentHostNameStr=inAgentHostNameStr,
        inAgentUserNameStr=inAgentUserNameStr,
        inGitPathStr=inGitPathStr,
    )
    # No registered instance - nothing to fetch
    if lGitInstance is None:
        return
    lGitInstance.Fetch()