from ast import ExceptHandler
from multiprocessing . dummy import Process
import os
from . . import __Orchestrator__
from . import Process
class Git():
    """
    Wrapper around one Git repository that is operated through shell commands.

    Commands are sent either to an Agent (when both the agent host name and the agent user name
    are set) or to the Orchestrator session itself. Every instance registers itself in the
    Git registry kept in GSettings, keyed by (host, user, absolute repo path) in upper case.
    """
    mAgentHostNameStr = None  # Agent host name as passed to __init__; None - use the Orchestrator session
    mAgentUserNameStr = None  # Agent user name as passed to __init__; None - use the Orchestrator session
    mAbsPathStr = None  # Absolute repo path in its original case; used when building shell commands
    mAbsPathUpperStr = None  # Upper-cased absolute repo path; used as a part of the registry key
    mProcessList = None  # List of the key tuples of the connected Process instances (created per instance)

    def __init__(self, inAgentHostNameStr=None, inAgentUserNameStr=None, inGitPathStr=" "):
        """
        Init the Git repo instance and register it in the GSettings Git registry.

        :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process. If None - works with Orc session
        :param inAgentUserNameStr: Agent user name in any case. Required to identify Process. If None - works with Orc session
        :param inGitPathStr: Relative (from the orchestrator working directory) or absolute path of the repo. Default is " "
        :raises Exception: when an instance with the same (host, user, path) key is already registered
        """
        lAbsPathStr = os.path.abspath(inGitPathStr)
        lAbsPathUpperStr = lAbsPathStr.upper()
        lGS = __Orchestrator__.GSettingsGet()
        # None is a documented value for the agent identifiers, so it must not be upper-cased blindly
        lHostKeyStr = None if inAgentHostNameStr is None else inAgentHostNameStr.upper()
        lUserKeyStr = None if inAgentUserNameStr is None else inAgentUserNameStr.upper()
        lKeyTurple = (lHostKeyStr, lUserKeyStr, lAbsPathUpperStr)
        # Refuse to create a second instance for the same key
        if lKeyTurple in lGS[" ManagersGitDict "]:
            raise Exception(f" Managers.Git ( {inAgentHostNameStr} , {inAgentUserNameStr} , {lAbsPathUpperStr} ): Can ' t init the Git instance because it already inited in early ")
        self.mAbsPathStr = lAbsPathStr
        self.mAbsPathUpperStr = lAbsPathUpperStr
        self.mAgentHostNameStr = inAgentHostNameStr
        self.mAgentUserNameStr = inAgentUserNameStr
        # Own list per instance: a class-level list would be shared by every Git instance
        self.mProcessList = []
        lGS[" ManagersGitDict "][lKeyTurple] = self

    def ProcessConnect(self, inProcess: Process):
        """
        Connect a process to the Git instance by remembering its key tuple.

        Intended use (per the original notes): stop the process safely when the repo is upgraded and then start it again.

        :param inProcess: Process instance; only its KeyTurpleGet() result is stored
        :type inProcess: Process
        :raises Exception: when a process with the same key is already connected to this instance
        """
        # Lazily create the list so an instance never falls back to state shared through the class
        if self.mProcessList is None:
            self.mProcessList = []
        lProcessTurple = inProcess.KeyTurpleGet()
        if lProcessTurple in self.mProcessList:
            raise Exception(" Process with current key is already exists in Git process list. ")
        self.mProcessList.append(lProcessTurple)

    def __OSCMDShell__(self, inCMDStr):
        """
        Detect the way of use (Agent or Orchestrator session) and send the cmd synchronously.

        :param inCMDStr: shell command to execute
        :return: whatever the Orchestrator call returns for the executed command
        """
        # Without a complete agent identity the command is run by the Orchestrator session
        if self.mAgentUserNameStr is None or self.mAgentHostNameStr is None:
            return __Orchestrator__.OSCMD(inCMDStr=inCMDStr, inRunAsyncBool=False)
        lActivityItemGUIDStr = __Orchestrator__.AgentOSCMD(
            inHostNameStr=self.mAgentHostNameStr,
            inUserStr=self.mAgentUserNameStr,
            inCMDStr=inCMDStr,
            inRunAsyncBool=False,
            inSendOutputToOrchestratorLogsBool=False,
        )
        return __Orchestrator__.AgentActivityItemReturnGet(inGUIDStr=lActivityItemGUIDStr)

    def BranchRevGet(self, inBranchNameStr=" HEAD "):
        """
        Get the specified branch revision. Default return the current branch revision

        .. code-block:: python

            lGit.BranchRevGet(inBranchNameStr="dev")  # Get revision of the local dev branch
            lGit.BranchRevGet(inBranchNameStr="remotes/origin/dev")  # Get revision of the remotes dev branch
            lGit.BranchRevGet(inBranchNameStr="HEAD")  # Get revision of the current HEAD branch
            lGit.BranchRevGet()  # Uses the default branch name

        :param inBranchNameStr: The branch name where to get revision guid
        :return: result of the "git rev-parse" command as returned by the command runner
        """
        # The original-case path is used: an upper-cased path does not exist on case-sensitive file systems
        lCMDStr = f" cd \" {self.mAbsPathStr} \" && git rev-parse {inBranchNameStr} "
        return self.__OSCMDShell__(inCMDStr=lCMDStr)

    def BranchRevIsLast(self, inBranchLocalStr: str, inBranchRemoteStr: str) -> bool:
        """
        Check whether two branches point to the same revision.

        :param inBranchLocalStr: name of the local branch
        :param inBranchRemoteStr: name of the remote branch
        :return: True - BranchRevGet results of both branches are equal; False - else
        """
        lLocalBranchRevStr = self.BranchRevGet(inBranchNameStr=inBranchLocalStr)
        lRemoteBranchRevStr = self.BranchRevGet(inBranchNameStr=inBranchRemoteStr)
        return lLocalBranchRevStr == lRemoteBranchRevStr

    def BranchRevCheck(self, inBranchRemoteStr):
        """
        Not implemented yet: does nothing and returns None.

        :param inBranchRemoteStr: name of the remote branch (currently unused)
        :return: None
        """
        return None

    def BranchNameGet(self):
        """
        Not implemented yet: does nothing and returns None.

        :return: None
        """
        # Planned command: git rev-parse --abbrev-ref HEAD
        return None

    def BranchCheckout(self, inBranchNameStr):
        """
        Not implemented yet: does nothing and returns None. No git command is executed.

        :param inBranchNameStr: name of the branch to checkout (currently unused)
        :return: None
        """
        # Planned commands (intentionally not executed):
        #   git clean -f -d                   - clean out the extra files
        #   git reset --hard                  - roll back the files tracked by Git that were modified
        #   git checkout {inBranchNameStr}
        return None

    def Fetch(self):
        """
        Get updates from the git server.

        .. code-block:: python

            lGit.Fetch()  # get updates from the server

        :return: None
        """
        # The original-case path is used: an upper-cased path does not exist on case-sensitive file systems
        lCMDStr = f" cd \" {self.mAbsPathStr} \" && git fetch "
        self.__OSCMDShell__(inCMDStr=lCMDStr)
