Process: add args, refactorings in Process gow to start, Testing Git + Process

dev-linux
Ivan Maslov 2 years ago
parent 8b2fc6650d
commit 577e518f96

@ -21,6 +21,11 @@ def O2A_Loop(inGSettings):
while inGSettings["O2ADict"]["IsOnlineBool"]: while inGSettings["O2ADict"]["IsOnlineBool"]:
# Send request to the orchestrator server # Send request to the orchestrator server
lRequestBody = None lRequestBody = None
# ConnectionError - CE
lCEPhaseFastTimeLastGoodFloat = time.time()
lCEPhaseFastDurationSecFloat = inGSettings['O2ADict']['ConnectionTimeoutSecFloat']
lCEPhaseFastRetrySecFloat = inGSettings['O2ADict']['RetryTimeoutSecFloat']/5.0
lCEPhaseLongRetrySecFloat = inGSettings['O2ADict']['RetryTimeoutSecFloat']*12.0
try: try:
lProtocolStr= "https" if inGSettings["OrchestratorDict"]["IsHTTPSBool"] else "http" lProtocolStr= "https" if inGSettings["OrchestratorDict"]["IsHTTPSBool"] else "http"
lHostStr = inGSettings["OrchestratorDict"]["HostStr"] lHostStr = inGSettings["OrchestratorDict"]["HostStr"]
@ -28,6 +33,7 @@ def O2A_Loop(inGSettings):
lURLStr=f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/O2A" lURLStr=f"{lProtocolStr}://{lHostStr}:{lPortInt}/pyOpenRPA/Agent/O2A"
lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"], "ActivityLastGUIDStr": lActivityLastGUIDStr} lDataDict = { "HostNameUpperStr": inGSettings["AgentDict"]["HostNameUpperStr"], "UserUpperStr": inGSettings["AgentDict"]["UserUpperStr"], "ActivityLastGUIDStr": lActivityLastGUIDStr}
lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=lDataDict, timeout=inGSettings["O2ADict"]["ConnectionTimeoutSecFloat"]) lResponse = requests.post(url= lURLStr, cookies = {"AuthToken":inGSettings["OrchestratorDict"]["SuperTokenStr"]}, json=lDataDict, timeout=inGSettings["O2ADict"]["ConnectionTimeoutSecFloat"])
lCEPhaseFastTimeLastGoodFloat = time.time()
if lResponse.status_code != 200: if lResponse.status_code != 200:
if lL: lL.warning(f"Agent can not connect to Orchestrator. Below the response from the orchestrator:{lResponse}") if lL: lL.warning(f"Agent can not connect to Orchestrator. Below the response from the orchestrator:{lResponse}")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])
@ -53,17 +59,21 @@ def O2A_Loop(inGSettings):
else: else:
if lL: lL.debug(f"Empty response from the orchestrator - loop when refresh the connection between Orc and Agent"); if lL: lL.debug(f"Empty response from the orchestrator - loop when refresh the connection between Orc and Agent");
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") if time.time() - lCEPhaseFastTimeLastGoodFloat <= lCEPhaseFastDurationSecFloat:
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {lCEPhaseFastRetrySecFloat} s.")
time.sleep(lCEPhaseFastRetrySecFloat)
else:
if lL: lL.error(f"O2A Connection error - orchestrator is not available. Sleep for {lCEPhaseLongRetrySecFloat} s.")
time.sleep(lCEPhaseLongRetrySecFloat)
except ConnectionResetError as e: except ConnectionResetError as e:
if lL: lL.error(f"O2A Connection reset error - orchestrator is not available. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") if lL: lL.error(f"O2A Connection reset error - orchestrator is not available. Sleep for {inGSettings['O2ADict']['RetryTimeoutSecFloat']} s.")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])
except json.decoder.JSONDecodeError as e: except json.decoder.JSONDecodeError as e:
if lL: lL.error(f"O2A JSON decode error - See body of the recieved content from the Orchestrator: {lRequestBody}") if lL: lL.error(f"O2A JSON decode error - See body of the recieved content from the Orchestrator: {lRequestBody}")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
if lL: lL.exception(f"O2A requests timeout error (no response for long time). Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") if lL: lL.exception(f"O2A requests timeout error (no response for long time). Sleep for {inGSettings['O2ADict']['RetryTimeoutSecFloat']} s.")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])
except Exception as e: except Exception as e:
if lL: lL.exception(f"O2A Error handler. Sleep for {inGSettings['A2ODict']['RetryTimeoutSecFloat']} s.") if lL: lL.exception(f"O2A Error handler. Sleep for {inGSettings['O2ADict']['RetryTimeoutSecFloat']} s.")
time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"]) time.sleep(inGSettings["O2ADict"]["RetryTimeoutSecFloat"])

