From 102fed3d712ca7b07f151c2e801dc7a1f23ff024 Mon Sep 17 00:00:00 2001 From: Ivan Maslov Date: Sun, 23 Jan 2022 08:10:29 +0300 Subject: [PATCH] #Add process restore when Orchestrator restart #Def to safe init Process --- .../Orchestrator/Managers/Process.py | 96 +++++++++++++++---- .../Orchestrator/__Orchestrator__.py | 60 ++++++++---- 2 files changed, 120 insertions(+), 36 deletions(-) diff --git a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py index 2c7b232d..3381492e 100644 --- a/Sources/pyOpenRPA/Orchestrator/Managers/Process.py +++ b/Sources/pyOpenRPA/Orchestrator/Managers/Process.py @@ -8,6 +8,9 @@ class Process(): With Process instance you can automate your process activity. Use schedule package to set interval when process should be active and when not. + All defs in class are pickle safe! After orchestrator restart (if not the force stop of the orchestrator process) your instance with properties will be restored. But it not coverage the scheduler which is in __Orchestrator__ . + After orc restart you need to reinit all schedule rules: Orchestrator.OrchestratorScheduleGet + Process instance has the following statuses: - 0_STOPPED - 1_STOPPED_MANUAL @@ -17,7 +20,8 @@ class Process(): - 5_STARTED_MANUAL .. code-block:: python - lProcess = Orchestrator.Managers.Process(inAgentHostNameStr="PCNAME",inAgentUserNameStr="USER", + # For the safe init class use ProcessInitSafe + lProcess = Orchestrator.Managers.ProcessInitSafe(inAgentHostNameStr="PCNAME",inAgentUserNameStr="USER", inProcessNameWOExeStr="notepad",inStartCMDStr="notepad",inStopSafeTimeoutSecFloat=3) # Async way to run job lProcess.ScheduleStatusCheckEverySeconds(inIntervalSecondsInt=5) @@ -62,16 +66,32 @@ class Process(): time.sleep(lIntervalSecFloat) return None - def __init__(self, inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=120): - self.mAgentHostNameStr = inAgentHostNameStr - self.mAgentUserNameStr = inAgentUserNameStr - self.mStartPathStr = inStartPathStr - self.mStartCMDStr = inStartCMDStr - self.mProcessNameWOExeStr = inProcessNameWOExeStr - self.mStopSafeTimeoutSecFloat = inStopSafeTimeoutSecFloat - __Orchestrator__.GSettingsGet()["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self - lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[]) - __Orchestrator__.ProcessorActivityItemAppend(inActivityItemDict=lActivityDict) + def __init__(self, inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300): + """ + Init the class instance. + !ATTENTION! Function can raise exception if process with the same (inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr) is already exists in GSettings (can be restored from previous Orchestrator session). See ProcessInitSafe to sefaty init the instance or restore previous + !ATTENTION! Schedule options you must + + :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process + :param inAgentUserNameStr: Agent user name in any case. Required to identify Process + :param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case + :param inStartPathStr: Path to start process (.cmd/ .exe or something else). Path can be relative (from orc working directory) or absolute + :param inStartCMDStr: CMD script to start program (if no start file is exists) + :param inStopSafeTimeoutSecFloat: Time to wait for stop safe. After that do the stop force (if process is not stopped) + """ + lGS = __Orchestrator__.GSettingsGet() + # Check if Process is not exists in GSettings + if (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper()) not in lGS["ManagersProcessDict"]: + self.mAgentHostNameStr = inAgentHostNameStr + self.mAgentUserNameStr = inAgentUserNameStr + self.mStartPathStr = inStartPathStr + self.mStartCMDStr = inStartCMDStr + self.mProcessNameWOExeStr = inProcessNameWOExeStr + self.mStopSafeTimeoutSecFloat = inStopSafeTimeoutSecFloat + lGS["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self + lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[]) + __Orchestrator__.ProcessorActivityItemAppend(inActivityItemDict=lActivityDict) + else: raise Exception(f"Managers.Process ({inAgentHostNameStr}, {inAgentUserNameStr}, {inProcessNameWOExeStr}): Can't init the Process instance because it already inited in early (see ProcessInitSafe)") def ManualStopTriggerSet(self, inMSTdTSecFloat: float, inMSTdNInt: int) -> None: """ @@ -399,15 +419,36 @@ class Process(): self.StopSafe(inIsManualBool=False) return self.mStatusStr - def ScheduleStatusCheckEverySeconds(self,inIntervalSecondsInt=120): - """ - Run status check every interval in second you specify. - :param inIntervalSecondsInt: Interval in seconds. Default is 120 - :return: None - """ - # Check job in threaded way - __Orchestrator__.OrchestratorScheduleGet().every(inIntervalSecondsInt).seconds.do(__Orchestrator__.OrchestratorThreadStart,self.StatusCheck) +def ProcessInitSafe(inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300): + """ + Exception safe function. Check if process instance is not exists in GSettings (it can be after restart because Orchestrator restore objects from dump of the previous Orchestrator session) + Return existing instance (if exists) or create new instance and return it. + + :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process + :param inAgentUserNameStr: Agent user name in any case. Required to identify Process + :param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case + :param inStartPathStr: Path to start process (.cmd/ .exe or something else). Path can be relative (from orc working directory) or absolute + :param inStartCMDStr: CMD script to start program (if no start file is exists) + :param inStopSafeTimeoutSecFloat: Time to wait for stop safe. After that do the stop force (if process is not stopped) + :return: Process instance + """ + lProcess = ProcessGet(inAgentHostNameStr = inAgentHostNameStr, inAgentUserNameStr = inAgentUserNameStr, inProcessNameWOExeStr=inProcessNameWOExeStr) + if lProcess is not None: return lProcess.mStatusStr + else: return Process(inAgentHostNameStr=inAgentHostNameStr,inAgentUserNameStr=inAgentUserNameStr,inProcessNameWOExeStr=inProcessNameWOExeStr, + inStartPathStr=inStartPathStr,inStartCMDStr=inStartCMDStr,inStopSafeTimeoutSecFloat=inStopSafeTimeoutSecFloat) + +def ProcessExists(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) -> bool: + """ + Check if the Process instance is exists in GSettings by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) + + :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process + :param inAgentUserNameStr: Agent user name in any case. Required to identify Process + :param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case + :return: True - process exists in gsettings; False - else + """ + return (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper()) in __Orchestrator__.GSettingsGet()["ManagersProcessDict"] + def ProcessGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) -> Process: """ @@ -602,4 +643,19 @@ def ProcessManualStopListClear(inAgentHostNameStr: str, inAgentUserNameStr: str, """ lProcess = ProcessGet(inAgentHostNameStr=inAgentHostNameStr, inAgentUserNameStr=inAgentUserNameStr, inProcessNameWOExeStr=inProcessNameWOExeStr) - if lProcess is not None: lProcess.ManualStopListClear() \ No newline at end of file + if lProcess is not None: lProcess.ManualStopListClear() + +def ProcessScheduleStatusCheckEverySeconds(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str,inIntervalSecondsInt: int = 120): + """ + Run status check every interval in second you specify. + + :param inAgentHostNameStr: Agent hostname in any case. Required to identify Process + :param inAgentUserNameStr: Agent user name in any case. Required to identify Process + :param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case + :param inIntervalSecondsInt: Interval in seconds. Default is 120 + :return: None + """ + lProcess = ProcessGet(inAgentHostNameStr=inAgentHostNameStr, inAgentUserNameStr=inAgentUserNameStr, + inProcessNameWOExeStr=inProcessNameWOExeStr) + # Check job in threaded way + __Orchestrator__.OrchestratorScheduleGet().every(inIntervalSecondsInt).seconds.do(__Orchestrator__.OrchestratorThreadStart,lProcess.StatusCheck) \ No newline at end of file diff --git a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py index 6e9ade98..28c3f533 100644 --- a/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py +++ b/Sources/pyOpenRPA/Orchestrator/__Orchestrator__.py @@ -2,7 +2,7 @@ import subprocess, json, psutil, time, os, win32security, sys, base64, logging, import pickle import inspect import schedule -from partd import Server +#from partd import Server from . import Server from . import Timer @@ -609,6 +609,10 @@ def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSet def OrchestratorSessionSave(inGSettings=None): """ Orchestrator session save in file + (from version 1.2.7) + _SessionLast_GSettings.pickle (binary) + + (above the version 1.2.7) _SessionLast_RDPList.json (encoding = "utf-8") _SessionLast_StorageDict.pickle (binary) @@ -619,26 +623,38 @@ def OrchestratorSessionSave(inGSettings=None): lL = inGSettings["Logger"] try: # Dump RDP List in file json - lFile = open("_SessionLast_RDPList.json", "w", encoding="utf-8") - lFile.write(json.dumps(inGSettings["RobotRDPActive"]["RDPList"])) # dump json to file - lFile.close() # Close the file - if inGSettings is not None: + #lFile = open("_SessionLast_RDPList.json", "w", encoding="utf-8") + #lFile.write(json.dumps(inGSettings["RobotRDPActive"]["RDPList"])) # dump json to file + #lFile.close() # Close the file + #if inGSettings is not None: + # if lL: lL.info( + # f"Orchestrator has dump the RDP list before the restart.") + ## _SessionLast_StorageDict.pickle (binary) + #if "StorageDict" in inGSettings: + # with open('_SessionLast_StorageDict.pickle', 'wb') as lFile: + # pickle.dump(inGSettings["StorageDict"], lFile) + # if lL: lL.info( + # f"Orchestrator has dump the StorageDict before the restart.") + + #SessionLast + lDumpDict = {"StorageDict":inGSettings["StorageDict"], "ManagersProcessDict":inGSettings["ManagersProcessDict"], + "RobotRDPActive":{"RDPList": inGSettings["RobotRDPActive"]["RDPList"]}} + with open('_SessionLast_GSettings.pickle', 'wb') as lFile: + pickle.dump(lDumpDict, lFile) if lL: lL.info( - f"Orchestrator has dump the RDP list before the restart.") - # _SessionLast_StorageDict.pickle (binary) - if "StorageDict" in inGSettings: - with open('_SessionLast_StorageDict.pickle', 'wb') as lFile: - pickle.dump(inGSettings["StorageDict"], lFile) - if lL: lL.info( - f"Orchestrator has dump the StorageDict before the restart.") + f"Orchestrator has dump the GSettings (new dump mode from v1.2.7) before the restart.") + except Exception as e: if lL: lL.exception(f"Exception when dump data before restart the Orchestrator") return True def OrchestratorSessionRestore(inGSettings=None): """ - Check _SessionLast_RDPList.json and _SessionLast_StorageDict.pickle in working directory. if exist - load into gsettings - # _SessionLast_StorageDict.pickle (binary) + Check _SessioLast... files in working directory. if exist - load into gsettings + (from version 1.2.7) + _SessionLast_GSettings.pickle (binary) + + (above the version 1.2.7) _SessionLast_RDPList.json (encoding = "utf-8") _SessionLast_StorageDict.pickle (binary) @@ -665,6 +681,18 @@ def OrchestratorSessionRestore(inGSettings=None): in2Dict=lStorageDictDumpDict) # Merge dict 2 into dict 1 if lL: lL.warning(f"StorageDict was restored from previous Orchestrator session") os.remove("_SessionLast_StorageDict.pickle") # remove the temp file + # _SessionLast_Gsettings.pickle (binary) + if os.path.exists("_SessionLast_GSettings.pickle"): + if "StorageDict" not in inGSettings: + inGSettings["StorageDict"] = {} + if "ManagersProcessDict" not in inGSettings: + inGSettings["ManagersProcessDict"] = {} + with open('_SessionLast_GSettings.pickle', 'rb') as lFile: + lStorageDictDumpDict = pickle.load(lFile) + Server.__ComplexDictMerge2to1Overwrite__(in1Dict=inGSettings, + in2Dict=lStorageDictDumpDict) # Merge dict 2 into dict 1 + if lL: lL.warning(f"GSettings was restored from previous Orchestrator session") + os.remove("_SessionLast_GSettings.pickle") # remove the temp file def UACKeyListCheck(inRequest, inRoleKeyList) -> bool: """ @@ -2644,6 +2672,8 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato lRobotRDPActiveThread.start() # Start the thread execution. if lL: lL.info("Robot RDP active has been started") #Logging + + # Init autocleaner in another thread lAutocleanerThread = threading.Thread(target= GSettingsAutocleaner, kwargs={"inGSettings":gSettingsDict}) lAutocleanerThread.daemon = True # Run the thread in daemon mode. @@ -2676,14 +2706,12 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato lSchedulerThread.start() # Start the thread execution. if lL: lL.info("Scheduler (old) loop start") #Logging - # Schedule (new) loop lScheduleThread = threading.Thread(target= __schedule_loop__) lScheduleThread.daemon = True # Run the thread in daemon mode. lScheduleThread.start() # Start the thread execution. if lL: lL.info("Schedule module (new) loop start") #Logging - def __schedule_loop__(): while True: schedule.run_pending()