def GitExists(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> bool:
    """
    Check if the Git instance is exists in GSettings by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str)

    :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process
    :param inAgentUserNameStr: Agent user name in any case. Required to identify Process
    :param inGitPathStr: Relative (from the orchestrator working directory) or absolute path of the repo
    :return: True - Git instance exists in gsettings; False - else
    """
    # Instances are registered under the upper-cased ABSOLUTE path, so the path has to be
    # normalised the same way here; otherwise a relative path would never be found
    lAbsPathUpperStr = os.path.abspath(inGitPathStr).upper()
    lKeyTurple = (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), lAbsPathUpperStr)
    return lKeyTurple in __Orchestrator__.GSettingsGet()[" ManagersGitDict "]
def GitGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> Git:
    """
    Look up the registered Git instance by the (inAgentHostNameStr, inAgentUserNameStr, inGitPathStr) key.

    :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process
    :param inAgentUserNameStr: Agent user name in any case. Required to identify Process
    :param inGitPathStr: Relative (from the orchestrator working directory) or absolute path of the repo
    :return: Git instance (if exists) Else None
    """
    # The registry key holds the absolute repo path in upper case
    lRepoKeyPathStr = os.path.abspath(inGitPathStr).upper()
    lGitDict = __Orchestrator__.GSettingsGet()[" ManagersGitDict "]
    lRegistryKeyTurple = (
        inAgentHostNameStr.upper(),
        inAgentUserNameStr.upper(),
        lRepoKeyPathStr,
    )
    return lGitDict.get(lRegistryKeyTurple, None)
def GitBranchRevGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str, inBranchNameStr: str = " HEAD ") -> str:
    """
    Get the branch revision from the registered Git instance identified by (host, user, path).

    :param inAgentHostNameStr: Agent hostname in any case
    :param inAgentUserNameStr: Agent user name in any case
    :param inGitPathStr: Relative or absolute path of the repo
    :param inBranchNameStr: The branch name where to get revision
    :return: result of Git.BranchRevGet; None when no such Git instance is registered
    """
    lRepo = GitGet(
        inAgentHostNameStr=inAgentHostNameStr,
        inAgentUserNameStr=inAgentUserNameStr,
        inGitPathStr=inGitPathStr,
    )
    # Nothing registered under this key - there is nothing to ask
    if lRepo is None:
        return None
    return lRepo.BranchRevGet(inBranchNameStr=inBranchNameStr)
def GitFetch(inAgentHostNameStr: str, inAgentUserNameStr: str, inGitPathStr: str) -> None:
    """
    Run Fetch on the registered Git instance identified by (host, user, path).

    Does nothing when no such Git instance is registered.

    :param inAgentHostNameStr: Agent hostname in any case
    :param inAgentUserNameStr: Agent user name in any case
    :param inGitPathStr: Relative or absolute path of the repo
    :return: None
    """
    lRepo = GitGet(
        inAgentHostNameStr=inAgentHostNameStr,
        inAgentUserNameStr=inAgentUserNameStr,
        inGitPathStr=inGitPathStr,
    )
    # Unknown repo - silently skip
    if lRepo is None:
        return
    lRepo.Fetch()