@ -128,6 +128,9 @@ def OSCMD(inCMDStr, inRunAsyncBool=True, inGSettings = None, inSendOutputToOrche
:return: :return:
""" """
lResultStr = "" lResultStr = ""
# New feature
if inSendOutputToOrchestratorLogsBool == False and inCaptureBool == False:
inCMDStr = f"start {inCMDStr}"
# Subdef to listen OS result # Subdef to listen OS result
def _CMDRunAndListenLogs(inCMDStr, inSendOutputToOrchestratorLogsBool, inCMDEncodingStr, inGSettings = None, inCaptureBool = True): def _CMDRunAndListenLogs(inCMDStr, inSendOutputToOrchestratorLogsBool, inCMDEncodingStr, inGSettings = None, inCaptureBool = True):
lL = inGSettings.get("Logger",None) if type(inGSettings) is dict else None lL = inGSettings.get("Logger",None) if type(inGSettings) is dict else None

@ -44,6 +44,8 @@ class Process():
mAgentUserNameStr = None mAgentUserNameStr = None
mStartPathStr = None mStartPathStr = None
mStartCMDStr = None mStartCMDStr = None
mStartArgDict = None
mStatusCheckIntervalSecFloat = None
mProcessNameWOExeStr = None mProcessNameWOExeStr = None
mStopSafeTimeoutSecFloat = None mStopSafeTimeoutSecFloat = None
mStatusStr = None # 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL mStatusStr = None # 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL
@ -76,7 +78,7 @@ class Process():
return (self.mAgentHostNameStr.upper(), self.mAgentUserNameStr.upper(), self.mProcessNameWOExeStr.upper()) return (self.mAgentHostNameStr.upper(), self.mAgentUserNameStr.upper(), self.mProcessNameWOExeStr.upper())
def __init__(self, inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300): def __init__(self, inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300, inStartArgDict=None, inStatusCheckIntervalSecFloat=30):
""" """
Init the class instance. Init the class instance.
!ATTENTION! Function can raise exception if process with the same (inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr) is already exists in GSettings (can be restored from previous Orchestrator session). See ProcessInitSafe to sefaty init the instance or restore previous !ATTENTION! Function can raise exception if process with the same (inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr) is already exists in GSettings (can be restored from previous Orchestrator session). See ProcessInitSafe to sefaty init the instance or restore previous
@ -92,6 +94,7 @@ class Process():
lGS = __Orchestrator__.GSettingsGet() lGS = __Orchestrator__.GSettingsGet()
# Check if Process is not exists in GSettings # Check if Process is not exists in GSettings
if (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper()) not in lGS["ManagersProcessDict"]: if (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper()) not in lGS["ManagersProcessDict"]:
self.mStartArgDict = inStartArgDict
self.mAgentHostNameStr = inAgentHostNameStr self.mAgentHostNameStr = inAgentHostNameStr
self.mAgentUserNameStr = inAgentUserNameStr self.mAgentUserNameStr = inAgentUserNameStr
self.mStartPathStr = inStartPathStr self.mStartPathStr = inStartPathStr
@ -101,6 +104,8 @@ class Process():
lGS["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self lGS["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self
lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[], inThreadBool=True) lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[], inThreadBool=True)
__Orchestrator__.ProcessorActivityItemAppend(inActivityItemDict=lActivityDict) __Orchestrator__.ProcessorActivityItemAppend(inActivityItemDict=lActivityDict)
if inStatusCheckIntervalSecFloat is not None: __Orchestrator__.OrchestratorScheduleGet().every(inStatusCheckIntervalSecFloat).seconds.do(Orchestrator.OrchestratorThreadStart,self.StatusCheck)
self.mStatusCheckIntervalSecFloat = inStatusCheckIntervalSecFloat
else: raise Exception(f"Managers.Process ({inAgentHostNameStr}, {inAgentUserNameStr}, {inProcessNameWOExeStr}): Can't init the Process instance because it already inited in early (see ProcessInitSafe)") else: raise Exception(f"Managers.Process ({inAgentHostNameStr}, {inAgentUserNameStr}, {inProcessNameWOExeStr}): Can't init the Process instance because it already inited in early (see ProcessInitSafe)")
def ManualStopTriggerSet(self, inMSTdTSecFloat: float, inMSTdNInt: int) -> None: def ManualStopTriggerSet(self, inMSTdTSecFloat: float, inMSTdNInt: int) -> None:
@ -163,7 +168,7 @@ class Process():
if lLogBool == True: self.StatusChangeLog() if lLogBool == True: self.StatusChangeLog()
return self.mStatusStr return self.mStatusStr
def Start(self, inIsManualBool = True) -> str: def Start(self, inIsManualBool = True, inStartArgDict=None) -> str:
""" """
Manual/Auto start. Manual start will block scheduling execution. To return schedule execution use def Manual2Auto. Manual/Auto start. Manual start will block scheduling execution. To return schedule execution use def Manual2Auto.
Will not start if STOP SAFE is now and don't start auto is stopped manual now Will not start if STOP SAFE is now and don't start auto is stopped manual now
@ -177,8 +182,14 @@ class Process():
__Orchestrator__.OrchestratorLoggerGet().warning(lStr) __Orchestrator__.OrchestratorLoggerGet().warning(lStr)
return self.mStatusStr return self.mStatusStr
# Send activity item to agent - wait result # Send activity item to agent - wait result
if self.mStartPathStr is not None: lCMDStr = f"start {os.path.abspath(self.mStartPathStr)}" if self.mStartPathStr is not None: lCMDStr = os.path.abspath(self.mStartPathStr)
elif self.mStartCMDStr is not None: lCMDStr = f"start {self.mStartCMDStr}" elif self.mStartCMDStr is not None: lCMDStr = self.mStartCMDStr
# Append args
if inStartArgDict is not None: self.mStartArgDict = inStartArgDict
if self.mStartArgDict is not None:
for lItemKeyStr in self.mStartArgDict:
lItemValueStr = self.mStartArgDict[lItemKeyStr]
lCMDStr = f"{lCMDStr} {lItemKeyStr} {lItemValueStr}"
#import pdb #import pdb
#pdb.set_trace() #pdb.set_trace()
self.MuteWait() self.MuteWait()
@ -340,6 +351,14 @@ class Process():
if lWarnSafeBool==True: __Orchestrator__.OrchestratorLoggerGet().warning(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Safe status has been catched when safe > change saved status to stopped.") if lWarnSafeBool==True: __Orchestrator__.OrchestratorLoggerGet().warning(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Safe status has been catched when safe > change saved status to stopped.")
return self.mStatusStr return self.mStatusStr
def StatusCheckIntervalRestore(self):
"""Call from orchestrator when init
"""
if self.mStatusCheckIntervalSecFloat is not None:
__Orchestrator__.OrchestratorLoggerGet().info(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Restore schedule to StatusCheck in interval of {self.mStatusCheckIntervalSecFloat} sec.")
__Orchestrator__.OrchestratorScheduleGet().every(self.mStatusCheckIntervalSecFloat).seconds.do(Orchestrator.OrchestratorThreadStart,self.StatusCheck)
def StatusRestore(self): def StatusRestore(self):
""" """
Execute the StatusCheck, after that restore status to the saved state (see StatusSave). Work when orchestrator is restarted. Execute the StatusCheck, after that restore status to the saved state (see StatusSave). Work when orchestrator is restarted.
@ -349,14 +368,15 @@ class Process():
self.StatusCheck() # check current status self.StatusCheck() # check current status
# Do some action # Do some action
if self.mStatusSavedStr != self.mStatusStr and self.mStatusSavedStr is not None: if self.mStatusSavedStr != self.mStatusStr and self.mStatusSavedStr is not None:
lManualBool = False #lManualBool = False
if "MANUAL" in self.mStatusSavedStr: #if "MANUAL" in self.mStatusSavedStr:
lManualBool = True # lManualBool = True
if "STOPPED" in self.mStatusSavedStr and "STOPPED" not in self.mStatusStr: if "STOPPED" in self.mStatusSavedStr and "STOPPED" not in self.mStatusStr:
self.StopSafe(inIsManualBool=lManualBool) self.StopSafe(inIsManualBool=True)
if "STARTED" in self.mStatusSavedStr and "STARTED" not in self.mStatusStr: if "STARTED" in self.mStatusSavedStr and "STARTED" not in self.mStatusStr:
self.Start(inIsManualBool=lManualBool) self.Start(inIsManualBool=True)
Orchestrator.OrchestratorLoggerGet().info(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Status has been restored to {self.mStatusSavedStr}") Orchestrator.OrchestratorLoggerGet().info(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Status has been restored to {self.mStatusSavedStr}")
self.mStatusStr = self.mStatusSavedStr
self.mStatusSavedStr = None self.mStatusSavedStr = None
return self.mStatusStr return self.mStatusStr

@ -483,6 +483,9 @@ def OSCMD(inCMDStr, inRunAsyncBool=True, inLogger = None):
""" """
if inLogger is None: inLogger = OrchestratorLoggerGet() if inLogger is None: inLogger = OrchestratorLoggerGet()
lResultStr = "" lResultStr = ""
# New feature
if inRunAsyncBool == True:
inCMDStr = f"start {inCMDStr}"
# Subdef to listen OS result # Subdef to listen OS result
def _CMDRunAndListenLogs(inCMDStr, inLogger): def _CMDRunAndListenLogs(inCMDStr, inLogger):
lResultStr = "" lResultStr = ""
@ -2827,6 +2830,7 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato
# Restore state for process # Restore state for process
for lProcessKeyTuple in inGSettings["ManagersProcessDict"]: for lProcessKeyTuple in inGSettings["ManagersProcessDict"]:
lProcess = inGSettings["ManagersProcessDict"][lProcessKeyTuple] lProcess = inGSettings["ManagersProcessDict"][lProcessKeyTuple]
lProcess.StatusCheckIntervalRestore()
lThread = threading.Thread(target= lProcess.StatusRestore) lThread = threading.Thread(target= lProcess.StatusRestore)
lThread.start() lThread.start()

Loading…
Cancel
Save