From 1cea34c8c811bb4a6a010790812dfaad233a4107 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Mon, 21 Feb 2022 19:14:47 +0300 Subject: [PATCH] Try to returnn list to Agent (no one item) need test --- Orchestrator/ControlPanel/CP_VersionCheck.py | 10 ++++- Orchestrator/OrchestratorSettings.py | 7 ++-- Orchestrator/pyOpenRPA.Orchestrator_x64.cmd | 3 +- Sources/pyOpenRPA/Agent/O2A.py | 29 ++++++++------- .../pyOpenRPA/Orchestrator/Managers/Git.py | 37 +++++++++++++++---- .../Orchestrator/Managers/Process.py | 10 +++-- .../pyOpenRPA/Orchestrator/ServerSettings.py | 23 ++++++------ .../Orchestrator/__Orchestrator__.py | 6 +++ 8 files changed, 82 insertions(+), 43 deletions(-) diff --git a/Orchestrator/ControlPanel/CP_VersionCheck.py b/Orchestrator/ControlPanel/CP_VersionCheck.py index 5ea6ae6a..c368bda9 100644 --- a/Orchestrator/ControlPanel/CP_VersionCheck.py +++ b/Orchestrator/ControlPanel/CP_VersionCheck.py @@ -1,5 +1,6 @@ # !ATTENTION - Current Control panel works only from pyOpenRPA v1.2.0! from pyOpenRPA import Orchestrator +from pyOpenRPA.Orchestrator import Managers def ControlPanelRenderDict(inGSettings): # Example of the JS code in Python code lJSCheckVersion = f""" @@ -33,16 +34,21 @@ def ControlPanelRenderDict(inGSettings): #Orchestrator settings def SettingsUpdate(inGSettings): #Add RobotRDPActive in control panel - inGSettings["ControlPanelDict"]["RobotList"].append({"RenderFunction": ControlPanelRenderDict, "KeyStr": "VersionCheck"}) + Orchestrator.WebCPUpdate(inCPKeyStr="VersionCheck", inHTMLRenderDef=ControlPanelRenderDict) #Orchestrator.Managers.ControlPanel(inControlPanelNameStr="TestTTT",inRefreshHTMLJinja2TemplatePathStr="ControlPanel\\test.html", inJinja2TemplateRefreshBool = True) lProcess = Orchestrator.Managers.ProcessInitSafe(inAgentHostNameStr="IVANMASLOV-DESKTOP",inAgentUserNameStr="ND", inProcessNameWOExeStr="notepad",inStartCMDStr="notepad",inStopSafeTimeoutSecFloat=3) # Some test + Orchestrator.OrchestratorInitWait() #lProcess.ScheduleStatusCheckEverySeconds(inIntervalSecondsInt=5) + #lProcess.Start(inIsManualBool=False) + lProcess.StartCheck() #Orchestrator.OrchestratorScheduleGet().every(2).seconds.do(Orchestrator.OrchestratorThreadStart, # lProcess.StartCheck) #Orchestrator.OrchestratorScheduleGet().every(5).seconds.do(Orchestrator.OrchestratorThreadStart,lProcess.StatusCheckStart) #lProcess.Start() - lGit = Orchestrator.Managers.Git(inAgentHostNameStr="IVANMASLOV-DESKTOP",inAgentUserNameStr="ND",inGitPathStr="") + lGit = Orchestrator.Managers.Git(inAgentHostNameStr="IVANMASLOV-DESKTOP",inAgentUserNameStr="ND",inGitPathStr="C:\Abs\Archive\scopeSrcUL\pyStore") + lGit.BranchRevLastGetInterval(inBranchLocalStr="prd", inBranchRemoteStr="origin/prd", inPreviousBranchRestoreBool=False,inIntervalSecFloat=10.0) + lGit.ProcessConnect(inProcess=lProcess) return inGSettings \ No newline at end of file diff --git a/Orchestrator/OrchestratorSettings.py b/Orchestrator/OrchestratorSettings.py index 164da1e4..5262325d 100644 --- a/Orchestrator/OrchestratorSettings.py +++ b/Orchestrator/OrchestratorSettings.py @@ -1,6 +1,5 @@ import psutil, datetime, logging, os, sys -from pyOpenRPA.Orchestrator.__Orchestrator__ import OrchestratorLoggerGet # stdout from logging # Config settings lPyOpenRPASourceFolderPathStr = r"..\Sources" # Path for test pyOpenRPA package @@ -18,8 +17,7 @@ if not Orchestrator.OrchestratorIsAdmin(): else: gSettings = Orchestrator.GSettingsGet() #gSettings = SettingsTemplate.Create(inModeStr="BASIC") # Create GSettings with basic configuration - no more config is available from the box - you can create own - # Set the debug level - OrchestratorLoggerGet().setLevel(logging.DEBUG) + Orchestrator.OrchestratorLoggerGet().setLevel(logging.INFO) # TEST Add User ND - Add Login ND to superuser of the Orchestrator lUACClientDict = SettingsTemplate.__UACClientAdminCreate__() Orchestrator.UACUpdate(inGSettings=gSettings, inADLoginStr="ND", inADStr="", inADIsDefaultBool=True, inURLList=[], inRoleHierarchyAllowedDict=lUACClientDict) @@ -32,8 +30,9 @@ else: # Restore DUMP Orchestrator.OrchestratorSessionRestore(inGSettings=gSettings) # Autoinit control panels starts with CP_ - lPyModules = Orchestrator.OrchestratorPySearchInit(inGlobPatternStr="ControlPanel\\CP_*.py", inDefStr="SettingsUpdate", inDefArgNameGSettingsStr="inGSettings") + lPyModules = Orchestrator.OrchestratorPySearchInit(inGlobPatternStr="ControlPanel\\CP_*.py", inDefStr="SettingsUpdate", inDefArgNameGSettingsStr="inGSettings", inAsyncInitBool=True) # Call the orchestrator def Orchestrator.Orchestrator(inGSettings=gSettings, inDumpRestoreBool=False) + diff --git a/Orchestrator/pyOpenRPA.Orchestrator_x64.cmd b/Orchestrator/pyOpenRPA.Orchestrator_x64.cmd index 7b3a7668..73270e4f 100644 --- a/Orchestrator/pyOpenRPA.Orchestrator_x64.cmd +++ b/Orchestrator/pyOpenRPA.Orchestrator_x64.cmd @@ -1,4 +1,5 @@ cd %~dp0 taskkill /im "pyOpenRPA_Orchestrator.exe" /F /fi "username eq %username%" copy /Y ..\Resources\WPy64-3720\python-3.7.2.amd64\python.exe ..\Resources\WPy64-3720\python-3.7.2.amd64\pyOpenRPA_Orchestrator.exe -.\..\Resources\WPy64-3720\python-3.7.2.amd64\pyOpenRPA_Orchestrator.exe "OrchestratorSettings.py" \ No newline at end of file +.\..\Resources\WPy64-3720\python-3.7.2.amd64\pyOpenRPA_Orchestrator.exe "OrchestratorSettings.py" +pause>nul \ No newline at end of file diff --git a/Sources/pyOpenRPA/Agent/O2A.py b/Sources/pyOpenRPA/Agent/O2A.py index f416c462..859f1485 100644 --- a/Sources/pyOpenRPA/Agent/O2A.py +++ b/Sources/pyOpenRPA/Agent/O2A.py @@ -35,20 +35,21 @@ def O2A_Loop(inGSettings): lRequestBody = lResponse.text lBodyLenInt = len(lRequestBody) if lBodyLenInt != 0: # CHeck if not empty result when close the connection from orch - lQueueItem = lResponse.json() # Try to get JSON - # Append QUEUE item in ProcessorDict > ActivityList - lActivityLastGUIDStr = lQueueItem["GUIDStr"] - # Check if ActivityItem ["ThreadBool"] = False > go sync mode in processor queue; Else: New thread - if lQueueItem.get("ThreadBool",False) == False: - inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) - else: - Processor.ProcessorRunAsync(inGSettings=inGSettings,inActivityList=[lQueueItem]) - # Log full version if bytes size is less than limit . else short - lAgentLimitLogSizeBytesInt = 500 - if lBodyLenInt <= lAgentLimitLogSizeBytesInt: - if lL: lL.info(f"ActivityItem was received from orchestrator: {lQueueItem}"); - else: - if lL: lL.info(f"ActivityItem was received from orchestrator: Was supressed because of big size. Max is {lAgentLimitLogSizeBytesInt} bytes"); + lQueueList = lResponse.json() # Try to get JSON + for lQueueItem in lQueueList: + # Append QUEUE item in ProcessorDict > ActivityList + lActivityLastGUIDStr = lQueueItem["GUIDStr"] + # Check if ActivityItem ["ThreadBool"] = False > go sync mode in processor queue; Else: New thread + if lQueueItem.get("ThreadBool",False) == False: + inGSettings["ProcessorDict"]["ActivityList"].append(lQueueItem) + else: + Processor.ProcessorRunAsync(inGSettings=inGSettings,inActivityList=[lQueueItem]) + # Log full version if bytes size is less than limit . else short + lAgentLimitLogSizeBytesInt = 500 + if lBodyLenInt <= lAgentLimitLogSizeBytesInt: + if lL: lL.info(f"ActivityItem from orchestrator: {lQueueItem}"); + else: + if lL: lL.info(f"ActivityItem from orchestrator: Supressed - big size. Size is {lBodyLenInt} bytes"); else: if lL: lL.debug(f"Empty response from the orchestrator - loop when refresh the connection between Orc and Agent"); except requests.exceptions.ConnectionError as e: diff --git a/Sources/pyOpenRPA/Orchestrator/Managers/Git.py b/Sources/pyOpenRPA/Orchestrator/Managers/Git.py index bf132b0c..7676f6bd 100644 --- a/Sources/pyOpenRPA/Orchestrator/Managers/Git.py +++ b/Sources/pyOpenRPA/Orchestrator/Managers/Git.py @@ -1,9 +1,10 @@ -from ast import ExceptHandler +import time import os from .. import __Orchestrator__ from . import Process - +import threading from typing import List +from typing import Tuple from pyOpenRPA import Orchestrator @@ -12,7 +13,7 @@ class Git(): mAgentHostNameStr = None mAgentUserNameStr = None mAbsPathStr = None - mProcessList: List[Process] = [] # List of the key turples of the Process instance + mProcessList: List[Tuple] = [] # List of the key turples of the Process instance def __init__(self, inAgentHostNameStr=None, inAgentUserNameStr=None, inGitPathStr=""): """ @@ -23,10 +24,12 @@ class Git(): :param inGitPathStr: Relative (from the orchestrator working directory) or absolute. If "" - work with Orc repo :return: """ - lAbsPathUpperStr = os.path.abspath(inGitPathStr).upper() + lAbsPathStr = os.path.abspath(inGitPathStr) + lAbsPathUpperStr = lAbsPathStr.upper() lGS = __Orchestrator__.GSettingsGet() # Check if Process is not exists in GSettings if (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), lAbsPathUpperStr) not in lGS["ManagersGitDict"]: + self.mAbsPathStr = lAbsPathStr self.mAbsPathUpperStr = lAbsPathUpperStr self.mAgentHostNameStr = inAgentHostNameStr self.mAgentUserNameStr = inAgentUserNameStr @@ -49,10 +52,25 @@ class Git(): def ProcessListSaveStopSafe(self): """ Save the state and do the stop safe for the all processes + Will send safe stop in parallel mode but wait to the end of the safestop for the all processes. After that will continue """ - for lProcessItem in self.mProcessList: + lIntervalScheckSecFloat = 5.0 + lThreadList:List[threading.Thread] = [] + for lProcessItemTuple in self.mProcessList: + lProcessItem = Orchestrator.Managers.ProcessGet(*lProcessItemTuple) lProcessItem.StatusSave() - lProcessItem.StopSafe() + lThread = threading.Thread(target=lProcessItem.StopSafe) + lThread.start() + lThreadList.append(lThread) + # Wait for all process will be safe stopped + lAllThreadStoppedBool = False + while not lAllThreadStoppedBool: + lAllThreadStoppedBool = True + for lThread in lThreadList: + if lThread.is_alive() == True: + lAllThreadStoppedBool = False + break + time.sleep(lIntervalScheckSecFloat) def ProcessListRestore(self): """ @@ -137,7 +155,9 @@ class Git(): # check if the correct revision lCMDResultStr = None if self.BranchRevIsLast(inBranchLocalStr=inBranchLocalStr, inBranchRemoteStr=inBranchRemoteStr) == False: - Orchestrator.OrchestratorLoggerGet().debug(f"Managers.Git ({self.mAbsPathStr}): self.BranchRevLastGet, new revision has been detected - start to merge") + Orchestrator.OrchestratorLoggerGet().info(f"Managers.Git ({self.mAbsPathStr}): self.BranchRevLastGet, new rev (branch: {inBranchLocalStr}) has been detected - merge (branch: {inBranchRemoteStr})") + # Do the stop safe for the connected process + self.ProcessListSaveStopSafe() lBranchNameCurrentStr = self.BranchNameGet() # reset all changes in local folder self.Clear() @@ -149,6 +169,9 @@ class Git(): if inPreviousBranchRestoreBool == True: # checkout to the source branch which was self.BranchCheckout(inBranchNameStr=lBranchNameCurrentStr) + # do the orc restart + Orchestrator.OrchestratorLoggerGet().info(f"Managers.Git ({self.mAbsPathStr}): self.BranchRevLastGet, merge done, restart orc") + Orchestrator.OrchestratorRestart() return lCMDResultStr def BranchNameGet(self) -> str: diff --git a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py index 307406ac..4969d391 100644 --- a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py +++ b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py @@ -2,6 +2,8 @@ from .. import __Orchestrator__ import os import time + +from pyOpenRPA import Orchestrator class Process(): """ Manager process, which is need to be started / stopped / restarted @@ -170,7 +172,7 @@ class Process(): :return: Process status. See self.mStatusStr. 0_STOPPED 1_STOPPED_MANUAL 2_STOP_SAFE 3_STOP_SAFE_MANUAL 4_STARTED 5_STARTED_MANUAL """ if inIsManualBool == False: self.ManualStopTriggerNewStart() # Set the time - if (self.mStatusStr == "1_STOPPED_MANUAL" or "STOP_SAFE" in self.mStatusStr) and inIsManualBool == False: + if self.mStatusStr is not None and (self.mStatusStr == "1_STOPPED_MANUAL" or "STOP_SAFE" in self.mStatusStr) and inIsManualBool == False: lStr = f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Process will not start because of stopped manual or stop safe is now." __Orchestrator__.OrchestratorLoggerGet().warning(lStr) return self.mStatusStr @@ -346,7 +348,7 @@ class Process(): """ self.StatusCheck() # check current status # Do some action - if self.mStatusSavedStr != self.mStatusStr: + if self.mStatusSavedStr != self.mStatusStr and self.mStatusSavedStr is not None: lManualBool = False if "MANUAL" in self.mStatusSavedStr: lManualBool = True @@ -354,6 +356,8 @@ class Process(): self.StopSafe(inIsManualBool=lManualBool) if "STARTED" in self.mStatusSavedStr and "STARTED" not in self.mStatusStr: self.Start(inIsManualBool=lManualBool) + Orchestrator.OrchestratorLoggerGet().info(f"Managers.Process ({self.mAgentHostNameStr}, {self.mAgentUserNameStr}, {self.mProcessNameWOExeStr}): Status has been restored to {self.mStatusSavedStr}") + self.mStatusSavedStr = None return self.mStatusStr def StatusChangeLog(self): @@ -428,7 +432,7 @@ class Process(): return self.mStatusStr -def ProcessInitSafe(inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300): +def ProcessInitSafe(inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300) -> Process: """ Exception safe function. Check if process instance is not exists in GSettings (it can be after restart because Orchestrator restore objects from dump of the previous Orchestrator session) Return existing instance (if exists) or create new instance and return it. diff --git a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py index 35b96a45..6ca13be4 100644 --- a/Sources/pyOpenRPA/Orchestrator/ServerSettings.py +++ b/Sources/pyOpenRPA/Orchestrator/ServerSettings.py @@ -341,6 +341,7 @@ def pyOpenRPA_Agent_O2A(inRequest, inGSettings): if lActivityLifetimeSecFloat > lActivityItemLifetimeLimitSecFloat: lActivityItem = lThisAgentDict["ActivityList"].pop(0) else: + lReturnActivityItemList = [] lReturnActivityItemDict = None # If lInput['ActivityLastGUIDStr'] is '' > return 0 element for send in Agent if lInput['ActivityLastGUIDStr'] == "": @@ -351,26 +352,24 @@ def pyOpenRPA_Agent_O2A(inRequest, inGSettings): for lForActivityItemDict in lQueueList: if lForTriggerGetNextItem == True: lReturnActivityItemDict = lForActivityItemDict - break + lReturnActivityItemList.append(lReturnActivityItemDict) # 2022 02 21 - Maslov Return list - not one item + #break if lForActivityItemDict['GUIDStr'] == lInput['ActivityLastGUIDStr']: lForTriggerGetNextItem = True # CASE if GUID is not detected - return 0 element if lReturnActivityItemDict == None and lForTriggerGetNextItem == False: lReturnActivityItemDict = lThisAgentDict["ActivityList"][0] + lReturnActivityItemList.append(lReturnActivityItemDict) # 2022 02 21 - Maslov Return list - not one item # Send QUEUE ITEM - if lReturnActivityItemDict is not None: - lReturnActivityItemDict = copy.deepcopy(lReturnActivityItemDict) - if "CreatedByDatetime" in lReturnActivityItemDict: - del lReturnActivityItemDict["CreatedByDatetime"] - inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lReturnActivityItemDict), "utf8") + if len(lReturnActivityItemList) > 0: + lReturnActivityItemList = copy.deepcopy(lReturnActivityItemList) + for lItemDict in lReturnActivityItemList: + if "CreatedByDatetime" in lItemDict: + del lItemDict["CreatedByDatetime"] + inRequest.OpenRPAResponseDict["Body"] = bytes(json.dumps(lReturnActivityItemList), "utf8") # Log full version if bytes size is less than limit . else short lBodyLenInt = len(inRequest.OpenRPAResponseDict["Body"]) lAgentLimitLogSizeBytesInt = inGSettings["ServerDict"]["AgentLimitLogSizeBytesInt"] - if lBodyLenInt <= lAgentLimitLogSizeBytesInt: - if lL: lL.info(f"Activity item to agent Hostname {lInput['HostNameUpperStr']}, User {lInput['UserUpperStr']}. Activity item: {lReturnActivityItemDict}") - else: - if lL: lL.info( - f"Activity item to agent Hostname {lInput['HostNameUpperStr']}, User {lInput['UserUpperStr']}. " - f"Activity item: Was suppressed because of body size of {lBodyLenInt} bytes. Max is {lAgentLimitLogSizeBytesInt}") + if lL: lL.debug(f"ActivityItem to Agent ({lInput['HostNameUpperStr']}, {lInput['UserUpperStr']}): Item count: {len(lReturnActivityItemList)}, bytes size: {lBodyLenInt}") lDoLoopBool = False # CLose the connection else: # Nothing to send - sleep for the next iteration time.sleep(lAgentLoopSleepSecFloat) diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index 6e8d465c..e65a451b 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -2824,6 +2824,12 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato lScheduleThread.start() # Start the thread execution. if lL: lL.info("Schedule module (new) loop start") #Logging + # Restore state for process + for lProcessKeyTuple in inGSettings["ManagersProcessDict"]: + lProcess = inGSettings["ManagersProcessDict"][lProcessKeyTuple] + lThread = threading.Thread(target= lProcess.StatusRestore) + lThread.start() + def __schedule_loop__(): while True: schedule.run_pending()