#Add process restore when Orchestrator restart

#Def to safe init Process
dev-linux
Ivan Maslov 3 years ago
parent 4741b53083
commit 102fed3d71

@ -8,6 +8,9 @@ class Process():
With Process instance you can automate your process activity. Use schedule package to set interval when process should be active and when not.
All defs in class are pickle safe! After orchestrator restart (if not the force stop of the orchestrator process) your instance with properties will be restored. But it not coverage the scheduler which is in __Orchestrator__ .
After orc restart you need to reinit all schedule rules: Orchestrator.OrchestratorScheduleGet
Process instance has the following statuses:
- 0_STOPPED
- 1_STOPPED_MANUAL
@ -17,7 +20,8 @@ class Process():
- 5_STARTED_MANUAL
.. code-block:: python
lProcess = Orchestrator.Managers.Process(inAgentHostNameStr="PCNAME",inAgentUserNameStr="USER",
# For the safe init class use ProcessInitSafe
lProcess = Orchestrator.Managers.ProcessInitSafe(inAgentHostNameStr="PCNAME",inAgentUserNameStr="USER",
inProcessNameWOExeStr="notepad",inStartCMDStr="notepad",inStopSafeTimeoutSecFloat=3)
# Async way to run job
lProcess.ScheduleStatusCheckEverySeconds(inIntervalSecondsInt=5)
@ -62,16 +66,32 @@ class Process():
time.sleep(lIntervalSecFloat)
return None
def __init__(self, inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=120):
def __init__(self, inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300):
"""
Init the class instance.
!ATTENTION! Function can raise exception if process with the same (inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr) is already exists in GSettings (can be restored from previous Orchestrator session). See ProcessInitSafe to sefaty init the instance or restore previous
!ATTENTION! Schedule options you must
:param inAgentHostNameStr: Agent hostname in any case. Required to identify Process
:param inAgentUserNameStr: Agent user name in any case. Required to identify Process
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:param inStartPathStr: Path to start process (.cmd/ .exe or something else). Path can be relative (from orc working directory) or absolute
:param inStartCMDStr: CMD script to start program (if no start file is exists)
:param inStopSafeTimeoutSecFloat: Time to wait for stop safe. After that do the stop force (if process is not stopped)
"""
lGS = __Orchestrator__.GSettingsGet()
# Check if Process is not exists in GSettings
if (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper()) not in lGS["ManagersProcessDict"]:
self.mAgentHostNameStr = inAgentHostNameStr
self.mAgentUserNameStr = inAgentUserNameStr
self.mStartPathStr = inStartPathStr
self.mStartCMDStr = inStartCMDStr
self.mProcessNameWOExeStr = inProcessNameWOExeStr
self.mStopSafeTimeoutSecFloat = inStopSafeTimeoutSecFloat
__Orchestrator__.GSettingsGet()["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self
lGS["ManagersProcessDict"][(inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper())]=self
lActivityDict = __Orchestrator__.ProcessorActivityItemCreate(inDef=self.StatusCheck,inArgList=[])
__Orchestrator__.ProcessorActivityItemAppend(inActivityItemDict=lActivityDict)
else: raise Exception(f"Managers.Process ({inAgentHostNameStr}, {inAgentUserNameStr}, {inProcessNameWOExeStr}): Can't init the Process instance because it already inited in early (see ProcessInitSafe)")
def ManualStopTriggerSet(self, inMSTdTSecFloat: float, inMSTdNInt: int) -> None:
"""
@ -399,15 +419,36 @@ class Process():
self.StopSafe(inIsManualBool=False)
return self.mStatusStr
def ScheduleStatusCheckEverySeconds(self,inIntervalSecondsInt=120):
def ProcessInitSafe(inAgentHostNameStr, inAgentUserNameStr, inProcessNameWOExeStr, inStartPathStr=None, inStartCMDStr = None, inStopSafeTimeoutSecFloat=300):
"""
Run status check every interval in second you specify.
Exception safe function. Check if process instance is not exists in GSettings (it can be after restart because Orchestrator restore objects from dump of the previous Orchestrator session)
Return existing instance (if exists) or create new instance and return it.
:param inIntervalSecondsInt: Interval in seconds. Default is 120
:return: None
:param inAgentHostNameStr: Agent hostname in any case. Required to identify Process
:param inAgentUserNameStr: Agent user name in any case. Required to identify Process
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:param inStartPathStr: Path to start process (.cmd/ .exe or something else). Path can be relative (from orc working directory) or absolute
:param inStartCMDStr: CMD script to start program (if no start file is exists)
:param inStopSafeTimeoutSecFloat: Time to wait for stop safe. After that do the stop force (if process is not stopped)
:return: Process instance
"""
# Check job in threaded way
__Orchestrator__.OrchestratorScheduleGet().every(inIntervalSecondsInt).seconds.do(__Orchestrator__.OrchestratorThreadStart,self.StatusCheck)
lProcess = ProcessGet(inAgentHostNameStr = inAgentHostNameStr, inAgentUserNameStr = inAgentUserNameStr, inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: return lProcess.mStatusStr
else: return Process(inAgentHostNameStr=inAgentHostNameStr,inAgentUserNameStr=inAgentUserNameStr,inProcessNameWOExeStr=inProcessNameWOExeStr,
inStartPathStr=inStartPathStr,inStartCMDStr=inStartCMDStr,inStopSafeTimeoutSecFloat=inStopSafeTimeoutSecFloat)
def ProcessExists(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) -> bool:
"""
Check if the Process instance is exists in GSettings by the (inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str)
:param inAgentHostNameStr: Agent hostname in any case. Required to identify Process
:param inAgentUserNameStr: Agent user name in any case. Required to identify Process
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:return: True - process exists in gsettings; False - else
"""
return (inAgentHostNameStr.upper(), inAgentUserNameStr.upper(), inProcessNameWOExeStr.upper()) in __Orchestrator__.GSettingsGet()["ManagersProcessDict"]
def ProcessGet(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str) -> Process:
"""
@ -603,3 +644,18 @@ def ProcessManualStopListClear(inAgentHostNameStr: str, inAgentUserNameStr: str,
lProcess = ProcessGet(inAgentHostNameStr=inAgentHostNameStr, inAgentUserNameStr=inAgentUserNameStr,
inProcessNameWOExeStr=inProcessNameWOExeStr)
if lProcess is not None: lProcess.ManualStopListClear()
def ProcessScheduleStatusCheckEverySeconds(inAgentHostNameStr: str, inAgentUserNameStr: str, inProcessNameWOExeStr: str,inIntervalSecondsInt: int = 120):
"""
Run status check every interval in second you specify.
:param inAgentHostNameStr: Agent hostname in any case. Required to identify Process
:param inAgentUserNameStr: Agent user name in any case. Required to identify Process
:param inProcessNameWOExeStr: The process name without extension .exe (the key of the Process instance). Any case - will be processed to the upper case
:param inIntervalSecondsInt: Interval in seconds. Default is 120
:return: None
"""
lProcess = ProcessGet(inAgentHostNameStr=inAgentHostNameStr, inAgentUserNameStr=inAgentUserNameStr,
inProcessNameWOExeStr=inProcessNameWOExeStr)
# Check job in threaded way
__Orchestrator__.OrchestratorScheduleGet().every(inIntervalSecondsInt).seconds.do(__Orchestrator__.OrchestratorThreadStart,lProcess.StatusCheck)

@ -2,7 +2,7 @@ import subprocess, json, psutil, time, os, win32security, sys, base64, logging,
import pickle
import inspect
import schedule
from partd import Server
#from partd import Server
from . import Server
from . import Timer
@ -609,6 +609,10 @@ def OrchestratorPySearchInit(inGlobPatternStr, inDefStr = None, inDefArgNameGSet
def OrchestratorSessionSave(inGSettings=None):
"""
Orchestrator session save in file
(from version 1.2.7)
_SessionLast_GSettings.pickle (binary)
(above the version 1.2.7)
_SessionLast_RDPList.json (encoding = "utf-8")
_SessionLast_StorageDict.pickle (binary)
@ -619,26 +623,38 @@ def OrchestratorSessionSave(inGSettings=None):
lL = inGSettings["Logger"]
try:
# Dump RDP List in file json
lFile = open("_SessionLast_RDPList.json", "w", encoding="utf-8")
lFile.write(json.dumps(inGSettings["RobotRDPActive"]["RDPList"])) # dump json to file
lFile.close() # Close the file
if inGSettings is not None:
if lL: lL.info(
f"Orchestrator has dump the RDP list before the restart.")
# _SessionLast_StorageDict.pickle (binary)
if "StorageDict" in inGSettings:
with open('_SessionLast_StorageDict.pickle', 'wb') as lFile:
pickle.dump(inGSettings["StorageDict"], lFile)
#lFile = open("_SessionLast_RDPList.json", "w", encoding="utf-8")
#lFile.write(json.dumps(inGSettings["RobotRDPActive"]["RDPList"])) # dump json to file
#lFile.close() # Close the file
#if inGSettings is not None:
# if lL: lL.info(
# f"Orchestrator has dump the RDP list before the restart.")
## _SessionLast_StorageDict.pickle (binary)
#if "StorageDict" in inGSettings:
# with open('_SessionLast_StorageDict.pickle', 'wb') as lFile:
# pickle.dump(inGSettings["StorageDict"], lFile)
# if lL: lL.info(
# f"Orchestrator has dump the StorageDict before the restart.")
#SessionLast
lDumpDict = {"StorageDict":inGSettings["StorageDict"], "ManagersProcessDict":inGSettings["ManagersProcessDict"],
"RobotRDPActive":{"RDPList": inGSettings["RobotRDPActive"]["RDPList"]}}
with open('_SessionLast_GSettings.pickle', 'wb') as lFile:
pickle.dump(lDumpDict, lFile)
if lL: lL.info(
f"Orchestrator has dump the StorageDict before the restart.")
f"Orchestrator has dump the GSettings (new dump mode from v1.2.7) before the restart.")
except Exception as e:
if lL: lL.exception(f"Exception when dump data before restart the Orchestrator")
return True
def OrchestratorSessionRestore(inGSettings=None):
"""
Check _SessionLast_RDPList.json and _SessionLast_StorageDict.pickle in working directory. if exist - load into gsettings
# _SessionLast_StorageDict.pickle (binary)
Check _SessioLast... files in working directory. if exist - load into gsettings
(from version 1.2.7)
_SessionLast_GSettings.pickle (binary)
(above the version 1.2.7)
_SessionLast_RDPList.json (encoding = "utf-8")
_SessionLast_StorageDict.pickle (binary)
@ -665,6 +681,18 @@ def OrchestratorSessionRestore(inGSettings=None):
in2Dict=lStorageDictDumpDict) # Merge dict 2 into dict 1
if lL: lL.warning(f"StorageDict was restored from previous Orchestrator session")
os.remove("_SessionLast_StorageDict.pickle") # remove the temp file
# _SessionLast_Gsettings.pickle (binary)
if os.path.exists("_SessionLast_GSettings.pickle"):
if "StorageDict" not in inGSettings:
inGSettings["StorageDict"] = {}
if "ManagersProcessDict" not in inGSettings:
inGSettings["ManagersProcessDict"] = {}
with open('_SessionLast_GSettings.pickle', 'rb') as lFile:
lStorageDictDumpDict = pickle.load(lFile)
Server.__ComplexDictMerge2to1Overwrite__(in1Dict=inGSettings,
in2Dict=lStorageDictDumpDict) # Merge dict 2 into dict 1
if lL: lL.warning(f"GSettings was restored from previous Orchestrator session")
os.remove("_SessionLast_GSettings.pickle") # remove the temp file
def UACKeyListCheck(inRequest, inRoleKeyList) -> bool:
"""
@ -2644,6 +2672,8 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato
lRobotRDPActiveThread.start() # Start the thread execution.
if lL: lL.info("Robot RDP active has been started") #Logging
# Init autocleaner in another thread
lAutocleanerThread = threading.Thread(target= GSettingsAutocleaner, kwargs={"inGSettings":gSettingsDict})
lAutocleanerThread.daemon = True # Run the thread in daemon mode.
@ -2676,14 +2706,12 @@ def Orchestrator(inGSettings=None, inDumpRestoreBool = True, inRunAsAdministrato
lSchedulerThread.start() # Start the thread execution.
if lL: lL.info("Scheduler (old) loop start") #Logging
# Schedule (new) loop
lScheduleThread = threading.Thread(target= __schedule_loop__)
lScheduleThread.daemon = True # Run the thread in daemon mode.
lScheduleThread.start() # Start the thread execution.
if lL: lL.info("Schedule module (new) loop start") #Logging
def __schedule_loop__():
while True:
schedule.run_pending()

Loading…
Cancel
Save